mirror of https://github.com/databricks/cli.git
Add validation mutator for volume `artifact_path` (#2050)
## Changes This PR: 1. Incrementally improves the error messages shown to the user when the volume they are referring to in `workspace.artifact_path` does not exist. 2. Performs this validation in both `bundle validate` and `bundle deploy` compared to before on just deployments. 3. It runs "fast" validations on `bundle deploy`, which earlier were only run on `bundle validate`. ## Tests Unit tests and manually. Also, existing integration tests provide coverage (`TestUploadArtifactToVolumeNotYetDeployed`, `TestUploadArtifactFileToVolumeThatDoesNotExist`) Examples: ``` .venv➜ bundle-playground git:(master) ✗ cli bundle validate Error: cannot access volume capital.whatever.my_volume: User does not have READ VOLUME on Volume 'capital.whatever.my_volume'. at workspace.artifact_path in databricks.yml:7:18 ``` and ``` .venv➜ bundle-playground git:(master) ✗ cli bundle validate Error: volume capital.whatever.foobar does not exist at workspace.artifact_path resources.volumes.foo in databricks.yml:7:18 databricks.yml:12:7 You are using a volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please first deploy the volume using 'bundle deploy' and then switch over to using it in the artifact_path. ```
This commit is contained in:
parent
509f5aba6a
commit
7beb0fb8b5
|
@ -0,0 +1,51 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
)
|
||||
|
||||
// FastValidate runs a subset of fast validation checks. This is a subset of the full
|
||||
// suite of validation mutators that satisfy ANY ONE of the following criteria:
|
||||
//
|
||||
// 1. No file i/o or network requests are made in the mutator.
|
||||
// 2. The validation is blocking for bundle deployments.
|
||||
//
|
||||
// The full suite of validation mutators is available in the [Validate] mutator.
|
||||
type fastValidateReadonly struct{}
|
||||
|
||||
func FastValidateReadonly() bundle.ReadOnlyMutator {
|
||||
return &fastValidateReadonly{}
|
||||
}
|
||||
|
||||
func (f *fastValidateReadonly) Name() string {
|
||||
return "fast_validate(readonly)"
|
||||
}
|
||||
|
||||
func (f *fastValidateReadonly) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
return bundle.ApplyReadOnly(ctx, rb, bundle.Parallel(
|
||||
// Fast mutators with only in-memory checks
|
||||
JobClusterKeyDefined(),
|
||||
JobTaskClusterSpec(),
|
||||
SingleNodeCluster(),
|
||||
|
||||
// Blocking mutators. Deployments will fail if these checks fail.
|
||||
ValidateArtifactPath(),
|
||||
))
|
||||
}
|
||||
|
||||
type fastValidate struct{}
|
||||
|
||||
func FastValidate() bundle.Mutator {
|
||||
return &fastValidate{}
|
||||
}
|
||||
|
||||
func (f *fastValidate) Name() string {
|
||||
return "fast_validate"
|
||||
}
|
||||
|
||||
func (f *fastValidate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), FastValidateReadonly())
|
||||
}
|
|
@ -30,12 +30,13 @@ func (l location) Path() dyn.Path {
|
|||
// Apply implements bundle.Mutator.
|
||||
func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), bundle.Parallel(
|
||||
JobClusterKeyDefined(),
|
||||
FastValidateReadonly(),
|
||||
|
||||
// Slow mutators that require network or file i/o. These are only
|
||||
// run in the `bundle validate` command.
|
||||
FilesToSync(),
|
||||
ValidateSyncPatterns(),
|
||||
JobTaskClusterSpec(),
|
||||
ValidateFolderPermissions(),
|
||||
SingleNodeCluster(),
|
||||
ValidateSyncPatterns(),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,129 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/libraries"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/dyn/dynvar"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
)
|
||||
|
||||
type validateArtifactPath struct{}
|
||||
|
||||
func ValidateArtifactPath() bundle.ReadOnlyMutator {
|
||||
return &validateArtifactPath{}
|
||||
}
|
||||
|
||||
func (v *validateArtifactPath) Name() string {
|
||||
return "validate:artifact_paths"
|
||||
}
|
||||
|
||||
func extractVolumeFromPath(artifactPath string) (string, string, string, error) {
|
||||
if !libraries.IsVolumesPath(artifactPath) {
|
||||
return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath)
|
||||
}
|
||||
|
||||
parts := strings.Split(artifactPath, "/")
|
||||
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", artifactPath)
|
||||
|
||||
// Incorrect format.
|
||||
if len(parts) < 5 {
|
||||
return "", "", "", volumeFormatErr
|
||||
}
|
||||
|
||||
catalogName := parts[2]
|
||||
schemaName := parts[3]
|
||||
volumeName := parts[4]
|
||||
|
||||
// Incorrect format.
|
||||
if catalogName == "" || schemaName == "" || volumeName == "" {
|
||||
return "", "", "", volumeFormatErr
|
||||
}
|
||||
|
||||
return catalogName, schemaName, volumeName, nil
|
||||
}
|
||||
|
||||
func findVolumeInBundle(r config.Root, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
|
||||
volumes := r.Resources.Volumes
|
||||
for k, v := range volumes {
|
||||
if v.CatalogName != catalogName || v.Name != volumeName {
|
||||
continue
|
||||
}
|
||||
// UC schemas can be defined in the bundle itself, and thus might be interpolated
|
||||
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
|
||||
// definition if the schema name is the same as the one in the bundle, or if the
|
||||
// schema name is interpolated.
|
||||
// We only have to check for ${resources.schemas...} references because any
|
||||
// other valid reference (like ${var.foo}) would have been interpolated by this point.
|
||||
p, ok := dynvar.PureReferenceToPath(v.SchemaName)
|
||||
isSchemaDefinedInBundle := ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")})
|
||||
if v.SchemaName != schemaName && !isSchemaDefinedInBundle {
|
||||
continue
|
||||
}
|
||||
pathString := fmt.Sprintf("resources.volumes.%s", k)
|
||||
return dyn.MustPathFromString(pathString), r.GetLocations(pathString), true
|
||||
}
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
// We only validate UC Volumes paths right now.
|
||||
if !libraries.IsVolumesPath(rb.Config().Workspace.ArtifactPath) {
|
||||
return nil
|
||||
}
|
||||
|
||||
wrapErrorMsg := func(s string) diag.Diagnostics {
|
||||
return diag.Diagnostics{
|
||||
{
|
||||
Summary: s,
|
||||
Severity: diag.Error,
|
||||
Locations: rb.Config().GetLocations("workspace.artifact_path"),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
catalogName, schemaName, volumeName, err := extractVolumeFromPath(rb.Config().Workspace.ArtifactPath)
|
||||
if err != nil {
|
||||
return wrapErrorMsg(err.Error())
|
||||
}
|
||||
volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
|
||||
w := rb.WorkspaceClient()
|
||||
_, err = w.Volumes.ReadByName(ctx, volumeFullName)
|
||||
|
||||
if errors.Is(err, apierr.ErrPermissionDenied) {
|
||||
return wrapErrorMsg(fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err))
|
||||
}
|
||||
if errors.Is(err, apierr.ErrNotFound) {
|
||||
path, locations, ok := findVolumeInBundle(rb.Config(), catalogName, schemaName, volumeName)
|
||||
if !ok {
|
||||
return wrapErrorMsg(fmt.Sprintf("volume %s does not exist", volumeFullName))
|
||||
}
|
||||
|
||||
// If the volume is defined in the bundle, provide a more helpful error diagnostic,
|
||||
// with more details and location information.
|
||||
return diag.Diagnostics{{
|
||||
Summary: fmt.Sprintf("volume %s does not exist", volumeFullName),
|
||||
Severity: diag.Error,
|
||||
Detail: `You are using a volume in your artifact_path that is managed by
|
||||
this bundle but which has not been deployed yet. Please first deploy
|
||||
the volume using 'bundle deploy' and then switch over to using it in
|
||||
the artifact_path.`,
|
||||
Locations: slices.Concat(rb.Config().GetLocations("workspace.artifact_path"), locations),
|
||||
Paths: append([]dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, path),
|
||||
}}
|
||||
|
||||
}
|
||||
if err != nil {
|
||||
return wrapErrorMsg(fmt.Sprintf("cannot read volume %s: %s", volumeFullName, err))
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,244 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/bundle/internal/bundletest"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestValidateArtifactPathWithVolumeInBundle(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: "/Volumes/catalogN/schemaN/volumeN/abc",
|
||||
},
|
||||
Resources: config.Resources{
|
||||
Volumes: map[string]*resources.Volume{
|
||||
"foo": {
|
||||
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
||||
CatalogName: "catalogN",
|
||||
Name: "volumeN",
|
||||
SchemaName: "schemaN",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "file", Line: 1, Column: 1}})
|
||||
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{{File: "file", Line: 2, Column: 2}})
|
||||
|
||||
ctx := context.Background()
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
api := m.GetMockVolumesAPI()
|
||||
api.EXPECT().ReadByName(mock.Anything, "catalogN.schemaN.volumeN").Return(nil, &apierr.APIError{
|
||||
StatusCode: 404,
|
||||
})
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), ValidateArtifactPath())
|
||||
assert.Equal(t, diag.Diagnostics{{
|
||||
Severity: diag.Error,
|
||||
Summary: "volume catalogN.schemaN.volumeN does not exist",
|
||||
Locations: []dyn.Location{
|
||||
{File: "file", Line: 1, Column: 1},
|
||||
{File: "file", Line: 2, Column: 2},
|
||||
},
|
||||
Paths: []dyn.Path{
|
||||
dyn.MustPathFromString("workspace.artifact_path"),
|
||||
dyn.MustPathFromString("resources.volumes.foo"),
|
||||
},
|
||||
Detail: `You are using a volume in your artifact_path that is managed by
|
||||
this bundle but which has not been deployed yet. Please first deploy
|
||||
the volume using 'bundle deploy' and then switch over to using it in
|
||||
the artifact_path.`,
|
||||
}}, diags)
|
||||
}
|
||||
|
||||
func TestValidateArtifactPath(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: "/Volumes/catalogN/schemaN/volumeN/abc",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "file", Line: 1, Column: 1}})
|
||||
assertDiags := func(t *testing.T, diags diag.Diagnostics, expected string) {
|
||||
assert.Len(t, diags, 1)
|
||||
assert.Equal(t, diag.Diagnostics{{
|
||||
Severity: diag.Error,
|
||||
Summary: expected,
|
||||
Locations: []dyn.Location{{File: "file", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
}}, diags)
|
||||
}
|
||||
|
||||
rb := bundle.ReadOnly(b)
|
||||
ctx := context.Background()
|
||||
|
||||
tcases := []struct {
|
||||
err error
|
||||
expectedSummary string
|
||||
}{
|
||||
{
|
||||
err: &apierr.APIError{
|
||||
StatusCode: 403,
|
||||
Message: "User does not have USE SCHEMA on Schema 'catalogN.schemaN'",
|
||||
},
|
||||
expectedSummary: "cannot access volume catalogN.schemaN.volumeN: User does not have USE SCHEMA on Schema 'catalogN.schemaN'",
|
||||
},
|
||||
{
|
||||
err: &apierr.APIError{
|
||||
StatusCode: 404,
|
||||
},
|
||||
expectedSummary: "volume catalogN.schemaN.volumeN does not exist",
|
||||
},
|
||||
{
|
||||
err: &apierr.APIError{
|
||||
StatusCode: 500,
|
||||
Message: "Internal Server Error",
|
||||
},
|
||||
expectedSummary: "cannot read volume catalogN.schemaN.volumeN: Internal Server Error",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tcases {
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
api := m.GetMockVolumesAPI()
|
||||
api.EXPECT().ReadByName(mock.Anything, "catalogN.schemaN.volumeN").Return(nil, tc.err)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, rb, ValidateArtifactPath())
|
||||
assertDiags(t, diags, tc.expectedSummary)
|
||||
}
|
||||
}
|
||||
|
||||
func invalidVolumePaths() []string {
|
||||
return []string{
|
||||
"/Volumes/",
|
||||
"/Volumes/main",
|
||||
"/Volumes/main/",
|
||||
"/Volumes/main//",
|
||||
"/Volumes/main//my_schema",
|
||||
"/Volumes/main/my_schema",
|
||||
"/Volumes/main/my_schema/",
|
||||
"/Volumes/main/my_schema//",
|
||||
"/Volumes//my_schema/my_volume",
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractVolumeFromPath(t *testing.T) {
|
||||
catalogName, schemaName, volumeName, err := extractVolumeFromPath("/Volumes/main/my_schema/my_volume")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "main", catalogName)
|
||||
assert.Equal(t, "my_schema", schemaName)
|
||||
assert.Equal(t, "my_volume", volumeName)
|
||||
|
||||
for _, p := range invalidVolumePaths() {
|
||||
_, _, _, err := extractVolumeFromPath(p)
|
||||
assert.EqualError(t, err, fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p))
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateArtifactPathWithInvalidPaths(t *testing.T) {
|
||||
for _, p := range invalidVolumePaths() {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: p,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}})
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), ValidateArtifactPath())
|
||||
require.Equal(t, diag.Diagnostics{{
|
||||
Severity: diag.Error,
|
||||
Summary: fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p),
|
||||
Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
}}, diags)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFindVolumeInBundle(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Volumes: map[string]*resources.Volume{
|
||||
"foo": {
|
||||
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
||||
CatalogName: "main",
|
||||
Name: "my_volume",
|
||||
SchemaName: "my_schema",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
|
||||
{
|
||||
File: "volume.yml",
|
||||
Line: 1,
|
||||
Column: 2,
|
||||
},
|
||||
})
|
||||
|
||||
// volume is in DAB.
|
||||
path, locations, ok := findVolumeInBundle(b.Config, "main", "my_schema", "my_volume")
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []dyn.Location{{
|
||||
File: "volume.yml",
|
||||
Line: 1,
|
||||
Column: 2,
|
||||
}}, locations)
|
||||
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
||||
|
||||
// wrong volume name
|
||||
_, _, ok = findVolumeInBundle(b.Config, "main", "my_schema", "doesnotexist")
|
||||
assert.False(t, ok)
|
||||
|
||||
// wrong schema name
|
||||
_, _, ok = findVolumeInBundle(b.Config, "main", "doesnotexist", "my_volume")
|
||||
assert.False(t, ok)
|
||||
|
||||
// wrong catalog name
|
||||
_, _, ok = findVolumeInBundle(b.Config, "doesnotexist", "my_schema", "my_volume")
|
||||
assert.False(t, ok)
|
||||
|
||||
// schema name is interpolated but does not have the right prefix. In this case
|
||||
// we should not match the volume.
|
||||
b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}"
|
||||
_, _, ok = findVolumeInBundle(b.Config, "main", "my_schema", "my_volume")
|
||||
assert.False(t, ok)
|
||||
|
||||
// schema name is interpolated.
|
||||
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}"
|
||||
path, locations, ok = findVolumeInBundle(b.Config, "main", "valuedoesnotmatter", "my_volume")
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []dyn.Location{{
|
||||
File: "volume.yml",
|
||||
Line: 1,
|
||||
Column: 2,
|
||||
}}, locations)
|
||||
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
||||
}
|
|
@ -24,7 +24,7 @@ func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, s
|
|||
|
||||
switch {
|
||||
case IsVolumesPath(artifactPath):
|
||||
return filerForVolume(ctx, b)
|
||||
return filerForVolume(b)
|
||||
|
||||
default:
|
||||
return filerForWorkspace(b)
|
||||
|
|
|
@ -7,10 +7,7 @@ import (
|
|||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -39,11 +36,6 @@ func TestGetFilerForLibrariesValidUcVolume(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
|
||||
require.NoError(t, diags.Error())
|
||||
assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath)
|
||||
|
|
|
@ -1,132 +1,16 @@
|
|||
package libraries
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/dyn/dynvar"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
)
|
||||
|
||||
func extractVolumeFromPath(artifactPath string) (string, string, string, error) {
|
||||
if !IsVolumesPath(artifactPath) {
|
||||
return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath)
|
||||
}
|
||||
|
||||
parts := strings.Split(artifactPath, "/")
|
||||
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", artifactPath)
|
||||
|
||||
// Incorrect format.
|
||||
if len(parts) < 5 {
|
||||
return "", "", "", volumeFormatErr
|
||||
}
|
||||
|
||||
catalogName := parts[2]
|
||||
schemaName := parts[3]
|
||||
volumeName := parts[4]
|
||||
|
||||
// Incorrect format.
|
||||
if catalogName == "" || schemaName == "" || volumeName == "" {
|
||||
return "", "", "", volumeFormatErr
|
||||
}
|
||||
|
||||
return catalogName, schemaName, volumeName, nil
|
||||
}
|
||||
|
||||
// This function returns a filer for ".internal" folder inside the directory configured
|
||||
// at `workspace.artifact_path`.
|
||||
// This function also checks if the UC volume exists in the workspace and then:
|
||||
// 1. If the UC volume exists in the workspace:
|
||||
// Returns a filer for the UC volume.
|
||||
// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in
|
||||
// the bundle configuration:
|
||||
// Returns an error and a warning that instructs the user to deploy the
|
||||
// UC volume before using it in the artifact path.
|
||||
// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration:
|
||||
// Returns an error.
|
||||
func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
|
||||
artifactPath := b.Config.Workspace.ArtifactPath
|
||||
func filerForVolume(b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
|
||||
w := b.WorkspaceClient()
|
||||
|
||||
catalogName, schemaName, volumeName, err := extractVolumeFromPath(artifactPath)
|
||||
if err != nil {
|
||||
return nil, "", diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Error,
|
||||
Summary: err.Error(),
|
||||
Locations: b.Config.GetLocations("workspace.artifact_path"),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the UC volume exists in the workspace.
|
||||
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName)
|
||||
err = w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath)
|
||||
|
||||
// If the volume exists already, directly return the filer for the path to
|
||||
// upload the artifacts to.
|
||||
if err == nil {
|
||||
uploadPath := path.Join(artifactPath, InternalDirName)
|
||||
f, err := filer.NewFilesClient(w, uploadPath)
|
||||
return f, uploadPath, diag.FromErr(err)
|
||||
}
|
||||
|
||||
baseErr := diag.Diagnostic{
|
||||
Severity: diag.Error,
|
||||
Summary: fmt.Sprintf("unable to determine if volume at %s exists: %s", volumePath, err),
|
||||
Locations: b.Config.GetLocations("workspace.artifact_path"),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
}
|
||||
|
||||
if errors.Is(err, apierr.ErrNotFound) {
|
||||
// Since the API returned a 404, the volume does not exist.
|
||||
// Modify the error message to provide more context.
|
||||
baseErr.Summary = fmt.Sprintf("volume %s does not exist: %s", volumePath, err)
|
||||
|
||||
// If the volume is defined in the bundle, provide a more helpful error diagnostic,
|
||||
// with more details and location information.
|
||||
path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName)
|
||||
if !ok {
|
||||
return nil, "", diag.Diagnostics{baseErr}
|
||||
}
|
||||
baseErr.Detail = `You are using a volume in your artifact_path that is managed by
|
||||
this bundle but which has not been deployed yet. Please first deploy
|
||||
the volume using 'bundle deploy' and then switch over to using it in
|
||||
the artifact_path.`
|
||||
baseErr.Paths = append(baseErr.Paths, path)
|
||||
baseErr.Locations = append(baseErr.Locations, locations...)
|
||||
}
|
||||
|
||||
return nil, "", diag.Diagnostics{baseErr}
|
||||
}
|
||||
|
||||
func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
|
||||
volumes := b.Config.Resources.Volumes
|
||||
for k, v := range volumes {
|
||||
if v.CatalogName != catalogName || v.Name != volumeName {
|
||||
continue
|
||||
}
|
||||
// UC schemas can be defined in the bundle itself, and thus might be interpolated
|
||||
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
|
||||
// definition if the schema name is the same as the one in the bundle, or if the
|
||||
// schema name is interpolated.
|
||||
// We only have to check for ${resources.schemas...} references because any
|
||||
// other valid reference (like ${var.foo}) would have been interpolated by this point.
|
||||
p, ok := dynvar.PureReferenceToPath(v.SchemaName)
|
||||
isSchemaDefinedInBundle := ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")})
|
||||
if v.SchemaName != schemaName && !isSchemaDefinedInBundle {
|
||||
continue
|
||||
}
|
||||
pathString := fmt.Sprintf("resources.volumes.%s", k)
|
||||
return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true
|
||||
}
|
||||
return nil, nil, false
|
||||
uploadPath := path.Join(b.Config.Workspace.ArtifactPath, InternalDirName)
|
||||
f, err := filer.NewFilesClient(w, uploadPath)
|
||||
return f, uploadPath, diag.FromErr(err)
|
||||
}
|
||||
|
|
|
@ -1,277 +1,27 @@
|
|||
package libraries
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/bundle/internal/bundletest"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFindVolumeInBundle(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Volumes: map[string]*resources.Volume{
|
||||
"foo": {
|
||||
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
||||
CatalogName: "main",
|
||||
Name: "my_volume",
|
||||
SchemaName: "my_schema",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
|
||||
{
|
||||
File: "volume.yml",
|
||||
Line: 1,
|
||||
Column: 2,
|
||||
},
|
||||
})
|
||||
|
||||
// volume is in DAB.
|
||||
path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume")
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []dyn.Location{{
|
||||
File: "volume.yml",
|
||||
Line: 1,
|
||||
Column: 2,
|
||||
}}, locations)
|
||||
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
||||
|
||||
// wrong volume name
|
||||
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist")
|
||||
assert.False(t, ok)
|
||||
|
||||
// wrong schema name
|
||||
_, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume")
|
||||
assert.False(t, ok)
|
||||
|
||||
// wrong catalog name
|
||||
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume")
|
||||
assert.False(t, ok)
|
||||
|
||||
// schema name is interpolated but does not have the right prefix. In this case
|
||||
// we should not match the volume.
|
||||
b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}"
|
||||
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "my_volume")
|
||||
assert.False(t, ok)
|
||||
|
||||
// schema name is interpolated.
|
||||
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}"
|
||||
path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume")
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, []dyn.Location{{
|
||||
File: "volume.yml",
|
||||
Line: 1,
|
||||
Column: 2,
|
||||
}}, locations)
|
||||
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
||||
}
|
||||
|
||||
func TestFilerForVolumeForErrorFromAPI(t *testing.T) {
|
||||
func TestFilerForVolume(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: "/Volumes/main/my_schema/my_volume",
|
||||
ArtifactPath: "/Volumes/main/my_schema/my_volume/abc",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}})
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API"))
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
_, _, diags := filerForVolume(context.Background(), b)
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Error,
|
||||
Summary: "unable to determine if volume at /Volumes/main/my_schema/my_volume exists: error from API",
|
||||
Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
},
|
||||
}, diags)
|
||||
}
|
||||
|
||||
func TestFilerForVolumeWithVolumeNotFound(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: "/Volumes/main/my_schema/doesnotexist",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}})
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(apierr.NotFound("some error message"))
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
_, _, diags := filerForVolume(context.Background(), b)
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Error,
|
||||
Summary: "volume /Volumes/main/my_schema/doesnotexist does not exist: some error message",
|
||||
Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
},
|
||||
}, diags)
|
||||
}
|
||||
|
||||
func TestFilerForVolumeNotFoundAndInBundle(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: "/Volumes/main/my_schema/my_volume",
|
||||
},
|
||||
Resources: config.Resources{
|
||||
Volumes: map[string]*resources.Volume{
|
||||
"foo": {
|
||||
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
||||
CatalogName: "main",
|
||||
Name: "my_volume",
|
||||
VolumeType: "MANAGED",
|
||||
SchemaName: "my_schema",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}})
|
||||
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{{File: "volume.yml", Line: 1, Column: 2}})
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(apierr.NotFound("error from API"))
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Error,
|
||||
Summary: "volume /Volumes/main/my_schema/my_volume does not exist: error from API",
|
||||
Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}, {File: "volume.yml", Line: 1, Column: 2}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path"), dyn.MustPathFromString("resources.volumes.foo")},
|
||||
Detail: `You are using a volume in your artifact_path that is managed by
|
||||
this bundle but which has not been deployed yet. Please first deploy
|
||||
the volume using 'bundle deploy' and then switch over to using it in
|
||||
the artifact_path.`,
|
||||
},
|
||||
}, diags)
|
||||
}
|
||||
|
||||
func invalidVolumePaths() []string {
|
||||
return []string{
|
||||
"/Volumes/",
|
||||
"/Volumes/main",
|
||||
"/Volumes/main/",
|
||||
"/Volumes/main//",
|
||||
"/Volumes/main//my_schema",
|
||||
"/Volumes/main/my_schema",
|
||||
"/Volumes/main/my_schema/",
|
||||
"/Volumes/main/my_schema//",
|
||||
"/Volumes//my_schema/my_volume",
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilerForVolumeWithInvalidVolumePaths(t *testing.T) {
|
||||
for _, p := range invalidVolumePaths() {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: p,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}})
|
||||
|
||||
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
||||
require.Equal(t, diag.Diagnostics{{
|
||||
Severity: diag.Error,
|
||||
Summary: fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p),
|
||||
Locations: []dyn.Location{{File: "config.yml", Line: 1, Column: 2}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
}}, diags)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilerForVolumeWithInvalidPrefix(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: "/Volume/main/my_schema/my_volume",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, _, diags := filerForVolume(context.Background(), b)
|
||||
require.EqualError(t, diags.Error(), "expected artifact_path to start with /Volumes/, got /Volume/main/my_schema/my_volume")
|
||||
}
|
||||
|
||||
func TestFilerForVolumeWithValidVolumePaths(t *testing.T) {
|
||||
validPaths := []string{
|
||||
"/Volumes/main/my_schema/my_volume",
|
||||
"/Volumes/main/my_schema/my_volume/",
|
||||
"/Volumes/main/my_schema/my_volume/a/b/c",
|
||||
"/Volumes/main/my_schema/my_volume/a/a/a",
|
||||
}
|
||||
|
||||
for _, p := range validPaths {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
ArtifactPath: p,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
client, uploadPath, diags := filerForVolume(context.Background(), b)
|
||||
require.NoError(t, diags.Error())
|
||||
assert.Equal(t, path.Join(p, ".internal"), uploadPath)
|
||||
assert.IsType(t, &filer.FilesClient{}, client)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractVolumeFromPath(t *testing.T) {
|
||||
catalogName, schemaName, volumeName, err := extractVolumeFromPath("/Volumes/main/my_schema/my_volume")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "main", catalogName)
|
||||
assert.Equal(t, "my_schema", schemaName)
|
||||
assert.Equal(t, "my_volume", volumeName)
|
||||
|
||||
for _, p := range invalidVolumePaths() {
|
||||
_, _, _, err := extractVolumeFromPath(p)
|
||||
assert.EqualError(t, err, fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p))
|
||||
}
|
||||
client, uploadPath, diags := filerForVolume(b)
|
||||
require.NoError(t, diags.Error())
|
||||
assert.Equal(t, path.Join("/Volumes/main/my_schema/my_volume/abc/.internal"), uploadPath)
|
||||
assert.IsType(t, &filer.FilesClient{}, client)
|
||||
}
|
||||
|
|
|
@ -11,8 +11,6 @@ import (
|
|||
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
|
||||
"github.com/databricks/cli/internal/testutil"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/stretchr/testify/mock"
|
||||
|
@ -183,11 +181,6 @@ func TestArtifactUploadForVolumes(t *testing.T) {
|
|||
filer.CreateParentDirectories,
|
||||
).Return(nil)
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/foo/bar/artifacts").Return(nil)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler)))
|
||||
require.NoError(t, diags.Error())
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config/validate"
|
||||
"github.com/databricks/cli/bundle/phases"
|
||||
"github.com/databricks/cli/bundle/render"
|
||||
"github.com/databricks/cli/cmd/bundle/utils"
|
||||
|
@ -71,6 +72,7 @@ func newDeployCommand() *cobra.Command {
|
|||
diags = diags.Extend(
|
||||
bundle.Apply(ctx, b, bundle.Seq(
|
||||
phases.Initialize(),
|
||||
validate.FastValidate(),
|
||||
phases.Build(),
|
||||
phases.Deploy(outputHandler),
|
||||
)),
|
||||
|
|
|
@ -257,7 +257,7 @@ func TestUploadArtifactFileToVolumeThatDoesNotExist(t *testing.T) {
|
|||
stdout, stderr, err := testcli.RequireErrorRun(t, ctx, "bundle", "deploy")
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, fmt.Sprintf(`Error: volume /Volumes/main/%s/doesnotexist does not exist: Not Found
|
||||
assert.Equal(t, fmt.Sprintf(`Error: volume main.%s.doesnotexist does not exist
|
||||
at workspace.artifact_path
|
||||
in databricks.yml:6:18
|
||||
|
||||
|
@ -293,7 +293,7 @@ func TestUploadArtifactToVolumeNotYetDeployed(t *testing.T) {
|
|||
stdout, stderr, err := testcli.RequireErrorRun(t, ctx, "bundle", "deploy")
|
||||
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, fmt.Sprintf(`Error: volume /Volumes/main/%s/my_volume does not exist: Not Found
|
||||
assert.Equal(t, fmt.Sprintf(`Error: volume main.%s.my_volume does not exist
|
||||
at workspace.artifact_path
|
||||
resources.volumes.foo
|
||||
in databricks.yml:6:18
|
||||
|
|
Loading…
Reference in New Issue