From feb23ddefb3bcd32c8faafafa9ca15f55634c379 Mon Sep 17 00:00:00 2001 From: Lennart Kats Date: Thu, 19 Dec 2024 22:57:38 +0100 Subject: [PATCH] Move catalog/schema preset logic to a separate module --- bundle/config/mutator/apply_presets.go | 310 +-------------- .../mutator/apply_presets_catalog_schema.go | 364 ++++++++++++++++++ .../apply_presets_catalog_schema_test.go | 348 +++++++++++++++++ bundle/config/mutator/apply_presets_test.go | 326 ---------------- bundle/phases/initialize.go | 1 + 5 files changed, 722 insertions(+), 627 deletions(-) create mode 100644 bundle/config/mutator/apply_presets_catalog_schema.go create mode 100644 bundle/config/mutator/apply_presets_catalog_schema_test.go diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index 8c6415b35..14a99e41d 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -2,21 +2,16 @@ package mutator import ( "context" - "fmt" - "os" "path" - "regexp" "slices" "sort" "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/libs/dbr" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" - "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/textutil" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -27,6 +22,7 @@ type applyPresets struct{} // Apply all presets, e.g. the prefix presets that // adds a prefix to all names of all resources. +// Note the catalog/schema presets are applied in ApplyPresetsCatalogSchema. func ApplyPresets() *applyPresets { return &applyPresets{} } @@ -43,9 +39,6 @@ func (m *applyPresets) Name() string { func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { var diags diag.Diagnostics - if d := validateCatalogAndSchema(b); d != nil { - return d // fast fail since code below would fail - } if d := validatePauseStatus(b); d != nil { diags = diags.Extend(d) } @@ -56,7 +49,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos tags := toTagArray(t.Tags) // Jobs presets. - // Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema + // Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus for key, j := range r.Jobs { if j.JobSettings == nil { diags = diags.Extend(diag.Errorf("job %s is not defined", key)) @@ -90,25 +83,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos j.Trigger.PauseStatus = paused } } - if t.Catalog != "" || t.Schema != "" { - for _, task := range j.Tasks { - if task.DbtTask != nil { - if task.DbtTask.Catalog == "" { - task.DbtTask.Catalog = t.Catalog - } - if task.DbtTask.Schema == "" { - task.DbtTask.Schema = t.Schema - } - } - } - - diags = diags.Extend(addCatalogSchemaParameters(b, key, j, t)) - diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j)) - } } // Pipelines presets. 
- // Supported: Prefix, PipelinesDevelopment, Catalog, Schema + // Supported: Prefix, PipelinesDevelopment // Not supported: Tags (as of 2024-10 not in pipelines API) for key, p := range r.Pipelines { if p.PipelineSpec == nil { @@ -122,62 +100,6 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos if t.TriggerPauseStatus == config.Paused { p.Continuous = false } - if t.Catalog != "" && t.Schema != "" { - if p.Catalog == "" { - p.Catalog = t.Catalog - } - if p.Target == "" { - p.Target = t.Schema - } - if p.GatewayDefinition != nil { - if p.GatewayDefinition.GatewayStorageCatalog == "" { - p.GatewayDefinition.GatewayStorageCatalog = t.Catalog - } - if p.GatewayDefinition.GatewayStorageSchema == "" { - p.GatewayDefinition.GatewayStorageSchema = t.Schema - } - } - if p.IngestionDefinition != nil { - for _, obj := range p.IngestionDefinition.Objects { - if obj.Report != nil { - if obj.Report.DestinationCatalog == "" { - obj.Report.DestinationCatalog = t.Catalog - } - if obj.Report.DestinationSchema == "" { - obj.Report.DestinationSchema = t.Schema - } - } - if obj.Schema != nil { - if obj.Schema.SourceCatalog == "" { - obj.Schema.SourceCatalog = t.Catalog - } - if obj.Schema.SourceSchema == "" { - obj.Schema.SourceSchema = t.Schema - } - if obj.Schema.DestinationCatalog == "" { - obj.Schema.DestinationCatalog = t.Catalog - } - if obj.Schema.DestinationSchema == "" { - obj.Schema.DestinationSchema = t.Schema - } - } - if obj.Table != nil { - if obj.Table.SourceCatalog == "" { - obj.Table.SourceCatalog = t.Catalog - } - if obj.Table.SourceSchema == "" { - obj.Table.SourceSchema = t.Schema - } - if obj.Table.DestinationCatalog == "" { - obj.Table.DestinationCatalog = t.Catalog - } - if obj.Table.DestinationSchema == "" { - obj.Table.DestinationSchema = t.Schema - } - } - } - } - } } // Models presets @@ -229,7 +151,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } // Model serving endpoint presets - // Supported: Prefix, Catalog, Schema + // Supported: Prefix // Not supported: Tags (not in API as of 2024-10) for key, e := range r.ModelServingEndpoints { if e.CreateServingEndpoint == nil { @@ -237,44 +159,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos continue } e.Name = normalizePrefix(prefix) + e.Name - - if t.Catalog != "" || t.Schema != "" { - // Apply catalog & schema to inference table config if not set - if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil { - if t.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" { - e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = t.Catalog - } - if t.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" { - e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = t.Schema - } - } - - // Apply catalog & schema to auto capture config if not set - if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil { - if t.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" { - e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = t.Catalog - } - if t.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" { - e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = t.Schema - } - } - - // Fully qualify served entities and models if they are not already qualified - for i := range e.CreateServingEndpoint.Config.ServedEntities { - 
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName( - e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, t.Catalog, t.Schema, - ) - } - for i := range e.CreateServingEndpoint.Config.ServedModels { - e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName( - e.CreateServingEndpoint.Config.ServedModels[i].ModelName, t.Catalog, t.Schema, - ) - } - } } // Registered models presets - // Supported: Prefix, Catalog, Schema + // Supported: Prefix // Not supported: Tags (not in API as of 2024-10) for key, m := range r.RegisteredModels { if m.CreateRegisteredModelRequest == nil { @@ -282,16 +170,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos continue } m.Name = normalizePrefix(prefix) + m.Name - if t.Catalog != "" && m.CatalogName == "" { - m.CatalogName = t.Catalog - } - if t.Schema != "" && m.SchemaName == "" { - m.SchemaName = t.Schema - } } // Quality monitors presets - // Supported: Schedule, Catalog, Schema + // Supported: Schedule // Not supported: Tags (not in API as of 2024-10) for key, q := range r.QualityMonitors { if q.CreateMonitor == nil { @@ -306,15 +188,9 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos q.Schedule = nil } } - if t.Catalog != "" && t.Schema != "" { - q.TableName = fullyQualifyName(q.TableName, t.Catalog, t.Schema) - if q.OutputSchemaName == "" { - q.OutputSchemaName = t.Catalog + "." + t.Schema - } - } } - // Schemas: Prefix, Catalog, Schema + // Schemas: Prefix only // Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API) for key, s := range r.Schemas { if s.CreateSchema == nil { @@ -322,18 +198,9 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos continue } s.Name = normalizePrefix(prefix) + s.Name - if t.Catalog != "" && s.CatalogName == "" { - s.CatalogName = t.Catalog - } - if t.Schema != "" && s.Name == "" { - // If there is a schema preset such as 'dev', we directly - // use that name and don't add any prefix (which might result in dev_dev). - s.Name = t.Schema - } } // Clusters: Prefix, Tags - // Not supported: Catalog / Schema (not applicable) for key, c := range r.Clusters { if c.ClusterSpec == nil { diags = diags.Extend(diag.Errorf("cluster %s is not defined", key)) @@ -385,6 +252,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos return diags } +// validatePauseStatus checks the user-provided pause status is valid. func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics { p := b.Config.Presets.TriggerPauseStatus if p == "" || p == config.Paused || p == config.Unpaused { @@ -397,20 +265,8 @@ func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics { }} } -func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics { - p := b.Config.Presets - if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") { - return diag.Diagnostics{{ - Summary: "presets.catalog and presets.schema must always be set together", - Severity: diag.Error, - Locations: []dyn.Location{b.Config.GetLocation("presets")}, - }} - } - return nil -} - // toTagArray converts a map of tags to an array of tags. -// We sort tags so ensure stable ordering. +// We sort tags to ensure stable ordering. 
func toTagArray(tags map[string]string) []Tag { var tagArray []Tag if tags == nil { @@ -440,151 +296,3 @@ func normalizePrefix(prefix string) string { return textutil.NormalizeString(prefix) + suffix } - -// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist. -// Returns any warning diagnostics for existing parameters. -func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics { - var diags diag.Diagnostics - - // Check for existing catalog/schema parameters - hasCatalog := false - hasSchema := false - if job.Parameters != nil { - for _, param := range job.Parameters { - if param.Name == "catalog" { - hasCatalog = true - diags = diags.Extend(diag.Diagnostics{{ - Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key), - Severity: diag.Warning, - Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, - }}) - } - if param.Name == "schema" { - hasSchema = true - diags = diags.Extend(diag.Diagnostics{{ - Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key), - Severity: diag.Warning, - Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, - }}) - } - } - } - - // Initialize parameters if nil - if job.Parameters == nil { - job.Parameters = []jobs.JobParameterDefinition{} - } - - // Add catalog parameter if not already present - if !hasCatalog && t.Catalog != "" { - job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ - Name: "catalog", - Default: t.Catalog, - }) - } - - // Add schema parameter if not already present - if !hasSchema && t.Schema != "" { - job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ - Name: "schema", - Default: t.Schema, - }) - } - - return diags -} - -func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics { - var diags diag.Diagnostics - for _, t := range job.Tasks { - var relPath string - var expected string - var fix string - if t.NotebookTask != nil { - relPath = t.NotebookTask.NotebookPath - expected = `dbutils.widgets.text\(['"]schema|` + - `USE[^)]+schema` - fix = " dbutils.widgets.text('catalog')\n" + - " dbutils.widgets.text('schema')\n" + - " catalog = dbutils.widgets.get('catalog')\n" + - " schema = dbutils.widgets.get('schema')\n" + - " spark.sql(f'USE {catalog}.{schema}')\n" - } else if t.SparkPythonTask != nil { - relPath = t.SparkPythonTask.PythonFile - expected = `add_argument\(['"]--catalog'|` + - `USE[^)]+catalog` - fix = " def main():\n" + - " parser = argparse.ArgumentParser()\n" + - " parser.add_argument('--catalog', required=True)\n" + - " parser.add_argument('--schema', '-s', required=True)\n" + - " args, unknown = parser.parse_known_args()\n" + - " spark.sql(f\"USE {args.catalog}.{args.schema}\")\n" - } else if t.SqlTask != nil && t.SqlTask.File != nil { - relPath = t.SqlTask.File.Path - expected = `:schema|\{\{schema\}\}` - fix = " USE CATALOG {{catalog}};\n" + - " USE IDENTIFIER({schema});\n" - } else { - continue - } - - sourceDir, err := b.Config.GetLocation("resources.jobs." 
+ key).Directory() - if err != nil { - continue - } - - localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath) - if err != nil || localPath == "" { - // We ignore errors (they're reported by another mutator) - // and ignore empty local paths (which means we'd have to download the file) - continue - } - - if !fileIncludesPattern(ctx, localPath, expected) { - diags = diags.Extend(diag.Diagnostics{{ - Summary: "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix, - Severity: diag.Recommendation, - Locations: []dyn.Location{{ - File: localPath, - Line: 1, - Column: 1, - }}, - }}) - } - } - - return diags - -} - -// fullyQualifyName checks if the given name is already qualified with a catalog and schema. -// If not, and both catalog and schema are available, it prefixes the name with catalog.schema. -// If name is empty, returns name as-is. -func fullyQualifyName(name, catalog, schema string) string { - if name == "" || catalog == "" || schema == "" { - return name - } - // If it's already qualified (contains at least two '.'), we assume it's fully qualified. - parts := strings.Split(name, ".") - if len(parts) >= 3 { - // Already fully qualified - return name - } - // Otherwise, fully qualify it - return fmt.Sprintf("%s.%s.%s", catalog, schema, name) -} - -func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool { - content, err := os.ReadFile(filePath) - if err != nil { - log.Warnf(ctx, "failed to check file %s: %v", filePath, err) - return true - } - - matched, err := regexp.MatchString(expected, string(content)) - if err != nil { - log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err) - return true - } - return matched -} diff --git a/bundle/config/mutator/apply_presets_catalog_schema.go b/bundle/config/mutator/apply_presets_catalog_schema.go new file mode 100644 index 000000000..04b6dc73f --- /dev/null +++ b/bundle/config/mutator/apply_presets_catalog_schema.go @@ -0,0 +1,364 @@ +package mutator + +import ( + "context" + "fmt" + "os" + "regexp" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type applyPresetsCatalogSchema struct{} + +// ApplyPresetsCatalogSchema applies catalog and schema presets to bundle resources. 
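+// Fields are only changed when they are still empty, so catalog/schema values
+// that are set explicitly in the bundle configuration always take precedence
+// over the presets.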
+func ApplyPresetsCatalogSchema() *applyPresetsCatalogSchema { + return &applyPresetsCatalogSchema{} +} + +func (m *applyPresetsCatalogSchema) Name() string { + return "ApplyPresetsCatalogSchema" +} + +func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + diags := validateCatalogAndSchema(b) + if diags.HasError() { + return diags + } + + r := b.Config.Resources + p := b.Config.Presets + + // Jobs + for key, j := range r.Jobs { + if j.JobSettings == nil { + continue + } + if p.Catalog != "" || p.Schema != "" { + for _, task := range j.Tasks { + if task.DbtTask != nil { + if task.DbtTask.Catalog == "" { + task.DbtTask.Catalog = p.Catalog + } + if task.DbtTask.Schema == "" { + task.DbtTask.Schema = p.Schema + } + } + } + + diags = diags.Extend(addCatalogSchemaParameters(b, key, j, p)) + diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j)) + } + } + + // Pipelines + for _, pl := range r.Pipelines { + if pl.PipelineSpec == nil { + continue + } + if p.Catalog != "" && p.Schema != "" { + if pl.Catalog == "" { + pl.Catalog = p.Catalog + } + if pl.Target == "" { + pl.Target = p.Schema + } + if pl.GatewayDefinition != nil { + if pl.GatewayDefinition.GatewayStorageCatalog == "" { + pl.GatewayDefinition.GatewayStorageCatalog = p.Catalog + } + if pl.GatewayDefinition.GatewayStorageSchema == "" { + pl.GatewayDefinition.GatewayStorageSchema = p.Schema + } + } + if pl.IngestionDefinition != nil { + for _, obj := range pl.IngestionDefinition.Objects { + if obj.Report != nil { + if obj.Report.DestinationCatalog == "" { + obj.Report.DestinationCatalog = p.Catalog + } + if obj.Report.DestinationSchema == "" { + obj.Report.DestinationSchema = p.Schema + } + } + if obj.Schema != nil { + if obj.Schema.SourceCatalog == "" { + obj.Schema.SourceCatalog = p.Catalog + } + if obj.Schema.SourceSchema == "" { + obj.Schema.SourceSchema = p.Schema + } + if obj.Schema.DestinationCatalog == "" { + obj.Schema.DestinationCatalog = p.Catalog + } + if obj.Schema.DestinationSchema == "" { + obj.Schema.DestinationSchema = p.Schema + } + } + if obj.Table != nil { + if obj.Table.SourceCatalog == "" { + obj.Table.SourceCatalog = p.Catalog + } + if obj.Table.SourceSchema == "" { + obj.Table.SourceSchema = p.Schema + } + if obj.Table.DestinationCatalog == "" { + obj.Table.DestinationCatalog = p.Catalog + } + if obj.Table.DestinationSchema == "" { + obj.Table.DestinationSchema = p.Schema + } + } + } + } + } + } + + // Model serving endpoints + for _, e := range r.ModelServingEndpoints { + if e.CreateServingEndpoint == nil { + continue + } + + if p.Catalog != "" || p.Schema != "" { + if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil { + if p.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" { + e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = p.Catalog + } + if p.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" { + e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = p.Schema + } + } + + if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil { + if p.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" { + e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = p.Catalog + } + if p.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" { + e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = p.Schema + } + } + + for i := range 
e.CreateServingEndpoint.Config.ServedEntities { + e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName( + e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, p.Catalog, p.Schema, + ) + } + for i := range e.CreateServingEndpoint.Config.ServedModels { + e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName( + e.CreateServingEndpoint.Config.ServedModels[i].ModelName, p.Catalog, p.Schema, + ) + } + } + } + + // Registered models + for _, m := range r.RegisteredModels { + if m.CreateRegisteredModelRequest == nil { + continue + } + if p.Catalog != "" && m.CatalogName == "" { + m.CatalogName = p.Catalog + } + if p.Schema != "" && m.SchemaName == "" { + m.SchemaName = p.Schema + } + } + + // Quality monitors + for _, q := range r.QualityMonitors { + if q.CreateMonitor == nil { + continue + } + if p.Catalog != "" && p.Schema != "" { + q.TableName = fullyQualifyName(q.TableName, p.Catalog, p.Schema) + if q.OutputSchemaName == "" { + q.OutputSchemaName = p.Catalog + "." + p.Schema + } + } + } + + // Schemas + for _, s := range r.Schemas { + if s.CreateSchema == nil { + continue + } + if p.Catalog != "" && s.CatalogName == "" { + s.CatalogName = p.Catalog + } + if p.Schema != "" && s.Name == "" { + // If there is a schema preset such as 'dev', we directly + // use that name and don't add any prefix (which might result in dev_dev). + s.Name = p.Schema + } + } + + return diags +} + +func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics { + p := b.Config.Presets + if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") { + return diag.Diagnostics{{ + Summary: "presets.catalog and presets.schema must always be set together", + Severity: diag.Error, + Locations: []dyn.Location{b.Config.GetLocation("presets")}, + }} + } + return diag.Diagnostics{} +} + +// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist. +// Returns any warning diagnostics for existing parameters. +func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics { + var diags diag.Diagnostics + + // Check for existing catalog/schema parameters + hasCatalog := false + hasSchema := false + if job.Parameters != nil { + for _, param := range job.Parameters { + if param.Name == "catalog" { + hasCatalog = true + diags = diags.Extend(diag.Diagnostics{{ + Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key), + Severity: diag.Warning, + Locations: b.Config.GetLocations("resources.jobs." + key + ".parameters"), + }}) + } + if param.Name == "schema" { + hasSchema = true + diags = diags.Extend(diag.Diagnostics{{ + Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key), + Severity: diag.Warning, + Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." 
+ key)}, + }}) + } + } + } + + // Initialize parameters if nil + if job.Parameters == nil { + job.Parameters = []jobs.JobParameterDefinition{} + } + + // Add catalog parameter if not already present + if !hasCatalog && t.Catalog != "" { + job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ + Name: "catalog", + Default: t.Catalog, + }) + } + + // Add schema parameter if not already present + if !hasSchema && t.Schema != "" { + job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ + Name: "schema", + Default: t.Schema, + }) + } + + return diags +} + +func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics { + var diags diag.Diagnostics + for _, t := range job.Tasks { + var relPath string + var expected string + var fix string + if t.NotebookTask != nil { + relPath = t.NotebookTask.NotebookPath + expected = `dbutils.widgets.text\(['"]schema|` + + `USE[^)]+schema` + fix = " dbutils.widgets.text('catalog')\n" + + " dbutils.widgets.text('schema')\n" + + " catalog = dbutils.widgets.get('catalog')\n" + + " schema = dbutils.widgets.get('schema')\n" + + " spark.sql(f'USE {catalog}.{schema}')\n" + } else if t.SparkPythonTask != nil { + relPath = t.SparkPythonTask.PythonFile + expected = `add_argument\(['"]--catalog'|` + + `USE[^)]+catalog` + fix = " def main():\n" + + " parser = argparse.ArgumentParser()\n" + + " parser.add_argument('--catalog', required=True)\n" + + " parser.add_argument('--schema', '-s', required=True)\n" + + " args, unknown = parser.parse_known_args()\n" + + " spark.sql(f\"USE {args.catalog}.{args.schema}\")\n" + } else if t.SqlTask != nil && t.SqlTask.File != nil { + relPath = t.SqlTask.File.Path + expected = `:schema|\{\{schema\}\}` + fix = " USE CATALOG {{catalog}};\n" + + " USE IDENTIFIER({schema});\n" + } else { + continue + } + + sourceDir, err := b.Config.GetLocation("resources.jobs." + key).Directory() + if err != nil { + continue + } + + localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath) + if err != nil || localPath == "" { + // We ignore errors (they're reported by another mutator) + // and ignore empty local paths (which means we'd have to download the file) + continue + } + + if !fileIncludesPattern(ctx, localPath, expected) { + diags = diags.Extend(diag.Diagnostics{{ + Summary: "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix, + Severity: diag.Recommendation, + Locations: []dyn.Location{{ + File: localPath, + Line: 1, + Column: 1, + }}, + }}) + } + } + + return diags + +} + +// fullyQualifyName checks if the given name is already qualified with a catalog and schema. +// If not, and both catalog and schema are available, it prefixes the name with catalog.schema. +// If name is empty, returns name as-is. +func fullyQualifyName(name, catalog, schema string) string { + if name == "" || catalog == "" || schema == "" { + return name + } + // If it's already qualified (contains at least two '.'), we assume it's fully qualified. 
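+    // For example, with catalog "main" and schema "dev", a name like "my_model"
+    // becomes "main.dev.my_model", while "main.dev.my_model" is returned unchanged.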
+    parts := strings.Split(name, ".")
+    if len(parts) >= 3 {
+        // Already fully qualified
+        return name
+    }
+    // Otherwise, fully qualify it
+    return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
+}
+
+func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
+    content, err := os.ReadFile(filePath)
+    if err != nil {
+        log.Warnf(ctx, "failed to check file %s: %v", filePath, err)
+        return true
+    }
+
+    matched, err := regexp.MatchString(expected, string(content))
+    if err != nil {
+        log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err)
+        return true
+    }
+    return matched
+}
diff --git a/bundle/config/mutator/apply_presets_catalog_schema_test.go b/bundle/config/mutator/apply_presets_catalog_schema_test.go
new file mode 100644
index 000000000..df4efeb8a
--- /dev/null
+++ b/bundle/config/mutator/apply_presets_catalog_schema_test.go
@@ -0,0 +1,348 @@
+package mutator_test
+
+import (
+    "context"
+    "reflect"
+    "strings"
+    "testing"
+
+    "github.com/databricks/cli/bundle"
+    "github.com/databricks/cli/bundle/config"
+    "github.com/databricks/cli/bundle/config/mutator"
+    "github.com/databricks/cli/bundle/config/resources"
+    "github.com/databricks/cli/libs/dyn"
+    "github.com/databricks/databricks-sdk-go/service/catalog"
+    "github.com/databricks/databricks-sdk-go/service/jobs"
+    "github.com/databricks/databricks-sdk-go/service/pipelines"
+    "github.com/databricks/databricks-sdk-go/service/serving"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+)
+
+type recordedField struct {
+    Path        dyn.Path
+    PathString  string
+    Placeholder string
+    Expected    string
+}
+
+func mockPresetsCatalogSchema() *bundle.Bundle {
+    return &bundle.Bundle{
+        Config: config.Root{
+            Resources: config.Resources{
+                Jobs: map[string]*resources.Job{
+                    "key": {
+                        JobSettings: &jobs.JobSettings{
+                            Name: "job",
+                            Parameters: []jobs.JobParameterDefinition{
+                                {Name: "catalog", Default: "<catalog>"},
+                                {Name: "schema", Default: "<schema>"},
+                            },
+                            Tasks: []jobs.Task{
+                                {
+                                    DbtTask: &jobs.DbtTask{
+                                        Catalog: "<catalog>",
+                                        Schema:  "<schema>",
+                                    },
+                                },
+                                {
+                                    SparkPythonTask: &jobs.SparkPythonTask{
+                                        PythonFile: "/file",
+                                    },
+                                },
+                                {
+                                    NotebookTask: &jobs.NotebookTask{
+                                        NotebookPath: "/notebook",
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+                Pipelines: map[string]*resources.Pipeline{
+                    "key": {
+                        PipelineSpec: &pipelines.PipelineSpec{
+                            Name:    "pipeline",
+                            Catalog: "<catalog>",
+                            Target:  "<schema>",
+                            GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
+                                GatewayStorageCatalog: "<catalog>",
+                                GatewayStorageSchema:  "<schema>",
+                            },
+                            IngestionDefinition: &pipelines.IngestionPipelineDefinition{
+                                Objects: []pipelines.IngestionConfig{
+                                    {
+                                        Report: &pipelines.ReportSpec{
+                                            DestinationCatalog: "<catalog>",
+                                            DestinationSchema:  "<schema>",
+                                        },
+                                        Schema: &pipelines.SchemaSpec{
+                                            SourceCatalog:      "<catalog>",
+                                            SourceSchema:       "<schema>",
+                                            DestinationCatalog: "<catalog>",
+                                            DestinationSchema:  "<schema>",
+                                        },
+                                        Table: &pipelines.TableSpec{
+                                            SourceCatalog:      "<catalog>",
+                                            SourceSchema:       "<schema>",
+                                            DestinationCatalog: "<catalog>",
+                                            DestinationSchema:  "<schema>",
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+                ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
+                    "key": {
+                        CreateServingEndpoint: &serving.CreateServingEndpoint{
+                            Name: "serving",
+                            AiGateway: &serving.AiGatewayConfig{
+                                InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
+                                    CatalogName: "<catalog>",
+                                    SchemaName:  "<schema>",
+                                },
+                            },
+                            Config: serving.EndpointCoreConfigInput{
+                                AutoCaptureConfig: &serving.AutoCaptureConfigInput{
+                                    CatalogName: "<catalog>",
+                                    SchemaName:  "<schema>",
+                                },
+                                ServedEntities: []serving.ServedEntityInput{
+                                    {EntityName: "<catalog>.<schema>.entity"},
+                                },
+                                ServedModels: []serving.ServedModelInput{
+                                    {ModelName: "<catalog>.<schema>.model"},
+                                },
+                            },
+                        },
+                    },
+                },
+                RegisteredModels: map[string]*resources.RegisteredModel{
+                    "key": {
+                        CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
+                            Name:        "registered_model",
+                            CatalogName: "<catalog>",
+                            SchemaName:  "<schema>",
+                        },
+                    },
+                },
+                QualityMonitors: map[string]*resources.QualityMonitor{
+                    "key": {
+                        TableName: "<catalog>.<schema>.table",
+                        CreateMonitor: &catalog.CreateMonitor{
+                            OutputSchemaName: "<catalog>.<schema>",
+                        },
+                    },
+                },
+                Schemas: map[string]*resources.Schema{
+                    "key": {
+                        CreateSchema: &catalog.CreateSchema{
+                            Name:        "<schema>",
+                            CatalogName: "<catalog>",
+                        },
+                    },
+                },
+            },
+            Presets: config.Presets{
+                Catalog: "my_catalog",
+                Schema:  "my_schema",
+            },
+        },
+    }
+}
+
+// ignoredFields are fields that should be ignored in the completeness check
+var ignoredFields = map[string]string{
+    "resources.pipelines.key.schema": "schema is still in private preview",
+    "resources.jobs.key.tasks[0].notebook_task.base_parameters":      "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].python_wheel_task.named_parameters": "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].python_wheel_task.parameters":       "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.job_parameters":        "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].spark_jar_task.parameters":          "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].spark_python_task.parameters":       "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].spark_submit_task.parameters":       "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].sql_task.parameters":                "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.jar_params":            "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.notebook_params":       "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.pipeline_params":       "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.python_named_params":   "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.python_params":         "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.spark_submit_params":   "catalog/schema are passed via job parameters",
+    "resources.jobs.key.tasks[0].run_job_task.sql_params":            "catalog/schema are passed via job parameters",
+    "resources.pipelines.key.ingestion_definition.objects[0].schema": "schema name is under schema.source_schema/destination_schema",
+    "resources.schemas": "schema name of schemas is under resources.schemas.key.Name",
+}
+
+func TestApplyPresetsCatalogSchemaWhenAlreadySet(t *testing.T) {
+    b := mockPresetsCatalogSchema()
+    recordedFields := recordPlaceholderFields(t, b)
+
+    diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
+    require.NoError(t, diags.Error())
+
+    for _, f := range recordedFields {
+        val, err := dyn.GetByPath(b.Config.Value(), f.Path)
+        require.NoError(t, err, "failed to get path %s", f.Path)
+        require.Equal(t, f.Placeholder, val.MustString(),
+            "expected placeholder '%s' at %s to remain unchanged when the field is already set", f.Placeholder, f.Path)
+    }
+}
+
+func TestApplyPresetsCatalogSchemaWhenNotSet(t *testing.T) {
+    b := mockPresetsCatalogSchema()
+    recordedFields := recordPlaceholderFields(t, b)
+
+    // Set all catalog/schema fields to empty strings / nil
+    require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
+        for _, f := range recordedFields {
+            value, err := dyn.GetByPath(root, f.Path)
+            require.NoError(t, err)
+
+            val := value.MustString()
+            cleanedVal := removePlaceholders(val)
+            root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
+            require.NoError(t, err)
+        }
+        return dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
+    }))
+
+    // Apply catalog/schema presets
+    diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
+    require.NoError(t, diags.Error())
+
+    // Verify that all catalog/schema fields have been set to the presets
+    for _, f := range recordedFields {
+        val, err := dyn.GetByPath(b.Config.Value(), f.Path)
+        require.NoError(t, err, "could not find expected field(s) at %s", f.Path)
+        assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
+    }
+}
+
+func TestApplyPresetsCatalogSchemaCompleteness(t *testing.T) {
+    b := mockPresetsCatalogSchema()
+    recordedFields := recordPlaceholderFields(t, b)
+
+    // Convert the recordedFields to a set for easier lookup
+    recordedPaths := make(map[string]struct{})
+    for _, field := range recordedFields {
+        recordedPaths[field.PathString] = struct{}{}
+        if i := strings.Index(field.PathString, "["); i >= 0 {
+            // For entries like resources.jobs.key.parameters[1].default, just add resources.jobs.key.parameters
+            recordedPaths[field.PathString[:i]] = struct{}{}
+        }
+    }
+
+    // Find all catalog/schema fields that we think should be covered based
+    // on all properties in config.Resources.
+    expectedFields := findCatalogSchemaFields()
+    assert.GreaterOrEqual(t, len(expectedFields), 42, "expected at least 42 catalog/schema fields, but got %d", len(expectedFields))
+
+    // Verify that all expected fields are there
+    for _, field := range expectedFields {
+        if _, recorded := recordedPaths[field]; !recorded {
+            if _, ignored := ignoredFields[field]; !ignored {
+                t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to mockPresetsCatalogSchema or ignoredFields and add support for it as appropriate.", field)
+            }
+        }
+    }
+}
+
+// recordPlaceholderFields scans the config and records all fields containing catalog/schema placeholders
+func recordPlaceholderFields(t *testing.T, b *bundle.Bundle) []recordedField {
+    t.Helper()
+
+    var recordedFields []recordedField
+    err := b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
+        _, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
+            if v.Kind() == dyn.KindString {
+                val := v.MustString()
+                if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
+                    pathCopy := make(dyn.Path, len(p))
+                    copy(pathCopy, p)
+                    recordedFields = append(recordedFields, recordedField{
+                        Path:        pathCopy,
+                        PathString:  pathCopy.String(),
+                        Placeholder: val,
+                        Expected:    replacePlaceholders(val, "my_catalog", "my_schema"),
+                    })
+                }
+            }
+            return v, nil
+        })
+        return root, err
+    })
+    require.NoError(t, err)
+    return recordedFields
+}
+
+// findCatalogSchemaFields finds all fields in config.Resources that might refer
+// to a catalog or schema. Returns a slice of field paths.
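+// It walks the types of config.Resources recursively via reflection, using the
+// json struct tags as field names, so catalog/schema-like fields added to the
+// SDK in the future are picked up automatically.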
+func findCatalogSchemaFields() []string {
+    visited := make(map[reflect.Type]struct{})
+    var results []string
+
+    // walkTypeFields is a recursive function that walks the fields of a given type
+    var walkTypeFields func(rt reflect.Type, path string)
+    walkTypeFields = func(rt reflect.Type, path string) {
+        if _, seen := visited[rt]; seen {
+            return
+        }
+        visited[rt] = struct{}{}
+
+        switch rt.Kind() {
+        case reflect.Slice, reflect.Array:
+            walkTypeFields(rt.Elem(), path+"[0]")
+        case reflect.Map:
+            walkTypeFields(rt.Elem(), path+".key")
+        case reflect.Ptr:
+            walkTypeFields(rt.Elem(), path)
+        case reflect.Struct:
+            for i := 0; i < rt.NumField(); i++ {
+                ft := rt.Field(i)
+                jsonTag := ft.Tag.Get("json")
+                if jsonTag == "" || jsonTag == "-" {
+                    // Ignore field names when there's no JSON tag, e.g. for Jobs.JobSettings
+                    walkTypeFields(ft.Type, path)
+                    continue
+                }
+
+                fieldName := strings.Split(jsonTag, ",")[0]
+                fieldPath := path + "." + fieldName
+
+                if isCatalogOrSchemaField(fieldName) {
+                    results = append(results, fieldPath)
+                }
+
+                walkTypeFields(ft.Type, fieldPath)
+            }
+        }
+    }
+
+    var r config.Resources
+    walkTypeFields(reflect.TypeOf(r), "resources")
+    return results
+}
+
+// isCatalogOrSchemaField returns true for field names in config.Resources that we suspect could contain a catalog or schema name
+func isCatalogOrSchemaField(name string) bool {
+    return strings.Contains(name, "catalog") ||
+        strings.Contains(name, "schema") ||
+        strings.Contains(name, "parameters") ||
+        strings.Contains(name, "params")
+}
+
+func removePlaceholders(value string) string {
+    value = strings.ReplaceAll(value, "<catalog>.", "")
+    value = strings.ReplaceAll(value, "<schema>.", "")
+    value = strings.ReplaceAll(value, "<catalog>", "")
+    value = strings.ReplaceAll(value, "<schema>", "")
+    return value
+}
+
+func replacePlaceholders(placeholder, catalog, schema string) string {
+    expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
+    expected = strings.ReplaceAll(expected, "<schema>", schema)
+    return expected
+}
diff --git a/bundle/config/mutator/apply_presets_test.go b/bundle/config/mutator/apply_presets_test.go
index dfcb41171..497ef051a 100644
--- a/bundle/config/mutator/apply_presets_test.go
+++ b/bundle/config/mutator/apply_presets_test.go
@@ -2,9 +2,7 @@ package mutator_test
 
 import (
     "context"
-    "reflect"
     "runtime"
-    "strings"
     "testing"
 
     "github.com/databricks/cli/bundle"
@@ -16,19 +14,9 @@ import (
     "github.com/databricks/cli/libs/dyn"
     "github.com/databricks/databricks-sdk-go/service/catalog"
     "github.com/databricks/databricks-sdk-go/service/jobs"
-    "github.com/databricks/databricks-sdk-go/service/pipelines"
-    "github.com/databricks/databricks-sdk-go/service/serving"
-    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 )
 
-type recordedField struct {
-    Path        dyn.Path
-    PathString  string
-    Placeholder string
-    Expected    string
-}
-
 func TestApplyPresetsPrefix(t *testing.T) {
     tests := []struct {
         name   string
@@ -465,317 +453,3 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
     }
 }
-
-func PresetsMock() *bundle.Bundle {
-    return &bundle.Bundle{
-        Config: config.Root{
-            Resources: config.Resources{
-                Jobs: map[string]*resources.Job{
-                    "key": {
-                        JobSettings: &jobs.JobSettings{
-                            Name: "job",
-                            Parameters: []jobs.JobParameterDefinition{
-                                {Name: "catalog", Default: "<catalog>"},
-                                {Name: "schema", Default: "<schema>"},
-                            },
-                            Tasks: []jobs.Task{
-                                {
-                                    DbtTask: &jobs.DbtTask{
-                                        Catalog: "<catalog>",
-                                        Schema:  "<schema>",
-                                    },
-                                },
-                                {
-                                    SparkPythonTask: &jobs.SparkPythonTask{
-                                        PythonFile: "/file",
-                                    },
-                                },
-                                {
-                                    NotebookTask: &jobs.NotebookTask{
-                                        NotebookPath: "/notebook",
-                                    },
-                                },
-                            },
-                        },
-                    },
-                },
-                Pipelines: map[string]*resources.Pipeline{
-                    "key": {
-                        PipelineSpec: &pipelines.PipelineSpec{
-                            Name:    "pipeline",
-                            Catalog: "<catalog>",
-                            Target:  "<schema>",
-                            GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
-                                GatewayStorageCatalog: "<catalog>",
-                                GatewayStorageSchema:  "<schema>",
-                            },
-                            IngestionDefinition: &pipelines.IngestionPipelineDefinition{
-                                Objects: []pipelines.IngestionConfig{
-                                    {
-                                        Report: &pipelines.ReportSpec{
-                                            DestinationCatalog: "<catalog>",
-                                            DestinationSchema:  "<schema>",
-                                        },
-                                        Schema: &pipelines.SchemaSpec{
-                                            SourceCatalog:      "<catalog>",
-                                            SourceSchema:       "<schema>",
-                                            DestinationCatalog: "<catalog>",
-                                            DestinationSchema:  "<schema>",
-                                        },
-                                        Table: &pipelines.TableSpec{
-                                            SourceCatalog:      "<catalog>",
-                                            SourceSchema:       "<schema>",
-                                            DestinationCatalog: "<catalog>",
-                                            DestinationSchema:  "<schema>",
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                    },
-                },
-                ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
-                    "key": {
-                        CreateServingEndpoint: &serving.CreateServingEndpoint{
-                            Name: "serving",
-                            AiGateway: &serving.AiGatewayConfig{
-                                InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
-                                    CatalogName: "<catalog>",
-                                    SchemaName:  "<schema>",
-                                },
-                            },
-                            Config: serving.EndpointCoreConfigInput{
-                                AutoCaptureConfig: &serving.AutoCaptureConfigInput{
-                                    CatalogName: "<catalog>",
-                                    SchemaName:  "<schema>",
-                                },
-                                ServedEntities: []serving.ServedEntityInput{
-                                    {EntityName: "<catalog>.<schema>.entity"},
-                                },
-                                ServedModels: []serving.ServedModelInput{
-                                    {ModelName: "<catalog>.<schema>.model"},
-                                },
-                            },
-                        },
-                    },
-                },
-                RegisteredModels: map[string]*resources.RegisteredModel{
-                    "key": {
-                        CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
-                            Name:        "registered_model",
-                            CatalogName: "<catalog>",
-                            SchemaName:  "<schema>",
-                        },
-                    },
-                },
-                QualityMonitors: map[string]*resources.QualityMonitor{
-                    "key": {
-                        TableName: "<catalog>.<schema>.table",
-                        CreateMonitor: &catalog.CreateMonitor{
-                            OutputSchemaName: "<catalog>.<schema>",
-                        },
-                    },
-                },
-                Schemas: map[string]*resources.Schema{
-                    "key": {
-                        CreateSchema: &catalog.CreateSchema{
-                            Name:        "<schema>",
-                            CatalogName: "<catalog>",
-                        },
-                    },
-                },
-            },
-        },
-    }
-}
-
-// Any fields that should be ignored in the completeness check
-var PresetsIgnoredFields = map[string]string{
-    "resources.pipelines.key.schema": "schema is still in private preview",
-    "resources.jobs.key.tasks[0].notebook_task.base_parameters":      "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].python_wheel_task.named_parameters": "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].python_wheel_task.parameters":       "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.job_parameters":        "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].spark_jar_task.parameters":          "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].spark_python_task.parameters":       "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].spark_submit_task.parameters":       "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].sql_task.parameters":                "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.jar_params":            "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.notebook_params":       "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.pipeline_params":       "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.python_named_params":   "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.python_params":         "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.spark_submit_params":   "catalog/schema are passed via job parameters",
-    "resources.jobs.key.tasks[0].run_job_task.sql_params":            "catalog/schema are passed via job parameters",
-    "resources.pipelines.key.ingestion_definition.objects[0].schema": "schema name is under schema.source_schema/destination_schema",
-    "resources.schemas": "schema name of schemas is under resources.schemas.key.Name",
-}
-
-func TestApplyPresetsCatalogSchema(t *testing.T) {
-    b := PresetsMock()
-    b.Config.Presets = config.Presets{
-        Catalog: "my_catalog",
-        Schema:  "my_schema",
-    }
-    ctx := context.Background()
-
-    // Initial scan: record all fields that contain placeholders.
-    // We do this before the first apply so we can verify no changes occur.
-    var recordedFields []recordedField
-    require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
-        _, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
-            if v.Kind() == dyn.KindString {
-                val := v.MustString()
-                if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
-                    pathCopy := make(dyn.Path, len(p))
-                    copy(pathCopy, p)
-                    recordedFields = append(recordedFields, recordedField{
-                        Path:        pathCopy,
-                        PathString:  pathCopy.String(),
-                        Placeholder: val,
-                        Expected:    replacePlaceholders(val, "my_catalog", "my_schema"),
-                    })
-                }
-            }
-            return v, nil
-        })
-        return root, err
-    }))
-
-    // Convert the recordedFields to a set for easier lookup
-    recordedSet := make(map[string]struct{})
-    for _, field := range recordedFields {
-        recordedSet[field.PathString] = struct{}{}
-        if i := strings.Index(field.PathString, "["); i >= 0 {
-            // For entries like resources.jobs.key.parameters[1].default, just add resources.jobs.key.parameters
-            recordedSet[field.PathString[:i]] = struct{}{}
-        }
-    }
-
-    // Stage 1: Apply presets before cleanup, should be no-op.
-    diags := bundle.Apply(ctx, b, mutator.ApplyPresets())
-    require.False(t, diags.HasError(), "unexpected error before cleanup: %v", diags.Error())
-
-    // Verify that no recorded fields changed
-    verifyNoChangesBeforeCleanup(t, b.Config.Value(), recordedFields)
-
-    // Stage 2: Cleanup: Walk over rootVal and remove placeholders, adjusting recordedFields Expected values.
-    require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
-        for _, f := range recordedFields {
-            value, err := dyn.GetByPath(root, f.Path)
-            require.NoError(t, err)
-
-            val := value.MustString()
-            cleanedVal := removePlaceholders(val)
-            root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
-            require.NoError(t, err)
-        }
-        root, err := dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
-        require.NoError(t, err)
-        return root, nil
-    }))
-
-    // Stage 3: Apply presets after cleanup.
- diags = bundle.Apply(ctx, b, mutator.ApplyPresets()) - require.False(t, diags.HasError(), "unexpected error after cleanup: %v", diags.Error()) - - // Verify that fields have the expected replacements - config := b.Config.Value() - for _, f := range recordedFields { - val, err := dyn.GetByPath(config, f.Path) - require.NoError(t, err, "failed to get path %s", f.Path) - assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder) - } - - // Stage 4: Check completeness - expectedFields := findCatalogSchemaFields() - assert.GreaterOrEqual(t, len(expectedFields), 42, "expected at least 42 catalog/schema fields, but got %d", len(expectedFields)) - for _, field := range expectedFields { - if _, recorded := recordedSet[field]; !recorded { - if _, ignored := PresetsIgnoredFields[field]; !ignored { - t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to PresetsMock or PresetsIgnoredFields and add support for it as appropriate.", field) - } - } - } -} - -func verifyNoChangesBeforeCleanup(t *testing.T, rootVal dyn.Value, recordedFields []recordedField) { - t.Helper() - - for _, f := range recordedFields { - val, err := dyn.GetByPath(rootVal, f.Path) - require.NoError(t, err, "failed to get path %s", f.Path) - require.Equal(t, f.Placeholder, val.MustString(), - "expected placeholder '%s' at %s to remain unchanged before cleanup", f.Placeholder, f.Path) - } -} - -// findCatalogSchemaFields finds all fields in config.Resources that might refer -// to a catalog or schema. Returns a slice of field paths. -func findCatalogSchemaFields() []string { - visited := make(map[reflect.Type]struct{}) - var results []string - - // verifyTypeFields is a recursive function to verify the fields of a given type - var walkTypeFields func(rt reflect.Type, path string) - walkTypeFields = func(rt reflect.Type, path string) { - if _, seen := visited[rt]; seen { - return - } - visited[rt] = struct{}{} - - switch rt.Kind() { - case reflect.Slice, reflect.Array: - walkTypeFields(rt.Elem(), path+"[0]") - case reflect.Map: - walkTypeFields(rt.Elem(), path+".key") - case reflect.Ptr: - walkTypeFields(rt.Elem(), path) - case reflect.Struct: - for i := 0; i < rt.NumField(); i++ { - ft := rt.Field(i) - jsonTag := ft.Tag.Get("json") - if jsonTag == "" || jsonTag == "-" { - // Ignore field names when there's no JSON tag, e.g. for Jobs.JobSettings - walkTypeFields(ft.Type, path) - continue - } - - fieldName := strings.Split(jsonTag, ",")[0] - fieldPath := path + "." 
+ fieldName
-
-            if isCatalogOrSchemaField(fieldName) {
-                results = append(results, fieldPath)
-            }
-
-            walkTypeFields(ft.Type, fieldPath)
-        }
-    }
-    }
-
-    var r config.Resources
-    walkTypeFields(reflect.TypeOf(r), "resources")
-    return results
-}
-
-// isCatalogOrSchemaField returns true for a field names in config.Resources that we suspect could contain a catalog or schema name
-func isCatalogOrSchemaField(name string) bool {
-    return strings.Contains(name, "catalog") ||
-        strings.Contains(name, "schema") ||
-        strings.Contains(name, "parameters") ||
-        strings.Contains(name, "params")
-}
-
-func removePlaceholders(value string) string {
-    value = strings.ReplaceAll(value, "<catalog>.", "")
-    value = strings.ReplaceAll(value, "<schema>.", "")
-    value = strings.ReplaceAll(value, "<catalog>", "")
-    value = strings.ReplaceAll(value, "<schema>", "")
-    return value
-}
-
-func replacePlaceholders(placeholder, catalog, schema string) string {
-    expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
-    expected = strings.ReplaceAll(expected, "<schema>", schema)
-    return expected
-}
diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go
index b62fa0b14..d3b6f81f9 100644
--- a/bundle/phases/initialize.go
+++ b/bundle/phases/initialize.go
@@ -70,6 +70,7 @@ func Initialize() bundle.Mutator {
         mutator.ConfigureDashboardDefaults(),
         mutator.ProcessTargetMode(),
         mutator.ApplyPresets(),
+        mutator.ApplyPresetsCatalogSchema(),
         mutator.DefaultQueueing(),
         mutator.ExpandPipelineGlobPaths(),