Add prompt when a pipeline recreation happens

This commit is contained in:
Shreyas Goenka 2024-08-12 16:00:20 +02:00
parent 26bb4b9c75
commit 105a3dfc42
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
7 changed files with 206 additions and 27 deletions

View File

@ -19,8 +19,37 @@ import (
"github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform" terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
) )
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
res := make([]terraformlib.Action, 0)
for _, rc := range changes {
if !toInclude(rc.Type, rc.Change.Actions) {
continue
}
var actionType terraformlib.ActionType
switch {
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// No use case for other action types yet.
continue
}
res = append(res, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
})
}
return res
}
func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) { func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform tf := b.Terraform
if tf == nil { if tf == nil {
@ -33,42 +62,48 @@ func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, err
return false, err return false, err
} }
actions := make([]terraformlib.Action, 0) schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
for _, rc := range plan.ResourceChanges { // Filter in only UC schema resources.
// We only care about destructive actions on UC schema resources. if typ != "databricks_schema" {
if rc.Type != "databricks_schema" { return false
continue
} }
var actionType terraformlib.ActionType // We only display prompts for destructive actions like deleting or
// recreating a schema.
switch { return actions.Delete() || actions.Replace()
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// We don't need a prompt for non-destructive actions like creating
// or updating a schema.
continue
}
actions = append(actions, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
}) })
dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only DLT pipeline resources.
if typ != "databricks_pipeline" {
return false
} }
// No restricted actions planned. No need for approval. // Recreating DLT pipeline leads to metadata loss and for a transient period
if len(actions) == 0 { // the underling tables will be unavailable.
return actions.Replace()
})
// We don't need to display any prompts in this case.
if len(dltActions) == 0 && len(schemaActions) == 0 {
return true, nil return true, nil
} }
// One or more UC schema resources will be deleted or recreated.
if len(schemaActions) != 0 {
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:") cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range actions { for _, action := range schemaActions {
cmdio.Log(ctx, action) cmdio.Log(ctx, action)
} }
}
// One or more DLT pipelines is being recreated.
if len(dltActions) != 0 {
cmdio.LogString(ctx, "The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period, until the newly recreated pipeline is run once successfully. History of previous pipeline update runs will be lost as a result of the recreation:")
for _, action := range dltActions {
cmdio.Log(ctx, action)
}
}
if b.AutoApprove { if b.AutoApprove {
return true, nil return true, nil

View File

@ -0,0 +1,67 @@
package phases
import (
"testing"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/assert"
)
func TestParseTerraformActions(t *testing.T) {
changes := []*tfjson.ResourceChange{
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionCreate},
},
Name: "create pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete},
},
Name: "delete pipeline",
},
{
Type: "databricks_pipeline",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate pipeline",
},
{
Type: "databricks_whatever",
Change: &tfjson.Change{
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
},
Name: "recreate whatever",
},
}
res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool {
if typ != "databricks_pipeline" {
return false
}
if actions.Delete() || actions.Replace() {
return true
}
return false
})
assert.Equal(t, []terraformlib.Action{
{
Action: terraformlib.ActionTypeDelete,
ResourceType: "databricks_pipeline",
ResourceName: "delete pipeline",
},
{
Action: terraformlib.ActionTypeRecreate,
ResourceType: "databricks_pipeline",
ResourceName: "recreate pipeline",
},
}, res)
}

View File

@ -29,6 +29,12 @@ func (f *progressLoggerFlag) resolveModeDefault(format flags.ProgressLogFormat)
} }
func (f *progressLoggerFlag) initializeContext(ctx context.Context) (context.Context, error) { func (f *progressLoggerFlag) initializeContext(ctx context.Context) (context.Context, error) {
// No need to initialize the logger if it's already set in the context. This
// happens in unit tests where the logger is setup as a fixture.
if _, ok := cmdio.FromContext(ctx); ok {
return ctx, nil
}
if f.log.level.String() != "disabled" && f.log.file.String() == "stderr" && if f.log.level.String() != "disabled" && f.log.file.String() == "stderr" &&
f.ProgressLogFormat == flags.ModeInplace { f.ProgressLogFormat == flags.ModeInplace {
return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr") return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr")

View File

@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}

View File

@ -0,0 +1,25 @@
bundle:
name: "bundle-playground"
variables:
catalog:
description: The catalog the DLT pipeline should use.
default: main
resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: ${var.catalog}
include:
- "*.yml"
targets:
development:
default: true

View File

@ -0,0 +1,2 @@
-- Databricks notebook source
select 1

View File

@ -119,7 +119,43 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
t.Setenv("BUNDLE_ROOT", bundleRoot) t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb") t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock") c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
stdout, _, err := c.Run() stdout, stderr, err := c.Run()
assert.EqualError(t, err, root.ErrAlreadyPrinted.Error()) assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}
func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
ctx, wt := acc.UcWorkspaceTest(t)
w := wt.W
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", map[string]any{
"unique_id": uniqueId,
})
require.NoError(t, err)
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)
t.Cleanup(func() {
destroyBundle(t, ctx, bundleRoot)
})
// Assert the pipeline is created
pipelineName := "test-pipeline-" + uniqueId
pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
require.NoError(t, err)
require.Equal(t, pipelineName, pipeline.Name)
// Redeploy the bundle, pointing the DLT pipeline to a different UC catalog.
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"")
stdout, stderr, err := c.Run()
assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following DLT pipelines will be recreated. Underlying tables will be unavailable for a transient period, until the newly recreated pipeline is run once successfully. History of previous pipeline update runs will be lost as a result of the recreation:\n recreate pipeline foo")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed") assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
} }