diff --git a/bundle/config/bundle.go b/bundle/config/bundle.go
index 933e88bf..21278151 100644
--- a/bundle/config/bundle.go
+++ b/bundle/config/bundle.go
@@ -25,9 +25,6 @@ type Bundle struct {
     // For example, where to find the binary, which version to use, etc.
     Terraform *Terraform `json:"terraform,omitempty" bundle:"readonly"`
 
-    // Lock configures locking behavior on deployment.
-    Lock Lock `json:"lock" bundle:"readonly"`
-
     // Force-override Git branch validation.
     Force bool `json:"force,omitempty" bundle:"readonly"`
 
@@ -43,4 +40,7 @@ type Bundle struct {
 
     // Overrides the compute used for jobs and other supported assets.
     ComputeID string `json:"compute_id,omitempty"`
+
+    // Deployment section specifies deployment-related configuration for the bundle.
+    Deployment Deployment `json:"deployment"`
 }
diff --git a/bundle/config/deployment.go b/bundle/config/deployment.go
new file mode 100644
index 00000000..f89c7b3e
--- /dev/null
+++ b/bundle/config/deployment.go
@@ -0,0 +1,10 @@
+package config
+
+type Deployment struct {
+    // FailOnActiveRuns specifies whether to fail the deployment if there are
+    // running jobs or pipelines in the workspace. Defaults to false.
+    FailOnActiveRuns bool `json:"fail_on_active_runs,omitempty"`
+
+    // Lock configures locking behavior on deployment.
+    Lock Lock `json:"lock" bundle:"readonly"`
+}
diff --git a/bundle/deploy/check_running_resources.go b/bundle/deploy/check_running_resources.go
new file mode 100644
index 00000000..deb7775c
--- /dev/null
+++ b/bundle/deploy/check_running_resources.go
@@ -0,0 +1,143 @@
+package deploy
+
+import (
+    "context"
+    "fmt"
+    "strconv"
+
+    "github.com/databricks/cli/bundle"
+    "github.com/databricks/databricks-sdk-go"
+    "github.com/databricks/databricks-sdk-go/service/jobs"
+    "github.com/databricks/databricks-sdk-go/service/pipelines"
+    "github.com/hashicorp/terraform-exec/tfexec"
+    tfjson "github.com/hashicorp/terraform-json"
+    "golang.org/x/sync/errgroup"
+)
+
+type ErrResourceIsRunning struct {
+    resourceType string
+    resourceId   string
+}
+
+func (e ErrResourceIsRunning) Error() string {
+    return fmt.Sprintf("%s %s is running", e.resourceType, e.resourceId)
+}
+
+type checkRunningResources struct {
+}
+
+func (l *checkRunningResources) Name() string {
+    return "check-running-resources"
+}
+
+func (l *checkRunningResources) Apply(ctx context.Context, b *bundle.Bundle) error {
+    if !b.Config.Bundle.Deployment.FailOnActiveRuns {
+        return nil
+    }
+
+    tf := b.Terraform
+    if tf == nil {
+        return fmt.Errorf("terraform not initialized")
+    }
+
+    err := tf.Init(ctx, tfexec.Upgrade(true))
+    if err != nil {
+        return fmt.Errorf("terraform init: %w", err)
+    }
+
+    state, err := b.Terraform.Show(ctx)
+    if err != nil {
+        return err
+    }
+
+    err = checkAnyResourceRunning(ctx, b.WorkspaceClient(), state)
+    if err != nil {
+        return fmt.Errorf("deployment aborted, err: %w", err)
+    }
+
+    return nil
+}
+
+func CheckRunningResource() *checkRunningResources {
+    return &checkRunningResources{}
+}
+
+func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, state *tfjson.State) error {
+    if state.Values == nil || state.Values.RootModule == nil {
+        return nil
+    }
+
+    errs, errCtx := errgroup.WithContext(ctx)
+
+    for _, resource := range state.Values.RootModule.Resources {
+        // Limit to resources.
+        if resource.Mode != tfjson.ManagedResourceMode {
+            continue
+        }
+
+        value, ok := resource.AttributeValues["id"]
+        if !ok {
+            continue
+        }
+        id, ok := value.(string)
+        if !ok {
+            continue
+        }
+
+        switch resource.Type {
+        case "databricks_job":
+            errs.Go(func() error {
+                isRunning, err := IsJobRunning(errCtx, w, id)
+                // If we fail to retrieve the job, return the error and fail the check
+                if err != nil {
+                    return err
+                }
+                if isRunning {
+                    return &ErrResourceIsRunning{resourceType: "job", resourceId: id}
+                }
+                return nil
+            })
+        case "databricks_pipeline":
+            errs.Go(func() error {
+                isRunning, err := IsPipelineRunning(errCtx, w, id)
+                // If there's an error retrieving the pipeline, we assume it's not running
+                if err != nil {
+                    return nil
+                }
+                if isRunning {
+                    return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id}
+                }
+                return nil
+            })
+        }
+    }
+
+    return errs.Wait()
+}
+
+func IsJobRunning(ctx context.Context, w *databricks.WorkspaceClient, jobId string) (bool, error) {
+    id, err := strconv.Atoi(jobId)
+    if err != nil {
+        return false, err
+    }
+
+    runs, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{JobId: int64(id), ActiveOnly: true})
+    if err != nil {
+        return false, err
+    }
+
+    return len(runs) > 0, nil
+}
+
+func IsPipelineRunning(ctx context.Context, w *databricks.WorkspaceClient, pipelineId string) (bool, error) {
+    resp, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{PipelineId: pipelineId})
+    if err != nil {
+        return false, err
+    }
+    switch resp.State {
+    case pipelines.PipelineStateIdle, pipelines.PipelineStateFailed, pipelines.PipelineStateDeleted:
+        return false, nil
+    default:
+        return true, nil
+    }
+}
diff --git a/bundle/deploy/check_running_resources_test.go b/bundle/deploy/check_running_resources_test.go
new file mode 100644
index 00000000..7dc1fb86
--- /dev/null
+++ b/bundle/deploy/check_running_resources_test.go
@@ -0,0 +1,125 @@
+package deploy
+
+import (
+    "context"
+    "errors"
+    "testing"
+
+    "github.com/databricks/databricks-sdk-go/experimental/mocks"
+    "github.com/databricks/databricks-sdk-go/service/jobs"
+    "github.com/databricks/databricks-sdk-go/service/pipelines"
+    tfjson "github.com/hashicorp/terraform-json"
+    "github.com/stretchr/testify/mock"
+    "github.com/stretchr/testify/require"
+)
+
+func TestIsAnyResourceRunningWithEmptyState(t *testing.T) {
+    mock := mocks.NewMockWorkspaceClient(t)
+    state := &tfjson.State{}
+    err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, state)
+    require.NoError(t, err)
+}
+
+func TestIsAnyResourceRunningWithJob(t *testing.T) {
+    m := mocks.NewMockWorkspaceClient(t)
+    state := &tfjson.State{
+        Values: &tfjson.StateValues{
+            RootModule: &tfjson.StateModule{
+                Resources: []*tfjson.StateResource{
+                    {
+                        Type: "databricks_job",
+                        AttributeValues: map[string]interface{}{
+                            "id": "123",
+                        },
+                        Mode: tfjson.ManagedResourceMode,
+                    },
+                },
+            },
+        },
+    }
+
+    jobsApi := m.GetMockJobsAPI()
+    jobsApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
+        JobId:      123,
+        ActiveOnly: true,
+    }).Return([]jobs.BaseRun{
+        {RunId: 1234},
+    }, nil).Once()
+
+    err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
+    require.ErrorContains(t, err, "job 123 is running")
+
+    jobsApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
+        JobId:      123,
+        ActiveOnly: true,
+    }).Return([]jobs.BaseRun{}, nil).Once()
+
+    err = checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
+    require.NoError(t, err)
+}
+
+func TestIsAnyResourceRunningWithPipeline(t *testing.T) {
+    m := mocks.NewMockWorkspaceClient(t)
+    state := &tfjson.State{
+        Values: &tfjson.StateValues{
+            RootModule: &tfjson.StateModule{
+                Resources: []*tfjson.StateResource{
+                    {
+                        Type: "databricks_pipeline",
+                        AttributeValues: map[string]interface{}{
+                            "id": "123",
+                        },
+                        Mode: tfjson.ManagedResourceMode,
+                    },
+                },
+            },
+        },
+    }
+
+    pipelineApi := m.GetMockPipelinesAPI()
+    pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{
+        PipelineId: "123",
+    }).Return(&pipelines.GetPipelineResponse{
+        PipelineId: "123",
+        State:      pipelines.PipelineStateRunning,
+    }, nil).Once()
+
+    err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
+    require.ErrorContains(t, err, "pipeline 123 is running")
+
+    pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{
+        PipelineId: "123",
+    }).Return(&pipelines.GetPipelineResponse{
+        PipelineId: "123",
+        State:      pipelines.PipelineStateIdle,
+    }, nil).Once()
+    err = checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
+    require.NoError(t, err)
+}
+
+func TestIsAnyResourceRunningWithAPIFailure(t *testing.T) {
+    m := mocks.NewMockWorkspaceClient(t)
+    state := &tfjson.State{
+        Values: &tfjson.StateValues{
+            RootModule: &tfjson.StateModule{
+                Resources: []*tfjson.StateResource{
+                    {
+                        Type: "databricks_pipeline",
+                        AttributeValues: map[string]interface{}{
+                            "id": "123",
+                        },
+                        Mode: tfjson.ManagedResourceMode,
+                    },
+                },
+            },
+        },
+    }
+
+    pipelineApi := m.GetMockPipelinesAPI()
+    pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{
+        PipelineId: "123",
+    }).Return(nil, errors.New("API failure")).Once()
+
+    err := checkAnyResourceRunning(context.Background(), m.WorkspaceClient, state)
+    require.NoError(t, err)
+}
diff --git a/bundle/deploy/lock/acquire.go b/bundle/deploy/lock/acquire.go
index 1335f780..69e6663f 100644
--- a/bundle/deploy/lock/acquire.go
+++ b/bundle/deploy/lock/acquire.go
@@ -35,7 +35,7 @@ func (m *acquire) init(b *bundle.Bundle) error {
 
 func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) error {
     // Return early if locking is disabled.
-    if !b.Config.Bundle.Lock.IsEnabled() {
+    if !b.Config.Bundle.Deployment.Lock.IsEnabled() {
         log.Infof(ctx, "Skipping; locking is disabled")
         return nil
     }
@@ -45,7 +45,7 @@ func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) error {
         return err
     }
 
-    force := b.Config.Bundle.Lock.Force
+    force := b.Config.Bundle.Deployment.Lock.Force
     log.Infof(ctx, "Acquiring deployment lock (force: %v)", force)
     err = b.Locker.Lock(ctx, force)
     if err != nil {
diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go
index 52d27194..68d4e0f9 100644
--- a/bundle/deploy/lock/release.go
+++ b/bundle/deploy/lock/release.go
@@ -30,7 +30,7 @@ func (m *release) Name() string {
 
 func (m *release) Apply(ctx context.Context, b *bundle.Bundle) error {
     // Return early if locking is disabled.
- if !b.Config.Bundle.Lock.IsEnabled() { + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { log.Infof(ctx, "Skipping; locking is disabled") return nil } diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 20fe2e41..5c657550 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -5,6 +5,7 @@ import ( "github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/deploy" "github.com/databricks/cli/bundle/deploy/files" "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/metadata" @@ -22,6 +23,8 @@ func Deploy() bundle.Mutator { lock.Acquire(), bundle.Defer( bundle.Seq( + terraform.StatePull(), + deploy.CheckRunningResource(), mutator.ValidateGitDetails(), libraries.MatchWithArtifacts(), artifacts.CleanUp(), @@ -31,7 +34,6 @@ func Deploy() bundle.Mutator { permissions.ApplyWorkspaceRootPermissions(), terraform.Interpolate(), terraform.Write(), - terraform.StatePull(), bundle.Defer( terraform.Apply(), bundle.Seq( diff --git a/cmd/bundle/deploy.go b/cmd/bundle/deploy.go index 8818bbbf..a83c268b 100644 --- a/cmd/bundle/deploy.go +++ b/cmd/bundle/deploy.go @@ -15,18 +15,24 @@ func newDeployCommand() *cobra.Command { var force bool var forceLock bool + var failOnActiveRuns bool var computeID string cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.") cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.") + cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.") cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.") cmd.RunE = func(cmd *cobra.Command, args []string) error { b := bundle.Get(cmd.Context()) b.Config.Bundle.Force = force - b.Config.Bundle.Lock.Force = forceLock + b.Config.Bundle.Deployment.Lock.Force = forceLock b.Config.Bundle.ComputeID = computeID + if cmd.Flag("fail-on-active-runs").Changed { + b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns + } + return bundle.Apply(cmd.Context(), b, bundle.Seq( phases.Initialize(), phases.Build(), diff --git a/cmd/bundle/destroy.go b/cmd/bundle/destroy.go index 22d998ab..dad199bf 100644 --- a/cmd/bundle/destroy.go +++ b/cmd/bundle/destroy.go @@ -30,7 +30,7 @@ func newDestroyCommand() *cobra.Command { b := bundle.Get(ctx) // If `--force-lock` is specified, force acquisition of the deployment lock. - b.Config.Bundle.Lock.Force = forceDestroy + b.Config.Bundle.Deployment.Lock.Force = forceDestroy // If `--auto-approve`` is specified, we skip confirmation checks b.AutoApprove = autoApprove