mirror of https://github.com/databricks/cli.git
add filer
This commit is contained in:
parent
d47b0d6f47
commit
d180bab15d
|
@ -26,9 +26,9 @@ func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
|
||||||
return diag.FromErr(err)
|
return diag.FromErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
client, err := libraries.GetFilerForLibraries(b.WorkspaceClient(), uploadPath)
|
client, diags := libraries.GetFilerForLibraries(ctx, b, uploadPath)
|
||||||
if err != nil {
|
if diags.HasError() {
|
||||||
return diag.FromErr(err)
|
return diags
|
||||||
}
|
}
|
||||||
|
|
||||||
// We intentionally ignore the error because it is not critical to the deployment
|
// We intentionally ignore the error because it is not critical to the deployment
|
||||||
|
|
|
@ -13,11 +13,10 @@ import (
|
||||||
"github.com/databricks/cli/libs/cmdio"
|
"github.com/databricks/cli/libs/cmdio"
|
||||||
"github.com/databricks/cli/libs/diag"
|
"github.com/databricks/cli/libs/diag"
|
||||||
"github.com/databricks/cli/libs/dyn"
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
"github.com/databricks/cli/libs/dyn/dynvar"
|
||||||
"github.com/databricks/cli/libs/filer"
|
"github.com/databricks/cli/libs/filer"
|
||||||
"github.com/databricks/cli/libs/log"
|
"github.com/databricks/cli/libs/log"
|
||||||
|
|
||||||
"github.com/databricks/databricks-sdk-go"
|
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -138,9 +137,9 @@ func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||||
// If the client is not initialized, initialize it
|
// If the client is not initialized, initialize it
|
||||||
// We use client field in mutator to allow for mocking client in testing
|
// We use client field in mutator to allow for mocking client in testing
|
||||||
if u.client == nil {
|
if u.client == nil {
|
||||||
filer, err := GetFilerForLibraries(b.WorkspaceClient(), uploadPath)
|
filer, diags := GetFilerForLibraries(ctx, b, uploadPath)
|
||||||
if err != nil {
|
if diags.HasError() {
|
||||||
return diag.FromErr(err)
|
return diags
|
||||||
}
|
}
|
||||||
|
|
||||||
u.client = filer
|
u.client = filer
|
||||||
|
@ -197,15 +196,80 @@ func (u *upload) Name() string {
|
||||||
return "libraries.Upload"
|
return "libraries.Upload"
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetFilerForLibraries(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) {
|
// TODO: TODO: Nicer comments here.
|
||||||
if isVolumesPath(uploadPath) {
|
// Case 1: UC volume path is valid. Return the client.
|
||||||
return filer.NewFilesClient(w, uploadPath)
|
// Case 2: invalid path.
|
||||||
}
|
// (a) Not enough elements.
|
||||||
return filer.NewWorkspaceFilesClient(w, uploadPath)
|
// (b) catalog and schema correspond to a volume define in the DAB.
|
||||||
|
//
|
||||||
|
// -> exception for when the schema value is fully or partially interpolated.
|
||||||
|
// In that case only check the catalog name.
|
||||||
|
func GetFilerForLibraries(ctx context.Context, b *bundle.Bundle, uploadPath string) (filer.Filer, diag.Diagnostics) {
|
||||||
|
w := b.WorkspaceClient()
|
||||||
|
isVolumesPath := strings.HasPrefix(uploadPath, "/Volumes/")
|
||||||
|
|
||||||
|
// If the path is not a volume path, use the workspace file system.
|
||||||
|
if !isVolumesPath {
|
||||||
|
f, err := filer.NewWorkspaceFilesClient(w, uploadPath)
|
||||||
|
return f, diag.FromErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func isVolumesPath(path string) bool {
|
parts := strings.Split(uploadPath, "/")
|
||||||
return strings.HasPrefix(path, "/Volumes/")
|
volumeFormatErr := fmt.Errorf("expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<path>, got %s", uploadPath)
|
||||||
|
if len(strings.Split(uploadPath, "/")) < 5 {
|
||||||
|
return nil, diag.FromErr(volumeFormatErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
catalogName := parts[2]
|
||||||
|
schemaName := parts[3]
|
||||||
|
volumeName := parts[4]
|
||||||
|
|
||||||
|
// Incorrect format.
|
||||||
|
if catalogName == "" || schemaName == "" || volumeName == "" {
|
||||||
|
return nil, diag.FromErr(volumeFormatErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the volume exists already, directly return the filer for the upload path.
|
||||||
|
volumePath := fmt.Sprintf("/Volumes/%s/%s/%s", catalogName, schemaName, volumeName)
|
||||||
|
vf, err := filer.NewFilesClient(w, volumePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, diag.FromErr(err)
|
||||||
|
}
|
||||||
|
if _, err := vf.Stat(ctx, "."); err == nil {
|
||||||
|
f, err := filer.NewFilesClient(w, uploadPath)
|
||||||
|
return f, diag.FromErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The volume does not exist. Check if the volume is defined in the bundle.
|
||||||
|
// TODO: Does this break? Did it work before if the volume was not defined, but
|
||||||
|
// the schema was?
|
||||||
|
l, ok := locationForVolume(b, catalogName, schemaName, volumeName)
|
||||||
|
if !ok {
|
||||||
|
return nil, diag.Errorf("UC volume %s does not exist", volumePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, diag.Errorf(`UC volume %s does not exist. Note: We detected that
|
||||||
|
you have a UC volume defined that matched the path above at %s.
|
||||||
|
Please deploy the UC volume separately before using it in as a
|
||||||
|
destination to upload artifacts.`, l, volumePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func locationForVolume(b *bundle.Bundle, catalogName, schemaName, volumeName string) (dyn.Location, bool) {
|
||||||
|
volumes := b.Config.Resources.Volumes
|
||||||
|
for k, v := range volumes {
|
||||||
|
if v.CatalogName != catalogName || v.Name != volumeName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// UC schemas can be defined in the bundle itself, and thus might be interpolated
|
||||||
|
// at runtime via the ${resources.schemas.<name>} syntax. Thus we match the volume
|
||||||
|
// definition if the schema name is the same as the one in the bundle, or if the
|
||||||
|
// schema name is interpolated.
|
||||||
|
if v.SchemaName != schemaName && !dynvar.ContainsVariableReference(v.SchemaName) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return b.Config.GetLocation(fmt.Sprintf("resources.volumes.%s", k)), true
|
||||||
|
}
|
||||||
|
return dyn.Location{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Function to upload file (a library, artifact and etc) to Workspace or UC volume
|
// Function to upload file (a library, artifact and etc) to Workspace or UC volume
|
||||||
|
|
|
@ -71,3 +71,7 @@ func (v ref) references() []string {
|
||||||
func IsPureVariableReference(s string) bool {
|
func IsPureVariableReference(s string) bool {
|
||||||
return len(s) > 0 && re.FindString(s) == s
|
return len(s) > 0 && re.FindString(s) == s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ContainsVariableReference(s string) bool {
|
||||||
|
return len(s) > 0 && re.FindString(s) != ""
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue