Add prompt when a pipeline recreation happens (#1672)

## Changes
DLT pipeline recreations are destructive. They can lead to lost history
of previous updates and a temporary outage of the tables, and are
potentially computationally expensive. Thus we make a breaking change
where a prompt is shown to the user if their configuration changes will
lead to a DLT pipeline recreation.

Users can skip the prompt by specifying the `--auto-approve` flag.

This PR also fixes an issue with our test runner where logs from the
cmdio.Logger would not get propagated to the reader returned by our
cobra test runner.

## Tests
Manually, and new unit and integration tests.

```
➜  bundle-playground-3 cli bundle deploy
Uploading bundle files to /Users/63ec021d-b0c6-49c0-93a0-5123953a1cb2/.bundle/test/development/files...
The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period until the newly recreated pipelines are run once successfully. History of previous pipeline update runs will be lost because of recreation:
  recreate pipeline foo

Would you like to proceed? [y/n]: n
Deployment cancelled!
```
This commit is contained in:
shreyas-goenka 2024-09-04 16:41:47 +05:30 committed by GitHub
parent 35cdf4010d
commit a27c24a397
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 260 additions and 29 deletions

View File

@ -19,9 +19,38 @@ import (
"github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
)
func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
// parseTerraformActions converts the resource changes of a Terraform plan into
// the list of actions that callers want to surface to the user.
//
// toInclude is invoked with the resource type and the planned actions of each
// change; changes for which it returns false are skipped. Of the remaining
// changes only deletions and replacements are reported, as
// terraformlib.ActionTypeDelete and terraformlib.ActionTypeRecreate
// respectively. Every other kind of action is dropped.
//
// The returned slice is always non-nil, even when no action matches.
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
	res := make([]terraformlib.Action, 0)
	for _, rc := range changes {
		// Both the entry and its Change field are pointers. Skip entries that
		// carry no change information instead of dereferencing nil below.
		if rc == nil || rc.Change == nil {
			continue
		}

		if !toInclude(rc.Type, rc.Change.Actions) {
			continue
		}

		var actionType terraformlib.ActionType
		switch {
		case rc.Change.Actions.Delete():
			actionType = terraformlib.ActionTypeDelete
		case rc.Change.Actions.Replace():
			actionType = terraformlib.ActionTypeRecreate
		default:
			// No use case for other action types yet.
			continue
		}

		res = append(res, terraformlib.Action{
			Action:       actionType,
			ResourceType: rc.Type,
			ResourceName: rc.Name,
		})
	}
	return res
}
func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
@ -33,41 +62,52 @@ func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, err
return false, err
}
actions := make([]terraformlib.Action, 0)
for _, rc := range plan.ResourceChanges {
// We only care about destructive actions on UC schema resources.
if rc.Type != "databricks_schema" {
continue
schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only UC schema resources.
if typ != "databricks_schema" {
return false
}
var actionType terraformlib.ActionType
// We only display prompts for destructive actions like deleting or
// recreating a schema.
return actions.Delete() || actions.Replace()
})
switch {
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// We don't need a prompt for non-destructive actions like creating
// or updating a schema.
continue
dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only DLT pipeline resources.
if typ != "databricks_pipeline" {
return false
}
actions = append(actions, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
})
}
// Recreating DLT pipeline leads to metadata loss and for a transient period
// the underlying tables will be unavailable.
return actions.Replace() || actions.Delete()
})
// No restricted actions planned. No need for approval.
if len(actions) == 0 {
// We don't need to display any prompts in this case.
if len(dltActions) == 0 && len(schemaActions) == 0 {
return true, nil
}
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range actions {
cmdio.Log(ctx, action)
// One or more UC schema resources will be deleted or recreated.
if len(schemaActions) != 0 {
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range schemaActions {
cmdio.Log(ctx, action)
}
}
// One or more DLT pipelines is being recreated.
if len(dltActions) != 0 {
msg := `
This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:`
cmdio.LogString(ctx, msg)
for _, action := range dltActions {
cmdio.Log(ctx, action)
}
}
if b.AutoApprove {
@ -126,7 +166,7 @@ func Deploy() bundle.Mutator {
terraform.CheckRunningResource(),
terraform.Plan(terraform.PlanGoal("deploy")),
bundle.If(
approvalForUcSchemaDelete,
approvalForDeploy,
deployCore,
bundle.LogString("Deployment cancelled!"),
),

View File

@ -0,0 +1,67 @@
package phases
import (
"testing"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/assert"
)
// TestParseTerraformActions checks that parseTerraformActions only keeps the
// changes accepted by the filter callback and maps them to delete / recreate
// actions, dropping plain creates and resources of other types.
func TestParseTerraformActions(t *testing.T) {
	// newChange builds a resource change of the given type and name with the
	// given planned actions.
	newChange := func(typ, name string, planned ...tfjson.Action) *tfjson.ResourceChange {
		return &tfjson.ResourceChange{
			Type:   typ,
			Name:   name,
			Change: &tfjson.Change{Actions: tfjson.Actions(planned)},
		}
	}

	input := []*tfjson.ResourceChange{
		newChange("databricks_pipeline", "create pipeline", tfjson.ActionCreate),
		newChange("databricks_pipeline", "delete pipeline", tfjson.ActionDelete),
		newChange("databricks_pipeline", "recreate pipeline", tfjson.ActionDelete, tfjson.ActionCreate),
		newChange("databricks_whatever", "recreate whatever", tfjson.ActionDelete, tfjson.ActionCreate),
	}

	// Accept only destructive actions on pipeline resources.
	onlyDestructivePipelines := func(typ string, actions tfjson.Actions) bool {
		return typ == "databricks_pipeline" && (actions.Delete() || actions.Replace())
	}

	want := []terraformlib.Action{
		{
			Action:       terraformlib.ActionTypeDelete,
			ResourceType: "databricks_pipeline",
			ResourceName: "delete pipeline",
		},
		{
			Action:       terraformlib.ActionTypeRecreate,
			ResourceType: "databricks_pipeline",
			ResourceName: "recreate pipeline",
		},
	}

	got := parseTerraformActions(input, onlyDestructivePipelines)
	assert.Equal(t, want, got)
}

View File

@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}

View File

@ -0,0 +1,25 @@
bundle:
name: "bundle-playground"
variables:
catalog:
description: The catalog the DLT pipeline should use.
default: main
resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: ${var.catalog}
include:
- "*.yml"
targets:
development:
default: true

View File

@ -0,0 +1,2 @@
-- Databricks notebook source
select 1

View File

@ -120,8 +120,97 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
stdout, _, err := c.Run()
stdout, stderr, err := c.Run()
assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}
// TestAccBundlePipelineDeleteWithoutAutoApprove deploys a bundle containing a
// pipeline and a job, removes their definition from the bundle, and verifies
// that redeploying without --auto-approve is rejected with the DLT pipeline
// deletion warning instead of silently deleting the pipeline.
func TestAccBundlePipelineDeleteWithoutAutoApprove(t *testing.T) {
	ctx, wt := acc.WorkspaceTest(t)
	w := wt.W

	nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
	uniqueId := uuid.New().String()
	bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
		"unique_id":     uniqueId,
		"node_type_id":  nodeTypeId,
		"spark_version": defaultSparkVersion,
	})
	require.NoError(t, err)

	// deploy pipeline
	err = deployBundle(t, ctx, bundleRoot)
	require.NoError(t, err)

	// The redeploy below is expected to fail, so the resources created by the
	// first deployment would otherwise be left behind in the workspace.
	t.Cleanup(func() {
		destroyBundle(t, ctx, bundleRoot)
	})

	// assert pipeline is created
	pipelineName := "test-bundle-pipeline-" + uniqueId
	pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
	require.NoError(t, err)
	assert.Equal(t, pipelineName, pipeline.Name)

	// assert job is created
	jobName := "test-bundle-job-" + uniqueId
	job, err := w.Jobs.GetBySettingsName(ctx, jobName)
	require.NoError(t, err)
	assert.Equal(t, jobName, job.Settings.Name)

	// delete resources.yml
	err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
	require.NoError(t, err)

	// Redeploy the bundle. Expect it to fail because deleting the pipeline requires --auto-approve.
	t.Setenv("BUNDLE_ROOT", bundleRoot)
	t.Setenv("TERM", "dumb")
	c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
	stdout, stderr, err := c.Run()

	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
	assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:
delete pipeline bar`)
	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}
// TestAccBundlePipelineRecreateWithoutAutoApprove deploys a bundle with a DLT
// pipeline, redeploys it with the "catalog" variable overridden, and verifies
// that the deploy is rejected with the pipeline recreation warning because
// --auto-approve was not specified.
func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
	// Messages the failed redeploy is expected to emit.
	const recreateWarning = `This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:
recreate pipeline foo`
	const promptUnsupported = "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed"

	ctx, wt := acc.UcWorkspaceTest(t)
	workspace := wt.W

	uniqueId := uuid.New().String()
	templateParams := map[string]any{
		"unique_id": uniqueId,
	}
	bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", templateParams)
	require.NoError(t, err)

	// Initial deployment; the bundle is destroyed again once the test finishes.
	require.NoError(t, deployBundle(t, ctx, bundleRoot))
	t.Cleanup(func() {
		destroyBundle(t, ctx, bundleRoot)
	})

	// The pipeline must exist under its expected name after the first deploy.
	expectedName := "test-pipeline-" + uniqueId
	deployed, err := workspace.Pipelines.GetByName(ctx, expectedName)
	require.NoError(t, err)
	require.Equal(t, expectedName, deployed.Name)

	// Redeploy the bundle, pointing the DLT pipeline to a different UC catalog.
	t.Setenv("BUNDLE_ROOT", bundleRoot)
	t.Setenv("TERM", "dumb")
	redeployArgs := []string{"bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\""}
	runner := internal.NewCobraTestRunnerWithContext(t, ctx, redeployArgs...)
	stdout, stderr, err := runner.Run()

	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
	assert.Contains(t, stderr.String(), recreateWarning)
	assert.Contains(t, stdout.String(), promptUnsupported)
}