mirror of https://github.com/databricks/cli.git
split into filer files
This commit is contained in:
parent
a9b8575bc3
commit
c5a02ef8fb
|
@ -0,0 +1,29 @@
|
||||||
|
package libraries
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/libs/diag"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This function returns the right filer to use, to upload artifacts to the configured location.
|
||||||
|
// Supported locations:
|
||||||
|
// 1. WSFS
|
||||||
|
// 2. UC volumes
|
||||||
|
func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
|
||||||
|
artifactPath := b.Config.Workspace.ArtifactPath
|
||||||
|
if artifactPath == "" {
|
||||||
|
return nil, "", diag.Errorf("remote artifact path not configured")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case strings.HasPrefix(artifactPath, "/Volumes/"):
|
||||||
|
return filerForVolume(ctx, b)
|
||||||
|
|
||||||
|
default:
|
||||||
|
return filerForWorkspace(b)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
package libraries
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/bundle/config"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||||
|
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestGetFilerForLibrariesValidWsfs(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{
|
||||||
|
ArtifactPath: "/foo/bar/artifacts",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
|
||||||
|
require.NoError(t, diags.Error())
|
||||||
|
assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath)
|
||||||
|
|
||||||
|
assert.IsType(t, &filer.WorkspaceFilesClient{}, client)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFilerForLibrariesValidUcVolume(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{
|
||||||
|
ArtifactPath: "/Volumes/main/my_schema/my_volume",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
|
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||||
|
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
|
||||||
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
|
|
||||||
|
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
|
||||||
|
require.NoError(t, diags.Error())
|
||||||
|
assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath)
|
||||||
|
|
||||||
|
assert.IsType(t, &filer.FilesClient{}, client)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetFilerForLibrariesRemotePathNotSet(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
||||||
|
require.EqualError(t, diags.Error(), "remote artifact path not configured")
|
||||||
|
}
|
|
@ -0,0 +1,94 @@
|
||||||
|
package libraries
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/libs/diag"
|
||||||
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
"github.com/databricks/cli/libs/dyn/dynvar"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This function returns a filer for ".internal" folder inside the directory configured
|
||||||
|
// at `workspace.artifact_path`.
|
||||||
|
// This function also checks if the UC volume exists in the workspace and then:
|
||||||
|
// 1. If the UC volume exists in the workspace:
|
||||||
|
// Returns a filer for the UC volume.
|
||||||
|
// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in
|
||||||
|
// the bundle configuration:
|
||||||
|
// Returns an error and a warning that instructs the user to deploy the
|
||||||
|
// UC volume before using it in the artifact path.
|
||||||
|
// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration:
|
||||||
|
// Returns an error.
|
||||||
|
func filerForVolume(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
|
||||||
|
artifactPath := b.Config.Workspace.ArtifactPath
|
||||||
|
|
||||||
|
w := b.WorkspaceClient()
|
||||||
|
|
||||||
|
parts := strings.Split(artifactPath, "/")
|
||||||
|
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", artifactPath)
|
||||||
|
|
||||||
|
// Incorrect format.
|
||||||
|
if len(parts) < 5 {
|
||||||
|
return nil, "", diag.FromErr(volumeFormatErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
catalogName := parts[2]
|
||||||
|
schemaName := parts[3]
|
||||||
|
volumeName := parts[4]
|
||||||
|
|
||||||
|
// Incorrect format.
|
||||||
|
if catalogName == "" || schemaName == "" || volumeName == "" {
|
||||||
|
return nil, "", diag.FromErr(volumeFormatErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the UC volume exists in the workspace.
|
||||||
|
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName)
|
||||||
|
err := w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath)
|
||||||
|
|
||||||
|
// If the volume exists already, directly return the filer for the path to
|
||||||
|
// upload the artifacts to.
|
||||||
|
if err == nil {
|
||||||
|
uploadPath := path.Join(artifactPath, ".internal")
|
||||||
|
f, err := filer.NewFilesClient(w, uploadPath)
|
||||||
|
return f, uploadPath, diag.FromErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err)
|
||||||
|
|
||||||
|
path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName)
|
||||||
|
if !ok {
|
||||||
|
return nil, "", diags
|
||||||
|
}
|
||||||
|
|
||||||
|
warning := diag.Diagnostic{
|
||||||
|
Severity: diag.Warning,
|
||||||
|
Summary: `You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`,
|
||||||
|
Locations: locations,
|
||||||
|
Paths: []dyn.Path{path},
|
||||||
|
}
|
||||||
|
return nil, "", diags.Append(warning)
|
||||||
|
}
|
||||||
|
|
||||||
|
func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
|
||||||
|
volumes := b.Config.Resources.Volumes
|
||||||
|
for k, v := range volumes {
|
||||||
|
if v.CatalogName != catalogName || v.Name != volumeName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// UC schemas can be defined in the bundle itself, and thus might be interpolated
|
||||||
|
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
|
||||||
|
// definition if the schema name is the same as the one in the bundle, or if the
|
||||||
|
// schema name is interpolated.
|
||||||
|
if v.SchemaName != schemaName && !dynvar.IsPureVariableReference(v.SchemaName) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pathString := fmt.Sprintf("resources.volumes.%s", k)
|
||||||
|
return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true
|
||||||
|
}
|
||||||
|
return nil, nil, false
|
||||||
|
}
|
|
@ -0,0 +1,204 @@
|
||||||
|
package libraries
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/bundle/config"
|
||||||
|
"github.com/databricks/cli/bundle/config/resources"
|
||||||
|
"github.com/databricks/cli/bundle/internal/bundletest"
|
||||||
|
"github.com/databricks/cli/libs/diag"
|
||||||
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||||
|
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFindVolumeInBundle(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Volumes: map[string]*resources.Volume{
|
||||||
|
"foo": {
|
||||||
|
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
||||||
|
CatalogName: "main",
|
||||||
|
Name: "my_volume",
|
||||||
|
SchemaName: "my_schema",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
|
||||||
|
{
|
||||||
|
File: "volume.yml",
|
||||||
|
Line: 1,
|
||||||
|
Column: 2,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// volume is in DAB.
|
||||||
|
path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume")
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, []dyn.Location{{
|
||||||
|
File: "volume.yml",
|
||||||
|
Line: 1,
|
||||||
|
Column: 2,
|
||||||
|
}}, locations)
|
||||||
|
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
||||||
|
|
||||||
|
// wrong volume name
|
||||||
|
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist")
|
||||||
|
assert.False(t, ok)
|
||||||
|
|
||||||
|
// wrong schema name
|
||||||
|
_, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume")
|
||||||
|
assert.False(t, ok)
|
||||||
|
|
||||||
|
// wrong catalog name
|
||||||
|
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume")
|
||||||
|
assert.False(t, ok)
|
||||||
|
|
||||||
|
// schema name is interpolated.
|
||||||
|
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema}"
|
||||||
|
path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume")
|
||||||
|
assert.True(t, ok)
|
||||||
|
assert.Equal(t, []dyn.Location{{
|
||||||
|
File: "volume.yml",
|
||||||
|
Line: 1,
|
||||||
|
Column: 2,
|
||||||
|
}}, locations)
|
||||||
|
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilerForVolumeNotInBundle(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{
|
||||||
|
ArtifactPath: "/Volumes/main/my_schema/doesnotexist",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
|
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||||
|
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(fmt.Errorf("error from API"))
|
||||||
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
|
|
||||||
|
_, _, diags := filerForVolume(context.Background(), b)
|
||||||
|
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API")
|
||||||
|
assert.Len(t, diags, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilerForVolumeInBundle(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{
|
||||||
|
ArtifactPath: "/Volumes/main/my_schema/my_volume",
|
||||||
|
},
|
||||||
|
Resources: config.Resources{
|
||||||
|
Volumes: map[string]*resources.Volume{
|
||||||
|
"foo": {
|
||||||
|
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
||||||
|
CatalogName: "main",
|
||||||
|
Name: "my_volume",
|
||||||
|
VolumeType: "MANAGED",
|
||||||
|
SchemaName: "my_schema",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
|
||||||
|
{
|
||||||
|
File: "volume.yml",
|
||||||
|
Line: 1,
|
||||||
|
Column: 2,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
|
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||||
|
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API"))
|
||||||
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
|
|
||||||
|
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
||||||
|
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API")
|
||||||
|
assert.Contains(t, diags, diag.Diagnostic{
|
||||||
|
Severity: diag.Warning,
|
||||||
|
Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.",
|
||||||
|
Locations: []dyn.Location{{
|
||||||
|
File: "volume.yml",
|
||||||
|
Line: 1,
|
||||||
|
Column: 2,
|
||||||
|
}},
|
||||||
|
Paths: []dyn.Path{dyn.MustPathFromString("resources.volumes.foo")},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilerForVolumeWithInvalidVolumePaths(t *testing.T) {
|
||||||
|
invalidPaths := []string{
|
||||||
|
"/Volumes/",
|
||||||
|
"/Volumes/main",
|
||||||
|
"/Volumes/main/",
|
||||||
|
"/Volumes/main//",
|
||||||
|
"/Volumes/main//my_schema",
|
||||||
|
"/Volumes/main/my_schema",
|
||||||
|
"/Volumes/main/my_schema/",
|
||||||
|
"/Volumes/main/my_schema//",
|
||||||
|
"/Volumes//my_schema/my_volume",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range invalidPaths {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{
|
||||||
|
ArtifactPath: p,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
||||||
|
require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", path.Join(p, ".internal")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFilerForVolumeWithValidlVolumePaths(t *testing.T) {
|
||||||
|
validPaths := []string{
|
||||||
|
"/Volumes/main/my_schema/my_volume",
|
||||||
|
"/Volumes/main/my_schema/my_volume/",
|
||||||
|
"/Volumes/main/my_schema/my_volume/a/b/c",
|
||||||
|
"/Volumes/main/my_schema/my_volume/a/a/a",
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range validPaths {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Workspace: config.Workspace{
|
||||||
|
ArtifactPath: p,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
|
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
||||||
|
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
|
||||||
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
|
|
||||||
|
client, uploadPath, diags := filerForVolume(context.Background(), b)
|
||||||
|
require.NoError(t, diags.Error())
|
||||||
|
assert.Equal(t, path.Join(p, ".internal"), uploadPath)
|
||||||
|
assert.IsType(t, &filer.FilesClient{}, client)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
package libraries
|
||||||
|
|
||||||
|
import (
|
||||||
|
"path"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/libs/diag"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
)
|
||||||
|
|
||||||
|
func filerForWorkspace(b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
|
||||||
|
uploadPath := path.Join(b.Config.Workspace.ArtifactPath, ".internal")
|
||||||
|
f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath)
|
||||||
|
return f, uploadPath, diag.FromErr(err)
|
||||||
|
}
|
|
@ -13,7 +13,6 @@ import (
|
||||||
"github.com/databricks/cli/libs/cmdio"
|
"github.com/databricks/cli/libs/cmdio"
|
||||||
"github.com/databricks/cli/libs/diag"
|
"github.com/databricks/cli/libs/diag"
|
||||||
"github.com/databricks/cli/libs/dyn"
|
"github.com/databricks/cli/libs/dyn"
|
||||||
"github.com/databricks/cli/libs/dyn/dynvar"
|
|
||||||
"github.com/databricks/cli/libs/filer"
|
"github.com/databricks/cli/libs/filer"
|
||||||
"github.com/databricks/cli/libs/log"
|
"github.com/databricks/cli/libs/log"
|
||||||
|
|
||||||
|
@ -189,101 +188,6 @@ func (u *upload) Name() string {
|
||||||
return "libraries.Upload"
|
return "libraries.Upload"
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function returns the right filer to use, to upload artifacts to the configured location.
|
|
||||||
// Supported locations:
|
|
||||||
// 1. WSFS
|
|
||||||
// 2. UC volumes
|
|
||||||
//
|
|
||||||
// If a UC Volume is configured, this function checks if the UC volume exists in the workspace.
|
|
||||||
// Then:
|
|
||||||
// 1. If the UC volume exists in the workspace:
|
|
||||||
// Returns a filer for the UC volume.
|
|
||||||
// 2. If the UC volume does not exist in the workspace but is (with high confidence) defined in
|
|
||||||
// the bundle configuration:
|
|
||||||
// Returns an error and a warning that instructs the user to deploy the
|
|
||||||
// UC volume before using it in the artifact path.
|
|
||||||
// 3. If the UC volume does not exist in the workspace and is not defined in the bundle configuration:
|
|
||||||
// Returns an error.
|
|
||||||
func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle) (filer.Filer, string, diag.Diagnostics) {
|
|
||||||
artifactPath := b.Config.Workspace.ArtifactPath
|
|
||||||
if artifactPath == "" {
|
|
||||||
return nil, "", diag.Errorf("remote artifact path not configured")
|
|
||||||
}
|
|
||||||
|
|
||||||
w := b.WorkspaceClient()
|
|
||||||
isVolumesPath := strings.HasPrefix(artifactPath, "/Volumes/")
|
|
||||||
|
|
||||||
// Path to upload artifact files to.
|
|
||||||
uploadPath := path.Join(artifactPath, ".internal")
|
|
||||||
|
|
||||||
// Return early with a WSFS filer if the artifact path is not a UC volume path.
|
|
||||||
if !isVolumesPath {
|
|
||||||
f, err := filer.NewWorkspaceFilesClient(w, uploadPath)
|
|
||||||
return f, uploadPath, diag.FromErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
parts := strings.Split(artifactPath, "/")
|
|
||||||
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", uploadPath)
|
|
||||||
|
|
||||||
// Incorrect format.
|
|
||||||
if len(parts) < 5 {
|
|
||||||
return nil, "", diag.FromErr(volumeFormatErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
catalogName := parts[2]
|
|
||||||
schemaName := parts[3]
|
|
||||||
volumeName := parts[4]
|
|
||||||
|
|
||||||
// Incorrect format.
|
|
||||||
if catalogName == "" || schemaName == "" || volumeName == "" {
|
|
||||||
return nil, "", diag.FromErr(volumeFormatErr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the UC volume exists in the workspace.
|
|
||||||
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName)
|
|
||||||
err := w.Files.GetDirectoryMetadataByDirectoryPath(ctx, volumePath)
|
|
||||||
|
|
||||||
// If the volume exists already, directly return the filer for the upload path.
|
|
||||||
if err == nil {
|
|
||||||
f, err := filer.NewFilesClient(w, uploadPath)
|
|
||||||
return f, uploadPath, diag.FromErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
diags := diag.Errorf("failed to fetch metadata for the UC volume %s that is configured in the artifact_path: %s", volumePath, err)
|
|
||||||
|
|
||||||
path, locations, ok := findVolumeInBundle(b, catalogName, schemaName, volumeName)
|
|
||||||
if !ok {
|
|
||||||
return nil, "", diags
|
|
||||||
}
|
|
||||||
|
|
||||||
warning := diag.Diagnostic{
|
|
||||||
Severity: diag.Warning,
|
|
||||||
Summary: `You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.`,
|
|
||||||
Locations: locations,
|
|
||||||
Paths: []dyn.Path{path},
|
|
||||||
}
|
|
||||||
return nil, "", diags.Append(warning)
|
|
||||||
}
|
|
||||||
|
|
||||||
func findVolumeInBundle(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Path, []dyn.Location, bool) {
|
|
||||||
volumes := b.Config.Resources.Volumes
|
|
||||||
for k, v := range volumes {
|
|
||||||
if v.CatalogName != catalogName || v.Name != volumeName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// UC schemas can be defined in the bundle itself, and thus might be interpolated
|
|
||||||
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
|
|
||||||
// definition if the schema name is the same as the one in the bundle, or if the
|
|
||||||
// schema name is interpolated.
|
|
||||||
if v.SchemaName != schemaName && !dynvar.IsPureVariableReference(v.SchemaName) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
pathString := fmt.Sprintf("resources.volumes.%s", k)
|
|
||||||
return dyn.MustPathFromString(pathString), b.Config.GetLocations(pathString), true
|
|
||||||
}
|
|
||||||
return nil, nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Function to upload file (a library, artifact and etc) to Workspace or UC volume
|
// Function to upload file (a library, artifact and etc) to Workspace or UC volume
|
||||||
func UploadFile(ctx context.Context, file string, client filer.Filer) error {
|
func UploadFile(ctx context.Context, file string, client filer.Filer) error {
|
||||||
filename := filepath.Base(file)
|
filename := filepath.Base(file)
|
||||||
|
|
|
@ -2,26 +2,19 @@ package libraries
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"path"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/databricks/cli/bundle"
|
"github.com/databricks/cli/bundle"
|
||||||
"github.com/databricks/cli/bundle/config"
|
"github.com/databricks/cli/bundle/config"
|
||||||
"github.com/databricks/cli/bundle/config/resources"
|
"github.com/databricks/cli/bundle/config/resources"
|
||||||
"github.com/databricks/cli/bundle/internal/bundletest"
|
|
||||||
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
|
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
|
||||||
"github.com/databricks/cli/internal/testutil"
|
"github.com/databricks/cli/internal/testutil"
|
||||||
"github.com/databricks/cli/libs/diag"
|
|
||||||
"github.com/databricks/cli/libs/dyn"
|
|
||||||
"github.com/databricks/cli/libs/filer"
|
"github.com/databricks/cli/libs/filer"
|
||||||
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
|
||||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
@ -343,200 +336,3 @@ func TestUploadMultipleLibraries(t *testing.T) {
|
||||||
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source4.whl")
|
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source4.whl")
|
||||||
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl")
|
require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFindVolumeInBundle(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
|
||||||
Config: config.Root{
|
|
||||||
Resources: config.Resources{
|
|
||||||
Volumes: map[string]*resources.Volume{
|
|
||||||
"foo": {
|
|
||||||
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
|
||||||
CatalogName: "main",
|
|
||||||
Name: "my_volume",
|
|
||||||
SchemaName: "my_schema",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
|
|
||||||
{
|
|
||||||
File: "volume.yml",
|
|
||||||
Line: 1,
|
|
||||||
Column: 2,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// volume is in DAB.
|
|
||||||
path, locations, ok := findVolumeInBundle(b, "main", "my_schema", "my_volume")
|
|
||||||
assert.True(t, ok)
|
|
||||||
assert.Equal(t, []dyn.Location{{
|
|
||||||
File: "volume.yml",
|
|
||||||
Line: 1,
|
|
||||||
Column: 2,
|
|
||||||
}}, locations)
|
|
||||||
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
|
||||||
|
|
||||||
// wrong volume name
|
|
||||||
_, _, ok = findVolumeInBundle(b, "main", "my_schema", "doesnotexist")
|
|
||||||
assert.False(t, ok)
|
|
||||||
|
|
||||||
// wrong schema name
|
|
||||||
_, _, ok = findVolumeInBundle(b, "main", "doesnotexist", "my_volume")
|
|
||||||
assert.False(t, ok)
|
|
||||||
|
|
||||||
// wrong catalog name
|
|
||||||
_, _, ok = findVolumeInBundle(b, "doesnotexist", "my_schema", "my_volume")
|
|
||||||
assert.False(t, ok)
|
|
||||||
|
|
||||||
// schema name is interpolated.
|
|
||||||
b.Config.Resources.Volumes["foo"].SchemaName = "${resources.schemas.my_schema}"
|
|
||||||
path, locations, ok = findVolumeInBundle(b, "main", "valuedoesnotmatter", "my_volume")
|
|
||||||
assert.True(t, ok)
|
|
||||||
assert.Equal(t, []dyn.Location{{
|
|
||||||
File: "volume.yml",
|
|
||||||
Line: 1,
|
|
||||||
Column: 2,
|
|
||||||
}}, locations)
|
|
||||||
assert.Equal(t, dyn.MustPathFromString("resources.volumes.foo"), path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFilerForLibrariesValidWsfs(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
|
||||||
Config: config.Root{
|
|
||||||
Workspace: config.Workspace{
|
|
||||||
ArtifactPath: "/foo/bar/artifacts",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
|
|
||||||
require.NoError(t, diags.Error())
|
|
||||||
assert.Equal(t, "/foo/bar/artifacts/.internal", uploadPath)
|
|
||||||
|
|
||||||
assert.IsType(t, &filer.WorkspaceFilesClient{}, client)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFilerForLibrariesValidUcVolume(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
|
||||||
Config: config.Root{
|
|
||||||
Workspace: config.Workspace{
|
|
||||||
ArtifactPath: "/Volumes/main/my_schema/my_volume",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
m := mocks.NewMockWorkspaceClient(t)
|
|
||||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
|
||||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(nil)
|
|
||||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
|
||||||
|
|
||||||
client, uploadPath, diags := GetFilerForLibraries(context.Background(), b)
|
|
||||||
require.NoError(t, diags.Error())
|
|
||||||
assert.Equal(t, "/Volumes/main/my_schema/my_volume/.internal", uploadPath)
|
|
||||||
|
|
||||||
assert.IsType(t, &filer.FilesClient{}, client)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFilerForLibrariesVolumeNotInBundle(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
|
||||||
Config: config.Root{
|
|
||||||
Workspace: config.Workspace{
|
|
||||||
ArtifactPath: "/Volumes/main/my_schema/doesnotexist",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
m := mocks.NewMockWorkspaceClient(t)
|
|
||||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
|
||||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/doesnotexist").Return(fmt.Errorf("error from API"))
|
|
||||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
|
||||||
|
|
||||||
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
|
||||||
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/doesnotexist that is configured in the artifact_path: error from API")
|
|
||||||
assert.Len(t, diags, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFilerForLibrariesVolumeInBundle(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
|
||||||
Config: config.Root{
|
|
||||||
Workspace: config.Workspace{
|
|
||||||
ArtifactPath: "/Volumes/main/my_schema/my_volume",
|
|
||||||
},
|
|
||||||
Resources: config.Resources{
|
|
||||||
Volumes: map[string]*resources.Volume{
|
|
||||||
"foo": {
|
|
||||||
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
|
|
||||||
CatalogName: "main",
|
|
||||||
Name: "my_volume",
|
|
||||||
VolumeType: "MANAGED",
|
|
||||||
SchemaName: "my_schema",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
bundletest.SetLocation(b, "resources.volumes.foo", []dyn.Location{
|
|
||||||
{
|
|
||||||
File: "volume.yml",
|
|
||||||
Line: 1,
|
|
||||||
Column: 2,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
m := mocks.NewMockWorkspaceClient(t)
|
|
||||||
m.WorkspaceClient.Config = &sdkconfig.Config{}
|
|
||||||
m.GetMockFilesAPI().EXPECT().GetDirectoryMetadataByDirectoryPath(mock.Anything, "/Volumes/main/my_schema/my_volume").Return(fmt.Errorf("error from API"))
|
|
||||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
|
||||||
|
|
||||||
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
|
||||||
assert.EqualError(t, diags.Error(), "failed to fetch metadata for the UC volume /Volumes/main/my_schema/my_volume that is configured in the artifact_path: error from API")
|
|
||||||
assert.Contains(t, diags, diag.Diagnostic{
|
|
||||||
Severity: diag.Warning,
|
|
||||||
Summary: "You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path.",
|
|
||||||
Locations: []dyn.Location{{
|
|
||||||
File: "volume.yml",
|
|
||||||
Line: 1,
|
|
||||||
Column: 2,
|
|
||||||
}},
|
|
||||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.volumes.foo")},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFilerForLibrariesVolumeInBundleWithArtifactPath(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{}
|
|
||||||
|
|
||||||
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
|
||||||
require.EqualError(t, diags.Error(), "remote artifact path not configured")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetFilerForLibrariesWithInvalidVolumePaths(t *testing.T) {
|
|
||||||
invalidPaths := []string{
|
|
||||||
"/Volumes/",
|
|
||||||
"/Volumes/main",
|
|
||||||
"/Volumes/main/",
|
|
||||||
"/Volumes/main//",
|
|
||||||
"/Volumes/main//my_schema",
|
|
||||||
"/Volumes/main/my_schema",
|
|
||||||
"/Volumes/main/my_schema/",
|
|
||||||
"/Volumes/main/my_schema//",
|
|
||||||
"/Volumes//my_schema/my_volume",
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, p := range invalidPaths {
|
|
||||||
b := &bundle.Bundle{
|
|
||||||
Config: config.Root{
|
|
||||||
Workspace: config.Workspace{
|
|
||||||
ArtifactPath: p,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
_, _, diags := GetFilerForLibraries(context.Background(), b)
|
|
||||||
require.EqualError(t, diags.Error(), fmt.Sprintf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got %s", path.Join(p, ".internal")))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue