diff --git a/bundle/artifacts/expand_globs_test.go b/bundle/artifacts/expand_globs_test.go index dc7c77de..af048f8e 100644 --- a/bundle/artifacts/expand_globs_test.go +++ b/bundle/artifacts/expand_globs_test.go @@ -7,8 +7,8 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/internal/testutil" "github.com/databricks/cli/libs/dyn" "github.com/stretchr/testify/assert" diff --git a/bundle/artifacts/upload.go b/bundle/artifacts/upload.go index 58c006dc..c69939e8 100644 --- a/bundle/artifacts/upload.go +++ b/bundle/artifacts/upload.go @@ -21,18 +21,13 @@ func (m *cleanUp) Name() string { } func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := libraries.GetUploadBasePath(b) - if err != nil { - return diag.FromErr(err) - } - - client, err := libraries.GetFilerForLibraries(b.WorkspaceClient(), uploadPath) - if err != nil { - return diag.FromErr(err) + client, uploadPath, diags := libraries.GetFilerForLibraries(ctx, b) + if diags.HasError() { + return diags } // We intentionally ignore the error because it is not critical to the deployment - err = client.Delete(ctx, ".", filer.DeleteRecursively) + err := client.Delete(ctx, ".", filer.DeleteRecursively) if err != nil { log.Errorf(ctx, "failed to delete %s: %v", uploadPath, err) } diff --git a/bundle/internal/bundletest/location.go b/bundle/bundletest/location.go similarity index 100% rename from bundle/internal/bundletest/location.go rename to bundle/bundletest/location.go diff --git a/bundle/internal/bundletest/mutate.go b/bundle/bundletest/mutate.go similarity index 100% rename from bundle/internal/bundletest/mutate.go rename to bundle/bundletest/mutate.go diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index 59b8547b..45ee79ef 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -11,6 +11,8 @@ import ( "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/dynvar" + "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/textutil" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -193,6 +195,30 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos // the Databricks UI and via the SQL API. } + // Volumes: Prefix + for _, v := range r.Volumes { + if containsUserIdentity(v.CatalogName, b.Config.Workspace.CurrentUser) { + log.Debugf(ctx, "Skipping prefix for volume %s because catalog %s contains the current user's identity", v.Name, v.CatalogName) + continue + } + if containsUserIdentity(v.SchemaName, b.Config.Workspace.CurrentUser) { + log.Debugf(ctx, "Skipping prefix for volume %s because schema %s contains the current user's identity", v.Name, v.SchemaName) + continue + } + // We only have to check for ${resources.schemas...} references because any + // other valid reference (like ${var.foo}) would have been interpolated by this point. 
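+ // E.g. schema_name: "${resources.schemas.schema1.name}" is a pure reference into resources.schemas, so the prefix is skipped.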
+ if p, ok := dynvar.PureReferenceToPath(v.SchemaName); ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")}) { + log.Debugf(ctx, "Skipping prefix for volume %s because schema %s is defined in the bundle and the schema name will be interpolated at runtime", v.Name, v.SchemaName) + continue + } + + v.Name = normalizePrefix(prefix) + v.Name + + // HTTP API for volumes doesn't yet support tags. It's only supported in + // the Databricks UI and via the SQL API. + } + // Clusters: Prefix, Tags for key, c := range r.Clusters { if c.ClusterSpec == nil { diff --git a/bundle/config/mutator/apply_presets_test.go b/bundle/config/mutator/apply_presets_test.go index 24295da4..50c5618b 100644 --- a/bundle/config/mutator/apply_presets_test.go +++ b/bundle/config/mutator/apply_presets_test.go @@ -9,7 +9,9 @@ import ( "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -125,6 +127,147 @@ func TestApplyPresetsPrefixForUcSchema(t *testing.T) { } } +func TestApplyPresetsReminderToAddSupportForSkippingPrefixes(t *testing.T) { + _, ok := config.SupportedResources()["catalogs"] + assert.False(t, ok, + `Since you are adding support for UC catalogs to DABs, please ensure that +you add logic to skip applying presets.name_prefix for UC schemas, UC volumes and +any other resources that fall under a catalog in the UC hierarchy (like registered models). +Once you do so, feel free to remove this test.`) +} + +func TestApplyPresetsPrefixForUcVolumes(t *testing.T) { + tests := []struct { + name string + prefix string + volume *resources.Volume + want string + }{ + { + name: "add prefix to volume", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + CatalogName: "catalog1", + SchemaName: "schema1", + }, + }, + want: "prefix_volume1", + }, + { + name: "add empty prefix to volume", + prefix: "", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + }, + }, + want: "volume1", + }, + { + name: "skip prefix when catalog name contains user short name", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + CatalogName: "dev_john_wick_targets", + }, + }, + want: "volume1", + }, + { + name: "skip prefix when catalog name contains user email", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + CatalogName: "dev_john.wick@continental.com_targets", + }, + }, + want: "volume1", + }, + { + name: "skip prefix when schema name contains user short name", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + SchemaName: "dev_john_wick_weapons", + }, + }, + want: "volume1", + }, + { + name: "skip prefix when schema name contains user email", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + SchemaName: "dev_john.wick@continental.com_targets", + }, + }, + want: "volume1", + }, + { + name: "skip prefix when schema is defined in the bundle and 
will be interpolated at runtime", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + SchemaName: "${resources.schemas.schema1.name}", + }, + }, + want: "volume1", + }, + { + name: "add prefix when schema is a reference without the resources.schemas prefix", + prefix: "[prefix]", + volume: &resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "volume1", + SchemaName: "${foo.bar.baz}", + }, + }, + want: "prefix_volume1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "volume1": tt.volume, + }, + }, + Presets: config.Presets{ + NamePrefix: tt.prefix, + }, + Workspace: config.Workspace{ + CurrentUser: &config.User{ + ShortName: "john_wick", + User: &iam.User{ + UserName: "john.wick@continental.com", + }, + }, + }, + }, + } + + ctx := context.Background() + diag := bundle.Apply(ctx, b, mutator.ApplyPresets()) + + if diag.HasError() { + t.Fatalf("unexpected error: %v", diag) + } + + require.Equal(t, tt.want, b.Config.Resources.Volumes["volume1"].Name) + }) + } +} + func TestApplyPresetsTags(t *testing.T) { tests := []struct { name string diff --git a/bundle/config/mutator/configure_dashboard_defaults_test.go b/bundle/config/mutator/configure_dashboard_defaults_test.go index 2234f9a7..a5248219 100644 --- a/bundle/config/mutator/configure_dashboard_defaults_test.go +++ b/bundle/config/mutator/configure_dashboard_defaults_test.go @@ -5,10 +5,10 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/libs/dyn" "github.com/databricks/databricks-sdk-go/service/dashboards" "github.com/stretchr/testify/assert" diff --git a/bundle/config/mutator/expand_pipeline_glob_paths_test.go b/bundle/config/mutator/expand_pipeline_glob_paths_test.go index 9f70b74a..063c76b3 100644 --- a/bundle/config/mutator/expand_pipeline_glob_paths_test.go +++ b/bundle/config/mutator/expand_pipeline_glob_paths_test.go @@ -7,9 +7,9 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/libs/dyn" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/pipelines" diff --git a/bundle/config/mutator/process_target_mode.go b/bundle/config/mutator/process_target_mode.go index 44b53681..870afe9d 100644 --- a/bundle/config/mutator/process_target_mode.go +++ b/bundle/config/mutator/process_target_mode.go @@ -63,6 +63,10 @@ func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) { } } +func containsUserIdentity(s string, u *config.User) bool { + return strings.Contains(s, u.ShortName) || strings.Contains(s, u.UserName) +} + func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics { var diags diag.Diagnostics p := b.Config.Presets @@ -92,7 +96,7 @@ func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics { diags = diags.Extend(diag.Errorf("%s must start with '~/' or contain the current username 
to ensure uniqueness when using 'mode: development'", path)) } } - if p.NamePrefix != "" && !strings.Contains(p.NamePrefix, u.ShortName) && !strings.Contains(p.NamePrefix, u.UserName) { + if p.NamePrefix != "" && !containsUserIdentity(p.NamePrefix, u) { // Resources such as pipelines require a unique name, e.g. '[dev steve] my_pipeline'. // For this reason we require the name prefix to contain the current username; // it's a pitfall for users if they don't include it and later find out that diff --git a/bundle/config/mutator/process_target_mode_test.go b/bundle/config/mutator/process_target_mode_test.go index 4135d5fd..b87c72be 100644 --- a/bundle/config/mutator/process_target_mode_test.go +++ b/bundle/config/mutator/process_target_mode_test.go @@ -128,6 +128,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle { Schemas: map[string]*resources.Schema{ "schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}}, }, + Volumes: map[string]*resources.Volume{ + "volume1": {CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{Name: "volume1"}}, + }, Clusters: map[string]*resources.Cluster{ "cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}}, }, @@ -307,6 +310,8 @@ func TestProcessTargetModeDefault(t *testing.T) { assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) + assert.Equal(t, "schema1", b.Config.Resources.Schemas["schema1"].Name) + assert.Equal(t, "volume1", b.Config.Resources.Volumes["volume1"].Name) assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName) } @@ -351,6 +356,8 @@ func TestProcessTargetModeProduction(t *testing.T) { assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) + assert.Equal(t, "schema1", b.Config.Resources.Schemas["schema1"].Name) + assert.Equal(t, "volume1", b.Config.Resources.Volumes["volume1"].Name) assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName) } diff --git a/bundle/config/mutator/rewrite_sync_paths_test.go b/bundle/config/mutator/rewrite_sync_paths_test.go index 0e4dfc4c..90da8792 100644 --- a/bundle/config/mutator/rewrite_sync_paths_test.go +++ b/bundle/config/mutator/rewrite_sync_paths_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/libs/dyn" "github.com/stretchr/testify/assert" ) diff --git a/bundle/config/mutator/run_as_test.go b/bundle/config/mutator/run_as_test.go index acb6c3a4..dbf4bf80 100644 --- a/bundle/config/mutator/run_as_test.go +++ b/bundle/config/mutator/run_as_test.go @@ -42,6 +42,7 @@ func allResourceTypes(t *testing.T) []string { "quality_monitors", "registered_models", "schemas", + "volumes", }, resourceTypes, ) @@ -141,6 +142,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) { "registered_models", "experiments", "schemas", + "volumes", } base := config.Root{ 
diff --git a/bundle/config/mutator/sync_infer_root_test.go b/bundle/config/mutator/sync_infer_root_test.go index f507cbc7..98c9d362 100644 --- a/bundle/config/mutator/sync_infer_root_test.go +++ b/bundle/config/mutator/sync_infer_root_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/libs/dyn" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/bundle/config/mutator/translate_paths_test.go b/bundle/config/mutator/translate_paths_test.go index 9d655b27..04325b68 100644 --- a/bundle/config/mutator/translate_paths_test.go +++ b/bundle/config/mutator/translate_paths_test.go @@ -8,11 +8,11 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/config/variable" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/vfs" diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 0affb6ef..2e3f4dbf 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -20,6 +20,7 @@ type Resources struct { RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"` QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"` Schemas map[string]*resources.Schema `json:"schemas,omitempty"` + Volumes map[string]*resources.Volume `json:"volumes,omitempty"` Clusters map[string]*resources.Cluster `json:"clusters,omitempty"` Dashboards map[string]*resources.Dashboard `json:"dashboards,omitempty"` } diff --git a/bundle/config/resources/volume.go b/bundle/config/resources/volume.go new file mode 100644 index 00000000..f7af05bd --- /dev/null +++ b/bundle/config/resources/volume.go @@ -0,0 +1,27 @@ +package resources + +import ( + "github.com/databricks/databricks-sdk-go/marshal" + "github.com/databricks/databricks-sdk-go/service/catalog" +) + +type Volume struct { + // List of grants to apply on this volume. + Grants []Grant `json:"grants,omitempty"` + + // Full name of the volume (catalog_name.schema_name.volume_name). This value is read from + // the terraform state after deployment succeeds. 
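+ // For example: "main.my_schema.my_volume".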
+ ID string `json:"id,omitempty" bundle:"readonly"` + + *catalog.CreateVolumeRequestContent + + ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"` +} + +func (v *Volume) UnmarshalJSON(b []byte) error { + return marshal.Unmarshal(b, v) +} + +func (v Volume) MarshalJSON() ([]byte, error) { + return marshal.Marshal(v) +} diff --git a/bundle/deploy/metadata/compute_test.go b/bundle/deploy/metadata/compute_test.go index 2c2c7237..2d852bf7 100644 --- a/bundle/deploy/metadata/compute_test.go +++ b/bundle/deploy/metadata/compute_test.go @@ -5,9 +5,9 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/bundle/metadata" "github.com/databricks/cli/libs/dyn" "github.com/databricks/databricks-sdk-go/service/jobs" diff --git a/bundle/deploy/terraform/convert.go b/bundle/deploy/terraform/convert.go index 0ace7c66..b710c690 100644 --- a/bundle/deploy/terraform/convert.go +++ b/bundle/deploy/terraform/convert.go @@ -166,6 +166,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error { } cur.ID = instance.Attributes.ID config.Resources.Schemas[resource.Name] = cur + case "databricks_volume": + if config.Resources.Volumes == nil { + config.Resources.Volumes = make(map[string]*resources.Volume) + } + cur := config.Resources.Volumes[resource.Name] + if cur == nil { + cur = &resources.Volume{ModifiedStatus: resources.ModifiedStatusDeleted} + } + cur.ID = instance.Attributes.ID + config.Resources.Volumes[resource.Name] = cur case "databricks_cluster": if config.Resources.Clusters == nil { config.Resources.Clusters = make(map[string]*resources.Cluster) @@ -235,6 +245,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error { src.ModifiedStatus = resources.ModifiedStatusCreated } } + for _, src := range config.Resources.Volumes { + if src.ModifiedStatus == "" && src.ID == "" { + src.ModifiedStatus = resources.ModifiedStatusCreated + } + } for _, src := range config.Resources.Clusters { if src.ModifiedStatus == "" && src.ID == "" { src.ModifiedStatus = resources.ModifiedStatusCreated diff --git a/bundle/deploy/terraform/convert_test.go b/bundle/deploy/terraform/convert_test.go index 6ed34d43..bb6b9611 100644 --- a/bundle/deploy/terraform/convert_test.go +++ b/bundle/deploy/terraform/convert_test.go @@ -670,6 +670,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "1"}}, }, }, + { + Type: "databricks_volume", + Mode: "managed", + Name: "test_volume", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "1"}}, + }, + }, { Type: "databricks_cluster", Mode: "managed", @@ -715,6 +723,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) { assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Volumes["test_volume"].ID) + assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Volumes["test_volume"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus) @@ -783,6 +794,13 @@ func TestTerraformToBundleEmptyRemoteResources(t 
*testing.T) { }, }, }, + Volumes: map[string]*resources.Volume{ + "test_volume": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "test_volume", + }, + }, + }, Clusters: map[string]*resources.Cluster{ "test_cluster": { ClusterSpec: &compute.ClusterSpec{ @@ -829,6 +847,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) { assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus) + assert.Equal(t, "", config.Resources.Volumes["test_volume"].ID) + assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Volumes["test_volume"].ModifiedStatus) + assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus) @@ -937,6 +958,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { }, }, }, + Volumes: map[string]*resources.Volume{ + "test_volume": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "test_volume", + }, + }, + "test_volume_new": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + Name: "test_volume_new", + }, + }, + }, Clusters: map[string]*resources.Cluster{ "test_cluster": { ClusterSpec: &compute.ClusterSpec{ @@ -1093,6 +1126,14 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "2"}}, }, }, + { + Type: "databricks_volume", + Mode: "managed", + Name: "test_volume", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "1"}}, + }, + }, { Type: "databricks_cluster", Mode: "managed", @@ -1101,6 +1142,14 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "1"}}, }, }, + { + Type: "databricks_volume", + Mode: "managed", + Name: "test_volume_old", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "2"}}, + }, + }, { Type: "databricks_cluster", Mode: "managed", @@ -1186,6 +1235,13 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Volumes["test_volume"].ID) + assert.Equal(t, "", config.Resources.Volumes["test_volume"].ModifiedStatus) + assert.Equal(t, "2", config.Resources.Volumes["test_volume_old"].ID) + assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Volumes["test_volume_old"].ModifiedStatus) + assert.Equal(t, "", config.Resources.Volumes["test_volume_new"].ID) + assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Volumes["test_volume_new"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID) assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus) assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID) diff --git a/bundle/deploy/terraform/interpolate.go b/bundle/deploy/terraform/interpolate.go index eb15c63e..9c2126ae 100644 --- a/bundle/deploy/terraform/interpolate.go +++ b/bundle/deploy/terraform/interpolate.go @@ -58,6 +58,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...) case dyn.Key("schemas"): path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...) 
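+ // E.g. ${resources.volumes.other_volume.id} is rewritten to ${databricks_volume.other_volume.id}.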
+ case dyn.Key("volumes"): + path = dyn.NewPath(dyn.Key("databricks_volume")).Append(path[2:]...) case dyn.Key("clusters"): path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...) case dyn.Key("dashboards"): diff --git a/bundle/deploy/terraform/interpolate_test.go b/bundle/deploy/terraform/interpolate_test.go index b26ef928..fc5c4d18 100644 --- a/bundle/deploy/terraform/interpolate_test.go +++ b/bundle/deploy/terraform/interpolate_test.go @@ -31,6 +31,7 @@ func TestInterpolate(t *testing.T) { "other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}", "other_registered_model": "${resources.registered_models.other_registered_model.id}", "other_schema": "${resources.schemas.other_schema.id}", + "other_volume": "${resources.volumes.other_volume.id}", "other_cluster": "${resources.clusters.other_cluster.id}", "other_dashboard": "${resources.dashboards.other_dashboard.id}", }, @@ -69,6 +70,7 @@ func TestInterpolate(t *testing.T) { assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"]) assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"]) assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"]) + assert.Equal(t, "${databricks_volume.other_volume.id}", j.Tags["other_volume"]) assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"]) assert.Equal(t, "${databricks_dashboard.other_dashboard.id}", j.Tags["other_dashboard"]) diff --git a/bundle/deploy/terraform/tfdyn/convert_volume.go b/bundle/deploy/terraform/tfdyn/convert_volume.go new file mode 100644 index 00000000..4211e1f9 --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_volume.go @@ -0,0 +1,45 @@ +package tfdyn + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" +) + +func convertVolumeResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) { + // Normalize the output value to the target schema. + vout, diags := convert.Normalize(schema.ResourceVolume{}, vin) + for _, diag := range diags { + log.Debugf(ctx, "volume normalization diagnostic: %s", diag.Summary) + } + + return vout, nil +} + +type volumeConverter struct{} + +func (volumeConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error { + vout, err := convertVolumeResource(ctx, vin) + if err != nil { + return err + } + + // Add the converted resource to the output. + out.Volume[key] = vout.AsAny() + + // Configure grants for this resource. 
+ if grants := convertGrantsResource(ctx, vin); grants != nil { + grants.Volume = fmt.Sprintf("${databricks_volume.%s.id}", key) + out.Grants["volume_"+key] = grants + } + + return nil +} + +func init() { + registerConverter("volumes", volumeConverter{}) +} diff --git a/bundle/deploy/terraform/tfdyn/convert_volume_test.go b/bundle/deploy/terraform/tfdyn/convert_volume_test.go new file mode 100644 index 00000000..c897ae69 --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_volume_test.go @@ -0,0 +1,70 @@ +package tfdyn + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertVolume(t *testing.T) { + var src = resources.Volume{ + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "catalog", + Comment: "comment", + Name: "name", + SchemaName: "schema", + StorageLocation: "s3://bucket/path", + VolumeType: "EXTERNAL", + }, + Grants: []resources.Grant{ + { + Privileges: []string{"READ_VOLUME"}, + Principal: "jack@gmail.com", + }, + { + Privileges: []string{"WRITE_VOLUME"}, + Principal: "jane@gmail.com", + }, + }, + } + + vin, err := convert.FromTyped(src, dyn.NilValue) + require.NoError(t, err) + + ctx := context.Background() + out := schema.NewResources() + err = volumeConverter{}.Convert(ctx, "my_volume", vin, out) + require.NoError(t, err) + + // Assert equality on the volume + require.Equal(t, map[string]any{ + "catalog_name": "catalog", + "comment": "comment", + "name": "name", + "schema_name": "schema", + "storage_location": "s3://bucket/path", + "volume_type": "EXTERNAL", + }, out.Volume["my_volume"]) + + // Assert equality on the grants + assert.Equal(t, &schema.ResourceGrants{ + Volume: "${databricks_volume.my_volume.id}", + Grant: []schema.ResourceGrantsGrant{ + { + Privileges: []string{"READ_VOLUME"}, + Principal: "jack@gmail.com", + }, + { + Privileges: []string{"WRITE_VOLUME"}, + Principal: "jane@gmail.com", + }, + }, + }, out.Grants["volume_my_volume"]) +} diff --git a/bundle/libraries/expand_glob_references_test.go b/bundle/libraries/expand_glob_references_test.go index 2dfbddb7..bdd04216 100644 --- a/bundle/libraries/expand_glob_references_test.go +++ b/bundle/libraries/expand_glob_references_test.go @@ -6,9 +6,9 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/internal/testutil" "github.com/databricks/cli/libs/dyn" "github.com/databricks/databricks-sdk-go/service/compute" diff --git a/bundle/libraries/filer.go b/bundle/libraries/filer.go new file mode 100644 index 00000000..8cbf1a51 --- /dev/null +++ b/bundle/libraries/filer.go @@ -0,0 +1,28 @@ +package libraries + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/filer" +) + +// This function returns the right filer to use, to upload artifacts to the configured location. +// Supported locations: +// 1. WSFS +// 2. 
UC volumes +func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + artifactPath := b.Config.Workspace.ArtifactPath + if artifactPath == "" { + return nil, "", diag.Errorf("remote artifact path not configured") + } + + switch { + case IsVolumesPath(artifactPath): + return filerForVolume(ctx, b) + + default: + return filerForWorkspace(b) + } +} diff --git a/bundle/libraries/filer_test.go b/bundle/libraries/filer_test.go new file mode 100644 index 00000000..88ba152f --- /dev/null +++ b/bundle/libraries/filer_test.go @@ -0,0 +1,63 @@ +package libraries + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/libs/filer" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestGetFilerForLibrariesValidWsfs(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/foo/bar/artifacts", + }, + }, + } + + client, uploadPath, diags := GetFilerForLibraries(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath) + + assert.IsType(t, &filer.WorkspaceFilesClient{}, client) +} + +func TestGetFilerForLibrariesValidUcVolume(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + + client, uploadPath, diags := GetFilerForLibraries(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath) + + assert.IsType(t, &filer.FilesClient{}, client) +} + +func TestGetFilerForLibrariesRemotePathNotSet(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{}, + }, + } + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.EqualError(t, diags.Error(), "remote artifact path not configured") +} diff --git a/bundle/libraries/filer_volume.go b/bundle/libraries/filer_volume.go new file mode 100644 index 00000000..ec88bb94 --- /dev/null +++ b/bundle/libraries/filer_volume.go @@ -0,0 +1,101 @@ +package libraries + +import ( + "context" + "fmt" + "path" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/dynvar" + "github.com/databricks/cli/libs/filer" +) + +// This function returns a filer for ".internal" folder inside the directory configured +// at `workspace.artifact_path`. +// This function also checks if the UC volume exists in the workspace and then: +// 1. If the UC volume exists in the workspace: +// Returns a filer for the UC volume. +// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in +// the bundle configuration: +// Returns an error and a warning that instructs the user to deploy the +// UC volume before using it in the artifact path. +// 3. 
If the UC volume does not exist in the workspace and is not defined in the bundle configuration: +// Returns an error. +func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + artifactPath := b.Config.Workspace.ArtifactPath + w := b.WorkspaceClient() + + if !strings.HasPrefix(artifactPath, "/Volumes/") { + return nil, "", diag.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath) + } + + parts := strings.Split(artifactPath, "/") + volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", artifactPath) + + // Incorrect format. + if len(parts) < 5 { + return nil, "", diag.FromErr(volumeFormatErr) + } + + catalogName := parts[2] + schemaName := parts[3] + volumeName := parts[4] + + // Incorrect format. + if catalogName == "" || schemaName == "" || volumeName == "" { + return nil, "", diag.FromErr(volumeFormatErr) + } + + // Check if the UC volume exists in the workspace. + volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName) + err := w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath) + + // If the volume exists already, directly return the filer for the path to + // upload the artifacts to. + if err == nil { + uploadPath := path.Join(artifactPath, ".internal") + f, err := filer.NewFilesClient(w, uploadPath) + return f, uploadPath, diag.FromErr(err) + } + + diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err) + + path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName) + if !ok { + return nil, "", diags + } + + warning := diag.Diagnostic{ + Severity: diag.Warning, + Summary: `You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`, + Locations: locations, + Paths: []dyn.Path{path}, + } + return nil, "", diags.Append(warning) +} + +func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { + volumes := b.Config.Resources.Volumes + for k, v := range volumes { + if v.CatalogName != catalogName || v.Name != volumeName { + continue + } + // UC schemas can be defined in the bundle itself, and thus might be interpolated + // at runtime via the ${resources.schemas...} syntax. Thus we match the volume + // definition if the schema name is the same as the one in the bundle, or if the + // schema name is interpolated. + // We only have to check for ${resources.schemas...} references because any + // other valid reference (like ${var.foo}) would have been interpolated by this point. 
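+ // E.g. a volume with schema_name: "${resources.schemas.my_schema.name}" matches here regardless of the schema name passed in.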
+ p, ok := dynvar.PureReferenceToPath(v.SchemaName) + isSchemaDefinedInBundle := ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")}) + if v.SchemaName != schemaName && !isSchemaDefinedInBundle { + continue + } + pathString := fmt.Sprintf("resources.volumes.%s", k) + return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true + } + return nil, nil, false +} diff --git a/bundle/libraries/filer_volume_test.go b/bundle/libraries/filer_volume_test.go new file mode 100644 index 00000000..987c57cb --- /dev/null +++ b/bundle/libraries/filer_volume_test.go @@ -0,0 +1,223 @@ +package libraries + +import ( + "context" + "fmt" + "path" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/filer" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestFindVolumeInBundle(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + SchemaName: "my_schema", + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{ + { + File: "volume.yml", + Line: 1, + Column: 2, + }, + }) + + // volume is in DAB. + path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume") + assert.True(t, ok) + assert.Equal(t, []dyn.Location{{ + File: "volume.yml", + Line: 1, + Column: 2, + }}, locations) + assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) + + // wrong volume name + _, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist") + assert.False(t, ok) + + // wrong schema name + _, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume") + assert.False(t, ok) + + // wrong catalog name + _, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume") + assert.False(t, ok) + + // schema name is interpolated but does not have the right prefix. In this case + // we should not match the volume. + b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}" + _, _, ok = findVolumeInBundle(b, "main", "my_schema", "my_volume") + assert.False(t, ok) + + // schema name is interpolated. 
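+ // A pure ${resources.schemas...} reference matches on catalog and volume name alone.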
+ b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}" + path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") + assert.True(t, ok) + assert.Equal(t, []dyn.Location{{ + File: "volume.yml", + Line: 1, + Column: 2, + }}, locations) + assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) +} + +func TestFilerForVolumeNotInBundle(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/doesnotexist", + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(fmt.Errorf("error from API")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := filerForVolume(context.Background(), b) + assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API") + assert.Len(t, diags, 1) +} + +func TestFilerForVolumeInBundle(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/main/my_schema/my_volume", + }, + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + VolumeType: "MANAGED", + SchemaName: "my_schema", + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{ + { + File: "volume.yml", + Line: 1, + Column: 2, + }, + }) + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API")) + b.SetWorkpaceClient(m.WorkspaceClient) + + _, _, diags := GetFilerForLibraries(context.Background(), b) + assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API") + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Warning, + Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. 
Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.", + Locations: []dyn.Location{{ + File: "volume.yml", + Line: 1, + Column: 2, + }}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.volumes.foo")}, + }) +} + +func TestFilerForVolumeWithInvalidVolumePaths(t *testing.T) { + invalidPaths := []string{ + "/Volumes/", + "/Volumes/main", + "/Volumes/main/", + "/Volumes/main//", + "/Volumes/main//my_schema", + "/Volumes/main/my_schema", + "/Volumes/main/my_schema/", + "/Volumes/main/my_schema//", + "/Volumes//my_schema/my_volume", + } + + for _, p := range invalidPaths { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: p, + }, + }, + } + + _, _, diags := GetFilerForLibraries(context.Background(), b) + require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p)) + } +} + +func TestFilerForVolumeWithInvalidPrefix(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volume/main/my_schema/my_volume", + }, + }, + } + + _, _, diags := filerForVolume(context.Background(), b) + require.EqualError(t, diags.Error(), "expected artifact_path to start with /Volumes/, got /Volume/main/my_schema/my_volume") +} + +func TestFilerForVolumeWithValidVolumePaths(t *testing.T) { + validPaths := []string{ + "/Volumes/main/my_schema/my_volume", + "/Volumes/main/my_schema/my_volume/", + "/Volumes/main/my_schema/my_volume/a/b/c", + "/Volumes/main/my_schema/my_volume/a/a/a", + } + + for _, p := range validPaths { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: p, + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + + client, uploadPath, diags := filerForVolume(context.Background(), b) + require.NoError(t, diags.Error()) + assert.Equal(t, path.Join(p, ".internal"), uploadPath) + assert.IsType(t, &filer.FilesClient{}, client) + } +} diff --git a/bundle/libraries/filer_workspace.go b/bundle/libraries/filer_workspace.go new file mode 100644 index 00000000..9172fef9 --- /dev/null +++ b/bundle/libraries/filer_workspace.go @@ -0,0 +1,15 @@ +package libraries + +import ( + "path" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/filer" +) + +func filerForWorkspace(b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) { + uploadPath := path.Join(b.Config.Workspace.ArtifactPath, ".internal") + f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath) + return f, uploadPath, diag.FromErr(err) +} diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index 90a1a21f..8f1b425f 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -16,8 +16,6 @@ import ( "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" - "github.com/databricks/databricks-sdk-go" - "golang.org/x/sync/errgroup" ) @@ -130,24 +128,17 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error } func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := GetUploadBasePath(b) - if err != nil { - return diag.FromErr(err) + client, uploadPath, diags := GetFilerForLibraries(ctx, b) + if 
diags.HasError() { + return diags } - // If the client is not initialized, initialize it - // We use client field in mutator to allow for mocking client in testing + // Only set the filer client if it's not already set. We use client field + // in mutator to mock the filer client in testing if u.client == nil { - filer, err := GetFilerForLibraries(b.WorkspaceClient(), uploadPath) - if err != nil { - return diag.FromErr(err) - } - - u.client = filer + u.client = client } - var diags diag.Diagnostics - libs, err := collectLocalLibraries(b) if err != nil { return diag.FromErr(err) @@ -197,17 +188,6 @@ func (u *upload) Name() string { return "libraries.Upload" } -func GetFilerForLibraries(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) { - if isVolumesPath(uploadPath) { - return filer.NewFilesClient(w, uploadPath) - } - return filer.NewWorkspaceFilesClient(w, uploadPath) -} - -func isVolumesPath(path string) bool { - return strings.HasPrefix(path, "/Volumes/") -} - // Function to upload file (a library, artifact and etc) to Workspace or UC volume func UploadFile(ctx context.Context, file string, client filer.Filer) error { filename := filepath.Base(file) @@ -227,12 +207,3 @@ func UploadFile(ctx context.Context, file string, client filer.Filer) error { log.Infof(ctx, "Upload succeeded") return nil } - -func GetUploadBasePath(b *bundle.Bundle) (string, error) { - artifactPath := b.Config.Workspace.ArtifactPath - if artifactPath == "" { - return "", fmt.Errorf("remote artifact path not configured") - } - - return path.Join(artifactPath, ".internal"), nil -} diff --git a/bundle/libraries/upload_test.go b/bundle/libraries/upload_test.go index 44b194c5..493785bf 100644 --- a/bundle/libraries/upload_test.go +++ b/bundle/libraries/upload_test.go @@ -11,6 +11,8 @@ import ( mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" "github.com/databricks/cli/internal/testutil" "github.com/databricks/cli/libs/filer" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/stretchr/testify/mock" @@ -181,6 +183,11 @@ func TestArtifactUploadForVolumes(t *testing.T) { filer.CreateParentDirectories, ).Return(nil) + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{} + m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/foo/bar/artifacts").Return(nil) + b.SetWorkpaceClient(m.WorkspaceClient) + diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler))) require.NoError(t, diags.Error()) diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index e623c364..f5fe42b0 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -23,10 +23,10 @@ import ( tfjson "github.com/hashicorp/terraform-json" ) -func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action { +func filterDeleteOrRecreateActions(changes []*tfjson.ResourceChange, resourceType string) []terraformlib.Action { res := make([]terraformlib.Action, 0) for _, rc := range changes { - if !toInclude(rc.Type, rc.Change.Actions) { + if rc.Type != resourceType { continue } @@ -37,7 +37,7 @@ func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ case rc.Change.Actions.Replace(): actionType = 
terraformlib.ActionTypeRecreate default: - // No use case for other action types yet. + // Filter out other action types. continue } @@ -63,30 +63,12 @@ func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) { return false, err } - schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool { - // Filter in only UC schema resources. - if typ != "databricks_schema" { - return false - } - - // We only display prompts for destructive actions like deleting or - // recreating a schema. - return actions.Delete() || actions.Replace() - }) - - dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool { - // Filter in only DLT pipeline resources. - if typ != "databricks_pipeline" { - return false - } - - // Recreating DLT pipeline leads to metadata loss and for a transient period - // the underling tables will be unavailable. - return actions.Replace() || actions.Delete() - }) + schemaActions := filterDeleteOrRecreateActions(plan.ResourceChanges, "databricks_schema") + dltActions := filterDeleteOrRecreateActions(plan.ResourceChanges, "databricks_pipeline") + volumeActions := filterDeleteOrRecreateActions(plan.ResourceChanges, "databricks_volume") // We don't need to display any prompts in this case. - if len(dltActions) == 0 && len(schemaActions) == 0 { + if len(schemaActions) == 0 && len(dltActions) == 0 && len(volumeActions) == 0 { return true, nil } @@ -111,6 +93,19 @@ properties such as the 'catalog' or 'storage' are changed:` } } + // One or more volumes are being recreated. + if len(volumeActions) != 0 { + msg := ` +This action will result in the deletion or recreation of the following Volumes. +For managed volumes, the files stored in the volume are also deleted from your +cloud tenant within 30 days. 
For external volumes, the metadata about the volume +is removed from the catalog, but the underlying files are not deleted:` + cmdio.LogString(ctx, msg) + for _, action := range volumeActions { + cmdio.Log(ctx, action) + } + } + if b.AutoApprove { return true, nil } diff --git a/bundle/phases/deploy_test.go b/bundle/phases/deploy_test.go index e00370b3..2cef9540 100644 --- a/bundle/phases/deploy_test.go +++ b/bundle/phases/deploy_test.go @@ -40,17 +40,7 @@ func TestParseTerraformActions(t *testing.T) { }, } - res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool { - if typ != "databricks_pipeline" { - return false - } - - if actions.Delete() || actions.Replace() { - return true - } - - return false - }) + res := filterDeleteOrRecreateActions(changes, "databricks_pipeline") assert.Equal(t, []terraformlib.Action{ { diff --git a/internal/bundle/artifacts_test.go b/internal/bundle/artifacts_test.go index 775327e1..3cc731f4 100644 --- a/internal/bundle/artifacts_test.go +++ b/internal/bundle/artifacts_test.go @@ -1,6 +1,7 @@ package bundle import ( + "fmt" "os" "path" "path/filepath" @@ -8,13 +9,18 @@ import ( "testing" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/internal" "github.com/databricks/cli/internal/acc" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -225,3 +231,112 @@ func TestAccUploadArtifactFileToCorrectRemotePathForVolumes(t *testing.T) { b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl, ) } + +func TestAccUploadArtifactFileToInvalidVolume(t *testing.T) { + ctx, wt := acc.UcWorkspaceTest(t) + w := wt.W + + schemaName := internal.RandomName("schema-") + + _, err := w.Schemas.Create(ctx, catalog.CreateSchema{ + CatalogName: "main", + Comment: "test schema", + Name: schemaName, + }) + require.NoError(t, err) + + t.Cleanup(func() { + err = w.Schemas.DeleteByFullName(ctx, "main."+schemaName) + require.NoError(t, err) + }) + + t.Run("volume not in DAB", func(t *testing.T) { + volumePath := fmt.Sprintf("/Volumes/main/%s/doesnotexist", schemaName) + dir := t.TempDir() + + b := &bundle.Bundle{ + BundleRootPath: dir, + SyncRootPath: dir, + Config: config.Root{ + Bundle: config.Bundle{ + Target: "whatever", + }, + Workspace: config.Workspace{ + ArtifactPath: volumePath, + }, + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + VolumeType: "MANAGED", + SchemaName: schemaName, + }, + }, + }, + }, + }, + } + + diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) + assert.ErrorContains(t, diags.Error(), fmt.Sprintf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path:", volumePath)) + }) + + t.Run("volume in DAB config", func(t *testing.T) { + volumePath := fmt.Sprintf("/Volumes/main/%s/my_volume", schemaName) + dir := t.TempDir() + + b := &bundle.Bundle{ + BundleRootPath: dir, + SyncRootPath: dir, + Config: config.Root{ + Bundle: config.Bundle{ + 
Target: "whatever", + }, + Workspace: config.Workspace{ + ArtifactPath: volumePath, + }, + Resources: config.Resources{ + Volumes: map[string]*resources.Volume{ + "foo": { + CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{ + CatalogName: "main", + Name: "my_volume", + VolumeType: "MANAGED", + SchemaName: schemaName, + }, + }, + }, + }, + }, + } + + // set location of volume definition in config. + bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{{ + File: filepath.Join(dir, "databricks.yml"), + Line: 1, + Column: 2, + }}) + + diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Error, + Summary: fmt.Sprintf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: Not Found", volumePath), + }) + assert.Contains(t, diags, diag.Diagnostic{ + Severity: diag.Warning, + Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.", + Locations: []dyn.Location{ + { + File: filepath.Join(dir, "databricks.yml"), + Line: 1, + Column: 2, + }, + }, + Paths: []dyn.Path{ + dyn.MustPathFromString("resources.volumes.foo"), + }, + }) + }) +} diff --git a/internal/bundle/bundles/volume/databricks_template_schema.json b/internal/bundle/bundles/volume/databricks_template_schema.json new file mode 100644 index 00000000..d849dacf --- /dev/null +++ b/internal/bundle/bundles/volume/databricks_template_schema.json @@ -0,0 +1,8 @@ +{ + "properties": { + "unique_id": { + "type": "string", + "description": "Unique ID for the schema names" + } + } +} diff --git a/internal/bundle/bundles/volume/template/databricks.yml.tmpl b/internal/bundle/bundles/volume/template/databricks.yml.tmpl new file mode 100644 index 00000000..d7f31439 --- /dev/null +++ b/internal/bundle/bundles/volume/template/databricks.yml.tmpl @@ -0,0 +1,31 @@ +bundle: + name: test-uc-volumes-{{.unique_id}} + +variables: + schema_name: + default: ${resources.schemas.schema1.name} + +resources: + schemas: + schema1: + name: schema1-{{.unique_id}} + catalog_name: main + comment: This schema was created from DABs + + schema2: + name: schema2-{{.unique_id}} + catalog_name: main + comment: This schema was created from DABs + + volumes: + foo: + catalog_name: main + name: my_volume + schema_name: ${var.schema_name} + volume_type: MANAGED + comment: This volume was created from DABs. 
+ + grants: + - principal: account users + privileges: + - WRITE_VOLUME diff --git a/internal/bundle/bundles/volume/template/nb.sql b/internal/bundle/bundles/volume/template/nb.sql new file mode 100644 index 00000000..199ff507 --- /dev/null +++ b/internal/bundle/bundles/volume/template/nb.sql @@ -0,0 +1,2 @@ +-- Databricks notebook source +select 1 diff --git a/internal/bundle/deploy_test.go b/internal/bundle/deploy_test.go index 88543585..0ddc79f3 100644 --- a/internal/bundle/deploy_test.go +++ b/internal/bundle/deploy_test.go @@ -243,3 +243,73 @@ func TestAccDeployBasicBundleLogs(t *testing.T) { }, "\n"), stderr) assert.Equal(t, "", stdout) } + +func TestAccDeployUcVolume(t *testing.T) { + ctx, wt := acc.UcWorkspaceTest(t) + w := wt.W + + uniqueId := uuid.New().String() + bundleRoot, err := initTestTemplate(t, ctx, "volume", map[string]any{ + "unique_id": uniqueId, + }) + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + t.Cleanup(func() { + destroyBundle(t, ctx, bundleRoot) + }) + + // Assert the volume is created successfully + catalogName := "main" + schemaName := "schema1-" + uniqueId + volumeName := "my_volume" + fullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName) + volume, err := w.Volumes.ReadByName(ctx, fullName) + require.NoError(t, err) + require.Equal(t, volume.Name, volumeName) + require.Equal(t, catalogName, volume.CatalogName) + require.Equal(t, schemaName, volume.SchemaName) + + // Assert that the grants were successfully applied. + grants, err := w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName) + require.NoError(t, err) + assert.Len(t, grants.PrivilegeAssignments, 1) + assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal) + assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges) + + // Recreation of the volume without --auto-approve should fail since prompting is not possible + t.Setenv("TERM", "dumb") + t.Setenv("BUNDLE_ROOT", bundleRoot) + stdout, stderr, err := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--var=schema_name=${resources.schemas.schema2.name}").Run() + assert.Error(t, err) + assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following Volumes. +For managed volumes, the files stored in the volume are also deleted from your +cloud tenant within 30 days. For external volumes, the metadata about the volume +is removed from the catalog, but the underlying files are not deleted: + recreate volume foo`) + assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. 
diff --git a/internal/bundle/deploy_test.go b/internal/bundle/deploy_test.go
index 88543585..0ddc79f3 100644
--- a/internal/bundle/deploy_test.go
+++ b/internal/bundle/deploy_test.go
@@ -243,3 +243,73 @@ func TestAccDeployBasicBundleLogs(t *testing.T) {
 	}, "\n"), stderr)
 	assert.Equal(t, "", stdout)
 }
+
+func TestAccDeployUcVolume(t *testing.T) {
+	ctx, wt := acc.UcWorkspaceTest(t)
+	w := wt.W
+
+	uniqueId := uuid.New().String()
+	bundleRoot, err := initTestTemplate(t, ctx, "volume", map[string]any{
+		"unique_id": uniqueId,
+	})
+	require.NoError(t, err)
+
+	err = deployBundle(t, ctx, bundleRoot)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		destroyBundle(t, ctx, bundleRoot)
+	})
+
+	// Assert the volume is created successfully
+	catalogName := "main"
+	schemaName := "schema1-" + uniqueId
+	volumeName := "my_volume"
+	fullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
+	volume, err := w.Volumes.ReadByName(ctx, fullName)
+	require.NoError(t, err)
+	require.Equal(t, volumeName, volume.Name)
+	require.Equal(t, catalogName, volume.CatalogName)
+	require.Equal(t, schemaName, volume.SchemaName)
+
+	// Assert that the grants were successfully applied.
+	grants, err := w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName)
+	require.NoError(t, err)
+	assert.Len(t, grants.PrivilegeAssignments, 1)
+	assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal)
+	assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges)
+
+	// Recreation of the volume without --auto-approve should fail since prompting is not possible
+	t.Setenv("TERM", "dumb")
+	t.Setenv("BUNDLE_ROOT", bundleRoot)
+	stdout, stderr, err := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--var=schema_name=${resources.schemas.schema2.name}").Run()
+	assert.Error(t, err)
+	assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following Volumes.
+For managed volumes, the files stored in the volume are also deleted from your
+cloud tenant within 30 days. For external volumes, the metadata about the volume
+is removed from the catalog, but the underlying files are not deleted:
+  recreate volume foo`)
+	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
+
+	// Successfully recreate the volume with --auto-approve
+	t.Setenv("TERM", "dumb")
+	t.Setenv("BUNDLE_ROOT", bundleRoot)
+	_, _, err = internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--var=schema_name=${resources.schemas.schema2.name}", "--auto-approve").Run()
+	assert.NoError(t, err)
+
+	// Assert the volume is updated successfully
+	schemaName = "schema2-" + uniqueId
+	fullName = fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
+	volume, err = w.Volumes.ReadByName(ctx, fullName)
+	require.NoError(t, err)
+	require.Equal(t, volumeName, volume.Name)
+	require.Equal(t, catalogName, volume.CatalogName)
+	require.Equal(t, schemaName, volume.SchemaName)
+
+	// Assert that the grants were applied / retained on recreate.
+	grants, err = w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName)
+	require.NoError(t, err)
+	assert.Len(t, grants.PrivilegeAssignments, 1)
+	assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal)
+	assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges)
+}
diff --git a/libs/dyn/dynvar/ref.go b/libs/dyn/dynvar/ref.go
index 338ac8ce..a2893882 100644
--- a/libs/dyn/dynvar/ref.go
+++ b/libs/dyn/dynvar/ref.go
@@ -71,3 +71,23 @@ func (v ref) references() []string {
 func IsPureVariableReference(s string) bool {
 	return len(s) > 0 && re.FindString(s) == s
 }
+
+// PureReferenceToPath returns the corresponding dyn.Path if s is a pure
+// variable reference. Otherwise, it returns nil and false.
+func PureReferenceToPath(s string) (dyn.Path, bool) {
+	ref, ok := newRef(dyn.V(s))
+	if !ok {
+		return nil, false
+	}
+
+	if !ref.isPure() {
+		return nil, false
+	}
+
+	p, err := dyn.NewPathFromString(ref.references()[0])
+	if err != nil {
+		return nil, false
+	}
+
+	return p, true
+}
diff --git a/libs/dyn/dynvar/ref_test.go b/libs/dyn/dynvar/ref_test.go
index aff3643e..4110732f 100644
--- a/libs/dyn/dynvar/ref_test.go
+++ b/libs/dyn/dynvar/ref_test.go
@@ -51,3 +51,34 @@ func TestIsPureVariableReference(t *testing.T) {
 	assert.False(t, IsPureVariableReference("prefix ${foo.bar}"))
 	assert.True(t, IsPureVariableReference("${foo.bar}"))
 }
+
+func TestPureReferenceToPath(t *testing.T) {
+	for _, tc := range []struct {
+		in  string
+		out string
+		ok  bool
+	}{
+		{"${foo.bar}", "foo.bar", true},
+		{"${foo.bar.baz}", "foo.bar.baz", true},
+		{"${foo.bar.baz[0]}", "foo.bar.baz[0]", true},
+		{"${foo.bar.baz[0][1]}", "foo.bar.baz[0][1]", true},
+		{"${foo.bar.baz[0][1].qux}", "foo.bar.baz[0][1].qux", true},
+
+		{"${foo.one}${foo.two}", "", false},
+		{"prefix ${foo.bar}", "", false},
+		{"${foo.bar} suffix", "", false},
+		{"${foo.bar", "", false},
+		{"foo.bar}", "", false},
+		{"foo.bar", "", false},
+		{"{foo.bar}", "", false},
+		{"", "", false},
+	} {
+		path, ok := PureReferenceToPath(tc.in)
+		if tc.ok {
+			assert.True(t, ok)
+			assert.Equal(t, dyn.MustPathFromString(tc.out), path)
+		} else {
+			assert.False(t, ok)
+		}
+	}
+}
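For reference, the new helper's behavior on a few inputs mirrors the table in the test above. A minimal sketch, assuming the libs/dyn and libs/dyn/dynvar packages from this repository:

    package main

    import (
        "fmt"

        "github.com/databricks/cli/libs/dyn/dynvar"
    )

    func main() {
        // A pure reference maps to its dyn.Path, including index syntax.
        if p, ok := dynvar.PureReferenceToPath("${foo.bar[0]}"); ok {
            fmt.Println(p.String()) // foo.bar[0]
        }

        // Anything that is not exactly one ${...} expression is rejected.
        _, ok := dynvar.PureReferenceToPath("prefix ${foo.bar}")
        fmt.Println(ok) // false

        _, ok = dynvar.PureReferenceToPath("${foo.one}${foo.two}")
        fmt.Println(ok) // false
    }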
-type workspaceFilesClient struct {
+type WorkspaceFilesClient struct {
 	workspaceClient *databricks.WorkspaceClient
 	apiClient       apiClient
 
@@ -128,7 +128,7 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer,
 		return nil, err
 	}
 
-	return &workspaceFilesClient{
+	return &WorkspaceFilesClient{
 		workspaceClient: w,
 		apiClient:       apiClient,
 
@@ -136,7 +136,7 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer,
 	}, nil
 }
 
-func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
+func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
 	absPath, err := w.root.Join(name)
 	if err != nil {
 		return err
@@ -214,7 +214,7 @@ func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io
 	return err
 }
 
-func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
+func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
 	absPath, err := w.root.Join(name)
 	if err != nil {
 		return nil, err
@@ -238,7 +238,7 @@ func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCl
 	return w.workspaceClient.Workspace.Download(ctx, absPath)
 }
 
-func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
+func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
 	absPath, err := w.root.Join(name)
 	if err != nil {
 		return err
@@ -282,7 +282,7 @@ func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ...
 	return err
 }
 
-func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
+func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
 	absPath, err := w.root.Join(name)
 	if err != nil {
 		return nil, err
@@ -315,7 +315,7 @@ func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.D
 	return wsfsDirEntriesFromObjectInfos(objects), nil
 }
 
-func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error {
+func (w *WorkspaceFilesClient) Mkdir(ctx context.Context, name string) error {
 	dirPath, err := w.root.Join(name)
 	if err != nil {
 		return err
@@ -325,7 +325,7 @@ func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error {
 	})
 }
 
-func (w *workspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
+func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
 	absPath, err := w.root.Join(name)
 	if err != nil {
 		return nil, err
diff --git a/libs/filer/workspace_files_extensions_client_test.go b/libs/filer/workspace_files_extensions_client_test.go
index 974a6a37..10c176b3 100644
--- a/libs/filer/workspace_files_extensions_client_test.go
+++ b/libs/filer/workspace_files_extensions_client_test.go
@@ -174,7 +174,7 @@ func TestFilerWorkspaceFilesExtensionsErrorsOnDupName(t *testing.T) {
 		"return_export_info": "true",
 	}, mock.AnythingOfType("*filer.wsfsFileInfo"), []func(*http.Request) error(nil)).Return(nil, statNotebook)
 
-	workspaceFilesClient := workspaceFilesClient{
+	workspaceFilesClient := WorkspaceFilesClient{
 		workspaceClient: mockedWorkspaceClient.WorkspaceClient,
 		apiClient:       &mockedApiClient,
 		root:            NewWorkspaceRootPath("/dir"),
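Exporting WorkspaceFilesClient (previously workspaceFilesClient) lets code outside the filer package name the concrete type, as the test above now does by constructing it directly. A minimal sketch of what the rename enables, assuming a configured workspace client; the root path below is hypothetical:

    package main

    import (
        "github.com/databricks/cli/libs/filer"
        "github.com/databricks/databricks-sdk-go"
    )

    func main() {
        w := databricks.Must(databricks.NewWorkspaceClient())

        // NewWorkspaceFilesClient still returns the Filer interface.
        f, err := filer.NewWorkspaceFilesClient(w, "/Workspace/Users/someone@example.com")
        if err != nil {
            panic(err)
        }

        // With the type exported, callers can assert down to the concrete client.
        if wf, ok := f.(*filer.WorkspaceFilesClient); ok {
            _ = wf // pass to code that needs this specific implementation
        }
    }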