Compare commits


5 Commits

Author          SHA1        Message                                      Date
Shreyas Goenka  6192835d63  add custom prefixing behaviour for volumes   2024-10-15 18:04:20 +02:00
Shreyas Goenka  d241c2b39c  add integration test for grant on volume     2024-10-15 16:05:23 +02:00
Shreyas Goenka  3e3ddfd0cb  fix test                                     2024-10-15 15:29:24 +02:00
Shreyas Goenka  eb94cd6717  -                                            2024-10-15 15:27:58 +02:00
Shreyas Goenka  c5a02ef8fb  split into filer files                       2024-10-15 15:22:41 +02:00
14 changed files with 618 additions and 305 deletions

View File

@@ -11,6 +11,8 @@ import (
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
+"github.com/databricks/cli/libs/dyn/dynvar"
+"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs"
@@ -194,8 +196,25 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
}
// Apply the prefix to volumes
-for i := range r.Volumes {
-r.Volumes[i].Name = normalizePrefix(prefix) + r.Volumes[i].Name
+for _, v := range r.Volumes {
+if containsUserIdentity(v.CatalogName, b.Config.Workspace.CurrentUser) {
+log.Debugf(ctx, "Skipping prefix for volume %s because catalog %s contains the current user's identity", v.Name, v.CatalogName)
+continue
+}
+if containsUserIdentity(v.SchemaName, b.Config.Workspace.CurrentUser) {
+log.Debugf(ctx, "Skipping prefix for volume %s because schema %s contains the current user's identity", v.Name, v.SchemaName)
+continue
+}
+// We only have to check for ${resources.schemas...} references because any
+// other valid reference (like ${var.foo}) would have been interpolated by this point.
+if p, ok := dynvar.PureReferenceToPath(v.SchemaName); ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")}) {
+log.Debugf(ctx, "Skipping prefix for volume %s because schema name %s is defined in the bundle and will be interpolated at runtime", v.Name, v.SchemaName)
+continue
+}
+v.Name = normalizePrefix(prefix) + v.Name
// HTTP API for volumes doesn't yet support tags. It's only supported in
// the Databricks UI and via the SQL API.
}
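To make the skip conditions in this hunk easier to follow, here is a minimal, standalone sketch of the same decision. The helper names are illustrative and not part of the CLI, containsIdentity stands in for the containsUserIdentity helper added in this PR, and the ${resources.schemas...} check is approximated with plain string matching instead of dynvar.PureReferenceToPath.

package main

import (
	"fmt"
	"strings"
)

// Simplified stand-in for containsUserIdentity: checks both the user's
// short name and full user name.
func containsIdentity(s, shortName, userName string) bool {
	return strings.Contains(s, shortName) || strings.Contains(s, userName)
}

// Rough approximation of the ${resources.schemas...} check; the real code uses
// dynvar.PureReferenceToPath and dyn.Path.HasPrefix rather than string matching.
func isBundleSchemaReference(s string) bool {
	return strings.HasPrefix(s, "${resources.schemas.") && strings.HasSuffix(s, "}")
}

// shouldPrefixVolume mirrors the loop above: a volume keeps presets.name_prefix
// only if neither its catalog nor its schema is already user-scoped or managed
// by the bundle itself.
func shouldPrefixVolume(catalogName, schemaName, shortName, userName string) bool {
	if containsIdentity(catalogName, shortName, userName) {
		return false
	}
	if containsIdentity(schemaName, shortName, userName) {
		return false
	}
	if isBundleSchemaReference(schemaName) {
		return false
	}
	return true
}

func main() {
	short, user := "john_wick", "john.wick@continental.com"
	fmt.Println(shouldPrefixVolume("catalog1", "schema1", short, user))                           // true: prefix applied
	fmt.Println(shouldPrefixVolume("dev_john_wick_targets", "schema1", short, user))              // false: catalog is user-scoped
	fmt.Println(shouldPrefixVolume("catalog1", "${resources.schemas.schema1.name}", short, user)) // false: schema managed by the bundle
}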

View File

@@ -9,7 +9,9 @@ import (
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/catalog"
+"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs"
+"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -125,6 +127,15 @@ func TestApplyPresetsPrefixForUcSchema(t *testing.T) {
}
}
func TestApplyPresentsReminderToAddSupportForSkippingPrefixes(t *testing.T) {
_, ok := config.SupportedResources()["catalogs"]
assert.False(t, ok,
`Since you are adding support for UC catalogs to DABs please ensure that
you add logic to skip applying presets.name_prefix for UC schemas, UC volumes and
any other resources that fall under a catalog in the UC hierarchy (like registered models).
Once you do so feel free to remove this test.`)
}
func TestApplyPresetsPrefixForUcVolumes(t *testing.T) {
tests := []struct {
name string
@@ -138,6 +149,8 @@ func TestApplyPresetsPrefixForUcVolumes(t *testing.T) {
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
+CatalogName: "catalog1",
+SchemaName: "schema1",
},
},
want: "prefix_volume1",
@@ -152,6 +165,72 @@ func TestApplyPresetsPrefixForUcVolumes(t *testing.T) {
},
want: "volume1",
},
{
name: "skip prefix when catalog name contains user short name",
prefix: "[prefix]",
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
CatalogName: "dev_john_wick_targets",
},
},
want: "volume1",
},
{
name: "skip prefix when catalog name contains user email",
prefix: "[prefix]",
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
CatalogName: "dev_john.wick@continental.com_targets",
},
},
want: "volume1",
},
{
name: "skip prefix when schema name contains user short name",
prefix: "[prefix]",
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
SchemaName: "dev_john_wick_weapons",
},
},
want: "volume1",
},
{
name: "skip prefix when schema name contains user email",
prefix: "[prefix]",
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
SchemaName: "dev_john.wick@continental.com_targets",
},
},
want: "volume1",
},
{
name: "skip prefix when schema is defined in the bundle and will be interpolated at runtime",
prefix: "[prefix]",
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
SchemaName: "${resources.schemas.schema1.name}",
},
},
want: "volume1",
},
{
name: "add prefix when schema is a reference without the resources.schemas prefix",
prefix: "[prefix]",
volume: &resources.Volume{
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
Name: "volume1",
SchemaName: "${foo.bar.baz}",
},
},
want: "prefix_volume1",
},
}
for _, tt := range tests {
@@ -166,6 +245,14 @@ func TestApplyPresetsPrefixForUcVolumes(t *testing.T) {
Presets: config.Presets{
NamePrefix: tt.prefix,
},
+Workspace: config.Workspace{
+CurrentUser: &config.User{
+ShortName: "john_wick",
+User: &iam.User{
+UserName: "john.wick@continental.com",
+},
+},
+},
},
}

View File

@@ -63,6 +63,10 @@ func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) {
}
}
func containsUserIdentity(s string, u *config.User) bool {
return strings.Contains(s, u.ShortName) || strings.Contains(s, u.UserName)
}
func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics
p := b.Config.Presets
@@ -92,7 +96,7 @@ func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics {
diags = diags.Extend(diag.Errorf("%s must start with '~/' or contain the current username to ensure uniqueness when using 'mode: development'", path))
}
}
-if p.NamePrefix != "" && !strings.Contains(p.NamePrefix, u.ShortName) && !strings.Contains(p.NamePrefix, u.UserName) {
+if p.NamePrefix != "" && !containsUserIdentity(p.NamePrefix, u) {
// Resources such as pipelines require a unique name, e.g. '[dev steve] my_pipeline'.
// For this reason we require the name prefix to contain the current username;
// it's a pitfall for users if they don't include it and later find out that

bundle/libraries/filer.go (new file, 29 lines)
View File

@@ -0,0 +1,29 @@
package libraries
import (
"context"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
)
// This function returns the right filer to use, to upload artifacts to the configured location.
// Supported locations:
// 1. WSFS
// 2. UC volumes
func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
artifactPath := b.Config.Workspace.ArtifactPath
if artifactPath == "" {
return nil, "", diag.Errorf("remote artifact path not configured")
}
switch {
case strings.HasPrefix(artifactPath, "/Volumes/"):
return filerForVolume(ctx, b)
default:
return filerForWorkspace(b)
}
}
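As a usage note, the dispatch above looks only at the artifact path prefix. A small illustrative sketch of the same routing decision follows; the function and the returned labels are hypothetical, not part of the CLI.

package main

import (
	"fmt"
	"strings"
)

// filerKindForArtifactPath mirrors the routing in GetFilerForLibraries above:
// an empty path is an error, a /Volumes/ path goes to the UC volume filer,
// and everything else goes to the workspace files filer.
func filerKindForArtifactPath(artifactPath string) (string, error) {
	switch {
	case artifactPath == "":
		return "", fmt.Errorf("remote artifact path not configured")
	case strings.HasPrefix(artifactPath, "/Volumes/"):
		return "files (UC volume)", nil
	default:
		return "workspace files (WSFS)", nil
	}
}

func main() {
	for _, p := range []string{"/Volumes/main/my_schema/my_volume", "/foo/bar/artifacts", ""} {
		kind, err := filerKindForArtifactPath(p)
		fmt.Printf("%-40q -> %q %v\n", p, kind, err)
	}
}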

View File

@@ -0,0 +1,63 @@
package libraries
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/filer"
sdkconfig "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestGetFilerForLibrariesValidWsfs(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
},
}
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
require.NoError(t, diags.Error())
assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath)
assert.IsType(t, &filer.WorkspaceFilesClient{}, client)
}
func TestGetFilerForLibrariesValidUcVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/main/my_schema/my_volume",
},
},
}
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
b.SetWorkpaceClient(m.WorkspaceClient)
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
require.NoError(t, diags.Error())
assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath)
assert.IsType(t, &filer.FilesClient{}, client)
}
func TestGetFilerForLibrariesRemotePathNotSet(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{},
},
}
_, _, diags := GetFilerForLibraries(context.Background(), b)
require.EqualError(t, diags.Error(), "remote artifact path not configured")
}

View File

@@ -0,0 +1,101 @@
package libraries
import (
"context"
"fmt"
"path"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/dynvar"
"github.com/databricks/cli/libs/filer"
)
// This function returns a filer for ".internal" folder inside the directory configured
// at `workspace.artifact_path`.
// This function also checks if the UC volume exists in the workspace and then:
// 1. If the UC volume exists in the workspace:
// Returns a filer for the UC volume.
// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in
// the bundle configuration:
// Returns an error and a warning that instructs the user to deploy the
// UC volume before using it in the artifact path.
// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration:
// Returns an error.
func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
artifactPath := b.Config.Workspace.ArtifactPath
w := b.WorkspaceClient()
if !strings.HasPrefix(artifactPath, "/Volumes/") {
return nil, "", diag.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath)
}
parts := strings.Split(artifactPath, "/")
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", artifactPath)
// Incorrect format.
if len(parts) < 5 {
return nil, "", diag.FromErr(volumeFormatErr)
}
catalogName := parts[2]
schemaName := parts[3]
volumeName := parts[4]
// Incorrect format.
if catalogName == "" || schemaName == "" || volumeName == "" {
return nil, "", diag.FromErr(volumeFormatErr)
}
// Check if the UC volume exists in the workspace.
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName)
err := w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath)
// If the volume exists already, directly return the filer for the path to
// upload the artifacts to.
if err == nil {
uploadPath := path.Join(artifactPath, ".internal")
f, err := filer.NewFilesClient(w, uploadPath)
return f, uploadPath, diag.FromErr(err)
}
diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err)
path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName)
if !ok {
return nil, "", diags
}
warning := diag.Diagnostic{
Severity: diag.Warning,
Summary: `You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`,
Locations: locations,
Paths: []dyn.Path{path},
}
return nil, "", diags.Append(warning)
}
func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
volumes := b.Config.Resources.Volumes
for k, v := range volumes {
if v.CatalogName != catalogName || v.Name != volumeName {
continue
}
// UC schemas can be defined in the bundle itself, and thus might be interpolated
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
// definition if the schema name is the same as the one in the bundle, or if the
// schema name is interpolated.
// We only have to check for ${resources.schemas...} references because any
// other valid reference (like ${var.foo}) would have been interpolated by this point.
p, ok := dynvar.PureReferenceToPath(v.SchemaName)
isSchemaDefinedInBundle := ok && p.HasPrefix(dyn.Path{dyn.Key("resources"), dyn.Key("schemas")})
if v.SchemaName != schemaName && !isSchemaDefinedInBundle {
continue
}
pathString := fmt.Sprintf("resources.volumes.%s", k)
return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true
}
return nil, nil, false
}
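The path validation above hinges on how strings.Split treats leading and doubled slashes. Below is a standalone sketch of just that parsing step; the helper name is illustrative and not part of the CLI.

package main

import (
	"fmt"
	"strings"
)

// parseVolumePath mirrors the validation in filerForVolume above: the path must
// look like /Volumes/<catalog>/<schema>/<volume>/..., and none of the three
// components may be empty (which also rejects doubled slashes).
func parseVolumePath(artifactPath string) (catalog, schema, volume string, err error) {
	if !strings.HasPrefix(artifactPath, "/Volumes/") {
		return "", "", "", fmt.Errorf("expected artifact_path to start with /Volumes/, got %s", artifactPath)
	}
	parts := strings.Split(artifactPath, "/")
	// For "/Volumes/a/b/c", parts is ["", "Volumes", "a", "b", "c"].
	if len(parts) < 5 || parts[2] == "" || parts[3] == "" || parts[4] == "" {
		return "", "", "", fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", artifactPath)
	}
	return parts[2], parts[3], parts[4], nil
}

func main() {
	for _, p := range []string{
		"/Volumes/main/my_schema/my_volume/a/b/c", // valid
		"/Volumes/main/my_schema",                 // too short
		"/Volumes//my_schema/my_volume",           // empty catalog segment
	} {
		c, s, v, err := parseVolumePath(p)
		fmt.Println(c, s, v, err)
	}
}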

View File

@@ -0,0 +1,223 @@
package libraries
import (
"context"
"fmt"
"path"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/filer"
sdkconfig "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestFindVolumeInBundle(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Volumes: map[string]*resources.Volume{
"foo": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "main",
Name: "my_volume",
SchemaName: "my_schema",
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
{
File: "volume.yml",
Line: 1,
Column: 2,
},
})
// volume is in DAB.
path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume")
assert.True(t, ok)
assert.Equal(t, []dyn.Location{{
File: "volume.yml",
Line: 1,
Column: 2,
}}, locations)
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
// wrong volume name
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist")
assert.False(t, ok)
// wrong schema name
_, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume")
assert.False(t, ok)
// wrong catalog name
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume")
assert.False(t, ok)
// schema name is interpolated but does not have the right prefix. In this case
// we should not match the volume.
b.Config.Resources.Volumes["foo"].SchemaName = "${foo.bar.baz}"
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "my_volume")
assert.False(t, ok)
// schema name is interpolated.
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema.name}"
path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume")
assert.True(t, ok)
assert.Equal(t, []dyn.Location{{
File: "volume.yml",
Line: 1,
Column: 2,
}}, locations)
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
}
func TestFilerForVolumeNotInBundle(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/main/my_schema/doesnotexist",
},
},
}
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(fmt.Errorf("error from API"))
b.SetWorkpaceClient(m.WorkspaceClient)
_, _, diags := filerForVolume(context.Background(), b)
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API")
assert.Len(t, diags, 1)
}
func TestFilerForVolumeInBundle(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/main/my_schema/my_volume",
},
Resources: config.Resources{
Volumes: map[string]*resources.Volume{
"foo": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "main",
Name: "my_volume",
VolumeType: "MANAGED",
SchemaName: "my_schema",
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
{
File: "volume.yml",
Line: 1,
Column: 2,
},
})
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API"))
b.SetWorkpaceClient(m.WorkspaceClient)
_, _, diags := GetFilerForLibraries(context.Background(), b)
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API")
assert.Contains(t, diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.",
Locations: []dyn.Location{{
File: "volume.yml",
Line: 1,
Column: 2,
}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.volumes.foo")},
})
}
func TestFilerForVolumeWithInvalidVolumePaths(t *testing.T) {
invalidPaths := []string{
"/Volumes/",
"/Volumes/main",
"/Volumes/main/",
"/Volumes/main//",
"/Volumes/main//my_schema",
"/Volumes/main/my_schema",
"/Volumes/main/my_schema/",
"/Volumes/main/my_schema//",
"/Volumes//my_schema/my_volume",
}
for _, p := range invalidPaths {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: p,
},
},
}
_, _, diags := GetFilerForLibraries(context.Background(), b)
require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", p))
}
}
func TestFilerForVolumeWithInvalidPrefix(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volume/main/my_schema/my_volume",
},
},
}
_, _, diags := filerForVolume(context.Background(), b)
require.EqualError(t, diags.Error(), "expected artifact_path to start with /Volumes/, got /Volume/main/my_schema/my_volume")
}
func TestFilerForVolumeWithValidlVolumePaths(t *testing.T) {
validPaths := []string{
"/Volumes/main/my_schema/my_volume",
"/Volumes/main/my_schema/my_volume/",
"/Volumes/main/my_schema/my_volume/a/b/c",
"/Volumes/main/my_schema/my_volume/a/a/a",
}
for _, p := range validPaths {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: p,
},
},
}
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
b.SetWorkpaceClient(m.WorkspaceClient)
client, uploadPath, diags := filerForVolume(context.Background(), b)
require.NoError(t, diags.Error())
assert.Equal(t, path.Join(p, ".internal"), uploadPath)
assert.IsType(t, &filer.FilesClient{}, client)
}
}

View File

@@ -0,0 +1,15 @@
package libraries
import (
"path"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
)
func filerForWorkspace(b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
uploadPath := path.Join(b.Config.Workspace.ArtifactPath, ".internal")
f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath)
return f, uploadPath, diag.FromErr(err)
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
-"github.com/databricks/cli/libs/dyn/dynvar"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
@@ -189,101 +188,6 @@ func (u *upload) Name() string {
return "libraries.Upload"
}
// This function returns the right filer to use, to upload artifacts to the configured location.
// Supported locations:
// 1. WSFS
// 2. UC volumes
//
// If a UC Volume is configured, this function checks if the UC volume exists in the workspace.
// Then:
// 1. If the UC volume exists in the workspace:
// Returns a filer for the UC volume.
// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in
// the bundle configuration:
// Returns an error and a warning that instructs the user to deploy the
// UC volume before using it in the artifact path.
// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration:
// Returns an error.
func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
artifactPath := b.Config.Workspace.ArtifactPath
if artifactPath == "" {
return nil, "", diag.Errorf("remote artifact path not configured")
}
w := b.WorkspaceClient()
isVolumesPath := strings.HasPrefix(artifactPath, "/Volumes/")
// Path to upload artifact files to.
uploadPath := path.Join(artifactPath, ".internal")
// Return early with a WSFS filer if the artifact path is not a UC volume path.
if !isVolumesPath {
f, err := filer.NewWorkspaceFilesClient(w, uploadPath)
return f, uploadPath, diag.FromErr(err)
}
parts := strings.Split(artifactPath, "/")
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", uploadPath)
// Incorrect format.
if len(parts) < 5 {
return nil, "", diag.FromErr(volumeFormatErr)
}
catalogName := parts[2]
schemaName := parts[3]
volumeName := parts[4]
// Incorrect format.
if catalogName == "" || schemaName == "" || volumeName == "" {
return nil, "", diag.FromErr(volumeFormatErr)
}
// Check if the UC volume exists in the workspace.
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName)
err := w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath)
// If the volume exists already, directly return the filer for the upload path.
if err == nil {
f, err := filer.NewFilesClient(w, uploadPath)
return f, uploadPath, diag.FromErr(err)
}
diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err)
path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName)
if !ok {
return nil, "", diags
}
warning := diag.Diagnostic{
Severity: diag.Warning,
Summary: `You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`,
Locations: locations,
Paths: []dyn.Path{path},
}
return nil, "", diags.Append(warning)
}
func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
volumes := b.Config.Resources.Volumes
for k, v := range volumes {
if v.CatalogName != catalogName || v.Name != volumeName {
continue
}
// UC schemas can be defined in the bundle itself, and thus might be interpolated
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
// definition if the schema name is the same as the one in the bundle, or if the
// schema name is interpolated.
if v.SchemaName != schemaName && !dynvar.IsPureVariableReference(v.SchemaName) {
continue
}
pathString := fmt.Sprintf("resources.volumes.%s", k)
return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true
}
return nil, nil, false
}
// Function to upload file (a library, artifact and etc) to Workspace or UC volume
func UploadFile(ctx context.Context, file string, client filer.Filer) error {
filename := filepath.Base(file)

View File

@@ -2,26 +2,19 @@ package libraries
import (
"context"
-"fmt"
-"path"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
-"github.com/databricks/cli/bundle/internal/bundletest"
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
"github.com/databricks/cli/internal/testutil"
-"github.com/databricks/cli/libs/diag"
-"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/filer"
sdkconfig "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
-"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
-"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
@@ -343,200 +336,3 @@ func TestUploadMultipleLibraries(t *testing.T) {
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source4.whl")
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl")
}
func TestFindVolumeInBundle(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Volumes: map[string]*resources.Volume{
"foo": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "main",
Name: "my_volume",
SchemaName: "my_schema",
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
{
File: "volume.yml",
Line: 1,
Column: 2,
},
})
// volume is in DAB.
path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume")
assert.True(t, ok)
assert.Equal(t, []dyn.Location{{
File: "volume.yml",
Line: 1,
Column: 2,
}}, locations)
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
// wrong volume name
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist")
assert.False(t, ok)
// wrong schema name
_, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume")
assert.False(t, ok)
// wrong catalog name
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume")
assert.False(t, ok)
// schema name is interpolated.
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema}"
path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume")
assert.True(t, ok)
assert.Equal(t, []dyn.Location{{
File: "volume.yml",
Line: 1,
Column: 2,
}}, locations)
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
}
func TestGetFilerForLibrariesValidWsfs(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
},
}
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
require.NoError(t, diags.Error())
assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath)
assert.IsType(t, &filer.WorkspaceFilesClient{}, client)
}
func TestGetFilerForLibrariesValidUcVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/main/my_schema/my_volume",
},
},
}
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
b.SetWorkpaceClient(m.WorkspaceClient)
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
require.NoError(t, diags.Error())
assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath)
assert.IsType(t, &filer.FilesClient{}, client)
}
func TestGetFilerForLibrariesVolumeNotInBundle(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/main/my_schema/doesnotexist",
},
},
}
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(fmt.Errorf("error from API"))
b.SetWorkpaceClient(m.WorkspaceClient)
_, _, diags := GetFilerForLibraries(context.Background(), b)
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API")
assert.Len(t, diags, 1)
}
func TestGetFilerForLibrariesVolumeInBundle(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/main/my_schema/my_volume",
},
Resources: config.Resources{
Volumes: map[string]*resources.Volume{
"foo": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "main",
Name: "my_volume",
VolumeType: "MANAGED",
SchemaName: "my_schema",
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
{
File: "volume.yml",
Line: 1,
Column: 2,
},
})
m := mocks.NewMockWorkspaceClient(t)
m.WorkspaceClient.Config = &sdkconfig.Config{}
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API"))
b.SetWorkpaceClient(m.WorkspaceClient)
_, _, diags := GetFilerForLibraries(context.Background(), b)
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API")
assert.Contains(t, diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.",
Locations: []dyn.Location{{
File: "volume.yml",
Line: 1,
Column: 2,
}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.volumes.foo")},
})
}
func TestGetFilerForLibrariesVolumeInBundleWithArtifactPath(t *testing.T) {
b := &bundle.Bundle{}
_, _, diags := GetFilerForLibraries(context.Background(), b)
require.EqualError(t, diags.Error(), "remote artifact path not configured")
}
func TestGetFilerForLibrariesWithInvalidVolumePaths(t *testing.T) {
invalidPaths := []string{
"/Volumes/",
"/Volumes/main",
"/Volumes/main/",
"/Volumes/main//",
"/Volumes/main//my_schema",
"/Volumes/main/my_schema",
"/Volumes/main/my_schema/",
"/Volumes/main/my_schema//",
"/Volumes//my_schema/my_volume",
}
for _, p := range invalidPaths {
b := &bundle.Bundle{
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: p,
},
},
}
_, _, diags := GetFilerForLibraries(context.Background(), b)
require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", path.Join(p, ".internal")))
}
}

View File

@@ -24,3 +24,8 @@ resources:
schema_name: ${var.schema_name}
volume_type: MANAGED
comment: This volume was created from DABs.
+grants:
+  - principal: account users
+    privileges:
+      - WRITE_VOLUME

View File

@@ -265,12 +265,20 @@ func TestAccDeployUcVolume(t *testing.T) {
catalogName := "main"
schemaName := "schema1-" + uniqueId
volumeName := "my_volume"
-volume, err := w.Volumes.ReadByName(ctx, fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName))
+fullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
+volume, err := w.Volumes.ReadByName(ctx, fullName)
require.NoError(t, err)
require.Equal(t, volume.Name, volumeName)
require.Equal(t, catalogName, volume.CatalogName)
require.Equal(t, schemaName, volume.SchemaName)
+// Assert that the grants were successfully applied.
+grants, err := w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName)
+require.NoError(t, err)
+assert.Len(t, grants.PrivilegeAssignments, 1)
+assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal)
+assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges)
// Recreation of the volume without --auto-approve should fail since prompting is not possible
t.Setenv("TERM", "dumb")
t.Setenv("BUNDLE_ROOT", bundleRoot)
@@ -290,9 +298,17 @@ volumes the upstream data in the cloud tenant is not affected:
// Assert the volume is updated successfully
schemaName = "schema2-" + uniqueId
-volume, err = w.Volumes.ReadByName(ctx, fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName))
+fullName = fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
+volume, err = w.Volumes.ReadByName(ctx, fullName)
require.NoError(t, err)
require.Equal(t, volume.Name, volumeName)
require.Equal(t, catalogName, volume.CatalogName)
require.Equal(t, schemaName, volume.SchemaName)
+// assert that the grants were applied / retained on recreate.
+grants, err = w.Grants.GetBySecurableTypeAndFullName(ctx, catalog.SecurableTypeVolume, fullName)
+require.NoError(t, err)
+assert.Len(t, grants.PrivilegeAssignments, 1)
+assert.Equal(t, "account users", grants.PrivilegeAssignments[0].Principal)
+assert.Equal(t, []catalog.Privilege{catalog.PrivilegeWriteVolume}, grants.PrivilegeAssignments[0].Privileges)
}

View File

@@ -71,3 +71,23 @@ func (v ref) references() []string {
func IsPureVariableReference(s string) bool {
return len(s) > 0 && re.FindString(s) == s
}
// If s is a pure variable reference, this function returns the corresponding
// dyn.Path. Otherwise, it returns false.
func PureReferenceToPath(s string) (dyn.Path, bool) {
ref, ok := newRef(dyn.V(s))
if !ok {
return nil, false
}
if !ref.isPure() {
return nil, false
}
p, err := dyn.NewPathFromString(ref.references()[0])
if err != nil {
return nil, false
}
return p, true
}
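A short usage sketch for the new helper, assuming it runs inside this module where the dyn and dynvar packages are importable. It shows how the callers in this PR combine PureReferenceToPath with dyn.Path.HasPrefix to recognize ${resources.schemas...} references; the main package wrapper is only for illustration.

package main

import (
	"fmt"

	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/cli/libs/dyn/dynvar"
)

func main() {
	schemasPrefix := dyn.Path{dyn.Key("resources"), dyn.Key("schemas")}

	for _, s := range []string{
		"${resources.schemas.schema1.name}",        // pure reference under resources.schemas
		"${var.schema_name}",                       // pure reference, but a different root
		"prefix ${resources.schemas.schema1.name}", // not a pure reference
		"my_schema",                                // plain string
	} {
		p, ok := dynvar.PureReferenceToPath(s)
		fmt.Printf("%q: pure=%v bundleSchemaRef=%v\n", s, ok, ok && p.HasPrefix(schemasPrefix))
	}
}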

View File

@@ -51,3 +51,34 @@ func TestIsPureVariableReference(t *testing.T) {
assert.False(t, IsPureVariableReference("prefix ${foo.bar}"))
assert.True(t, IsPureVariableReference("${foo.bar}"))
}
func TestPureReferenceToPath(t *testing.T) {
for _, tc := range []struct {
in string
out string
ok bool
}{
{"${foo.bar}", "foo.bar", true},
{"${foo.bar.baz}", "foo.bar.baz", true},
{"${foo.bar.baz[0]}", "foo.bar.baz[0]", true},
{"${foo.bar.baz[0][1]}", "foo.bar.baz[0][1]", true},
{"${foo.bar.baz[0][1].qux}", "foo.bar.baz[0][1].qux", true},
{"${foo.one}${foo.two}", "", false},
{"prefix ${foo.bar}", "", false},
{"${foo.bar} suffix", "", false},
{"${foo.bar", "", false},
{"foo.bar}", "", false},
{"foo.bar", "", false},
{"{foo.bar}", "", false},
{"", "", false},
} {
path, ok := PureReferenceToPath(tc.in)
if tc.ok {
assert.True(t, ok)
assert.Equal(t, dyn.MustPathFromString(tc.out), path)
} else {
assert.False(t, ok)
}
}
}