Upload local libraries even if they don't have an artifact section defined (#1664)

## Changes
Previously, DABs required every library referenced in the configuration to have a
corresponding artifact section. This is neither necessary nor flexible, because
local libraries might be built outside of the DABs context. It also created
hard-to-follow logic in the code, where libraries were back-referenced to artifacts.


This PR does three things:
1. Allows all local libraries referenced in the DABs config to be uploaded
to the remote workspace.
2. Simplifies the upload and glob-reference expansion logic by doing it in a
single place.
3. Speeds things up by uploading each library only once and doing the uploads
in parallel (see the sketch after this list).
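
For reference, a minimal sketch of how the two new mutators are sequenced during
deploy (mirroring the `phases/deploy.go` change in the diff below); the bundle `b`
is assumed to be loaded and configured elsewhere:

```go
package example

import (
	"context"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/libraries"
)

// applyLibraryMutators expands glob references to local libraries and uploads
// each unique file once (in parallel), rewriting config paths to remote paths.
func applyLibraryMutators(ctx context.Context, b *bundle.Bundle) error {
	diags := bundle.Apply(ctx, b, bundle.Seq(
		libraries.ExpandGlobReferences(), // resolve whl/jar globs into concrete local paths
		libraries.Upload(),               // upload once per file, then point tasks/environments at the remote copy
	))
	return diags.Error()
}
```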

## Tests
Added unit and integration tests, and made sure the change is backward
compatible (no changes to existing tests). A condensed sketch of the
unit-test approach follows.
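
A condensed sketch of the unit-test approach (based on the new
`bundle/libraries/upload_test.go`); `newTestBundle` is a hypothetical helper
standing in for the bundle construction shown in full in the diff:

```go
package libraries_test

import (
	"context"
	"testing"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/libraries"
	mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
	"github.com/databricks/cli/libs/filer"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

func TestUploadSketch(t *testing.T) {
	// Hypothetical helper: builds a bundle whose config references a local source.whl.
	b := newTestBundle(t)

	// Inject a mock filer via UploadWithClient so no workspace connection is needed.
	mockFiler := mockfiler.NewMockFiler(t)
	mockFiler.EXPECT().Write(
		mock.Anything,
		"source.whl",
		mock.AnythingOfType("*os.File"),
		filer.OverwriteIfExists,
		filer.CreateParentDirectories,
	).Return(nil)

	diags := bundle.Apply(context.Background(), b, bundle.Seq(
		libraries.ExpandGlobReferences(),
		libraries.UploadWithClient(mockFiler),
	))
	require.NoError(t, diags.Error())
}
```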

---------

Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
Andrew Nester 2024-08-14 11:03:44 +02:00 committed by GitHub
parent 7ae80de351
commit 48ff18e5fc
21 changed files with 1103 additions and 778 deletions

View File

@ -1,25 +1,16 @@
package artifacts
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts/whl"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
)
type mutatorFactory = func(name string) bundle.Mutator
@ -28,8 +19,6 @@ var buildMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactTy
config.ArtifactPythonWheel: whl.Build,
}
var uploadMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactType]mutatorFactory{}
var prepareMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactType]mutatorFactory{
config.ArtifactPythonWheel: whl.Prepare,
}
@ -43,15 +32,6 @@ func getBuildMutator(t config.ArtifactType, name string) bundle.Mutator {
return mutatorFactory(name)
}
func getUploadMutator(t config.ArtifactType, name string) bundle.Mutator {
mutatorFactory, ok := uploadMutators[t]
if !ok {
mutatorFactory = BasicUpload
}
return mutatorFactory(name)
}
func getPrepareMutator(t config.ArtifactType, name string) bundle.Mutator {
mutatorFactory, ok := prepareMutators[t]
if !ok {
@ -92,174 +72,3 @@ func (m *basicBuild) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnosti
return nil
}
// Basic Upload defines a general upload mutator which uploads artifact as a library to workspace
type basicUpload struct {
name string
}
func BasicUpload(name string) bundle.Mutator {
return &basicUpload{name: name}
}
func (m *basicUpload) Name() string {
return fmt.Sprintf("artifacts.Upload(%s)", m.name)
}
func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
artifact, ok := b.Config.Artifacts[m.name]
if !ok {
return diag.Errorf("artifact doesn't exist: %s", m.name)
}
if len(artifact.Files) == 0 {
return diag.Errorf("artifact source is not configured: %s", m.name)
}
uploadPath, err := getUploadBasePath(b)
if err != nil {
return diag.FromErr(err)
}
client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath)
if err != nil {
return diag.FromErr(err)
}
err = uploadArtifact(ctx, b, artifact, uploadPath, client)
if err != nil {
return diag.Errorf("upload for %s failed, error: %v", m.name, err)
}
return nil
}
func getFilerForArtifacts(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) {
if isVolumesPath(uploadPath) {
return filer.NewFilesClient(w, uploadPath)
}
return filer.NewWorkspaceFilesClient(w, uploadPath)
}
func isVolumesPath(path string) bool {
return strings.HasPrefix(path, "/Volumes/")
}
func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
for i := range a.Files {
f := &a.Files[i]
filename := filepath.Base(f.Source)
cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename))
err := uploadArtifactFile(ctx, f.Source, client)
if err != nil {
return err
}
log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))
remotePath := f.RemotePath
if !strings.HasPrefix(f.RemotePath, "/Workspace/") && !strings.HasPrefix(f.RemotePath, "/Volumes/") {
wsfsBase := "/Workspace"
remotePath = path.Join(wsfsBase, f.RemotePath)
}
for _, job := range b.Config.Resources.Jobs {
rewriteArtifactPath(b, f, job, remotePath)
}
}
return nil
}
func rewriteArtifactPath(b *bundle.Bundle, f *config.ArtifactFile, job *resources.Job, remotePath string) {
// Rewrite artifact path in job task libraries
for i := range job.Tasks {
task := &job.Tasks[i]
for j := range task.Libraries {
lib := &task.Libraries[j]
if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) {
lib.Whl = remotePath
}
if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) {
lib.Jar = remotePath
}
}
// Rewrite artifact path in job task libraries for ForEachTask
if task.ForEachTask != nil {
forEachTask := task.ForEachTask
for j := range forEachTask.Task.Libraries {
lib := &forEachTask.Task.Libraries[j]
if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) {
lib.Whl = remotePath
}
if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) {
lib.Jar = remotePath
}
}
}
}
// Rewrite artifact path in job environments
for i := range job.Environments {
env := &job.Environments[i]
if env.Spec == nil {
continue
}
for j := range env.Spec.Dependencies {
lib := env.Spec.Dependencies[j]
if isArtifactMatchLibrary(f, lib, b) {
env.Spec.Dependencies[j] = remotePath
}
}
}
}
func isArtifactMatchLibrary(f *config.ArtifactFile, libPath string, b *bundle.Bundle) bool {
if !filepath.IsAbs(libPath) {
libPath = filepath.Join(b.RootPath, libPath)
}
// libPath can be a glob pattern, so do the match first
matches, err := filepath.Glob(libPath)
if err != nil {
return false
}
for _, m := range matches {
if m == f.Source {
return true
}
}
return false
}
// Function to upload artifact file to Workspace
func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error {
raw, err := os.ReadFile(file)
if err != nil {
return fmt.Errorf("unable to read %s: %w", file, errors.Unwrap(err))
}
filename := filepath.Base(file)
err = client.Write(ctx, filename, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories)
if err != nil {
return fmt.Errorf("unable to import %s: %w", filename, err)
}
return nil
}
func getUploadBasePath(b *bundle.Bundle) (string, error) {
artifactPath := b.Config.Workspace.ArtifactPath
if artifactPath == "" {
return "", fmt.Errorf("remote artifact path not configured")
}
return path.Join(artifactPath, ".internal"), nil
}

View File

@ -1,196 +0,0 @@
package artifacts
import (
"context"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestArtifactUploadForWorkspace(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
artifact := b.Config.Artifacts["whl"]
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*bytes.Reader"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
err := uploadArtifact(context.Background(), b, artifact, "/foo/bar/artifacts", mockFiler)
require.NoError(t, err)
// Test that libraries path is updated
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl)
}
func TestArtifactUploadForVolumes(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Volumes/some/path/mywheel.whl",
},
},
},
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Volumes/some/path/mywheel.whl",
},
},
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Volumes/some/path/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
artifact := b.Config.Artifacts["whl"]
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*bytes.Reader"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
err := uploadArtifact(context.Background(), b, artifact, "/Volumes/foo/bar/artifacts", mockFiler)
require.NoError(t, err)
// Test that libraries path is updated
require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl)
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl)
}

View File

@ -29,6 +29,5 @@ func (m *autodetect) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnosti
return bundle.Apply(ctx, b, bundle.Seq(
whl.DetectPackage(),
whl.DefineArtifactsFromLibraries(),
))
}

View File

@ -2,50 +2,18 @@ package artifacts
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
)
func UploadAll() bundle.Mutator {
return &all{
name: "Upload",
fn: uploadArtifactByName,
}
}
func CleanUp() bundle.Mutator {
return &cleanUp{}
}
type upload struct {
name string
}
func uploadArtifactByName(name string) (bundle.Mutator, error) {
return &upload{name}, nil
}
func (m *upload) Name() string {
return fmt.Sprintf("artifacts.Upload(%s)", m.name)
}
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
artifact, ok := b.Config.Artifacts[m.name]
if !ok {
return diag.Errorf("artifact doesn't exist: %s", m.name)
}
if len(artifact.Files) == 0 {
return diag.Errorf("artifact source is not configured: %s", m.name)
}
return bundle.Apply(ctx, b, getUploadMutator(artifact.Type, m.name))
}
type cleanUp struct{}
func (m *cleanUp) Name() string {
@ -53,12 +21,12 @@ func (m *cleanUp) Name() string {
}
func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
uploadPath, err := getUploadBasePath(b)
uploadPath, err := libraries.GetUploadBasePath(b)
if err != nil {
return diag.FromErr(err)
}
client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath)
client, err := libraries.GetFilerForLibraries(b.WorkspaceClient(), uploadPath)
if err != nil {
return diag.FromErr(err)
}

View File

@ -1,114 +0,0 @@
package artifacts
import (
"context"
"os"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/testfile"
"github.com/stretchr/testify/require"
)
type noop struct{}
func (n *noop) Apply(context.Context, *bundle.Bundle) diag.Diagnostics {
return nil
}
func (n *noop) Name() string {
return "noop"
}
func TestExpandGlobFilesSource(t *testing.T) {
rootPath := t.TempDir()
err := os.Mkdir(filepath.Join(rootPath, "test"), 0755)
require.NoError(t, err)
t1 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar1.jar"))
t1.Close(t)
t2 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar2.jar"))
t2.Close(t)
b := &bundle.Bundle{
RootPath: rootPath,
Config: config.Root{
Artifacts: map[string]*config.Artifact{
"test": {
Type: "custom",
Files: []config.ArtifactFile{
{
Source: filepath.Join("..", "test", "*.jar"),
},
},
},
},
},
}
bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml"))
u := &upload{"test"}
uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator {
return &noop{}
}
bm := &build{"test"}
buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator {
return &noop{}
}
pm := &prepare{"test"}
prepareMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator {
return &noop{}
}
diags := bundle.Apply(context.Background(), b, bundle.Seq(pm, bm, u))
require.NoError(t, diags.Error())
require.Equal(t, 2, len(b.Config.Artifacts["test"].Files))
require.Equal(t, filepath.Join(rootPath, "test", "myjar1.jar"), b.Config.Artifacts["test"].Files[0].Source)
require.Equal(t, filepath.Join(rootPath, "test", "myjar2.jar"), b.Config.Artifacts["test"].Files[1].Source)
}
func TestExpandGlobFilesSourceWithNoMatches(t *testing.T) {
rootPath := t.TempDir()
err := os.Mkdir(filepath.Join(rootPath, "test"), 0755)
require.NoError(t, err)
b := &bundle.Bundle{
RootPath: rootPath,
Config: config.Root{
Artifacts: map[string]*config.Artifact{
"test": {
Type: "custom",
Files: []config.ArtifactFile{
{
Source: filepath.Join("..", "test", "myjar.jar"),
},
},
},
},
},
}
bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml"))
u := &upload{"test"}
uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator {
return &noop{}
}
bm := &build{"test"}
buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator {
return &noop{}
}
diags := bundle.Apply(context.Background(), b, bundle.Seq(bm, u))
require.ErrorContains(t, diags.Error(), "no matching files")
}

View File

@ -1,79 +0,0 @@
package whl
import (
"context"
"path/filepath"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log"
)
type fromLibraries struct{}
func DefineArtifactsFromLibraries() bundle.Mutator {
return &fromLibraries{}
}
func (m *fromLibraries) Name() string {
return "artifacts.whl.DefineArtifactsFromLibraries"
}
func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
if len(b.Config.Artifacts) != 0 {
log.Debugf(ctx, "Skipping defining artifacts from libraries because artifacts section is explicitly defined")
return nil
}
tasks := libraries.FindTasksWithLocalLibraries(b)
for _, task := range tasks {
// Skip tasks that are not PythonWheelTasks for now, we can later support Jars too
if task.PythonWheelTask == nil {
continue
}
for _, lib := range task.Libraries {
matchAndAdd(ctx, lib.Whl, b)
}
}
envs := libraries.FindAllEnvironments(b)
for _, jobEnvs := range envs {
for _, env := range jobEnvs {
if env.Spec != nil {
for _, dep := range env.Spec.Dependencies {
if libraries.IsEnvironmentDependencyLocal(dep) {
matchAndAdd(ctx, dep, b)
}
}
}
}
}
return nil
}
func matchAndAdd(ctx context.Context, lib string, b *bundle.Bundle) {
matches, err := filepath.Glob(filepath.Join(b.RootPath, lib))
// File referenced from libraries section does not exists, skipping
if err != nil {
return
}
for _, match := range matches {
name := filepath.Base(match)
if b.Config.Artifacts == nil {
b.Config.Artifacts = make(map[string]*config.Artifact)
}
log.Debugf(ctx, "Adding an artifact block for %s", match)
b.Config.Artifacts[name] = &config.Artifact{
Files: []config.ArtifactFile{
{Source: match},
},
Type: config.ArtifactPythonWheel,
}
}
}

View File

@ -78,7 +78,7 @@ func (t *translateContext) jobRewritePatterns() []jobRewritePattern {
),
t.translateNoOpWithPrefix,
func(s string) bool {
return !libraries.IsEnvironmentDependencyLocal(s)
return !libraries.IsLibraryLocal(s)
},
},
}

View File

@ -0,0 +1,221 @@
package libraries
import (
"context"
"fmt"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
type expand struct {
}
func matchError(p dyn.Path, l []dyn.Location, message string) diag.Diagnostic {
return diag.Diagnostic{
Severity: diag.Error,
Summary: message,
Paths: []dyn.Path{
p.Append(),
},
Locations: l,
}
}
func getLibDetails(v dyn.Value) (string, string, bool) {
m := v.MustMap()
whl, ok := m.GetByString("whl")
if ok {
return whl.MustString(), "whl", true
}
jar, ok := m.GetByString("jar")
if ok {
return jar.MustString(), "jar", true
}
return "", "", false
}
func findMatches(b *bundle.Bundle, path string) ([]string, error) {
matches, err := filepath.Glob(filepath.Join(b.RootPath, path))
if err != nil {
return nil, err
}
if len(matches) == 0 {
if isGlobPattern(path) {
return nil, fmt.Errorf("no files match pattern: %s", path)
} else {
return nil, fmt.Errorf("file doesn't exist %s", path)
}
}
// We make the matched path relative to the root path before storing it
// to allow upload mutator to distinguish between local and remote paths
for i, match := range matches {
matches[i], err = filepath.Rel(b.RootPath, match)
if err != nil {
return nil, err
}
}
return matches, nil
}
// Checks if the path is a glob pattern
// It can contain *, [] or ? characters
func isGlobPattern(path string) bool {
return strings.ContainsAny(path, "*?[")
}
func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) {
var output []dyn.Value
var diags diag.Diagnostics
libs := v.MustSequence()
for i, lib := range libs {
lp := p.Append(dyn.Index(i))
path, libType, supported := getLibDetails(lib)
if !supported || !IsLibraryLocal(path) {
output = append(output, lib)
continue
}
lp = lp.Append(dyn.Key(libType))
matches, err := findMatches(b, path)
if err != nil {
diags = diags.Append(matchError(lp, lib.Locations(), err.Error()))
continue
}
for _, match := range matches {
output = append(output, dyn.NewValue(map[string]dyn.Value{
libType: dyn.V(match),
}, lib.Locations()))
}
}
return diags, output
}
func expandEnvironmentDeps(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) {
var output []dyn.Value
var diags diag.Diagnostics
deps := v.MustSequence()
for i, dep := range deps {
lp := p.Append(dyn.Index(i))
path := dep.MustString()
if !IsLibraryLocal(path) {
output = append(output, dep)
continue
}
matches, err := findMatches(b, path)
if err != nil {
diags = diags.Append(matchError(lp, dep.Locations(), err.Error()))
continue
}
for _, match := range matches {
output = append(output, dyn.NewValue(match, dep.Locations()))
}
}
return diags, output
}
type expandPattern struct {
pattern dyn.Pattern
fn func(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value)
}
var taskLibrariesPattern = dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("tasks"),
dyn.AnyIndex(),
dyn.Key("libraries"),
)
var forEachTaskLibrariesPattern = dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("tasks"),
dyn.AnyIndex(),
dyn.Key("for_each_task"),
dyn.Key("task"),
dyn.Key("libraries"),
)
var envDepsPattern = dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("environments"),
dyn.AnyIndex(),
dyn.Key("spec"),
dyn.Key("dependencies"),
)
func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
expanders := []expandPattern{
{
pattern: taskLibrariesPattern,
fn: expandLibraries,
},
{
pattern: forEachTaskLibrariesPattern,
fn: expandLibraries,
},
{
pattern: envDepsPattern,
fn: expandEnvironmentDeps,
},
}
var diags diag.Diagnostics
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
var err error
for _, expander := range expanders {
v, err = dyn.MapByPattern(v, expander.pattern, func(p dyn.Path, lv dyn.Value) (dyn.Value, error) {
d, output := expander.fn(b, p, lv)
diags = diags.Extend(d)
return dyn.V(output), nil
})
if err != nil {
return dyn.InvalidValue, err
}
}
return v, nil
})
if err != nil {
diags = diags.Extend(diag.FromErr(err))
}
return diags
}
func (e *expand) Name() string {
return "libraries.ExpandGlobReferences"
}
// ExpandGlobReferences expands any glob references in the libraries or environments section
// to corresponding local paths.
// We only expand local paths (i.e. paths that are relative to the root path).
// After expanding we make the paths relative to the root path to allow upload mutator later in the chain to
// distinguish between local and remote paths.
func ExpandGlobReferences() bundle.Mutator {
return &expand{}
}

View File

@ -0,0 +1,239 @@
package libraries
import (
"context"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
func TestGlobReferencesExpandedForTaskLibraries(t *testing.T) {
dir := t.TempDir()
testutil.Touch(t, dir, "whl", "my1.whl")
testutil.Touch(t, dir, "whl", "my2.whl")
testutil.Touch(t, dir, "jar", "my1.jar")
testutil.Touch(t, dir, "jar", "my2.jar")
b := &bundle.Bundle{
RootPath: dir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
TaskKey: "task",
Libraries: []compute.Library{
{
Whl: "whl/*.whl",
},
{
Whl: "/Workspace/path/to/whl/my.whl",
},
{
Jar: "./jar/*.jar",
},
{
Egg: "egg/*.egg",
},
{
Jar: "/Workspace/path/to/jar/*.jar",
},
{
Whl: "/some/full/path/to/whl/*.whl",
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Empty(t, diags)
job := b.Config.Resources.Jobs["job"]
task := job.JobSettings.Tasks[0]
require.Equal(t, []compute.Library{
{
Whl: filepath.Join("whl", "my1.whl"),
},
{
Whl: filepath.Join("whl", "my2.whl"),
},
{
Whl: "/Workspace/path/to/whl/my.whl",
},
{
Jar: filepath.Join("jar", "my1.jar"),
},
{
Jar: filepath.Join("jar", "my2.jar"),
},
{
Egg: "egg/*.egg",
},
{
Jar: "/Workspace/path/to/jar/*.jar",
},
{
Whl: "/some/full/path/to/whl/*.whl",
},
}, task.Libraries)
}
func TestGlobReferencesExpandedForForeachTaskLibraries(t *testing.T) {
dir := t.TempDir()
testutil.Touch(t, dir, "whl", "my1.whl")
testutil.Touch(t, dir, "whl", "my2.whl")
testutil.Touch(t, dir, "jar", "my1.jar")
testutil.Touch(t, dir, "jar", "my2.jar")
b := &bundle.Bundle{
RootPath: dir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
TaskKey: "task",
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
Libraries: []compute.Library{
{
Whl: "whl/*.whl",
},
{
Whl: "/Workspace/path/to/whl/my.whl",
},
{
Jar: "./jar/*.jar",
},
{
Egg: "egg/*.egg",
},
{
Jar: "/Workspace/path/to/jar/*.jar",
},
{
Whl: "/some/full/path/to/whl/*.whl",
},
},
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Empty(t, diags)
job := b.Config.Resources.Jobs["job"]
task := job.JobSettings.Tasks[0].ForEachTask.Task
require.Equal(t, []compute.Library{
{
Whl: filepath.Join("whl", "my1.whl"),
},
{
Whl: filepath.Join("whl", "my2.whl"),
},
{
Whl: "/Workspace/path/to/whl/my.whl",
},
{
Jar: filepath.Join("jar", "my1.jar"),
},
{
Jar: filepath.Join("jar", "my2.jar"),
},
{
Egg: "egg/*.egg",
},
{
Jar: "/Workspace/path/to/jar/*.jar",
},
{
Whl: "/some/full/path/to/whl/*.whl",
},
}, task.Libraries)
}
func TestGlobReferencesExpandedForEnvironmentsDeps(t *testing.T) {
dir := t.TempDir()
testutil.Touch(t, dir, "whl", "my1.whl")
testutil.Touch(t, dir, "whl", "my2.whl")
testutil.Touch(t, dir, "jar", "my1.jar")
testutil.Touch(t, dir, "jar", "my2.jar")
b := &bundle.Bundle{
RootPath: dir,
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
TaskKey: "task",
EnvironmentKey: "env",
},
},
Environments: []jobs.JobEnvironment{
{
EnvironmentKey: "env",
Spec: &compute.Environment{
Dependencies: []string{
"./whl/*.whl",
"/Workspace/path/to/whl/my.whl",
"./jar/*.jar",
"/some/local/path/to/whl/*.whl",
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Empty(t, diags)
job := b.Config.Resources.Jobs["job"]
env := job.JobSettings.Environments[0]
require.Equal(t, []string{
filepath.Join("whl", "my1.whl"),
filepath.Join("whl", "my2.whl"),
"/Workspace/path/to/whl/my.whl",
filepath.Join("jar", "my1.jar"),
filepath.Join("jar", "my2.jar"),
"/some/local/path/to/whl/*.whl",
}, env.Spec.Dependencies)
}

View File

@ -35,7 +35,7 @@ func isEnvsWithLocalLibraries(envs []jobs.JobEnvironment) bool {
}
for _, l := range e.Spec.Dependencies {
if IsEnvironmentDependencyLocal(l) {
if IsLibraryLocal(l) {
return true
}
}
@ -67,7 +67,7 @@ func FindTasksWithLocalLibraries(b *bundle.Bundle) []jobs.Task {
func isTaskWithLocalLibraries(task jobs.Task) bool {
for _, l := range task.Libraries {
if IsLocalLibrary(&l) {
if IsLibraryLocal(libraryPath(&l)) {
return true
}
}

View File

@ -4,8 +4,6 @@ import (
"net/url"
"path"
"strings"
"github.com/databricks/databricks-sdk-go/service/compute"
)
// IsLocalPath returns true if the specified path indicates that
@ -38,12 +36,12 @@ func IsLocalPath(p string) bool {
return !path.IsAbs(p)
}
// IsEnvironmentDependencyLocal returns true if the specified dependency
// IsLibraryLocal returns true if the specified library or environment dependency
// should be interpreted as a local path.
// We use this to check if the dependency in environment spec is local.
// We use this to check if the dependency in an environment spec is local or if the library itself is local.
// We can't use IsLocalPath because environment dependencies can be
// a pypi package name which can be misinterpreted as a local path by IsLocalPath.
func IsEnvironmentDependencyLocal(dep string) bool {
func IsLibraryLocal(dep string) bool {
possiblePrefixes := []string{
".",
}
@ -54,7 +52,22 @@ func IsEnvironmentDependencyLocal(dep string) bool {
}
}
return false
// If the dependency is a requirements file, it's not a valid local path
if strings.HasPrefix(dep, "-r") {
return false
}
// If the dependency has no extension, it's a PyPi package name
if isPackage(dep) {
return false
}
return IsLocalPath(dep)
}
func isPackage(name string) bool {
// If the dependency has no extension, it's a PyPi package name
return path.Ext(name) == ""
}
func isRemoteStorageScheme(path string) bool {
@ -67,16 +80,6 @@ func isRemoteStorageScheme(path string) bool {
return false
}
// If the path starts with scheme:/ format, it's a correct remote storage scheme
return strings.HasPrefix(path, url.Scheme+":/")
}
// IsLocalLibrary returns true if the specified library refers to a local path.
func IsLocalLibrary(library *compute.Library) bool {
path := libraryPath(library)
if path == "" {
return false
}
return IsLocalPath(path)
// If the path starts with scheme:/ format (not file), it's a correct remote storage scheme
return strings.HasPrefix(path, url.Scheme+":/") && url.Scheme != "file"
}

View File

@ -3,13 +3,13 @@ package libraries
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIsLocalPath(t *testing.T) {
// Relative paths, paths with the file scheme, and Windows paths.
assert.True(t, IsLocalPath("some/local/path"))
assert.True(t, IsLocalPath("./some/local/path"))
assert.True(t, IsLocalPath("file://path/to/package"))
assert.True(t, IsLocalPath("C:\\path\\to\\package"))
@ -30,24 +30,13 @@ func TestIsLocalPath(t *testing.T) {
assert.False(t, IsLocalPath("abfss://path/to/package"))
}
func TestIsLocalLibrary(t *testing.T) {
// Local paths.
assert.True(t, IsLocalLibrary(&compute.Library{Whl: "./file.whl"}))
assert.True(t, IsLocalLibrary(&compute.Library{Jar: "../target/some.jar"}))
// Non-local paths.
assert.False(t, IsLocalLibrary(&compute.Library{Whl: "/Workspace/path/to/file.whl"}))
assert.False(t, IsLocalLibrary(&compute.Library{Jar: "s3:/bucket/path/some.jar"}))
// Empty.
assert.False(t, IsLocalLibrary(&compute.Library{}))
}
func TestIsEnvironmentDependencyLocal(t *testing.T) {
func TestIsLibraryLocal(t *testing.T) {
testCases := [](struct {
path string
expected bool
}){
{path: "local/*.whl", expected: true},
{path: "local/test.whl", expected: true},
{path: "./local/*.whl", expected: true},
{path: ".\\local\\*.whl", expected: true},
{path: "./local/mypath.whl", expected: true},
@ -58,15 +47,16 @@ func TestIsEnvironmentDependencyLocal(t *testing.T) {
{path: ".\\..\\local\\*.whl", expected: true},
{path: "../../local/*.whl", expected: true},
{path: "..\\..\\local\\*.whl", expected: true},
{path: "file://path/to/package/whl.whl", expected: true},
{path: "pypipackage", expected: false},
{path: "pypipackage/test.whl", expected: false},
{path: "pypipackage/*.whl", expected: false},
{path: "/Volumes/catalog/schema/volume/path.whl", expected: false},
{path: "/Workspace/my_project/dist.whl", expected: false},
{path: "-r /Workspace/my_project/requirements.txt", expected: false},
{path: "s3://mybucket/path/to/package", expected: false},
{path: "dbfs:/mnt/path/to/package", expected: false},
}
for _, tc := range testCases {
require.Equal(t, IsEnvironmentDependencyLocal(tc.path), tc.expected)
for i, tc := range testCases {
require.Equalf(t, tc.expected, IsLibraryLocal(tc.path), "failed case: %d, path: %s", i, tc.path)
}
}

View File

@ -1,82 +0,0 @@
package libraries
import (
"context"
"fmt"
"path/filepath"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type match struct {
}
func ValidateLocalLibrariesExist() bundle.Mutator {
return &match{}
}
func (a *match) Name() string {
return "libraries.ValidateLocalLibrariesExist"
}
func (a *match) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
for _, job := range b.Config.Resources.Jobs {
err := validateEnvironments(job.Environments, b)
if err != nil {
return diag.FromErr(err)
}
for _, task := range job.JobSettings.Tasks {
err := validateTaskLibraries(task.Libraries, b)
if err != nil {
return diag.FromErr(err)
}
}
}
return nil
}
func validateTaskLibraries(libs []compute.Library, b *bundle.Bundle) error {
for _, lib := range libs {
path := libraryPath(&lib)
if path == "" || !IsLocalPath(path) {
continue
}
matches, err := filepath.Glob(filepath.Join(b.RootPath, path))
if err != nil {
return err
}
if len(matches) == 0 {
return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(&lib))
}
}
return nil
}
func validateEnvironments(envs []jobs.JobEnvironment, b *bundle.Bundle) error {
for _, env := range envs {
if env.Spec == nil {
continue
}
for _, dep := range env.Spec.Dependencies {
matches, err := filepath.Glob(filepath.Join(b.RootPath, dep))
if err != nil {
return err
}
if len(matches) == 0 && IsEnvironmentDependencyLocal(dep) {
return fmt.Errorf("file %s is referenced in environments section but doesn't exist on the local file system", dep)
}
}
}
return nil
}

View File

@ -42,7 +42,7 @@ func TestValidateEnvironments(t *testing.T) {
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Nil(t, diags)
}
@ -74,9 +74,9 @@ func TestValidateEnvironmentsNoFile(t *testing.T) {
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Len(t, diags, 1)
require.Equal(t, "file ./wheel.whl is referenced in environments section but doesn't exist on the local file system", diags[0].Summary)
require.Equal(t, "file doesn't exist ./wheel.whl", diags[0].Summary)
}
func TestValidateTaskLibraries(t *testing.T) {
@ -109,7 +109,7 @@ func TestValidateTaskLibraries(t *testing.T) {
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Nil(t, diags)
}
@ -142,7 +142,7 @@ func TestValidateTaskLibrariesNoFile(t *testing.T) {
},
}
diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist())
diags := bundle.Apply(context.Background(), b, ExpandGlobReferences())
require.Len(t, diags, 1)
require.Equal(t, "file ./wheel.whl is referenced in libraries section but doesn't exist on the local file system", diags[0].Summary)
require.Equal(t, "file doesn't exist ./wheel.whl", diags[0].Summary)
}

bundle/libraries/upload.go (new file, 238 lines)
View File

@ -0,0 +1,238 @@
package libraries
import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"golang.org/x/sync/errgroup"
)
// The Files API backend has a rate limit of 10 concurrent
// requests and 100 QPS. We limit the number of concurrent requests to 5 to
// avoid hitting the rate limit.
var maxFilesRequestsInFlight = 5
func Upload() bundle.Mutator {
return &upload{}
}
func UploadWithClient(client filer.Filer) bundle.Mutator {
return &upload{
client: client,
}
}
type upload struct {
client filer.Filer
}
type configLocation struct {
configPath dyn.Path
location dyn.Location
}
// Collect all libraries from the bundle configuration and their config paths.
// By this stage all glob references are expanded and we have a list of all libraries that need to be uploaded.
// We collect them from task libraries, foreach task libraries, environment dependencies, and artifacts.
// We return a map of library source to a list of config paths and locations where the library is used.
// We use a map so we don't upload the same library multiple times.
// Instead we upload it once and update all the config paths to point to the uploaded location.
func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error) {
libs := make(map[string]([]configLocation))
patterns := []dyn.Pattern{
taskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("whl")),
taskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("jar")),
forEachTaskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("whl")),
forEachTaskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("jar")),
envDepsPattern.Append(dyn.AnyIndex()),
}
for _, pattern := range patterns {
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
return dyn.MapByPattern(v, pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
source, ok := v.AsString()
if !ok {
return v, fmt.Errorf("expected string, got %s", v.Kind())
}
if !IsLibraryLocal(source) {
return v, nil
}
source = filepath.Join(b.RootPath, source)
libs[source] = append(libs[source], configLocation{
configPath: p.Append(), // Hack to get the copy of path
location: v.Location(),
})
return v, nil
})
})
if err != nil {
return nil, err
}
}
artifactPattern := dyn.NewPattern(
dyn.Key("artifacts"),
dyn.AnyKey(),
dyn.Key("files"),
dyn.AnyIndex(),
)
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
return dyn.MapByPattern(v, artifactPattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
file, ok := v.AsMap()
if !ok {
return v, fmt.Errorf("expected map, got %s", v.Kind())
}
sv, ok := file.GetByString("source")
if !ok {
return v, nil
}
source, ok := sv.AsString()
if !ok {
return v, fmt.Errorf("expected string, got %s", v.Kind())
}
libs[source] = append(libs[source], configLocation{
configPath: p.Append(dyn.Key("remote_path")),
location: v.Location(),
})
return v, nil
})
})
if err != nil {
return nil, err
}
return libs, nil
}
func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
uploadPath, err := GetUploadBasePath(b)
if err != nil {
return diag.FromErr(err)
}
// If the client is not initialized, initialize it
// We use client field in mutator to allow for mocking client in testing
if u.client == nil {
filer, err := GetFilerForLibraries(b.WorkspaceClient(), uploadPath)
if err != nil {
return diag.FromErr(err)
}
u.client = filer
}
var diags diag.Diagnostics
libs, err := collectLocalLibraries(b)
if err != nil {
return diag.FromErr(err)
}
errs, errCtx := errgroup.WithContext(ctx)
errs.SetLimit(maxFilesRequestsInFlight)
for source := range libs {
errs.Go(func() error {
return UploadFile(errCtx, source, u.client)
})
}
if err := errs.Wait(); err != nil {
return diag.FromErr(err)
}
// Update all the config paths to point to the uploaded location
for source, locations := range libs {
err = b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
remotePath := path.Join(uploadPath, filepath.Base(source))
// If the remote path does not start with /Workspace or /Volumes, prepend /Workspace
if !strings.HasPrefix(remotePath, "/Workspace") && !strings.HasPrefix(remotePath, "/Volumes") {
remotePath = "/Workspace" + remotePath
}
for _, location := range locations {
v, err = dyn.SetByPath(v, location.configPath, dyn.NewValue(remotePath, []dyn.Location{location.location}))
if err != nil {
return v, err
}
}
return v, nil
})
if err != nil {
diags = diags.Extend(diag.FromErr(err))
}
}
return diags
}
func (u *upload) Name() string {
return "libraries.Upload"
}
func GetFilerForLibraries(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) {
if isVolumesPath(uploadPath) {
return filer.NewFilesClient(w, uploadPath)
}
return filer.NewWorkspaceFilesClient(w, uploadPath)
}
func isVolumesPath(path string) bool {
return strings.HasPrefix(path, "/Volumes/")
}
// Function to upload a file (a library, artifact, etc.) to the Workspace or a UC volume
func UploadFile(ctx context.Context, file string, client filer.Filer) error {
filename := filepath.Base(file)
cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename))
f, err := os.Open(file)
if err != nil {
return fmt.Errorf("unable to open %s: %w", file, errors.Unwrap(err))
}
defer f.Close()
err = client.Write(ctx, filename, f, filer.OverwriteIfExists, filer.CreateParentDirectories)
if err != nil {
return fmt.Errorf("unable to import %s: %w", filename, err)
}
log.Infof(ctx, "Upload succeeded")
return nil
}
func GetUploadBasePath(b *bundle.Bundle) (string, error) {
artifactPath := b.Config.Workspace.ArtifactPath
if artifactPath == "" {
return "", fmt.Errorf("remote artifact path not configured")
}
return path.Join(artifactPath, ".internal"), nil
}

View File

@ -0,0 +1,331 @@
package libraries
import (
"context"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestArtifactUploadForWorkspace(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler)))
require.NoError(t, diags.Error())
// Test that libraries path is updated
require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl)
}
func TestArtifactUploadForVolumes(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Volumes/some/path/mywheel.whl",
},
},
},
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Volumes/some/path/mywheel.whl",
},
},
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Volumes/some/path/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler)))
require.NoError(t, diags.Error())
// Test that libraries path is updated
require.Equal(t, "/Volumes/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Volumes/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
require.Equal(t, "/Volumes/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl)
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl)
}
func TestArtifactUploadWithNoLibraryReference(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Workspace/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
},
}
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler)))
require.NoError(t, diags.Error())
require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Artifacts["whl"].Files[0].RemotePath)
}
func TestUploadMultipleLibraries(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source1.whl")
testutil.Touch(t, whlFolder, "source2.whl")
testutil.Touch(t, whlFolder, "source3.whl")
testutil.Touch(t, whlFolder, "source4.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "*.whl"),
"/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source1.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil).Once()
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source2.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil).Once()
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source3.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil).Once()
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source4.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil).Once()
diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler)))
require.NoError(t, diags.Error())
// Test that libraries path is updated
require.Len(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, 5)
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source1.whl"})
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source2.whl"})
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source3.whl"})
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source4.whl"})
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/Users/foo@bar.com/mywheel.whl"})
require.Len(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, 5)
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source1.whl")
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source2.whl")
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source3.whl")
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source4.whl")
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl")
}

View File

@ -113,9 +113,9 @@ func Deploy() bundle.Mutator {
terraform.StatePull(),
deploy.StatePull(),
mutator.ValidateGitDetails(),
libraries.ValidateLocalLibrariesExist(),
artifacts.CleanUp(),
artifacts.UploadAll(),
libraries.ExpandGlobReferences(),
libraries.Upload(),
python.TransformWheelTask(),
files.Upload(),
deploy.StateUpdate(),

View File

@ -18,6 +18,6 @@ func TestEnvironmentKeyProvidedAndNoPanic(t *testing.T) {
b, diags := loadTargetWithDiags("./environment_key_only", "default")
require.Empty(t, diags)
diags = bundle.Apply(context.Background(), b, libraries.ValidateLocalLibrariesExist())
diags = bundle.Apply(context.Background(), b, libraries.ExpandGlobReferences())
require.Empty(t, diags)
}

View File

@ -13,10 +13,3 @@ resources:
entry_point: "run"
libraries:
- whl: ./package/*.whl
- task_key: TestTask2
existing_cluster_id: "0717-aaaaa-bbbbbb"
python_wheel_task:
package_name: "my_test_code"
entry_point: "run"
libraries:
- whl: ./non-existing/*.whl

View File

@ -8,6 +8,9 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/bundle/phases"
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
"github.com/databricks/cli/libs/filer"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
@ -23,7 +26,7 @@ func TestPythonWheelBuild(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(matches))
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}
@ -40,7 +43,7 @@ func TestPythonWheelBuildAutoDetect(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(matches))
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}
@ -57,7 +60,7 @@ func TestPythonWheelBuildAutoDetectWithNotebookTask(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(matches))
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}
@ -70,7 +73,7 @@ func TestPythonWheelWithDBFSLib(t *testing.T) {
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build()))
require.NoError(t, diags.Error())
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}
@ -80,21 +83,23 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) {
b, err := bundle.Load(ctx, "./python_wheel/python_wheel_no_artifact_no_setup")
require.NoError(t, err)
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build()))
b.Config.Workspace.ArtifactPath = "/foo/bar"
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("my_test_code-0.0.1-py3-none-any.whl"),
mock.AnythingOfType("*os.File"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
u := libraries.UploadWithClient(mockFiler)
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build(), libraries.ExpandGlobReferences(), u))
require.NoError(t, diags.Error())
require.Empty(t, diags)
match := libraries.ValidateLocalLibrariesExist()
diags = bundle.Apply(ctx, b, match)
require.ErrorContains(t, diags.Error(), "./non-existing/*.whl")
require.NotZero(t, len(b.Config.Artifacts))
artifact := b.Config.Artifacts["my_test_code-0.0.1-py3-none-any.whl"]
require.NotNil(t, artifact)
require.Empty(t, artifact.BuildCommand)
require.Contains(t, artifact.Files[0].Source, filepath.Join(b.RootPath, "package",
"my_test_code-0.0.1-py3-none-any.whl",
))
require.Equal(t, "/Workspace/foo/bar/.internal/my_test_code-0.0.1-py3-none-any.whl", b.Config.Resources.Jobs["test_job"].JobSettings.Tasks[0].Libraries[0].Whl)
}
func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) {
@ -109,7 +114,7 @@ func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(matches))
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}
@ -126,7 +131,7 @@ func TestPythonWheelBuildMultiple(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, len(matches))
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}
@ -139,7 +144,7 @@ func TestPythonWheelNoBuild(t *testing.T) {
diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build()))
require.NoError(t, diags.Error())
match := libraries.ValidateLocalLibrariesExist()
match := libraries.ExpandGlobReferences()
diags = bundle.Apply(ctx, b, match)
require.NoError(t, diags.Error())
}

View File

@ -8,9 +8,9 @@ import (
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/databricks-sdk-go/service/compute"
@ -74,7 +74,7 @@ func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) {
},
}
diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test"))
diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload()))
require.NoError(t, diags.Error())
// The remote path attribute on the artifact file should have been set.
@ -138,7 +138,7 @@ func TestAccUploadArtifactFileToCorrectRemotePathWithEnvironments(t *testing.T)
},
}
diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test"))
diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload()))
require.NoError(t, diags.Error())
// The remote path attribute on the artifact file should have been set.
@ -207,7 +207,7 @@ func TestAccUploadArtifactFileToCorrectRemotePathForVolumes(t *testing.T) {
},
}
diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test"))
diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload()))
require.NoError(t, diags.Error())
// The remote path attribute on the artifact file should have been set.