Compare commits

...

5 Commits

Author SHA1 Message Date
Andrew Nester 039193339f
Merge branch 'main' into feature/all-purpose-clusters 2024-09-23 12:30:13 +02:00
Andrew Nester 67557b7c84
added and fixed tests 2024-09-23 12:29:11 +02:00
Ilia Babanov ac80d3dfcb
Add verbose flag to the "bundle deploy" command (#1774)
## Changes
- Extract sync output logic from `cmd/sync` into `lib/sync`
- Add a hidden `verbose` flag to the `bundle deploy` command; it is false
by default and hidden from the `--help` output
- Pass output handler to the `deploy/files/upload` mutator if the
verbose option is true

There was an idea to use in-place output, overwriting each past file sync
event in the output, but that won't work for the extension, since it
doesn't display deploy logs in the terminal.

Example output:
```
~/tmp/defpy: ~/cli/cli bundle deploy --verbose
Building defpy...
Uploading defpy-0.0.1+20240917.112755-py3-none-any.whl...
Uploading bundle files to /Users/ilia.babanov@databricks.com/.bundle/defpy/dev/files...
Action: PUT: requirements-dev.txt, resources/defpy_pipeline.yml, pytest.ini, src/defpy/main.py, src/defpy/__init__.py, src/dlt_pipeline.ipynb, tests/main_test.py, src/notebook.ipynb, setup.py, resources/defpy_job.yml, .vscode/extensions.json, .vscode/settings.json, fixtures/.gitkeep, .vscode/__builtins__.pyi, README.md, .gitignore, databricks.yml
Uploaded tests
Uploaded resources
Uploaded fixtures
Uploaded .vscode
Uploaded src/defpy
Uploaded requirements-dev.txt
Uploaded .gitignore
Uploaded fixtures/.gitkeep
Uploaded src/defpy/__init__.py
Uploaded databricks.yml
Uploaded README.md
Uploaded setup.py
Uploaded .vscode/__builtins__.pyi
Uploaded .vscode/extensions.json
Uploaded src/dlt_pipeline.ipynb
Uploaded .vscode/settings.json
Uploaded resources/defpy_job.yml
Uploaded pytest.ini
Uploaded src/defpy/main.py
Uploaded tests/main_test.py
Uploaded resources/defpy_pipeline.yml
Uploaded src/notebook.ipynb
Initial Sync Complete
Deploying resources...
Updating deployment state...
Deployment complete!
```

Output example in the extension:
<img width="1843" alt="Screenshot 2024-09-19 at 11 07 48"
src="https://github.com/user-attachments/assets/0fafd095-cdc6-44b8-b482-27a38ada0330">


## Tests
Tested manually: the `sync` and `bundle deploy` commands, plus the VS Code
extension's sync and deploy flows.
2024-09-23 10:09:11 +00:00
Lennart Kats (databricks) 7665c639bd
Use Unity Catalog for pipelines in the default-python template (#1766)
## Summary

Enables Unity Catalog for pipelines in the default template. Pipelines
will default to non-Unity Catalog pipelines if a catalog is not
specified.

*Small caveat*: there are cases where admins lock down the default
catalog of a workspace and don't allow the creation of a new schema
there. If that happens, the pipeline would fail at runtime with a clear
error indicating what happened. ("PERMISSION_DENIED: User does not have
CREATE SCHEMA on Catalog 'main'."). I've seen this with an internal
Databricks workspace, where creating new non-UC schemas wasn't locked
down, but creation in the `main` catalog was.

## Testing

- Validated on a non-UC + UC workspace. The catalog selection logic here
is the same as applied for the SQL templates.
2024-09-23 09:52:04 +00:00
Lennart Kats (databricks) 6c57683dc6
Reduce time until the prompt is shown for bundle run (#1727)
## Summary

Makes the `databricks bundle run` command use local state before showing
the menu prompt, which makes it show more quickly. For large/busy
workspaces this means the prompt can show 2-3 seconds earlier.
2024-09-21 06:36:47 +00:00
11 changed files with 112 additions and 53 deletions

View File

@ -67,7 +67,12 @@ func rewriteComputeIdToClusterId(v dyn.Value, p dyn.Path) (dyn.Value, diag.Diagn
// Drop the "compute_id" key. // Drop the "compute_id" key.
vout, err := dyn.Walk(nv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { vout, err := dyn.Walk(nv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
switch len(p) { switch len(p) {
case 0, 1: case 0:
return v, nil
case 1:
if p[0] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
return v, nil return v, nil
case 2: case 2:
if p[1] == dyn.Key("compute_id") { if p[1] == dyn.Key("compute_id") {

View File

@ -41,8 +41,13 @@ func TestComputeIdToClusterIdInTargetOverride(t *testing.T) {
}, },
} }
diags := bundle.Apply(context.Background(), b, bundle.Seq(mutator.ComputeIdToClusterId(), mutator.SelectTarget("dev"))) diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error()) assert.NoError(t, diags.Error())
assert.Empty(t, b.Config.Targets["dev"].ComputeId)
diags = diags.Extend(bundle.Apply(context.Background(), b, mutator.SelectTarget("dev")))
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id-dev", b.Config.Bundle.ClusterId) assert.Equal(t, "compute-id-dev", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId) assert.Empty(t, b.Config.Bundle.ComputeId)

View File

@ -8,9 +8,12 @@ import (
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/sync"
) )
type upload struct{} type upload struct {
outputHandler sync.OutputHandler
}
func (m *upload) Name() string { func (m *upload) Name() string {
return "files.Upload" return "files.Upload"
@ -18,11 +21,18 @@ func (m *upload) Name() string {
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath)) cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
sync, err := GetSync(ctx, bundle.ReadOnly(b)) opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
if err != nil { if err != nil {
return diag.FromErr(err) return diag.FromErr(err)
} }
opts.OutputHandler = m.outputHandler
sync, err := sync.New(ctx, *opts)
if err != nil {
return diag.FromErr(err)
}
defer sync.Close()
b.Files, err = sync.RunOnce(ctx) b.Files, err = sync.RunOnce(ctx)
if err != nil { if err != nil {
return diag.FromErr(err) return diag.FromErr(err)
@ -32,6 +42,6 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
return nil return nil
} }
func Upload() bundle.Mutator { func Upload(outputHandler sync.OutputHandler) bundle.Mutator {
return &upload{} return &upload{outputHandler}
} }

View File

@ -71,6 +71,7 @@ func TestConvertCluster(t *testing.T) {
"availability": "SPOT", "availability": "SPOT",
}, },
"data_security_mode": "USER_ISOLATION", "data_security_mode": "USER_ISOLATION",
"no_wait": true,
"node_type_id": "m5.xlarge", "node_type_id": "m5.xlarge",
"autoscale": map[string]any{ "autoscale": map[string]any{
"min_workers": int64(1), "min_workers": int64(1),

View File

@ -18,6 +18,7 @@ import (
"github.com/databricks/cli/bundle/python" "github.com/databricks/cli/bundle/python"
"github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/sync"
terraformlib "github.com/databricks/cli/libs/terraform" terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json" tfjson "github.com/hashicorp/terraform-json"
) )
@ -128,7 +129,7 @@ properties such as the 'catalog' or 'storage' are changed:`
} }
// The deploy phase deploys artifacts and resources. // The deploy phase deploys artifacts and resources.
func Deploy() bundle.Mutator { func Deploy(outputHandler sync.OutputHandler) bundle.Mutator {
// Core mutators that CRUD resources and modify deployment state. These // Core mutators that CRUD resources and modify deployment state. These
// mutators need informed consent if they are potentially destructive. // mutators need informed consent if they are potentially destructive.
deployCore := bundle.Defer( deployCore := bundle.Defer(
@ -157,7 +158,7 @@ func Deploy() bundle.Mutator {
libraries.ExpandGlobReferences(), libraries.ExpandGlobReferences(),
libraries.Upload(), libraries.Upload(),
python.TransformWheelTask(), python.TransformWheelTask(),
files.Upload(), files.Upload(outputHandler),
deploy.StateUpdate(), deploy.StateUpdate(),
deploy.StatePush(), deploy.StatePush(),
permissions.ApplyWorkspaceRootPermissions(), permissions.ApplyWorkspaceRootPermissions(),

View File

@ -10,6 +10,7 @@ import (
"github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root" "github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -25,6 +26,7 @@ func newDeployCommand() *cobra.Command {
var failOnActiveRuns bool var failOnActiveRuns bool
var clusterId string var clusterId string
var autoApprove bool var autoApprove bool
var verbose bool
cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.") cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.") cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.") cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
@ -32,6 +34,9 @@ func newDeployCommand() *cobra.Command {
cmd.Flags().StringVarP(&clusterId, "cluster-id", "c", "", "Override cluster in the deployment with the given cluster ID.") cmd.Flags().StringVarP(&clusterId, "cluster-id", "c", "", "Override cluster in the deployment with the given cluster ID.")
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.") cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.")
cmd.Flags().MarkDeprecated("compute-id", "use --cluster-id instead") cmd.Flags().MarkDeprecated("compute-id", "use --cluster-id instead")
cmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose output.")
// Verbose flag currently only affects file sync output, it's used by the vscode extension
cmd.Flags().MarkHidden("verbose")
cmd.RunE = func(cmd *cobra.Command, args []string) error { cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context() ctx := cmd.Context()
@ -56,11 +61,18 @@ func newDeployCommand() *cobra.Command {
return nil return nil
}) })
var outputHandler sync.OutputHandler
if verbose {
outputHandler = func(ctx context.Context, c <-chan sync.Event) {
sync.TextOutput(ctx, c, cmd.OutOrStdout())
}
}
diags = diags.Extend( diags = diags.Extend(
bundle.Apply(ctx, b, bundle.Seq( bundle.Apply(ctx, b, bundle.Seq(
phases.Initialize(), phases.Initialize(),
phases.Build(), phases.Build(),
phases.Deploy(), phases.Deploy(outputHandler),
)), )),
) )
} }

View File

@ -55,13 +55,7 @@ task or a Python wheel task, the second example applies.
return diags.Error() return diags.Error()
} }
diags = bundle.Apply(ctx, b, bundle.Seq( diags = bundle.Apply(ctx, b, phases.Initialize())
phases.Initialize(),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
terraform.Load(terraform.ErrorOnEmptyState),
))
if err := diags.Error(); err != nil { if err := diags.Error(); err != nil {
return err return err
} }
@ -84,6 +78,16 @@ task or a Python wheel task, the second example applies.
return fmt.Errorf("expected a KEY of the resource to run") return fmt.Errorf("expected a KEY of the resource to run")
} }
diags = bundle.Apply(ctx, b, bundle.Seq(
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
terraform.Load(terraform.ErrorOnEmptyState),
))
if err := diags.Error(); err != nil {
return err
}
runner, err := run.Find(b, args[0]) runner, err := run.Find(b, args[0])
if err != nil { if err != nil {
return err return err

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
stdsync "sync"
"time" "time"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
@ -46,6 +45,21 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
return nil, flag.ErrHelp return nil, flag.ErrHelp
} }
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = sync.TextOutput
case flags.OutputJSON:
outputFunc = sync.JsonOutput
}
var outputHandler sync.OutputHandler
if outputFunc != nil {
outputHandler = func(ctx context.Context, events <-chan sync.Event) {
outputFunc(ctx, events, cmd.OutOrStdout())
}
}
opts := sync.SyncOptions{ opts := sync.SyncOptions{
LocalRoot: vfs.MustNew(args[0]), LocalRoot: vfs.MustNew(args[0]),
Paths: []string{"."}, Paths: []string{"."},
@ -62,6 +76,8 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
// exist and add it to the `.gitignore` file in the root. // exist and add it to the `.gitignore` file in the root.
SnapshotBasePath: filepath.Join(args[0], ".databricks"), SnapshotBasePath: filepath.Join(args[0], ".databricks"),
WorkspaceClient: root.WorkspaceClient(cmd.Context()), WorkspaceClient: root.WorkspaceClient(cmd.Context()),
OutputHandler: outputHandler,
} }
return &opts, nil return &opts, nil
} }
@ -118,23 +134,7 @@ func New() *cobra.Command {
if err != nil { if err != nil {
return err return err
} }
defer s.Close()
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = textOutput
case flags.OutputJSON:
outputFunc = jsonOutput
}
var wg stdsync.WaitGroup
if outputFunc != nil {
wg.Add(1)
go func() {
defer wg.Done()
outputFunc(ctx, s.Events(), cmd.OutOrStdout())
}()
}
if f.watch { if f.watch {
err = s.RunContinuous(ctx) err = s.RunContinuous(ctx)
@ -142,8 +142,6 @@ func New() *cobra.Command {
_, err = s.RunOnce(ctx) _, err = s.RunOnce(ctx)
} }
s.Close()
wg.Wait()
return err return err
} }

View File

@ -5,12 +5,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"github.com/databricks/cli/libs/sync"
) )
// Read synchronization events and write them as JSON to the specified writer (typically stdout). // Read synchronization events and write them as JSON to the specified writer (typically stdout).
func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { func JsonOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
for { for {
select { select {
@ -31,7 +29,7 @@ func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
} }
// Read synchronization events and write them as text to the specified writer (typically stdout). // Read synchronization events and write them as text to the specified writer (typically stdout).
func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { func TextOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
bw := bufio.NewWriter(w) bw := bufio.NewWriter(w)
for { for {

View File

@ -3,6 +3,7 @@ package sync
import ( import (
"context" "context"
"fmt" "fmt"
stdsync "sync"
"time" "time"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
@ -15,6 +16,8 @@ import (
"github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/iam"
) )
type OutputHandler func(context.Context, <-chan Event)
type SyncOptions struct { type SyncOptions struct {
LocalRoot vfs.Path LocalRoot vfs.Path
Paths []string Paths []string
@ -34,6 +37,8 @@ type SyncOptions struct {
CurrentUser *iam.User CurrentUser *iam.User
Host string Host string
OutputHandler OutputHandler
} }
type Sync struct { type Sync struct {
@ -49,6 +54,10 @@ type Sync struct {
// Synchronization progress events are sent to this event notifier. // Synchronization progress events are sent to this event notifier.
notifier EventNotifier notifier EventNotifier
seq int seq int
// WaitGroup is automatically created when an output handler is provided in the SyncOptions.
// Close call is required to ensure the output handler goroutine handles all events in time.
outputWaitGroup *stdsync.WaitGroup
} }
// New initializes and returns a new [Sync] instance. // New initializes and returns a new [Sync] instance.
@ -106,31 +115,41 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
return nil, err return nil, err
} }
var notifier EventNotifier
var outputWaitGroup = &stdsync.WaitGroup{}
if opts.OutputHandler != nil {
ch := make(chan Event, MaxRequestsInFlight)
notifier = &ChannelNotifier{ch}
outputWaitGroup.Add(1)
go func() {
defer outputWaitGroup.Done()
opts.OutputHandler(ctx, ch)
}()
} else {
notifier = &NopNotifier{}
}
return &Sync{ return &Sync{
SyncOptions: &opts, SyncOptions: &opts,
fileSet: fileSet, fileSet: fileSet,
includeFileSet: includeFileSet, includeFileSet: includeFileSet,
excludeFileSet: excludeFileSet, excludeFileSet: excludeFileSet,
snapshot: snapshot, snapshot: snapshot,
filer: filer, filer: filer,
notifier: &NopNotifier{}, notifier: notifier,
seq: 0, outputWaitGroup: outputWaitGroup,
seq: 0,
}, nil }, nil
} }
func (s *Sync) Events() <-chan Event {
ch := make(chan Event, MaxRequestsInFlight)
s.notifier = &ChannelNotifier{ch}
return ch
}
func (s *Sync) Close() { func (s *Sync) Close() {
if s.notifier == nil { if s.notifier == nil {
return return
} }
s.notifier.Close() s.notifier.Close()
s.notifier = nil s.notifier = nil
s.outputWaitGroup.Wait()
} }
func (s *Sync) notifyStart(ctx context.Context, d diff) { func (s *Sync) notifyStart(ctx context.Context, d diff) {

View File

@ -3,6 +3,12 @@ resources:
pipelines: pipelines:
{{.project_name}}_pipeline: {{.project_name}}_pipeline:
name: {{.project_name}}_pipeline name: {{.project_name}}_pipeline
{{- if eq default_catalog ""}}
## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
# catalog: catalog_name
{{- else}}
catalog: {{default_catalog}}
{{- end}}
target: {{.project_name}}_${bundle.environment} target: {{.project_name}}_${bundle.environment}
libraries: libraries:
- notebook: - notebook: