databricks-cli/bundle/libraries/libraries.go

178 lines
3.8 KiB
Go
Raw Normal View History

package libraries
import (
"context"
"fmt"
"net/url"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type match struct {
}
func MatchWithArtifacts() bundle.Mutator {
return &match{}
}
func (a *match) Name() string {
return "libraries.MatchWithArtifacts"
}
func (a *match) Apply(ctx context.Context, b *bundle.Bundle) error {
tasks := findAllTasks(b)
for _, task := range tasks {
if isMissingRequiredLibraries(task) {
return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey)
}
for j := range task.Libraries {
lib := &task.Libraries[j]
err := findArtifactsAndMarkForUpload(ctx, lib, b)
if err != nil {
return err
}
}
}
return nil
}
func findAllTasks(b *bundle.Bundle) []*jobs.Task {
r := b.Config.Resources
result := make([]*jobs.Task, 0)
for k := range b.Config.Resources.Jobs {
tasks := r.Jobs[k].JobSettings.Tasks
for i := range tasks {
task := &tasks[i]
result = append(result, task)
}
}
return result
}
func FindAllWheelTasks(b *bundle.Bundle) []*jobs.Task {
tasks := findAllTasks(b)
wheelTasks := make([]*jobs.Task, 0)
for _, task := range tasks {
if task.PythonWheelTask != nil {
wheelTasks = append(wheelTasks, task)
}
}
return wheelTasks
}
func isMissingRequiredLibraries(task *jobs.Task) bool {
if task.Libraries != nil {
return false
}
return task.PythonWheelTask != nil || task.SparkJarTask != nil
}
func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error) {
path := libPath(lib)
if path == "" {
return nil, nil
}
fullPath := filepath.Join(b.Config.Path, path)
return filepath.Glob(fullPath)
}
func findArtifactsAndMarkForUpload(ctx context.Context, lib *compute.Library, b *bundle.Bundle) error {
matches, err := findLibraryMatches(lib, b)
if err != nil {
return err
}
if len(matches) == 0 && isLocalLibrary(lib) {
return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libPath(lib))
}
for _, match := range matches {
af, err := findArtifactFileByLocalPath(match, b)
if err != nil {
cmdio.LogString(ctx, fmt.Sprintf("%s. Skipping uploading. In order to use the define 'artifacts' section", err.Error()))
} else {
af.Libraries = append(af.Libraries, lib)
}
}
return nil
}
func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.ArtifactFile, error) {
for _, a := range b.Config.Artifacts {
for k := range a.Files {
if a.Files[k].Source == path {
return &a.Files[k], nil
}
}
}
return nil, fmt.Errorf("artifact section is not defined for file at %s", path)
}
func libPath(library *compute.Library) string {
if library.Whl != "" {
return library.Whl
}
if library.Jar != "" {
return library.Jar
}
if library.Egg != "" {
return library.Egg
}
return ""
}
func isLocalLibrary(library *compute.Library) bool {
path := libPath(library)
if path == "" {
return false
}
if isExplicitFileScheme(path) {
return true
}
if isRemoteStorageScheme(path) {
return false
}
return !isWorkspacePath(path)
}
func isExplicitFileScheme(path string) bool {
return strings.HasPrefix(path, "file://")
}
func isRemoteStorageScheme(path string) bool {
url, err := url.Parse(path)
if err != nil {
return false
}
if url.Scheme == "" {
return false
}
// If the path starts with scheme:// format, it's a correct remote storage scheme
return strings.HasPrefix(path, url.Scheme+"://")
}
func isWorkspacePath(path string) bool {
return strings.HasPrefix(path, "/Workspace/") ||
strings.HasPrefix(path, "/Users/") ||
strings.HasPrefix(path, "/Shared/")
}