databricks-cli/bundle/libraries/upload.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

207 lines
5.3 KiB
Go
Raw Normal View History

package libraries
import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"golang.org/x/sync/errgroup"
)
// maxFilesRequestsInFlight is the concurrency limit applied to the upload
// error group in Apply.
//
// The Files API backend has a rate limit of 10 concurrent
// requests and 100 QPS. We limit the number of concurrent requests to 5 to
// avoid hitting the rate limit.
var maxFilesRequestsInFlight = 5
// Upload returns the library upload mutator with no filer client preset.
// Apply fills in the client from GetFilerForLibraries when it runs.
func Upload() bundle.Mutator {
	return new(upload)
}
// UploadWithClient returns the library upload mutator with its filer client
// preset to client. Apply keeps a preset client instead of replacing it, which
// lets tests inject a mock filer.
func UploadWithClient(client filer.Filer) bundle.Mutator {
	m := new(upload)
	m.client = client
	return m
}
// upload is the mutator returned by Upload and UploadWithClient.
type upload struct {
// client is the filer that files are written through. When nil, Apply sets
// it to the filer returned by GetFilerForLibraries; UploadWithClient presets
// it so tests can supply a mock.
client filer.Filer
}
// configLocation records one place in the bundle configuration that refers to
// a library which needs to be uploaded.
type configLocation struct {
// configPath is the path of the configuration value that Apply overwrites
// with the remote path of the uploaded file.
configPath dyn.Path
// location is the location of the original configuration value; Apply
// attaches it to the replacement value.
location dyn.Location
}
// collectLocalLibraries gathers all libraries from the bundle configuration
// together with the config paths that reference them.
//
// By this stage all glob references are expanded, so we have a list of all
// libraries that need to be uploaded. They are collected from task libraries,
// for-each task libraries, environment dependencies, and artifact files.
//
// The result maps a library source to the list of config paths and locations
// where that library is used. A map is used so the same library is not uploaded
// multiple times: it is uploaded once and every recorded config path is then
// updated to point to the uploaded location.
//
// An error is returned when a matched value does not have the expected kind
// (string for library references, map for artifact file entries).
func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error) {
	libs := make(map[string][]configLocation)

	patterns := []dyn.Pattern{
		taskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("whl")),
		taskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("jar")),
		forEachTaskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("whl")),
		forEachTaskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("jar")),
		envDepsPattern.Append(dyn.AnyIndex()),
	}

	for _, pattern := range patterns {
		// Mutate is only used to walk the configuration here: every callback
		// returns the visited value unchanged.
		err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
			return dyn.MapByPattern(v, pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
				source, ok := v.AsString()
				if !ok {
					return v, fmt.Errorf("expected string, got %s", v.Kind())
				}

				// Skip anything IsLibraryLocal does not consider a local library.
				if !IsLibraryLocal(source) {
					return v, nil
				}

				// Key the library by its path under the sync root so that the
				// same file referenced from several places is recorded once.
				source = filepath.Join(b.SyncRootPath, source)
				libs[source] = append(libs[source], configLocation{
					configPath: p,
					location:   v.Location(),
				})

				return v, nil
			})
		})
		if err != nil {
			return nil, err
		}
	}

	artifactPattern := dyn.NewPattern(
		dyn.Key("artifacts"),
		dyn.AnyKey(),
		dyn.Key("files"),
		dyn.AnyIndex(),
	)

	err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
		return dyn.MapByPattern(v, artifactPattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
			file, ok := v.AsMap()
			if !ok {
				return v, fmt.Errorf("expected map, got %s", v.Kind())
			}

			// Artifact file entries without a source have nothing to upload.
			sv, ok := file.GetByString("source")
			if !ok {
				return v, nil
			}

			source, ok := sv.AsString()
			if !ok {
				// Report the kind of the offending "source" value, not the
				// kind of the enclosing file entry (which is always a map here).
				return v, fmt.Errorf("expected string, got %s", sv.Kind())
			}

			// The artifact source is used as-is (it is not joined with the sync
			// root), and the uploaded location is recorded for the entry's
			// "remote_path" key rather than for the source itself.
			libs[source] = append(libs[source], configLocation{
				configPath: p.Append(dyn.Key("remote_path")),
				location:   v.Location(),
			})

			return v, nil
		})
	})
	if err != nil {
		return nil, err
	}

	return libs, nil
}
func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
Add DABs support for Unity Catalog volumes (#1762) ## Changes This PR adds support for UC volumes to DABs. ### Can I use a UC volume managed by DABs in `artifact_path`? Yes, but we require the volume to exist before being referenced in `artifact_path`. Otherwise you'll see an error that the volume does not exist. For this case, this PR also adds a warning if we detect that the UC volume is defined in the DAB itself, which informs the user to deploy the UC volume in a separate deployment first before using it in `artifact_path`. We cannot create the UC volume and then upload the artifacts to it in the same `bundle deploy` because `bundle deploy` always uploads the artifacts to `artifact_path` before materializing any resources defined in the bundle. Supporting this in a single deployment requires us to migrate away from our dependency on the Databricks Terraform provider to manage the CRUD lifecycle of DABs resources. ### Why do we not support `preset.name_prefix` for UC volumes? UC volumes will not have a `dev_shreyas_goenka` prefix added in `mode: development`. Configuring `presets.name_prefix` will be a no-op for UC volumes. We have decided not to support prefixing for UC resources. This is because: 1. UC provides its own namespace hierarchy that is independent of DABs. 2. Users can always manually use `${workspace.current_user.short_name}` to configure the prefixes manually. Customers often manually set up a UC hierarchy for dev and prod, including a schema or catalog per developer. Thus, it's often unnecessary for us to add prefixing in `mode: development` by default for UC resources. In retrospect, supporting prefixing for UC schemas and registered models was a mistake and will be removed in a future release of DABs. ## Tests Unit, integration test, and manually. ### Manual Testing cases: 1. 
UC volume does not exist: ``` ➜ bundle-playground git:(master) ✗ cli bundle deploy Error: failed to fetch metadata for the UC volume /Volumes/main/caps/my_volume that is configured in the artifact_path: Not Found ``` 2. UC Volume does not exist, but is defined in the DAB ``` ➜ bundle-playground git:(master) ✗ cli bundle deploy Error: failed to fetch metadata for the UC volume /Volumes/main/caps/managed_by_dab that is configured in the artifact_path: Not Found Warning: You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path. at resources.volumes.bar in databricks.yml:24:7 ``` --------- Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
2024-12-02 21:18:07 +00:00
client, uploadPath, diags := GetFilerForLibraries(ctx, b)
if diags.HasError() {
return diags
}
Add DABs support for Unity Catalog volumes (#1762) ## Changes This PR adds support for UC volumes to DABs. ### Can I use a UC volume managed by DABs in `artifact_path`? Yes, but we require the volume to exist before being referenced in `artifact_path`. Otherwise you'll see an error that the volume does not exist. For this case, this PR also adds a warning if we detect that the UC volume is defined in the DAB itself, which informs the user to deploy the UC volume in a separate deployment first before using it in `artifact_path`. We cannot create the UC volume and then upload the artifacts to it in the same `bundle deploy` because `bundle deploy` always uploads the artifacts to `artifact_path` before materializing any resources defined in the bundle. Supporting this in a single deployment requires us to migrate away from our dependency on the Databricks Terraform provider to manage the CRUD lifecycle of DABs resources. ### Why do we not support `preset.name_prefix` for UC volumes? UC volumes will not have a `dev_shreyas_goenka` prefix added in `mode: development`. Configuring `presets.name_prefix` will be a no-op for UC volumes. We have decided not to support prefixing for UC resources. This is because: 1. UC provides its own namespace hierarchy that is independent of DABs. 2. Users can always manually use `${workspace.current_user.short_name}` to configure the prefixes manually. Customers often manually set up a UC hierarchy for dev and prod, including a schema or catalog per developer. Thus, it's often unnecessary for us to add prefixing in `mode: development` by default for UC resources. In retrospect, supporting prefixing for UC schemas and registered models was a mistake and will be removed in a future release of DABs. ## Tests Unit, integration test, and manually. ### Manual Testing cases: 1. 
UC volume does not exist: ``` ➜ bundle-playground git:(master) ✗ cli bundle deploy Error: failed to fetch metadata for the UC volume /Volumes/main/caps/my_volume that is configured in the artifact_path: Not Found ``` 2. UC Volume does not exist, but is defined in the DAB ``` ➜ bundle-playground git:(master) ✗ cli bundle deploy Error: failed to fetch metadata for the UC volume /Volumes/main/caps/managed_by_dab that is configured in the artifact_path: Not Found Warning: You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path. at resources.volumes.bar in databricks.yml:24:7 ``` --------- Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
2024-12-02 21:18:07 +00:00
// Only set the filer client if it's not already set. We use the client field
// in the mutator to mock the filer client in testing
if u.client == nil {
Add DABs support for Unity Catalog volumes (#1762) ## Changes This PR adds support for UC volumes to DABs. ### Can I use a UC volume managed by DABs in `artifact_path`? Yes, but we require the volume to exist before being referenced in `artifact_path`. Otherwise you'll see an error that the volume does not exist. For this case, this PR also adds a warning if we detect that the UC volume is defined in the DAB itself, which informs the user to deploy the UC volume in a separate deployment first before using it in `artifact_path`. We cannot create the UC volume and then upload the artifacts to it in the same `bundle deploy` because `bundle deploy` always uploads the artifacts to `artifact_path` before materializing any resources defined in the bundle. Supporting this in a single deployment requires us to migrate away from our dependency on the Databricks Terraform provider to manage the CRUD lifecycle of DABs resources. ### Why do we not support `preset.name_prefix` for UC volumes? UC volumes will not have a `dev_shreyas_goenka` prefix added in `mode: development`. Configuring `presets.name_prefix` will be a no-op for UC volumes. We have decided not to support prefixing for UC resources. This is because: 1. UC provides its own namespace hierarchy that is independent of DABs. 2. Users can always manually use `${workspace.current_user.short_name}` to configure the prefixes manually. Customers often manually set up a UC hierarchy for dev and prod, including a schema or catalog per developer. Thus, it's often unnecessary for us to add prefixing in `mode: development` by default for UC resources. In retrospect, supporting prefixing for UC schemas and registered models was a mistake and will be removed in a future release of DABs. ## Tests Unit, integration test, and manually. ### Manual Testing cases: 1. 
UC volume does not exist: ``` ➜ bundle-playground git:(master) ✗ cli bundle deploy Error: failed to fetch metadata for the UC volume /Volumes/main/caps/my_volume that is configured in the artifact_path: Not Found ``` 2. UC Volume does not exist, but is defined in the DAB ``` ➜ bundle-playground git:(master) ✗ cli bundle deploy Error: failed to fetch metadata for the UC volume /Volumes/main/caps/managed_by_dab that is configured in the artifact_path: Not Found Warning: You might be using a UC volume in your artifact_path that is managed by this bundle but which has not been deployed yet. Please deploy the UC volume in a separate bundle deploy before using it in the artifact_path. at resources.volumes.bar in databricks.yml:24:7 ``` --------- Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
2024-12-02 21:18:07 +00:00
u.client = client
}
libs, err := collectLocalLibraries(b)
if err != nil {
return diag.FromErr(err)
}
errs, errCtx := errgroup.WithContext(ctx)
errs.SetLimit(maxFilesRequestsInFlight)
for source := range libs {
errs.Go(func() error {
return UploadFile(errCtx, source, u.client)
})
}
if err := errs.Wait(); err != nil {
return diag.FromErr(err)
}
// Update all the config paths to point to the uploaded location
for source, locations := range libs {
err = b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
remotePath := path.Join(uploadPath, filepath.Base(source))
// If the remote path does not start with /Workspace or /Volumes, prepend /Workspace
if !strings.HasPrefix(remotePath, "/Workspace") && !strings.HasPrefix(remotePath, "/Volumes") {
remotePath = "/Workspace" + remotePath
}
for _, location := range locations {
v, err = dyn.SetByPath(v, location.configPath, dyn.NewValue(remotePath, []dyn.Location{location.location}))
if err != nil {
return v, err
}
}
return v, nil
})
if err != nil {
diags = diags.Extend(diag.FromErr(err))
}
}
return diags
}
// Name returns the identifier of this mutator.
func (u *upload) Name() string {
	const mutatorName = "libraries.Upload"
	return mutatorName
}
// UploadFile uploads a single local file (a library, an artifact, etc.) to the
// Workspace or a UC volume through the given filer client.
//
// The file is written under its base name, passing the filer's
// OverwriteIfExists and CreateParentDirectories modes. An error is returned
// when the local file cannot be opened or when the write fails.
func UploadFile(ctx context.Context, file string, client filer.Filer) error {
	name := filepath.Base(file)
	cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", name))

	src, openErr := os.Open(file)
	if openErr != nil {
		// Only the unwrapped cause is reported; presumably this avoids repeating
		// the path that the os.Open error already carries — confirm.
		return fmt.Errorf("unable to open %s: %w", file, errors.Unwrap(openErr))
	}
	defer src.Close()

	if writeErr := client.Write(ctx, name, src, filer.OverwriteIfExists, filer.CreateParentDirectories); writeErr != nil {
		return fmt.Errorf("unable to import %s: %w", name, writeErr)
	}

	log.Infof(ctx, "Upload succeeded")
	return nil
}