Add validation mutator for volume `arifact_path`

This commit is contained in:
Shreyas Goenka 2024-12-30 15:15:51 +05:30
parent a002475a6a
commit b21f623788
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
5 changed files with 235 additions and 18 deletions

View File

@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
JobTaskClusterSpec(), JobTaskClusterSpec(),
ValidateFolderPermissions(), ValidateFolderPermissions(),
SingleNodeCluster(), SingleNodeCluster(),
ValidateArtifactPath(),
)) ))
} }

View File

@ -0,0 +1,101 @@
package validate
import (
"context"
"errors"
"fmt"
"slices"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/catalog"
)
type validateArtifactPath struct{}
func ValidateArtifactPath() bundle.ReadOnlyMutator {
return &validateArtifactPath{}
}
func (v *validateArtifactPath) Name() string {
return "validate:artifact_paths"
}
func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
// We only validate UC Volumes paths right now.
// TODO?
if !libraries.IsVolumesPath(rb.Config().Workspace.ArtifactPath) {
return nil
}
catalogName, schemaName, volumeName, err := libraries.ExtractVolumeFromPath(rb.Config().Workspace.ArtifactPath)
if err != nil {
return diag.FromErr(err)
}
volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
w := rb.WorkspaceClient()
p, err := w.Grants.GetEffectiveBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, volumeFullName)
wrapErrorMsg := func(s string) diag.Diagnostics {
return diag.Diagnostics{
{
Summary: s,
Severity: diag.Error,
Locations: rb.Config().GetLocations("workspace.artifact_path"),
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
},
}
}
if errors.Is(err, apierr.ErrPermissionDenied) {
return wrapErrorMsg(fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err))
}
if errors.Is(err, apierr.ErrNotFound) {
path, locations, ok := libraries.FindVolumeInBundle(rb.Config(), catalogName, schemaName, volumeName)
if !ok {
return wrapErrorMsg(fmt.Sprintf("volume %s does not exist", volumeFullName))
}
// If the volume is defined in the bundle, provide a more helpful error diagnostic,
// with more details and location information.
return diag.Diagnostics{{
Summary: fmt.Sprintf("volume %s does not exist", volumeFullName),
Severity: diag.Error,
Detail: `You are using a volume in your artifact_path that is managed by
this bundle but which has not been deployed yet. Please first deploy
the volume using 'bundle deploy' and then switch over to using it in
the artifact_path.`,
Locations: slices.Concat(rb.Config().GetLocations("workspace.artifact_path"), locations),
Paths: append([]dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, path),
}}
}
if err != nil {
return wrapErrorMsg(fmt.Sprintf("could not fetch grants for volume %s: %s", volumeFullName, err))
}
allPrivileges := []catalog.Privilege{}
for _, assignments := range p.PrivilegeAssignments {
for _, privilege := range assignments.Privileges {
allPrivileges = append(allPrivileges, privilege.Privilege)
}
}
// UC Volumes have the following privileges: [READ_VOLUME, WRITE_VOLUME, MANAGE, ALL_PRIVILEGES, APPLY TAG]
// The user needs to have either WRITE_VOLUME or ALL_PRIVILEGES to write to the volume.
canWrite := slices.Contains(allPrivileges, catalog.PrivilegeWriteVolume) || slices.Contains(allPrivileges, catalog.PrivilegeAllPrivileges)
if !canWrite {
return wrapErrorMsg(fmt.Sprintf("user does not have WRITE_VOLUME grant on volume %s", volumeFullName))
}
// READ_VOLUME is implied since the user was able to fetch the associated grants with the volume.
// We still add this explicit check out of caution incase the API behavior changes in the future.
canRead := slices.Contains(allPrivileges, catalog.PrivilegeReadVolume) || slices.Contains(allPrivileges, catalog.PrivilegeAllPrivileges)
if !canRead {
return wrapErrorMsg(fmt.Sprintf("user does not have READ_VOLUME grant on volume %s", volumeFullName))
}
return nil
}

View File

@ -0,0 +1,109 @@
package validate_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/validate"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
func TestVali
func TestValidateArtifactPath(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/catalogN/schemaN/volumeN/abc",
},
},
}
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "file", Line: 1, Column: 1}})
assertDiags := func(t *testing.T, diags diag.Diagnostics, expected string) {
assert.Len(t, diags, 1)
assert.Equal(t, diag.Diagnostics{{
Severity: diag.Error,
Summary: expected,
Locations: []dyn.Location{{File: "file", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
}}, diags)
}
wrapPrivileges := func(privileges ...catalog.Privilege) *catalog.EffectivePermissionsList {
perms := &catalog.EffectivePermissionsList{}
for _, p := range privileges {
perms.PrivilegeAssignments = append(perms.PrivilegeAssignments, catalog.EffectivePrivilegeAssignment{
Privileges: []catalog.EffectivePrivilege{{Privilege: p}},
})
}
return perms
}
rb := bundle.ReadOnly(b)
ctx := context.Background()
tcases := []struct {
err error
permissions *catalog.EffectivePermissionsList
expectedSummary string
}{
{
err: &apierr.APIError{
StatusCode: 403,
Message: "User does not have USE SCHEMA on Schema 'catalogN.schemaN'",
},
expectedSummary: "cannot access volume catalogN.schemaN.volumeN: User does not have USE SCHEMA on Schema 'catalogN.schemaN'",
},
{
err: &apierr.APIError{
StatusCode: 404,
},
expectedSummary: "volume catalogN.schemaN.volumeN does not exist",
},
{
err: &apierr.APIError{
StatusCode: 500,
Message: "Internal Server Error",
},
expectedSummary: "could not fetch grants for volume catalogN.schemaN.volumeN: Internal Server Error",
},
{
permissions: wrapPrivileges(catalog.PrivilegeAllPrivileges),
},
{
permissions: wrapPrivileges(catalog.PrivilegeApplyTag, catalog.PrivilegeManage),
expectedSummary: "user does not have WRITE_VOLUME grant on volume catalogN.schemaN.volumeN",
},
{
permissions: wrapPrivileges(catalog.PrivilegeWriteVolume),
expectedSummary: "user does not have READ_VOLUME grant on volume catalogN.schemaN.volumeN",
},
{
permissions: wrapPrivileges(catalog.PrivilegeWriteVolume, catalog.PrivilegeReadVolume),
},
}
for _, tc := range tcases {
m := mocks.NewMockWorkspaceClient(t)
api := m.GetMockGrantsAPI()
api.EXPECT().GetEffectiveBySecurableTypeAndFullName(mock.Anything, catalog.SecurableTypeVolume, "catalogN.schemaN.volumeN").Return(tc.permissions, tc.err)
b.SetWorkpaceClient(m.WorkspaceClient)
diags := bundle.ApplyReadOnly(ctx, rb, validate.ValidateArtifactPath())
if tc.expectedSummary != "" {
assertDiags(t, diags, tc.expectedSummary)
} else {
assert.Len(t, diags, 0)
}
}
}

View File

@ -8,6 +8,7 @@ import (
"strings" "strings"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/dynvar" "github.com/databricks/cli/libs/dyn/dynvar"
@ -15,7 +16,7 @@ import (
"github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/apierr"
) )
func extractVolumeFromPath(artifactPath string) (string, string, string, error) { func ExtractVolumeFromPath(artifactPath string) (string, string, string, error) {
if !IsVolumesPath(artifactPath) { if !IsVolumesPath(artifactPath) {
return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath) return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath)
} }
@ -55,7 +56,7 @@ func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string,
artifactPath := b.Config.Workspace.ArtifactPath artifactPath := b.Config.Workspace.ArtifactPath
w := b.WorkspaceClient() w := b.WorkspaceClient()
catalogName, schemaName, volumeName, err := extractVolumeFromPath(artifactPath) catalogName, schemaName, volumeName, err := ExtractVolumeFromPath(artifactPath)
if err != nil { if err != nil {
return nil, "", diag.Diagnostics{ return nil, "", diag.Diagnostics{
{ {
@ -68,8 +69,8 @@ func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string,
} }
// Check if the UC volume exists in the workspace. // Check if the UC volume exists in the workspace.
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName) volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
err = w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath) _, err = w.Volumes.ReadByName(ctx, volumeName)
// If the volume exists already, directly return the filer for the path to // If the volume exists already, directly return the filer for the path to
// upload the artifacts to. // upload the artifacts to.
@ -81,19 +82,24 @@ func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string,
baseErr := diag.Diagnostic{ baseErr := diag.Diagnostic{
Severity: diag.Error, Severity: diag.Error,
Summary: fmt.Sprintf("unable to determine if volume at %s exists: %s", volumePath, err), Summary: fmt.Sprintf("unable to determine if volume at %s exists: %s", volumeFullName, err),
Locations: b.Config.GetLocations("workspace.artifact_path"), Locations: b.Config.GetLocations("workspace.artifact_path"),
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
} }
if errors.Is(err, apierr.ErrPermissionDenied) {
// If the API returned a 403, the user does not have permission to access the volume.
// Modify the error message to provide more context.
baseErr.Summary = fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err)
}
if errors.Is(err, apierr.ErrNotFound) { if errors.Is(err, apierr.ErrNotFound) {
// Since the API returned a 404, the volume does not exist. // Since the API returned a 404, the volume does not exist.
// Modify the error message to provide more context. // Modify the error message to provide more context.
baseErr.Summary = fmt.Sprintf("volume %s does not exist: %s", volumePath, err) baseErr.Summary = fmt.Sprintf("volume %s does not exist", volumeFullName)
// If the volume is defined in the bundle, provide a more helpful error diagnostic, // If the volume is defined in the bundle, provide a more helpful error diagnostic,
// with more details and location information. // with more details and location information.
path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName) path, locations, ok := FindVolumeInBundle(b.Config, catalogName, schemaName, volumeName)
if !ok { if !ok {
return nil, "", diag.Diagnostics{baseErr} return nil, "", diag.Diagnostics{baseErr}
} }
@ -108,8 +114,8 @@ the artifact_path.`
return nil, "", diag.Diagnostics{baseErr} return nil, "", diag.Diagnostics{baseErr}
} }
func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { func FindVolumeInBundle(r config.Root, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
volumes := b.Config.Resources.Volumes volumes := r.Resources.Volumes
for k, v := range volumes { for k, v := range volumes {
if v.CatalogName != catalogName || v.Name != volumeName { if v.CatalogName != catalogName || v.Name != volumeName {
continue continue
@ -126,7 +132,7 @@ func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName st
continue continue
} }
pathString := fmt.Sprintf("resources.volumes.%s", k) pathString := fmt.Sprintf("resources.volumes.%s", k)
return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true return dyn.MustPathFromString(pathString), r.GetLocations(pathString), true
} }
return nil, nil, false return nil, nil, false
} }

View File

@ -48,7 +48,7 @@ func TestFindVolumeInBundle(t *testing.T) {
}) })
// volume is in DAB. // volume is in DAB.
path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume") path, locations, ok := FindVolumeInBundle(b.Config, "main", "my_schema", "my_volume")
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, []dyn.Location{{ assert.Equal(t, []dyn.Location{{
File: "volume.yml", File: "volume.yml",
@ -58,26 +58,26 @@ func TestFindVolumeInBundle(t *testing.T) {
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
// wrong volume name // wrong volume name
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist") _, _, ok = FindVolumeInBundle(b.Config, "main", "my_schema", "doesnotexist")
assert.False(t, ok) assert.False(t, ok)
// wrong schema name // wrong schema name
_, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume") _, _, ok = FindVolumeInBundle(b.Config, "main", "doesnotexist", "my_volume")
assert.False(t, ok) assert.False(t, ok)
// wrong catalog name // wrong catalog name
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume") _, _, ok = FindVolumeInBundle(b.Config, "doesnotexist", "my_schema", "my_volume")
assert.False(t, ok) assert.False(t, ok)
// schema name is interpolated but does not have the right prefix. In this case // schema name is interpolated but does not have the right prefix. In this case
// we should not match the volume. // we should not match the volume.
b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}" b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}"
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "my_volume") _, _, ok = FindVolumeInBundle(b.Config, "main", "my_schema", "my_volume")
assert.False(t, ok) assert.False(t, ok)
// schema name is interpolated. // schema name is interpolated.
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}" b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}"
path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") path, locations, ok = FindVolumeInBundle(b.Config, "main", "valuedoesnotmatter", "my_volume")
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, []dyn.Location{{ assert.Equal(t, []dyn.Location{{
File: "volume.yml", File: "volume.yml",
@ -264,14 +264,14 @@ func TestFilerForVolumeWithValidVolumePaths(t *testing.T) {
} }
func TestExtractVolumeFromPath(t *testing.T) { func TestExtractVolumeFromPath(t *testing.T) {
catalogName, schemaName, volumeName, err := extractVolumeFromPath("/Volumes/main/my_schema/my_volume") catalogName, schemaName, volumeName, err := ExtractVolumeFromPath("/Volumes/main/my_schema/my_volume")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "main", catalogName) assert.Equal(t, "main", catalogName)
assert.Equal(t, "my_schema", schemaName) assert.Equal(t, "my_schema", schemaName)
assert.Equal(t, "my_volume", volumeName) assert.Equal(t, "my_volume", volumeName)
for _, p := range invalidVolumePaths() { for _, p := range invalidVolumePaths() {
_, _, _, err := extractVolumeFromPath(p) _, _, _, err := ExtractVolumeFromPath(p)
assert.EqualError(t, err, fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p)) assert.EqualError(t, err, fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p))
} }
} }