Refactor library to artifact matching to not use pointers (#1172)

## Changes

The approach to do this was:
1. Iterate over all libraries in all job tasks
2. Find references to local libraries
3. Store pointer to `compute.Library` in the matching artifact file to
signal it should be uploaded

This breaks down when introducing #1098 because we can no longer track
unexported state across mutators. The approach in this PR performs the
path matching twice; once in the matching mutator where we check if each
referenced file has an artifacts section, and once during artifact
upload to rewrite the library path from a local file reference to an
absolute Databricks path.

## Tests

Integration tests pass.
This commit is contained in:
Pieter Noordhuis 2024-02-05 16:29:45 +01:00 committed by GitHub
parent cb3ad737f1
commit 33c446dadd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 431 additions and 191 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts/whl"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
@ -106,7 +107,7 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) error {
return err
}
err = uploadArtifact(ctx, artifact, uploadPath, client)
err = uploadArtifact(ctx, b, artifact, uploadPath, client)
if err != nil {
return fmt.Errorf("upload for %s failed, error: %w", m.name, err)
}
@ -114,10 +115,19 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) error {
return nil
}
func uploadArtifact(ctx context.Context, a *config.Artifact, uploadPath string, client filer.Filer) error {
func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
filesToLibraries := libraries.MapFilesToTaskLibraries(ctx, b)
for i := range a.Files {
f := &a.Files[i]
if f.NeedsUpload() {
// Lookup all tasks that reference this file.
libs, ok := filesToLibraries[f.Source]
if !ok {
log.Debugf(ctx, "No tasks reference %s. Skipping upload.", f.Source)
continue
}
filename := filepath.Base(f.Source)
cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename))
@ -125,12 +135,25 @@ func uploadArtifact(ctx context.Context, a *config.Artifact, uploadPath string,
if err != nil {
return err
}
log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))
// Update all tasks that reference this file.
for _, lib := range libs {
wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath)
if lib.Whl != "" {
lib.Whl = remotePath
continue
}
if lib.Jar != "" {
lib.Jar = remotePath
continue
}
}
}
a.NormalisePaths()
return nil
}

View File

@ -3,11 +3,9 @@ package config
import (
"context"
"fmt"
"path"
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/exec"
"github.com/databricks/databricks-sdk-go/service/compute"
)
type Artifacts map[string]*Artifact
@ -24,8 +22,7 @@ const ArtifactPythonWheel ArtifactType = `whl`
type ArtifactFile struct {
Source string `json:"source"`
RemotePath string `json:"-" bundle:"readonly"`
Libraries []*compute.Library `json:"-" bundle:"readonly"`
RemotePath string `json:"remote_path" bundle:"readonly"`
}
// Artifact defines a single local code artifact that can be
@ -65,36 +62,3 @@ func (a *Artifact) Build(ctx context.Context) ([]byte, error) {
}
return e.Exec(ctx, a.BuildCommand)
}
func (a *Artifact) NormalisePaths() {
for _, f := range a.Files {
// If no libraries attached, nothing to normalise, skipping
if f.Libraries == nil {
continue
}
wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath)
for i := range f.Libraries {
lib := f.Libraries[i]
if lib.Whl != "" {
lib.Whl = remotePath
continue
}
if lib.Jar != "" {
lib.Jar = remotePath
continue
}
}
}
}
// This function determines if artifact files needs to be uploaded.
// During the bundle processing we analyse which library uses which artifact file.
// If artifact file is used as a library, we store the reference to this library in artifact file Libraries field.
// If artifact file has libraries it's been used in, it means than we need to upload this file.
// Otherwise this artifact file is not used and we skip uploading
func (af *ArtifactFile) NeedsUpload() bool {
return af.Libraries != nil
}

View File

@ -0,0 +1,16 @@
package libraries
import "github.com/databricks/databricks-sdk-go/service/compute"
func libraryPath(library *compute.Library) string {
if library.Whl != "" {
return library.Whl
}
if library.Jar != "" {
return library.Jar
}
if library.Egg != "" {
return library.Egg
}
return ""
}

View File

@ -0,0 +1,17 @@
package libraries
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
)
func TestLibraryPath(t *testing.T) {
path := "/some/path"
assert.Equal(t, path, libraryPath(&compute.Library{Whl: path}))
assert.Equal(t, path, libraryPath(&compute.Library{Jar: path}))
assert.Equal(t, path, libraryPath(&compute.Library{Egg: path}))
assert.Equal(t, "", libraryPath(&compute.Library{}))
}

View File

@ -3,46 +3,16 @@ package libraries
import (
"context"
"fmt"
"net/url"
"path"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type match struct {
}
func MatchWithArtifacts() bundle.Mutator {
return &match{}
}
func (a *match) Name() string {
return "libraries.MatchWithArtifacts"
}
func (a *match) Apply(ctx context.Context, b *bundle.Bundle) error {
tasks := findAllTasks(b)
for _, task := range tasks {
if isMissingRequiredLibraries(task) {
return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey)
}
for j := range task.Libraries {
lib := &task.Libraries[j]
err := findArtifactsAndMarkForUpload(ctx, lib, b)
if err != nil {
return err
}
}
}
return nil
}
func findAllTasks(b *bundle.Bundle) []*jobs.Task {
r := b.Config.Resources
result := make([]*jobs.Task, 0)
@ -71,7 +41,7 @@ func FindAllWheelTasksWithLocalLibraries(b *bundle.Bundle) []*jobs.Task {
func IsTaskWithLocalLibraries(task *jobs.Task) bool {
for _, l := range task.Libraries {
if isLocalLibrary(&l) {
if IsLocalLibrary(&l) {
return true
}
}
@ -81,8 +51,7 @@ func IsTaskWithLocalLibraries(task *jobs.Task) bool {
func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool {
for _, l := range task.Libraries {
path := libPath(&l)
if isWorkspacePath(path) {
if IsWorkspaceLibrary(&l) {
return true
}
}
@ -90,16 +59,8 @@ func IsTaskWithWorkspaceLibraries(task *jobs.Task) bool {
return false
}
func isMissingRequiredLibraries(task *jobs.Task) bool {
if task.Libraries != nil {
return false
}
return task.PythonWheelTask != nil || task.SparkJarTask != nil
}
func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error) {
path := libPath(lib)
path := libraryPath(lib)
if path == "" {
return nil, nil
}
@ -108,26 +69,27 @@ func findLibraryMatches(lib *compute.Library, b *bundle.Bundle) ([]string, error
return filepath.Glob(fullPath)
}
func findArtifactsAndMarkForUpload(ctx context.Context, lib *compute.Library, b *bundle.Bundle) error {
func findArtifactFiles(ctx context.Context, lib *compute.Library, b *bundle.Bundle) ([]*config.ArtifactFile, error) {
matches, err := findLibraryMatches(lib, b)
if err != nil {
return err
return nil, err
}
if len(matches) == 0 && isLocalLibrary(lib) {
return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libPath(lib))
if len(matches) == 0 && IsLocalLibrary(lib) {
return nil, fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(lib))
}
var out []*config.ArtifactFile
for _, match := range matches {
af, err := findArtifactFileByLocalPath(match, b)
if err != nil {
cmdio.LogString(ctx, fmt.Sprintf("%s. Skipping uploading. In order to use the define 'artifacts' section", err.Error()))
} else {
af.Libraries = append(af.Libraries, lib)
out = append(out, af)
}
}
return nil
return out, nil
}
func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.ArtifactFile, error) {
@ -142,67 +104,27 @@ func findArtifactFileByLocalPath(path string, b *bundle.Bundle) (*config.Artifac
return nil, fmt.Errorf("artifact section is not defined for file at %s", path)
}
func libPath(library *compute.Library) string {
if library.Whl != "" {
return library.Whl
}
if library.Jar != "" {
return library.Jar
}
if library.Egg != "" {
return library.Egg
func MapFilesToTaskLibraries(ctx context.Context, b *bundle.Bundle) map[string][]*compute.Library {
tasks := findAllTasks(b)
out := make(map[string][]*compute.Library)
for _, task := range tasks {
for j := range task.Libraries {
lib := &task.Libraries[j]
if !IsLocalLibrary(lib) {
continue
}
return ""
}
func isLocalLibrary(library *compute.Library) bool {
path := libPath(library)
if path == "" {
return false
}
return IsLocalPath(path)
}
func IsLocalPath(path string) bool {
if isExplicitFileScheme(path) {
return true
}
if isRemoteStorageScheme(path) {
return false
}
return !isAbsoluteRemotePath(path)
}
func isExplicitFileScheme(path string) bool {
return strings.HasPrefix(path, "file://")
}
func isRemoteStorageScheme(path string) bool {
url, err := url.Parse(path)
matches, err := findLibraryMatches(lib, b)
if err != nil {
return false
log.Warnf(ctx, "Error matching library to files: %s", err.Error())
continue
}
if url.Scheme == "" {
return false
for _, match := range matches {
out[match] = append(out[match], lib)
}
}
}
// If the path starts with scheme:/ format, it's a correct remote storage scheme
return strings.HasPrefix(path, url.Scheme+":/")
}
func isWorkspacePath(path string) bool {
return strings.HasPrefix(path, "/Workspace/") ||
strings.HasPrefix(path, "/Users/") ||
strings.HasPrefix(path, "/Shared/")
}
func isAbsoluteRemotePath(p string) bool {
// If path for library starts with /, it's a remote absolute path
return path.IsAbs(p)
return out
}

View File

@ -1,31 +1,88 @@
package libraries
import (
"fmt"
"context"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/require"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)
var testCases map[string]bool = map[string]bool{
"./some/local/path": true,
"/some/full/path": false,
"/Workspace/path/to/package": false,
"/Users/path/to/package": false,
"file://path/to/package": true,
"C:\\path\\to\\package": true,
"dbfs://path/to/package": false,
"dbfs:/path/to/package": false,
"s3://path/to/package": false,
"abfss://path/to/package": false,
func TestMapFilesToTaskLibrariesNoGlob(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Path: "testdata",
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "library1",
},
{
Whl: "library2",
},
{
Whl: "/absolute/path/in/workspace/library3",
},
},
},
{
Libraries: []compute.Library{
{
Whl: "library1",
},
{
Whl: "library2",
},
},
},
},
},
},
"job2": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "library1",
},
{
Whl: "library2",
},
},
},
},
},
},
},
},
},
}
func TestIsLocalLbrary(t *testing.T) {
for p, result := range testCases {
lib := compute.Library{
Whl: p,
}
require.Equal(t, result, isLocalLibrary(&lib), fmt.Sprintf("isLocalLibrary must return %t for path %s ", result, p))
}
out := MapFilesToTaskLibraries(context.Background(), b)
assert.Len(t, out, 2)
// Pointer equality for "library1"
assert.Equal(t, []*compute.Library{
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[0].Libraries[0],
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[1].Libraries[0],
&b.Config.Resources.Jobs["job2"].JobSettings.Tasks[0].Libraries[0],
}, out[filepath.Clean("testdata/library1")])
// Pointer equality for "library2"
assert.Equal(t, []*compute.Library{
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[0].Libraries[1],
&b.Config.Resources.Jobs["job1"].JobSettings.Tasks[1].Libraries[1],
&b.Config.Resources.Jobs["job2"].JobSettings.Tasks[0].Libraries[1],
}, out[filepath.Clean("testdata/library2")])
}

View File

@ -0,0 +1,63 @@
package libraries
import (
"net/url"
"path"
"strings"
"github.com/databricks/databricks-sdk-go/service/compute"
)
// IsLocalPath returns true if the specified path indicates that
// it should be interpreted as a path on the local file system.
//
// The following paths are considered local:
//
// - myfile.txt
// - ./myfile.txt
// - ../myfile.txt
// - file:///foo/bar/myfile.txt
//
// The following paths are considered remote:
//
// - dbfs:/mnt/myfile.txt
// - s3:/mybucket/myfile.txt
// - /Users/jane@doe.com/myfile.txt
func IsLocalPath(p string) bool {
// If the path has the explicit file scheme, it's a local path.
if strings.HasPrefix(p, "file://") {
return true
}
// If the path has another scheme, it's a remote path.
if isRemoteStorageScheme(p) {
return false
}
// If path starts with /, it's a remote absolute path
return !path.IsAbs(p)
}
func isRemoteStorageScheme(path string) bool {
url, err := url.Parse(path)
if err != nil {
return false
}
if url.Scheme == "" {
return false
}
// If the path starts with scheme:/ format, it's a correct remote storage scheme
return strings.HasPrefix(path, url.Scheme+":/")
}
// IsLocalLibrary returns true if the specified library refers to a local path.
func IsLocalLibrary(library *compute.Library) bool {
path := libraryPath(library)
if path == "" {
return false
}
return IsLocalPath(path)
}

View File

@ -0,0 +1,43 @@
package libraries
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
)
func TestIsLocalPath(t *testing.T) {
// Relative paths, paths with the file scheme, and Windows paths.
assert.True(t, IsLocalPath("./some/local/path"))
assert.True(t, IsLocalPath("file://path/to/package"))
assert.True(t, IsLocalPath("C:\\path\\to\\package"))
assert.True(t, IsLocalPath("myfile.txt"))
assert.True(t, IsLocalPath("./myfile.txt"))
assert.True(t, IsLocalPath("../myfile.txt"))
assert.True(t, IsLocalPath("file:///foo/bar/myfile.txt"))
// Absolute paths.
assert.False(t, IsLocalPath("/some/full/path"))
assert.False(t, IsLocalPath("/Workspace/path/to/package"))
assert.False(t, IsLocalPath("/Users/path/to/package"))
// Paths with schemes.
assert.False(t, IsLocalPath("dbfs://path/to/package"))
assert.False(t, IsLocalPath("dbfs:/path/to/package"))
assert.False(t, IsLocalPath("s3://path/to/package"))
assert.False(t, IsLocalPath("abfss://path/to/package"))
}
func TestIsLocalLibrary(t *testing.T) {
// Local paths.
assert.True(t, IsLocalLibrary(&compute.Library{Whl: "./file.whl"}))
assert.True(t, IsLocalLibrary(&compute.Library{Jar: "../target/some.jar"}))
// Non-local paths.
assert.False(t, IsLocalLibrary(&compute.Library{Whl: "/Workspace/path/to/file.whl"}))
assert.False(t, IsLocalLibrary(&compute.Library{Jar: "s3:/bucket/path/some.jar"}))
// Empty.
assert.False(t, IsLocalLibrary(&compute.Library{}))
}

45
bundle/libraries/match.go Normal file
View File

@ -0,0 +1,45 @@
package libraries
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type match struct {
}
func MatchWithArtifacts() bundle.Mutator {
return &match{}
}
func (a *match) Name() string {
return "libraries.MatchWithArtifacts"
}
func (a *match) Apply(ctx context.Context, b *bundle.Bundle) error {
tasks := findAllTasks(b)
for _, task := range tasks {
if isMissingRequiredLibraries(task) {
return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey)
}
for j := range task.Libraries {
lib := &task.Libraries[j]
_, err := findArtifactFiles(ctx, lib, b)
if err != nil {
return err
}
}
}
return nil
}
func isMissingRequiredLibraries(task *jobs.Task) bool {
if task.Libraries != nil {
return false
}
return task.PythonWheelTask != nil || task.SparkJarTask != nil
}

View File

@ -0,0 +1 @@
package libraries

0
bundle/libraries/testdata/library1 vendored Normal file
View File

0
bundle/libraries/testdata/library2 vendored Normal file
View File

View File

@ -0,0 +1,38 @@
package libraries
import (
"strings"
"github.com/databricks/databricks-sdk-go/service/compute"
)
// IsWorkspacePath returns true if the specified path indicates that
// it should be interpreted as a Databricks Workspace path.
//
// The following paths are considered workspace paths:
//
// - /Workspace/Users/jane@doe.com/myfile
// - /Users/jane@doe.com/myfile
// - /Shared/project/myfile
//
// The following paths are not considered workspace paths:
//
// - myfile.txt
// - ./myfile.txt
// - ../myfile.txt
// - /foo/bar/myfile.txt
func IsWorkspacePath(path string) bool {
return strings.HasPrefix(path, "/Workspace/") ||
strings.HasPrefix(path, "/Users/") ||
strings.HasPrefix(path, "/Shared/")
}
// IsWorkspaceLibrary returns true if the specified library refers to a workspace path.
func IsWorkspaceLibrary(library *compute.Library) bool {
path := libraryPath(library)
if path == "" {
return false
}
return IsWorkspacePath(path)
}

View File

@ -0,0 +1,33 @@
package libraries
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
)
func TestIsWorkspacePath(t *testing.T) {
// Absolute paths with particular prefixes.
assert.True(t, IsWorkspacePath("/Workspace/path/to/package"))
assert.True(t, IsWorkspacePath("/Users/path/to/package"))
assert.True(t, IsWorkspacePath("/Shared/path/to/package"))
// Relative paths.
assert.False(t, IsWorkspacePath("myfile.txt"))
assert.False(t, IsWorkspacePath("./myfile.txt"))
assert.False(t, IsWorkspacePath("../myfile.txt"))
}
func TestIsWorkspaceLibrary(t *testing.T) {
// Workspace paths.
assert.True(t, IsWorkspaceLibrary(&compute.Library{Whl: "/Workspace/path/to/file.whl"}))
// Non-workspace paths.
assert.False(t, IsWorkspaceLibrary(&compute.Library{Whl: "./file.whl"}))
assert.False(t, IsWorkspaceLibrary(&compute.Library{Jar: "../target/some.jar"}))
assert.False(t, IsWorkspaceLibrary(&compute.Library{Jar: "s3:/bucket/path/some.jar"}))
// Empty.
assert.False(t, IsWorkspaceLibrary(&compute.Library{}))
}

View File

@ -84,5 +84,4 @@ func TestBundlePythonWheelBuildNoBuildJustUpload(t *testing.T) {
"package",
"my_test_code-0.0.1-py3-none-any.whl",
))
require.True(t, artifact.Files[0].NeedsUpload())
}

View File

@ -1,7 +1,6 @@
package bundle
import (
"context"
"os"
"path"
"path/filepath"
@ -11,9 +10,11 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/internal"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
@ -26,8 +27,8 @@ func touchEmptyFile(t *testing.T, path string) {
}
func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) {
t.Log(internal.GetEnvOrSkipTest(t, "CLOUD_ENV"))
ctx, wt := acc.WorkspaceTest(t)
w := wt.W
dir := t.TempDir()
whlPath := filepath.Join(dir, "dist", "test.whl")
touchEmptyFile(t, whlPath)
@ -37,14 +38,10 @@ func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) {
Files: []config.ArtifactFile{
{
Source: whlPath,
Libraries: []*compute.Library{
{Whl: "dist\\test.whl"},
},
},
},
}
w := databricks.Must(databricks.NewWorkspaceClient())
wsDir := internal.TemporaryWorkspaceDir(t, w)
b := &bundle.Bundle{
@ -59,11 +56,33 @@ func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) {
Artifacts: config.Artifacts{
"test": artifact,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"test": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "dist/test.whl",
},
},
},
},
},
},
},
},
},
}
err := bundle.Apply(context.Background(), b, artifacts.BasicUpload("test"))
err := bundle.Apply(ctx, b, artifacts.BasicUpload("test"))
require.NoError(t, err)
// The remote path attribute on the artifact file should have been set.
require.Regexp(t, regexp.MustCompile(path.Join(regexp.QuoteMeta(wsDir), `.internal/test\.whl`)), artifact.Files[0].RemotePath)
require.Regexp(t, regexp.MustCompile(path.Join("/Workspace", regexp.QuoteMeta(wsDir), `.internal/test\.whl`)), artifact.Files[0].Libraries[0].Whl)
// The task library path should have been updated to the remote path.
lib := b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0]
require.Regexp(t, regexp.MustCompile(path.Join("/Workspace", regexp.QuoteMeta(wsDir), `.internal/test\.whl`)), lib.Whl)
}