From 2035516fde7f55c1cd424b9df5a76bfab5b7da4a Mon Sep 17 00:00:00 2001
From: Ilia Babanov
Date: Wed, 15 May 2024 14:41:44 +0200
Subject: [PATCH] Don't merge-in remote resources during deployments (#1432)

## Changes
`check_running_resources` now pulls the remote state without modifying the bundle state, as it did before. This avoids a failure where we tried to compute deployment metadata for a deleted job (which we shouldn't do in the first place).

`deploy_then_remove_resources_test` now also deploys and deletes a job (in addition to a pipeline), which catches the error that this PR fixes.

## Tests
Unit and integration tests
---
 .../check_running_resources.go                |  81 +++++++++++--------
 .../check_running_resources_test.go           |  45 ++++++++---
 bundle/phases/deploy.go                       |   3 +-
 .../databricks_template_schema.json           |   8 ++
 .../template/bar.py                           |   2 +
 .../template/resources.yml.tmpl               |  11 +++
 .../deploy_then_remove_resources_test.go      |  17 +++-
 7 files changed, 117 insertions(+), 50 deletions(-)
 rename bundle/deploy/{ => terraform}/check_running_resources.go (60%)
 rename bundle/deploy/{ => terraform}/check_running_resources_test.go (75%)
 create mode 100644 internal/bundle/bundles/deploy_then_remove_resources/template/bar.py

diff --git a/bundle/deploy/check_running_resources.go b/bundle/deploy/terraform/check_running_resources.go
similarity index 60%
rename from bundle/deploy/check_running_resources.go
rename to bundle/deploy/terraform/check_running_resources.go
index a2305cd7..737f773e 100644
--- a/bundle/deploy/check_running_resources.go
+++ b/bundle/deploy/terraform/check_running_resources.go
@@ -1,4 +1,4 @@
-package deploy
+package terraform
 
 import (
 	"context"
@@ -6,11 +6,11 @@ import (
 	"strconv"
 
 	"github.com/databricks/cli/bundle"
-	"github.com/databricks/cli/bundle/config"
 	"github.com/databricks/cli/libs/diag"
 	"github.com/databricks/databricks-sdk-go"
 	"github.com/databricks/databricks-sdk-go/service/jobs"
 	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	tfjson "github.com/hashicorp/terraform-json"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -34,8 +34,14 @@ func (l *checkRunningResources) Apply(ctx context.Context, b *bundle.Bundle) dia
 	if !b.Config.Bundle.Deployment.FailOnActiveRuns {
 		return nil
 	}
+
+	state, err := ParseResourcesState(ctx, b)
+	if err != nil && state == nil {
+		return diag.FromErr(err)
+	}
+
 	w := b.WorkspaceClient()
-	err := checkAnyResourceRunning(ctx, w, &b.Config.Resources)
+	err = checkAnyResourceRunning(ctx, w, state)
 	if err != nil {
 		return diag.FromErr(err)
 	}
@@ -46,43 +52,50 @@ func CheckRunningResource() *checkRunningResources {
 	return &checkRunningResources{}
 }
 
-func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, resources *config.Resources) error {
-	errs, errCtx := errgroup.WithContext(ctx)
-
-	for _, job := range resources.Jobs {
-		id := job.ID
-		if id == "" {
-			continue
-		}
-		errs.Go(func() error {
-			isRunning, err := IsJobRunning(errCtx, w, id)
-			// If there's an error retrieving the job, we assume it's not running
-			if err != nil {
-				return err
-			}
-			if isRunning {
-				return &ErrResourceIsRunning{resourceType: "job", resourceId: id}
-			}
-			return nil
-		})
+func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, state *resourcesState) error {
+	if state == nil {
+		return nil
 	}
 
-	for _, pipeline := range resources.Pipelines {
-		id := pipeline.ID
-		if id == "" {
+	errs, errCtx := errgroup.WithContext(ctx)
+
+	for _, resource := range state.Resources {
+		if resource.Mode != tfjson.ManagedResourceMode {
 			continue
 		}
-		errs.Go(func() error {
-			isRunning, err := IsPipelineRunning(errCtx, w, id)
-			// If there's an error retrieving the pipeline, we assume it's not running
-			if err != nil {
-				return nil
+		for _, instance := range resource.Instances {
+			id := instance.Attributes.ID
+			if id == "" {
+				continue
 			}
-			if isRunning {
-				return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id}
+
+			switch resource.Type {
+			case "databricks_job":
+				errs.Go(func() error {
+					isRunning, err := IsJobRunning(errCtx, w, id)
+					// If there's an error retrieving the job, we assume it's not running
+					if err != nil {
+						return err
+					}
+					if isRunning {
+						return &ErrResourceIsRunning{resourceType: "job", resourceId: id}
+					}
+					return nil
+				})
+			case "databricks_pipeline":
+				errs.Go(func() error {
+					isRunning, err := IsPipelineRunning(errCtx, w, id)
+					// If there's an error retrieving the pipeline, we assume it's not running
+					if err != nil {
+						return nil
+					}
+					if isRunning {
+						return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id}
+					}
+					return nil
+				})
 			}
-			return nil
-		})
+		}
 	}
 
 	return errs.Wait()
diff --git a/bundle/deploy/check_running_resources_test.go b/bundle/deploy/terraform/check_running_resources_test.go
similarity index 75%
rename from bundle/deploy/check_running_resources_test.go
rename to bundle/deploy/terraform/check_running_resources_test.go
index d61c80fc..a1bbbd37 100644
--- a/bundle/deploy/check_running_resources_test.go
+++ b/bundle/deploy/terraform/check_running_resources_test.go
@@ -1,12 +1,10 @@
-package deploy
+package terraform
 
 import (
 	"context"
 	"errors"
 	"testing"
 
-	"github.com/databricks/cli/bundle/config"
-	"github.com/databricks/cli/bundle/config/resources"
 	"github.com/databricks/databricks-sdk-go/experimental/mocks"
 	"github.com/databricks/databricks-sdk-go/service/jobs"
 	"github.com/databricks/databricks-sdk-go/service/pipelines"
@@ -16,15 +14,22 @@ import (
 
 func TestIsAnyResourceRunningWithEmptyState(t *testing.T) {
 	mock := mocks.NewMockWorkspaceClient(t)
-	err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, &config.Resources{})
+	err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, &resourcesState{})
 	require.NoError(t, err)
 }
 
 func TestIsAnyResourceRunningWithJob(t *testing.T) {
 	m := mocks.NewMockWorkspaceClient(t)
-	resources := &config.Resources{
-		Jobs: map[string]*resources.Job{
-			"job1": {ID: "123"},
+	resources := &resourcesState{
+		Resources: []stateResource{
+			{
+				Type:      "databricks_job",
+				Mode:      "managed",
+				Name:      "job1",
+				Instances: []stateResourceInstance{
+					{Attributes: stateInstanceAttributes{ID: "123"}},
+				},
+			},
 		},
 	}
 
@@ -50,9 +55,16 @@ func TestIsAnyResourceRunningWithJob(t *testing.T) {
 
 func TestIsAnyResourceRunningWithPipeline(t *testing.T) {
 	m := mocks.NewMockWorkspaceClient(t)
-	resources := &config.Resources{
-		Pipelines: map[string]*resources.Pipeline{
-			"pipeline1": {ID: "123"},
+	resources := &resourcesState{
+		Resources: []stateResource{
+			{
+				Type:      "databricks_pipeline",
+				Mode:      "managed",
+				Name:      "pipeline1",
+				Instances: []stateResourceInstance{
+					{Attributes: stateInstanceAttributes{ID: "123"}},
+				},
+			},
 		},
 	}
 
@@ -79,9 +91,16 @@ func TestIsAnyResourceRunningWithPipeline(t *testing.T) {
 
 func TestIsAnyResourceRunningWithAPIFailure(t *testing.T) {
 	m := mocks.NewMockWorkspaceClient(t)
-	resources := &config.Resources{
-		Pipelines: map[string]*resources.Pipeline{
-			"pipeline1": {ID: "123"},
+	resources := &resourcesState{
+		Resources: []stateResource{
+			{
+				Type:      "databricks_pipeline",
+				Mode:      "managed",
+				Name:      "pipeline1",
+				Instances: []stateResourceInstance{
+					{Attributes: stateInstanceAttributes{ID: "123"}},
+				},
+			},
 		},
 	}
 
diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go
index 4fc4f630..46c38918 100644
--- a/bundle/phases/deploy.go
+++ b/bundle/phases/deploy.go
@@ -36,8 +36,7 @@ func Deploy() bundle.Mutator {
 		permissions.ApplyWorkspaceRootPermissions(),
 		terraform.Interpolate(),
 		terraform.Write(),
-		terraform.Load(),
-		deploy.CheckRunningResource(),
+		terraform.CheckRunningResource(),
 		bundle.Defer(
 			terraform.Apply(),
 			bundle.Seq(
diff --git a/internal/bundle/bundles/deploy_then_remove_resources/databricks_template_schema.json b/internal/bundle/bundles/deploy_then_remove_resources/databricks_template_schema.json
index 8fca7a7c..f03ad1c2 100644
--- a/internal/bundle/bundles/deploy_then_remove_resources/databricks_template_schema.json
+++ b/internal/bundle/bundles/deploy_then_remove_resources/databricks_template_schema.json
@@ -3,6 +3,14 @@
     "unique_id": {
       "type": "string",
       "description": "Unique ID for pipeline name"
+    },
+    "spark_version": {
+      "type": "string",
+      "description": "Spark version used for job cluster"
+    },
+    "node_type_id": {
+      "type": "string",
+      "description": "Node type id for job cluster"
     }
   }
 }
diff --git a/internal/bundle/bundles/deploy_then_remove_resources/template/bar.py b/internal/bundle/bundles/deploy_then_remove_resources/template/bar.py
new file mode 100644
index 00000000..4914a743
--- /dev/null
+++ b/internal/bundle/bundles/deploy_then_remove_resources/template/bar.py
@@ -0,0 +1,2 @@
+# Databricks notebook source
+print("hello")
diff --git a/internal/bundle/bundles/deploy_then_remove_resources/template/resources.yml.tmpl b/internal/bundle/bundles/deploy_then_remove_resources/template/resources.yml.tmpl
index e3a67677..f3be9aaf 100644
--- a/internal/bundle/bundles/deploy_then_remove_resources/template/resources.yml.tmpl
+++ b/internal/bundle/bundles/deploy_then_remove_resources/template/resources.yml.tmpl
@@ -1,4 +1,15 @@
 resources:
+  jobs:
+    foo:
+      name: test-bundle-job-{{.unique_id}}
+      tasks:
+        - task_key: my_notebook_task
+          new_cluster:
+            num_workers: 1
+            spark_version: "{{.spark_version}}"
+            node_type_id: "{{.node_type_id}}"
+          notebook_task:
+            notebook_path: "./bar.py"
   pipelines:
     bar:
       name: test-bundle-pipeline-{{.unique_id}}
diff --git a/internal/bundle/deploy_then_remove_resources_test.go b/internal/bundle/deploy_then_remove_resources_test.go
index 72baf798..66ec5c16 100644
--- a/internal/bundle/deploy_then_remove_resources_test.go
+++ b/internal/bundle/deploy_then_remove_resources_test.go
@@ -5,7 +5,9 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/databricks/cli/internal"
 	"github.com/databricks/cli/internal/acc"
+	"github.com/databricks/cli/libs/env"
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -15,9 +17,12 @@ func TestAccBundleDeployThenRemoveResources(t *testing.T) {
 	ctx, wt := acc.WorkspaceTest(t)
 	w := wt.W
 
+	nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
 	uniqueId := uuid.New().String()
 	bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
-		"unique_id": uniqueId,
+		"unique_id":     uniqueId,
+		"node_type_id":  nodeTypeId,
+		"spark_version": defaultSparkVersion,
 	})
 	require.NoError(t, err)
 
@@ -31,6 +36,12 @@ func TestAccBundleDeployThenRemoveResources(t *testing.T) {
 	require.NoError(t, err)
 	assert.Equal(t, pipeline.Name, pipelineName)
 
+	// assert job is created
+	jobName := "test-bundle-job-" + uniqueId
+	job, err := w.Jobs.GetBySettingsName(ctx, jobName)
+	require.NoError(t, err)
+	assert.Equal(t, job.Settings.Name, jobName)
+
 	// delete resources.yml
 	err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
 	require.NoError(t, err)
@@ -43,6 +54,10 @@ func TestAccBundleDeployThenRemoveResources(t *testing.T) {
 	_, err = w.Pipelines.GetByName(ctx, pipelineName)
 	assert.ErrorContains(t, err, "does not exist")
 
+	// assert job is deleted
+	_, err = w.Jobs.GetBySettingsName(ctx, jobName)
+	assert.ErrorContains(t, err, "does not exist")
+
 	t.Cleanup(func() {
 		err = destroyBundle(t, ctx, bundleRoot)
 		require.NoError(t, err)
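
Note: the `resourcesState`, `stateResource`, `stateResourceInstance`, and `stateInstanceAttributes` types used above are not introduced by this patch; they come from the existing Terraform state parsing in the `terraform` package (the output of `ParseResourcesState`). As a rough, self-contained sketch only, with field names taken from how they are used in this diff (struct tags and the real definitions in `bundle/deploy/terraform` may differ), the shape the new check walks looks roughly like this:

```go
package main

import "fmt"

// Approximate shape of the parsed Terraform state consumed by
// checkAnyResourceRunning. Field names are taken from their usage in this
// patch; the real definitions live in bundle/deploy/terraform.
type stateInstanceAttributes struct {
	ID string `json:"id"`
}

type stateResourceInstance struct {
	Attributes stateInstanceAttributes `json:"attributes"`
}

type stateResource struct {
	Type      string                  `json:"type"`
	Mode      string                  `json:"mode"`
	Name      string                  `json:"name"`
	Instances []stateResourceInstance `json:"instances"`
}

type resourcesState struct {
	Resources []stateResource `json:"resources"`
}

// runningCheckTargets mirrors the iteration in checkAnyResourceRunning:
// managed resources only, skipping instances without an ID, grouped by
// Terraform resource type. The literal "managed" stands in for
// tfjson.ManagedResourceMode used in the real code.
func runningCheckTargets(state *resourcesState) map[string][]string {
	targets := map[string][]string{}
	if state == nil {
		return targets
	}
	for _, r := range state.Resources {
		if r.Mode != "managed" {
			continue
		}
		for _, i := range r.Instances {
			if i.Attributes.ID == "" {
				continue
			}
			targets[r.Type] = append(targets[r.Type], i.Attributes.ID)
		}
	}
	return targets
}

func main() {
	state := &resourcesState{
		Resources: []stateResource{
			{
				Type: "databricks_job",
				Mode: "managed",
				Name: "foo",
				Instances: []stateResourceInstance{
					{Attributes: stateInstanceAttributes{ID: "123"}},
				},
			},
			// Data-mode resources are skipped by the check.
			{Type: "databricks_pipeline", Mode: "data", Name: "bar"},
		},
	}
	fmt.Println(runningCheckTargets(state)) // map[databricks_job:[123]]
}
```

Because the check now reads the Terraform state directly, remote resources are no longer merged back into the bundle configuration just to run it, which is the step that previously broke when a job had been deleted.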