From aa2e16d757c4aa24a5892b587b4249b7d93fd094 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 16 Sep 2024 02:28:58 +0200 Subject: [PATCH] cleanup and add test --- bundle/artifacts/upload.go | 9 +- bundle/libraries/upload.go | 110 ++++++------ bundle/libraries/upload_test.go | 157 ++++++++++++++++-- libs/filer/workspace_files_client.go | 16 +- .../workspace_files_extensions_client_test.go | 2 +- 5 files changed, 213 insertions(+), 81 deletions(-) diff --git a/bundle/artifacts/upload.go b/bundle/artifacts/upload.go index d7b3e1baa..c69939e8c 100644 --- a/bundle/artifacts/upload.go +++ b/bundle/artifacts/upload.go @@ -21,18 +21,13 @@ func (m *cleanUp) Name() string { } func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := libraries.GetUploadBasePath(b) - if err != nil { - return diag.FromErr(err) - } - - client, diags := libraries.GetFilerForLibraries(ctx, b, uploadPath) + client, uploadPath, diags := libraries.GetFilerForLibraries(ctx, b) if diags.HasError() { return diags } // We intentionally ignore the error because it is not critical to the deployment - err = client.Delete(ctx, ".", filer.DeleteRecursively) + err := client.Delete(ctx, ".", filer.DeleteRecursively) if err != nil { log.Errorf(ctx, "failed to delete %s: %v", uploadPath, err) } diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index cac9fa18f..163b56131 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -129,24 +129,17 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error } func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := GetUploadBasePath(b) - if err != nil { - return diag.FromErr(err) - } - // If the client is not initialized, initialize it // We use client field in mutator to allow for mocking client in testing + var uploadPath string + var diags diag.Diagnostics if u.client == nil { - filer, diags := 
GetFilerForLibraries(ctx, b, uploadPath) + u.client, uploadPath, diags = GetFilerForLibraries(ctx, b) if diags.HasError() { return diags } - - u.client = filer } - var diags diag.Diagnostics - libs, err := collectLocalLibraries(b) if err != nil { return diag.FromErr(err) @@ -196,30 +189,45 @@ func (u *upload) Name() string { return "libraries.Upload" } -// TODO: TODO: Nicer comments here. -// Case 1: UC volume path is valid. Return the client. -// Case 2: invalid path. -// (a) Not enough elements. -// (b) catalog and schema correspond to a volume define in the DAB. +// This function returns the right filer to use, to upload artifacts to the configured locations. +// Supported locations: +// 1. WSFS +// 2. UC volumes // -// -> exception for when the schema value is fully or partially interpolated. -// In that case only check the catalog name. -// -// TODO: Convert to warning? We don't error today if you specify an invalid volume path. -func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle, uploadPath string) (filer.Filer, diag.Diagnostics) { +// If a UC Volume is configured, this function checks if the UC volume exists in the workspace. +// Then: +// 1. If the UC volume exists in the workspace: +// Returns a filer for the UC volume. +// 2. If the UC volume does not exist in the workspace but is very likely to be defined in +// the bundle configuration: +// Returns a warning along with the error that instructs the user to deploy the +// UC volume before using it in the artifact path. +// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration: +// Returns an error. +func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + artifactPath := b.Config.Workspace.ArtifactPath + if artifactPath == "" { + return nil, "", diag.Errorf("remote artifact path not configured") + } + + // path to upload artifact files to.
+ uploadPath := path.Join(artifactPath, ".internal") + w := b.WorkspaceClient() isVolumesPath := strings.HasPrefix(uploadPath, "/Volumes/") - // If the path is not a volume path, use the workspace file system. + // Return early with a WSFS filer if the artifact path is not a UC volume path. if !isVolumesPath { f, err := filer.NewWorkspaceFilesClient(w, uploadPath) - return f, diag.FromErr(err) + return f, uploadPath, diag.FromErr(err) } - parts := strings.Split(uploadPath, "/") + parts := strings.Split(artifactPath, "/") volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes///, got %s", uploadPath) - if len(strings.Split(uploadPath, "/")) < 5 { - return nil, diag.FromErr(volumeFormatErr) + + // Incorrect format. + if len(parts) < 5 { + return nil, "", diag.FromErr(volumeFormatErr) } catalogName := parts[2] @@ -228,36 +236,36 @@ func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle, uploadPath stri // Incorrect format. if catalogName == "" || schemaName == "" || volumeName == "" { - return nil, diag.FromErr(volumeFormatErr) + return nil, "", diag.FromErr(volumeFormatErr) } + // Check if the UC volume exists in the workspace. + volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName) + err := w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath) + // If the volume exists already, directly return the filer for the upload path. - volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName) - vf, err := filer.NewFilesClient(w, volumePath) - if err != nil { - return nil, diag.FromErr(err) - } - if _, err := vf.Stat(ctx, "."); err == nil { + if err == nil { f, err := filer.NewFilesClient(w, uploadPath) - return f, diag.FromErr(err) + return f, uploadPath, diag.FromErr(err) } - // The volume does not exist. Check if the volume is defined in the bundle. - // TODO: Note that this is not a breaking change. - // TODO: Include error as well for why the stat call failed. 
It's more context. - l, ok := locationOfVolumeInBundle(b, catalogName, schemaName, volumeName) + diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err) + + path, locations, ok := matchVolumeInBundle(b, catalogName, schemaName, volumeName) if !ok { - return nil, diag.Errorf("the bundle is configured to upload artifacts to %s but a UC volume at %s does not exist", uploadPath, volumePath) + return nil, "", diags } - return nil, diag.Errorf(`the bundle is configured to upload artifacts to %s but a -UC volume at %s does not exist. Note: We detected that you have a UC volume -defined that matched the path above at %s. Please deploy the UC volume -in a separate deployment before using it in as a destination to upload -artifacts.`, uploadPath, volumePath, l) + warning := diag.Diagnostic{ + Severity: diag.Warning, + Summary: `the UC volume that is likely being used in the artifact_path has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`, + Locations: locations, + Paths: []dyn.Path{path}, + } + return nil, "", diags.Append(warning) } -func locationOfVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Location, bool) { +func matchVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { volumes := b.Config.Resources.Volumes for k, v := range volumes { if v.CatalogName != catalogName || v.Name != volumeName { @@ -270,9 +278,10 @@ func locationOfVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeN if v.SchemaName != schemaName && !dynvar.ContainsVariableReference(v.SchemaName) { continue } - return b.Config.GetLocation(fmt.Sprintf("resources.volumes.%s", k)), true + pathString := fmt.Sprintf("resources.volumes.%s", k) + return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true } - return dyn.Location{}, false + 
return nil, nil, false } // Function to upload file (a library, artifact and etc) to Workspace or UC volume @@ -294,12 +303,3 @@ func UploadFile(ctx context.Context, file string, client filer.Filer) error { log.Infof(ctx, "Upload succeeded") return nil } - -func GetUploadBasePath(b *bundle.Bundle) (string, error) { - artifactPath := b.Config.Workspace.ArtifactPath - if artifactPath == "" { - return "", fmt.Errorf("remote artifact path not configured") - } - - return path.Join(artifactPath, ".internal"), nil -} diff --git a/bundle/libraries/upload_test.go b/bundle/libraries/upload_test.go index 3eec46920..f15e2b93e 100644 --- a/bundle/libraries/upload_test.go +++ b/bundle/libraries/upload_test.go @@ -2,6 +2,7 @@ package libraries import ( "context" + "fmt" "path/filepath" "testing" @@ -11,8 +12,11 @@ import ( "github.com/databricks/cli/bundle/internal/bundletest" mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" "github.com/databricks/cli/internal/testutil" + "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/filer" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -334,7 +338,7 @@ func TestUploadMultipleLibraries(t *testing.T) { require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl") } -func TestLocationOfVolumeInBundle(t *testing.T) { +func TestMatchVolumeInBundle(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -353,26 +357,159 @@ func TestLocationOfVolumeInBundle(t *testing.T) { bundletest.SetLocation(b, "resources.volumes.foo", "volume.yml") - // volume is in DAB directly. 
- l, ok := locationOfVolumeInBundle(b, "main", "my_schema", "my_volume") + // volume is in DAB. + path, locations, ok := matchVolumeInBundle(b, "main", "my_schema", "my_volume") assert.True(t, ok) - assert.Equal(t, dyn.Location{ + assert.Equal(t, []dyn.Location{{ File: "volume.yml", - }, l) + }}, locations) + assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) // wrong volume name - _, ok = locationOfVolumeInBundle(b, "main", "my_schema", "doesnotexist") + _, _, ok = matchVolumeInBundle(b, "main", "my_schema", "doesnotexist") assert.False(t, ok) // wrong schema name - _, ok = locationOfVolumeInBundle(b, "main", "doesnotexist", "my_volume") + _, _, ok = matchVolumeInBundle(b, "main", "doesnotexist", "my_volume") assert.False(t, ok) // schema name is interpolated. b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema}" - l, ok = locationOfVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") + path, locations, ok = matchVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") assert.True(t, ok) - assert.Equal(t, dyn.Location{ + assert.Equal(t, []dyn.Location{{ File: "volume.yml", - }, l) + }}, locations) + assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) +} + +func TestGetFilerForLibraries(t *testing.T) { + t.Run("valid wsfs", func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/foo/bar/artifacts", + }, + }, + } + + client, uploadPath, diags := GetFilerForLibraries(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath) + + assert.IsType(t, &filer.WorkspaceFilesClient{}, client) + }) + + t.Run("valid uc volume", func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + 
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + + client, uploadPath, diags := GetFilerForLibraries(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath) + + assert.IsType(t, &filer.FilesClient{}, client) + }) + + t.Run("volume not in DAB", func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/doesnotexist", + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(fmt.Errorf("error from API")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API") + }) + + t.Run("volume in DAB config", func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + VolumeType: "MANAGED", + SchemaName: "my_schema", + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.volumes.foo", "volume.yml") + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := 
GetFilerForLibraries(context.Background(), b) + assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API") + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Warning, + Summary: "the UC volume that is likely being used in the artifact_path has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.", + Locations: []dyn.Location{{ + File: "volume.yml", + }}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.volumes.foo")}, + }) + }) + + t.Run("remote path is not set", func(t *testing.T) { + b := &bundle.Bundle{} + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.EqualError(t, diags.Error(), "remote artifact path not configured") + }) + + t.Run("invalid volume paths", func(t *testing.T) { + invalidPaths := []string{ + "/Volumes", + "/Volumes/main", + "/Volumes/main/", + "/Volumes/main//", + "/Volumes/main//my_schema", + "/Volumes/main/my_schema", + "/Volumes/main/my_schema/", + "/Volumes/main/my_schema//", + "/Volumes//my_schema/my_volume", + } + + for _, path := range invalidPaths { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: path, + }, + }, + } + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes///, got %s", filepath.Join(path, ".internal"))) + } + }) } diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index d8ab5a6bb..5e2a9813b 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -114,7 +114,7 @@ type apiClient interface { // NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled. // It can access any workspace path if files-in-workspace is enabled. 
-type workspaceFilesClient struct { +type WorkspaceFilesClient struct { workspaceClient *databricks.WorkspaceClient apiClient apiClient @@ -128,7 +128,7 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, return nil, err } - return &workspaceFilesClient{ + return &WorkspaceFilesClient{ workspaceClient: w, apiClient: apiClient, @@ -136,7 +136,7 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, }, nil } -func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { +func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { return err @@ -206,7 +206,7 @@ func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io return err } -func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { +func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err @@ -230,7 +230,7 @@ func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCl return w.workspaceClient.Workspace.Download(ctx, absPath) } -func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { +func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { absPath, err := w.root.Join(name) if err != nil { return err @@ -274,7 +274,7 @@ func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ... 
return err } -func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { +func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err @@ -307,7 +307,7 @@ func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.D return wsfsDirEntriesFromObjectInfos(objects), nil } -func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error { +func (w *WorkspaceFilesClient) Mkdir(ctx context.Context, name string) error { dirPath, err := w.root.Join(name) if err != nil { return err @@ -317,7 +317,7 @@ func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error { }) } -func (w *workspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { +func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err diff --git a/libs/filer/workspace_files_extensions_client_test.go b/libs/filer/workspace_files_extensions_client_test.go index 321c43712..33c558884 100644 --- a/libs/filer/workspace_files_extensions_client_test.go +++ b/libs/filer/workspace_files_extensions_client_test.go @@ -123,7 +123,7 @@ func TestFilerWorkspaceFilesExtensionsErrorsOnDupName(t *testing.T) { "return_export_info": "true", }, mock.AnythingOfType("*filer.wsfsFileInfo"), []func(*http.Request) error(nil)).Return(nil, statNotebook) - workspaceFilesClient := workspaceFilesClient{ + workspaceFilesClient := WorkspaceFilesClient{ workspaceClient: mockedWorkspaceClient.WorkspaceClient, apiClient: &mockedApiClient, root: NewWorkspaceRootPath("/dir"),