This commit is contained in:
Shreyas Goenka 2024-09-16 03:50:40 +02:00
parent bdecd08206
commit 274fd636e0
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
4 changed files with 47 additions and 40 deletions

View File

@ -129,15 +129,13 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error
} }
func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
// If the client is not initialized, initialize it
// We use client field in mutator to allow for mocking client in testing
client, uploadPath, diags := GetFilerForLibraries(ctx, b) client, uploadPath, diags := GetFilerForLibraries(ctx, b)
if diags.HasError() { if diags.HasError() {
return diags return diags
} }
// Only set the filer client if it's not already set. This allows for using // Only set the filer client if it's not already set. We use client field
// a mock client in tests. // in mutator to mock the filer client in testing
if u.client == nil { if u.client == nil {
u.client = client u.client = client
} }
@ -191,7 +189,7 @@ func (u *upload) Name() string {
return "libraries.Upload" return "libraries.Upload"
} }
// This function returns the right filer to use, to upload artifacts to the configured locations. // This function returns the right filer to use, to upload artifacts to the configured location.
// Supported locations: // Supported locations:
// 1. WSFS // 1. WSFS
// 2. UC volumes // 2. UC volumes
@ -200,7 +198,7 @@ func (u *upload) Name() string {
// Then: // Then:
// 1. If the UC volume existing in the workspace: // 1. If the UC volume existing in the workspace:
// Returns a filer for the UC volume. // Returns a filer for the UC volume.
// 2. If the UC volume does not exist in the workspace but is very likely to be defined in // 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in
// the bundle configuration: // the bundle configuration:
// Returns a warning along with the error that instructs the user to deploy the // Returns a warning along with the error that instructs the user to deploy the
// UC volume before using it in the artifact path. // UC volume before using it in the artifact path.
@ -212,11 +210,11 @@ func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, s
return nil, "", diag.Errorf("remote artifact path not configured") return nil, "", diag.Errorf("remote artifact path not configured")
} }
// path to upload artifact files to.
uploadPath := path.Join(artifactPath, ".internal")
w := b.WorkspaceClient() w := b.WorkspaceClient()
isVolumesPath := strings.HasPrefix(uploadPath, "/Volumes/") isVolumesPath := strings.HasPrefix(artifactPath, "/Volumes/")
// Path to upload artifact files to.
uploadPath := path.Join(artifactPath, ".internal")
// Return early with a WSFS filer if the artifact path is not a UC volume path. // Return early with a WSFS filer if the artifact path is not a UC volume path.
if !isVolumesPath { if !isVolumesPath {
@ -225,7 +223,7 @@ func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, s
} }
parts := strings.Split(artifactPath, "/") parts := strings.Split(artifactPath, "/")
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<path>, got %s", uploadPath) volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", uploadPath)
// Incorrect format. // Incorrect format.
if len(parts) < 5 { if len(parts) < 5 {
@ -253,21 +251,21 @@ func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, s
diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err) diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err)
path, locations, ok := matchVolumeInBundle(b, catalogName, schemaName, volumeName) path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName)
if !ok { if !ok {
return nil, "", diags return nil, "", diags
} }
warning := diag.Diagnostic{ warning := diag.Diagnostic{
Severity: diag.Warning, Severity: diag.Warning,
Summary: `the UC volume that is likely being used in the artifact_path has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`, Summary: `You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`,
Locations: locations, Locations: locations,
Paths: []dyn.Path{path}, Paths: []dyn.Path{path},
} }
return nil, "", diags.Append(warning) return nil, "", diags.Append(warning)
} }
func matchVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) { func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
volumes := b.Config.Resources.Volumes volumes := b.Config.Resources.Volumes
for k, v := range volumes { for k, v := range volumes {
if v.CatalogName != catalogName || v.Name != volumeName { if v.CatalogName != catalogName || v.Name != volumeName {
@ -277,7 +275,7 @@ func matchVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName s
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume // at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
// definition if the schema name is the same as the one in the bundle, or if the // definition if the schema name is the same as the one in the bundle, or if the
// schema name is interpolated. // schema name is interpolated.
if v.SchemaName != schemaName && !dynvar.ContainsVariableReference(v.SchemaName) { if v.SchemaName != schemaName && !dynvar.IsPureVariableReference(v.SchemaName) {
continue continue
} }
pathString := fmt.Sprintf("resources.volumes.%s", k) pathString := fmt.Sprintf("resources.volumes.%s", k)

View File

@ -344,7 +344,7 @@ func TestUploadMultipleLibraries(t *testing.T) {
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl") require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl")
} }
func TestMatchVolumeInBundle(t *testing.T) { func TestFindVolumeInBundle(t *testing.T) {
b := &bundle.Bundle{ b := &bundle.Bundle{
Config: config.Root{ Config: config.Root{
Resources: config.Resources{ Resources: config.Resources{
@ -364,7 +364,7 @@ func TestMatchVolumeInBundle(t *testing.T) {
bundletest.SetLocation(b, "resources.volumes.foo", "volume.yml") bundletest.SetLocation(b, "resources.volumes.foo", "volume.yml")
// volume is in DAB. // volume is in DAB.
path, locations, ok := matchVolumeInBundle(b, "main", "my_schema", "my_volume") path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume")
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, []dyn.Location{{ assert.Equal(t, []dyn.Location{{
File: "volume.yml", File: "volume.yml",
@ -372,16 +372,20 @@ func TestMatchVolumeInBundle(t *testing.T) {
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path) assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
// wrong volume name // wrong volume name
_, _, ok = matchVolumeInBundle(b, "main", "my_schema", "doesnotexist") _, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist")
assert.False(t, ok) assert.False(t, ok)
// wrong schema name // wrong schema name
_, _, ok = matchVolumeInBundle(b, "main", "doesnotexist", "my_volume") _, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume")
assert.False(t, ok)
// wrong catalog name
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume")
assert.False(t, ok) assert.False(t, ok)
// schema name is interpolated. // schema name is interpolated.
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema}" b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema}"
path, locations, ok = matchVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume") path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume")
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, []dyn.Location{{ assert.Equal(t, []dyn.Location{{
File: "volume.yml", File: "volume.yml",
@ -442,7 +446,8 @@ func TestGetFilerForLibraries(t *testing.T) {
b.SetWorkpaceClient(m.WorkspaceClient) b.SetWorkpaceClient(m.WorkspaceClient)
_, _, diags := GetFilerForLibraries(context.Background(), b) _, _, diags := GetFilerForLibraries(context.Background(), b)
require.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API") assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API")
assert.Len(t, diags, 1)
}) })
t.Run("volume in DAB config", func(t *testing.T) { t.Run("volume in DAB config", func(t *testing.T) {
@ -477,7 +482,7 @@ func TestGetFilerForLibraries(t *testing.T) {
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API") assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API")
assert.Contains(t, diags, diag.Diagnostic{ assert.Contains(t, diags, diag.Diagnostic{
Severity: diag.Warning, Severity: diag.Warning,
Summary: "the UC volume that is likely being used in the artifact_path has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.", Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.",
Locations: []dyn.Location{{ Locations: []dyn.Location{{
File: "volume.yml", File: "volume.yml",
}}, }},
@ -515,7 +520,7 @@ func TestGetFilerForLibraries(t *testing.T) {
} }
_, _, diags := GetFilerForLibraries(context.Background(), b) _, _, diags := GetFilerForLibraries(context.Background(), b)
require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<path>, got %s", path.Join(p, ".internal"))) require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", path.Join(p, ".internal")))
} }
}) })
} }

View File

@ -14,10 +14,12 @@ import (
"github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/internal" "github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc" "github.com/databricks/cli/internal/acc"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -272,7 +274,7 @@ func TestAccUploadArtifactFileToInvalidVolume(t *testing.T) {
} }
diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload()))
require.EqualError(t, diags.Error(), fmt.Sprintf("the bundle is configured to upload artifacts to %s but a UC volume at %s does not exist", path.Join(volumePath, ".internal"), volumePath)) assert.ErrorContains(t, diags.Error(), fmt.Sprintf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path:", volumePath))
}) })
t.Run("volume in DAB config", func(t *testing.T) { t.Run("volume in DAB config", func(t *testing.T) {
@ -318,17 +320,23 @@ func TestAccUploadArtifactFileToInvalidVolume(t *testing.T) {
}) })
diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload()))
require.EqualError( assert.Contains(t, diags, diag.Diagnostic{
t, Severity: diag.Error,
diags.Error(), Summary: fmt.Sprintf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: Not Found", volumePath),
fmt.Sprintf(`the bundle is configured to upload artifacts to %s but a })
UC volume at %s does not exist. Note: We detected that you have a UC volume assert.Contains(t, diags, diag.Diagnostic{
defined that matched the path above at %s. Please deploy the UC volume Severity: diag.Warning,
in a separate deployment before using it in as a destination to upload Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.",
artifacts.`, path.Join(volumePath, ".internal"), volumePath, dyn.Location{ Locations: []dyn.Location{
File: filepath.Join(dir, "databricks.yml"), {
Line: 1, File: filepath.Join(dir, "databricks.yml"),
Column: 2, Line: 1,
})) Column: 2,
},
},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.volumes.foo"),
},
})
}) })
} }

View File

@ -71,7 +71,3 @@ func (v ref) references() []string {
func IsPureVariableReference(s string) bool { func IsPureVariableReference(s string) bool {
return len(s) > 0 && re.FindString(s) == s return len(s) > 0 && re.FindString(s) == s
} }
func ContainsVariableReference(s string) bool {
return len(s) > 0 && re.FindString(s) != ""
}