diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index b7a22d09..101b598d 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -12,7 +12,6 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/artifacts/whl" "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/filer" @@ -117,8 +116,6 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost } func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error { - filesToLibraries := libraries.MapFilesToTaskLibraries(ctx, b) - for i := range a.Files { f := &a.Files[i] @@ -133,24 +130,32 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u log.Infof(ctx, "Upload succeeded") f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source)) - // Lookup all tasks that reference this file. - libs, ok := filesToLibraries[f.Source] - if !ok { - log.Debugf(ctx, "No tasks reference %s", f.Source) - continue - } + // TODO: confirm if we still need to update the remote path to start with /Workspace + wsfsBase := "/Workspace" + remotePath := path.Join(wsfsBase, f.RemotePath) - // Update all tasks that reference this file. 
- for _, lib := range libs { - wsfsBase := "/Workspace" - remotePath := path.Join(wsfsBase, f.RemotePath) - if lib.Whl != "" { - lib.Whl = remotePath - continue + for _, job := range b.Config.Resources.Jobs { + for i := range job.Tasks { + task := &job.Tasks[i] + for j := range task.Libraries { + lib := &task.Libraries[j] + if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) { + lib.Whl = remotePath + } + if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) { + lib.Jar = remotePath + } + } } - if lib.Jar != "" { - lib.Jar = remotePath - continue + + for i := range job.Environments { + env := &job.Environments[i] + for j := range env.Spec.Dependencies { + lib := env.Spec.Dependencies[j] + if isArtifactMatchLibrary(f, lib, b) { + env.Spec.Dependencies[j] = remotePath + } + } } } } @@ -158,6 +163,26 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u return nil } +func isArtifactMatchLibrary(f *config.ArtifactFile, libPath string, b *bundle.Bundle) bool { + if !filepath.IsAbs(libPath) { + libPath = filepath.Join(b.RootPath, libPath) + } + + // libPath can be a glob pattern, so do the match first + matches, err := filepath.Glob(libPath) + if err != nil { + return false + } + + for _, m := range matches { + if m == f.Source { + return true + } + } + + return false +} + // Function to upload artifact file to Workspace func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error { raw, err := os.ReadFile(file) diff --git a/bundle/artifacts/artifacts_test.go b/bundle/artifacts/artifacts_test.go new file mode 100644 index 00000000..ca0e578b --- /dev/null +++ b/bundle/artifacts/artifacts_test.go @@ -0,0 +1,91 @@ +package artifacts + +import ( + "context" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" + 
"github.com/databricks/cli/internal/testutil" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestArtifactUpload(t *testing.T) { + tmpDir := t.TempDir() + whlFolder := filepath.Join(tmpDir, "whl") + testutil.Touch(t, whlFolder, "source.whl") + whlLocalPath := filepath.Join(whlFolder, "source.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/foo/bar/artifacts", + }, + Artifacts: config.Artifacts{ + "whl": { + Type: config.ArtifactPythonWheel, + Files: []config.ArtifactFile{ + {Source: whlLocalPath}, + }, + }, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + }, + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + filepath.Join("whl", "source.whl"), + "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + artifact := b.Config.Artifacts["whl"] + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil) + + err := uploadArtifact(context.Background(), b, artifact, "/foo/bar/artifacts", mockFiler) + require.NoError(t, err) + + // Test that libraries path is updated + require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl) + require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", 
b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl) + require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) + require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) +} diff --git a/bundle/artifacts/whl/from_libraries.go b/bundle/artifacts/whl/from_libraries.go index 84ef712a..ad321557 100644 --- a/bundle/artifacts/whl/from_libraries.go +++ b/bundle/artifacts/whl/from_libraries.go @@ -30,24 +30,18 @@ func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost tasks := libraries.FindAllWheelTasksWithLocalLibraries(b) for _, task := range tasks { for _, lib := range task.Libraries { - matches, err := filepath.Glob(filepath.Join(b.RootPath, lib.Whl)) - // File referenced from libraries section does not exists, skipping - if err != nil { - continue - } + matchAndAdd(ctx, lib.Whl, b) + } + } - for _, match := range matches { - name := filepath.Base(match) - if b.Config.Artifacts == nil { - b.Config.Artifacts = make(map[string]*config.Artifact) - } - - log.Debugf(ctx, "Adding an artifact block for %s", match) - b.Config.Artifacts[name] = &config.Artifact{ - Files: []config.ArtifactFile{ - {Source: match}, - }, - Type: config.ArtifactPythonWheel, + envs := libraries.FindAllEnvironments(b) + for _, jobEnvs := range envs { + for _, env := range jobEnvs { + if env.Spec != nil { + for _, dep := range env.Spec.Dependencies { + if libraries.IsEnvironmentDependencyLocal(dep) { + matchAndAdd(ctx, dep, b) + } } } } @@ -55,3 +49,26 @@ func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost return nil } + +func matchAndAdd(ctx context.Context, lib string, b *bundle.Bundle) { + matches, err := filepath.Glob(filepath.Join(b.RootPath, lib)) + // File referenced from libraries section does not exists, skipping + if err != nil { + return + } + + for _, 
match := range matches { + name := filepath.Base(match) + if b.Config.Artifacts == nil { + b.Config.Artifacts = make(map[string]*config.Artifact) + } + + log.Debugf(ctx, "Adding an artifact block for %s", match) + b.Config.Artifacts[name] = &config.Artifact{ + Files: []config.ArtifactFile{ + {Source: match}, + }, + Type: config.ArtifactPythonWheel, + } + } +} diff --git a/bundle/config/mutator/translate_paths.go b/bundle/config/mutator/translate_paths.go index 8fab3abb..018fd79c 100644 --- a/bundle/config/mutator/translate_paths.go +++ b/bundle/config/mutator/translate_paths.go @@ -152,6 +152,13 @@ func translateNoOp(literal, localFullPath, localRelPath, remotePath string) (str return localRelPath, nil } +func translateNoOpWithPrefix(literal, localFullPath, localRelPath, remotePath string) (string, error) { + if !strings.HasPrefix(localRelPath, ".") { + localRelPath = "." + string(filepath.Separator) + localRelPath + } + return localRelPath, nil +} + func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) { out := v.MustString() err := m.rewritePath(dir, b, &out, fn) diff --git a/bundle/config/mutator/translate_paths_jobs.go b/bundle/config/mutator/translate_paths_jobs.go index e761bda0..d4166072 100644 --- a/bundle/config/mutator/translate_paths_jobs.go +++ b/bundle/config/mutator/translate_paths_jobs.go @@ -5,39 +5,51 @@ import ( "slices" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/libs/dyn" ) -type jobTaskRewritePattern struct { - pattern dyn.Pattern - fn rewriteFunc +type jobRewritePattern struct { + pattern dyn.Pattern + fn rewriteFunc + skipRewrite func(string) bool } -func rewritePatterns(base dyn.Pattern) []jobTaskRewritePattern { - return []jobTaskRewritePattern{ +func noSkipRewrite(string) bool { + return false +} + +func rewritePatterns(base dyn.Pattern) []jobRewritePattern { + return []jobRewritePattern{ { 
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")), translateNotebookPath, + noSkipRewrite, }, { base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")), translateFilePath, + noSkipRewrite, }, { base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")), translateDirectoryPath, + noSkipRewrite, }, { base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")), translateFilePath, + noSkipRewrite, }, { base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")), translateNoOp, + noSkipRewrite, }, { base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")), translateNoOp, + noSkipRewrite, }, } } @@ -73,9 +85,28 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy ) // Compile list of patterns and their respective rewrite functions. + jobEnvironmentsPatterns := []jobRewritePattern{ + { + dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("environments"), + dyn.AnyIndex(), + dyn.Key("spec"), + dyn.Key("dependencies"), + dyn.AnyIndex(), + ), + translateNoOpWithPrefix, + func(s string) bool { + return !libraries.IsEnvironmentDependencyLocal(s) + }, + }, + } taskPatterns := rewritePatterns(base) forEachPatterns := rewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task"))) - allPatterns := append(taskPatterns, forEachPatterns...) + allPatterns := append(taskPatterns, jobEnvironmentsPatterns...) + allPatterns = append(allPatterns, forEachPatterns...) 
for _, t := range allPatterns { v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { @@ -91,6 +122,10 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err) } + sv := v.MustString() + if t.skipRewrite(sv) { + return v, nil + } return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key]) }) if err != nil { diff --git a/bundle/config/mutator/translate_paths_test.go b/bundle/config/mutator/translate_paths_test.go index 9650ae8b..29afb997 100644 --- a/bundle/config/mutator/translate_paths_test.go +++ b/bundle/config/mutator/translate_paths_test.go @@ -4,6 +4,7 @@ import ( "context" "os" "path/filepath" + "strings" "testing" "github.com/databricks/cli/bundle" @@ -651,3 +652,45 @@ func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) { diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths()) assert.ErrorContains(t, diags.Error(), `expected a file for "resources.pipelines.pipeline.libraries[0].file.path" but got a notebook`) } + +func TestTranslatePathJobEnvironments(t *testing.T) { + dir := t.TempDir() + touchEmptyFile(t, filepath.Join(dir, "env1.py")) + touchEmptyFile(t, filepath.Join(dir, "env2.py")) + + b := &bundle.Bundle{ + RootPath: dir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + "./dist/env1.whl", + "../dist/env2.whl", + "simplejson", + "/Workspace/Users/foo@bar.com/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs", filepath.Join(dir, "job/resource.yml")) + + diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths()) + require.NoError(t, diags.Error()) + + assert.Equal(t, strings.Join([]string{".", "job", "dist", 
"env1.whl"}, string(os.PathSeparator)), b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) + assert.Equal(t, strings.Join([]string{".", "dist", "env2.whl"}, string(os.PathSeparator)), b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) + assert.Equal(t, "simplejson", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[2]) + assert.Equal(t, "/Workspace/Users/foo@bar.com/test.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[3]) +} diff --git a/bundle/deploy/terraform/tfdyn/convert_job.go b/bundle/deploy/terraform/tfdyn/convert_job.go index 65ac8b9b..d1e7e73e 100644 --- a/bundle/deploy/terraform/tfdyn/convert_job.go +++ b/bundle/deploy/terraform/tfdyn/convert_job.go @@ -24,6 +24,7 @@ func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) { "tasks": "task", "job_clusters": "job_cluster", "parameters": "parameter", + "environments": "environment", }) if err != nil { return dyn.InvalidValue, err diff --git a/bundle/libraries/libraries.go b/bundle/libraries/libraries.go index 8dd63a75..a79adedb 100644 --- a/bundle/libraries/libraries.go +++ b/bundle/libraries/libraries.go @@ -1,45 +1,71 @@ package libraries import ( - "context" - "fmt" - "path/filepath" - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/libs/cmdio" - "github.com/databricks/cli/libs/log" - "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" ) -func findAllTasks(b *bundle.Bundle) []*jobs.Task { +func findAllTasks(b *bundle.Bundle) map[string]([]jobs.Task) { r := b.Config.Resources - result := make([]*jobs.Task, 0) + result := make(map[string]([]jobs.Task), 0) for k := range b.Config.Resources.Jobs { - tasks := r.Jobs[k].JobSettings.Tasks - for i := range tasks { - task := &tasks[i] - result = append(result, task) - } + result[k] = append(result[k], 
r.Jobs[k].JobSettings.Tasks...) } return result } +func FindAllEnvironments(b *bundle.Bundle) map[string]([]jobs.JobEnvironment) { + jobEnvs := make(map[string]([]jobs.JobEnvironment), 0) + for jobKey, job := range b.Config.Resources.Jobs { + if len(job.Environments) == 0 { + continue + } + + jobEnvs[jobKey] = job.Environments + } + + return jobEnvs +} + +func isEnvsWithLocalLibraries(envs []jobs.JobEnvironment) bool { + for _, e := range envs { + for _, l := range e.Spec.Dependencies { + if IsEnvironmentDependencyLocal(l) { + return true + } + } + } + + return false +} + func FindAllWheelTasksWithLocalLibraries(b *bundle.Bundle) []*jobs.Task { tasks := findAllTasks(b) + envs := FindAllEnvironments(b) + wheelTasks := make([]*jobs.Task, 0) - for _, task := range tasks { - if task.PythonWheelTask != nil && IsTaskWithLocalLibraries(task) { - wheelTasks = append(wheelTasks, task) + for k, jobTasks := range tasks { + for i := range jobTasks { + task := &jobTasks[i] + if task.PythonWheelTask == nil { + continue + } + + if isTaskWithLocalLibraries(*task) { + wheelTasks = append(wheelTasks, task) + } + + if envs[k] != nil && isEnvsWithLocalLibraries(envs[k]) { + wheelTasks = append(wheelTasks, task) + } } } return wheelTasks } -func IsTaskWithLocalLibraries(task *jobs.Task) bool { +func isTaskWithLocalLibraries(task jobs.Task) bool { for _, l := range task.Libraries { if IsLocalLibrary(&l) { return true @@ -49,7 +75,7 @@ func IsTaskWithLocalLibraries(task *jobs.Task) bool { return false } -func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool { +func IsTaskWithWorkspaceLibraries(task jobs.Task) bool { for _, l := range task.Libraries { if IsWorkspaceLibrary(&l) { return true @@ -58,73 +84,3 @@ func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool { return false } - -func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error) { - path := libraryPath(lib) - if path == "" { - return nil, nil - } - - fullPath := filepath.Join(b.RootPath, path) - 
return filepath.Glob(fullPath) -} - -func findArtifactFiles(ctx context.Context, lib *compute.Library, b *bundle.Bundle) ([]*config.ArtifactFile, error) { - matches, err := findLibraryMatches(lib, b) - if err != nil { - return nil, err - } - - if len(matches) == 0 && IsLocalLibrary(lib) { - return nil, fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(lib)) - } - - var out []*config.ArtifactFile - for _, match := range matches { - af, err := findArtifactFileByLocalPath(match, b) - if err != nil { - cmdio.LogString(ctx, fmt.Sprintf("%s. Skipping uploading. In order to use the define 'artifacts' section", err.Error())) - } else { - out = append(out, af) - } - } - - return out, nil -} - -func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.ArtifactFile, error) { - for _, a := range b.Config.Artifacts { - for k := range a.Files { - if a.Files[k].Source == path { - return &a.Files[k], nil - } - } - } - - return nil, fmt.Errorf("artifact section is not defined for file at %s", path) -} - -func MapFilesToTaskLibraries(ctx context.Context, b *bundle.Bundle) map[string][]*compute.Library { - tasks := findAllTasks(b) - out := make(map[string][]*compute.Library) - for _, task := range tasks { - for j := range task.Libraries { - lib := &task.Libraries[j] - if !IsLocalLibrary(lib) { - continue - } - - matches, err := findLibraryMatches(lib, b) - if err != nil { - log.Warnf(ctx, "Error matching library to files: %s", err.Error()) - continue - } - - for _, match := range matches { - out[match] = append(out[match], lib) - } - } - } - - return out -} diff --git a/bundle/libraries/libraries_test.go b/bundle/libraries/libraries_test.go deleted file mode 100644 index 3da10d47..00000000 --- a/bundle/libraries/libraries_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package libraries - -import ( - "context" - "path/filepath" - "testing" - - "github.com/databricks/cli/bundle" - 
"github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/jobs" - "github.com/stretchr/testify/assert" -) - -func TestMapFilesToTaskLibrariesNoGlob(t *testing.T) { - b := &bundle.Bundle{ - RootPath: "testdata", - Config: config.Root{ - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "job1": { - JobSettings: &jobs.JobSettings{ - Tasks: []jobs.Task{ - { - Libraries: []compute.Library{ - { - Whl: "library1", - }, - { - Whl: "library2", - }, - { - Whl: "/absolute/path/in/workspace/library3", - }, - }, - }, - { - Libraries: []compute.Library{ - { - Whl: "library1", - }, - { - Whl: "library2", - }, - }, - }, - }, - }, - }, - "job2": { - JobSettings: &jobs.JobSettings{ - Tasks: []jobs.Task{ - { - Libraries: []compute.Library{ - { - Whl: "library1", - }, - { - Whl: "library2", - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - out := MapFilesToTaskLibraries(context.Background(), b) - assert.Len(t, out, 2) - - // Pointer equality for "library1" - assert.Equal(t, []*compute.Library{ - &b.Config.Resources.Jobs["job1"].JobSettings.Tasks[0].Libraries[0], - &b.Config.Resources.Jobs["job1"].JobSettings.Tasks[1].Libraries[0], - &b.Config.Resources.Jobs["job2"].JobSettings.Tasks[0].Libraries[0], - }, out[filepath.Clean("testdata/library1")]) - - // Pointer equality for "library2" - assert.Equal(t, []*compute.Library{ - &b.Config.Resources.Jobs["job1"].JobSettings.Tasks[0].Libraries[1], - &b.Config.Resources.Jobs["job1"].JobSettings.Tasks[1].Libraries[1], - &b.Config.Resources.Jobs["job2"].JobSettings.Tasks[0].Libraries[1], - }, out[filepath.Clean("testdata/library2")]) -} diff --git a/bundle/libraries/local_path.go b/bundle/libraries/local_path.go index a5c0cc96..f1e3788f 100644 --- a/bundle/libraries/local_path.go +++ b/bundle/libraries/local_path.go @@ -38,6 +38,25 @@ func IsLocalPath(p string) bool { return 
!path.IsAbs(p) } +// IsEnvironmentDependencyLocal returns true if the specified dependency +// should be interpreted as a local path. +// We use this to check if the dependency in environment spec is local. +// We can't use IsLocalPath because environment dependencies can be +// a pypi package name which can be misinterpreted as a local path by IsLocalPath. +func IsEnvironmentDependencyLocal(dep string) bool { + possiblePrefixes := []string{ + ".", + } + + for _, prefix := range possiblePrefixes { + if strings.HasPrefix(dep, prefix) { + return true + } + } + + return false +} + func isRemoteStorageScheme(path string) bool { url, err := url.Parse(path) if err != nil { diff --git a/bundle/libraries/local_path_test.go b/bundle/libraries/local_path_test.go index 640afa85..d2492d6b 100644 --- a/bundle/libraries/local_path_test.go +++ b/bundle/libraries/local_path_test.go @@ -5,6 +5,7 @@ import ( "github.com/databricks/databricks-sdk-go/service/compute" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestIsLocalPath(t *testing.T) { @@ -41,3 +42,31 @@ func TestIsLocalLibrary(t *testing.T) { // Empty. 
assert.False(t, IsLocalLibrary(&compute.Library{})) } + +func TestIsEnvironmentDependencyLocal(t *testing.T) { + testCases := [](struct { + path string + expected bool + }){ + {path: "./local/*.whl", expected: true}, + {path: ".\\local\\*.whl", expected: true}, + {path: "./local/mypath.whl", expected: true}, + {path: ".\\local\\mypath.whl", expected: true}, + {path: "../local/*.whl", expected: true}, + {path: "..\\local\\*.whl", expected: true}, + {path: "./../local/*.whl", expected: true}, + {path: ".\\..\\local\\*.whl", expected: true}, + {path: "../../local/*.whl", expected: true}, + {path: "..\\..\\local\\*.whl", expected: true}, + {path: "pypipackage", expected: false}, + {path: "pypipackage/test.whl", expected: false}, + {path: "pypipackage/*.whl", expected: false}, + {path: "/Volumes/catalog/schema/volume/path.whl", expected: false}, + {path: "/Workspace/my_project/dist.whl", expected: false}, + {path: "-r /Workspace/my_project/requirements.txt", expected: false}, + } + + for _, tc := range testCases { + require.Equal(t, IsEnvironmentDependencyLocal(tc.path), tc.expected) + } +} diff --git a/bundle/libraries/match.go b/bundle/libraries/match.go index d051e163..096cdf4a 100644 --- a/bundle/libraries/match.go +++ b/bundle/libraries/match.go @@ -2,44 +2,77 @@ package libraries import ( "context" + "fmt" + "path/filepath" "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" + "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" ) type match struct { } -func MatchWithArtifacts() bundle.Mutator { +func ValidateLocalLibrariesExist() bundle.Mutator { return &match{} } func (a *match) Name() string { - return "libraries.MatchWithArtifacts" + return "libraries.ValidateLocalLibrariesExist" } func (a *match) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - tasks := findAllTasks(b) - for _, task := range tasks { - if isMissingRequiredLibraries(task) { - return 
diag.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey) + for _, job := range b.Config.Resources.Jobs { + err := validateEnvironments(job.Environments, b) + if err != nil { + return diag.FromErr(err) } - for j := range task.Libraries { - lib := &task.Libraries[j] - _, err := findArtifactFiles(ctx, lib, b) + + for _, task := range job.JobSettings.Tasks { + err := validateTaskLibraries(task.Libraries, b) if err != nil { return diag.FromErr(err) } } } + return nil } -func isMissingRequiredLibraries(task *jobs.Task) bool { - if task.Libraries != nil { - return false +func validateTaskLibraries(libs []compute.Library, b *bundle.Bundle) error { + for _, lib := range libs { + path := libraryPath(&lib) + if path == "" || !IsLocalPath(path) { + continue + } + + matches, err := filepath.Glob(filepath.Join(b.RootPath, path)) + if err != nil { + return err + } + + if len(matches) == 0 { + return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(&lib)) + } } - return task.PythonWheelTask != nil || task.SparkJarTask != nil + return nil +} + +func validateEnvironments(envs []jobs.JobEnvironment, b *bundle.Bundle) error { + for _, env := range envs { + for _, dep := range env.Spec.Dependencies { + matches, err := filepath.Glob(filepath.Join(b.RootPath, dep)) + if err != nil { + return err + } + + if len(matches) == 0 && IsEnvironmentDependencyLocal(dep) { + return fmt.Errorf("file %s is referenced in environments section but doesn't exist on the local file system", dep) + } + } + } + + return nil } diff --git a/bundle/libraries/match_test.go b/bundle/libraries/match_test.go index 828c6564..bb4b1510 100644 --- a/bundle/libraries/match_test.go +++ b/bundle/libraries/match_test.go @@ -1 +1,148 @@ package libraries + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + 
"github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/internal/testutil" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +func TestValidateEnvironments(t *testing.T) { + tmpDir := t.TempDir() + testutil.Touch(t, tmpDir, "wheel.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + "./wheel.whl", + "simplejson", + "/Workspace/Users/foo@bar.com/artifacts/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + require.Nil(t, diags) +} + +func TestValidateEnvironmentsNoFile(t *testing.T) { + tmpDir := t.TempDir() + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + "./wheel.whl", + "simplejson", + "/Workspace/Users/foo@bar.com/artifacts/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + require.Len(t, diags, 1) + require.Equal(t, "file ./wheel.whl is referenced in environments section but doesn't exist on the local file system", diags[0].Summary) +} + +func TestValidateTaskLibraries(t *testing.T) { + tmpDir := t.TempDir() + testutil.Touch(t, tmpDir, "wheel.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + 
Whl: "./wheel.whl", + }, + { + Whl: "/Workspace/Users/foo@bar.com/artifacts/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + require.Nil(t, diags) +} + +func TestValidateTaskLibrariesNoFile(t *testing.T) { + tmpDir := t.TempDir() + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: "./wheel.whl", + }, + { + Whl: "/Workspace/Users/foo@bar.com/artifacts/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + require.Len(t, diags, 1) + require.Equal(t, "file ./wheel.whl is referenced in libraries section but doesn't exist on the local file system", diags[0].Summary) +} diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index de94c5a0..fce98b03 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -26,7 +26,7 @@ func Deploy() bundle.Mutator { terraform.StatePull(), deploy.StatePull(), mutator.ValidateGitDetails(), - libraries.MatchWithArtifacts(), + libraries.ValidateLocalLibrariesExist(), artifacts.CleanUp(), artifacts.UploadAll(), python.TransformWheelTask(), diff --git a/bundle/python/transform.go b/bundle/python/transform.go index 728d4e83..457b45f7 100644 --- a/bundle/python/transform.go +++ b/bundle/python/transform.go @@ -104,7 +104,7 @@ func (t *pythonTrampoline) GetTasks(b *bundle.Bundle) []mutator.TaskWithJobKey { // At this point of moment we don't have local paths in Libraries sections anymore // Local paths have been replaced with the remote when the artifacts where uploaded // in artifacts.UploadAll mutator. 
- if task.PythonWheelTask == nil || !needsTrampoline(task) { + if task.PythonWheelTask == nil || !needsTrampoline(*task) { continue } @@ -117,7 +117,7 @@ func (t *pythonTrampoline) GetTasks(b *bundle.Bundle) []mutator.TaskWithJobKey { return result } -func needsTrampoline(task *jobs.Task) bool { +func needsTrampoline(task jobs.Task) bool { return libraries.IsTaskWithWorkspaceLibraries(task) } diff --git a/bundle/tests/enviroment_key_test.go b/bundle/tests/enviroment_key_test.go new file mode 100644 index 00000000..3e12ddb6 --- /dev/null +++ b/bundle/tests/enviroment_key_test.go @@ -0,0 +1,12 @@ +package config_tests + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEnvironmentKeySupported(t *testing.T) { + _, diags := loadTargetWithDiags("./python_wheel/environment_key", "default") + require.Empty(t, diags) +} diff --git a/bundle/tests/python_wheel/environment_key/.gitignore b/bundle/tests/python_wheel/environment_key/.gitignore new file mode 100644 index 00000000..f03e23bc --- /dev/null +++ b/bundle/tests/python_wheel/environment_key/.gitignore @@ -0,0 +1,3 @@ +build/ +*.egg-info +.databricks diff --git a/bundle/tests/python_wheel/environment_key/databricks.yml b/bundle/tests/python_wheel/environment_key/databricks.yml new file mode 100644 index 00000000..198f8c0d --- /dev/null +++ b/bundle/tests/python_wheel/environment_key/databricks.yml @@ -0,0 +1,26 @@ +bundle: + name: environment_key + +artifacts: + my_test_code: + type: whl + path: "./my_test_code" + build: "python3 setup.py bdist_wheel" + +resources: + jobs: + test_job: + name: "My Wheel Job" + tasks: + - task_key: TestTask + existing_cluster_id: "0717-132531-5opeqon1" + python_wheel_task: + package_name: "my_test_code" + entry_point: "run" + environment_key: "test_env" + environments: + - environment_key: "test_env" + spec: + client: "1" + dependencies: + - ./my_test_code/dist/*.whl diff --git a/bundle/tests/python_wheel/environment_key/my_test_code/setup.py 
b/bundle/tests/python_wheel/environment_key/my_test_code/setup.py new file mode 100644 index 00000000..0bd871dd --- /dev/null +++ b/bundle/tests/python_wheel/environment_key/my_test_code/setup.py @@ -0,0 +1,15 @@ +from setuptools import setup, find_packages + +import src + +setup( + name="my_test_code", + version=src.__version__, + author=src.__author__, + url="https://databricks.com", + author_email="john.doe@databricks.com", + description="my test wheel", + packages=find_packages(include=["src"]), + entry_points={"group_1": "run=src.__main__:main"}, + install_requires=["setuptools"], +) diff --git a/bundle/tests/python_wheel/environment_key/my_test_code/src/__init__.py b/bundle/tests/python_wheel/environment_key/my_test_code/src/__init__.py new file mode 100644 index 00000000..909f1f32 --- /dev/null +++ b/bundle/tests/python_wheel/environment_key/my_test_code/src/__init__.py @@ -0,0 +1,2 @@ +__version__ = "0.0.1" +__author__ = "Databricks" diff --git a/bundle/tests/python_wheel/environment_key/my_test_code/src/__main__.py b/bundle/tests/python_wheel/environment_key/my_test_code/src/__main__.py new file mode 100644 index 00000000..73d045af --- /dev/null +++ b/bundle/tests/python_wheel/environment_key/my_test_code/src/__main__.py @@ -0,0 +1,16 @@ +""" +The entry point of the Python Wheel +""" + +import sys + + +def main(): + # This method will print the provided arguments + print('Hello from my func') + print('Got arguments:') + print(sys.argv) + + +if __name__ == '__main__': + main() diff --git a/bundle/tests/python_wheel_test.go b/bundle/tests/python_wheel_test.go index e2266516..8d0036a7 100644 --- a/bundle/tests/python_wheel_test.go +++ b/bundle/tests/python_wheel_test.go @@ -23,7 +23,7 @@ func TestPythonWheelBuild(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(matches)) - match := libraries.MatchWithArtifacts() + match := libraries.ValidateLocalLibrariesExist() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -40,7 
+40,7 @@ func TestPythonWheelBuildAutoDetect(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(matches)) - match := libraries.MatchWithArtifacts() + match := libraries.ValidateLocalLibrariesExist() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -53,7 +53,7 @@ func TestPythonWheelWithDBFSLib(t *testing.T) { diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) require.NoError(t, diags.Error()) - match := libraries.MatchWithArtifacts() + match := libraries.ValidateLocalLibrariesExist() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -66,7 +66,7 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) { diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) require.NoError(t, diags.Error()) - match := libraries.MatchWithArtifacts() + match := libraries.ValidateLocalLibrariesExist() diags = bundle.Apply(ctx, b, match) require.ErrorContains(t, diags.Error(), "./non-existing/*.whl") @@ -79,3 +79,20 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) { "my_test_code-0.0.1-py3-none-any.whl", )) } + +func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) { + ctx := context.Background() + b, err := bundle.Load(ctx, "./python_wheel/environment_key") + require.NoError(t, err) + + diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) + require.NoError(t, diags.Error()) + + matches, err := filepath.Glob("./python_wheel/environment_key/my_test_code/dist/my_test_code-*.whl") + require.NoError(t, err) + require.Equal(t, 1, len(matches)) + + match := libraries.ValidateLocalLibrariesExist() + diags = bundle.Apply(ctx, b, match) + require.NoError(t, diags.Error()) +} diff --git a/internal/bundle/artifacts_test.go b/internal/bundle/artifacts_test.go index 866a1f6e..222b2304 100644 --- a/internal/bundle/artifacts_test.go +++ b/internal/bundle/artifacts_test.go @@ -89,3 +89,67 @@ func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) { 
b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl, ) } + +func TestAccUploadArtifactFileToCorrectRemotePathWithEnvironments(t *testing.T) { + ctx, wt := acc.WorkspaceTest(t) + w := wt.W + dir := t.TempDir() + whlPath := filepath.Join(dir, "dist", "test.whl") + touchEmptyFile(t, whlPath) + + wsDir := internal.TemporaryWorkspaceDir(t, w) + + b := &bundle.Bundle{ + RootPath: dir, + Config: config.Root{ + Bundle: config.Bundle{ + Target: "whatever", + }, + Workspace: config.Workspace{ + ArtifactPath: wsDir, + }, + Artifacts: config.Artifacts{ + "test": &config.Artifact{ + Type: "whl", + Files: []config.ArtifactFile{ + { + Source: whlPath, + }, + }, + }, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "test": { + JobSettings: &jobs.JobSettings{ + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + "dist/test.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test")) + require.NoError(t, diags.Error()) + + // The remote path attribute on the artifact file should have been set. + require.Regexp(t, + regexp.MustCompile(path.Join(regexp.QuoteMeta(wsDir), `.internal/test\.whl`)), + b.Config.Artifacts["test"].Files[0].RemotePath, + ) + + // The job environment deps path should have been updated to the remote path. 
+ require.Regexp(t, + regexp.MustCompile(path.Join("/Workspace", regexp.QuoteMeta(wsDir), `.internal/test\.whl`)), + b.Config.Resources.Jobs["test"].JobSettings.Environments[0].Spec.Dependencies[0], + ) +} diff --git a/internal/bundle/bundles/python_wheel_task_with_environments/databricks_template_schema.json b/internal/bundle/bundles/python_wheel_task_with_environments/databricks_template_schema.json new file mode 100644 index 00000000..ae765c58 --- /dev/null +++ b/internal/bundle/bundles/python_wheel_task_with_environments/databricks_template_schema.json @@ -0,0 +1,13 @@ +{ + "properties": { + "project_name": { + "type": "string", + "default": "my_test_code", + "description": "Unique name for this project" + }, + "unique_id": { + "type": "string", + "description": "Unique ID for job name" + } + } +} diff --git a/internal/bundle/bundles/python_wheel_task_with_environments/template/databricks.yml.tmpl b/internal/bundle/bundles/python_wheel_task_with_environments/template/databricks.yml.tmpl new file mode 100644 index 00000000..4a674dce --- /dev/null +++ b/internal/bundle/bundles/python_wheel_task_with_environments/template/databricks.yml.tmpl @@ -0,0 +1,25 @@ +bundle: + name: wheel-task-with-environments + +workspace: + root_path: "~/.bundle/{{.unique_id}}" + +resources: + jobs: + some_other_job: + name: "[${bundle.target}] Test Wheel Job With Environments {{.unique_id}}" + tasks: + - task_key: TestTask + python_wheel_task: + package_name: my_test_code + entry_point: run + parameters: + - "one" + - "two" + environment_key: "test" + environments: + - environment_key: "test" + spec: + client: "1" + dependencies: + - ./dist/*.whl diff --git a/internal/bundle/bundles/python_wheel_task_with_environments/template/setup.py.tmpl b/internal/bundle/bundles/python_wheel_task_with_environments/template/setup.py.tmpl new file mode 100644 index 00000000..b528657b --- /dev/null +++ b/internal/bundle/bundles/python_wheel_task_with_environments/template/setup.py.tmpl @@ -0,0 +1,15 
@@ +from setuptools import setup, find_packages + +import {{.project_name}} + +setup( + name="{{.project_name}}", + version={{.project_name}}.__version__, + author={{.project_name}}.__author__, + url="https://databricks.com", + author_email="john.doe@databricks.com", + description="my example wheel", + packages=find_packages(include=["{{.project_name}}"]), + entry_points={"group1": "run={{.project_name}}.__main__:main"}, + install_requires=["setuptools"], +) diff --git a/internal/bundle/bundles/python_wheel_task_with_environments/template/{{.project_name}}/__init__.py b/internal/bundle/bundles/python_wheel_task_with_environments/template/{{.project_name}}/__init__.py new file mode 100644 index 00000000..909f1f32 --- /dev/null +++ b/internal/bundle/bundles/python_wheel_task_with_environments/template/{{.project_name}}/__init__.py @@ -0,0 +1,2 @@ +__version__ = "0.0.1" +__author__ = "Databricks" diff --git a/internal/bundle/bundles/python_wheel_task_with_environments/template/{{.project_name}}/__main__.py b/internal/bundle/bundles/python_wheel_task_with_environments/template/{{.project_name}}/__main__.py new file mode 100644 index 00000000..ea918ce2 --- /dev/null +++ b/internal/bundle/bundles/python_wheel_task_with_environments/template/{{.project_name}}/__main__.py @@ -0,0 +1,16 @@ +""" +The entry point of the Python Wheel +""" + +import sys + + +def main(): + # This method will print the provided arguments + print("Hello from my func") + print("Got arguments:") + print(sys.argv) + + +if __name__ == "__main__": + main() diff --git a/internal/bundle/environments_test.go b/internal/bundle/environments_test.go new file mode 100644 index 00000000..5cffe885 --- /dev/null +++ b/internal/bundle/environments_test.go @@ -0,0 +1,39 @@ +package bundle + +import ( + "testing" + + "github.com/databricks/cli/internal/acc" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestAccPythonWheelTaskWithEnvironmentsDeployAndRun(t *testing.T) { + 
t.Skip("Skipping test until serveless is enabled") + + ctx, _ := acc.WorkspaceTest(t) + + bundleRoot, err := initTestTemplate(t, ctx, "python_wheel_task_with_environments", map[string]any{ + "unique_id": uuid.New().String(), + }) + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + t.Cleanup(func() { + destroyBundle(t, ctx, bundleRoot) + }) + + out, err := runResource(t, ctx, bundleRoot, "some_other_job") + require.NoError(t, err) + require.Contains(t, out, "Hello from my func") + require.Contains(t, out, "Got arguments:") + require.Contains(t, out, "['my_test_code', 'one', 'two']") + + out, err = runResourceWithParams(t, ctx, bundleRoot, "some_other_job", "--python-params=param1,param2") + require.NoError(t, err) + require.Contains(t, out, "Hello from my func") + require.Contains(t, out, "Got arguments:") + require.Contains(t, out, "['my_test_code', 'param1', 'param2']") +} diff --git a/internal/bundle/python_wheel_test.go b/internal/bundle/python_wheel_test.go index 1299194b..bf246292 100644 --- a/internal/bundle/python_wheel_test.go +++ b/internal/bundle/python_wheel_test.go @@ -43,7 +43,6 @@ func runPythonWheelTest(t *testing.T, sparkVersion string, pythonWheelWrapper bo } func TestAccPythonWheelTaskDeployAndRunWithoutWrapper(t *testing.T) { - // This is the first DBR version where we can install Python wheels from the Workspace File System. runPythonWheelTest(t, "13.3.x-snapshot-scala2.12", false) }