From 6edab932337ce9d2bd76303b9375278e7933c222 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Wed, 7 Feb 2024 12:17:17 +0100 Subject: [PATCH] Added warning when trying to deploy bundle with `--fail-on-active-runs` and running resources (#1163) ## Changes Deploying bundle when there are bundle resources running at the same time can be disruptive for jobs and pipelines in progress. With this change during deployment phase (before uploading any resources) if `--fail-on-active-runs` is specified DABs will check if there are any resources running and if so, will fail the deployment ## Tests Manual + add tests --- bundle/config/bundle.go | 6 +- bundle/config/deployment.go | 10 ++ bundle/deploy/check_running_resources.go | 143 ++++++++++++++++++ bundle/deploy/check_running_resources_test.go | 125 +++++++++++++++ bundle/deploy/lock/acquire.go | 4 +- bundle/deploy/lock/release.go | 2 +- bundle/phases/deploy.go | 4 +- cmd/bundle/deploy.go | 8 +- cmd/bundle/destroy.go | 2 +- 9 files changed, 295 insertions(+), 9 deletions(-) create mode 100644 bundle/config/deployment.go create mode 100644 bundle/deploy/check_running_resources.go create mode 100644 bundle/deploy/check_running_resources_test.go diff --git a/bundle/config/bundle.go b/bundle/config/bundle.go index 933e88bf..21278151 100644 --- a/bundle/config/bundle.go +++ b/bundle/config/bundle.go @@ -25,9 +25,6 @@ type Bundle struct { // For example, where to find the binary, which version to use, etc. Terraform *Terraform `json:"terraform,omitempty" bundle:"readonly"` - // Lock configures locking behavior on deployment. - Lock Lock `json:"lock" bundle:"readonly"` - // Force-override Git branch validation. Force bool `json:"force,omitempty" bundle:"readonly"` @@ -43,4 +40,7 @@ type Bundle struct { // Overrides the compute used for jobs and other supported assets. 
ComputeID string `json:"compute_id,omitempty"` + + // Deployment section specifies deployment related configuration for bundle + Deployment Deployment `json:"deployment"` } diff --git a/bundle/config/deployment.go b/bundle/config/deployment.go new file mode 100644 index 00000000..f89c7b3e --- /dev/null +++ b/bundle/config/deployment.go @@ -0,0 +1,10 @@ +package config + +type Deployment struct { + // FailOnActiveRuns specifies whether to fail the deployment if there are + // running jobs or pipelines in the workspace. Defaults to false. + FailOnActiveRuns bool `json:"fail_on_active_runs,omitempty"` + + // Lock configures locking behavior on deployment. + Lock Lock `json:"lock" bundle:"readonly"` +} diff --git a/bundle/deploy/check_running_resources.go b/bundle/deploy/check_running_resources.go new file mode 100644 index 00000000..deb7775c --- /dev/null +++ b/bundle/deploy/check_running_resources.go @@ -0,0 +1,143 @@ +package deploy + +import ( + "context" + "fmt" + "strconv" + + "github.com/databricks/cli/bundle" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/hashicorp/terraform-exec/tfexec" + tfjson "github.com/hashicorp/terraform-json" + "golang.org/x/sync/errgroup" +) + +type ErrResourceIsRunning struct { + resourceType string + resourceId string +} + +func (e ErrResourceIsRunning) Error() string { + return fmt.Sprintf("%s %s is running", e.resourceType, e.resourceId) +} + +type checkRunningResources struct { +} + +func (l *checkRunningResources) Name() string { + return "check-running-resources" +} + +func (l *checkRunningResources) Apply(ctx context.Context, b *bundle.Bundle) error { + if !b.Config.Bundle.Deployment.FailOnActiveRuns { + return nil + } + + tf := b.Terraform + if tf == nil { + return fmt.Errorf("terraform not initialized") + } + + err := tf.Init(ctx, tfexec.Upgrade(true)) + if err != nil { + return 
fmt.Errorf("terraform init: %w", err) + } + + state, err := b.Terraform.Show(ctx) + if err != nil { + return err + } + + err = checkAnyResourceRunning(ctx, b.WorkspaceClient(), state) + if err != nil { + return fmt.Errorf("deployment aborted, err: %w", err) + } + + return nil +} + +func CheckRunningResource() *checkRunningResources { + return &checkRunningResources{} +} + +func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, state *tfjson.State) error { + if state.Values == nil || state.Values.RootModule == nil { + return nil + } + + errs, errCtx := errgroup.WithContext(ctx) + + for _, resource := range state.Values.RootModule.Resources { + // Limit to resources. + if resource.Mode != tfjson.ManagedResourceMode { + continue + } + + value, ok := resource.AttributeValues["id"] + if !ok { + continue + } + id, ok := value.(string) + if !ok { + continue + } + + switch resource.Type { + case "databricks_job": + errs.Go(func() error { + isRunning, err := IsJobRunning(errCtx, w, id) + // If retrieving the job's runs fails, surface the error and abort the check + if err != nil { + return err + } + if isRunning { + return &ErrResourceIsRunning{resourceType: "job", resourceId: id} + } + return nil + }) + case "databricks_pipeline": + errs.Go(func() error { + isRunning, err := IsPipelineRunning(errCtx, w, id) + // If there's an error retrieving the pipeline, we assume it's not running + if err != nil { + return nil + } + if isRunning { + return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id} + } + return nil + }) + } + } + + return errs.Wait() +} + +func IsJobRunning(ctx context.Context, w *databricks.WorkspaceClient, jobId string) (bool, error) { + id, err := strconv.Atoi(jobId) + if err != nil { + return false, err + } + + runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: int64(id), ActiveOnly: true}) + if err != nil { + return false, err + } + + return len(runs) > 0, nil +} + +func IsPipelineRunning(ctx context.Context, w 
*databricks.WorkspaceClient, pipelineId string) (bool, error) { + resp, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{PipelineId: pipelineId}) + if err != nil { + return false, err + } + switch resp.State { + case pipelines.PipelineStateIdle, pipelines.PipelineStateFailed, pipelines.PipelineStateDeleted: + return false, nil + default: + return true, nil + } +} diff --git a/bundle/deploy/check_running_resources_test.go b/bundle/deploy/check_running_resources_test.go new file mode 100644 index 00000000..7dc1fb86 --- /dev/null +++ b/bundle/deploy/check_running_resources_test.go @@ -0,0 +1,125 @@ +package deploy + +import ( + "context" + "errors" + "testing" + + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + tfjson "github.com/hashicorp/terraform-json" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestIsAnyResourceRunningWithEmptyState(t *testing.T) { + mock := mocks.NewMockWorkspaceClient(t) + state := &tfjson.State{} + err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, state) + require.NoError(t, err) +} + +func TestIsAnyResourceRunningWithJob(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + state := &tfjson.State{ + Values: &tfjson.StateValues{ + RootModule: &tfjson.StateModule{ + Resources: []*tfjson.StateResource{ + { + Type: "databricks_job", + AttributeValues: map[string]interface{}{ + "id": "123", + }, + Mode: tfjson.ManagedResourceMode, + }, + }, + }, + }, + } + + jobsApi := m.GetMockJobsAPI() + jobsApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{ + JobId: 123, + ActiveOnly: true, + }).Return([]jobs.BaseRun{ + {RunId: 1234}, + }, nil).Once() + + err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state) + require.ErrorContains(t, err, "job 123 is running") + + jobsApi.EXPECT().ListRunsAll(mock.Anything, 
jobs.ListRunsRequest{ + JobId: 123, + ActiveOnly: true, + }).Return([]jobs.BaseRun{}, nil).Once() + + err = checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state) + require.NoError(t, err) +} + +func TestIsAnyResourceRunningWithPipeline(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + state := &tfjson.State{ + Values: &tfjson.StateValues{ + RootModule: &tfjson.StateModule{ + Resources: []*tfjson.StateResource{ + { + Type: "databricks_pipeline", + AttributeValues: map[string]interface{}{ + "id": "123", + }, + Mode: tfjson.ManagedResourceMode, + }, + }, + }, + }, + } + + pipelineApi := m.GetMockPipelinesAPI() + pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "123", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "123", + State: pipelines.PipelineStateRunning, + }, nil).Once() + + err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state) + require.ErrorContains(t, err, "pipeline 123 is running") + + pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "123", + }).Return(&pipelines.GetPipelineResponse{ + PipelineId: "123", + State: pipelines.PipelineStateIdle, + }, nil).Once() + err = checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state) + require.NoError(t, err) +} + +func TestIsAnyResourceRunningWithAPIFailure(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + state := &tfjson.State{ + Values: &tfjson.StateValues{ + RootModule: &tfjson.StateModule{ + Resources: []*tfjson.StateResource{ + { + Type: "databricks_pipeline", + AttributeValues: map[string]interface{}{ + "id": "123", + }, + Mode: tfjson.ManagedResourceMode, + }, + }, + }, + }, + } + + pipelineApi := m.GetMockPipelinesAPI() + pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{ + PipelineId: "123", + }).Return(nil, errors.New("API failure")).Once() + + err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state) + 
require.NoError(t, err) +} diff --git a/bundle/deploy/lock/acquire.go b/bundle/deploy/lock/acquire.go index 1335f780..69e6663f 100644 --- a/bundle/deploy/lock/acquire.go +++ b/bundle/deploy/lock/acquire.go @@ -35,7 +35,7 @@ func (m *acquire) init(b *bundle.Bundle) error { func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) error { // Return early if locking is disabled. - if !b.Config.Bundle.Lock.IsEnabled() { + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { log.Infof(ctx, "Skipping; locking is disabled") return nil } @@ -45,7 +45,7 @@ func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) error { return err } - force := b.Config.Bundle.Lock.Force + force := b.Config.Bundle.Deployment.Lock.Force log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) err = b.Locker.Lock(ctx, force) if err != nil { diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go index 52d27194..68d4e0f9 100644 --- a/bundle/deploy/lock/release.go +++ b/bundle/deploy/lock/release.go @@ -30,7 +30,7 @@ func (m *release) Name() string { func (m *release) Apply(ctx context.Context, b *bundle.Bundle) error { // Return early if locking is disabled. 
- if !b.Config.Bundle.Lock.IsEnabled() { + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { log.Infof(ctx, "Skipping; locking is disabled") return nil } diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 20fe2e41..5c657550 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -5,6 +5,7 @@ import ( "github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/deploy" "github.com/databricks/cli/bundle/deploy/files" "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/metadata" @@ -22,6 +23,8 @@ func Deploy() bundle.Mutator { lock.Acquire(), bundle.Defer( bundle.Seq( + terraform.StatePull(), + deploy.CheckRunningResource(), mutator.ValidateGitDetails(), libraries.MatchWithArtifacts(), artifacts.CleanUp(), @@ -31,7 +34,6 @@ func Deploy() bundle.Mutator { permissions.ApplyWorkspaceRootPermissions(), terraform.Interpolate(), terraform.Write(), - terraform.StatePull(), bundle.Defer( terraform.Apply(), bundle.Seq( diff --git a/cmd/bundle/deploy.go b/cmd/bundle/deploy.go index 8818bbbf..a83c268b 100644 --- a/cmd/bundle/deploy.go +++ b/cmd/bundle/deploy.go @@ -15,18 +15,24 @@ func newDeployCommand() *cobra.Command { var force bool var forceLock bool + var failOnActiveRuns bool var computeID string cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.") cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.") + cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.") cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.") cmd.RunE = func(cmd *cobra.Command, args []string) error { b := bundle.Get(cmd.Context()) b.Config.Bundle.Force = force - b.Config.Bundle.Lock.Force = forceLock + 
b.Config.Bundle.Deployment.Lock.Force = forceLock b.Config.Bundle.ComputeID = computeID + if cmd.Flag("fail-on-active-runs").Changed { + b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns + } + return bundle.Apply(cmd.Context(), b, bundle.Seq( phases.Initialize(), phases.Build(), diff --git a/cmd/bundle/destroy.go b/cmd/bundle/destroy.go index 22d998ab..dad199bf 100644 --- a/cmd/bundle/destroy.go +++ b/cmd/bundle/destroy.go @@ -30,7 +30,7 @@ func newDestroyCommand() *cobra.Command { b := bundle.Get(ctx) // If `--force-lock` is specified, force acquisition of the deployment lock. - b.Config.Bundle.Lock.Force = forceDestroy + b.Config.Bundle.Deployment.Lock.Force = forceDestroy // If `--auto-approve`` is specified, we skip confirmation checks b.AutoApprove = autoApprove