Added support for job environments (#1379)

## Changes
The main changes are:
1. Don't link artifacts to libraries anymore and instead just iterate
over all jobs and tasks when uploading artifacts and update local path
to remote
2. Iterating over `jobs.environments` to check if there are any local
libraries and checking that they exist locally
3. Added tests to check environments are handled correctly

End-to-end test will follow up

## Tests
Added regression test, existing tests (including integration one) pass
This commit is contained in:
Andrew Nester 2024-04-22 13:44:34 +02:00 committed by GitHub
parent 000a7fef8c
commit 1872aa12b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 820 additions and 241 deletions

View File

@ -12,7 +12,6 @@ import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts/whl" "github.com/databricks/cli/bundle/artifacts/whl"
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
@ -117,8 +116,6 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
} }
func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error { func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
filesToLibraries := libraries.MapFilesToTaskLibraries(ctx, b)
for i := range a.Files { for i := range a.Files {
f := &a.Files[i] f := &a.Files[i]
@ -133,24 +130,32 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u
log.Infof(ctx, "Upload succeeded") log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source)) f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))
// Lookup all tasks that reference this file. // TODO: confirm if we still need to update the remote path to start with /Workspace
libs, ok := filesToLibraries[f.Source]
if !ok {
log.Debugf(ctx, "No tasks reference %s", f.Source)
continue
}
// Update all tasks that reference this file.
for _, lib := range libs {
wsfsBase := "/Workspace" wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath) remotePath := path.Join(wsfsBase, f.RemotePath)
if lib.Whl != "" {
for _, job := range b.Config.Resources.Jobs {
for i := range job.Tasks {
task := &job.Tasks[i]
for j := range task.Libraries {
lib := &task.Libraries[j]
if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) {
lib.Whl = remotePath lib.Whl = remotePath
continue
} }
if lib.Jar != "" { if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) {
lib.Jar = remotePath lib.Jar = remotePath
continue }
}
}
for i := range job.Environments {
env := &job.Environments[i]
for j := range env.Spec.Dependencies {
lib := env.Spec.Dependencies[j]
if isArtifactMatchLibrary(f, lib, b) {
env.Spec.Dependencies[j] = remotePath
}
}
} }
} }
} }
@ -158,6 +163,26 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u
return nil return nil
} }
func isArtifactMatchLibrary(f *config.ArtifactFile, libPath string, b *bundle.Bundle) bool {
if !filepath.IsAbs(libPath) {
libPath = filepath.Join(b.RootPath, libPath)
}
// libPath can be a glob pattern, so do the match first
matches, err := filepath.Glob(libPath)
if err != nil {
return false
}
for _, m := range matches {
if m == f.Source {
return true
}
}
return false
}
// Function to upload artifact file to Workspace // Function to upload artifact file to Workspace
func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error { func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error {
raw, err := os.ReadFile(file) raw, err := os.ReadFile(file)

View File

@ -0,0 +1,91 @@
package artifacts
import (
"context"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestArtifactUpload(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
artifact := b.Config.Artifacts["whl"]
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*bytes.Reader"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
err := uploadArtifact(context.Background(), b, artifact, "/foo/bar/artifacts", mockFiler)
require.NoError(t, err)
// Test that libraries path is updated
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
}

View File

@ -30,10 +30,31 @@ func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
tasks := libraries.FindAllWheelTasksWithLocalLibraries(b) tasks := libraries.FindAllWheelTasksWithLocalLibraries(b)
for _, task := range tasks { for _, task := range tasks {
for _, lib := range task.Libraries { for _, lib := range task.Libraries {
matches, err := filepath.Glob(filepath.Join(b.RootPath, lib.Whl)) matchAndAdd(ctx, lib.Whl, b)
}
}
envs := libraries.FindAllEnvironments(b)
for _, jobEnvs := range envs {
for _, env := range jobEnvs {
if env.Spec != nil {
for _, dep := range env.Spec.Dependencies {
if libraries.IsEnvironmentDependencyLocal(dep) {
matchAndAdd(ctx, dep, b)
}
}
}
}
}
return nil
}
func matchAndAdd(ctx context.Context, lib string, b *bundle.Bundle) {
matches, err := filepath.Glob(filepath.Join(b.RootPath, lib))
// File referenced from libraries section does not exists, skipping // File referenced from libraries section does not exists, skipping
if err != nil { if err != nil {
continue return
} }
for _, match := range matches { for _, match := range matches {
@ -50,8 +71,4 @@ func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
Type: config.ArtifactPythonWheel, Type: config.ArtifactPythonWheel,
} }
} }
}
}
return nil
} }

View File

@ -152,6 +152,13 @@ func translateNoOp(literal, localFullPath, localRelPath, remotePath string) (str
return localRelPath, nil return localRelPath, nil
} }
func translateNoOpWithPrefix(literal, localFullPath, localRelPath, remotePath string) (string, error) {
if !strings.HasPrefix(localRelPath, ".") {
localRelPath = "." + string(filepath.Separator) + localRelPath
}
return localRelPath, nil
}
func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) { func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) {
out := v.MustString() out := v.MustString()
err := m.rewritePath(dir, b, &out, fn) err := m.rewritePath(dir, b, &out, fn)

View File

@ -5,39 +5,51 @@ import (
"slices" "slices"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn"
) )
type jobTaskRewritePattern struct { type jobRewritePattern struct {
pattern dyn.Pattern pattern dyn.Pattern
fn rewriteFunc fn rewriteFunc
skipRewrite func(string) bool
} }
func rewritePatterns(base dyn.Pattern) []jobTaskRewritePattern { func noSkipRewrite(string) bool {
return []jobTaskRewritePattern{ return false
}
func rewritePatterns(base dyn.Pattern) []jobRewritePattern {
return []jobRewritePattern{
{ {
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")), base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
translateNotebookPath, translateNotebookPath,
noSkipRewrite,
}, },
{ {
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")), base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
translateFilePath, translateFilePath,
noSkipRewrite,
}, },
{ {
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")), base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
translateDirectoryPath, translateDirectoryPath,
noSkipRewrite,
}, },
{ {
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")), base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
translateFilePath, translateFilePath,
noSkipRewrite,
}, },
{ {
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")), base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
translateNoOp, translateNoOp,
noSkipRewrite,
}, },
{ {
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")), base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
translateNoOp, translateNoOp,
noSkipRewrite,
}, },
} }
} }
@ -73,9 +85,28 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy
) )
// Compile list of patterns and their respective rewrite functions. // Compile list of patterns and their respective rewrite functions.
jobEnvironmentsPatterns := []jobRewritePattern{
{
dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("environments"),
dyn.AnyIndex(),
dyn.Key("spec"),
dyn.Key("dependencies"),
dyn.AnyIndex(),
),
translateNoOpWithPrefix,
func(s string) bool {
return !libraries.IsEnvironmentDependencyLocal(s)
},
},
}
taskPatterns := rewritePatterns(base) taskPatterns := rewritePatterns(base)
forEachPatterns := rewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task"))) forEachPatterns := rewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
allPatterns := append(taskPatterns, forEachPatterns...) allPatterns := append(taskPatterns, jobEnvironmentsPatterns...)
allPatterns = append(allPatterns, forEachPatterns...)
for _, t := range allPatterns { for _, t := range allPatterns {
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
@ -91,6 +122,10 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err) return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
} }
sv := v.MustString()
if t.skipRewrite(sv) {
return v, nil
}
return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key]) return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key])
}) })
if err != nil { if err != nil {

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"testing" "testing"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
@ -651,3 +652,45 @@ func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) {
diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths()) diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
assert.ErrorContains(t, diags.Error(), `expected a file for "resources.pipelines.pipeline.libraries[0].file.path" but got a notebook`) assert.ErrorContains(t, diags.Error(), `expected a file for "resources.pipelines.pipeline.libraries[0].file.path" but got a notebook`)
} }
func TestTranslatePathJobEnvironments(t *testing.T) {
dir := t.TempDir()
touchEmptyFile(t, filepath.Join(dir, "env1.py"))
touchEmptyFile(t, filepath.Join(dir, "env2.py"))
b := &bundle.Bundle{
RootPath: dir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
"./dist/env1.whl",
"../dist/env2.whl",
"simplejson",
"/Workspace/Users/foo@bar.com/test.whl",
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs", filepath.Join(dir, "job/resource.yml"))
diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
require.NoError(t, diags.Error())
assert.Equal(t, strings.Join([]string{".", "job", "dist", "env1.whl"}, string(os.PathSeparator)), b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
assert.Equal(t, strings.Join([]string{".", "dist", "env2.whl"}, string(os.PathSeparator)), b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
assert.Equal(t, "simplejson", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[2])
assert.Equal(t, "/Workspace/Users/foo@bar.com/test.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[3])
}

View File

@ -24,6 +24,7 @@ func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
"tasks": "task", "tasks": "task",
"job_clusters": "job_cluster", "job_clusters": "job_cluster",
"parameters": "parameter", "parameters": "parameter",
"environments": "environment",
}) })
if err != nil { if err != nil {
return dyn.InvalidValue, err return dyn.InvalidValue, err

View File

@ -1,45 +1,71 @@
package libraries package libraries
import ( import (
"context"
"fmt"
"path/filepath"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
) )
func findAllTasks(b *bundle.Bundle) []*jobs.Task { func findAllTasks(b *bundle.Bundle) map[string]([]jobs.Task) {
r := b.Config.Resources r := b.Config.Resources
result := make([]*jobs.Task, 0) result := make(map[string]([]jobs.Task), 0)
for k := range b.Config.Resources.Jobs { for k := range b.Config.Resources.Jobs {
tasks := r.Jobs[k].JobSettings.Tasks result[k] = append(result[k], r.Jobs[k].JobSettings.Tasks...)
for i := range tasks {
task := &tasks[i]
result = append(result, task)
}
} }
return result return result
} }
func FindAllEnvironments(b *bundle.Bundle) map[string]([]jobs.JobEnvironment) {
jobEnvs := make(map[string]([]jobs.JobEnvironment), 0)
for jobKey, job := range b.Config.Resources.Jobs {
if len(job.Environments) == 0 {
continue
}
jobEnvs[jobKey] = job.Environments
}
return jobEnvs
}
func isEnvsWithLocalLibraries(envs []jobs.JobEnvironment) bool {
for _, e := range envs {
for _, l := range e.Spec.Dependencies {
if IsEnvironmentDependencyLocal(l) {
return true
}
}
}
return false
}
func FindAllWheelTasksWithLocalLibraries(b *bundle.Bundle) []*jobs.Task { func FindAllWheelTasksWithLocalLibraries(b *bundle.Bundle) []*jobs.Task {
tasks := findAllTasks(b) tasks := findAllTasks(b)
envs := FindAllEnvironments(b)
wheelTasks := make([]*jobs.Task, 0) wheelTasks := make([]*jobs.Task, 0)
for _, task := range tasks { for k, jobTasks := range tasks {
if task.PythonWheelTask != nil && IsTaskWithLocalLibraries(task) { for i := range jobTasks {
task := &jobTasks[i]
if task.PythonWheelTask == nil {
continue
}
if isTaskWithLocalLibraries(*task) {
wheelTasks = append(wheelTasks, task) wheelTasks = append(wheelTasks, task)
} }
if envs[k] != nil && isEnvsWithLocalLibraries(envs[k]) {
wheelTasks = append(wheelTasks, task)
}
}
} }
return wheelTasks return wheelTasks
} }
func IsTaskWithLocalLibraries(task *jobs.Task) bool { func isTaskWithLocalLibraries(task jobs.Task) bool {
for _, l := range task.Libraries { for _, l := range task.Libraries {
if IsLocalLibrary(&l) { if IsLocalLibrary(&l) {
return true return true
@ -49,7 +75,7 @@ func IsTaskWithLocalLibraries(task *jobs.Task) bool {
return false return false
} }
func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool { func IsTaskWithWorkspaceLibraries(task jobs.Task) bool {
for _, l := range task.Libraries { for _, l := range task.Libraries {
if IsWorkspaceLibrary(&l) { if IsWorkspaceLibrary(&l) {
return true return true
@ -58,73 +84,3 @@ func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool {
return false return false
} }
func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error) {
path := libraryPath(lib)
if path == "" {
return nil, nil
}
fullPath := filepath.Join(b.RootPath, path)
return filepath.Glob(fullPath)
}
func findArtifactFiles(ctx context.Context, lib *compute.Library, b *bundle.Bundle) ([]*config.ArtifactFile, error) {
matches, err := findLibraryMatches(lib, b)
if err != nil {
return nil, err
}
if len(matches) == 0 && IsLocalLibrary(lib) {
return nil, fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(lib))
}
var out []*config.ArtifactFile
for _, match := range matches {
af, err := findArtifactFileByLocalPath(match, b)
if err != nil {
cmdio.LogString(ctx, fmt.Sprintf("%s. Skipping uploading. In order to use the define 'artifacts' section", err.Error()))
} else {
out = append(out, af)
}
}
return out, nil
}
func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.ArtifactFile, error) {
for _, a := range b.Config.Artifacts {
for k := range a.Files {
if a.Files[k].Source == path {
return &a.Files[k], nil
}
}
}
return nil, fmt.Errorf("artifact section is not defined for file at %s", path)
}
func MapFilesToTaskLibraries(ctx context.Context, b *bundle.Bundle) map[string][]*compute.Library {
tasks := findAllTasks(b)
out := make(map[string][]*compute.Library)
for _, task := range tasks {
for j := range task.Libraries {
lib := &task.Libraries[j]
if !IsLocalLibrary(lib) {
continue
}
matches, err := findLibraryMatches(lib, b)
if err != nil {
log.Warnf(ctx, "Error matching library to files: %s", err.Error())
continue
}
for _, match := range matches {
out[match] = append(out[match], lib)
}
}
}
return out
}

View File

@ -1,88 +0,0 @@
package libraries
import (
"context"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)
func TestMapFilesToTaskLibrariesNoGlob(t *testing.T) {
b := &bundle.Bundle{
RootPath: "testdata",
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "library1",
},
{
Whl: "library2",
},
{
Whl: "/absolute/path/in/workspace/library3",
},
},
},
{
Libraries: []compute.Library{
{
Whl: "library1",
},
{
Whl: "library2",
},
},
},
},
},
},
"job2": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "library1",
},
{
Whl: "library2",
},
},
},
},
},
},
},
},
},
}
out := MapFilesToTaskLibraries(context.Background(), b)
assert.Len(t, out, 2)
// Pointer equality for "library1"
assert.Equal(t, []*compute.Library{
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[0].Libraries[0],
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[1].Libraries[0],
&b.Config.Resources.Jobs["job2"].JobSettings.Tasks[0].Libraries[0],
}, out[filepath.Clean("testdata/library1")])
// Pointer equality for "library2"
assert.Equal(t, []*compute.Library{
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[0].Libraries[1],
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[1].Libraries[1],
&b.Config.Resources.Jobs["job2"].JobSettings.Tasks[0].Libraries[1],
}, out[filepath.Clean("testdata/library2")])
}

View File

@ -38,6 +38,25 @@ func IsLocalPath(p string) bool {
return !path.IsAbs(p) return !path.IsAbs(p)
} }
// IsEnvironmentDependencyLocal returns true if the specified dependency
// should be interpreted as a local path.
// We use this to check if the dependency in environment spec is local.
// We can't use IsLocalPath beacuse environment dependencies can be
// a pypi package name which can be misinterpreted as a local path by IsLocalPath.
func IsEnvironmentDependencyLocal(dep string) bool {
possiblePrefixes := []string{
".",
}
for _, prefix := range possiblePrefixes {
if strings.HasPrefix(dep, prefix) {
return true
}
}
return false
}
func isRemoteStorageScheme(path string) bool { func isRemoteStorageScheme(path string) bool {
url, err := url.Parse(path) url, err := url.Parse(path)
if err != nil { if err != nil {

View File

@ -5,6 +5,7 @@ import (
"github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestIsLocalPath(t *testing.T) { func TestIsLocalPath(t *testing.T) {
@ -41,3 +42,31 @@ func TestIsLocalLibrary(t *testing.T) {
// Empty. // Empty.
assert.False(t, IsLocalLibrary(&compute.Library{})) assert.False(t, IsLocalLibrary(&compute.Library{}))
} }
func TestIsEnvironmentDependencyLocal(t *testing.T) {
testCases := [](struct {
path string
expected bool
}){
{path: "./local/*.whl", expected: true},
{path: ".\\local\\*.whl", expected: true},
{path: "./local/mypath.whl", expected: true},
{path: ".\\local\\mypath.whl", expected: true},
{path: "../local/*.whl", expected: true},
{path: "..\\local\\*.whl", expected: true},
{path: "./../local/*.whl", expected: true},
{path: ".\\..\\local\\*.whl", expected: true},
{path: "../../local/*.whl", expected: true},
{path: "..\\..\\local\\*.whl", expected: true},
{path: "pypipackage", expected: false},
{path: "pypipackage/test.whl", expected: false},
{path: "pypipackage/*.whl", expected: false},
{path: "/Volumes/catalog/schema/volume/path.whl", expected: false},
{path: "/Workspace/my_project/dist.whl", expected: false},
{path: "-r /Workspace/my_project/requirements.txt", expected: false},
}
for _, tc := range testCases {
require.Equal(t, IsEnvironmentDependencyLocal(tc.path), tc.expected)
}
}

View File

@ -2,44 +2,77 @@ package libraries
import ( import (
"context" "context"
"fmt"
"path/filepath"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
) )
type match struct { type match struct {
} }
func MatchWithArtifacts() bundle.Mutator { func ValidateLocalLibrariesExist() bundle.Mutator {
return &match{} return &match{}
} }
func (a *match) Name() string { func (a *match) Name() string {
return "libraries.MatchWithArtifacts" return "libraries.ValidateLocalLibrariesExist"
} }
func (a *match) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { func (a *match) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
tasks := findAllTasks(b) for _, job := range b.Config.Resources.Jobs {
for _, task := range tasks { err := validateEnvironments(job.Environments, b)
if isMissingRequiredLibraries(task) { if err != nil {
return diag.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey) return diag.FromErr(err)
} }
for j := range task.Libraries {
lib := &task.Libraries[j] for _, task := range job.JobSettings.Tasks {
_, err := findArtifactFiles(ctx, lib, b) err := validateTaskLibraries(task.Libraries, b)
if err != nil { if err != nil {
return diag.FromErr(err) return diag.FromErr(err)
} }
} }
} }
return nil return nil
} }
func isMissingRequiredLibraries(task *jobs.Task) bool { func validateTaskLibraries(libs []compute.Library, b *bundle.Bundle) error {
if task.Libraries != nil { for _, lib := range libs {
return false path := libraryPath(&lib)
if path == "" || !IsLocalPath(path) {
continue
} }
return task.PythonWheelTask != nil || task.SparkJarTask != nil matches, err := filepath.Glob(filepath.Join(b.RootPath, path))
if err != nil {
return err
}
if len(matches) == 0 {
return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(&lib))
}
}
return nil
}
func validateEnvironments(envs []jobs.JobEnvironment, b *bundle.Bundle) error {
for _, env := range envs {
for _, dep := range env.Spec.Dependencies {
matches, err := filepath.Glob(filepath.Join(b.RootPath, dep))
if err != nil {
return err
}
if len(matches) == 0 && IsEnvironmentDependencyLocal(dep) {
return fmt.Errorf("file %s is referenced in environments section but doesn't exist on the local file system", dep)
}
}
}
return nil
} }

View File

@ -1 +1,148 @@
package libraries package libraries
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
func TestValidateEnvironments(t *testing.T) {
tmpDir := t.TempDir()
testutil.Touch(t, tmpDir, "wheel.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
"./wheel.whl",
"simplejson",
"/Workspace/Users/foo@bar.com/artifacts/test.whl",
},
},
},
},
},
},
},
},
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
require.Nil(t, diags)
}
func TestValidateEnvironmentsNoFile(t *testing.T) {
tmpDir := t.TempDir()
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
"./wheel.whl",
"simplejson",
"/Workspace/Users/foo@bar.com/artifacts/test.whl",
},
},
},
},
},
},
},
},
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
require.Len(t, diags, 1)
require.Equal(t, "file ./wheel.whl is referenced in environments section but doesn't exist on the local file system", diags[0].Summary)
}
func TestValidateTaskLibraries(t *testing.T) {
tmpDir := t.TempDir()
testutil.Touch(t, tmpDir, "wheel.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "./wheel.whl",
},
{
Whl: "/Workspace/Users/foo@bar.com/artifacts/test.whl",
},
},
},
},
},
},
},
},
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
require.Nil(t, diags)
}
func TestValidateTaskLibrariesNoFile(t *testing.T) {
tmpDir := t.TempDir()
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "./wheel.whl",
},
{
Whl: "/Workspace/Users/foo@bar.com/artifacts/test.whl",
},
},
},
},
},
},
},
},
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
require.Len(t, diags, 1)
require.Equal(t, "file ./wheel.whl is referenced in libraries section but doesn't exist on the local file system", diags[0].Summary)
}

View File

@ -26,7 +26,7 @@ func Deploy() bundle.Mutator {
terraform.StatePull(), terraform.StatePull(),
deploy.StatePull(), deploy.StatePull(),
mutator.ValidateGitDetails(), mutator.ValidateGitDetails(),
libraries.MatchWithArtifacts(), libraries.ValidateLocalLibrariesExist(),
artifacts.CleanUp(), artifacts.CleanUp(),
artifacts.UploadAll(), artifacts.UploadAll(),
python.TransformWheelTask(), python.TransformWheelTask(),

View File

@ -104,7 +104,7 @@ func (t *pythonTrampoline) GetTasks(b *bundle.Bundle) []mutator.TaskWithJobKey {
// At this point of moment we don't have local paths in Libraries sections anymore // At this point of moment we don't have local paths in Libraries sections anymore
// Local paths have been replaced with the remote when the artifacts where uploaded // Local paths have been replaced with the remote when the artifacts where uploaded
// in artifacts.UploadAll mutator. // in artifacts.UploadAll mutator.
if task.PythonWheelTask == nil || !needsTrampoline(task) { if task.PythonWheelTask == nil || !needsTrampoline(*task) {
continue continue
} }
@ -117,7 +117,7 @@ func (t *pythonTrampoline) GetTasks(b *bundle.Bundle) []mutator.TaskWithJobKey {
return result return result
} }
func needsTrampoline(task *jobs.Task) bool { func needsTrampoline(task jobs.Task) bool {
return libraries.IsTaskWithWorkspaceLibraries(task) return libraries.IsTaskWithWorkspaceLibraries(task)
} }

View File

@ -0,0 +1,12 @@
package config_tests
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestEnvironmentKeySupported(t *testing.T) {
_, diags := loadTargetWithDiags("./python_wheel/environment_key", "default")
require.Empty(t, diags)
}

View File

@ -0,0 +1,3 @@
build/
*.egg-info
.databricks

View File

@ -0,0 +1,26 @@
bundle:
name: environment_key
artifacts:
my_test_code:
type: whl
path: "./my_test_code"
build: "python3 setup.py bdist_wheel"
resources:
jobs:
test_job:
name: "My Wheel Job"
tasks:
- task_key: TestTask
existing_cluster_id: "0717-132531-5opeqon1"
python_wheel_task:
package_name: "my_test_code"
entry_point: "run"
environment_key: "test_env"
environments:
- environment_key: "test_env"
spec:
client: "1"
dependencies:
- ./my_test_code/dist/*.whl

View File

@ -0,0 +1,15 @@
from setuptools import setup, find_packages
import src
setup(
name="my_test_code",
version=src.__version__,
author=src.__author__,
url="https://databricks.com",
author_email="john.doe@databricks.com",
description="my test wheel",
packages=find_packages(include=["src"]),
entry_points={"group_1": "run=src.__main__:main"},
install_requires=["setuptools"],
)

View File

@ -0,0 +1,2 @@
__version__ = "0.0.1"
__author__ = "Databricks"

View File

@ -0,0 +1,16 @@
"""
The entry point of the Python Wheel
"""
import sys
def main():
# This method will print the provided arguments
print('Hello from my func')
print('Got arguments:')
print(sys.argv)
if __name__ == '__main__':
main()

View File

@ -23,7 +23,7 @@ func TestPythonWheelBuild(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(matches)) require.Equal(t, 1, len(matches))
match := libraries.MatchWithArtifacts() match := libraries.ValidateLocalLibrariesExist()
diags = bundle.Apply(ctx, b, match) diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
} }
@ -40,7 +40,7 @@ func TestPythonWheelBuildAutoDetect(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(matches)) require.Equal(t, 1, len(matches))
match := libraries.MatchWithArtifacts() match := libraries.ValidateLocalLibrariesExist()
diags = bundle.Apply(ctx, b, match) diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
} }
@ -53,7 +53,7 @@ func TestPythonWheelWithDBFSLib(t *testing.T) {
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build()))
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
match := libraries.MatchWithArtifacts() match := libraries.ValidateLocalLibrariesExist()
diags = bundle.Apply(ctx, b, match) diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
} }
@ -66,7 +66,7 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) {
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build()))
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
match := libraries.MatchWithArtifacts() match := libraries.ValidateLocalLibrariesExist()
diags = bundle.Apply(ctx, b, match) diags = bundle.Apply(ctx, b, match)
require.ErrorContains(t, diags.Error(), "./non-existing/*.whl") require.ErrorContains(t, diags.Error(), "./non-existing/*.whl")
@ -79,3 +79,20 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) {
"my_test_code-0.0.1-py3-none-any.whl", "my_test_code-0.0.1-py3-none-any.whl",
)) ))
} }
func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) {
ctx := context.Background()
b, err := bundle.Load(ctx, "./python_wheel/environment_key")
require.NoError(t, err)
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build()))
require.NoError(t, diags.Error())
matches, err := filepath.Glob("./python_wheel/environment_key/my_test_code/dist/my_test_code-*.whl")
require.NoError(t, err)
require.Equal(t, 1, len(matches))
match := libraries.ValidateLocalLibrariesExist()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}

View File

@ -89,3 +89,67 @@ func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) {
b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl, b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl,
) )
} }
func TestAccUploadArtifactFileToCorrectRemotePathWithEnvironments(t *testing.T) {
ctx, wt := acc.WorkspaceTest(t)
w := wt.W
dir := t.TempDir()
whlPath := filepath.Join(dir, "dist", "test.whl")
touchEmptyFile(t, whlPath)
wsDir := internal.TemporaryWorkspaceDir(t, w)
b := &bundle.Bundle{
RootPath: dir,
Config: config.Root{
Bundle: config.Bundle{
Target: "whatever",
},
Workspace: config.Workspace{
ArtifactPath: wsDir,
},
Artifacts: config.Artifacts{
"test": &config.Artifact{
Type: "whl",
Files: []config.ArtifactFile{
{
Source: whlPath,
},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"test": {
JobSettings: &jobs.JobSettings{
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
"dist/test.whl",
},
},
},
},
},
},
},
},
},
}
diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test"))
require.NoError(t, diags.Error())
// The remote path attribute on the artifact file should have been set.
require.Regexp(t,
regexp.MustCompile(path.Join(regexp.QuoteMeta(wsDir), `.internal/test\.whl`)),
b.Config.Artifacts["test"].Files[0].RemotePath,
)
// The job environment deps path should have been updated to the remote path.
require.Regexp(t,
regexp.MustCompile(path.Join("/Workspace", regexp.QuoteMeta(wsDir), `.internal/test\.whl`)),
b.Config.Resources.Jobs["test"].JobSettings.Environments[0].Spec.Dependencies[0],
)
}

View File

@ -0,0 +1,13 @@
{
"properties": {
"project_name": {
"type": "string",
"default": "my_test_code",
"description": "Unique name for this project"
},
"unique_id": {
"type": "string",
"description": "Unique ID for job name"
}
}
}

View File

@ -0,0 +1,25 @@
bundle:
name: wheel-task-with-environments
workspace:
root_path: "~/.bundle/{{.unique_id}}"
resources:
jobs:
some_other_job:
name: "[${bundle.target}] Test Wheel Job With Environments {{.unique_id}}"
tasks:
- task_key: TestTask
python_wheel_task:
package_name: my_test_code
entry_point: run
parameters:
- "one"
- "two"
environment_key: "test"
environments:
- environment_key: "test"
spec:
client: "1"
dependencies:
- ./dist/*.whl

View File

@ -0,0 +1,15 @@
from setuptools import setup, find_packages
import {{.project_name}}
setup(
name="{{.project_name}}",
version={{.project_name}}.__version__,
author={{.project_name}}.__author__,
url="https://databricks.com",
author_email="john.doe@databricks.com",
description="my example wheel",
packages=find_packages(include=["{{.project_name}}"]),
entry_points={"group1": "run={{.project_name}}.__main__:main"},
install_requires=["setuptools"],
)

View File

@ -0,0 +1,2 @@
__version__ = "0.0.1"
__author__ = "Databricks"

View File

@ -0,0 +1,16 @@
"""
The entry point of the Python Wheel
"""
import sys
def main():
# This method will print the provided arguments
print("Hello from my func")
print("Got arguments:")
print(sys.argv)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,39 @@
package bundle
import (
"testing"
"github.com/databricks/cli/internal/acc"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func TestAccPythonWheelTaskWithEnvironmentsDeployAndRun(t *testing.T) {
t.Skip("Skipping test until serveless is enabled")
ctx, _ := acc.WorkspaceTest(t)
bundleRoot, err := initTestTemplate(t, ctx, "python_wheel_task_with_environments", map[string]any{
"unique_id": uuid.New().String(),
})
require.NoError(t, err)
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)
t.Cleanup(func() {
destroyBundle(t, ctx, bundleRoot)
})
out, err := runResource(t, ctx, bundleRoot, "some_other_job")
require.NoError(t, err)
require.Contains(t, out, "Hello from my func")
require.Contains(t, out, "Got arguments:")
require.Contains(t, out, "['my_test_code', 'one', 'two']")
out, err = runResourceWithParams(t, ctx, bundleRoot, "some_other_job", "--python-params=param1,param2")
require.NoError(t, err)
require.Contains(t, out, "Hello from my func")
require.Contains(t, out, "Got arguments:")
require.Contains(t, out, "['my_test_code', 'param1', 'param2']")
}

View File

@ -43,7 +43,6 @@ func runPythonWheelTest(t *testing.T, sparkVersion string, pythonWheelWrapper bo
} }
func TestAccPythonWheelTaskDeployAndRunWithoutWrapper(t *testing.T) { func TestAccPythonWheelTaskDeployAndRunWithoutWrapper(t *testing.T) {
// This is the first DBR version where we can install Python wheels from the Workspace File System.
runPythonWheelTest(t, "13.3.x-snapshot-scala2.12", false) runPythonWheelTest(t, "13.3.x-snapshot-scala2.12", false)
} }