Added `bundle deployment bind` and `unbind` command (#1131)

## Changes
Added `bundle deployment bind` and `unbind` command.

This command allows to bind bundle-defined resources to existing
resources in Databricks workspace so they become DABs-managed.

## Tests
Manually + added E2E test
This commit is contained in:
Andrew Nester 2024-02-14 19:04:45 +01:00 committed by GitHub
parent e8b0698e19
commit 80670eceed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 643 additions and 34 deletions

View File

@ -1,9 +1,11 @@
package config package config
import ( import (
"context"
"fmt" "fmt"
"github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go"
) )
// Resources defines Databricks resources associated with the bundle. // Resources defines Databricks resources associated with the bundle.
@ -168,3 +170,36 @@ func (r *Resources) Merge() error {
} }
return nil return nil
} }
type ConfigResource interface {
Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error)
TerraformResourceName() string
}
func (r *Resources) FindResourceByConfigKey(key string) (ConfigResource, error) {
found := make([]ConfigResource, 0)
for k := range r.Jobs {
if k == key {
found = append(found, r.Jobs[k])
}
}
for k := range r.Pipelines {
if k == key {
found = append(found, r.Pipelines[k])
}
}
if len(found) == 0 {
return nil, fmt.Errorf("no such resource: %s", key)
}
if len(found) > 1 {
keys := make([]string, 0, len(found))
for _, r := range found {
keys = append(keys, fmt.Sprintf("%s:%s", r.TerraformResourceName(), key))
}
return nil, fmt.Errorf("ambiguous: %s (can resolve to all of %s)", key, keys)
}
return found[0], nil
}

View File

@ -1,7 +1,12 @@
package resources package resources
import ( import (
"context"
"strconv"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/imdario/mergo" "github.com/imdario/mergo"
@ -90,3 +95,22 @@ func (j *Job) MergeTasks() error {
j.Tasks = tasks j.Tasks = tasks
return nil return nil
} }
func (j *Job) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
jobId, err := strconv.Atoi(id)
if err != nil {
return false, err
}
_, err = w.Jobs.Get(ctx, jobs.GetJobRequest{
JobId: int64(jobId),
})
if err != nil {
log.Debugf(ctx, "job %s does not exist", id)
return false, err
}
return true, nil
}
func (j *Job) TerraformResourceName() string {
return "databricks_job"
}

View File

@ -1,9 +1,12 @@
package resources package resources
import ( import (
"context"
"strings" "strings"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/imdario/mergo" "github.com/imdario/mergo"
@ -73,3 +76,18 @@ func (p *Pipeline) MergeClusters() error {
p.Clusters = output p.Clusters = output
return nil return nil
} }
func (p *Pipeline) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{
PipelineId: id,
})
if err != nil {
log.Debugf(ctx, "pipeline %s does not exist", id)
return false, err
}
return true, nil
}
func (p *Pipeline) TerraformResourceName() string {
return "databricks_pipeline"
}

View File

@ -12,6 +12,8 @@ import (
type Goal string type Goal string
const ( const (
GoalBind = Goal("bind")
GoalUnbind = Goal("unbind")
GoalDeploy = Goal("deploy") GoalDeploy = Goal("deploy")
GoalDestroy = Goal("destroy") GoalDestroy = Goal("destroy")
) )
@ -46,6 +48,8 @@ func (m *release) Apply(ctx context.Context, b *bundle.Bundle) error {
switch m.goal { switch m.goal {
case GoalDeploy: case GoalDeploy:
return b.Locker.Unlock(ctx) return b.Locker.Unlock(ctx)
case GoalBind, GoalUnbind:
return b.Locker.Unlock(ctx)
case GoalDestroy: case GoalDestroy:
return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist) return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist)
default: default:

View File

@ -0,0 +1,108 @@
package terraform
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/cmdio"
"github.com/hashicorp/terraform-exec/tfexec"
)
type BindOptions struct {
AutoApprove bool
ResourceType string
ResourceKey string
ResourceId string
}
type importResource struct {
opts *BindOptions
}
// Apply implements bundle.Mutator.
func (m *importResource) Apply(ctx context.Context, b *bundle.Bundle) error {
dir, err := Dir(ctx, b)
if err != nil {
return err
}
tf := b.Terraform
if tf == nil {
return fmt.Errorf("terraform not initialized")
}
err = tf.Init(ctx, tfexec.Upgrade(true))
if err != nil {
return fmt.Errorf("terraform init: %w", err)
}
tmpDir, err := os.MkdirTemp("", "state-*")
if err != nil {
return fmt.Errorf("terraform init: %w", err)
}
tmpState := filepath.Join(tmpDir, TerraformStateFileName)
importAddress := fmt.Sprintf("%s.%s", m.opts.ResourceType, m.opts.ResourceKey)
err = tf.Import(ctx, importAddress, m.opts.ResourceId, tfexec.StateOut(tmpState))
if err != nil {
return fmt.Errorf("terraform import: %w", err)
}
buf := bytes.NewBuffer(nil)
tf.SetStdout(buf)
//lint:ignore SA1019 We use legacy -state flag for now to plan the import changes based on temporary state file
changed, err := tf.Plan(ctx, tfexec.State(tmpState), tfexec.Target(importAddress))
if err != nil {
return fmt.Errorf("terraform plan: %w", err)
}
defer os.RemoveAll(tmpDir)
if changed && !m.opts.AutoApprove {
output := buf.String()
// Remove output starting from Warning until end of output
output = output[:bytes.Index([]byte(output), []byte("Warning:"))]
cmdio.LogString(ctx, output)
ans, err := cmdio.AskYesOrNo(ctx, "Confirm import changes? Changes will be remotely applied only after running 'bundle deploy'.")
if err != nil {
return err
}
if !ans {
return fmt.Errorf("import aborted")
}
}
// If user confirmed changes, move the state file from temp dir to state location
f, err := os.Create(filepath.Join(dir, TerraformStateFileName))
if err != nil {
return err
}
defer f.Close()
tmpF, err := os.Open(tmpState)
if err != nil {
return err
}
defer tmpF.Close()
_, err = io.Copy(f, tmpF)
if err != nil {
return err
}
return nil
}
// Name implements bundle.Mutator.
func (*importResource) Name() string {
return "terraform.Import"
}
func Import(opts *BindOptions) bundle.Mutator {
return &importResource{opts: opts}
}

View File

@ -0,0 +1,41 @@
package terraform
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/hashicorp/terraform-exec/tfexec"
)
type unbind struct {
resourceType string
resourceKey string
}
func (m *unbind) Apply(ctx context.Context, b *bundle.Bundle) error {
tf := b.Terraform
if tf == nil {
return fmt.Errorf("terraform not initialized")
}
err := tf.Init(ctx, tfexec.Upgrade(true))
if err != nil {
return fmt.Errorf("terraform init: %w", err)
}
err = tf.StateRm(ctx, fmt.Sprintf("%s.%s", m.resourceType, m.resourceKey))
if err != nil {
return fmt.Errorf("terraform state rm: %w", err)
}
return nil
}
func (*unbind) Name() string {
return "terraform.Unbind"
}
func Unbind(resourceType string, resourceKey string) bundle.Mutator {
return &unbind{resourceType: resourceType, resourceKey: resourceKey}
}

45
bundle/phases/bind.go Normal file
View File

@ -0,0 +1,45 @@
package phases
import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deploy/lock"
"github.com/databricks/cli/bundle/deploy/terraform"
)
func Bind(opts *terraform.BindOptions) bundle.Mutator {
return newPhase(
"bind",
[]bundle.Mutator{
lock.Acquire(),
bundle.Defer(
bundle.Seq(
terraform.StatePull(),
terraform.Interpolate(),
terraform.Write(),
terraform.Import(opts),
terraform.StatePush(),
),
lock.Release(lock.GoalBind),
),
},
)
}
func Unbind(resourceType string, resourceKey string) bundle.Mutator {
return newPhase(
"unbind",
[]bundle.Mutator{
lock.Acquire(),
bundle.Defer(
bundle.Seq(
terraform.StatePull(),
terraform.Interpolate(),
terraform.Write(),
terraform.Unbind(resourceType, resourceKey),
terraform.StatePush(),
),
lock.Release(lock.GoalUnbind),
),
},
)
}

View File

@ -14,9 +14,9 @@ func Destroy() bundle.Mutator {
lock.Acquire(), lock.Acquire(),
bundle.Defer( bundle.Defer(
bundle.Seq( bundle.Seq(
terraform.StatePull(),
terraform.Interpolate(), terraform.Interpolate(),
terraform.Write(), terraform.Write(),
terraform.StatePull(),
terraform.Plan(terraform.PlanGoal("destroy")), terraform.Plan(terraform.PlanGoal("destroy")),
terraform.Destroy(), terraform.Destroy(),
terraform.StatePush(), terraform.StatePush(),

View File

@ -1,6 +1,7 @@
package bundle package bundle
import ( import (
"github.com/databricks/cli/cmd/bundle/deployment"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -24,5 +25,6 @@ func New() *cobra.Command {
cmd.AddCommand(newInitCommand()) cmd.AddCommand(newInitCommand())
cmd.AddCommand(newSummaryCommand()) cmd.AddCommand(newSummaryCommand())
cmd.AddCommand(newGenerateCommand()) cmd.AddCommand(newGenerateCommand())
cmd.AddCommand(deployment.NewDeploymentCommand())
return cmd return cmd
} }

View File

@ -3,6 +3,7 @@ package bundle
import ( import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -10,7 +11,7 @@ func newDeployCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "deploy", Use: "deploy",
Short: "Deploy bundle", Short: "Deploy bundle",
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
} }
var force bool var force bool

View File

@ -0,0 +1,65 @@
package deployment
import (
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/libs/cmdio"
"github.com/spf13/cobra"
)
func newBindCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "bind KEY RESOURCE_ID",
Short: "Bind bundle-defined resources to existing resources",
Args: cobra.ExactArgs(2),
PreRunE: utils.ConfigureBundleWithVariables,
}
var autoApprove bool
var forceLock bool
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Automatically approve the binding")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.RunE = func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context())
r := b.Config.Resources
resource, err := r.FindResourceByConfigKey(args[0])
if err != nil {
return err
}
w := b.WorkspaceClient()
ctx := cmd.Context()
exists, err := resource.Exists(ctx, w, args[1])
if err != nil {
return fmt.Errorf("failed to fetch the resource, err: %w", err)
}
if !exists {
return fmt.Errorf("%s with an id '%s' is not found", resource.TerraformResourceName(), args[1])
}
b.Config.Bundle.Deployment.Lock.Force = forceLock
err = bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
phases.Bind(&terraform.BindOptions{
AutoApprove: autoApprove,
ResourceType: resource.TerraformResourceName(),
ResourceKey: args[0],
ResourceId: args[1],
}),
))
if err != nil {
return fmt.Errorf("failed to bind the resource, err: %w", err)
}
cmdio.LogString(ctx, fmt.Sprintf("Successfully bound %s with an id '%s'. Run 'bundle deploy' to deploy changes to your workspace", resource.TerraformResourceName(), args[1]))
return nil
}
return cmd
}

View File

@ -0,0 +1,17 @@
package deployment
import (
"github.com/spf13/cobra"
)
func NewDeploymentCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "deployment",
Short: "Deployment related commands",
Long: "Deployment related commands",
}
cmd.AddCommand(newBindCommand())
cmd.AddCommand(newUnbindCommand())
return cmd
}

View File

@ -0,0 +1,37 @@
package deployment
import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/spf13/cobra"
)
func newUnbindCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "unbind KEY",
Short: "Unbind bundle-defined resources from its managed remote resource",
Args: cobra.ExactArgs(1),
PreRunE: utils.ConfigureBundleWithVariables,
}
var forceLock bool
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.RunE = func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context())
r := b.Config.Resources
resource, err := r.FindResourceByConfigKey(args[0])
if err != nil {
return err
}
b.Config.Bundle.Deployment.Lock.Force = forceLock
return bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
phases.Unbind(resource.TerraformResourceName(), args[0]),
))
}
return cmd
}

View File

@ -6,6 +6,7 @@ import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/flags" "github.com/databricks/cli/libs/flags"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -17,7 +18,7 @@ func newDestroyCommand() *cobra.Command {
Use: "destroy", Use: "destroy",
Short: "Destroy deployed bundle resources", Short: "Destroy deployed bundle resources",
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
} }
var autoApprove bool var autoApprove bool

View File

@ -2,6 +2,7 @@ package bundle
import ( import (
"github.com/databricks/cli/cmd/bundle/generate" "github.com/databricks/cli/cmd/bundle/generate"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -12,7 +13,7 @@ func newGenerateCommand() *cobra.Command {
Use: "generate", Use: "generate",
Short: "Generate bundle configuration", Short: "Generate bundle configuration",
Long: "Generate bundle configuration", Long: "Generate bundle configuration",
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
} }
cmd.AddCommand(generate.NewGenerateJobCommand()) cmd.AddCommand(generate.NewGenerateJobCommand())

View File

@ -8,6 +8,7 @@ import (
"github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/bundle/run" "github.com/databricks/cli/bundle/run"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root" "github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/flags" "github.com/databricks/cli/libs/flags"
@ -20,7 +21,7 @@ func newRunCommand() *cobra.Command {
Short: "Run a resource (e.g. a job or a pipeline)", Short: "Run a resource (e.g. a job or a pipeline)",
Args: cobra.MaximumNArgs(1), Args: cobra.MaximumNArgs(1),
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
} }
var runOptions run.Options var runOptions run.Options

View File

@ -10,6 +10,7 @@ import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root" "github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/flags" "github.com/databricks/cli/libs/flags"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -20,7 +21,7 @@ func newSummaryCommand() *cobra.Command {
Use: "summary", Use: "summary",
Short: "Describe the bundle resources and their deployment states", Short: "Describe the bundle resources and their deployment states",
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
// This command is currently intended for the Databricks VSCode extension only // This command is currently intended for the Databricks VSCode extension only
Hidden: true, Hidden: true,

View File

@ -6,6 +6,7 @@ import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/sync" "github.com/databricks/cli/libs/sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -48,7 +49,7 @@ func newSyncCommand() *cobra.Command {
Short: "Synchronize bundle tree to the workspace", Short: "Synchronize bundle tree to the workspace",
Args: cobra.NoArgs, Args: cobra.NoArgs,
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
} }
var f syncFlags var f syncFlags

24
cmd/bundle/utils/utils.go Normal file
View File

@ -0,0 +1,24 @@
package utils
import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/cmd/root"
"github.com/spf13/cobra"
)
func ConfigureBundleWithVariables(cmd *cobra.Command, args []string) error {
// Load bundle config and apply target
err := root.MustConfigureBundle(cmd, args)
if err != nil {
return err
}
variables, err := cmd.Flags().GetStringSlice("var")
if err != nil {
return err
}
// Initialize variables by assigning them values passed as command line flags
b := bundle.Get(cmd.Context())
return b.Config.InitializeVariables(variables)
}

View File

@ -5,6 +5,7 @@ import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/phases" "github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -13,7 +14,7 @@ func newValidateCommand() *cobra.Command {
Use: "validate", Use: "validate",
Short: "Validate configuration", Short: "Validate configuration",
PreRunE: ConfigureBundleWithVariables, PreRunE: utils.ConfigureBundleWithVariables,
} }
cmd.RunE = func(cmd *cobra.Command, args []string) error { cmd.RunE = func(cmd *cobra.Command, args []string) error {

View File

@ -1,28 +1,9 @@
package bundle package bundle
import ( import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/cmd/root"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
func ConfigureBundleWithVariables(cmd *cobra.Command, args []string) error {
// Load bundle config and apply target
err := root.MustConfigureBundle(cmd, args)
if err != nil {
return err
}
variables, err := cmd.Flags().GetStringSlice("var")
if err != nil {
return err
}
// Initialize variables by assigning them values passed as command line flags
b := bundle.Get(cmd.Context())
return b.Config.InitializeVariables(variables)
}
func initVariableFlag(cmd *cobra.Command) { func initVariableFlag(cmd *cobra.Command) {
cmd.PersistentFlags().StringSlice("var", []string{}, `set values for variables defined in bundle config. Example: --var="foo=bar"`) cmd.PersistentFlags().StringSlice("var", []string{}, `set values for variables defined in bundle config. Example: --var="foo=bar"`)
} }

View File

@ -0,0 +1,185 @@
package bundle
import (
"fmt"
"os"
"path/filepath"
"testing"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func TestAccBindJobToExistingJob(t *testing.T) {
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
t.Log(env)
ctx, wt := acc.WorkspaceTest(t)
gt := &generateJobTest{T: t, w: wt.W}
nodeTypeId := internal.GetNodeTypeId(env)
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, ctx, "basic", map[string]any{
"unique_id": uniqueId,
"spark_version": "13.3.x-scala2.12",
"node_type_id": nodeTypeId,
})
require.NoError(t, err)
jobId := gt.createTestJob(ctx)
t.Cleanup(func() {
gt.destroyJob(ctx, jobId)
require.NoError(t, err)
})
t.Setenv("BUNDLE_ROOT", bundleRoot)
c := internal.NewCobraTestRunner(t, "bundle", "deployment", "bind", "foo", fmt.Sprint(jobId), "--auto-approve")
_, _, err = c.Run()
require.NoError(t, err)
// Remove .databricks directory to simulate a fresh deployment
err = os.RemoveAll(filepath.Join(bundleRoot, ".databricks"))
require.NoError(t, err)
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
// Check that job is bound and updated with config from bundle
job, err := w.Jobs.Get(ctx, jobs.GetJobRequest{
JobId: jobId,
})
require.NoError(t, err)
require.Equal(t, job.Settings.Name, fmt.Sprintf("test-job-basic-%s", uniqueId))
require.Contains(t, job.Settings.Tasks[0].SparkPythonTask.PythonFile, "hello_world.py")
c = internal.NewCobraTestRunner(t, "bundle", "deployment", "unbind", "foo")
_, _, err = c.Run()
require.NoError(t, err)
// Remove .databricks directory to simulate a fresh deployment
err = os.RemoveAll(filepath.Join(bundleRoot, ".databricks"))
require.NoError(t, err)
err = destroyBundle(t, ctx, bundleRoot)
require.NoError(t, err)
// Check that job is unbound and exists after bundle is destroyed
job, err = w.Jobs.Get(ctx, jobs.GetJobRequest{
JobId: jobId,
})
require.NoError(t, err)
require.Equal(t, job.Settings.Name, fmt.Sprintf("test-job-basic-%s", uniqueId))
require.Contains(t, job.Settings.Tasks[0].SparkPythonTask.PythonFile, "hello_world.py")
}
func TestAccAbortBind(t *testing.T) {
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
t.Log(env)
ctx, wt := acc.WorkspaceTest(t)
gt := &generateJobTest{T: t, w: wt.W}
nodeTypeId := internal.GetNodeTypeId(env)
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, ctx, "basic", map[string]any{
"unique_id": uniqueId,
"spark_version": "13.3.x-scala2.12",
"node_type_id": nodeTypeId,
})
require.NoError(t, err)
jobId := gt.createTestJob(ctx)
t.Cleanup(func() {
gt.destroyJob(ctx, jobId)
destroyBundle(t, ctx, bundleRoot)
})
t.Setenv("BUNDLE_ROOT", bundleRoot)
c := internal.NewCobraTestRunner(t, "bundle", "deployment", "bind", "foo", fmt.Sprint(jobId))
// Simulate user aborting the bind. This is done by not providing any input to the prompt in non-interactive mode.
_, _, err = c.Run()
require.ErrorContains(t, err, "failed to bind the resource")
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
// Check that job is not bound and not updated with config from bundle
job, err := w.Jobs.Get(ctx, jobs.GetJobRequest{
JobId: jobId,
})
require.NoError(t, err)
require.NotEqual(t, job.Settings.Name, fmt.Sprintf("test-job-basic-%s", uniqueId))
require.Contains(t, job.Settings.Tasks[0].NotebookTask.NotebookPath, "test")
}
func TestAccGenerateAndBind(t *testing.T) {
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
t.Log(env)
ctx, wt := acc.WorkspaceTest(t)
gt := &generateJobTest{T: t, w: wt.W}
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, ctx, "with_includes", map[string]any{
"unique_id": uniqueId,
})
require.NoError(t, err)
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
jobId := gt.createTestJob(ctx)
t.Cleanup(func() {
_, err = w.Jobs.Get(ctx, jobs.GetJobRequest{
JobId: jobId,
})
if err == nil {
gt.destroyJob(ctx, jobId)
}
})
t.Setenv("BUNDLE_ROOT", bundleRoot)
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "generate", "job",
"--key", "test_job_key",
"--existing-job-id", fmt.Sprint(jobId),
"--config-dir", filepath.Join(bundleRoot, "resources"),
"--source-dir", filepath.Join(bundleRoot, "src"))
_, _, err = c.Run()
require.NoError(t, err)
_, err = os.Stat(filepath.Join(bundleRoot, "src", "test.py"))
require.NoError(t, err)
matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "test_job_key.yml"))
require.NoError(t, err)
require.Len(t, matches, 1)
c = internal.NewCobraTestRunner(t, "bundle", "deployment", "bind", "test_job_key", fmt.Sprint(jobId), "--auto-approve")
_, _, err = c.Run()
require.NoError(t, err)
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)
err = destroyBundle(t, ctx, bundleRoot)
require.NoError(t, err)
// Check that job is bound and does not extsts after bundle is destroyed
_, err = w.Jobs.Get(ctx, jobs.GetJobRequest{
JobId: jobId,
})
require.ErrorContains(t, err, "does not exist.")
}

View File

@ -4,5 +4,5 @@ bundle:
workspace: workspace:
root_path: "~/.bundle/{{.unique_id}}" root_path: "~/.bundle/{{.unique_id}}"
includes: include:
- resources/*yml - resources/*.yml

View File

@ -28,7 +28,7 @@ func TestAccGenerateFromExistingPipelineAndDeploy(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
pipelineId := gt.createTestPipeline(ctx) pipelineId, name := gt.createTestPipeline(ctx)
t.Cleanup(func() { t.Cleanup(func() {
gt.destroyPipeline(ctx, pipelineId) gt.destroyPipeline(ctx, pipelineId)
}) })
@ -52,9 +52,16 @@ func TestAccGenerateFromExistingPipelineAndDeploy(t *testing.T) {
require.Len(t, matches, 1) require.Len(t, matches, 1)
// check the content of generated yaml // check the content of generated yaml
data, err := os.ReadFile(matches[0]) fileName := matches[0]
data, err := os.ReadFile(fileName)
require.NoError(t, err) require.NoError(t, err)
generatedYaml := string(data) generatedYaml := string(data)
// Replace pipeline name
generatedYaml = strings.ReplaceAll(generatedYaml, name, internal.RandomName("copy-generated-pipeline-"))
err = os.WriteFile(fileName, []byte(generatedYaml), 0644)
require.NoError(t, err)
require.Contains(t, generatedYaml, "libraries:") require.Contains(t, generatedYaml, "libraries:")
require.Contains(t, generatedYaml, "- notebook:") require.Contains(t, generatedYaml, "- notebook:")
require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "notebook.py"))) require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "notebook.py")))
@ -73,7 +80,7 @@ type generatePipelineTest struct {
w *databricks.WorkspaceClient w *databricks.WorkspaceClient
} }
func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) string { func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) (string, string) {
t := gt.T t := gt.T
w := gt.w w := gt.w
@ -87,8 +94,9 @@ func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) string {
err = f.Write(ctx, "test.py", strings.NewReader("print('Hello!')")) err = f.Write(ctx, "test.py", strings.NewReader("print('Hello!')"))
require.NoError(t, err) require.NoError(t, err)
name := internal.RandomName("generated-pipeline-")
resp, err := w.Pipelines.Create(ctx, pipelines.CreatePipeline{ resp, err := w.Pipelines.Create(ctx, pipelines.CreatePipeline{
Name: internal.RandomName("generated-pipeline-"), Name: name,
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{
@ -104,7 +112,7 @@ func (gt *generatePipelineTest) createTestPipeline(ctx context.Context) string {
}) })
require.NoError(t, err) require.NoError(t, err)
return resp.PipelineId return resp.PipelineId, name
} }
func (gt *generatePipelineTest) destroyPipeline(ctx context.Context, pipelineId string) { func (gt *generatePipelineTest) destroyPipeline(ctx context.Context, pipelineId string) {

View File

@ -131,6 +131,14 @@ func (t *cobraTestRunner) WaitForTextPrinted(text string, timeout time.Duration)
}, timeout, 50*time.Millisecond) }, timeout, 50*time.Millisecond)
} }
func (t *cobraTestRunner) WaitForOutput(text string, timeout time.Duration) {
require.Eventually(t.T, func() bool {
currentStdout := t.stdout.String()
currentErrout := t.stderr.String()
return strings.Contains(currentStdout, text) || strings.Contains(currentErrout, text)
}, timeout, 50*time.Millisecond)
}
func (t *cobraTestRunner) WithStdin() { func (t *cobraTestRunner) WithStdin() {
reader, writer := io.Pipe() reader, writer := io.Pipe()
t.stdinR = reader t.stdinR = reader