Move catalog/schema preset logic to a separate module

This commit is contained in:
Lennart Kats 2024-12-19 22:57:38 +01:00
parent 6d63b319d2
commit feb23ddefb
No known key found for this signature in database
GPG Key ID: 1EB8B57673197023
5 changed files with 722 additions and 627 deletions

View File

@ -2,21 +2,16 @@ package mutator
import ( import (
"context" "context"
"fmt"
"os"
"path" "path"
"regexp"
"slices" "slices"
"sort" "sort"
"strings" "strings"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/dbr" "github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/textutil" "github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
@ -27,6 +22,7 @@ type applyPresets struct{}
// Apply all presets, e.g. the prefix presets that // Apply all presets, e.g. the prefix presets that
// adds a prefix to all names of all resources. // adds a prefix to all names of all resources.
// Note the catalog/schema presets are applied in ApplyPresetsCatalogSchema.
func ApplyPresets() *applyPresets { func ApplyPresets() *applyPresets {
return &applyPresets{} return &applyPresets{}
} }
@ -43,9 +39,6 @@ func (m *applyPresets) Name() string {
func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics var diags diag.Diagnostics
if d := validateCatalogAndSchema(b); d != nil {
return d // fast fail since code below would fail
}
if d := validatePauseStatus(b); d != nil { if d := validatePauseStatus(b); d != nil {
diags = diags.Extend(d) diags = diags.Extend(d)
} }
@ -56,7 +49,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
tags := toTagArray(t.Tags) tags := toTagArray(t.Tags)
// Jobs presets. // Jobs presets.
// Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema // Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus
for key, j := range r.Jobs { for key, j := range r.Jobs {
if j.JobSettings == nil { if j.JobSettings == nil {
diags = diags.Extend(diag.Errorf("job %s is not defined", key)) diags = diags.Extend(diag.Errorf("job %s is not defined", key))
@ -90,25 +83,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
j.Trigger.PauseStatus = paused j.Trigger.PauseStatus = paused
} }
} }
if t.Catalog != "" || t.Schema != "" {
for _, task := range j.Tasks {
if task.DbtTask != nil {
if task.DbtTask.Catalog == "" {
task.DbtTask.Catalog = t.Catalog
}
if task.DbtTask.Schema == "" {
task.DbtTask.Schema = t.Schema
}
}
}
diags = diags.Extend(addCatalogSchemaParameters(b, key, j, t))
diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j))
}
} }
// Pipelines presets. // Pipelines presets.
// Supported: Prefix, PipelinesDevelopment, Catalog, Schema // Supported: Prefix, PipelinesDevelopment
// Not supported: Tags (as of 2024-10 not in pipelines API) // Not supported: Tags (as of 2024-10 not in pipelines API)
for key, p := range r.Pipelines { for key, p := range r.Pipelines {
if p.PipelineSpec == nil { if p.PipelineSpec == nil {
@ -122,62 +100,6 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
if t.TriggerPauseStatus == config.Paused { if t.TriggerPauseStatus == config.Paused {
p.Continuous = false p.Continuous = false
} }
if t.Catalog != "" && t.Schema != "" {
if p.Catalog == "" {
p.Catalog = t.Catalog
}
if p.Target == "" {
p.Target = t.Schema
}
if p.GatewayDefinition != nil {
if p.GatewayDefinition.GatewayStorageCatalog == "" {
p.GatewayDefinition.GatewayStorageCatalog = t.Catalog
}
if p.GatewayDefinition.GatewayStorageSchema == "" {
p.GatewayDefinition.GatewayStorageSchema = t.Schema
}
}
if p.IngestionDefinition != nil {
for _, obj := range p.IngestionDefinition.Objects {
if obj.Report != nil {
if obj.Report.DestinationCatalog == "" {
obj.Report.DestinationCatalog = t.Catalog
}
if obj.Report.DestinationSchema == "" {
obj.Report.DestinationSchema = t.Schema
}
}
if obj.Schema != nil {
if obj.Schema.SourceCatalog == "" {
obj.Schema.SourceCatalog = t.Catalog
}
if obj.Schema.SourceSchema == "" {
obj.Schema.SourceSchema = t.Schema
}
if obj.Schema.DestinationCatalog == "" {
obj.Schema.DestinationCatalog = t.Catalog
}
if obj.Schema.DestinationSchema == "" {
obj.Schema.DestinationSchema = t.Schema
}
}
if obj.Table != nil {
if obj.Table.SourceCatalog == "" {
obj.Table.SourceCatalog = t.Catalog
}
if obj.Table.SourceSchema == "" {
obj.Table.SourceSchema = t.Schema
}
if obj.Table.DestinationCatalog == "" {
obj.Table.DestinationCatalog = t.Catalog
}
if obj.Table.DestinationSchema == "" {
obj.Table.DestinationSchema = t.Schema
}
}
}
}
}
} }
// Models presets // Models presets
@ -229,7 +151,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
} }
// Model serving endpoint presets // Model serving endpoint presets
// Supported: Prefix, Catalog, Schema // Supported: Prefix
// Not supported: Tags (not in API as of 2024-10) // Not supported: Tags (not in API as of 2024-10)
for key, e := range r.ModelServingEndpoints { for key, e := range r.ModelServingEndpoints {
if e.CreateServingEndpoint == nil { if e.CreateServingEndpoint == nil {
@ -237,44 +159,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
continue continue
} }
e.Name = normalizePrefix(prefix) + e.Name e.Name = normalizePrefix(prefix) + e.Name
if t.Catalog != "" || t.Schema != "" {
// Apply catalog & schema to inference table config if not set
if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil {
if t.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = t.Catalog
}
if t.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = t.Schema
}
}
// Apply catalog & schema to auto capture config if not set
if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil {
if t.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = t.Catalog
}
if t.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = t.Schema
}
}
// Fully qualify served entities and models if they are not already qualified
for i := range e.CreateServingEndpoint.Config.ServedEntities {
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName(
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, t.Catalog, t.Schema,
)
}
for i := range e.CreateServingEndpoint.Config.ServedModels {
e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName(
e.CreateServingEndpoint.Config.ServedModels[i].ModelName, t.Catalog, t.Schema,
)
}
}
} }
// Registered models presets // Registered models presets
// Supported: Prefix, Catalog, Schema // Supported: Prefix
// Not supported: Tags (not in API as of 2024-10) // Not supported: Tags (not in API as of 2024-10)
for key, m := range r.RegisteredModels { for key, m := range r.RegisteredModels {
if m.CreateRegisteredModelRequest == nil { if m.CreateRegisteredModelRequest == nil {
@ -282,16 +170,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
continue continue
} }
m.Name = normalizePrefix(prefix) + m.Name m.Name = normalizePrefix(prefix) + m.Name
if t.Catalog != "" && m.CatalogName == "" {
m.CatalogName = t.Catalog
}
if t.Schema != "" && m.SchemaName == "" {
m.SchemaName = t.Schema
}
} }
// Quality monitors presets // Quality monitors presets
// Supported: Schedule, Catalog, Schema // Supported: Schedule
// Not supported: Tags (not in API as of 2024-10) // Not supported: Tags (not in API as of 2024-10)
for key, q := range r.QualityMonitors { for key, q := range r.QualityMonitors {
if q.CreateMonitor == nil { if q.CreateMonitor == nil {
@ -306,15 +188,9 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
q.Schedule = nil q.Schedule = nil
} }
} }
if t.Catalog != "" && t.Schema != "" {
q.TableName = fullyQualifyName(q.TableName, t.Catalog, t.Schema)
if q.OutputSchemaName == "" {
q.OutputSchemaName = t.Catalog + "." + t.Schema
}
}
} }
// Schemas: Prefix, Catalog, Schema // Schemas: Prefix only
// Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API) // Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API)
for key, s := range r.Schemas { for key, s := range r.Schemas {
if s.CreateSchema == nil { if s.CreateSchema == nil {
@ -322,18 +198,9 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
continue continue
} }
s.Name = normalizePrefix(prefix) + s.Name s.Name = normalizePrefix(prefix) + s.Name
if t.Catalog != "" && s.CatalogName == "" {
s.CatalogName = t.Catalog
}
if t.Schema != "" && s.Name == "" {
// If there is a schema preset such as 'dev', we directly
// use that name and don't add any prefix (which might result in dev_dev).
s.Name = t.Schema
}
} }
// Clusters: Prefix, Tags // Clusters: Prefix, Tags
// Not supported: Catalog / Schema (not applicable)
for key, c := range r.Clusters { for key, c := range r.Clusters {
if c.ClusterSpec == nil { if c.ClusterSpec == nil {
diags = diags.Extend(diag.Errorf("cluster %s is not defined", key)) diags = diags.Extend(diag.Errorf("cluster %s is not defined", key))
@ -385,6 +252,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
return diags return diags
} }
// validatePauseStatus checks the user-provided pause status is valid.
func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics { func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics {
p := b.Config.Presets.TriggerPauseStatus p := b.Config.Presets.TriggerPauseStatus
if p == "" || p == config.Paused || p == config.Unpaused { if p == "" || p == config.Paused || p == config.Unpaused {
@ -397,20 +265,8 @@ func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics {
}} }}
} }
func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
p := b.Config.Presets
if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
return diag.Diagnostics{{
Summary: "presets.catalog and presets.schema must always be set together",
Severity: diag.Error,
Locations: []dyn.Location{b.Config.GetLocation("presets")},
}}
}
return nil
}
// toTagArray converts a map of tags to an array of tags. // toTagArray converts a map of tags to an array of tags.
// We sort tags so ensure stable ordering. // We sort tags to ensure stable ordering.
func toTagArray(tags map[string]string) []Tag { func toTagArray(tags map[string]string) []Tag {
var tagArray []Tag var tagArray []Tag
if tags == nil { if tags == nil {
@ -440,151 +296,3 @@ func normalizePrefix(prefix string) string {
return textutil.NormalizeString(prefix) + suffix return textutil.NormalizeString(prefix) + suffix
} }
// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
// Returns any warning diagnostics for existing parameters.
func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics {
var diags diag.Diagnostics
// Check for existing catalog/schema parameters
hasCatalog := false
hasSchema := false
if job.Parameters != nil {
for _, param := range job.Parameters {
if param.Name == "catalog" {
hasCatalog = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
if param.Name == "schema" {
hasSchema = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
}
}
// Initialize parameters if nil
if job.Parameters == nil {
job.Parameters = []jobs.JobParameterDefinition{}
}
// Add catalog parameter if not already present
if !hasCatalog && t.Catalog != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "catalog",
Default: t.Catalog,
})
}
// Add schema parameter if not already present
if !hasSchema && t.Schema != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "schema",
Default: t.Schema,
})
}
return diags
}
func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics {
var diags diag.Diagnostics
for _, t := range job.Tasks {
var relPath string
var expected string
var fix string
if t.NotebookTask != nil {
relPath = t.NotebookTask.NotebookPath
expected = `dbutils.widgets.text\(['"]schema|` +
`USE[^)]+schema`
fix = " dbutils.widgets.text('catalog')\n" +
" dbutils.widgets.text('schema')\n" +
" catalog = dbutils.widgets.get('catalog')\n" +
" schema = dbutils.widgets.get('schema')\n" +
" spark.sql(f'USE {catalog}.{schema}')\n"
} else if t.SparkPythonTask != nil {
relPath = t.SparkPythonTask.PythonFile
expected = `add_argument\(['"]--catalog'|` +
`USE[^)]+catalog`
fix = " def main():\n" +
" parser = argparse.ArgumentParser()\n" +
" parser.add_argument('--catalog', required=True)\n" +
" parser.add_argument('--schema', '-s', required=True)\n" +
" args, unknown = parser.parse_known_args()\n" +
" spark.sql(f\"USE {args.catalog}.{args.schema}\")\n"
} else if t.SqlTask != nil && t.SqlTask.File != nil {
relPath = t.SqlTask.File.Path
expected = `:schema|\{\{schema\}\}`
fix = " USE CATALOG {{catalog}};\n" +
" USE IDENTIFIER({schema});\n"
} else {
continue
}
sourceDir, err := b.Config.GetLocation("resources.jobs." + key).Directory()
if err != nil {
continue
}
localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath)
if err != nil || localPath == "" {
// We ignore errors (they're reported by another mutator)
// and ignore empty local paths (which means we'd have to download the file)
continue
}
if !fileIncludesPattern(ctx, localPath, expected) {
diags = diags.Extend(diag.Diagnostics{{
Summary: "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix,
Severity: diag.Recommendation,
Locations: []dyn.Location{{
File: localPath,
Line: 1,
Column: 1,
}},
}})
}
}
return diags
}
// fullyQualifyName checks if the given name is already qualified with a catalog and schema.
// If not, and both catalog and schema are available, it prefixes the name with catalog.schema.
// If name is empty, returns name as-is.
func fullyQualifyName(name, catalog, schema string) string {
if name == "" || catalog == "" || schema == "" {
return name
}
// If it's already qualified (contains at least two '.'), we assume it's fully qualified.
parts := strings.Split(name, ".")
if len(parts) >= 3 {
// Already fully qualified
return name
}
// Otherwise, fully qualify it
return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
}
func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
content, err := os.ReadFile(filePath)
if err != nil {
log.Warnf(ctx, "failed to check file %s: %v", filePath, err)
return true
}
matched, err := regexp.MatchString(expected, string(content))
if err != nil {
log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err)
return true
}
return matched
}

View File

@ -0,0 +1,364 @@
package mutator
import (
"context"
"fmt"
"os"
"regexp"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type applyPresetsCatalogSchema struct{}
// ApplyPresetsCatalogSchema applies catalog and schema presets to bundle resources.
func ApplyPresetsCatalogSchema() *applyPresetsCatalogSchema {
return &applyPresetsCatalogSchema{}
}
func (m *applyPresetsCatalogSchema) Name() string {
return "ApplyPresetsCatalogSchema"
}
func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
diags := validateCatalogAndSchema(b)
if diags.HasError() {
return diags
}
r := b.Config.Resources
p := b.Config.Presets
// Jobs
for key, j := range r.Jobs {
if j.JobSettings == nil {
continue
}
if p.Catalog != "" || p.Schema != "" {
for _, task := range j.Tasks {
if task.DbtTask != nil {
if task.DbtTask.Catalog == "" {
task.DbtTask.Catalog = p.Catalog
}
if task.DbtTask.Schema == "" {
task.DbtTask.Schema = p.Schema
}
}
}
diags = diags.Extend(addCatalogSchemaParameters(b, key, j, p))
diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j))
}
}
// Pipelines
for _, pl := range r.Pipelines {
if pl.PipelineSpec == nil {
continue
}
if p.Catalog != "" && p.Schema != "" {
if pl.Catalog == "" {
pl.Catalog = p.Catalog
}
if pl.Target == "" {
pl.Target = p.Schema
}
if pl.GatewayDefinition != nil {
if pl.GatewayDefinition.GatewayStorageCatalog == "" {
pl.GatewayDefinition.GatewayStorageCatalog = p.Catalog
}
if pl.GatewayDefinition.GatewayStorageSchema == "" {
pl.GatewayDefinition.GatewayStorageSchema = p.Schema
}
}
if pl.IngestionDefinition != nil {
for _, obj := range pl.IngestionDefinition.Objects {
if obj.Report != nil {
if obj.Report.DestinationCatalog == "" {
obj.Report.DestinationCatalog = p.Catalog
}
if obj.Report.DestinationSchema == "" {
obj.Report.DestinationSchema = p.Schema
}
}
if obj.Schema != nil {
if obj.Schema.SourceCatalog == "" {
obj.Schema.SourceCatalog = p.Catalog
}
if obj.Schema.SourceSchema == "" {
obj.Schema.SourceSchema = p.Schema
}
if obj.Schema.DestinationCatalog == "" {
obj.Schema.DestinationCatalog = p.Catalog
}
if obj.Schema.DestinationSchema == "" {
obj.Schema.DestinationSchema = p.Schema
}
}
if obj.Table != nil {
if obj.Table.SourceCatalog == "" {
obj.Table.SourceCatalog = p.Catalog
}
if obj.Table.SourceSchema == "" {
obj.Table.SourceSchema = p.Schema
}
if obj.Table.DestinationCatalog == "" {
obj.Table.DestinationCatalog = p.Catalog
}
if obj.Table.DestinationSchema == "" {
obj.Table.DestinationSchema = p.Schema
}
}
}
}
}
}
// Model serving endpoints
for _, e := range r.ModelServingEndpoints {
if e.CreateServingEndpoint == nil {
continue
}
if p.Catalog != "" || p.Schema != "" {
if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil {
if p.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = p.Catalog
}
if p.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = p.Schema
}
}
if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil {
if p.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = p.Catalog
}
if p.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = p.Schema
}
}
for i := range e.CreateServingEndpoint.Config.ServedEntities {
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName(
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, p.Catalog, p.Schema,
)
}
for i := range e.CreateServingEndpoint.Config.ServedModels {
e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName(
e.CreateServingEndpoint.Config.ServedModels[i].ModelName, p.Catalog, p.Schema,
)
}
}
}
// Registered models
for _, m := range r.RegisteredModels {
if m.CreateRegisteredModelRequest == nil {
continue
}
if p.Catalog != "" && m.CatalogName == "" {
m.CatalogName = p.Catalog
}
if p.Schema != "" && m.SchemaName == "" {
m.SchemaName = p.Schema
}
}
// Quality monitors
for _, q := range r.QualityMonitors {
if q.CreateMonitor == nil {
continue
}
if p.Catalog != "" && p.Schema != "" {
q.TableName = fullyQualifyName(q.TableName, p.Catalog, p.Schema)
if q.OutputSchemaName == "" {
q.OutputSchemaName = p.Catalog + "." + p.Schema
}
}
}
// Schemas
for _, s := range r.Schemas {
if s.CreateSchema == nil {
continue
}
if p.Catalog != "" && s.CatalogName == "" {
s.CatalogName = p.Catalog
}
if p.Schema != "" && s.Name == "" {
// If there is a schema preset such as 'dev', we directly
// use that name and don't add any prefix (which might result in dev_dev).
s.Name = p.Schema
}
}
return diags
}
func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
p := b.Config.Presets
if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
return diag.Diagnostics{{
Summary: "presets.catalog and presets.schema must always be set together",
Severity: diag.Error,
Locations: []dyn.Location{b.Config.GetLocation("presets")},
}}
}
return diag.Diagnostics{}
}
// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
// Returns any warning diagnostics for existing parameters.
func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics {
var diags diag.Diagnostics
// Check for existing catalog/schema parameters
hasCatalog := false
hasSchema := false
if job.Parameters != nil {
for _, param := range job.Parameters {
if param.Name == "catalog" {
hasCatalog = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: b.Config.GetLocations("resources.jobs." + key + ".parameters"),
}})
}
if param.Name == "schema" {
hasSchema = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
}
}
// Initialize parameters if nil
if job.Parameters == nil {
job.Parameters = []jobs.JobParameterDefinition{}
}
// Add catalog parameter if not already present
if !hasCatalog && t.Catalog != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "catalog",
Default: t.Catalog,
})
}
// Add schema parameter if not already present
if !hasSchema && t.Schema != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "schema",
Default: t.Schema,
})
}
return diags
}
func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics {
var diags diag.Diagnostics
for _, t := range job.Tasks {
var relPath string
var expected string
var fix string
if t.NotebookTask != nil {
relPath = t.NotebookTask.NotebookPath
expected = `dbutils.widgets.text\(['"]schema|` +
`USE[^)]+schema`
fix = " dbutils.widgets.text('catalog')\n" +
" dbutils.widgets.text('schema')\n" +
" catalog = dbutils.widgets.get('catalog')\n" +
" schema = dbutils.widgets.get('schema')\n" +
" spark.sql(f'USE {catalog}.{schema}')\n"
} else if t.SparkPythonTask != nil {
relPath = t.SparkPythonTask.PythonFile
expected = `add_argument\(['"]--catalog'|` +
`USE[^)]+catalog`
fix = " def main():\n" +
" parser = argparse.ArgumentParser()\n" +
" parser.add_argument('--catalog', required=True)\n" +
" parser.add_argument('--schema', '-s', required=True)\n" +
" args, unknown = parser.parse_known_args()\n" +
" spark.sql(f\"USE {args.catalog}.{args.schema}\")\n"
} else if t.SqlTask != nil && t.SqlTask.File != nil {
relPath = t.SqlTask.File.Path
expected = `:schema|\{\{schema\}\}`
fix = " USE CATALOG {{catalog}};\n" +
" USE IDENTIFIER({schema});\n"
} else {
continue
}
sourceDir, err := b.Config.GetLocation("resources.jobs." + key).Directory()
if err != nil {
continue
}
localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath)
if err != nil || localPath == "" {
// We ignore errors (they're reported by another mutator)
// and ignore empty local paths (which means we'd have to download the file)
continue
}
if !fileIncludesPattern(ctx, localPath, expected) {
diags = diags.Extend(diag.Diagnostics{{
Summary: "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix,
Severity: diag.Recommendation,
Locations: []dyn.Location{{
File: localPath,
Line: 1,
Column: 1,
}},
}})
}
}
return diags
}
// fullyQualifyName checks if the given name is already qualified with a catalog and schema.
// If not, and both catalog and schema are available, it prefixes the name with catalog.schema.
// If name is empty, returns name as-is.
func fullyQualifyName(name, catalog, schema string) string {
if name == "" || catalog == "" || schema == "" {
return name
}
// If it's already qualified (contains at least two '.'), we assume it's fully qualified.
parts := strings.Split(name, ".")
if len(parts) >= 3 {
// Already fully qualified
return name
}
// Otherwise, fully qualify it
return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
}
func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
content, err := os.ReadFile(filePath)
if err != nil {
log.Warnf(ctx, "failed to check file %s: %v", filePath, err)
return true
}
matched, err := regexp.MatchString(expected, string(content))
if err != nil {
log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err)
return true
}
return matched
}

View File

@ -0,0 +1,348 @@
package mutator_test
import (
"context"
"reflect"
"strings"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/serving"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type recordedField struct {
Path dyn.Path
PathString string
Placeholder string
Expected string
}
func mockPresetsCatalogSchema() *bundle.Bundle {
return &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"key": {
JobSettings: &jobs.JobSettings{
Name: "job",
Parameters: []jobs.JobParameterDefinition{
{Name: "catalog", Default: "<catalog>"},
{Name: "schema", Default: "<schema>"},
},
Tasks: []jobs.Task{
{
DbtTask: &jobs.DbtTask{
Catalog: "<catalog>",
Schema: "<schema>",
},
},
{
SparkPythonTask: &jobs.SparkPythonTask{
PythonFile: "/file",
},
},
{
NotebookTask: &jobs.NotebookTask{
NotebookPath: "/notebook",
},
},
},
},
},
},
Pipelines: map[string]*resources.Pipeline{
"key": {
PipelineSpec: &pipelines.PipelineSpec{
Name: "pipeline",
Catalog: "<catalog>",
Target: "<schema>",
GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
GatewayStorageCatalog: "<catalog>",
GatewayStorageSchema: "<schema>",
},
IngestionDefinition: &pipelines.IngestionPipelineDefinition{
Objects: []pipelines.IngestionConfig{
{
Report: &pipelines.ReportSpec{
DestinationCatalog: "<catalog>",
DestinationSchema: "<schema>",
},
Schema: &pipelines.SchemaSpec{
SourceCatalog: "<catalog>",
SourceSchema: "<schema>",
DestinationCatalog: "<catalog>",
DestinationSchema: "<schema>",
},
Table: &pipelines.TableSpec{
SourceCatalog: "<catalog>",
SourceSchema: "<schema>",
DestinationCatalog: "<catalog>",
DestinationSchema: "<schema>",
},
},
},
},
},
},
},
ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
"key": {
CreateServingEndpoint: &serving.CreateServingEndpoint{
Name: "serving",
AiGateway: &serving.AiGatewayConfig{
InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
},
Config: serving.EndpointCoreConfigInput{
AutoCaptureConfig: &serving.AutoCaptureConfigInput{
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
ServedEntities: []serving.ServedEntityInput{
{EntityName: "<catalog>.<schema>.entity"},
},
ServedModels: []serving.ServedModelInput{
{ModelName: "<catalog>.<schema>.model"},
},
},
},
},
},
RegisteredModels: map[string]*resources.RegisteredModel{
"key": {
CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
Name: "registered_model",
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
},
},
QualityMonitors: map[string]*resources.QualityMonitor{
"key": {
TableName: "<catalog>.<schema>.table",
CreateMonitor: &catalog.CreateMonitor{
OutputSchemaName: "<catalog>.<schema>",
},
},
},
Schemas: map[string]*resources.Schema{
"key": {
CreateSchema: &catalog.CreateSchema{
Name: "<schema>",
CatalogName: "<catalog>",
},
},
},
},
Presets: config.Presets{
Catalog: "my_catalog",
Schema: "my_schema",
},
},
}
}
// ignoredFields are fields that should be ignored in the completeness check
var ignoredFields = map[string]string{
"resources.pipelines.key.schema": "schema is still in private preview",
"resources.jobs.key.tasks[0].notebook_task.base_parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].python_wheel_task.named_parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].python_wheel_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.job_parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].spark_jar_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].spark_python_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].spark_submit_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].sql_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.jar_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.notebook_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.pipeline_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.python_named_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.python_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.spark_submit_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.sql_params": "catalog/schema are passed via job parameters",
"resources.pipelines.key.ingestion_definition.objects[0].schema": "schema name is under schema.source_schema/destination_schema",
"resources.schemas": "schema name of schemas is under resources.schemas.key.Name",
}
func TestApplyPresetsCatalogSchemaWhenAlreadySet(t *testing.T) {
b := mockPresetsCatalogSchema()
recordedFields := recordPlaceholderFields(t, b)
diags := bundle.Apply(context.Background(), b, mutator.ApplyPresets())
require.NoError(t, diags.Error())
for _, f := range recordedFields {
val, err := dyn.GetByPath(b.Config.Value(), f.Path)
require.NoError(t, err, "failed to get path %s", f.Path)
require.Equal(t, f.Placeholder, val.MustString(),
"expected placeholder '%s' at %s to remain unchanged before cleanup", f.Placeholder, f.Path)
}
}
func TestApplyPresetsCatalogSchemaWhenNotSet(t *testing.T) {
b := mockPresetsCatalogSchema()
recordedFields := recordPlaceholderFields(t, b)
// Set all catalog/schema fields to empty strings / nil
require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
for _, f := range recordedFields {
value, err := dyn.GetByPath(root, f.Path)
require.NoError(t, err)
val := value.MustString()
cleanedVal := removePlaceholders(val)
root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
require.NoError(t, err)
}
return dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
}))
// Apply catalog/schema presets
diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
require.NoError(t, diags.Error())
// Verify that all catalog/schema fields have been set to the presets
for _, f := range recordedFields {
val, err := dyn.GetByPath(b.Config.Value(), f.Path)
require.NoError(t, err, "could not find expected field(s) at %s", f.Path)
assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
}
}
func TestApplyPresetsCatalogSchemaCompleteness(t *testing.T) {
b := mockPresetsCatalogSchema()
recordedFields := recordPlaceholderFields(t, b)
// Convert the recordedFields to a set for easier lookup
recordedPaths := make(map[string]struct{})
for _, field := range recordedFields {
recordedPaths[field.PathString] = struct{}{}
if i := strings.Index(field.PathString, "["); i >= 0 {
// For entries like resources.jobs.key.parameters[1].default, just add resources.jobs.key.parameters
recordedPaths[field.PathString[:i]] = struct{}{}
}
}
// Find all catalog/schema fields that we think should be covered based
// on all properties in config.Resources.
expectedFields := findCatalogSchemaFields()
assert.GreaterOrEqual(t, len(expectedFields), 42, "expected at least 42 catalog/schema fields, but got %d", len(expectedFields))
// Verify that all expected fields are there
for _, field := range expectedFields {
if _, recorded := recordedPaths[field]; !recorded {
if _, ignored := ignoredFields[field]; !ignored {
t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to PresetsMock or PresetsIgnoredFields and add support for it as appropriate.", field)
}
}
}
}
// recordPlaceholderFields scans the config and records all fields containing catalog/schema placeholders
func recordPlaceholderFields(t *testing.T, b *bundle.Bundle) []recordedField {
t.Helper()
var recordedFields []recordedField
err := b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
_, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
if v.Kind() == dyn.KindString {
val := v.MustString()
if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
pathCopy := make(dyn.Path, len(p))
copy(pathCopy, p)
recordedFields = append(recordedFields, recordedField{
Path: pathCopy,
PathString: pathCopy.String(),
Placeholder: val,
Expected: replacePlaceholders(val, "my_catalog", "my_schema"),
})
}
}
return v, nil
})
return root, err
})
require.NoError(t, err)
return recordedFields
}
// findCatalogSchemaFields finds all fields in config.Resources that might refer
// to a catalog or schema. Returns a slice of field paths.
func findCatalogSchemaFields() []string {
visited := make(map[reflect.Type]struct{})
var results []string
// verifyTypeFields is a recursive function to verify the fields of a given type
var walkTypeFields func(rt reflect.Type, path string)
walkTypeFields = func(rt reflect.Type, path string) {
if _, seen := visited[rt]; seen {
return
}
visited[rt] = struct{}{}
switch rt.Kind() {
case reflect.Slice, reflect.Array:
walkTypeFields(rt.Elem(), path+"[0]")
case reflect.Map:
walkTypeFields(rt.Elem(), path+".key")
case reflect.Ptr:
walkTypeFields(rt.Elem(), path)
case reflect.Struct:
for i := 0; i < rt.NumField(); i++ {
ft := rt.Field(i)
jsonTag := ft.Tag.Get("json")
if jsonTag == "" || jsonTag == "-" {
// Ignore field names when there's no JSON tag, e.g. for Jobs.JobSettings
walkTypeFields(ft.Type, path)
continue
}
fieldName := strings.Split(jsonTag, ",")[0]
fieldPath := path + "." + fieldName
if isCatalogOrSchemaField(fieldName) {
results = append(results, fieldPath)
}
walkTypeFields(ft.Type, fieldPath)
}
}
}
var r config.Resources
walkTypeFields(reflect.TypeOf(r), "resources")
return results
}
// isCatalogOrSchemaField returns true for a field names in config.Resources that we suspect could contain a catalog or schema name
func isCatalogOrSchemaField(name string) bool {
return strings.Contains(name, "catalog") ||
strings.Contains(name, "schema") ||
strings.Contains(name, "parameters") ||
strings.Contains(name, "params")
}
func removePlaceholders(value string) string {
value = strings.ReplaceAll(value, "<catalog>.", "")
value = strings.ReplaceAll(value, "<schema>.", "")
value = strings.ReplaceAll(value, "<catalog>", "")
value = strings.ReplaceAll(value, "<schema>", "")
return value
}
func replacePlaceholders(placeholder, catalog, schema string) string {
expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
expected = strings.ReplaceAll(expected, "<schema>", schema)
return expected
}

View File

@ -2,9 +2,7 @@ package mutator_test
import ( import (
"context" "context"
"reflect"
"runtime" "runtime"
"strings"
"testing" "testing"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
@ -16,19 +14,9 @@ import (
"github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/serving"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
type recordedField struct {
Path dyn.Path
PathString string
Placeholder string
Expected string
}
func TestApplyPresetsPrefix(t *testing.T) { func TestApplyPresetsPrefix(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -465,317 +453,3 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
} }
} }
func PresetsMock() *bundle.Bundle {
return &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"key": {
JobSettings: &jobs.JobSettings{
Name: "job",
Parameters: []jobs.JobParameterDefinition{
{Name: "catalog", Default: "<catalog>"},
{Name: "schema", Default: "<schema>"},
},
Tasks: []jobs.Task{
{
DbtTask: &jobs.DbtTask{
Catalog: "<catalog>",
Schema: "<schema>",
},
},
{
SparkPythonTask: &jobs.SparkPythonTask{
PythonFile: "/file",
},
},
{
NotebookTask: &jobs.NotebookTask{
NotebookPath: "/notebook",
},
},
},
},
},
},
Pipelines: map[string]*resources.Pipeline{
"key": {
PipelineSpec: &pipelines.PipelineSpec{
Name: "pipeline",
Catalog: "<catalog>",
Target: "<schema>",
GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
GatewayStorageCatalog: "<catalog>",
GatewayStorageSchema: "<schema>",
},
IngestionDefinition: &pipelines.IngestionPipelineDefinition{
Objects: []pipelines.IngestionConfig{
{
Report: &pipelines.ReportSpec{
DestinationCatalog: "<catalog>",
DestinationSchema: "<schema>",
},
Schema: &pipelines.SchemaSpec{
SourceCatalog: "<catalog>",
SourceSchema: "<schema>",
DestinationCatalog: "<catalog>",
DestinationSchema: "<schema>",
},
Table: &pipelines.TableSpec{
SourceCatalog: "<catalog>",
SourceSchema: "<schema>",
DestinationCatalog: "<catalog>",
DestinationSchema: "<schema>",
},
},
},
},
},
},
},
ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
"key": {
CreateServingEndpoint: &serving.CreateServingEndpoint{
Name: "serving",
AiGateway: &serving.AiGatewayConfig{
InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
},
Config: serving.EndpointCoreConfigInput{
AutoCaptureConfig: &serving.AutoCaptureConfigInput{
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
ServedEntities: []serving.ServedEntityInput{
{EntityName: "<catalog>.<schema>.entity"},
},
ServedModels: []serving.ServedModelInput{
{ModelName: "<catalog>.<schema>.model"},
},
},
},
},
},
RegisteredModels: map[string]*resources.RegisteredModel{
"key": {
CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
Name: "registered_model",
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
},
},
QualityMonitors: map[string]*resources.QualityMonitor{
"key": {
TableName: "<catalog>.<schema>.table",
CreateMonitor: &catalog.CreateMonitor{
OutputSchemaName: "<catalog>.<schema>",
},
},
},
Schemas: map[string]*resources.Schema{
"key": {
CreateSchema: &catalog.CreateSchema{
Name: "<schema>",
CatalogName: "<catalog>",
},
},
},
},
},
}
}
// Any fields that should be ignored in the completeness check
var PresetsIgnoredFields = map[string]string{
"resources.pipelines.key.schema": "schema is still in private preview",
"resources.jobs.key.tasks[0].notebook_task.base_parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].python_wheel_task.named_parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].python_wheel_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.job_parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].spark_jar_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].spark_python_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].spark_submit_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].sql_task.parameters": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.jar_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.notebook_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.pipeline_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.python_named_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.python_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.spark_submit_params": "catalog/schema are passed via job parameters",
"resources.jobs.key.tasks[0].run_job_task.sql_params": "catalog/schema are passed via job parameters",
"resources.pipelines.key.ingestion_definition.objects[0].schema": "schema name is under schema.source_schema/destination_schema",
"resources.schemas": "schema name of schemas is under resources.schemas.key.Name",
}
func TestApplyPresetsCatalogSchema(t *testing.T) {
b := PresetsMock()
b.Config.Presets = config.Presets{
Catalog: "my_catalog",
Schema: "my_schema",
}
ctx := context.Background()
// Initial scan: record all fields that contain placeholders.
// We do this before the first apply so we can verify no changes occur.
var recordedFields []recordedField
require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
_, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
if v.Kind() == dyn.KindString {
val := v.MustString()
if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
pathCopy := make(dyn.Path, len(p))
copy(pathCopy, p)
recordedFields = append(recordedFields, recordedField{
Path: pathCopy,
PathString: pathCopy.String(),
Placeholder: val,
Expected: replacePlaceholders(val, "my_catalog", "my_schema"),
})
}
}
return v, nil
})
return root, err
}))
// Convert the recordedFields to a set for easier lookup
recordedSet := make(map[string]struct{})
for _, field := range recordedFields {
recordedSet[field.PathString] = struct{}{}
if i := strings.Index(field.PathString, "["); i >= 0 {
// For entries like resources.jobs.key.parameters[1].default, just add resources.jobs.key.parameters
recordedSet[field.PathString[:i]] = struct{}{}
}
}
// Stage 1: Apply presets before cleanup, should be no-op.
diags := bundle.Apply(ctx, b, mutator.ApplyPresets())
require.False(t, diags.HasError(), "unexpected error before cleanup: %v", diags.Error())
// Verify that no recorded fields changed
verifyNoChangesBeforeCleanup(t, b.Config.Value(), recordedFields)
// Stage 2: Cleanup: Walk over rootVal and remove placeholders, adjusting recordedFields Expected values.
require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
for _, f := range recordedFields {
value, err := dyn.GetByPath(root, f.Path)
require.NoError(t, err)
val := value.MustString()
cleanedVal := removePlaceholders(val)
root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
require.NoError(t, err)
}
root, err := dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
require.NoError(t, err)
return root, nil
}))
// Stage 3: Apply presets after cleanup.
diags = bundle.Apply(ctx, b, mutator.ApplyPresets())
require.False(t, diags.HasError(), "unexpected error after cleanup: %v", diags.Error())
// Verify that fields have the expected replacements
config := b.Config.Value()
for _, f := range recordedFields {
val, err := dyn.GetByPath(config, f.Path)
require.NoError(t, err, "failed to get path %s", f.Path)
assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
}
// Stage 4: Check completeness
expectedFields := findCatalogSchemaFields()
assert.GreaterOrEqual(t, len(expectedFields), 42, "expected at least 42 catalog/schema fields, but got %d", len(expectedFields))
for _, field := range expectedFields {
if _, recorded := recordedSet[field]; !recorded {
if _, ignored := PresetsIgnoredFields[field]; !ignored {
t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to PresetsMock or PresetsIgnoredFields and add support for it as appropriate.", field)
}
}
}
}
func verifyNoChangesBeforeCleanup(t *testing.T, rootVal dyn.Value, recordedFields []recordedField) {
t.Helper()
for _, f := range recordedFields {
val, err := dyn.GetByPath(rootVal, f.Path)
require.NoError(t, err, "failed to get path %s", f.Path)
require.Equal(t, f.Placeholder, val.MustString(),
"expected placeholder '%s' at %s to remain unchanged before cleanup", f.Placeholder, f.Path)
}
}
// findCatalogSchemaFields finds all fields in config.Resources that might refer
// to a catalog or schema. Returns a slice of field paths.
func findCatalogSchemaFields() []string {
visited := make(map[reflect.Type]struct{})
var results []string
// verifyTypeFields is a recursive function to verify the fields of a given type
var walkTypeFields func(rt reflect.Type, path string)
walkTypeFields = func(rt reflect.Type, path string) {
if _, seen := visited[rt]; seen {
return
}
visited[rt] = struct{}{}
switch rt.Kind() {
case reflect.Slice, reflect.Array:
walkTypeFields(rt.Elem(), path+"[0]")
case reflect.Map:
walkTypeFields(rt.Elem(), path+".key")
case reflect.Ptr:
walkTypeFields(rt.Elem(), path)
case reflect.Struct:
for i := 0; i < rt.NumField(); i++ {
ft := rt.Field(i)
jsonTag := ft.Tag.Get("json")
if jsonTag == "" || jsonTag == "-" {
// Ignore field names when there's no JSON tag, e.g. for Jobs.JobSettings
walkTypeFields(ft.Type, path)
continue
}
fieldName := strings.Split(jsonTag, ",")[0]
fieldPath := path + "." + fieldName
if isCatalogOrSchemaField(fieldName) {
results = append(results, fieldPath)
}
walkTypeFields(ft.Type, fieldPath)
}
}
}
var r config.Resources
walkTypeFields(reflect.TypeOf(r), "resources")
return results
}
// isCatalogOrSchemaField returns true for a field names in config.Resources that we suspect could contain a catalog or schema name
func isCatalogOrSchemaField(name string) bool {
return strings.Contains(name, "catalog") ||
strings.Contains(name, "schema") ||
strings.Contains(name, "parameters") ||
strings.Contains(name, "params")
}
func removePlaceholders(value string) string {
value = strings.ReplaceAll(value, "<catalog>.", "")
value = strings.ReplaceAll(value, "<schema>.", "")
value = strings.ReplaceAll(value, "<catalog>", "")
value = strings.ReplaceAll(value, "<schema>", "")
return value
}
func replacePlaceholders(placeholder, catalog, schema string) string {
expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
expected = strings.ReplaceAll(expected, "<schema>", schema)
return expected
}

View File

@ -70,6 +70,7 @@ func Initialize() bundle.Mutator {
mutator.ConfigureDashboardDefaults(), mutator.ConfigureDashboardDefaults(),
mutator.ProcessTargetMode(), mutator.ProcessTargetMode(),
mutator.ApplyPresets(), mutator.ApplyPresets(),
mutator.ApplyPresetsCatalogSchema(),
mutator.DefaultQueueing(), mutator.DefaultQueueing(),
mutator.ExpandPipelineGlobPaths(), mutator.ExpandPipelineGlobPaths(),