diff --git a/bundle/artifacts/upload.go b/bundle/artifacts/upload.go index 58c006dc1..c69939e8c 100644 --- a/bundle/artifacts/upload.go +++ b/bundle/artifacts/upload.go @@ -21,18 +21,13 @@ func (m *cleanUp) Name() string { } func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := libraries.GetUploadBasePath(b) - if err != nil { - return diag.FromErr(err) - } - - client, err := libraries.GetFilerForLibraries(b.WorkspaceClient(), uploadPath) - if err != nil { - return diag.FromErr(err) + client, uploadPath, diags := libraries.GetFilerForLibraries(ctx, b) + if diags.HasError() { + return diags } // We intentionally ignore the error because it is not critical to the deployment - err = client.Delete(ctx, ".", filer.DeleteRecursively) + err := client.Delete(ctx, ".", filer.DeleteRecursively) if err != nil { log.Errorf(ctx, "failed to delete %s: %v", uploadPath, err) } diff --git a/bundle/config/mutator/apply_presets_test.go b/bundle/config/mutator/apply_presets_test.go index 497ef051a..91d5b62e5 100644 --- a/bundle/config/mutator/apply_presets_test.go +++ b/bundle/config/mutator/apply_presets_test.go @@ -73,7 +73,7 @@ func TestApplyPresetsPrefix(t *testing.T) { } } -func TestApplyPresetsPrefixForUcSchema(t *testing.T) { +func TestApplyPresetsPrefixForSchema(t *testing.T) { tests := []struct { name string prefix string @@ -129,6 +129,36 @@ func TestApplyPresetsPrefixForUcSchema(t *testing.T) { } } +func TestApplyPresetsVolumesShouldNotBePrefixed(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "volume1": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + CatalogName: "catalog1", + SchemaName: "schema1", + }, + }, + }, + }, + Presets: config.Presets{ + NamePrefix: "[prefix]", + }, + }, + } + + ctx := context.Background() + diag := bundle.Apply(ctx, b, mutator.ApplyPresets()) + + if diag.HasError() { + t.Fatalf("unexpected error: %v", diag) + } + + require.Equal(t, "volume1", b.Config.Resources.Volumes["volume1"].Name) +} + func TestApplyPresetsTags(t *testing.T) { tests := []struct { name string diff --git a/bundle/config/mutator/process_target_mode_test.go b/bundle/config/mutator/process_target_mode_test.go index c5ea9adea..14d524416 100644 --- a/bundle/config/mutator/process_target_mode_test.go +++ b/bundle/config/mutator/process_target_mode_test.go @@ -4,7 +4,7 @@ import ( "context" "reflect" "runtime" - "strings" + "slices" "testing" "github.com/databricks/cli/bundle" @@ -131,6 +131,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle { Schemas: map[string]*resources.Schema{ "schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}}, }, + Volumes: map[string]*resources.Volume{ + "volume1": {CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{Name: "volume1"}}, + }, Clusters: map[string]*resources.Cluster{ "cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}}, }, @@ -311,6 +314,8 @@ func TestProcessTargetModeDefault(t *testing.T) { assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) + assert.Equal(t, "schema1", b.Config.Resources.Schemas["schema1"].Name) + assert.Equal(t, "volume1", b.Config.Resources.Volumes["volume1"].Name) assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName) } @@ -355,6 +360,8 @@ func TestProcessTargetModeProduction(t *testing.T) { assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) + assert.Equal(t, "schema1", b.Config.Resources.Schemas["schema1"].Name) + assert.Equal(t, "volume1", b.Config.Resources.Volumes["volume1"].Name) assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName) } @@ -388,10 +395,17 @@ func TestAllResourcesMocked(t *testing.T) { } } -// Make sure that we at least rename all resources -func TestAllResourcesRenamed(t *testing.T) { +// Make sure that we at rename all non UC resources +func TestAllNonUcResourcesAreRenamed(t *testing.T) { b := mockBundle(config.Development) + // UC resources should not have a prefix added to their name. Right now + // this list only contains the Volume resource since we have yet to remove + // prefixing support for UC schemas and registered models. + ucFields := []reflect.Type{ + reflect.TypeOf(&resources.Volume{}), + } + m := bundle.Seq(ProcessTargetMode(), ApplyPresets()) diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) @@ -404,14 +418,14 @@ func TestAllResourcesRenamed(t *testing.T) { for _, key := range field.MapKeys() { resource := field.MapIndex(key) nameField := resource.Elem().FieldByName("Name") - if nameField.IsValid() && nameField.Kind() == reflect.String { - assert.True( - t, - strings.Contains(nameField.String(), "dev"), - "process_target_mode should rename '%s' in '%s'", - key, - resources.Type().Field(i).Name, - ) + if !nameField.IsValid() || nameField.Kind() != reflect.String { + continue + } + + if slices.Contains(ucFields, resource.Type()) { + assert.NotContains(t, nameField.String(), "dev", "process_target_mode should not rename '%s' in '%s'", key, resources.Type().Field(i).Name) + } else { + assert.Contains(t, nameField.String(), "dev", "process_target_mode should rename '%s' in '%s'", key, resources.Type().Field(i).Name) } } } diff --git a/bundle/config/mutator/run_as_test.go b/bundle/config/mutator/run_as_test.go index acb6c3a43..dbf4bf806 100644 --- a/bundle/config/mutator/run_as_test.go +++ b/bundle/config/mutator/run_as_test.go @@ -42,6 +42,7 @@ func allResourceTypes(t *testing.T) []string { "quality_monitors", "registered_models", "schemas", + "volumes", }, resourceTypes, ) @@ -141,6 +142,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) { "registered_models", "experiments", "schemas", + "volumes", } base := config.Root{ diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 2886e3571..13cf0d462 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -20,6 +20,7 @@ type Resources struct { RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"` QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"` Schemas map[string]*resources.Schema `json:"schemas,omitempty"` + Volumes map[string]*resources.Volume `json:"volumes,omitempty"` Clusters map[string]*resources.Cluster `json:"clusters,omitempty"` Dashboards map[string]*resources.Dashboard `json:"dashboards,omitempty"` } @@ -85,6 +86,7 @@ func (r *Resources) AllResources() []ResourceGroup { collectResourceMap(descriptions["schemas"], r.Schemas), collectResourceMap(descriptions["clusters"], r.Clusters), collectResourceMap(descriptions["dashboards"], r.Dashboards), + collectResourceMap(descriptions["volumes"], r.Volumes), } } @@ -189,5 +191,11 @@ func SupportedResources() map[string]ResourceDescription { SingularTitle: "Dashboard", PluralTitle: "Dashboards", }, + "volumes": { + SingularName: "volume", + PluralName: "volumes", + SingularTitle: "Volume", + PluralTitle: "Volumes", + }, } } diff --git a/bundle/config/resources/volume.go b/bundle/config/resources/volume.go new file mode 100644 index 000000000..cae2a3463 --- /dev/null +++ b/bundle/config/resources/volume.go @@ -0,0 +1,62 @@ +package resources + +import ( + "context" + "fmt" + "net/url" + "strings" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/marshal" + "github.com/databricks/databricks-sdk-go/service/catalog" +) + +type Volume struct { + // List of grants to apply on this volume. + Grants []Grant `json:"grants,omitempty"` + + // Full name of the volume (catalog_name.schema_name.volume_name). This value is read from + // the terraform state after deployment succeeds. + ID string `json:"id,omitempty" bundle:"readonly"` + + *catalog.CreateVolumeRequestContent + + ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"` + URL string `json:"url,omitempty" bundle:"internal"` +} + +func (v *Volume) UnmarshalJSON(b []byte) error { + return marshal.Unmarshal(b, v) +} + +func (v Volume) MarshalJSON() ([]byte, error) { + return marshal.Marshal(v) +} + +func (v *Volume) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) { + return false, fmt.Errorf("volume.Exists() is not supported") +} + +func (v *Volume) TerraformResourceName() string { + return "databricks_volume" +} + +func (v *Volume) InitializeURL(baseURL url.URL) { + if v.ID == "" { + return + } + baseURL.Path = fmt.Sprintf("explore/data/volumes/%s", strings.ReplaceAll(v.ID, ".", "/")) + v.URL = baseURL.String() +} + +func (v *Volume) GetURL() string { + return v.URL +} + +func (v *Volume) GetName() string { + return v.Name +} + +func (v *Volume) IsNil() bool { + return v.CreateVolumeRequestContent == nil +} diff --git a/bundle/deploy/terraform/convert.go b/bundle/deploy/terraform/convert.go index 0ace7c66e..b710c690f 100644 --- a/bundle/deploy/terraform/convert.go +++ b/bundle/deploy/terraform/convert.go @@ -166,6 +166,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error { } cur.ID = instance.Attributes.ID config.Resources.Schemas[resource.Name] = cur + case "databricks_volume": + if config.Resources.Volumes == nil { + config.Resources.Volumes = make(map[string]*resources.Volume) + } + cur := config.Resources.Volumes[resource.Name] + if cur == nil { + cur = &resources.Volume{ModifiedStatus: resources.ModifiedStatusDeleted} + } + cur.ID = instance.Attributes.ID + config.Resources.Volumes[resource.Name] = cur case "databricks_cluster": if config.Resources.Clusters == nil { config.Resources.Clusters = make(map[string]*resources.Cluster) @@ -235,6 +245,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error { src.ModifiedStatus = resources.ModifiedStatusCreated } } + for _, src := range config.Resources.Volumes { + if src.ModifiedStatus == "" && src.ID == "" { + src.ModifiedStatus = resources.ModifiedStatusCreated + } + } for _, src := range config.Resources.Clusters { if src.ModifiedStatus == "" && src.ID == "" { src.ModifiedStatus = resources.ModifiedStatusCreated diff --git a/bundle/deploy/terraform/convert_test.go b/bundle/deploy/terraform/convert_test.go index 6ed34d430..076d9b7a0 100644 --- a/bundle/deploy/terraform/convert_test.go +++ b/bundle/deploy/terraform/convert_test.go @@ -670,6 +670,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "1"}}, }, }, + { + Type: "databricks_volume", + Mode: "managed", + Name: "test_volume", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "1"}}, + }, + }, { Type: "databricks_cluster", Mode: "managed", @@ -715,6 +723,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) { assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Volumes["test_volume"].ID) + assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Volumes["test_volume"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus) @@ -783,6 +794,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) { }, }, }, + Volumes: map[string]*resources.Volume{ + "test_volume": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "test_volume", + }, + }, + }, Clusters: map[string]*resources.Cluster{ "test_cluster": { ClusterSpec: &compute.ClusterSpec{ @@ -829,6 +847,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) { assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus) + assert.Equal(t, "", config.Resources.Volumes["test_volume"].ID) + assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Volumes["test_volume"].ModifiedStatus) + assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus) @@ -937,6 +958,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { }, }, }, + Volumes: map[string]*resources.Volume{ + "test_volume": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "test_volume", + }, + }, + "test_volume_new": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "test_volume_new", + }, + }, + }, Clusters: map[string]*resources.Cluster{ "test_cluster": { ClusterSpec: &compute.ClusterSpec{ @@ -1093,6 +1126,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "2"}}, }, }, + { + Type: "databricks_volume", + Mode: "managed", + Name: "test_volume", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "1"}}, + }, + }, + { + Type: "databricks_volume", + Mode: "managed", + Name: "test_volume_old", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "2"}}, + }, + }, { Type: "databricks_cluster", Mode: "managed", @@ -1186,6 +1235,13 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Volumes["test_volume"].ID) + assert.Equal(t, "", config.Resources.Volumes["test_volume"].ModifiedStatus) + assert.Equal(t, "2", config.Resources.Volumes["test_volume_old"].ID) + assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Volumes["test_volume_old"].ModifiedStatus) + assert.Equal(t, "", config.Resources.Volumes["test_volume_new"].ID) + assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Volumes["test_volume_new"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID) assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus) assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID) diff --git a/bundle/deploy/terraform/interpolate.go b/bundle/deploy/terraform/interpolate.go index eb15c63ec..9c2126aec 100644 --- a/bundle/deploy/terraform/interpolate.go +++ b/bundle/deploy/terraform/interpolate.go @@ -58,6 +58,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...) case dyn.Key("schemas"): path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...) + case dyn.Key("volumes"): + path = dyn.NewPath(dyn.Key("databricks_volume")).Append(path[2:]...) case dyn.Key("clusters"): path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...) case dyn.Key("dashboards"): diff --git a/bundle/deploy/terraform/interpolate_test.go b/bundle/deploy/terraform/interpolate_test.go index b26ef928d..fc5c4d184 100644 --- a/bundle/deploy/terraform/interpolate_test.go +++ b/bundle/deploy/terraform/interpolate_test.go @@ -31,6 +31,7 @@ func TestInterpolate(t *testing.T) { "other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}", "other_registered_model": "${resources.registered_models.other_registered_model.id}", "other_schema": "${resources.schemas.other_schema.id}", + "other_volume": "${resources.volumes.other_volume.id}", "other_cluster": "${resources.clusters.other_cluster.id}", "other_dashboard": "${resources.dashboards.other_dashboard.id}", }, @@ -69,6 +70,7 @@ func TestInterpolate(t *testing.T) { assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"]) assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"]) assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"]) + assert.Equal(t, "${databricks_volume.other_volume.id}", j.Tags["other_volume"]) assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"]) assert.Equal(t, "${databricks_dashboard.other_dashboard.id}", j.Tags["other_dashboard"]) diff --git a/bundle/deploy/terraform/tfdyn/convert_volume.go b/bundle/deploy/terraform/tfdyn/convert_volume.go new file mode 100644 index 000000000..4211e1f9e --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_volume.go @@ -0,0 +1,45 @@ +package tfdyn + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" +) + +func convertVolumeResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) { + // Normalize the output value to the target schema. + vout, diags := convert.Normalize(schema.ResourceVolume{}, vin) + for _, diag := range diags { + log.Debugf(ctx, "volume normalization diagnostic: %s", diag.Summary) + } + + return vout, nil +} + +type volumeConverter struct{} + +func (volumeConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error { + vout, err := convertVolumeResource(ctx, vin) + if err != nil { + return err + } + + // Add the converted resource to the output. + out.Volume[key] = vout.AsAny() + + // Configure grants for this resource. + if grants := convertGrantsResource(ctx, vin); grants != nil { + grants.Volume = fmt.Sprintf("${databricks_volume.%s.id}", key) + out.Grants["volume_"+key] = grants + } + + return nil +} + +func init() { + registerConverter("volumes", volumeConverter{}) +} diff --git a/bundle/deploy/terraform/tfdyn/convert_volume_test.go b/bundle/deploy/terraform/tfdyn/convert_volume_test.go new file mode 100644 index 000000000..c897ae69a --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_volume_test.go @@ -0,0 +1,70 @@ +package tfdyn + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertVolume(t *testing.T) { + var src = resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalog", + Comment: "comment", + Name: "name", + SchemaName: "schema", + StorageLocation: "s3://bucket/path", + VolumeType: "EXTERNAL", + }, + Grants: []resources.Grant{ + { + Privileges: []string{"READ_VOLUME"}, + Principal: "jack@gmail.com", + }, + { + Privileges: []string{"WRITE_VOLUME"}, + Principal: "jane@gmail.com", + }, + }, + } + + vin, err := convert.FromTyped(src, dyn.NilValue) + require.NoError(t, err) + + ctx := context.Background() + out := schema.NewResources() + err = volumeConverter{}.Convert(ctx, "my_volume", vin, out) + require.NoError(t, err) + + // Assert equality on the volume + require.Equal(t, map[string]any{ + "catalog_name": "catalog", + "comment": "comment", + "name": "name", + "schema_name": "schema", + "storage_location": "s3://bucket/path", + "volume_type": "EXTERNAL", + }, out.Volume["my_volume"]) + + // Assert equality on the grants + assert.Equal(t, &schema.ResourceGrants{ + Volume: "${databricks_volume.my_volume.id}", + Grant: []schema.ResourceGrantsGrant{ + { + Privileges: []string{"READ_VOLUME"}, + Principal: "jack@gmail.com", + }, + { + Privileges: []string{"WRITE_VOLUME"}, + Principal: "jane@gmail.com", + }, + }, + }, out.Grants["volume_my_volume"]) +} diff --git a/bundle/libraries/filer.go b/bundle/libraries/filer.go new file mode 100644 index 000000000..4448ed325 --- /dev/null +++ b/bundle/libraries/filer.go @@ -0,0 +1,32 @@ +package libraries + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/filer" +) + +// We upload artifacts to the workspace in a directory named ".internal" to have +// a well defined location for artifacts that have been uploaded by the DABs. +const InternalDirName = ".internal" + +// This function returns a filer for uploading artifacts to the configured location. +// Supported locations: +// 1. WSFS +// 2. UC volumes +func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + artifactPath := b.Config.Workspace.ArtifactPath + if artifactPath == "" { + return nil, "", diag.Errorf("remote artifact path not configured") + } + + switch { + case IsVolumesPath(artifactPath): + return filerForVolume(ctx, b) + + default: + return filerForWorkspace(b) + } +} diff --git a/bundle/libraries/filer_test.go b/bundle/libraries/filer_test.go new file mode 100644 index 000000000..88ba152fc --- /dev/null +++ b/bundle/libraries/filer_test.go @@ -0,0 +1,63 @@ +package libraries + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/libs/filer" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestGetFilerForLibrariesValidWsfs(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/foo/bar/artifacts", + }, + }, + } + + client, uploadPath, diags := GetFilerForLibraries(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath) + + assert.IsType(t, &filer.WorkspaceFilesClient{}, client) +} + +func TestGetFilerForLibrariesValidUcVolume(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + + client, uploadPath, diags := GetFilerForLibraries(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath) + + assert.IsType(t, &filer.FilesClient{}, client) +} + +func TestGetFilerForLibrariesRemotePathNotSet(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{}, + }, + } + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.EqualError(t, diags.Error(), "remote artifact path not configured") +} diff --git a/bundle/libraries/filer_volume.go b/bundle/libraries/filer_volume.go new file mode 100644 index 000000000..aecf68db1 --- /dev/null +++ b/bundle/libraries/filer_volume.go @@ -0,0 +1,132 @@ +package libraries + +import ( + "context" + "errors" + "fmt" + "path" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/dynvar" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go/apierr" +) + +func extractVolumeFromPath(artifactPath string) (string, string, string, error) { + if !IsVolumesPath(artifactPath) { + return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath) + } + + parts := strings.Split(artifactPath, "/") + volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes////..., got %s", artifactPath) + + // Incorrect format. + if len(parts) < 5 { + return "", "", "", volumeFormatErr + } + + catalogName := parts[2] + schemaName := parts[3] + volumeName := parts[4] + + // Incorrect format. + if catalogName == "" || schemaName == "" || volumeName == "" { + return "", "", "", volumeFormatErr + } + + return catalogName, schemaName, volumeName, nil +} + +// This function returns a filer for ".internal" folder inside the directory configured +// at `workspace.artifact_path`. +// This function also checks if the UC volume exists in the workspace and then: +// 1. If the UC volume exists in the workspace: +// Returns a filer for the UC volume. +// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in +// the bundle configuration: +// Returns an error and a warning that instructs the user to deploy the +// UC volume before using it in the artifact path. +// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration: +// Returns an error. +func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + artifactPath := b.Config.Workspace.ArtifactPath + w := b.WorkspaceClient() + + catalogName, schemaName, volumeName, err := extractVolumeFromPath(artifactPath) + if err != nil { + return nil, "", diag.Diagnostics{ + { + Severity: diag.Error, + Summary: err.Error(), + Locations: b.Config.GetLocations("workspace.artifact_path"), + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + }, + } + } + + // Check if the UC volume exists in the workspace. + volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName) + err = w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath) + + // If the volume exists already, directly return the filer for the path to + // upload the artifacts to. + if err == nil { + uploadPath := path.Join(artifactPath, InternalDirName) + f, err := filer.NewFilesClient(w, uploadPath) + return f, uploadPath, diag.FromErr(err) + } + + baseErr := diag.Diagnostic{ + Severity: diag.Error, + Summary: fmt.Sprintf("unable to determine if volume at %s exists: %s", volumePath, err), + Locations: b.Config.GetLocations("workspace.artifact_path"), + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + } + + if errors.Is(err, apierr.ErrNotFound) { + // Since the API returned a 404, the volume does not exist. + // Modify the error message to provide more context. + baseErr.Summary = fmt.Sprintf("volume %s does not exist: %s", volumePath, err) + + // If the volume is defined in the bundle, provide a more helpful error diagnostic, + // with more details and location information. + path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName) + if !ok { + return nil, "", diag.Diagnostics{baseErr} + } + baseErr.Detail = `You are using a volume in your artifact_path that is managed by +this bundle but which has not been deployed yet. Please first deploy +the volume using 'bundle deploy' and then switch over to using it in +the artifact_path.` + baseErr.Paths = append(baseErr.Paths, path) + baseErr.Locations = append(baseErr.Locations, locations...) + } + + return nil, "", diag.Diagnostics{baseErr} +} + +func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { + volumes := b.Config.Resources.Volumes + for k, v := range volumes { + if v.CatalogName != catalogName || v.Name != volumeName { + continue + } + // UC schemas can be defined in the bundle itself, and thus might be interpolated + // at runtime via the ${resources.schemas.} syntax. Thus we match the volume + // definition if the schema name is the same as the one in the bundle, or if the + // schema name is interpolated. + // We only have to check for ${resources.schemas...} references because any + // other valid reference (like ${var.foo}) would have been interpolated by this point. + p, ok := dynvar.PureReferenceToPath(v.SchemaName) + isSchemaDefinedInBundle := ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")}) + if v.SchemaName != schemaName && !isSchemaDefinedInBundle { + continue + } + pathString := fmt.Sprintf("resources.volumes.%s", k) + return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true + } + return nil, nil, false +} diff --git a/bundle/libraries/filer_volume_test.go b/bundle/libraries/filer_volume_test.go new file mode 100644 index 000000000..0d886824d --- /dev/null +++ b/bundle/libraries/filer_volume_test.go @@ -0,0 +1,275 @@ +package libraries + +import ( + "context" + "fmt" + "path" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go/apierr" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestFindVolumeInBundle(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + SchemaName: "my_schema", + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{ + { + File: "volume.yml", + Line: 1, + Column: 2, + }, + }) + + // volume is in DAB. + path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume") + assert.True(t, ok) + assert.Equal(t, []dyn.Location{{ + File: "volume.yml", + Line: 1, + Column: 2, + }}, locations) + assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) + + // wrong volume name + _, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist") + assert.False(t, ok) + + // wrong schema name + _, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume") + assert.False(t, ok) + + // wrong catalog name + _, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume") + assert.False(t, ok) + + // schema name is interpolated but does not have the right prefix. In this case + // we should not match the volume. + b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}" + _, _, ok = findVolumeInBundle(b, "main", "my_schema", "my_volume") + assert.False(t, ok) + + // schema name is interpolated. + b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}" + path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") + assert.True(t, ok) + assert.Equal(t, []dyn.Location{{ + File: "volume.yml", + Line: 1, + Column: 2, + }}, locations) + assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) +} + +func TestFilerForVolumeForErrorFromAPI(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + }, + } + + bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}) + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := filerForVolume(context.Background(), b) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Error, + Summary: "unable to determine if volume at /Volumes/main/my_schema/my_volume exists: error from API", + Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}, + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + }}, diags) +} + +func TestFilerForVolumeWithVolumeNotFound(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/doesnotexist", + }, + }, + } + + bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}) + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(apierr.NotFound("some error message")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := filerForVolume(context.Background(), b) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Error, + Summary: "volume /Volumes/main/my_schema/doesnotexist does not exist: some error message", + Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}, + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + }}, diags) +} + +func TestFilerForVolumeNotFoundAndInBundle(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + VolumeType: "MANAGED", + SchemaName: "my_schema", + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}) + bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{{File: "volume.yml", Line: 1, Column: 2}}) + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(apierr.NotFound("error from API")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := GetFilerForLibraries(context.Background(), b) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Error, + Summary: "volume /Volumes/main/my_schema/my_volume does not exist: error from API", + Locations: []dyn.Location{{"config.yml", 1, 2}, {"volume.yml", 1, 2}}, + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path"), dyn.MustPathFromString("resources.volumes.foo")}, + Detail: `You are using a volume in your artifact_path that is managed by +this bundle but which has not been deployed yet. Please first deploy +the volume using 'bundle deploy' and then switch over to using it in +the artifact_path.`, + }, + }, diags) +} + +func invalidVolumePaths() []string { + return []string{ + "/Volumes/", + "/Volumes/main", + "/Volumes/main/", + "/Volumes/main//", + "/Volumes/main//my_schema", + "/Volumes/main/my_schema", + "/Volumes/main/my_schema/", + "/Volumes/main/my_schema//", + "/Volumes//my_schema/my_volume", + } +} + +func TestFilerForVolumeWithInvalidVolumePaths(t *testing.T) { + for _, p := range invalidVolumePaths() { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: p, + }, + }, + } + + bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}) + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.Equal(t, diags, diag.Diagnostics{{ + Severity: diag.Error, + Summary: fmt.Sprintf("expected UC volume path to be in the format /Volumes////..., got %s", p), + Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}, + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + }}) + } +} + +func TestFilerForVolumeWithInvalidPrefix(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volume/main/my_schema/my_volume", + }, + }, + } + + _, _, diags := filerForVolume(context.Background(), b) + require.EqualError(t, diags.Error(), "expected artifact_path to start with /Volumes/, got /Volume/main/my_schema/my_volume") +} + +func TestFilerForVolumeWithValidVolumePaths(t *testing.T) { + validPaths := []string{ + "/Volumes/main/my_schema/my_volume", + "/Volumes/main/my_schema/my_volume/", + "/Volumes/main/my_schema/my_volume/a/b/c", + "/Volumes/main/my_schema/my_volume/a/a/a", + } + + for _, p := range validPaths { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: p, + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + + client, uploadPath, diags := filerForVolume(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, path.Join(p, ".internal"), uploadPath) + assert.IsType(t, &filer.FilesClient{}, client) + } +} + +func TestExtractVolumeFromPath(t *testing.T) { + catalogName, schemaName, volumeName, err := extractVolumeFromPath("/Volumes/main/my_schema/my_volume") + require.NoError(t, err) + assert.Equal(t, "main", catalogName) + assert.Equal(t, "my_schema", schemaName) + assert.Equal(t, "my_volume", volumeName) + + for _, p := range invalidVolumePaths() { + _, _, _, err := extractVolumeFromPath(p) + assert.EqualError(t, err, fmt.Sprintf("expected UC volume path to be in the format /Volumes////..., got %s", p)) + } +} diff --git a/bundle/libraries/filer_workspace.go b/bundle/libraries/filer_workspace.go new file mode 100644 index 000000000..8d54519ff --- /dev/null +++ b/bundle/libraries/filer_workspace.go @@ -0,0 +1,15 @@ +package libraries + +import ( + "path" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/filer" +) + +func filerForWorkspace(b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + uploadPath := path.Join(b.Config.Workspace.ArtifactPath, InternalDirName) + f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath) + return f, uploadPath, diag.FromErr(err) +} diff --git a/bundle/libraries/filer_workspace_test.go b/bundle/libraries/filer_workspace_test.go new file mode 100644 index 000000000..f761592f9 --- /dev/null +++ b/bundle/libraries/filer_workspace_test.go @@ -0,0 +1,27 @@ +package libraries + +import ( + "path" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/libs/filer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFilerForWorkspace(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Workspace/Users/shreyas.goenka@databricks.com/a/b/c", + }, + }, + } + + client, uploadPath, diags := filerForWorkspace(b) + require.NoError(t, diags.Error()) + assert.Equal(t, path.Join("/Workspace/Users/shreyas.goenka@databricks.com/a/b/c/.internal"), uploadPath) + assert.IsType(t, &filer.WorkspaceFilesClient{}, client) +} diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index 90a1a21fc..4b6f43701 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -16,8 +16,6 @@ import ( "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" - "github.com/databricks/databricks-sdk-go" - "golang.org/x/sync/errgroup" ) @@ -130,24 +128,17 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error } func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := GetUploadBasePath(b) - if err != nil { - return diag.FromErr(err) + client, uploadPath, diags := GetFilerForLibraries(ctx, b) + if diags.HasError() { + return diags } - // If the client is not initialized, initialize it - // We use client field in mutator to allow for mocking client in testing + // Only set the filer client if it's not already set. We use the client field + // in the mutator to mock the filer client in testing if u.client == nil { - filer, err := GetFilerForLibraries(b.WorkspaceClient(), uploadPath) - if err != nil { - return diag.FromErr(err) - } - - u.client = filer + u.client = client } - var diags diag.Diagnostics - libs, err := collectLocalLibraries(b) if err != nil { return diag.FromErr(err) @@ -197,17 +188,6 @@ func (u *upload) Name() string { return "libraries.Upload" } -func GetFilerForLibraries(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) { - if isVolumesPath(uploadPath) { - return filer.NewFilesClient(w, uploadPath) - } - return filer.NewWorkspaceFilesClient(w, uploadPath) -} - -func isVolumesPath(path string) bool { - return strings.HasPrefix(path, "/Volumes/") -} - // Function to upload file (a library, artifact and etc) to Workspace or UC volume func UploadFile(ctx context.Context, file string, client filer.Filer) error { filename := filepath.Base(file) @@ -227,12 +207,3 @@ func UploadFile(ctx context.Context, file string, client filer.Filer) error { log.Infof(ctx, "Upload succeeded") return nil } - -func GetUploadBasePath(b *bundle.Bundle) (string, error) { - artifactPath := b.Config.Workspace.ArtifactPath - if artifactPath == "" { - return "", fmt.Errorf("remote artifact path not configured") - } - - return path.Join(artifactPath, ".internal"), nil -} diff --git a/bundle/libraries/upload_test.go b/bundle/libraries/upload_test.go index 44b194c56..493785bf5 100644 --- a/bundle/libraries/upload_test.go +++ b/bundle/libraries/upload_test.go @@ -11,6 +11,8 @@ import ( mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" "github.com/databricks/cli/internal/testutil" "github.com/databricks/cli/libs/filer" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/stretchr/testify/mock" @@ -181,6 +183,11 @@ func TestArtifactUploadForVolumes(t *testing.T) { filer.CreateParentDirectories, ).Return(nil) + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/foo/bar/artifacts").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler))) require.NoError(t, diags.Error()) diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index e623c364f..2dc9623bd 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -23,10 +23,10 @@ import ( tfjson "github.com/hashicorp/terraform-json" ) -func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action { +func filterDeleteOrRecreateActions(changes []*tfjson.ResourceChange, resourceType string) []terraformlib.Action { res := make([]terraformlib.Action, 0) for _, rc := range changes { - if !toInclude(rc.Type, rc.Change.Actions) { + if rc.Type != resourceType { continue } @@ -37,7 +37,7 @@ func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ case rc.Change.Actions.Replace(): actionType = terraformlib.ActionTypeRecreate default: - // No use case for other action types yet. + // Filter other action types.. continue } @@ -63,30 +63,12 @@ func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) { return false, err } - schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool { - // Filter in only UC schema resources. - if typ != "databricks_schema" { - return false - } - - // We only display prompts for destructive actions like deleting or - // recreating a schema. - return actions.Delete() || actions.Replace() - }) - - dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool { - // Filter in only DLT pipeline resources. - if typ != "databricks_pipeline" { - return false - } - - // Recreating DLT pipeline leads to metadata loss and for a transient period - // the underling tables will be unavailable. - return actions.Replace() || actions.Delete() - }) + schemaActions := filterDeleteOrRecreateActions(plan.ResourceChanges, "databricks_schema") + dltActions := filterDeleteOrRecreateActions(plan.ResourceChanges, "databricks_pipeline") + volumeActions := filterDeleteOrRecreateActions(plan.ResourceChanges, "databricks_volume") // We don't need to display any prompts in this case. - if len(dltActions) == 0 && len(schemaActions) == 0 { + if len(schemaActions) == 0 && len(dltActions) == 0 && len(volumeActions) == 0 { return true, nil } @@ -111,6 +93,19 @@ properties such as the 'catalog' or 'storage' are changed:` } } + // One or more volumes is being recreated. + if len(volumeActions) != 0 { + msg := ` +This action will result in the deletion or recreation of the following volumes. +For managed volumes, the files stored in the volume are also deleted from your +cloud tenant within 30 days. For external volumes, the metadata about the volume +is removed from the catalog, but the underlying files are not deleted:` + cmdio.LogString(ctx, msg) + for _, action := range volumeActions { + cmdio.Log(ctx, action) + } + } + if b.AutoApprove { return true, nil } diff --git a/bundle/phases/deploy_test.go b/bundle/phases/deploy_test.go index e00370b38..2cef95404 100644 --- a/bundle/phases/deploy_test.go +++ b/bundle/phases/deploy_test.go @@ -40,17 +40,7 @@ func TestParseTerraformActions(t *testing.T) { }, } - res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool { - if typ != "databricks_pipeline" { - return false - } - - if actions.Delete() || actions.Replace() { - return true - } - - return false - }) + res := filterDeleteOrRecreateActions(changes, "databricks_pipeline") assert.Equal(t, []terraformlib.Action{ { diff --git a/internal/bundle/artifacts_test.go b/internal/bundle/artifacts_test.go index 775327e18..34d101e4f 100644 --- a/internal/bundle/artifacts_test.go +++ b/internal/bundle/artifacts_test.go @@ -1,6 +1,7 @@ package bundle import ( + "fmt" "os" "path" "path/filepath" @@ -13,8 +14,11 @@ import ( "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/internal" "github.com/databricks/cli/internal/acc" + "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -225,3 +229,84 @@ func TestAccUploadArtifactFileToCorrectRemotePathForVolumes(t *testing.T) { b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl, ) } + +func TestAccUploadArtifactFileToVolumeThatDoesNotExist(t *testing.T) { + ctx, wt := acc.UcWorkspaceTest(t) + w := wt.W + + schemaName := internal.RandomName("schema-") + + _, err := w.Schemas.Create(ctx, catalog.CreateSchema{ + CatalogName: "main", + Comment: "test schema", + Name: schemaName, + }) + require.NoError(t, err) + + t.Cleanup(func() { + err = w.Schemas.DeleteByFullName(ctx, "main."+schemaName) + require.NoError(t, err) + }) + + bundleRoot, err := initTestTemplate(t, ctx, "artifact_path_with_volume", map[string]any{ + "unique_id": uuid.New().String(), + "schema_name": schemaName, + "volume_name": "doesnotexist", + }) + require.NoError(t, err) + + t.Setenv("BUNDLE_ROOT", bundleRoot) + stdout, stderr, err := internal.RequireErrorRun(t, "bundle", "deploy") + + assert.Error(t, err) + assert.Equal(t, fmt.Sprintf(`Error: volume /Volumes/main/%s/doesnotexist does not exist: Not Found + at workspace.artifact_path + in databricks.yml:6:18 + +`, schemaName), stdout.String()) + assert.Equal(t, "", stderr.String()) +} + +func TestAccUploadArtifactToVolumeNotYetDeployed(t *testing.T) { + ctx, wt := acc.UcWorkspaceTest(t) + w := wt.W + + schemaName := internal.RandomName("schema-") + + _, err := w.Schemas.Create(ctx, catalog.CreateSchema{ + CatalogName: "main", + Comment: "test schema", + Name: schemaName, + }) + require.NoError(t, err) + + t.Cleanup(func() { + err = w.Schemas.DeleteByFullName(ctx, "main."+schemaName) + require.NoError(t, err) + }) + + bundleRoot, err := initTestTemplate(t, ctx, "artifact_path_with_volume", map[string]any{ + "unique_id": uuid.New().String(), + "schema_name": schemaName, + "volume_name": "my_volume", + }) + require.NoError(t, err) + + t.Setenv("BUNDLE_ROOT", bundleRoot) + stdout, stderr, err := internal.RequireErrorRun(t, "bundle", "deploy") + + assert.Error(t, err) + assert.Equal(t, fmt.Sprintf(`Error: volume /Volumes/main/%s/my_volume does not exist: Not Found + at workspace.artifact_path + resources.volumes.foo + in databricks.yml:6:18 + databricks.yml:11:7 + +You are using a volume in your artifact_path that is managed by +this bundle but which has not been deployed yet. Please first deploy +the volume using 'bundle deploy' and then switch over to using it in +the artifact_path. + +`, schemaName), stdout.String()) + assert.Equal(t, "", stderr.String()) +} diff --git a/internal/bundle/bundles/artifact_path_with_volume/databricks_template_schema.json b/internal/bundle/bundles/artifact_path_with_volume/databricks_template_schema.json new file mode 100644 index 000000000..7dd81ef2a --- /dev/null +++ b/internal/bundle/bundles/artifact_path_with_volume/databricks_template_schema.json @@ -0,0 +1,16 @@ +{ + "properties": { + "unique_id": { + "type": "string", + "description": "Unique ID for job name" + }, + "schema_name": { + "type": "string", + "description": "schema name to use in the artifact_path" + }, + "volume_name": { + "type": "string", + "description": "volume name to use in the artifact_path" + } + } +} diff --git a/internal/bundle/bundles/artifact_path_with_volume/template/databricks.yml.tmpl b/internal/bundle/bundles/artifact_path_with_volume/template/databricks.yml.tmpl new file mode 100644 index 000000000..2d87620e3 --- /dev/null +++ b/internal/bundle/bundles/artifact_path_with_volume/template/databricks.yml.tmpl @@ -0,0 +1,14 @@ +bundle: + name: artifact_path_with_volume + +workspace: + root_path: "~/.bundle/{{.unique_id}}" + artifact_path: /Volumes/main/{{.schema_name}}/{{.volume_name}} + +resources: + volumes: + foo: + catalog_name: main + name: my_volume + schema_name: {{.schema_name}} + volume_type: MANAGED diff --git a/internal/bundle/bundles/volume/databricks_template_schema.json b/internal/bundle/bundles/volume/databricks_template_schema.json new file mode 100644 index 000000000..d849dacf8 --- /dev/null +++ b/internal/bundle/bundles/volume/databricks_template_schema.json @@ -0,0 +1,8 @@ +{ + "properties": { + "unique_id": { + "type": "string", + "description": "Unique ID for the schema names" + } + } +} diff --git a/internal/bundle/bundles/volume/template/databricks.yml.tmpl b/internal/bundle/bundles/volume/template/databricks.yml.tmpl new file mode 100644 index 000000000..d7f31439b --- /dev/null +++ b/internal/bundle/bundles/volume/template/databricks.yml.tmpl @@ -0,0 +1,31 @@ +bundle: + name: test-uc-volumes-{{.unique_id}} + +variables: + schema_name: + default: ${resources.schemas.schema1.name} + +resources: + schemas: + schema1: + name: schema1-{{.unique_id}} + catalog_name: main + comment: This schema was created from DABs + + schema2: + name: schema2-{{.unique_id}} + catalog_name: main + comment: This schema was created from DABs + + volumes: + foo: + catalog_name: main + name: my_volume + schema_name: ${var.schema_name} + volume_type: MANAGED + comment: This volume was created from DABs. + + grants: + - principal: account users + privileges: + - WRITE_VOLUME diff --git a/internal/bundle/bundles/volume/template/nb.sql b/internal/bundle/bundles/volume/template/nb.sql new file mode 100644 index 000000000..199ff5078 --- /dev/null +++ b/internal/bundle/bundles/volume/template/nb.sql @@ -0,0 +1,2 @@ +-- Databricks notebook source +select 1 diff --git a/internal/bundle/deploy_test.go b/internal/bundle/deploy_test.go index 885435855..759e85de5 100644 --- a/internal/bundle/deploy_test.go +++ b/internal/bundle/deploy_test.go @@ -243,3 +243,73 @@ func TestAccDeployBasicBundleLogs(t *testing.T) { }, "\n"), stderr) assert.Equal(t, "", stdout) } + +func TestAccDeployUcVolume(t *testing.T) { + ctx, wt := acc.UcWorkspaceTest(t) + w := wt.W + + uniqueId := uuid.New().String() + bundleRoot, err := initTestTemplate(t, ctx, "volume", map[string]any{ + "unique_id": uniqueId, + }) + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + t.Cleanup(func() { + destroyBundle(t, ctx, bundleRoot) + }) + + // Assert the volume is created successfully + catalogName := "main" + schemaName := "schema1-" + uniqueId + volumeName := "my_volume" + fullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName) + volume, err := w.Volumes.ReadByName(ctx, fullName) + require.NoError(t, err) + require.Equal(t, volume.Name, volumeName) + require.Equal(t, catalogName, volume.CatalogName) + require.Equal(t, schemaName, volume.SchemaName) + + // Assert that the grants were successfully applied. + grants, err := w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName) + require.NoError(t, err) + assert.Len(t, grants.PrivilegeAssignments, 1) + assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal) + assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges) + + // Recreation of the volume without --auto-approve should fail since prompting is not possible + t.Setenv("TERM", "dumb") + t.Setenv("BUNDLE_ROOT", bundleRoot) + stdout, stderr, err := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--var=schema_name=${resources.schemas.schema2.name}").Run() + assert.Error(t, err) + assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following volumes. +For managed volumes, the files stored in the volume are also deleted from your +cloud tenant within 30 days. For external volumes, the metadata about the volume +is removed from the catalog, but the underlying files are not deleted: + recreate volume foo`) + assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed") + + // Successfully recreate the volume with --auto-approve + t.Setenv("TERM", "dumb") + t.Setenv("BUNDLE_ROOT", bundleRoot) + _, _, err = internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--var=schema_name=${resources.schemas.schema2.name}", "--auto-approve").Run() + assert.NoError(t, err) + + // Assert the volume is updated successfully + schemaName = "schema2-" + uniqueId + fullName = fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName) + volume, err = w.Volumes.ReadByName(ctx, fullName) + require.NoError(t, err) + require.Equal(t, volume.Name, volumeName) + require.Equal(t, catalogName, volume.CatalogName) + require.Equal(t, schemaName, volume.SchemaName) + + // assert that the grants were applied / retained on recreate. + grants, err = w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName) + require.NoError(t, err) + assert.Len(t, grants.PrivilegeAssignments, 1) + assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal) + assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges) +} diff --git a/libs/dyn/dynvar/ref.go b/libs/dyn/dynvar/ref.go index 338ac8ce6..a28938823 100644 --- a/libs/dyn/dynvar/ref.go +++ b/libs/dyn/dynvar/ref.go @@ -71,3 +71,23 @@ func (v ref) references() []string { func IsPureVariableReference(s string) bool { return len(s) > 0 && re.FindString(s) == s } + +// If s is a pure variable reference, this function returns the corresponding +// dyn.Path. Otherwise, it returns false. +func PureReferenceToPath(s string) (dyn.Path, bool) { + ref, ok := newRef(dyn.V(s)) + if !ok { + return nil, false + } + + if !ref.isPure() { + return nil, false + } + + p, err := dyn.NewPathFromString(ref.references()[0]) + if err != nil { + return nil, false + } + + return p, true +} diff --git a/libs/dyn/dynvar/ref_test.go b/libs/dyn/dynvar/ref_test.go index aff3643e0..4110732f8 100644 --- a/libs/dyn/dynvar/ref_test.go +++ b/libs/dyn/dynvar/ref_test.go @@ -51,3 +51,34 @@ func TestIsPureVariableReference(t *testing.T) { assert.False(t, IsPureVariableReference("prefix ${foo.bar}")) assert.True(t, IsPureVariableReference("${foo.bar}")) } + +func TestPureReferenceToPath(t *testing.T) { + for _, tc := range []struct { + in string + out string + ok bool + }{ + {"${foo.bar}", "foo.bar", true}, + {"${foo.bar.baz}", "foo.bar.baz", true}, + {"${foo.bar.baz[0]}", "foo.bar.baz[0]", true}, + {"${foo.bar.baz[0][1]}", "foo.bar.baz[0][1]", true}, + {"${foo.bar.baz[0][1].qux}", "foo.bar.baz[0][1].qux", true}, + + {"${foo.one}${foo.two}", "", false}, + {"prefix ${foo.bar}", "", false}, + {"${foo.bar} suffix", "", false}, + {"${foo.bar", "", false}, + {"foo.bar}", "", false}, + {"foo.bar", "", false}, + {"{foo.bar}", "", false}, + {"", "", false}, + } { + path, ok := PureReferenceToPath(tc.in) + if tc.ok { + assert.True(t, ok) + assert.Equal(t, dyn.MustPathFromString(tc.out), path) + } else { + assert.False(t, ok) + } + } +} diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 4bb03aea5..9e0a7ce50 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -114,7 +114,7 @@ type apiClient interface { // NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled. // It can access any workspace path if files-in-workspace is enabled. -type workspaceFilesClient struct { +type WorkspaceFilesClient struct { workspaceClient *databricks.WorkspaceClient apiClient apiClient @@ -128,7 +128,7 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, return nil, err } - return &workspaceFilesClient{ + return &WorkspaceFilesClient{ workspaceClient: w, apiClient: apiClient, @@ -136,7 +136,7 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, }, nil } -func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { +func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { return err @@ -214,7 +214,7 @@ func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io return err } -func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { +func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err @@ -238,7 +238,7 @@ func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCl return w.workspaceClient.Workspace.Download(ctx, absPath) } -func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { +func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { absPath, err := w.root.Join(name) if err != nil { return err @@ -282,7 +282,7 @@ func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ... return err } -func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { +func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err @@ -315,7 +315,7 @@ func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.D return wsfsDirEntriesFromObjectInfos(objects), nil } -func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error { +func (w *WorkspaceFilesClient) Mkdir(ctx context.Context, name string) error { dirPath, err := w.root.Join(name) if err != nil { return err @@ -325,7 +325,7 @@ func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error { }) } -func (w *workspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { +func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err diff --git a/libs/filer/workspace_files_extensions_client_test.go b/libs/filer/workspace_files_extensions_client_test.go index 974a6a37b..10c176b31 100644 --- a/libs/filer/workspace_files_extensions_client_test.go +++ b/libs/filer/workspace_files_extensions_client_test.go @@ -174,7 +174,7 @@ func TestFilerWorkspaceFilesExtensionsErrorsOnDupName(t *testing.T) { "return_export_info": "true", }, mock.AnythingOfType("*filer.wsfsFileInfo"), []func(*http.Request) error(nil)).Return(nil, statNotebook) - workspaceFilesClient := workspaceFilesClient{ + workspaceFilesClient := WorkspaceFilesClient{ workspaceClient: mockedWorkspaceClient.WorkspaceClient, apiClient: &mockedApiClient, root: NewWorkspaceRootPath("/dir"),