databricks-cli/bundle/config/mutator/capture_schema_dependency.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

87 lines
2.4 KiB
Go
Raw Normal View History

2024-12-27 13:32:08 +00:00
package mutator
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
)
2025-01-07 10:03:29 +00:00
// captureSchemaDependency is the mutator returned by CaptureSchemaDependency.
// It carries no state; all inputs come from the bundle passed to Apply.
type captureSchemaDependency struct{}
2024-12-27 13:32:08 +00:00
// If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines
// or UC Volumes using the `${resources.schemas.<schema_key>.name}` syntax. Using this
// syntax allows TF to capture the deploy time dependency this DLT pipeline or UC Volume
// has on the schema and deploy changes to the schema before deploying the pipeline or volume.
//
// This mutator translates any implicit schema references in DLT pipelines or UC Volumes
// to the explicit syntax.
2025-01-07 10:05:46 +00:00
func CaptureSchemaDependency() bundle.Mutator {
2025-01-07 10:03:29 +00:00
return &captureSchemaDependency{}
2024-12-27 13:32:08 +00:00
}
2025-01-07 10:03:29 +00:00
func (m *captureSchemaDependency) Name() string {
return "CaptureSchemaDependency"
2024-12-27 13:32:08 +00:00
}
2025-01-07 10:03:29 +00:00
func findSchema(b *bundle.Bundle, catalogName, schemaName string) (string, *resources.Schema) {
if catalogName == "" || schemaName == "" {
2024-12-27 13:32:08 +00:00
return "", nil
}
for k, s := range b.Config.Resources.Schemas {
2025-01-07 10:03:29 +00:00
if s.CreateSchema != nil && s.CatalogName == catalogName && s.Name == schemaName {
2024-12-27 13:32:08 +00:00
return k, s
}
}
return "", nil
}
func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
2025-01-07 10:18:51 +00:00
if v.CreateVolumeRequestContent == nil {
return
}
2024-12-27 13:32:08 +00:00
schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName)
if schema == nil {
return
}
v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}
func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) {
2025-01-07 10:18:51 +00:00
if p.PipelineSpec == nil {
return
}
2024-12-27 13:32:08 +00:00
// schema and target have the same semantics in the DLT API but are mutually
// exclusive. If schema is set, the pipeline is in direct publishing mode
2024-12-27 13:48:14 +00:00
// and can write tables to multiple schemas (vs target which is limited to a single schema).
2024-12-27 13:32:08 +00:00
schemaName := p.Schema
if schemaName == "" {
schemaName = p.Target
}
schemaK, schema := findSchema(b, p.Catalog, schemaName)
if schema == nil {
return
}
if p.Schema != "" {
p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
} else if p.Target != "" {
p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}
}
2025-01-07 10:03:29 +00:00
func (m *captureSchemaDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
2024-12-27 13:32:08 +00:00
for _, p := range b.Config.Resources.Pipelines {
resolvePipeline(p, b)
}
for _, v := range b.Config.Resources.Volumes {
resolveVolume(v, b)
}
return nil
}