From b21f623788263b6720355251eb822b949f7b9b7a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 30 Dec 2024 15:15:51 +0530 Subject: [PATCH] Add validation mutator for volume `arifact_path` --- bundle/config/validate/validate.go | 1 + .../config/validate/validate_artifact_path.go | 101 ++++++++++++++++ .../validate/validate_artifact_path_test.go | 109 ++++++++++++++++++ bundle/libraries/filer_volume.go | 26 +++-- bundle/libraries/filer_volume_test.go | 16 +-- 5 files changed, 235 insertions(+), 18 deletions(-) create mode 100644 bundle/config/validate/validate_artifact_path.go create mode 100644 bundle/config/validate/validate_artifact_path_test.go diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 131566fc9..45661bd80 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics JobTaskClusterSpec(), ValidateFolderPermissions(), SingleNodeCluster(), + ValidateArtifactPath(), )) } diff --git a/bundle/config/validate/validate_artifact_path.go b/bundle/config/validate/validate_artifact_path.go new file mode 100644 index 000000000..c8b65983f --- /dev/null +++ b/bundle/config/validate/validate_artifact_path.go @@ -0,0 +1,101 @@ +package validate + +import ( + "context" + "errors" + "fmt" + "slices" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/catalog" +) + +type validateArtifactPath struct{} + +func ValidateArtifactPath() bundle.ReadOnlyMutator { + return &validateArtifactPath{} +} + +func (v *validateArtifactPath) Name() string { + return "validate:artifact_paths" +} + +func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + // We only validate UC Volumes paths right now. + // TODO? + if !libraries.IsVolumesPath(rb.Config().Workspace.ArtifactPath) { + return nil + } + + catalogName, schemaName, volumeName, err := libraries.ExtractVolumeFromPath(rb.Config().Workspace.ArtifactPath) + if err != nil { + return diag.FromErr(err) + } + volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName) + w := rb.WorkspaceClient() + p, err := w.Grants.GetEffectiveBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, volumeFullName) + + wrapErrorMsg := func(s string) diag.Diagnostics { + return diag.Diagnostics{ + { + Summary: s, + Severity: diag.Error, + Locations: rb.Config().GetLocations("workspace.artifact_path"), + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + }, + } + } + if errors.Is(err, apierr.ErrPermissionDenied) { + return wrapErrorMsg(fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err)) + } + if errors.Is(err, apierr.ErrNotFound) { + path, locations, ok := libraries.FindVolumeInBundle(rb.Config(), catalogName, schemaName, volumeName) + if !ok { + return wrapErrorMsg(fmt.Sprintf("volume %s does not exist", volumeFullName)) + } + + // If the volume is defined in the bundle, provide a more helpful error diagnostic, + // with more details and location information. + return diag.Diagnostics{{ + Summary: fmt.Sprintf("volume %s does not exist", volumeFullName), + Severity: diag.Error, + Detail: `You are using a volume in your artifact_path that is managed by +this bundle but which has not been deployed yet. Please first deploy +the volume using 'bundle deploy' and then switch over to using it in +the artifact_path.`, + Locations: slices.Concat(rb.Config().GetLocations("workspace.artifact_path"), locations), + Paths: append([]dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, path), + }} + + } + if err != nil { + return wrapErrorMsg(fmt.Sprintf("could not fetch grants for volume %s: %s", volumeFullName, err)) + } + + allPrivileges := []catalog.Privilege{} + for _, assignments := range p.PrivilegeAssignments { + for _, privilege := range assignments.Privileges { + allPrivileges = append(allPrivileges, privilege.Privilege) + } + } + + // UC Volumes have the following privileges: [READ_VOLUME, WRITE_VOLUME, MANAGE, ALL_PRIVILEGES, APPLY TAG] + // The user needs to have either WRITE_VOLUME or ALL_PRIVILEGES to write to the volume. + canWrite := slices.Contains(allPrivileges, catalog.PrivilegeWriteVolume) || slices.Contains(allPrivileges, catalog.PrivilegeAllPrivileges) + if !canWrite { + return wrapErrorMsg(fmt.Sprintf("user does not have WRITE_VOLUME grant on volume %s", volumeFullName)) + } + + // READ_VOLUME is implied since the user was able to fetch the associated grants with the volume. + // We still add this explicit check out of caution incase the API behavior changes in the future. + canRead := slices.Contains(allPrivileges, catalog.PrivilegeReadVolume) || slices.Contains(allPrivileges, catalog.PrivilegeAllPrivileges) + if !canRead { + return wrapErrorMsg(fmt.Sprintf("user does not have READ_VOLUME grant on volume %s", volumeFullName)) + } + + return nil +} diff --git a/bundle/config/validate/validate_artifact_path_test.go b/bundle/config/validate/validate_artifact_path_test.go new file mode 100644 index 000000000..c91da3aae --- /dev/null +++ b/bundle/config/validate/validate_artifact_path_test.go @@ -0,0 +1,109 @@ +package validate_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/validate" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestVali + +func TestValidateArtifactPath(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/catalogN/schemaN/volumeN/abc", + }, + }, + } + + bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "file", Line: 1, Column: 1}}) + assertDiags := func(t *testing.T, diags diag.Diagnostics, expected string) { + assert.Len(t, diags, 1) + assert.Equal(t, diag.Diagnostics{{ + Severity: diag.Error, + Summary: expected, + Locations: []dyn.Location{{File: "file", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, + }}, diags) + } + + wrapPrivileges := func(privileges ...catalog.Privilege) *catalog.EffectivePermissionsList { + perms := &catalog.EffectivePermissionsList{} + for _, p := range privileges { + perms.PrivilegeAssignments = append(perms.PrivilegeAssignments, catalog.EffectivePrivilegeAssignment{ + Privileges: []catalog.EffectivePrivilege{{Privilege: p}}, + }) + } + return perms + } + + rb := bundle.ReadOnly(b) + ctx := context.Background() + + tcases := []struct { + err error + permissions *catalog.EffectivePermissionsList + expectedSummary string + }{ + { + err: &apierr.APIError{ + StatusCode: 403, + Message: "User does not have USE SCHEMA on Schema 'catalogN.schemaN'", + }, + expectedSummary: "cannot access volume catalogN.schemaN.volumeN: User does not have USE SCHEMA on Schema 'catalogN.schemaN'", + }, + { + err: &apierr.APIError{ + StatusCode: 404, + }, + expectedSummary: "volume catalogN.schemaN.volumeN does not exist", + }, + { + err: &apierr.APIError{ + StatusCode: 500, + Message: "Internal Server Error", + }, + expectedSummary: "could not fetch grants for volume catalogN.schemaN.volumeN: Internal Server Error", + }, + { + permissions: wrapPrivileges(catalog.PrivilegeAllPrivileges), + }, + { + permissions: wrapPrivileges(catalog.PrivilegeApplyTag, catalog.PrivilegeManage), + expectedSummary: "user does not have WRITE_VOLUME grant on volume catalogN.schemaN.volumeN", + }, + { + permissions: wrapPrivileges(catalog.PrivilegeWriteVolume), + expectedSummary: "user does not have READ_VOLUME grant on volume catalogN.schemaN.volumeN", + }, + { + permissions: wrapPrivileges(catalog.PrivilegeWriteVolume, catalog.PrivilegeReadVolume), + }, + } + + for _, tc := range tcases { + m := mocks.NewMockWorkspaceClient(t) + api := m.GetMockGrantsAPI() + api.EXPECT().GetEffectiveBySecurableTypeAndFullName(mock.Anything, catalog.SecurableTypeVolume, "catalogN.schemaN.volumeN").Return(tc.permissions, tc.err) + b.SetWorkpaceClient(m.WorkspaceClient) + + diags := bundle.ApplyReadOnly(ctx, rb, validate.ValidateArtifactPath()) + if tc.expectedSummary != "" { + assertDiags(t, diags, tc.expectedSummary) + } else { + assert.Len(t, diags, 0) + } + } +} diff --git a/bundle/libraries/filer_volume.go b/bundle/libraries/filer_volume.go index aecf68db1..94ff74a4c 100644 --- a/bundle/libraries/filer_volume.go +++ b/bundle/libraries/filer_volume.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn/dynvar" @@ -15,7 +16,7 @@ import ( "github.com/databricks/databricks-sdk-go/apierr" ) -func extractVolumeFromPath(artifactPath string) (string, string, string, error) { +func ExtractVolumeFromPath(artifactPath string) (string, string, string, error) { if !IsVolumesPath(artifactPath) { return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath) } @@ -55,7 +56,7 @@ func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, artifactPath := b.Config.Workspace.ArtifactPath w := b.WorkspaceClient() - catalogName, schemaName, volumeName, err := extractVolumeFromPath(artifactPath) + catalogName, schemaName, volumeName, err := ExtractVolumeFromPath(artifactPath) if err != nil { return nil, "", diag.Diagnostics{ { @@ -68,8 +69,8 @@ func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, } // Check if the UC volume exists in the workspace. - volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName) - err = w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath) + volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName) + _, err = w.Volumes.ReadByName(ctx, volumeName) // If the volume exists already, directly return the filer for the path to // upload the artifacts to. @@ -81,19 +82,24 @@ func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, baseErr := diag.Diagnostic{ Severity: diag.Error, - Summary: fmt.Sprintf("unable to determine if volume at %s exists: %s", volumePath, err), + Summary: fmt.Sprintf("unable to determine if volume at %s exists: %s", volumeFullName, err), Locations: b.Config.GetLocations("workspace.artifact_path"), Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, } + if errors.Is(err, apierr.ErrPermissionDenied) { + // If the API returned a 403, the user does not have permission to access the volume. + // Modify the error message to provide more context. + baseErr.Summary = fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err) + } if errors.Is(err, apierr.ErrNotFound) { // Since the API returned a 404, the volume does not exist. // Modify the error message to provide more context. - baseErr.Summary = fmt.Sprintf("volume %s does not exist: %s", volumePath, err) + baseErr.Summary = fmt.Sprintf("volume %s does not exist", volumeFullName) // If the volume is defined in the bundle, provide a more helpful error diagnostic, // with more details and location information. - path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName) + path, locations, ok := FindVolumeInBundle(b.Config, catalogName, schemaName, volumeName) if !ok { return nil, "", diag.Diagnostics{baseErr} } @@ -108,8 +114,8 @@ the artifact_path.` return nil, "", diag.Diagnostics{baseErr} } -func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { - volumes := b.Config.Resources.Volumes +func FindVolumeInBundle(r config.Root, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { + volumes := r.Resources.Volumes for k, v := range volumes { if v.CatalogName != catalogName || v.Name != volumeName { continue @@ -126,7 +132,7 @@ func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName st continue } pathString := fmt.Sprintf("resources.volumes.%s", k) - return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true + return dyn.MustPathFromString(pathString), r.GetLocations(pathString), true } return nil, nil, false } diff --git a/bundle/libraries/filer_volume_test.go b/bundle/libraries/filer_volume_test.go index 7b2f5c5ba..9855ee3cb 100644 --- a/bundle/libraries/filer_volume_test.go +++ b/bundle/libraries/filer_volume_test.go @@ -48,7 +48,7 @@ func TestFindVolumeInBundle(t *testing.T) { }) // volume is in DAB. - path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume") + path, locations, ok := FindVolumeInBundle(b.Config, "main", "my_schema", "my_volume") assert.True(t, ok) assert.Equal(t, []dyn.Location{{ File: "volume.yml", @@ -58,26 +58,26 @@ func TestFindVolumeInBundle(t *testing.T) { assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) // wrong volume name - _, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist") + _, _, ok = FindVolumeInBundle(b.Config, "main", "my_schema", "doesnotexist") assert.False(t, ok) // wrong schema name - _, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume") + _, _, ok = FindVolumeInBundle(b.Config, "main", "doesnotexist", "my_volume") assert.False(t, ok) // wrong catalog name - _, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume") + _, _, ok = FindVolumeInBundle(b.Config, "doesnotexist", "my_schema", "my_volume") assert.False(t, ok) // schema name is interpolated but does not have the right prefix. In this case // we should not match the volume. b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}" - _, _, ok = findVolumeInBundle(b, "main", "my_schema", "my_volume") + _, _, ok = FindVolumeInBundle(b.Config, "main", "my_schema", "my_volume") assert.False(t, ok) // schema name is interpolated. b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}" - path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") + path, locations, ok = FindVolumeInBundle(b.Config, "main", "valuedoesnotmatter", "my_volume") assert.True(t, ok) assert.Equal(t, []dyn.Location{{ File: "volume.yml", @@ -264,14 +264,14 @@ func TestFilerForVolumeWithValidVolumePaths(t *testing.T) { } func TestExtractVolumeFromPath(t *testing.T) { - catalogName, schemaName, volumeName, err := extractVolumeFromPath("/Volumes/main/my_schema/my_volume") + catalogName, schemaName, volumeName, err := ExtractVolumeFromPath("/Volumes/main/my_schema/my_volume") require.NoError(t, err) assert.Equal(t, "main", catalogName) assert.Equal(t, "my_schema", schemaName) assert.Equal(t, "my_volume", volumeName) for _, p := range invalidVolumePaths() { - _, _, _, err := extractVolumeFromPath(p) + _, _, _, err := ExtractVolumeFromPath(p) assert.EqualError(t, err, fmt.Sprintf("expected UC volume path to be in the format /Volumes////..., got %s", p)) } }