Patch references to UC schemas to capture dependencies automatically (#1989)

## Changes
Fixes https://github.com/databricks/cli/issues/1977.  

This PR modifies the bundle configuration to capture the dependency that
a UC Volume or a DLT pipeline might have on a UC schema at deployment
time. It does so by replacing the schema name with a reference of the
form `${resources.schemas.foo.name}`.

For example:
The following UC Volume definition depends on the UC schema with the
name `schema_name`. This mutator converts this configuration

from:
```
resources:
  volumes:
    bar:
      catalog_name: catalog_name
      name: volume_name
      schema_name: schema_name

  schemas:
    foo:
      catalog_name: catalog_name
      name: schema_name
```

to:

```
resources:
  volumes:
    bar:
      catalog_name: catalog_name
      name: volume_name
      schema_name: ${resources.schemas.foo.name}`

  schemas:
    foo:
      catalog_name: catalog_name
      name: schema_name
```


## Tests
Unit tests and manually.
This commit is contained in:
shreyas-goenka 2025-01-16 18:57:00 +05:30 committed by GitHub
parent fa87f22706
commit f2bba632cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 379 additions and 0 deletions

View File

@ -0,0 +1,100 @@
package mutator
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
)
type captureSchemaDependency struct{}
// If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines
// or UC Volumes using the `${resources.schemas.<schema_key>.name}` syntax. Using this
// syntax allows TF to capture the deploy time dependency this DLT pipeline or UC Volume
// has on the schema and deploy changes to the schema before deploying the pipeline or volume.
//
// This mutator translates any implicit schema references in DLT pipelines or UC Volumes
// to the explicit syntax.
func CaptureSchemaDependency() bundle.Mutator {
return &captureSchemaDependency{}
}
func (m *captureSchemaDependency) Name() string {
return "CaptureSchemaDependency"
}
func schemaNameRef(key string) string {
return fmt.Sprintf("${resources.schemas.%s.name}", key)
}
func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) {
if catalogName == "" || schemaName == "" {
return "", nil
}
for k, s := range b.Config.Resources.Schemas {
if s != nil && s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName {
return k, s
}
}
return "", nil
}
func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
if v == nil || v.CreateVolumeRequestContent == nil {
return
}
schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName)
if schema == nil {
return
}
v.SchemaName = schemaNameRef(schemaK)
}
func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) {
if p == nil || p.PipelineSpec == nil {
return
}
if p.Schema == "" {
return
}
schemaK, schema := findSchema(b, p.Catalog, p.Schema)
if schema == nil {
return
}
p.Schema = schemaNameRef(schemaK)
}
func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) {
if p == nil || p.PipelineSpec == nil {
return
}
if p.Target == "" {
return
}
schemaK, schema := findSchema(b, p.Catalog, p.Target)
if schema == nil {
return
}
p.Target = schemaNameRef(schemaK)
}
func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
for _, p := range b.Config.Resources.Pipelines {
// "schema" and "target" have the same semantics in the DLT API but are mutually
// exclusive i.e. only one can be set at a time. If schema is set, the pipeline
// is in direct publishing mode and can write tables to multiple schemas
// (vs target which is limited to a single schema).
resolvePipelineTarget(p, b)
resolvePipelineSchema(p, b)
}
for _, v := range b.Config.Resources.Volumes {
resolveVolume(v, b)
}
return nil
}

View File

@ -0,0 +1,277 @@
package mutator
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCaptureSchemaDependencyForVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Volumes: map[string]*resources.Volume{
"volume1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "foobar",
},
},
"volume2": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog2",
SchemaName: "foobar",
},
},
"volume3": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "barfoo",
},
},
"volume4": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalogX",
SchemaName: "foobar",
},
},
"volume5": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "schemaX",
},
},
"nilVolume": nil,
"emptyVolume": {},
},
},
},
}
d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)
assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "foobar", b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName)
assert.Equal(t, "schemaX", b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName)
assert.Nil(t, b.Config.Resources.Volumes["nilVolume"])
assert.Nil(t, b.Config.Resources.Volumes["emptyVolume"].CreateVolumeRequestContent)
}
func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Schema: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Schema: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "",
Name: "whatever",
},
},
"nilPipeline": nil,
"emptyPipeline": {},
},
},
},
}
d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)
assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Schema)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Schema)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Schema)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Schema)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Schema)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema)
assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"])
assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec)
for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Target)
}
}
func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
"nilschema": nil,
"emptyschema": {},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Target: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Target: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "",
Name: "whatever",
},
},
},
},
},
}
d := bundle.Apply(context.Background(), b, CaptureSchemaDependency())
require.Nil(t, d)
assert.Equal(t, "${resources.schemas.schema1.name}", b.Config.Resources.Pipelines["pipeline1"].Target)
assert.Equal(t, "${resources.schemas.schema2.name}", b.Config.Resources.Pipelines["pipeline2"].Target)
assert.Equal(t, "${resources.schemas.schema3.name}", b.Config.Resources.Pipelines["pipeline3"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline4"].Target)
assert.Equal(t, "schemaX", b.Config.Resources.Pipelines["pipeline5"].Target)
assert.Equal(t, "foobar", b.Config.Resources.Pipelines["pipeline6"].Target)
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Target)
for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Schema)
}
}

View File

@ -78,6 +78,8 @@ func Initialize() bundle.Mutator {
mutator.MergePipelineClusters(), mutator.MergePipelineClusters(),
mutator.MergeApps(), mutator.MergeApps(),
mutator.CaptureSchemaDependency(),
// Provide permission config errors & warnings after initializing all variables // Provide permission config errors & warnings after initializing all variables
permissions.PermissionDiagnostics(), permissions.PermissionDiagnostics(),
mutator.SetRunAs(), mutator.SetRunAs(),