mirror of https://github.com/databricks/cli.git
Merge branch 'main' into feature/logout
This commit is contained in:
commit
b044a6c0e0
|
@ -433,10 +433,20 @@ func rewriteShorthands(v dyn.Value) (dyn.Value, error) {
|
|||
}, variable.Locations()), nil
|
||||
|
||||
case dyn.KindMap, dyn.KindSequence:
|
||||
lookup, err := dyn.Get(variable, "lookup")
|
||||
// If lookup is set, we don't want to rewrite the variable and return it as is.
|
||||
if err == nil && lookup.Kind() != dyn.KindInvalid {
|
||||
return variable, nil
|
||||
}
|
||||
|
||||
// Check if the original definition of variable has a type field.
|
||||
// Type might not be found if the variable overriden in a separate file
|
||||
// and configuration is not merged yet.
|
||||
typeV, err := dyn.GetByPath(v, p.Append(dyn.Key("type")))
|
||||
if err != nil {
|
||||
return variable, nil
|
||||
return dyn.NewValue(map[string]dyn.Value{
|
||||
"default": variable,
|
||||
}, variable.Locations()), nil
|
||||
}
|
||||
|
||||
if typeV.MustString() == "complex" {
|
||||
|
|
|
@ -19,9 +19,38 @@ import (
|
|||
"github.com/databricks/cli/bundle/scripts"
|
||||
"github.com/databricks/cli/libs/cmdio"
|
||||
terraformlib "github.com/databricks/cli/libs/terraform"
|
||||
tfjson "github.com/hashicorp/terraform-json"
|
||||
)
|
||||
|
||||
func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
|
||||
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
|
||||
res := make([]terraformlib.Action, 0)
|
||||
for _, rc := range changes {
|
||||
if !toInclude(rc.Type, rc.Change.Actions) {
|
||||
continue
|
||||
}
|
||||
|
||||
var actionType terraformlib.ActionType
|
||||
switch {
|
||||
case rc.Change.Actions.Delete():
|
||||
actionType = terraformlib.ActionTypeDelete
|
||||
case rc.Change.Actions.Replace():
|
||||
actionType = terraformlib.ActionTypeRecreate
|
||||
default:
|
||||
// No use case for other action types yet.
|
||||
continue
|
||||
}
|
||||
|
||||
res = append(res, terraformlib.Action{
|
||||
Action: actionType,
|
||||
ResourceType: rc.Type,
|
||||
ResourceName: rc.Name,
|
||||
})
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
|
||||
tf := b.Terraform
|
||||
if tf == nil {
|
||||
return false, fmt.Errorf("terraform not initialized")
|
||||
|
@ -33,42 +62,53 @@ func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, err
|
|||
return false, err
|
||||
}
|
||||
|
||||
actions := make([]terraformlib.Action, 0)
|
||||
for _, rc := range plan.ResourceChanges {
|
||||
// We only care about destructive actions on UC schema resources.
|
||||
if rc.Type != "databricks_schema" {
|
||||
continue
|
||||
schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
|
||||
// Filter in only UC schema resources.
|
||||
if typ != "databricks_schema" {
|
||||
return false
|
||||
}
|
||||
|
||||
var actionType terraformlib.ActionType
|
||||
|
||||
switch {
|
||||
case rc.Change.Actions.Delete():
|
||||
actionType = terraformlib.ActionTypeDelete
|
||||
case rc.Change.Actions.Replace():
|
||||
actionType = terraformlib.ActionTypeRecreate
|
||||
default:
|
||||
// We don't need a prompt for non-destructive actions like creating
|
||||
// or updating a schema.
|
||||
continue
|
||||
}
|
||||
|
||||
actions = append(actions, terraformlib.Action{
|
||||
Action: actionType,
|
||||
ResourceType: rc.Type,
|
||||
ResourceName: rc.Name,
|
||||
// We only display prompts for destructive actions like deleting or
|
||||
// recreating a schema.
|
||||
return actions.Delete() || actions.Replace()
|
||||
})
|
||||
|
||||
dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
|
||||
// Filter in only DLT pipeline resources.
|
||||
if typ != "databricks_pipeline" {
|
||||
return false
|
||||
}
|
||||
|
||||
// No restricted actions planned. No need for approval.
|
||||
if len(actions) == 0 {
|
||||
// Recreating DLT pipeline leads to metadata loss and for a transient period
|
||||
// the underling tables will be unavailable.
|
||||
return actions.Replace() || actions.Delete()
|
||||
})
|
||||
|
||||
// We don't need to display any prompts in this case.
|
||||
if len(dltActions) == 0 && len(schemaActions) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// One or more UC schema resources will be deleted or recreated.
|
||||
if len(schemaActions) != 0 {
|
||||
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
|
||||
for _, action := range actions {
|
||||
for _, action := range schemaActions {
|
||||
cmdio.Log(ctx, action)
|
||||
}
|
||||
}
|
||||
|
||||
// One or more DLT pipelines is being recreated.
|
||||
if len(dltActions) != 0 {
|
||||
msg := `
|
||||
This action will result in the deletion or recreation of the following DLT Pipelines along with the
|
||||
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
|
||||
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
|
||||
properties such as the 'catalog' or 'storage' are changed:`
|
||||
cmdio.LogString(ctx, msg)
|
||||
for _, action := range dltActions {
|
||||
cmdio.Log(ctx, action)
|
||||
}
|
||||
}
|
||||
|
||||
if b.AutoApprove {
|
||||
return true, nil
|
||||
|
@ -126,7 +166,7 @@ func Deploy() bundle.Mutator {
|
|||
terraform.CheckRunningResource(),
|
||||
terraform.Plan(terraform.PlanGoal("deploy")),
|
||||
bundle.If(
|
||||
approvalForUcSchemaDelete,
|
||||
approvalForDeploy,
|
||||
deployCore,
|
||||
bundle.LogString("Deployment cancelled!"),
|
||||
),
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package phases
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
terraformlib "github.com/databricks/cli/libs/terraform"
|
||||
tfjson "github.com/hashicorp/terraform-json"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestParseTerraformActions(t *testing.T) {
|
||||
changes := []*tfjson.ResourceChange{
|
||||
{
|
||||
Type: "databricks_pipeline",
|
||||
Change: &tfjson.Change{
|
||||
Actions: tfjson.Actions{tfjson.ActionCreate},
|
||||
},
|
||||
Name: "create pipeline",
|
||||
},
|
||||
{
|
||||
Type: "databricks_pipeline",
|
||||
Change: &tfjson.Change{
|
||||
Actions: tfjson.Actions{tfjson.ActionDelete},
|
||||
},
|
||||
Name: "delete pipeline",
|
||||
},
|
||||
{
|
||||
Type: "databricks_pipeline",
|
||||
Change: &tfjson.Change{
|
||||
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
|
||||
},
|
||||
Name: "recreate pipeline",
|
||||
},
|
||||
{
|
||||
Type: "databricks_whatever",
|
||||
Change: &tfjson.Change{
|
||||
Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate},
|
||||
},
|
||||
Name: "recreate whatever",
|
||||
},
|
||||
}
|
||||
|
||||
res := parseTerraformActions(changes, func(typ string, actions tfjson.Actions) bool {
|
||||
if typ != "databricks_pipeline" {
|
||||
return false
|
||||
}
|
||||
|
||||
if actions.Delete() || actions.Replace() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
|
||||
assert.Equal(t, []terraformlib.Action{
|
||||
{
|
||||
Action: terraformlib.ActionTypeDelete,
|
||||
ResourceType: "databricks_pipeline",
|
||||
ResourceName: "delete pipeline",
|
||||
},
|
||||
{
|
||||
Action: terraformlib.ActionTypeRecreate,
|
||||
ResourceType: "databricks_pipeline",
|
||||
ResourceName: "recreate pipeline",
|
||||
},
|
||||
}, res)
|
||||
}
|
|
@ -68,3 +68,22 @@ func TestComplexVariablesOverride(t *testing.T) {
|
|||
require.Equal(t, "", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkConf["spark.random"])
|
||||
require.Equal(t, "", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.PolicyId)
|
||||
}
|
||||
|
||||
func TestComplexVariablesOverrideWithMultipleFiles(t *testing.T) {
|
||||
b, diags := loadTargetWithDiags("variables/complex_multiple_files", "dev")
|
||||
require.Empty(t, diags)
|
||||
|
||||
diags = bundle.Apply(context.Background(), b, bundle.Seq(
|
||||
mutator.SetVariables(),
|
||||
mutator.ResolveVariableReferencesInComplexVariables(),
|
||||
mutator.ResolveVariableReferences(
|
||||
"variables",
|
||||
),
|
||||
))
|
||||
require.NoError(t, diags.Error())
|
||||
|
||||
require.Equal(t, "14.2.x-scala2.11", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkVersion)
|
||||
require.Equal(t, "Standard_DS3_v2", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.NodeTypeId)
|
||||
require.Equal(t, 4, b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.NumWorkers)
|
||||
require.Equal(t, "false", b.Config.Resources.Jobs["my_job"].JobClusters[0].NewCluster.SparkConf["spark.speculation"])
|
||||
}
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
bundle:
|
||||
name: complex-variables-multiple-files
|
||||
|
||||
resources:
|
||||
jobs:
|
||||
my_job:
|
||||
job_clusters:
|
||||
- job_cluster_key: key
|
||||
new_cluster: ${var.cluster}
|
||||
|
||||
variables:
|
||||
cluster:
|
||||
type: complex
|
||||
description: "A cluster definition"
|
||||
|
||||
include:
|
||||
- ./variables/*.yml
|
|
@ -0,0 +1,11 @@
|
|||
targets:
|
||||
default:
|
||||
dev:
|
||||
variables:
|
||||
cluster:
|
||||
spark_version: "14.2.x-scala2.11"
|
||||
node_type_id: "Standard_DS3_v2"
|
||||
num_workers: 4
|
||||
spark_conf:
|
||||
spark.speculation: false
|
||||
spark.databricks.delta.retentionDurationCheck.enabled: false
|
|
@ -19,7 +19,7 @@ import (
|
|||
|
||||
func promptForProfile(ctx context.Context, defaultValue string) (string, error) {
|
||||
if !cmdio.IsInTTY(ctx) {
|
||||
return "", fmt.Errorf("the command is being run in a non-interactive environment, please specify a profile using --profile")
|
||||
return "", nil
|
||||
}
|
||||
|
||||
prompt := cmdio.Prompt(ctx)
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
{
|
||||
"properties": {
|
||||
"unique_id": {
|
||||
"type": "string",
|
||||
"description": "Unique ID for the schema and pipeline names"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
bundle:
|
||||
name: "bundle-playground"
|
||||
|
||||
variables:
|
||||
catalog:
|
||||
description: The catalog the DLT pipeline should use.
|
||||
default: main
|
||||
|
||||
|
||||
resources:
|
||||
pipelines:
|
||||
foo:
|
||||
name: test-pipeline-{{.unique_id}}
|
||||
libraries:
|
||||
- notebook:
|
||||
path: ./nb.sql
|
||||
development: true
|
||||
catalog: ${var.catalog}
|
||||
|
||||
include:
|
||||
- "*.yml"
|
||||
|
||||
targets:
|
||||
development:
|
||||
default: true
|
|
@ -0,0 +1,2 @@
|
|||
-- Databricks notebook source
|
||||
select 1
|
|
@ -120,8 +120,97 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
|
|||
t.Setenv("BUNDLE_ROOT", bundleRoot)
|
||||
t.Setenv("TERM", "dumb")
|
||||
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
|
||||
stdout, _, err := c.Run()
|
||||
stdout, stderr, err := c.Run()
|
||||
|
||||
assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
|
||||
assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
|
||||
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
|
||||
}
|
||||
|
||||
func TestAccBundlePipelineDeleteWithoutAutoApprove(t *testing.T) {
|
||||
ctx, wt := acc.WorkspaceTest(t)
|
||||
w := wt.W
|
||||
|
||||
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
|
||||
uniqueId := uuid.New().String()
|
||||
bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
|
||||
"unique_id": uniqueId,
|
||||
"node_type_id": nodeTypeId,
|
||||
"spark_version": defaultSparkVersion,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// deploy pipeline
|
||||
err = deployBundle(t, ctx, bundleRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
// assert pipeline is created
|
||||
pipelineName := "test-bundle-pipeline-" + uniqueId
|
||||
pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, pipeline.Name, pipelineName)
|
||||
|
||||
// assert job is created
|
||||
jobName := "test-bundle-job-" + uniqueId
|
||||
job, err := w.Jobs.GetBySettingsName(ctx, jobName)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, job.Settings.Name, jobName)
|
||||
|
||||
// delete resources.yml
|
||||
err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Redeploy the bundle. Expect it to fail because deleting the pipeline requires --auto-approve.
|
||||
t.Setenv("BUNDLE_ROOT", bundleRoot)
|
||||
t.Setenv("TERM", "dumb")
|
||||
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
|
||||
stdout, stderr, err := c.Run()
|
||||
|
||||
assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
|
||||
assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
|
||||
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
|
||||
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
|
||||
properties such as the 'catalog' or 'storage' are changed:
|
||||
delete pipeline bar`)
|
||||
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
|
||||
|
||||
}
|
||||
|
||||
func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
|
||||
ctx, wt := acc.UcWorkspaceTest(t)
|
||||
w := wt.W
|
||||
uniqueId := uuid.New().String()
|
||||
|
||||
bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", map[string]any{
|
||||
"unique_id": uniqueId,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = deployBundle(t, ctx, bundleRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
destroyBundle(t, ctx, bundleRoot)
|
||||
})
|
||||
|
||||
// Assert the pipeline is created
|
||||
pipelineName := "test-pipeline-" + uniqueId
|
||||
pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, pipelineName, pipeline.Name)
|
||||
|
||||
// Redeploy the bundle, pointing the DLT pipeline to a different UC catalog.
|
||||
t.Setenv("BUNDLE_ROOT", bundleRoot)
|
||||
t.Setenv("TERM", "dumb")
|
||||
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"")
|
||||
stdout, stderr, err := c.Run()
|
||||
|
||||
assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
|
||||
assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
|
||||
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
|
||||
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
|
||||
properties such as the 'catalog' or 'storage' are changed:
|
||||
recreate pipeline foo`)
|
||||
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue