diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 2b453c66..d0b64d1a 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -1,9 +1,11 @@ package config import ( + "context" "fmt" "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go" ) // Resources defines Databricks resources associated with the bundle. @@ -168,3 +170,36 @@ func (r *Resources) Merge() error { } return nil } + +type ConfigResource interface { + Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) + TerraformResourceName() string +} + +func (r *Resources) FindResourceByConfigKey(key string) (ConfigResource, error) { + found := make([]ConfigResource, 0) + for k := range r.Jobs { + if k == key { + found = append(found, r.Jobs[k]) + } + } + for k := range r.Pipelines { + if k == key { + found = append(found, r.Pipelines[k]) + } + } + + if len(found) == 0 { + return nil, fmt.Errorf("no such resource: %s", key) + } + + if len(found) > 1 { + keys := make([]string, 0, len(found)) + for _, r := range found { + keys = append(keys, fmt.Sprintf("%s:%s", r.TerraformResourceName(), key)) + } + return nil, fmt.Errorf("ambiguous: %s (can resolve to all of %s)", key, keys) + } + + return found[0], nil +} diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go index bd43ed0a..da85f94d 100644 --- a/bundle/config/resources/job.go +++ b/bundle/config/resources/job.go @@ -1,7 +1,12 @@ package resources import ( + "context" + "strconv" + "github.com/databricks/cli/bundle/config/paths" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/imdario/mergo" @@ -90,3 +95,22 @@ func (j *Job) MergeTasks() error { j.Tasks = tasks return nil } + +func (j *Job) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) { + 
jobId, err := strconv.Atoi(id) + if err != nil { + return false, err + } + _, err = w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: int64(jobId), + }) + if err != nil { + log.Debugf(ctx, "job %s does not exist", id) + return false, err + } + return true, nil +} + +func (j *Job) TerraformResourceName() string { + return "databricks_job" +} diff --git a/bundle/config/resources/pipeline.go b/bundle/config/resources/pipeline.go index 43450dc4..97aeef15 100644 --- a/bundle/config/resources/pipeline.go +++ b/bundle/config/resources/pipeline.go @@ -1,9 +1,12 @@ package resources import ( + "context" "strings" "github.com/databricks/cli/bundle/config/paths" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/imdario/mergo" @@ -73,3 +76,18 @@ func (p *Pipeline) MergeClusters() error { p.Clusters = output return nil } + +func (p *Pipeline) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) { + _, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{ + PipelineId: id, + }) + if err != nil { + log.Debugf(ctx, "pipeline %s does not exist", id) + return false, err + } + return true, nil +} + +func (p *Pipeline) TerraformResourceName() string { + return "databricks_pipeline" +} diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go index 68d4e0f9..4ea47c2f 100644 --- a/bundle/deploy/lock/release.go +++ b/bundle/deploy/lock/release.go @@ -12,6 +12,8 @@ import ( type Goal string const ( + GoalBind = Goal("bind") + GoalUnbind = Goal("unbind") GoalDeploy = Goal("deploy") GoalDestroy = Goal("destroy") ) @@ -46,6 +48,8 @@ func (m *release) Apply(ctx context.Context, b *bundle.Bundle) error { switch m.goal { case GoalDeploy: return b.Locker.Unlock(ctx) + case GoalBind, GoalUnbind: + return b.Locker.Unlock(ctx) case GoalDestroy: return b.Locker.Unlock(ctx, 
locker.AllowLockFileNotExist) default: diff --git a/bundle/deploy/terraform/import.go b/bundle/deploy/terraform/import.go new file mode 100644 index 00000000..5fc436f2 --- /dev/null +++ b/bundle/deploy/terraform/import.go @@ -0,0 +1,108 @@ +package terraform + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/cmdio" + "github.com/hashicorp/terraform-exec/tfexec" +) + +type BindOptions struct { + AutoApprove bool + ResourceType string + ResourceKey string + ResourceId string +} + +type importResource struct { + opts *BindOptions +} + +// Apply implements bundle.Mutator. +func (m *importResource) Apply(ctx context.Context, b *bundle.Bundle) error { + dir, err := Dir(ctx, b) + if err != nil { + return err + } + + tf := b.Terraform + if tf == nil { + return fmt.Errorf("terraform not initialized") + } + + err = tf.Init(ctx, tfexec.Upgrade(true)) + if err != nil { + return fmt.Errorf("terraform init: %w", err) + } + tmpDir, err := os.MkdirTemp("", "state-*") + if err != nil { + return fmt.Errorf("terraform init: %w", err) + } + tmpState := filepath.Join(tmpDir, TerraformStateFileName) + + importAddress := fmt.Sprintf("%s.%s", m.opts.ResourceType, m.opts.ResourceKey) + err = tf.Import(ctx, importAddress, m.opts.ResourceId, tfexec.StateOut(tmpState)) + if err != nil { + return fmt.Errorf("terraform import: %w", err) + } + + buf := bytes.NewBuffer(nil) + tf.SetStdout(buf) + + //lint:ignore SA1019 We use legacy -state flag for now to plan the import changes based on temporary state file + changed, err := tf.Plan(ctx, tfexec.State(tmpState), tfexec.Target(importAddress)) + if err != nil { + return fmt.Errorf("terraform plan: %w", err) + } + + defer os.RemoveAll(tmpDir) + + if changed && !m.opts.AutoApprove { + output := buf.String() + // Remove output starting from Warning until end of output + output = output[:bytes.Index([]byte(output), []byte("Warning:"))] + cmdio.LogString(ctx, 
output) + ans, err := cmdio.AskYesOrNo(ctx, "Confirm import changes? Changes will be remotely applied only after running 'bundle deploy'.") + if err != nil { + return err + } + if !ans { + return fmt.Errorf("import aborted") + } + } + + // If user confirmed changes, move the state file from temp dir to state location + f, err := os.Create(filepath.Join(dir, TerraformStateFileName)) + if err != nil { + return err + } + defer f.Close() + + tmpF, err := os.Open(tmpState) + if err != nil { + return err + } + defer tmpF.Close() + + _, err = io.Copy(f, tmpF) + if err != nil { + return err + } + + return nil +} + +// Name implements bundle.Mutator. +func (*importResource) Name() string { + return "terraform.Import" +} + +func Import(opts *BindOptions) bundle.Mutator { + return &importResource{opts: opts} +} diff --git a/bundle/deploy/terraform/unbind.go b/bundle/deploy/terraform/unbind.go new file mode 100644 index 00000000..74e15e18 --- /dev/null +++ b/bundle/deploy/terraform/unbind.go @@ -0,0 +1,41 @@ +package terraform + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/hashicorp/terraform-exec/tfexec" +) + +type unbind struct { + resourceType string + resourceKey string +} + +func (m *unbind) Apply(ctx context.Context, b *bundle.Bundle) error { + tf := b.Terraform + if tf == nil { + return fmt.Errorf("terraform not initialized") + } + + err := tf.Init(ctx, tfexec.Upgrade(true)) + if err != nil { + return fmt.Errorf("terraform init: %w", err) + } + + err = tf.StateRm(ctx, fmt.Sprintf("%s.%s", m.resourceType, m.resourceKey)) + if err != nil { + return fmt.Errorf("terraform state rm: %w", err) + } + + return nil +} + +func (*unbind) Name() string { + return "terraform.Unbind" +} + +func Unbind(resourceType string, resourceKey string) bundle.Mutator { + return &unbind{resourceType: resourceType, resourceKey: resourceKey} +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go new file mode 100644 index 00000000..b2e92d6e --- 
/dev/null +++ b/bundle/phases/bind.go @@ -0,0 +1,45 @@ +package phases + +import ( + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy/lock" + "github.com/databricks/cli/bundle/deploy/terraform" +) + +func Bind(opts *terraform.BindOptions) bundle.Mutator { + return newPhase( + "bind", + []bundle.Mutator{ + lock.Acquire(), + bundle.Defer( + bundle.Seq( + terraform.StatePull(), + terraform.Interpolate(), + terraform.Write(), + terraform.Import(opts), + terraform.StatePush(), + ), + lock.Release(lock.GoalBind), + ), + }, + ) +} + +func Unbind(resourceType string, resourceKey string) bundle.Mutator { + return newPhase( + "unbind", + []bundle.Mutator{ + lock.Acquire(), + bundle.Defer( + bundle.Seq( + terraform.StatePull(), + terraform.Interpolate(), + terraform.Write(), + terraform.Unbind(resourceType, resourceKey), + terraform.StatePush(), + ), + lock.Release(lock.GoalUnbind), + ), + }, + ) +} diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 216d2921..f974a056 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -14,9 +14,9 @@ func Destroy() bundle.Mutator { lock.Acquire(), bundle.Defer( bundle.Seq( + terraform.StatePull(), terraform.Interpolate(), terraform.Write(), - terraform.StatePull(), terraform.Plan(terraform.PlanGoal("destroy")), terraform.Destroy(), terraform.StatePush(), diff --git a/cmd/bundle/bundle.go b/cmd/bundle/bundle.go index a82311d8..43a9ef68 100644 --- a/cmd/bundle/bundle.go +++ b/cmd/bundle/bundle.go @@ -1,6 +1,7 @@ package bundle import ( + "github.com/databricks/cli/cmd/bundle/deployment" "github.com/spf13/cobra" ) @@ -24,5 +25,6 @@ func New() *cobra.Command { cmd.AddCommand(newInitCommand()) cmd.AddCommand(newSummaryCommand()) cmd.AddCommand(newGenerateCommand()) + cmd.AddCommand(deployment.NewDeploymentCommand()) return cmd } diff --git a/cmd/bundle/deploy.go b/cmd/bundle/deploy.go index a83c268b..c76789c1 100644 --- a/cmd/bundle/deploy.go +++ b/cmd/bundle/deploy.go @@ -3,6 
+3,7 @@ package bundle import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/spf13/cobra" ) @@ -10,7 +11,7 @@ func newDeployCommand() *cobra.Command { cmd := &cobra.Command{ Use: "deploy", Short: "Deploy bundle", - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, } var force bool diff --git a/cmd/bundle/deployment/bind.go b/cmd/bundle/deployment/bind.go new file mode 100644 index 00000000..54129280 --- /dev/null +++ b/cmd/bundle/deployment/bind.go @@ -0,0 +1,65 @@ +package deployment + +import ( + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy/terraform" + "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" + "github.com/databricks/cli/libs/cmdio" + "github.com/spf13/cobra" +) + +func newBindCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "bind KEY RESOURCE_ID", + Short: "Bind bundle-defined resources to existing resources", + Args: cobra.ExactArgs(2), + PreRunE: utils.ConfigureBundleWithVariables, + } + + var autoApprove bool + var forceLock bool + cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Automatically approve the binding") + cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.") + + cmd.RunE = func(cmd *cobra.Command, args []string) error { + b := bundle.Get(cmd.Context()) + r := b.Config.Resources + resource, err := r.FindResourceByConfigKey(args[0]) + if err != nil { + return err + } + + w := b.WorkspaceClient() + ctx := cmd.Context() + exists, err := resource.Exists(ctx, w, args[1]) + if err != nil { + return fmt.Errorf("failed to fetch the resource, err: %w", err) + } + + if !exists { + return fmt.Errorf("%s with an id '%s' is not found", resource.TerraformResourceName(), args[1]) + } + + b.Config.Bundle.Deployment.Lock.Force = forceLock + err = bundle.Apply(cmd.Context(), b, bundle.Seq( + 
phases.Initialize(), + phases.Bind(&terraform.BindOptions{ + AutoApprove: autoApprove, + ResourceType: resource.TerraformResourceName(), + ResourceKey: args[0], + ResourceId: args[1], + }), + )) + if err != nil { + return fmt.Errorf("failed to bind the resource, err: %w", err) + } + + cmdio.LogString(ctx, fmt.Sprintf("Successfully bound %s with an id '%s'. Run 'bundle deploy' to deploy changes to your workspace", resource.TerraformResourceName(), args[1])) + return nil + } + + return cmd +} diff --git a/cmd/bundle/deployment/deployment.go b/cmd/bundle/deployment/deployment.go new file mode 100644 index 00000000..d29a8e72 --- /dev/null +++ b/cmd/bundle/deployment/deployment.go @@ -0,0 +1,17 @@ +package deployment + +import ( + "github.com/spf13/cobra" +) + +func NewDeploymentCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "deployment", + Short: "Deployment related commands", + Long: "Deployment related commands", + } + + cmd.AddCommand(newBindCommand()) + cmd.AddCommand(newUnbindCommand()) + return cmd +} diff --git a/cmd/bundle/deployment/unbind.go b/cmd/bundle/deployment/unbind.go new file mode 100644 index 00000000..e7de8a3d --- /dev/null +++ b/cmd/bundle/deployment/unbind.go @@ -0,0 +1,37 @@ +package deployment + +import ( + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" + "github.com/spf13/cobra" +) + +func newUnbindCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "unbind KEY", + Short: "Unbind bundle-defined resources from its managed remote resource", + Args: cobra.ExactArgs(1), + PreRunE: utils.ConfigureBundleWithVariables, + } + + var forceLock bool + cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.") + + cmd.RunE = func(cmd *cobra.Command, args []string) error { + b := bundle.Get(cmd.Context()) + r := b.Config.Resources + resource, err := r.FindResourceByConfigKey(args[0]) + if err != nil { + return err + } + + 
b.Config.Bundle.Deployment.Lock.Force = forceLock + return bundle.Apply(cmd.Context(), b, bundle.Seq( + phases.Initialize(), + phases.Unbind(resource.TerraformResourceName(), args[0]), + )) + } + + return cmd +} diff --git a/cmd/bundle/destroy.go b/cmd/bundle/destroy.go index dad199bf..a0bfb1a4 100644 --- a/cmd/bundle/destroy.go +++ b/cmd/bundle/destroy.go @@ -6,6 +6,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/flags" "github.com/spf13/cobra" @@ -17,7 +18,7 @@ func newDestroyCommand() *cobra.Command { Use: "destroy", Short: "Destroy deployed bundle resources", - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, } var autoApprove bool diff --git a/cmd/bundle/generate.go b/cmd/bundle/generate.go index 89d7c6ad..6c48b158 100644 --- a/cmd/bundle/generate.go +++ b/cmd/bundle/generate.go @@ -2,6 +2,7 @@ package bundle import ( "github.com/databricks/cli/cmd/bundle/generate" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/spf13/cobra" ) @@ -12,7 +13,7 @@ func newGenerateCommand() *cobra.Command { Use: "generate", Short: "Generate bundle configuration", Long: "Generate bundle configuration", - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, } cmd.AddCommand(generate.NewGenerateJobCommand()) diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index c1a8d4ea..54aa6ae7 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -8,6 +8,7 @@ import ( "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/run" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/root" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/flags" @@ -20,7 +21,7 @@ func newRunCommand() *cobra.Command { Short: "Run a resource (e.g. 
a job or a pipeline)", Args: cobra.MaximumNArgs(1), - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, } var runOptions run.Options diff --git a/cmd/bundle/summary.go b/cmd/bundle/summary.go index efa3c679..596f7d3d 100644 --- a/cmd/bundle/summary.go +++ b/cmd/bundle/summary.go @@ -10,6 +10,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/root" "github.com/databricks/cli/libs/flags" "github.com/spf13/cobra" @@ -20,7 +21,7 @@ func newSummaryCommand() *cobra.Command { Use: "summary", Short: "Describe the bundle resources and their deployment states", - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, // This command is currently intended for the Databricks VSCode extension only Hidden: true, diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index ca81275b..d9f8582c 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -6,6 +6,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/sync" "github.com/spf13/cobra" @@ -48,7 +49,7 @@ func newSyncCommand() *cobra.Command { Short: "Synchronize bundle tree to the workspace", Args: cobra.NoArgs, - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, } var f syncFlags diff --git a/cmd/bundle/utils/utils.go b/cmd/bundle/utils/utils.go new file mode 100644 index 00000000..f68ab06b --- /dev/null +++ b/cmd/bundle/utils/utils.go @@ -0,0 +1,24 @@ +package utils + +import ( + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/cmd/root" + "github.com/spf13/cobra" +) + +func ConfigureBundleWithVariables(cmd *cobra.Command, args []string) error { + // Load bundle config and apply target + err 
:= root.MustConfigureBundle(cmd, args) + if err != nil { + return err + } + + variables, err := cmd.Flags().GetStringSlice("var") + if err != nil { + return err + } + + // Initialize variables by assigning them values passed as command line flags + b := bundle.Get(cmd.Context()) + return b.Config.InitializeVariables(variables) +} diff --git a/cmd/bundle/validate.go b/cmd/bundle/validate.go index b98cbd52..01b8c18a 100644 --- a/cmd/bundle/validate.go +++ b/cmd/bundle/validate.go @@ -5,6 +5,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/phases" + "github.com/databricks/cli/cmd/bundle/utils" "github.com/spf13/cobra" ) @@ -13,7 +14,7 @@ func newValidateCommand() *cobra.Command { Use: "validate", Short: "Validate configuration", - PreRunE: ConfigureBundleWithVariables, + PreRunE: utils.ConfigureBundleWithVariables, } cmd.RunE = func(cmd *cobra.Command, args []string) error { diff --git a/cmd/bundle/variables.go b/cmd/bundle/variables.go index c3e4af64..f8f5167e 100644 --- a/cmd/bundle/variables.go +++ b/cmd/bundle/variables.go @@ -1,28 +1,9 @@ package bundle import ( - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/cmd/root" "github.com/spf13/cobra" ) -func ConfigureBundleWithVariables(cmd *cobra.Command, args []string) error { - // Load bundle config and apply target - err := root.MustConfigureBundle(cmd, args) - if err != nil { - return err - } - - variables, err := cmd.Flags().GetStringSlice("var") - if err != nil { - return err - } - - // Initialize variables by assigning them values passed as command line flags - b := bundle.Get(cmd.Context()) - return b.Config.InitializeVariables(variables) -} - func initVariableFlag(cmd *cobra.Command) { cmd.PersistentFlags().StringSlice("var", []string{}, `set values for variables defined in bundle config. 
Example: --var="foo=bar"`) } diff --git a/internal/bundle/bind_resource_test.go b/internal/bundle/bind_resource_test.go new file mode 100644 index 00000000..d44ad2c3 --- /dev/null +++ b/internal/bundle/bind_resource_test.go @@ -0,0 +1,185 @@ +package bundle + +import ( + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/internal" + "github.com/databricks/cli/internal/acc" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestAccBindJobToExistingJob(t *testing.T) { + env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV") + t.Log(env) + + ctx, wt := acc.WorkspaceTest(t) + gt := &generateJobTest{T: t, w: wt.W} + + nodeTypeId := internal.GetNodeTypeId(env) + uniqueId := uuid.New().String() + bundleRoot, err := initTestTemplate(t, ctx, "basic", map[string]any{ + "unique_id": uniqueId, + "spark_version": "13.3.x-scala2.12", + "node_type_id": nodeTypeId, + }) + require.NoError(t, err) + + jobId := gt.createTestJob(ctx) + t.Cleanup(func() { + gt.destroyJob(ctx, jobId) + require.NoError(t, err) + }) + + t.Setenv("BUNDLE_ROOT", bundleRoot) + c := internal.NewCobraTestRunner(t, "bundle", "deployment", "bind", "foo", fmt.Sprint(jobId), "--auto-approve") + _, _, err = c.Run() + require.NoError(t, err) + + // Remove .databricks directory to simulate a fresh deployment + err = os.RemoveAll(filepath.Join(bundleRoot, ".databricks")) + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + // Check that job is bound and updated with config from bundle + job, err := w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobId, + }) + require.NoError(t, err) + require.Equal(t, job.Settings.Name, fmt.Sprintf("test-job-basic-%s", uniqueId)) + require.Contains(t, job.Settings.Tasks[0].SparkPythonTask.PythonFile, "hello_world.py") + + c = 
internal.NewCobraTestRunner(t, "bundle", "deployment", "unbind", "foo") + _, _, err = c.Run() + require.NoError(t, err) + + // Remove .databricks directory to simulate a fresh deployment + err = os.RemoveAll(filepath.Join(bundleRoot, ".databricks")) + require.NoError(t, err) + + err = destroyBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + // Check that job is unbound and exists after bundle is destroyed + job, err = w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobId, + }) + require.NoError(t, err) + require.Equal(t, job.Settings.Name, fmt.Sprintf("test-job-basic-%s", uniqueId)) + require.Contains(t, job.Settings.Tasks[0].SparkPythonTask.PythonFile, "hello_world.py") +} + +func TestAccAbortBind(t *testing.T) { + env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV") + t.Log(env) + + ctx, wt := acc.WorkspaceTest(t) + gt := &generateJobTest{T: t, w: wt.W} + + nodeTypeId := internal.GetNodeTypeId(env) + uniqueId := uuid.New().String() + bundleRoot, err := initTestTemplate(t, ctx, "basic", map[string]any{ + "unique_id": uniqueId, + "spark_version": "13.3.x-scala2.12", + "node_type_id": nodeTypeId, + }) + require.NoError(t, err) + + jobId := gt.createTestJob(ctx) + t.Cleanup(func() { + gt.destroyJob(ctx, jobId) + destroyBundle(t, ctx, bundleRoot) + }) + + t.Setenv("BUNDLE_ROOT", bundleRoot) + c := internal.NewCobraTestRunner(t, "bundle", "deployment", "bind", "foo", fmt.Sprint(jobId)) + + // Simulate user aborting the bind. This is done by not providing any input to the prompt in non-interactive mode. 
+ _, _, err = c.Run() + require.ErrorContains(t, err, "failed to bind the resource") + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + // Check that job is not bound and not updated with config from bundle + job, err := w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobId, + }) + require.NoError(t, err) + + require.NotEqual(t, job.Settings.Name, fmt.Sprintf("test-job-basic-%s", uniqueId)) + require.Contains(t, job.Settings.Tasks[0].NotebookTask.NotebookPath, "test") +} + +func TestAccGenerateAndBind(t *testing.T) { + env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV") + t.Log(env) + + ctx, wt := acc.WorkspaceTest(t) + gt := &generateJobTest{T: t, w: wt.W} + + uniqueId := uuid.New().String() + bundleRoot, err := initTestTemplate(t, ctx, "with_includes", map[string]any{ + "unique_id": uniqueId, + }) + require.NoError(t, err) + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + jobId := gt.createTestJob(ctx) + t.Cleanup(func() { + _, err = w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobId, + }) + if err == nil { + gt.destroyJob(ctx, jobId) + } + }) + + t.Setenv("BUNDLE_ROOT", bundleRoot) + c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "generate", "job", + "--key", "test_job_key", + "--existing-job-id", fmt.Sprint(jobId), + "--config-dir", filepath.Join(bundleRoot, "resources"), + "--source-dir", filepath.Join(bundleRoot, "src")) + _, _, err = c.Run() + require.NoError(t, err) + + _, err = os.Stat(filepath.Join(bundleRoot, "src", "test.py")) + require.NoError(t, err) + + matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "test_job_key.yml")) + require.NoError(t, err) + + require.Len(t, matches, 1) + + c = internal.NewCobraTestRunner(t, "bundle", "deployment", "bind", "test_job_key", fmt.Sprint(jobId), "--auto-approve") + _, _, err = c.Run() + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + 
require.NoError(t, err) + + err = destroyBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + // Check that job is bound and does not exist after bundle is destroyed + _, err = w.Jobs.Get(ctx, jobs.GetJobRequest{ + JobId: jobId, + }) + require.ErrorContains(t, err, "does not exist.") +} diff --git a/internal/bundle/bundles/with_includes/template/databricks.yml.tmpl b/internal/bundle/bundles/with_includes/template/databricks.yml.tmpl index 5d17e0fd..85d31ce3 100644 --- a/internal/bundle/bundles/with_includes/template/databricks.yml.tmpl +++ b/internal/bundle/bundles/with_includes/template/databricks.yml.tmpl @@ -4,5 +4,5 @@ bundle: workspace: root_path: "~/.bundle/{{.unique_id}}" -includes: - - resources/*yml +include: - - resources/*.yml diff --git a/internal/bundle/generate_pipeline_test.go b/internal/bundle/generate_pipeline_test.go index 0005e29f..b8a1ac84 100644 --- a/internal/bundle/generate_pipeline_test.go +++ b/internal/bundle/generate_pipeline_test.go @@ -28,7 +28,7 @@ func TestAccGenerateFromExistingPipelineAndDeploy(t *testing.T) { }) require.NoError(t, err) - pipelineId := gt.createTestPipeline(ctx) + pipelineId, name := gt.createTestPipeline(ctx) t.Cleanup(func() { gt.destroyPipeline(ctx, pipelineId) }) @@ -52,9 +52,16 @@ func TestAccGenerateFromExistingPipelineAndDeploy(t *testing.T) { require.Len(t, matches, 1) // check the content of generated yaml - data, err := os.ReadFile(matches[0]) + fileName := matches[0] + data, err := os.ReadFile(fileName) require.NoError(t, err) generatedYaml := string(data) + + // Replace pipeline name + generatedYaml = strings.ReplaceAll(generatedYaml, name, internal.RandomName("copy-generated-pipeline-")) + err = os.WriteFile(fileName, []byte(generatedYaml), 0644) + require.NoError(t, err) + + require.Contains(t, generatedYaml, "libraries:") require.Contains(t, generatedYaml, "- notebook:") require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "notebook.py"))) @@ -73,7 +80,7 @@ type 
generatePipelineTest struct { w *databricks.WorkspaceClient } -func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) string { +func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) (string, string) { t := gt.T w := gt.w @@ -87,8 +94,9 @@ func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) string { err = f.Write(ctx, "test.py", strings.NewReader("print('Hello!')")) require.NoError(t, err) + name := internal.RandomName("generated-pipeline-") resp, err := w.Pipelines.Create(ctx, pipelines.CreatePipeline{ - Name: internal.RandomName("generated-pipeline-"), + Name: name, Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ @@ -104,7 +112,7 @@ func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) string { }) require.NoError(t, err) - return resp.PipelineId + return resp.PipelineId, name } func (gt *generatePipelineTest) destroyPipeline(ctx context.Context, pipelineId string) { diff --git a/internal/helpers.go b/internal/helpers.go index 22e38e21..6377ae07 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -131,6 +131,14 @@ func (t *cobraTestRunner) WaitForTextPrinted(text string, timeout time.Duration) }, timeout, 50*time.Millisecond) } +func (t *cobraTestRunner) WaitForOutput(text string, timeout time.Duration) { + require.Eventually(t.T, func() bool { + currentStdout := t.stdout.String() + currentErrout := t.stderr.String() + return strings.Contains(currentStdout, text) || strings.Contains(currentErrout, text) + }, timeout, 50*time.Millisecond) +} + func (t *cobraTestRunner) WithStdin() { reader, writer := io.Pipe() t.stdinR = reader