Add resource for UC schemas to DABs (#1413)

## Changes
This PR adds support for UC Schemas to DABs. This allows users to define
schemas for tables and other assets their pipelines/workflows create as
part of the DAB, thus managing the life-cycle in the DAB.

The first version has a couple of intentional limitations:
1. The owner of the schema will be the deployment user. Changing the
owner of the schema is not allowed (yet). `run_as` will not be
restricted for DABs containing UC schemas. Let's limit the scope of
run_as to the compute identity used instead of ownership of data assets
like UC schemas.
2. API fields that are present in the update API but not the create API are not supported.
For example: enabling predictive optimization is not supported in the
create schema API and thus is not available in DABs at the moment.

## Tests
Manual and integration tests. Manually verified that the following work:
1. Development mode adds a "dev_" prefix.
2. Modified status is correctly computed in the `bundle summary`
command.
3. Grants work as expected, for assigning privileges.
4. Variable interpolation works for the schema ID.
This commit is contained in:
shreyas-goenka 2024-07-31 17:46:28 +05:30 committed by GitHub
parent 5afcc25d27
commit 89c0af5bdc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 540 additions and 13 deletions

View File

@ -112,6 +112,13 @@ func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) diag.Diagno
} }
} }
for i := range r.Schemas {
prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_"
r.Schemas[i].Name = prefix + r.Schemas[i].Name
// HTTP API for schemas doesn't yet support tags. It's only supported in
// the Databricks UI and via the SQL API.
}
return nil return nil
} }

View File

@ -114,6 +114,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
}, },
}, },
}, },
Schemas: map[string]*resources.Schema{
"schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
},
}, },
}, },
// Use AWS implementation for testing. // Use AWS implementation for testing.
@ -167,6 +170,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Nil(t, b.Config.Resources.QualityMonitors["qualityMonitor2"].Schedule) assert.Nil(t, b.Config.Resources.QualityMonitors["qualityMonitor2"].Schedule)
assert.Equal(t, catalog.MonitorCronSchedulePauseStatusUnpaused, b.Config.Resources.QualityMonitors["qualityMonitor3"].Schedule.PauseStatus) assert.Equal(t, catalog.MonitorCronSchedulePauseStatusUnpaused, b.Config.Resources.QualityMonitors["qualityMonitor3"].Schedule.PauseStatus)
// Schema 1
assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
} }
func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) { func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {

View File

@ -39,6 +39,7 @@ func allResourceTypes(t *testing.T) []string {
"pipelines", "pipelines",
"quality_monitors", "quality_monitors",
"registered_models", "registered_models",
"schemas",
}, },
resourceTypes, resourceTypes,
) )
@ -136,6 +137,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
"models", "models",
"registered_models", "registered_models",
"experiments", "experiments",
"schemas",
} }
base := config.Root{ base := config.Root{

View File

@ -18,6 +18,7 @@ type Resources struct {
ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"` ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"`
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"` RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"` QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
} }
type resource struct { type resource struct {

View File

@ -0,0 +1,27 @@
package resources
import (
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/catalog"
)
// Schema is the bundle resource describing a UC schema. It combines the SDK's
// create-schema request with bundle-level fields: grants, the deployed ID and
// the modified status.
type Schema struct {
// List of grants to apply on this schema.
Grants []Grant `json:"grants,omitempty"`
// Full name of the schema (catalog_name.schema_name). This value is read from
// the terraform state after deployment succeeds.
ID string `json:"id,omitempty" bundle:"readonly"`
// Embedded create request from the SDK's catalog package; its fields (for
// example Name, CatalogName and Comment) are promoted onto Schema.
*catalog.CreateSchema
// NOTE(review): presumably records whether the schema was created or deleted
// relative to the deployed state; confirm against the code that assigns it.
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
}
// UnmarshalJSON decodes b into s by delegating to marshal.Unmarshal from the
// Databricks SDK instead of the default encoding/json behavior.
func (s *Schema) UnmarshalJSON(b []byte) error {
return marshal.Unmarshal(b, s)
}
// MarshalJSON encodes s by delegating to marshal.Marshal from the Databricks
// SDK instead of the default encoding/json behavior.
func (s Schema) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s)
}

View File

@ -66,8 +66,10 @@ func convGrants(acl []resources.Grant) *schema.ResourceGrants {
// BundleToTerraform converts resources in a bundle configuration // BundleToTerraform converts resources in a bundle configuration
// to the equivalent Terraform JSON representation. // to the equivalent Terraform JSON representation.
// //
// NOTE: THIS IS CURRENTLY A HACK. WE NEED A BETTER WAY TO // Note: This function is an older implementation of the conversion logic. It is
// CONVERT TO/FROM TERRAFORM COMPATIBLE FORMAT. // no longer used in any code paths. It is kept around to be used in tests.
// New resources do not need to modify this function and can instead define
// the conversion logic in the tfdyn package.
func BundleToTerraform(config *config.Root) *schema.Root { func BundleToTerraform(config *config.Root) *schema.Root {
tfroot := schema.NewRoot() tfroot := schema.NewRoot()
tfroot.Provider = schema.NewProviders() tfroot.Provider = schema.NewProviders()
@ -382,6 +384,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
} }
cur.ID = instance.Attributes.ID cur.ID = instance.Attributes.ID
config.Resources.QualityMonitors[resource.Name] = cur config.Resources.QualityMonitors[resource.Name] = cur
case "databricks_schema":
if config.Resources.Schemas == nil {
config.Resources.Schemas = make(map[string]*resources.Schema)
}
cur := config.Resources.Schemas[resource.Name]
if cur == nil {
cur = &resources.Schema{ModifiedStatus: resources.ModifiedStatusDeleted}
}
cur.ID = instance.Attributes.ID
config.Resources.Schemas[resource.Name] = cur
case "databricks_permissions": case "databricks_permissions":
case "databricks_grants": case "databricks_grants":
// Ignore; no need to pull these back into the configuration. // Ignore; no need to pull these back into the configuration.
@ -426,6 +438,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
src.ModifiedStatus = resources.ModifiedStatusCreated src.ModifiedStatus = resources.ModifiedStatusCreated
} }
} }
for _, src := range config.Resources.Schemas {
if src.ModifiedStatus == "" && src.ID == "" {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
return nil return nil
} }

View File

@ -655,6 +655,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "1"}}, {Attributes: stateInstanceAttributes{ID: "1"}},
}, },
}, },
{
Type: "databricks_schema",
Mode: "managed",
Name: "test_schema",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
}, },
} }
err := TerraformToBundle(&tfState, &config) err := TerraformToBundle(&tfState, &config)
@ -681,6 +689,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
assert.Equal(t, "1", config.Resources.QualityMonitors["test_monitor"].ID) assert.Equal(t, "1", config.Resources.QualityMonitors["test_monitor"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)
AssertFullResourceCoverage(t, &config) AssertFullResourceCoverage(t, &config)
} }
@ -736,6 +747,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
}, },
}, },
}, },
Schemas: map[string]*resources.Schema{
"test_schema": {
CreateSchema: &catalog.CreateSchema{
Name: "test_schema",
},
},
},
}, },
} }
var tfState = resourcesState{ var tfState = resourcesState{
@ -765,6 +783,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor"].ID) assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)
AssertFullResourceCoverage(t, &config) AssertFullResourceCoverage(t, &config)
} }
@ -855,6 +876,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
}, },
}, },
}, },
Schemas: map[string]*resources.Schema{
"test_schema": {
CreateSchema: &catalog.CreateSchema{
Name: "test_schema",
},
},
"test_schema_new": {
CreateSchema: &catalog.CreateSchema{
Name: "test_schema_new",
},
},
},
}, },
} }
var tfState = resourcesState{ var tfState = resourcesState{
@ -971,6 +1004,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "test_monitor_old"}}, {Attributes: stateInstanceAttributes{ID: "test_monitor_old"}},
}, },
}, },
{
Type: "databricks_schema",
Mode: "managed",
Name: "test_schema",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_schema",
Mode: "managed",
Name: "test_schema_old",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "2"}},
},
},
}, },
} }
err := TerraformToBundle(&tfState, &config) err := TerraformToBundle(&tfState, &config)
@ -1024,6 +1073,14 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor_old"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor_new"].ID) assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor_new"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor_new"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "2", config.Resources.Schemas["test_schema_old"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)
AssertFullResourceCoverage(t, &config) AssertFullResourceCoverage(t, &config)
} }

View File

@ -56,6 +56,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
path = dyn.NewPath(dyn.Key("databricks_registered_model")).Append(path[2:]...) path = dyn.NewPath(dyn.Key("databricks_registered_model")).Append(path[2:]...)
case dyn.Key("quality_monitors"): case dyn.Key("quality_monitors"):
path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...) path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
case dyn.Key("schemas"):
path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
default: default:
// Trigger "key not found" for unknown resource types. // Trigger "key not found" for unknown resource types.
return dyn.GetByPath(root, path) return dyn.GetByPath(root, path)

View File

@ -30,6 +30,7 @@ func TestInterpolate(t *testing.T) {
"other_experiment": "${resources.experiments.other_experiment.id}", "other_experiment": "${resources.experiments.other_experiment.id}",
"other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}", "other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
"other_registered_model": "${resources.registered_models.other_registered_model.id}", "other_registered_model": "${resources.registered_models.other_registered_model.id}",
"other_schema": "${resources.schemas.other_schema.id}",
}, },
Tasks: []jobs.Task{ Tasks: []jobs.Task{
{ {
@ -65,6 +66,7 @@ func TestInterpolate(t *testing.T) {
assert.Equal(t, "${databricks_mlflow_experiment.other_experiment.id}", j.Tags["other_experiment"]) assert.Equal(t, "${databricks_mlflow_experiment.other_experiment.id}", j.Tags["other_experiment"])
assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"]) assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"]) assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])
m := b.Config.Resources.Models["my_model"] m := b.Config.Resources.Models["my_model"]
assert.Equal(t, "my_model", m.Model.Name) assert.Equal(t, "my_model", m.Model.Name)

View File

@ -0,0 +1,53 @@
package tfdyn
import (
"context"
"fmt"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
)
// convertSchemaResource normalizes a bundle schema value against the Terraform
// schema resource type and returns it with force_destroy set to true.
//
// Diagnostics reported by normalization are only logged at debug level; the
// only error returned is one from setting force_destroy on the result.
func convertSchemaResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
	// Coerce the input into the shape of the target Terraform resource.
	normalized, normDiags := convert.Normalize(schema.ResourceSchema{}, vin)
	for i := range normDiags {
		log.Debugf(ctx, "schema normalization diagnostic: %s", normDiags[i].Summary)
	}

	// force_destroy is set unconditionally so that DABs can manage the full
	// lifecycle of the schema. Warning the user before a UC schema is deleted
	// is the CLI's responsibility, not this converter's.
	forceDestroy := dyn.MustPathFromString("force_destroy")
	result, err := dyn.SetByPath(normalized, forceDestroy, dyn.V(true))
	if err != nil {
		return dyn.InvalidValue, err
	}
	return result, nil
}
// schemaConverter converts bundle schema resources into their Terraform
// representation.
type schemaConverter struct{}

// Convert converts the schema in vin and stores it in out.Schema under key.
// When the schema declares grants, a grants resource that references the
// Terraform schema by ID is stored in out.Grants under "schema_<key>".
func (schemaConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
	converted, err := convertSchemaResource(ctx, vin)
	if err != nil {
		return err
	}

	// Register the converted resource under its bundle key.
	out.Schema[key] = converted.AsAny()

	// Nothing more to do when there are no grants for this schema.
	grants := convertGrantsResource(ctx, vin)
	if grants == nil {
		return nil
	}

	// Point the grants at the schema through a Terraform reference.
	grants.Schema = fmt.Sprintf("${databricks_schema.%s.id}", key)
	out.Grants["schema_"+key] = grants
	return nil
}
// init registers schemaConverter as the converter for the "schemas" resource
// key.
func init() {
registerConverter("schemas", schemaConverter{})
}

View File

@ -0,0 +1,75 @@
package tfdyn
import (
"context"
"testing"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestConvertSchema checks that a bundle schema with grants converts into a
// Terraform schema resource with force_destroy set, plus a grants resource
// that references that schema.
func TestConvertSchema(t *testing.T) {
	src := resources.Schema{
		Grants: []resources.Grant{
			{Principal: "jack@gmail.com", Privileges: []string{"EXECUTE"}},
			{Principal: "jane@gmail.com", Privileges: []string{"RUN"}},
		},
		CreateSchema: &catalog.CreateSchema{
			CatalogName: "catalog",
			Name:        "name",
			Comment:     "comment",
			StorageRoot: "root",
			Properties: map[string]string{
				"k1": "v1",
				"k2": "v2",
			},
		},
	}

	vin, err := convert.FromTyped(src, dyn.NilValue)
	require.NoError(t, err)

	out := schema.NewResources()
	err = schemaConverter{}.Convert(context.Background(), "my_schema", vin, out)
	require.NoError(t, err)

	// The schema resource carries every configured field plus force_destroy.
	wantSchema := map[string]any{
		"catalog_name":  "catalog",
		"name":          "name",
		"comment":       "comment",
		"storage_root":  "root",
		"force_destroy": true,
		"properties": map[string]any{
			"k1": "v1",
			"k2": "v2",
		},
	}
	assert.Equal(t, wantSchema, out.Schema["my_schema"])

	// The grants resource points back at the schema via interpolation.
	wantGrants := &schema.ResourceGrants{
		Schema: "${databricks_schema.my_schema.id}",
		Grant: []schema.ResourceGrantsGrant{
			{Principal: "jack@gmail.com", Privileges: []string{"EXECUTE"}},
			{Principal: "jane@gmail.com", Privileges: []string{"RUN"}},
		},
	}
	assert.Equal(t, wantGrants, out.Grants["schema_my_schema"])
}

View File

@ -1,6 +1,9 @@
package phases package phases
import ( import (
"context"
"fmt"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/artifacts"
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
@ -14,10 +17,91 @@ import (
"github.com/databricks/cli/bundle/permissions" "github.com/databricks/cli/bundle/permissions"
"github.com/databricks/cli/bundle/python" "github.com/databricks/cli/bundle/python"
"github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform"
) )
// approvalForUcSchemaDelete decides whether a deployment may proceed given the
// UC schema changes found in the Terraform plan at b.Plan.Path.
//
// It returns true when the plan deletes or recreates no databricks_schema
// resource, or when b.AutoApprove is set. Otherwise the affected schemas are
// logged and the user is asked to confirm; the answer is returned. An error is
// returned when Terraform is not initialized, the plan cannot be read, the
// console cannot prompt, or the prompt itself fails.
func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
	if b.Terraform == nil {
		return false, fmt.Errorf("terraform not initialized")
	}

	// Load the previously computed plan.
	plan, err := b.Terraform.ShowPlanFile(ctx, b.Plan.Path)
	if err != nil {
		return false, err
	}

	// Collect the destructive actions planned for UC schemas; every other
	// resource type and every non-destructive action is ignored.
	var destructive []terraformlib.Action
	for _, change := range plan.ResourceChanges {
		if change.Type != "databricks_schema" {
			continue
		}

		var kind terraformlib.ActionType
		if change.Change.Actions.Delete() {
			kind = terraformlib.ActionTypeDelete
		} else if change.Change.Actions.Replace() {
			kind = terraformlib.ActionTypeRecreate
		} else {
			// Creating or updating a schema does not need a prompt.
			continue
		}

		destructive = append(destructive, terraformlib.Action{
			Action:       kind,
			ResourceType: change.Type,
			ResourceName: change.Name,
		})
	}

	// Nothing destructive is planned, so no approval is needed.
	if len(destructive) == 0 {
		return true, nil
	}

	cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
	for i := range destructive {
		cmdio.Log(ctx, destructive[i])
	}

	// The actions are logged even when approval is granted up front.
	if b.AutoApprove {
		return true, nil
	}

	if !cmdio.IsPromptSupported(ctx) {
		return false, fmt.Errorf("the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
	}

	cmdio.LogString(ctx, "")
	answer, err := cmdio.AskYesOrNo(ctx, "Would you like to proceed?")
	if err != nil {
		return false, err
	}
	return answer, nil
}
// The deploy phase deploys artifacts and resources. // The deploy phase deploys artifacts and resources.
func Deploy() bundle.Mutator { func Deploy() bundle.Mutator {
// Core mutators that CRUD resources and modify deployment state. These
// mutators need informed consent if they are potentially destructive.
deployCore := bundle.Defer(
terraform.Apply(),
bundle.Seq(
terraform.StatePush(),
terraform.Load(),
metadata.Compute(),
metadata.Upload(),
bundle.LogString("Deployment complete!"),
),
)
deployMutator := bundle.Seq( deployMutator := bundle.Seq(
scripts.Execute(config.ScriptPreDeploy), scripts.Execute(config.ScriptPreDeploy),
lock.Acquire(), lock.Acquire(),
@ -37,20 +121,16 @@ func Deploy() bundle.Mutator {
terraform.Interpolate(), terraform.Interpolate(),
terraform.Write(), terraform.Write(),
terraform.CheckRunningResource(), terraform.CheckRunningResource(),
bundle.Defer( terraform.Plan(terraform.PlanGoal("deploy")),
terraform.Apply(), bundle.If(
bundle.Seq( approvalForUcSchemaDelete,
terraform.StatePush(), deployCore,
terraform.Load(), bundle.LogString("Deployment cancelled!"),
metadata.Compute(),
metadata.Upload(),
),
), ),
), ),
lock.Release(lock.GoalDeploy), lock.Release(lock.GoalDeploy),
), ),
scripts.Execute(config.ScriptPostDeploy), scripts.Execute(config.ScriptPostDeploy),
bundle.LogString("Deployment complete!"),
) )
return newPhase( return newPhase(

View File

@ -24,10 +24,12 @@ func newDeployCommand() *cobra.Command {
var forceLock bool var forceLock bool
var failOnActiveRuns bool var failOnActiveRuns bool
var computeID string var computeID string
var autoApprove bool
cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.") cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.") cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.") cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.") cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.")
cmd.RunE = func(cmd *cobra.Command, args []string) error { cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context() ctx := cmd.Context()
@ -37,10 +39,11 @@ func newDeployCommand() *cobra.Command {
bundle.ApplyFunc(ctx, b, func(context.Context, *bundle.Bundle) diag.Diagnostics { bundle.ApplyFunc(ctx, b, func(context.Context, *bundle.Bundle) diag.Diagnostics {
b.Config.Bundle.Force = force b.Config.Bundle.Force = force
b.Config.Bundle.Deployment.Lock.Force = forceLock b.Config.Bundle.Deployment.Lock.Force = forceLock
b.AutoApprove = autoApprove
if cmd.Flag("compute-id").Changed { if cmd.Flag("compute-id").Changed {
b.Config.Bundle.ComputeID = computeID b.Config.Bundle.ComputeID = computeID
} }
if cmd.Flag("fail-on-active-runs").Changed { if cmd.Flag("fail-on-active-runs").Changed {
b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns
} }

View File

@ -2,6 +2,7 @@ package acc
import ( import (
"context" "context"
"os"
"testing" "testing"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
@ -38,6 +39,33 @@ func WorkspaceTest(t *testing.T) (context.Context, *WorkspaceT) {
return wt.ctx, wt return wt.ctx, wt
} }
// UcWorkspaceTest sets up a workspace-level test that only runs on UC
// workspaces. The test is skipped when TEST_METASTORE_ID is empty or when
// DATABRICKS_ACCOUNT_ID is set. It returns a background context together with
// a WorkspaceT wrapping a freshly created workspace client.
func UcWorkspaceTest(t *testing.T) (context.Context, *WorkspaceT) {
	loadDebugEnvIfRunFromIDE(t, "workspace")

	cloudEnv := GetEnvOrSkipTest(t, "CLOUD_ENV")
	t.Log(cloudEnv)

	if metastoreID := os.Getenv("TEST_METASTORE_ID"); metastoreID == "" {
		t.Skipf("Skipping on non-UC workspaces")
	}
	if accountID := os.Getenv("DATABRICKS_ACCOUNT_ID"); accountID != "" {
		t.Skipf("Skipping on accounts")
	}

	client, err := databricks.NewWorkspaceClient()
	require.NoError(t, err)

	ctx := context.Background()
	return ctx, &WorkspaceT{
		T:   t,
		W:   client,
		ctx: ctx,
	}
}
func (t *WorkspaceT) TestClusterID() string { func (t *WorkspaceT) TestClusterID() string {
clusterID := GetEnvOrSkipTest(t.T, "TEST_BRICKS_CLUSTER_ID") clusterID := GetEnvOrSkipTest(t.T, "TEST_BRICKS_CLUSTER_ID")
err := t.W.Clusters.EnsureClusterIsRunning(t.ctx, clusterID) err := t.W.Clusters.EnsureClusterIsRunning(t.ctx, clusterID)

View File

@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}

View File

@ -0,0 +1,19 @@
bundle:
name: "bundle-playground"
resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: main
include:
- "*.yml"
targets:
development:
default: true

View File

@ -0,0 +1,2 @@
-- Databricks notebook source
select 1

View File

@ -0,0 +1,13 @@
resources:
schemas:
bar:
name: test-schema-{{.unique_id}}
catalog_name: main
comment: This schema was created from DABs
targets:
development:
resources:
pipelines:
foo:
target: ${resources.schemas.bar.id}

View File

@ -0,0 +1,125 @@
package bundle
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"testing"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/files"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// setupUcSchemaBundle initializes the "uc_schema" test template with uniqueId,
// deploys it, and registers a cleanup that destroys the bundle. It then
// asserts that the schema and pipeline exist and that the pipeline targets the
// schema, and finally creates a managed volume containing one file inside the
// schema. It returns the root directory of the deployed bundle.
func setupUcSchemaBundle(t *testing.T, ctx context.Context, w *databricks.WorkspaceClient, uniqueId string) string {
	bundleRoot, err := initTestTemplate(t, ctx, "uc_schema", map[string]any{
		"unique_id": uniqueId,
	})
	require.NoError(t, err)

	err = deployBundle(t, ctx, bundleRoot)
	require.NoError(t, err)

	t.Cleanup(func() {
		destroyBundle(t, ctx, bundleRoot)
	})

	// Assert the schema is created
	catalogName := "main"
	schemaName := "test-schema-" + uniqueId
	schemaFullName := strings.Join([]string{catalogName, schemaName}, ".")
	schema, err := w.Schemas.GetByFullName(ctx, schemaFullName)
	require.NoError(t, err)
	require.Equal(t, schemaFullName, schema.FullName)
	require.Equal(t, "This schema was created from DABs", schema.Comment)

	// Assert the pipeline is created
	pipelineName := "test-pipeline-" + uniqueId
	pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
	require.NoError(t, err)
	require.Equal(t, pipelineName, pipeline.Name)

	// Assert the pipeline uses the schema
	details, err := w.Pipelines.GetByPipelineId(ctx, pipeline.PipelineId)
	require.NoError(t, err)
	require.Equal(t, catalogName, details.Spec.Catalog)
	require.Equal(t, schemaFullName, details.Spec.Target)

	// Create a volume in the schema, and add a file to it. This ensures that the
	// schema has some data in it and deletion will fail unless the generated
	// terraform configuration has force_destroy set to true.
	volumeName := "test-volume-" + uniqueId
	volume, err := w.Volumes.Create(ctx, catalog.CreateVolumeRequestContent{
		CatalogName: catalogName,
		SchemaName:  schemaName,
		Name:        volumeName,
		VolumeType:  catalog.VolumeTypeManaged,
	})
	require.NoError(t, err)
	// Expected value goes first so a failure labels the two values correctly.
	require.Equal(t, volumeName, volume.Name)

	fileName := "test-file-" + uniqueId
	err = w.Files.Upload(ctx, files.UploadRequest{
		Contents: io.NopCloser(strings.NewReader("Hello, world!")),
		FilePath: fmt.Sprintf("/Volumes/%s/%s/%s/%s", catalogName, schemaName, volumeName, fileName),
	})
	require.NoError(t, err)

	return bundleRoot
}
// TestAccBundleDeployUcSchema deploys a bundle containing a UC schema, removes
// the schema from the configuration, redeploys, and asserts that looking the
// schema up afterwards fails with SCHEMA_DOES_NOT_EXIST.
func TestAccBundleDeployUcSchema(t *testing.T) {
	ctx, wt := acc.UcWorkspaceTest(t)

	uniqueID := uuid.New().String()
	fullName := "main" + "." + "test-schema-" + uniqueID

	bundleRoot := setupUcSchemaBundle(t, ctx, wt.W, uniqueID)

	// Drop the UC schema from the resource configuration.
	schemaFile := filepath.Join(bundleRoot, "schema.yml")
	require.NoError(t, os.Remove(schemaFile))

	// Deploy again; the schema is no longer part of the bundle.
	require.NoError(t, deployBundle(t, ctx, bundleRoot))

	// The lookup must now fail with an API error saying the schema is gone.
	_, err := wt.W.Schemas.GetByFullName(ctx, fullName)
	apiErr := &apierr.APIError{}
	isAPIErr := errors.As(err, &apiErr)
	assert.True(t, isAPIErr)
	assert.Equal(t, "SCHEMA_DOES_NOT_EXIST", apiErr.ErrorCode)
}
// TestAccBundleDeployUcSchemaFailsWithoutAutoApprove deploys a bundle with a UC
// schema, removes the schema from the configuration, and redeploys without
// --auto-approve. The deploy is expected to fail with the message saying the
// console does not support prompting.
func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
	ctx, wt := acc.UcWorkspaceTest(t)
	bundleRoot := setupUcSchemaBundle(t, ctx, wt.W, uuid.New().String())

	// Drop the UC schema from the resource configuration.
	schemaFile := filepath.Join(bundleRoot, "schema.yml")
	require.NoError(t, os.Remove(schemaFile))

	// Redeploy without --auto-approve. The assertions below expect the
	// "console does not support prompting" failure.
	t.Setenv("BUNDLE_ROOT", bundleRoot)
	t.Setenv("TERM", "dumb")
	runner := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
	stdout, _, err := runner.Run()

	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}

View File

@ -64,7 +64,7 @@ func validateBundle(t *testing.T, ctx context.Context, path string) ([]byte, err
func deployBundle(t *testing.T, ctx context.Context, path string) error { func deployBundle(t *testing.T, ctx context.Context, path string) error {
t.Setenv("BUNDLE_ROOT", path) t.Setenv("BUNDLE_ROOT", path)
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock") c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--auto-approve")
_, _, err := c.Run() _, _, err := c.Run()
return err return err
} }