From 105a3dfc42a52f267c1c4dac493c18240fb8e78b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 12 Aug 2024 16:00:20 +0200 Subject: [PATCH] Add prompt when a pipeline recreation happens --- bundle/phases/deploy.go | 87 +++++++++++++------ bundle/phases/deploy_test.go | 67 ++++++++++++++ cmd/root/progress_logger.go | 6 ++ .../databricks_template_schema.json | 8 ++ .../template/databricks.yml.tmpl | 25 ++++++ .../bundles/recreate_pipeline/template/nb.sql | 2 + internal/bundle/deploy_test.go | 38 +++++++- 7 files changed, 206 insertions(+), 27 deletions(-) create mode 100644 bundle/phases/deploy_test.go create mode 100644 internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json create mode 100644 internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl create mode 100644 internal/bundle/bundles/recreate_pipeline/template/nb.sql diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 6929f74ba..91e3b9878 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -19,8 +19,37 @@ import ( "github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/libs/cmdio" terraformlib "github.com/databricks/cli/libs/terraform" + tfjson "github.com/hashicorp/terraform-json" ) +func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action { + res := make([]terraformlib.Action, 0) + for _, rc := range changes { + if !toInclude(rc.Type, rc.Change.Actions) { + continue + } + + var actionType terraformlib.ActionType + switch { + case rc.Change.Actions.Delete(): + actionType = terraformlib.ActionTypeDelete + case rc.Change.Actions.Replace(): + actionType = terraformlib.ActionTypeRecreate + default: + // No use case for other action types yet. + continue + } + + res = append(res, terraformlib.Action{ + Action: actionType, + ResourceType: rc.Type, + ResourceName: rc.Name, + }) + } + + return res +} + func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) { tf := b.Terraform if tf == nil { @@ -33,41 +62,47 @@ func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, err return false, err } - actions := make([]terraformlib.Action, 0) - for _, rc := range plan.ResourceChanges { - // We only care about destructive actions on UC schema resources. - if rc.Type != "databricks_schema" { - continue + schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool { + // Filter in only UC schema resources. + if typ != "databricks_schema" { + return false } - var actionType terraformlib.ActionType + // We only display prompts for destructive actions like deleting or + // recreating a schema. + return actions.Delete() || actions.Replace() + }) - switch { - case rc.Change.Actions.Delete(): - actionType = terraformlib.ActionTypeDelete - case rc.Change.Actions.Replace(): - actionType = terraformlib.ActionTypeRecreate - default: - // We don't need a prompt for non-destructive actions like creating - // or updating a schema. - continue + dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool { + // Filter in only DLT pipeline resources. + if typ != "databricks_pipeline" { + return false } - actions = append(actions, terraformlib.Action{ - Action: actionType, - ResourceType: rc.Type, - ResourceName: rc.Name, - }) - } + // Recreating DLT pipeline leads to metadata loss and for a transient period + // the underling tables will be unavailable. + return actions.Replace() + }) - // No restricted actions planned. No need for approval. - if len(actions) == 0 { + // We don't need to display any prompts in this case. + if len(dltActions) == 0 && len(schemaActions) == 0 { return true, nil } - cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:") - for _, action := range actions { - cmdio.Log(ctx, action) + // One or more UC schema resources will be deleted or recreated. + if len(schemaActions) != 0 { + cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:") + for _, action := range schemaActions { + cmdio.Log(ctx, action) + } + } + + // One or more DLT pipelines is being recreated. + if len(dltActions) != 0 { + cmdio.LogString(ctx, "The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period, until the newly recreated pipeline is run once successfully. History of previous pipeline update runs will be lost as a result of the recreation:") + for _, action := range dltActions { + cmdio.Log(ctx, action) + } } if b.AutoApprove { diff --git a/bundle/phases/deploy_test.go b/bundle/phases/deploy_test.go new file mode 100644 index 000000000..e00370b38 --- /dev/null +++ b/bundle/phases/deploy_test.go @@ -0,0 +1,67 @@ +package phases + +import ( + "testing" + + terraformlib "github.com/databricks/cli/libs/terraform" + tfjson "github.com/hashicorp/terraform-json" + "github.com/stretchr/testify/assert" +) + +func TestParseTerraformActions(t *testing.T) { + changes := []*tfjson.ResourceChange{ + { + Type: "databricks_pipeline", + Change: &tfjson.Change{ + Actions: tfjson.Actions{tfjson.ActionCreate}, + }, + Name: "create pipeline", + }, + { + Type: "databricks_pipeline", + Change: &tfjson.Change{ + Actions: tfjson.Actions{tfjson.ActionDelete}, + }, + Name: "delete pipeline", + }, + { + Type: "databricks_pipeline", + Change: &tfjson.Change{ + Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate}, + }, + Name: "recreate pipeline", + }, + { + Type: "databricks_whatever", + Change: &tfjson.Change{ + Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate}, + }, + Name: "recreate whatever", + }, + } + + res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool { + if typ != "databricks_pipeline" { + return false + } + + if actions.Delete() || actions.Replace() { + return true + } + + return false + }) + + assert.Equal(t, []terraformlib.Action{ + { + Action: terraformlib.ActionTypeDelete, + ResourceType: "databricks_pipeline", + ResourceName: "delete pipeline", + }, + { + Action: terraformlib.ActionTypeRecreate, + ResourceType: "databricks_pipeline", + ResourceName: "recreate pipeline", + }, + }, res) +} diff --git a/cmd/root/progress_logger.go b/cmd/root/progress_logger.go index c05ecb043..7d6a1fa46 100644 --- a/cmd/root/progress_logger.go +++ b/cmd/root/progress_logger.go @@ -29,6 +29,12 @@ func (f *progressLoggerFlag) resolveModeDefault(format flags.ProgressLogFormat) } func (f *progressLoggerFlag) initializeContext(ctx context.Context) (context.Context, error) { + // No need to initialize the logger if it's already set in the context. This + // happens in unit tests where the logger is setup as a fixture. + if _, ok := cmdio.FromContext(ctx); ok { + return ctx, nil + } + if f.log.level.String() != "disabled" && f.log.file.String() == "stderr" && f.ProgressLogFormat == flags.ModeInplace { return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr") diff --git a/internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json b/internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json new file mode 100644 index 000000000..762f4470c --- /dev/null +++ b/internal/bundle/bundles/recreate_pipeline/databricks_template_schema.json @@ -0,0 +1,8 @@ +{ + "properties": { + "unique_id": { + "type": "string", + "description": "Unique ID for the schema and pipeline names" + } + } +} diff --git a/internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl b/internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl new file mode 100644 index 000000000..10350f13e --- /dev/null +++ b/internal/bundle/bundles/recreate_pipeline/template/databricks.yml.tmpl @@ -0,0 +1,25 @@ +bundle: + name: "bundle-playground" + +variables: + catalog: + description: The catalog the DLT pipeline should use. + default: main + + +resources: + pipelines: + foo: + name: test-pipeline-{{.unique_id}} + libraries: + - notebook: + path: ./nb.sql + development: true + catalog: ${var.catalog} + +include: + - "*.yml" + +targets: + development: + default: true diff --git a/internal/bundle/bundles/recreate_pipeline/template/nb.sql b/internal/bundle/bundles/recreate_pipeline/template/nb.sql new file mode 100644 index 000000000..199ff5078 --- /dev/null +++ b/internal/bundle/bundles/recreate_pipeline/template/nb.sql @@ -0,0 +1,2 @@ +-- Databricks notebook source +select 1 diff --git a/internal/bundle/deploy_test.go b/internal/bundle/deploy_test.go index 3da885705..6e4896939 100644 --- a/internal/bundle/deploy_test.go +++ b/internal/bundle/deploy_test.go @@ -119,7 +119,43 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) { t.Setenv("BUNDLE_ROOT", bundleRoot) t.Setenv("TERM", "dumb") c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock") - stdout, _, err := c.Run() + stdout, stderr, err := c.Run() + assert.EqualError(t, err, root.ErrAlreadyPrinted.Error()) + assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar") + assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed") +} + +func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) { + ctx, wt := acc.UcWorkspaceTest(t) + w := wt.W + uniqueId := uuid.New().String() + + bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", map[string]any{ + "unique_id": uniqueId, + }) + require.NoError(t, err) + + err = deployBundle(t, ctx, bundleRoot) + require.NoError(t, err) + + t.Cleanup(func() { + destroyBundle(t, ctx, bundleRoot) + }) + + // Assert the pipeline is created + pipelineName := "test-pipeline-" + uniqueId + pipeline, err := w.Pipelines.GetByName(ctx, pipelineName) + require.NoError(t, err) + require.Equal(t, pipelineName, pipeline.Name) + + // Redeploy the bundle, pointing the DLT pipeline to a different UC catalog. + t.Setenv("BUNDLE_ROOT", bundleRoot) + t.Setenv("TERM", "dumb") + c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"") + stdout, stderr, err := c.Run() + + assert.EqualError(t, err, root.ErrAlreadyPrinted.Error()) + assert.Contains(t, stderr.String(), "The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period, until the newly recreated pipeline is run once successfully. History of previous pipeline update runs will be lost as a result of the recreation:\n recreate pipeline foo") assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed") }