Don't merge-in remote resources during depolyments (#1432)

## Changes
`check_running_resources` now pulls the remote state without modifying
the bundle state, similar to how it was doing before. This avoids a
problem when we fail to compute deployment metadata for a deleted job
(which we shouldn't do in the first place)

`deploy_then_remove_resources_test` now also deploys and deletes a job
(in addition to a pipeline), which catches the error that this PR fixes.

## Tests
Unit and integ tests
This commit is contained in:
Ilia Babanov 2024-05-15 14:41:44 +02:00 committed by GitHub
parent 0a21428a48
commit 2035516fde
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 117 additions and 50 deletions

View File

@ -1,4 +1,4 @@
package deploy
package terraform
import (
"context"
@ -6,11 +6,11 @@ import (
"strconv"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
tfjson "github.com/hashicorp/terraform-json"
"golang.org/x/sync/errgroup"
)
@ -34,8 +34,14 @@ func (l *checkRunningResources) Apply(ctx context.Context, b *bundle.Bundle) dia
if !b.Config.Bundle.Deployment.FailOnActiveRuns {
return nil
}
state, err := ParseResourcesState(ctx, b)
if err != nil && state == nil {
return diag.FromErr(err)
}
w := b.WorkspaceClient()
err := checkAnyResourceRunning(ctx, w, &b.Config.Resources)
err = checkAnyResourceRunning(ctx, w, state)
if err != nil {
return diag.FromErr(err)
}
@ -46,43 +52,50 @@ func CheckRunningResource() *checkRunningResources {
return &checkRunningResources{}
}
func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, resources *config.Resources) error {
errs, errCtx := errgroup.WithContext(ctx)
for _, job := range resources.Jobs {
id := job.ID
if id == "" {
continue
}
errs.Go(func() error {
isRunning, err := IsJobRunning(errCtx, w, id)
// If there's an error retrieving the job, we assume it's not running
if err != nil {
return err
}
if isRunning {
return &ErrResourceIsRunning{resourceType: "job", resourceId: id}
}
return nil
})
func checkAnyResourceRunning(ctx context.Context, w *databricks.WorkspaceClient, state *resourcesState) error {
if state == nil {
return nil
}
for _, pipeline := range resources.Pipelines {
id := pipeline.ID
if id == "" {
errs, errCtx := errgroup.WithContext(ctx)
for _, resource := range state.Resources {
if resource.Mode != tfjson.ManagedResourceMode {
continue
}
errs.Go(func() error {
isRunning, err := IsPipelineRunning(errCtx, w, id)
// If there's an error retrieving the pipeline, we assume it's not running
if err != nil {
return nil
for _, instance := range resource.Instances {
id := instance.Attributes.ID
if id == "" {
continue
}
if isRunning {
return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id}
switch resource.Type {
case "databricks_job":
errs.Go(func() error {
isRunning, err := IsJobRunning(errCtx, w, id)
// If there's an error retrieving the job, we assume it's not running
if err != nil {
return err
}
if isRunning {
return &ErrResourceIsRunning{resourceType: "job", resourceId: id}
}
return nil
})
case "databricks_pipeline":
errs.Go(func() error {
isRunning, err := IsPipelineRunning(errCtx, w, id)
// If there's an error retrieving the pipeline, we assume it's not running
if err != nil {
return nil
}
if isRunning {
return &ErrResourceIsRunning{resourceType: "pipeline", resourceId: id}
}
return nil
})
}
return nil
})
}
}
return errs.Wait()

View File

@ -1,12 +1,10 @@
package deploy
package terraform
import (
"context"
"errors"
"testing"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
@ -16,15 +14,22 @@ import (
func TestIsAnyResourceRunningWithEmptyState(t *testing.T) {
mock := mocks.NewMockWorkspaceClient(t)
err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, &config.Resources{})
err := checkAnyResourceRunning(context.Background(), mock.WorkspaceClient, &resourcesState{})
require.NoError(t, err)
}
func TestIsAnyResourceRunningWithJob(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
resources := &config.Resources{
Jobs: map[string]*resources.Job{
"job1": {ID: "123"},
resources := &resourcesState{
Resources: []stateResource{
{
Type: "databricks_job",
Mode: "managed",
Name: "job1",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "123"}},
},
},
},
}
@ -50,9 +55,16 @@ func TestIsAnyResourceRunningWithJob(t *testing.T) {
func TestIsAnyResourceRunningWithPipeline(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
resources := &config.Resources{
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {ID: "123"},
resources := &resourcesState{
Resources: []stateResource{
{
Type: "databricks_pipeline",
Mode: "managed",
Name: "pipeline1",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "123"}},
},
},
},
}
@ -79,9 +91,16 @@ func TestIsAnyResourceRunningWithPipeline(t *testing.T) {
func TestIsAnyResourceRunningWithAPIFailure(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
resources := &config.Resources{
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {ID: "123"},
resources := &resourcesState{
Resources: []stateResource{
{
Type: "databricks_pipeline",
Mode: "managed",
Name: "pipeline1",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "123"}},
},
},
},
}

View File

@ -36,8 +36,7 @@ func Deploy() bundle.Mutator {
permissions.ApplyWorkspaceRootPermissions(),
terraform.Interpolate(),
terraform.Write(),
terraform.Load(),
deploy.CheckRunningResource(),
terraform.CheckRunningResource(),
bundle.Defer(
terraform.Apply(),
bundle.Seq(

View File

@ -3,6 +3,14 @@
"unique_id": {
"type": "string",
"description": "Unique ID for pipeline name"
},
"spark_version": {
"type": "string",
"description": "Spark version used for job cluster"
},
"node_type_id": {
"type": "string",
"description": "Node type id for job cluster"
}
}
}

View File

@ -0,0 +1,2 @@
# Databricks notebook source
print("hello")

View File

@ -1,4 +1,15 @@
resources:
jobs:
foo:
name: test-bundle-job-{{.unique_id}}
tasks:
- task_key: my_notebook_task
new_cluster:
num_workers: 1
spark_version: "{{.spark_version}}"
node_type_id: "{{.node_type_id}}"
notebook_task:
notebook_path: "./bar.py"
pipelines:
bar:
name: test-bundle-pipeline-{{.unique_id}}

View File

@ -5,7 +5,9 @@ import (
"path/filepath"
"testing"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/cli/libs/env"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -15,9 +17,12 @@ func TestAccBundleDeployThenRemoveResources(t *testing.T) {
ctx, wt := acc.WorkspaceTest(t)
w := wt.W
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
"unique_id": uniqueId,
"unique_id": uniqueId,
"node_type_id": nodeTypeId,
"spark_version": defaultSparkVersion,
})
require.NoError(t, err)
@ -31,6 +36,12 @@ func TestAccBundleDeployThenRemoveResources(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, pipeline.Name, pipelineName)
// assert job is created
jobName := "test-bundle-job-" + uniqueId
job, err := w.Jobs.GetBySettingsName(ctx, jobName)
require.NoError(t, err)
assert.Equal(t, job.Settings.Name, jobName)
// delete resources.yml
err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
require.NoError(t, err)
@ -43,6 +54,10 @@ func TestAccBundleDeployThenRemoveResources(t *testing.T) {
_, err = w.Pipelines.GetByName(ctx, pipelineName)
assert.ErrorContains(t, err, "does not exist")
// assert job is deleted
_, err = w.Jobs.GetBySettingsName(ctx, jobName)
assert.ErrorContains(t, err, "does not exist")
t.Cleanup(func() {
err = destroyBundle(t, ctx, bundleRoot)
require.NoError(t, err)