mirror of https://github.com/databricks/cli.git

Compare commits: 8eb96ccb7d ... 70f54dca12
No commits in common. "8eb96ccb7d642667d596c6c7d5f5015a5432e296" and "70f54dca12a0cef5f525e7f6fe244cf6bbafe6bc" have entirely different histories.
@@ -2,16 +2,21 @@ package mutator
 
 import (
 	"context"
+	"fmt"
+	"os"
 	"path"
+	"regexp"
 	"slices"
 	"sort"
 	"strings"
 
 	"github.com/databricks/cli/bundle"
 	"github.com/databricks/cli/bundle/config"
+	"github.com/databricks/cli/bundle/config/resources"
 	"github.com/databricks/cli/libs/dbr"
 	"github.com/databricks/cli/libs/diag"
 	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/log"
 	"github.com/databricks/cli/libs/textutil"
 	"github.com/databricks/databricks-sdk-go/service/catalog"
 	"github.com/databricks/databricks-sdk-go/service/jobs"
@@ -22,7 +27,6 @@ type applyPresets struct{}
 
 // Apply all presets, e.g. the prefix presets that
 // adds a prefix to all names of all resources.
-// Note the catalog/schema presets are applied in ApplyPresetsCatalogSchema.
 func ApplyPresets() *applyPresets {
 	return &applyPresets{}
 }
@@ -39,6 +43,9 @@ func (m *applyPresets) Name() string {
 func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
 	var diags diag.Diagnostics
 
+	if d := validateCatalogAndSchema(b); d != nil {
+		return d // fast fail since code below would fail
+	}
 	if d := validatePauseStatus(b); d != nil {
 		diags = diags.Extend(d)
 	}
@@ -48,7 +55,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 	prefix := t.NamePrefix
 	tags := toTagArray(t.Tags)
 
-	// Jobs presets: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus
+	// Jobs presets.
+	// Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema
 	for key, j := range r.Jobs {
 		if j.JobSettings == nil {
 			diags = diags.Extend(diag.Errorf("job %s is not defined", key))
@@ -82,10 +90,26 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 				j.Trigger.PauseStatus = paused
 			}
 		}
+		if t.Catalog != "" || t.Schema != "" {
+			for _, task := range j.Tasks {
+				if task.DbtTask != nil {
+					if task.DbtTask.Catalog == "" {
+						task.DbtTask.Catalog = t.Catalog
+					}
+					if task.DbtTask.Schema == "" {
+						task.DbtTask.Schema = t.Schema
+					}
+				}
+			}
+
+			diags = diags.Extend(addCatalogSchemaParameters(b, key, j, t))
+			diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j))
+		}
 	}
 
-	// Pipelines presets: Prefix, PipelinesDevelopment
-	// Not supported: Tags (not in API as of 2024-12)
+	// Pipelines presets.
+	// Supported: Prefix, PipelinesDevelopment, Catalog, Schema
+	// Not supported: Tags (as of 2024-10 not in pipelines API)
 	for key, p := range r.Pipelines {
 		if p.PipelineSpec == nil {
 			diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key))
@@ -98,9 +122,66 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 		if t.TriggerPauseStatus == config.Paused {
 			p.Continuous = false
 		}
+		if t.Catalog != "" && t.Schema != "" {
+			if p.Catalog == "" {
+				p.Catalog = t.Catalog
+			}
+			if p.Target == "" {
+				p.Target = t.Schema
+			}
+			if p.GatewayDefinition != nil {
+				if p.GatewayDefinition.GatewayStorageCatalog == "" {
+					p.GatewayDefinition.GatewayStorageCatalog = t.Catalog
+				}
+				if p.GatewayDefinition.GatewayStorageSchema == "" {
+					p.GatewayDefinition.GatewayStorageSchema = t.Schema
+				}
+			}
+			if p.IngestionDefinition != nil {
+				for _, obj := range p.IngestionDefinition.Objects {
+					if obj.Report != nil {
+						if obj.Report.DestinationCatalog == "" {
+							obj.Report.DestinationCatalog = t.Catalog
+						}
+						if obj.Report.DestinationSchema == "" {
+							obj.Report.DestinationSchema = t.Schema
+						}
+					}
+					if obj.Schema != nil {
+						if obj.Schema.SourceCatalog == "" {
+							obj.Schema.SourceCatalog = t.Catalog
+						}
+						if obj.Schema.SourceSchema == "" {
+							obj.Schema.SourceSchema = t.Schema
+						}
+						if obj.Schema.DestinationCatalog == "" {
+							obj.Schema.DestinationCatalog = t.Catalog
+						}
+						if obj.Schema.DestinationSchema == "" {
+							obj.Schema.DestinationSchema = t.Schema
+						}
+					}
+					if obj.Table != nil {
+						if obj.Table.SourceCatalog == "" {
+							obj.Table.SourceCatalog = t.Catalog
+						}
+						if obj.Table.SourceSchema == "" {
+							obj.Table.SourceSchema = t.Schema
+						}
+						if obj.Table.DestinationCatalog == "" {
+							obj.Table.DestinationCatalog = t.Catalog
+						}
+						if obj.Table.DestinationSchema == "" {
+							obj.Table.DestinationSchema = t.Schema
+						}
+					}
+				}
+			}
+		}
 	}
 
-	// Models presets: Prefix, Tags
+	// Models presets
+	// Supported: Prefix, Tags
 	for key, m := range r.Models {
 		if m.Model == nil {
 			diags = diags.Extend(diag.Errorf("model %s is not defined", key))
@@ -118,7 +199,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 		}
 	}
 
-	// Experiments presets: Prefix, Tags
+	// Experiments presets
+	// Supported: Prefix, Tags
 	for key, e := range r.Experiments {
 		if e.Experiment == nil {
 			diags = diags.Extend(diag.Errorf("experiment %s is not defined", key))
@@ -146,28 +228,71 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 		}
 	}
 
-	// Model serving endpoint presets: Prefix
-	// Not supported: Tags (not in API as of 2024-12)
+	// Model serving endpoint presets
+	// Supported: Prefix, Catalog, Schema
+	// Not supported: Tags (not in API as of 2024-10)
 	for key, e := range r.ModelServingEndpoints {
 		if e.CreateServingEndpoint == nil {
 			diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key))
 			continue
 		}
 		e.Name = normalizePrefix(prefix) + e.Name
+
+		if t.Catalog != "" || t.Schema != "" {
+			// Apply catalog & schema to inference table config if not set
+			if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil {
+				if t.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
+					e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = t.Catalog
+				}
+				if t.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
+					e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = t.Schema
+				}
+			}
+
+			// Apply catalog & schema to auto capture config if not set
+			if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil {
+				if t.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
+					e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = t.Catalog
+				}
+				if t.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
+					e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = t.Schema
+				}
+			}
+
+			// Fully qualify served entities and models if they are not already qualified
+			for i := range e.CreateServingEndpoint.Config.ServedEntities {
+				e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName(
+					e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, t.Catalog, t.Schema,
+				)
+			}
+			for i := range e.CreateServingEndpoint.Config.ServedModels {
+				e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName(
+					e.CreateServingEndpoint.Config.ServedModels[i].ModelName, t.Catalog, t.Schema,
+				)
+			}
+		}
 	}
 
-	// Registered models presets: Prefix
-	// Not supported: Tags (not in API as of 2024-12)
+	// Registered models presets
+	// Supported: Prefix, Catalog, Schema
+	// Not supported: Tags (not in API as of 2024-10)
 	for key, m := range r.RegisteredModels {
 		if m.CreateRegisteredModelRequest == nil {
 			diags = diags.Extend(diag.Errorf("registered model %s is not defined", key))
 			continue
 		}
 		m.Name = normalizePrefix(prefix) + m.Name
+		if t.Catalog != "" && m.CatalogName == "" {
+			m.CatalogName = t.Catalog
+		}
+		if t.Schema != "" && m.SchemaName == "" {
+			m.SchemaName = t.Schema
+		}
 	}
 
-	// Quality monitors presets: Schedule
-	// Not supported: Tags (not in API as of 2024-12)
+	// Quality monitors presets
+	// Supported: Schedule, Catalog, Schema
+	// Not supported: Tags (not in API as of 2024-10)
 	for key, q := range r.QualityMonitors {
 		if q.CreateMonitor == nil {
 			diags = diags.Extend(diag.Errorf("quality monitor %s is not defined", key))
@@ -181,19 +306,34 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 				q.Schedule = nil
 			}
 		}
+		if t.Catalog != "" && t.Schema != "" {
+			q.TableName = fullyQualifyName(q.TableName, t.Catalog, t.Schema)
+			if q.OutputSchemaName == "" {
+				q.OutputSchemaName = t.Catalog + "." + t.Schema
+			}
+		}
 	}
 
-	// Schemas: Prefix
-	// Not supported: Tags (only supported in Databricks UI / SQL API as of 2024-12)
+	// Schemas: Prefix, Catalog, Schema
+	// Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API)
 	for key, s := range r.Schemas {
 		if s.CreateSchema == nil {
 			diags = diags.Extend(diag.Errorf("schema %s is not defined", key))
 			continue
 		}
 		s.Name = normalizePrefix(prefix) + s.Name
+		if t.Catalog != "" && s.CatalogName == "" {
+			s.CatalogName = t.Catalog
+		}
+		if t.Schema != "" && s.Name == "" {
+			// If there is a schema preset such as 'dev', we directly
+			// use that name and don't add any prefix (which might result in dev_dev).
+			s.Name = t.Schema
+		}
 	}
 
 	// Clusters: Prefix, Tags
+	// Not supported: Catalog / Schema (not applicable)
 	for key, c := range r.Clusters {
 		if c.ClusterSpec == nil {
 			diags = diags.Extend(diag.Errorf("cluster %s is not defined", key))
@@ -245,7 +385,6 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
 	return diags
 }
 
-// validatePauseStatus checks the user-provided pause status is valid.
 func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics {
 	p := b.Config.Presets.TriggerPauseStatus
 	if p == "" || p == config.Paused || p == config.Unpaused {
@@ -258,8 +397,20 @@ func validatePauseStatus(b *bundle.Bundle) diag.Diagnostics {
 	}}
 }
 
+func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
+	p := b.Config.Presets
+	if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
+		return diag.Diagnostics{{
+			Summary:   "presets.catalog and presets.schema must always be set together",
+			Severity:  diag.Error,
+			Locations: []dyn.Location{b.Config.GetLocation("presets")},
+		}}
+	}
+	return nil
+}
+
 // toTagArray converts a map of tags to an array of tags.
-// We sort tags to ensure stable ordering.
+// We sort tags so ensure stable ordering.
 func toTagArray(tags map[string]string) []Tag {
 	var tagArray []Tag
 	if tags == nil {
@@ -289,3 +440,151 @@ func normalizePrefix(prefix string) string {
 
 	return textutil.NormalizeString(prefix) + suffix
 }
+
+// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
+// Returns any warning diagnostics for existing parameters.
+func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics {
+	var diags diag.Diagnostics
+
+	// Check for existing catalog/schema parameters
+	hasCatalog := false
+	hasSchema := false
+	if job.Parameters != nil {
+		for _, param := range job.Parameters {
+			if param.Name == "catalog" {
+				hasCatalog = true
+				diags = diags.Extend(diag.Diagnostics{{
+					Summary:   fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key),
+					Severity:  diag.Warning,
+					Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
+				}})
+			}
+			if param.Name == "schema" {
+				hasSchema = true
+				diags = diags.Extend(diag.Diagnostics{{
+					Summary:   fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
+					Severity:  diag.Warning,
+					Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
+				}})
+			}
+		}
+	}
+
+	// Initialize parameters if nil
+	if job.Parameters == nil {
+		job.Parameters = []jobs.JobParameterDefinition{}
+	}
+
+	// Add catalog parameter if not already present
+	if !hasCatalog && t.Catalog != "" {
+		job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
+			Name:    "catalog",
+			Default: t.Catalog,
+		})
+	}
+
+	// Add schema parameter if not already present
+	if !hasSchema && t.Schema != "" {
+		job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
+			Name:    "schema",
+			Default: t.Schema,
+		})
+	}
+
+	return diags
+}
+
+func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics {
+	var diags diag.Diagnostics
+	for _, t := range job.Tasks {
+		var relPath string
+		var expected string
+		var fix string
+		if t.NotebookTask != nil {
+			relPath = t.NotebookTask.NotebookPath
+			expected = `dbutils.widgets.text\(['"]schema|` +
+				`USE[^)]+schema`
+			fix = " dbutils.widgets.text('catalog')\n" +
+				" dbutils.widgets.text('schema')\n" +
+				" catalog = dbutils.widgets.get('catalog')\n" +
+				" schema = dbutils.widgets.get('schema')\n" +
+				" spark.sql(f'USE {catalog}.{schema}')\n"
+		} else if t.SparkPythonTask != nil {
+			relPath = t.SparkPythonTask.PythonFile
+			expected = `add_argument\(['"]--catalog'|` +
+				`USE[^)]+catalog`
+			fix = " def main():\n" +
+				" parser = argparse.ArgumentParser()\n" +
+				" parser.add_argument('--catalog', required=True)\n" +
+				" parser.add_argument('--schema', '-s', required=True)\n" +
+				" args, unknown = parser.parse_known_args()\n" +
+				" spark.sql(f\"USE {args.catalog}.{args.schema}\")\n"
		} else if t.SqlTask != nil && t.SqlTask.File != nil {
+			relPath = t.SqlTask.File.Path
+			expected = `:schema|\{\{schema\}\}`
+			fix = " USE CATALOG {{catalog}};\n" +
+				" USE IDENTIFIER({schema});\n"
+		} else {
+			continue
+		}
+
+		sourceDir, err := b.Config.GetLocation("resources.jobs." + key).Directory()
+		if err != nil {
+			continue
+		}
+
+		localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath)
+		if err != nil || localPath == "" {
+			// We ignore errors (they're reported by another mutator)
+			// and ignore empty local paths (which means we'd have to download the file)
+			continue
+		}
+
+		if !fileIncludesPattern(ctx, localPath, expected) {
+			diags = diags.Extend(diag.Diagnostics{{
+				Summary:  "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix,
+				Severity: diag.Recommendation,
+				Locations: []dyn.Location{{
+					File:   localPath,
+					Line:   1,
+					Column: 1,
+				}},
+			}})
+		}
+	}
+
+	return diags
+
+}
+
+// fullyQualifyName checks if the given name is already qualified with a catalog and schema.
+// If not, and both catalog and schema are available, it prefixes the name with catalog.schema.
+// If name is empty, returns name as-is.
+func fullyQualifyName(name, catalog, schema string) string {
+	if name == "" || catalog == "" || schema == "" {
+		return name
+	}
+	// If it's already qualified (contains at least two '.'), we assume it's fully qualified.
+	parts := strings.Split(name, ".")
+	if len(parts) >= 3 {
+		// Already fully qualified
+		return name
+	}
+	// Otherwise, fully qualify it
+	return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
+}
+
+func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
+	content, err := os.ReadFile(filePath)
+	if err != nil {
+		log.Warnf(ctx, "failed to check file %s: %v", filePath, err)
+		return true
+	}
+
+	matched, err := regexp.MatchString(expected, string(content))
+	if err != nil {
+		log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err)
+		return true
+	}
+	return matched
+}
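
Aside: the name-qualification rule added above is easy to exercise on its own. A minimal standalone sketch follows; the helper body is copied from this diff, while the example values are illustrative assumptions, not part of the commit:

package main

import (
	"fmt"
	"strings"
)

// fullyQualifyName prefixes name with catalog.schema unless the name
// already contains two '.' separators or any argument is empty.
func fullyQualifyName(name, catalog, schema string) string {
	if name == "" || catalog == "" || schema == "" {
		return name
	}
	if len(strings.Split(name, ".")) >= 3 {
		return name // already fully qualified
	}
	return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
}

func main() {
	fmt.Println(fullyQualifyName("model", "main", "dev"))          // main.dev.model
	fmt.Println(fullyQualifyName("main.dev.model", "main", "dev")) // unchanged: already qualified
	fmt.Println(fullyQualifyName("model", "", "dev"))              // unchanged: catalog missing
}

Anything with fewer than two dots is treated as unqualified and gets the catalog.schema prefix; a missing preset leaves the name untouched.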
@@ -1,376 +0,0 @@
-package mutator
-
-import (
-	"context"
-	"fmt"
-	"os"
-	"regexp"
-	"strings"
-
-	"github.com/databricks/cli/bundle"
-	"github.com/databricks/cli/bundle/config"
-	"github.com/databricks/cli/bundle/config/resources"
-	"github.com/databricks/cli/libs/diag"
-	"github.com/databricks/cli/libs/dyn"
-	"github.com/databricks/cli/libs/log"
-	"github.com/databricks/databricks-sdk-go/service/jobs"
-)
-
-type applyPresetsCatalogSchema struct{}
-
-// ApplyPresetsCatalogSchema applies catalog and schema presets to bundle resources.
-func ApplyPresetsCatalogSchema() *applyPresetsCatalogSchema {
-	return &applyPresetsCatalogSchema{}
-}
-
-func (m *applyPresetsCatalogSchema) Name() string {
-	return "ApplyPresetsCatalogSchema"
-}
-
-func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
-	diags := diag.Diagnostics{}
-	p := b.Config.Presets
-	r := b.Config.Resources
-
-	if p.Catalog == "" && p.Schema == "" {
-		return diags
-	}
-	if (p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
-		return diag.Diagnostics{{
-			Summary:   "presets.catalog and presets.schema must always be set together",
-			Severity:  diag.Error,
-			Locations: []dyn.Location{b.Config.GetLocation("presets")},
-		}}
-	}
-
-	// Jobs
-	for key, j := range r.Jobs {
-		if j.JobSettings == nil {
-			continue
-		}
-
-		for _, task := range j.Tasks {
-			if task.DbtTask != nil {
-				if task.DbtTask.Catalog == "" {
-					task.DbtTask.Catalog = p.Catalog
-				}
-				if task.DbtTask.Schema == "" {
-					task.DbtTask.Schema = p.Schema
-				}
-			}
-		}
-
-		diags = diags.Extend(addCatalogSchemaParameters(b, key, j, p))
-		diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j))
-	}
-
-	// Pipelines
-	allSameCatalog := allPipelinesSameCatalog(&r)
-	for key, pl := range r.Pipelines {
-		if pl.PipelineSpec == nil {
-			continue
-		}
-		if pl.Catalog == "" {
-			pl.Catalog = p.Catalog
-		}
-		if allSameCatalog && pl.Catalog == p.Catalog {
-			// Just for the common case where all pipelines have the same catalog,
-			// we show a recommendation to leave it out and rely on presets.
-			// This can happen when using the original default template.
-			diags = diags.Extend(diag.Diagnostics{{
-				Summary:   "Omit the catalog field since it will be automatically populated from presets.catalog",
-				Severity:  diag.Recommendation,
-				Locations: b.Config.GetLocations("resources.pipelines." + key + ".catalog"),
-			}})
-		}
-		if pl.GatewayDefinition != nil {
-			if pl.GatewayDefinition.GatewayStorageCatalog == "" {
-				pl.GatewayDefinition.GatewayStorageCatalog = p.Catalog
-			}
-			if pl.GatewayDefinition.GatewayStorageSchema == "" {
-				pl.GatewayDefinition.GatewayStorageSchema = p.Schema
-			}
-		}
-		if pl.IngestionDefinition != nil {
-			for _, obj := range pl.IngestionDefinition.Objects {
-				if obj.Report != nil {
-					if obj.Report.DestinationCatalog == "" {
-						obj.Report.DestinationCatalog = p.Catalog
-					}
-					if obj.Report.DestinationSchema == "" {
-						obj.Report.DestinationSchema = p.Schema
-					}
-				}
-				if obj.Schema != nil {
-					if obj.Schema.SourceCatalog == "" {
-						obj.Schema.SourceCatalog = p.Catalog
-					}
-					if obj.Schema.SourceSchema == "" {
-						obj.Schema.SourceSchema = p.Schema
-					}
-					if obj.Schema.DestinationCatalog == "" {
-						obj.Schema.DestinationCatalog = p.Catalog
-					}
-					if obj.Schema.DestinationSchema == "" {
-						obj.Schema.DestinationSchema = p.Schema
-					}
-				}
-				if obj.Table != nil {
-					if obj.Table.SourceCatalog == "" {
-						obj.Table.SourceCatalog = p.Catalog
-					}
-					if obj.Table.SourceSchema == "" {
-						obj.Table.SourceSchema = p.Schema
-					}
-					if obj.Table.DestinationCatalog == "" {
-						obj.Table.DestinationCatalog = p.Catalog
-					}
-					if obj.Table.DestinationSchema == "" {
-						obj.Table.DestinationSchema = p.Schema
-					}
-				}
-			}
-		}
-	}
-
-	// Model serving endpoints
-	for _, e := range r.ModelServingEndpoints {
-		if e.CreateServingEndpoint == nil {
-			continue
-		}
-
-		if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil {
-			if e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
-				e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = p.Catalog
-			}
-			if e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
-				e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = p.Schema
-			}
-		}
-
-		if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil {
-			if e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
-				e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = p.Catalog
-			}
-			if e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
-				e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = p.Schema
-			}
-		}
-
-		for i := range e.CreateServingEndpoint.Config.ServedEntities {
-			e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName(
-				e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, p.Catalog, p.Schema,
-			)
-		}
-		for i := range e.CreateServingEndpoint.Config.ServedModels {
-			e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName(
-				e.CreateServingEndpoint.Config.ServedModels[i].ModelName, p.Catalog, p.Schema,
-			)
-		}
-	}
-
-	// Registered models
-	for _, m := range r.RegisteredModels {
-		if m.CreateRegisteredModelRequest == nil {
-			continue
-		}
-		if m.CatalogName == "" {
-			m.CatalogName = p.Catalog
-		}
-		if m.SchemaName == "" {
-			m.SchemaName = p.Schema
-		}
-	}
-
-	// Quality monitors
-	for _, q := range r.QualityMonitors {
-		if q.CreateMonitor == nil {
-			continue
-		}
-		q.TableName = fullyQualifyName(q.TableName, p.Catalog, p.Schema)
-		if q.OutputSchemaName == "" {
-			q.OutputSchemaName = p.Catalog + "." + p.Schema
-		}
-	}
-
-	// Schemas
-	for _, s := range r.Schemas {
-		if s.CreateSchema == nil {
-			continue
-		}
-		if s.CatalogName == "" {
-			s.CatalogName = p.Catalog
-		}
-		if s.Name == "" {
-			// If there is a schema preset such as 'dev', we directly
-			// use that name and don't add any prefix (which might result in dev_dev).
-			s.Name = p.Schema
-		}
-	}
-
-	return diags
-}
-
-// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
-// Returns any warning diagnostics for existing parameters.
-func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, p config.Presets) diag.Diagnostics {
-	var diags diag.Diagnostics
-
-	// Check for existing catalog/schema parameters
-	hasCatalog := false
-	hasSchema := false
-	if job.Parameters != nil {
-		for _, param := range job.Parameters {
-			if param.Name == "catalog" {
-				hasCatalog = true
-				diags = diags.Extend(diag.Diagnostics{{
-					Summary:   fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key),
-					Severity:  diag.Warning,
-					Locations: b.Config.GetLocations("resources.jobs." + key + ".parameters"),
-				}})
-			}
-			if param.Name == "schema" {
-				hasSchema = true
-				diags = diags.Extend(diag.Diagnostics{{
-					Summary:   fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
-					Severity:  diag.Warning,
-					Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
-				}})
-			}
-		}
-	}
-
-	// Initialize parameters if nil
-	if job.Parameters == nil {
-		job.Parameters = []jobs.JobParameterDefinition{}
-	}
-
-	// Add catalog parameter if not already present
-	if !hasCatalog {
-		job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
-			Name:    "catalog",
-			Default: p.Catalog,
-		})
-	}
-
-	// Add schema parameter if not already present
-	if !hasSchema {
-		job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
-			Name:    "schema",
-			Default: p.Schema,
-		})
-	}
-
-	return diags
-}
-
-func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key string, job *resources.Job) diag.Diagnostics {
-	var diags diag.Diagnostics
-	for _, t := range job.Tasks {
-		var relPath string
-		var expected string
-		var fix string
-		if t.NotebookTask != nil {
-			relPath = t.NotebookTask.NotebookPath
-			expected = `dbutils.widgets.text\(['"]schema|` +
-				`USE[^)]+schema`
-			fix = " dbutils.widgets.text('catalog')\n" +
-				" dbutils.widgets.text('schema')\n" +
-				" catalog = dbutils.widgets.get('catalog')\n" +
-				" schema = dbutils.widgets.get('schema')\n" +
-				" spark.sql(f'USE {catalog}.{schema}')\n"
-		} else if t.SparkPythonTask != nil {
-			relPath = t.SparkPythonTask.PythonFile
-			expected = `add_argument\(['"]--catalog'|` +
-				`USE[^)]+catalog`
-			fix = " def main():\n" +
-				" parser = argparse.ArgumentParser()\n" +
-				" parser.add_argument('--catalog', required=True)\n" +
-				" parser.add_argument('--schema', '-s', required=True)\n" +
-				" args, unknown = parser.parse_known_args()\n" +
-				" spark.sql(f\"USE {args.catalog}.{args.schema}\")\n"
-		} else if t.SqlTask != nil && t.SqlTask.File != nil {
-			relPath = t.SqlTask.File.Path
-			expected = `:schema|\{\{schema\}\}`
-			fix = " USE CATALOG {{catalog}};\n" +
-				" USE IDENTIFIER({schema});\n"
-		} else {
-			continue
-		}
-
-		sourceDir, err := b.Config.GetLocation("resources.jobs." + key).Directory()
-		if err != nil {
-			continue
-		}
-
-		localPath, _, err := GetLocalPath(ctx, b, sourceDir, relPath)
-		if err != nil || localPath == "" {
-			// We ignore errors (they're reported by another mutator)
-			// and ignore empty local paths (which means we'd have to download the file)
-			continue
-		}
-
-		if !fileIncludesPattern(ctx, localPath, expected) {
-			diags = diags.Extend(diag.Diagnostics{{
-				Summary:  "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix,
-				Severity: diag.Recommendation,
-				Locations: []dyn.Location{{
-					File:   localPath,
-					Line:   1,
-					Column: 1,
-				}},
-			}})
-		}
-	}
-
-	return diags
-
-}
-
-// fullyQualifyName checks if the given name is already qualified with a catalog and schema.
-// If not, and both catalog and schema are available, it prefixes the name with catalog.schema.
-// If name is empty, returns name as-is.
-func fullyQualifyName(name, catalog, schema string) string {
-	if name == "" || catalog == "" || schema == "" {
-		return name
-	}
-	// If it's already qualified (contains at least two '.'), we assume it's fully qualified.
-	parts := strings.Split(name, ".")
-	if len(parts) >= 3 {
-		// Already fully qualified
-		return name
-	}
-	// Otherwise, fully qualify it
-	return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
-}
-
-func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
-	content, err := os.ReadFile(filePath)
-	if err != nil {
-		log.Warnf(ctx, "failed to check file %s: %v", filePath, err)
-		return true
-	}
-
-	matched, err := regexp.MatchString(expected, string(content))
-	if err != nil {
-		log.Warnf(ctx, "failed to check pattern in %s: %v", filePath, err)
-		return true
-	}
-	return matched
-}
-
-func allPipelinesSameCatalog(r *config.Resources) bool {
-	var firstCatalog string
-
-	for _, pl := range r.Pipelines {
-		if pl.PipelineSpec == nil || pl.PipelineSpec.Catalog == "" {
-			return false
-		}
-		if firstCatalog == "" {
-			firstCatalog = pl.PipelineSpec.Catalog
-		} else if pl.PipelineSpec.Catalog != firstCatalog {
-			return false
-		}
-	}
-	return firstCatalog != ""
-}
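
Aside: the "Omit the catalog field" recommendation in the deleted mutator above only fires when every pipeline pins the same non-empty catalog. A minimal sketch of that invariant, with pipeline specs reduced to plain strings for illustration (an assumption made for brevity, not the actual types):

package main

import "fmt"

// allSameCatalog reports whether every pipeline declares a non-empty
// catalog and all of them agree. An empty set yields false, matching
// allPipelinesSameCatalog in the deleted file.
func allSameCatalog(catalogs map[string]string) bool {
	var first string
	for _, c := range catalogs {
		if c == "" {
			return false
		}
		if first == "" {
			first = c
		} else if c != first {
			return false
		}
	}
	return first != ""
}

func main() {
	fmt.Println(allSameCatalog(map[string]string{"a": "main", "b": "main"})) // true
	fmt.Println(allSameCatalog(map[string]string{"a": "main", "b": "dev"}))  // false
	fmt.Println(allSameCatalog(map[string]string{}))                         // false: no pipelines
}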
@@ -1,358 +0,0 @@
-package mutator_test
-
-import (
-	"context"
-	"reflect"
-	"strings"
-	"testing"
-
-	"github.com/databricks/cli/bundle"
-	"github.com/databricks/cli/bundle/config"
-	"github.com/databricks/cli/bundle/config/mutator"
-	"github.com/databricks/cli/bundle/config/resources"
-	"github.com/databricks/cli/libs/dyn"
-	"github.com/databricks/databricks-sdk-go/service/catalog"
-	"github.com/databricks/databricks-sdk-go/service/jobs"
-	"github.com/databricks/databricks-sdk-go/service/pipelines"
-	"github.com/databricks/databricks-sdk-go/service/serving"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
-)
-
-type recordedField struct {
-	Path        dyn.Path
-	PathString  string
-	Placeholder string
-	Expected    string
-}
-
-func mockPresetsCatalogSchema() *bundle.Bundle {
-	return &bundle.Bundle{
-		Config: config.Root{
-			Resources: config.Resources{
-				Jobs: map[string]*resources.Job{
-					"key": {
-						JobSettings: &jobs.JobSettings{
-							Name: "job",
-							Parameters: []jobs.JobParameterDefinition{
-								{Name: "catalog", Default: "<catalog>"},
-								{Name: "schema", Default: "<schema>"},
-							},
-							Tasks: []jobs.Task{
-								{
-									DbtTask: &jobs.DbtTask{
-										Catalog: "<catalog>",
-										Schema:  "<schema>",
-									},
-								},
-								{
-									SparkPythonTask: &jobs.SparkPythonTask{
-										PythonFile: "/file",
-									},
-								},
-								{
-									NotebookTask: &jobs.NotebookTask{
-										NotebookPath: "/notebook",
-									},
-								},
-							},
-						},
-					},
-				},
-				Pipelines: map[string]*resources.Pipeline{
-					"key": {
-						PipelineSpec: &pipelines.PipelineSpec{
-							Name:    "pipeline",
-							Catalog: "<catalog>",
-							Target:  "<schema>",
-							GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
-								GatewayStorageCatalog: "<catalog>",
-								GatewayStorageSchema:  "<schema>",
-							},
-							IngestionDefinition: &pipelines.IngestionPipelineDefinition{
-								Objects: []pipelines.IngestionConfig{
-									{
-										Report: &pipelines.ReportSpec{
-											DestinationCatalog: "<catalog>",
-											DestinationSchema:  "<schema>",
-										},
-										Schema: &pipelines.SchemaSpec{
-											SourceCatalog:      "<catalog>",
-											SourceSchema:       "<schema>",
-											DestinationCatalog: "<catalog>",
-											DestinationSchema:  "<schema>",
-										},
-										Table: &pipelines.TableSpec{
-											SourceCatalog:      "<catalog>",
-											SourceSchema:       "<schema>",
-											DestinationCatalog: "<catalog>",
-											DestinationSchema:  "<schema>",
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-				ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
-					"key": {
-						CreateServingEndpoint: &serving.CreateServingEndpoint{
-							Name: "serving",
-							AiGateway: &serving.AiGatewayConfig{
-								InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
-									CatalogName: "<catalog>",
-									SchemaName:  "<schema>",
-								},
-							},
-							Config: serving.EndpointCoreConfigInput{
-								AutoCaptureConfig: &serving.AutoCaptureConfigInput{
-									CatalogName: "<catalog>",
-									SchemaName:  "<schema>",
-								},
-								ServedEntities: []serving.ServedEntityInput{
-									{EntityName: "<catalog>.<schema>.entity"},
-								},
-								ServedModels: []serving.ServedModelInput{
-									{ModelName: "<catalog>.<schema>.model"},
-								},
-							},
-						},
-					},
-				},
-				RegisteredModels: map[string]*resources.RegisteredModel{
-					"key": {
-						CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
-							Name:        "registered_model",
-							CatalogName: "<catalog>",
-							SchemaName:  "<schema>",
-						},
-					},
-				},
-				QualityMonitors: map[string]*resources.QualityMonitor{
-					"key": {
-						TableName: "<catalog>.<schema>.table",
-						CreateMonitor: &catalog.CreateMonitor{
-							OutputSchemaName: "<catalog>.<schema>",
-						},
-					},
-				},
-				Schemas: map[string]*resources.Schema{
-					"key": {
-						CreateSchema: &catalog.CreateSchema{
-							Name:        "<schema>",
-							CatalogName: "<catalog>",
-						},
-					},
-				},
-			},
-			Presets: config.Presets{
-				Catalog: "my_catalog",
-				Schema:  "my_schema",
-			},
-		},
-	}
-}
-
-// ignoredFields are fields that should be ignored in the completeness check
-var ignoredFields = map[string]string{
-	"resources.pipelines.key.schema": "schema is still in private preview",
-	"resources.jobs.key.tasks[0].notebook_task.base_parameters":       "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].python_wheel_task.named_parameters":  "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].python_wheel_task.parameters":        "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.job_parameters":         "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].spark_jar_task.parameters":           "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].spark_python_task.parameters":        "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].spark_submit_task.parameters":        "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].sql_task.parameters":                 "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.jar_params":             "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.notebook_params":        "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.pipeline_params":        "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.python_named_params":    "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.python_params":          "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.spark_submit_params":    "catalog/schema are passed via job parameters",
-	"resources.jobs.key.tasks[0].run_job_task.sql_params":             "catalog/schema are passed via job parameters",
-	"resources.pipelines.key.ingestion_definition.objects[0].schema":  "schema name is under schema.source_schema/destination_schema",
-	"resources.schemas": "schema name of schemas is under resources.schemas.key.Name",
-}
-
-func TestApplyPresetsCatalogSchemaWhenAlreadySet(t *testing.T) {
-	b := mockPresetsCatalogSchema()
-	recordedFields := recordPlaceholderFields(t, b)
-
-	diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
-	require.NoError(t, diags.Error())
-
-	for _, f := range recordedFields {
-		val, err := dyn.GetByPath(b.Config.Value(), f.Path)
-		require.NoError(t, err, "failed to get path %s", f.Path)
-		require.Equal(t, f.Placeholder, val.MustString(),
-			"expected placeholder '%s' at %s to remain unchanged before cleanup", f.Placeholder, f.Path)
-	}
-}
-
-func TestApplyPresetsCatalogSchemaRecommmendRemovingCatalog(t *testing.T) {
-	b := mockPresetsCatalogSchema()
-	b.Config.Resources.Jobs["key"].Parameters = nil // avoid warnings about the job parameters
-	b.Config.Resources.Pipelines["key"].Catalog = "my_catalog"
-
-	diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
-	require.Equal(t, 1, len(diags))
-	require.Equal(t, "Omit the catalog field since it will be automatically populated from presets.catalog", diags[0].Summary)
-}
-
-func TestApplyPresetsCatalogSchemaWhenNotSet(t *testing.T) {
-	b := mockPresetsCatalogSchema()
-	recordedFields := recordPlaceholderFields(t, b)
-
-	// Set all catalog/schema fields to empty strings / nil
-	require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
-		for _, f := range recordedFields {
-			value, err := dyn.GetByPath(root, f.Path)
-			require.NoError(t, err)
-
-			val := value.MustString()
-			cleanedVal := removePlaceholders(val)
-			root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
-			require.NoError(t, err)
-		}
-		return dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
-	}))
-
-	// Apply catalog/schema presets
-	diags := bundle.Apply(context.Background(), b, mutator.ApplyPresetsCatalogSchema())
-	require.NoError(t, diags.Error())
-
-	// Verify that all catalog/schema fields have been set to the presets
-	for _, f := range recordedFields {
-		val, err := dyn.GetByPath(b.Config.Value(), f.Path)
-		require.NoError(t, err, "could not find expected field(s) at %s", f.Path)
-		assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
-	}
-}
-
-func TestApplyPresetsCatalogSchemaCompleteness(t *testing.T) {
-	b := mockPresetsCatalogSchema()
-	recordedFields := recordPlaceholderFields(t, b)
-
-	// Convert the recordedFields to a set for easier lookup
-	recordedPaths := make(map[string]struct{})
-	for _, field := range recordedFields {
-		recordedPaths[field.PathString] = struct{}{}
-		if i := strings.Index(field.PathString, "["); i >= 0 {
-			// For entries like resources.jobs.key.parameters[1].default, just add resources.jobs.key.parameters
-			recordedPaths[field.PathString[:i]] = struct{}{}
-		}
-	}
-
-	// Find all catalog/schema fields that we think should be covered based
-	// on all properties in config.Resources.
-	expectedFields := findCatalogSchemaFields()
-	assert.GreaterOrEqual(t, len(expectedFields), 42, "expected at least 42 catalog/schema fields, but got %d", len(expectedFields))
-
-	// Verify that all expected fields are there
-	for _, field := range expectedFields {
-		if _, recorded := recordedPaths[field]; !recorded {
-			if _, ignored := ignoredFields[field]; !ignored {
-				t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to PresetsMock or PresetsIgnoredFields and add support for it as appropriate.", field)
-			}
-		}
-	}
-}
-
-// recordPlaceholderFields scans the config and records all fields containing catalog/schema placeholders
-func recordPlaceholderFields(t *testing.T, b *bundle.Bundle) []recordedField {
-	t.Helper()
-
-	var recordedFields []recordedField
-	err := b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
-		_, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
-			if v.Kind() == dyn.KindString {
-				val := v.MustString()
-				if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
-					pathCopy := make(dyn.Path, len(p))
-					copy(pathCopy, p)
-					recordedFields = append(recordedFields, recordedField{
-						Path:        pathCopy,
-						PathString:  pathCopy.String(),
-						Placeholder: val,
-						Expected:    replacePlaceholders(val, "my_catalog", "my_schema"),
-					})
-				}
-			}
-			return v, nil
-		})
-		return root, err
-	})
-	require.NoError(t, err)
-	return recordedFields
-}
-
-// findCatalogSchemaFields finds all fields in config.Resources that might refer
-// to a catalog or schema. Returns a slice of field paths.
-func findCatalogSchemaFields() []string {
-	visited := make(map[reflect.Type]struct{})
-	var results []string
-
-	// verifyTypeFields is a recursive function to verify the fields of a given type
-	var walkTypeFields func(rt reflect.Type, path string)
-	walkTypeFields = func(rt reflect.Type, path string) {
-		if _, seen := visited[rt]; seen {
-			return
-		}
-		visited[rt] = struct{}{}
-
-		switch rt.Kind() {
-		case reflect.Slice, reflect.Array:
-			walkTypeFields(rt.Elem(), path+"[0]")
-		case reflect.Map:
-			walkTypeFields(rt.Elem(), path+".key")
-		case reflect.Ptr:
-			walkTypeFields(rt.Elem(), path)
-		case reflect.Struct:
-			for i := 0; i < rt.NumField(); i++ {
-				ft := rt.Field(i)
-				jsonTag := ft.Tag.Get("json")
-				if jsonTag == "" || jsonTag == "-" {
-					// Ignore field names when there's no JSON tag, e.g. for Jobs.JobSettings
-					walkTypeFields(ft.Type, path)
-					continue
-				}
-
-				fieldName := strings.Split(jsonTag, ",")[0]
-				fieldPath := path + "." + fieldName
-
-				if isCatalogOrSchemaField(fieldName) {
-					results = append(results, fieldPath)
-				}
-
-				walkTypeFields(ft.Type, fieldPath)
-			}
-		}
-	}
-
-	var r config.Resources
-	walkTypeFields(reflect.TypeOf(r), "resources")
-	return results
-}
-
-// isCatalogOrSchemaField returns true for a field names in config.Resources that we suspect could contain a catalog or schema name
-func isCatalogOrSchemaField(name string) bool {
-	return strings.Contains(name, "catalog") ||
-		strings.Contains(name, "schema") ||
-		strings.Contains(name, "parameters") ||
-		strings.Contains(name, "params")
-}
-
-func removePlaceholders(value string) string {
-	value = strings.ReplaceAll(value, "<catalog>.", "")
-	value = strings.ReplaceAll(value, "<schema>.", "")
-	value = strings.ReplaceAll(value, "<catalog>", "")
-	value = strings.ReplaceAll(value, "<schema>", "")
-	return value
-}
-
-func replacePlaceholders(placeholder, catalog, schema string) string {
-	expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
-	expected = strings.ReplaceAll(expected, "<schema>", schema)
-	return expected
-}
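
Aside: the deleted test above drives everything off <catalog>/<schema> placeholder markers. A minimal sketch of the two string helpers it relies on, without the dyn.Walk plumbing (the example values are illustrative):

package main

import (
	"fmt"
	"strings"
)

// replacePlaceholders substitutes the placeholder markers with the preset
// values, mirroring the helper in the deleted test file.
func replacePlaceholders(placeholder, catalog, schema string) string {
	s := strings.ReplaceAll(placeholder, "<catalog>", catalog)
	return strings.ReplaceAll(s, "<schema>", schema)
}

// removePlaceholders strips the markers (including a trailing dot) to
// simulate a config in which the user left the fields unset.
func removePlaceholders(value string) string {
	for _, m := range []string{"<catalog>.", "<schema>.", "<catalog>", "<schema>"} {
		value = strings.ReplaceAll(value, m, "")
	}
	return value
}

func main() {
	v := "<catalog>.<schema>.table"
	fmt.Println(replacePlaceholders(v, "my_catalog", "my_schema")) // my_catalog.my_schema.table
	fmt.Println(removePlaceholders(v))                             // table
}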
@ -2,7 +2,9 @@ package mutator_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/databricks/cli/bundle"
|
"github.com/databricks/cli/bundle"
|
||||||
|
@ -14,9 +16,19 @@ import (
|
||||||
"github.com/databricks/cli/libs/dyn"
|
"github.com/databricks/cli/libs/dyn"
|
||||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/serving"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type recordedField struct {
|
||||||
|
Path dyn.Path
|
||||||
|
PathString string
|
||||||
|
Placeholder string
|
||||||
|
Expected string
|
||||||
|
}
|
||||||
|
|
||||||
func TestApplyPresetsPrefix(t *testing.T) {
|
func TestApplyPresetsPrefix(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -453,3 +465,335 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||

func PresetsMock() *bundle.Bundle {
	return &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"key": {
						JobSettings: &jobs.JobSettings{
							Name: "job",
							Parameters: []jobs.JobParameterDefinition{
								{Name: "catalog", Default: "<catalog>"},
								{Name: "schema", Default: "<schema>"},
							},
							Tasks: []jobs.Task{
								{
									DbtTask: &jobs.DbtTask{
										Catalog: "<catalog>",
										Schema:  "<schema>",
									},
								},
								{
									SparkPythonTask: &jobs.SparkPythonTask{
										PythonFile: "/file",
									},
								},
								{
									NotebookTask: &jobs.NotebookTask{
										NotebookPath: "/notebook",
									},
								},
							},
						},
					},
				},
				Pipelines: map[string]*resources.Pipeline{
					"key": {
						PipelineSpec: &pipelines.PipelineSpec{
							Name:    "pipeline",
							Catalog: "<catalog>",
							Target:  "<schema>",
							GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
								GatewayStorageCatalog: "<catalog>",
								GatewayStorageSchema:  "<schema>",
							},
							IngestionDefinition: &pipelines.IngestionPipelineDefinition{
								Objects: []pipelines.IngestionConfig{
									{
										Report: &pipelines.ReportSpec{
											DestinationCatalog: "<catalog>",
											DestinationSchema:  "<schema>",
										},
										Schema: &pipelines.SchemaSpec{
											SourceCatalog:      "<catalog>",
											SourceSchema:       "<schema>",
											DestinationCatalog: "<catalog>",
											DestinationSchema:  "<schema>",
										},
										Table: &pipelines.TableSpec{
											SourceCatalog:      "<catalog>",
											SourceSchema:       "<schema>",
											DestinationCatalog: "<catalog>",
											DestinationSchema:  "<schema>",
										},
									},
								},
							},
						},
					},
				},
				ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
					"key": {
						CreateServingEndpoint: &serving.CreateServingEndpoint{
							Name: "serving",
							AiGateway: &serving.AiGatewayConfig{
								InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
									CatalogName: "<catalog>",
									SchemaName:  "<schema>",
								},
							},
							Config: serving.EndpointCoreConfigInput{
								AutoCaptureConfig: &serving.AutoCaptureConfigInput{
									CatalogName: "<catalog>",
									SchemaName:  "<schema>",
								},
								ServedEntities: []serving.ServedEntityInput{
									{EntityName: "<catalog>.<schema>.entity"},
								},
								ServedModels: []serving.ServedModelInput{
									{ModelName: "<catalog>.<schema>.model"},
								},
							},
						},
					},
				},
				RegisteredModels: map[string]*resources.RegisteredModel{
					"key": {
						CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
							Name:        "registered_model",
							CatalogName: "<catalog>",
							SchemaName:  "<schema>",
						},
					},
				},
				QualityMonitors: map[string]*resources.QualityMonitor{
					"key": {
						TableName: "<catalog>.<schema>.table",
						CreateMonitor: &catalog.CreateMonitor{
							OutputSchemaName: "<catalog>.<schema>",
						},
					},
				},
				Schemas: map[string]*resources.Schema{
					"key": {
						CreateSchema: &catalog.CreateSchema{
							Name:        "<schema>",
							CatalogName: "<catalog>",
						},
					},
				},
			},
		},
	}
}

var PresetsIgnoredFields = map[string]string{
	// Any fields that should be ignored in the completeness check
	// Example:
	// "resources.jobs.object.schema_something": "this property doesn't relate to the catalog/schema",
	"resources.pipelines.key.schema": "schema is still in private preview",
}

func TestApplyPresetsCatalogSchema(t *testing.T) {
	b := PresetsMock()
	b.Config.Presets = config.Presets{
		Catalog: "my_catalog",
		Schema:  "my_schema",
	}
	ctx := context.Background()

	// Initial scan: record all fields that contain placeholders.
	// We do this before the first apply so we can verify no changes occur.
	var recordedFields []recordedField
	require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
		_, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
			if v.Kind() == dyn.KindString {
				val := v.MustString()
				if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
					pathCopy := make(dyn.Path, len(p))
					copy(pathCopy, p)
					recordedFields = append(recordedFields, recordedField{
						Path:        pathCopy,
						PathString:  pathCopy.String(),
						Placeholder: val,
						Expected:    replacePlaceholders(val, "my_catalog", "my_schema"),
					})
				}
			}
			return v, nil
		})
		return root, err
	}))

	// Stage 1: Apply presets before cleanup, should be no-op.
	diags := bundle.Apply(ctx, b, mutator.ApplyPresets())
	require.False(t, diags.HasError(), "unexpected error before cleanup: %v", diags.Error())

	// Verify that no recorded fields changed
	verifyNoChangesBeforeCleanup(t, b.Config.Value(), recordedFields)

	// Stage 2: Cleanup: Walk over rootVal and remove placeholders, adjusting recordedFields Expected values.
	require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
		for _, f := range recordedFields {
			value, err := dyn.GetByPath(root, f.Path)
			require.NoError(t, err)

			val := value.MustString()
			cleanedVal := removePlaceholders(val)
			root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
			require.NoError(t, err)
		}
		root, err := dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
		require.NoError(t, err)
		return root, nil
	}))

	// Stage 3: Apply presets after cleanup.
	diags = bundle.Apply(ctx, b, mutator.ApplyPresets())
	require.False(t, diags.HasError(), "unexpected error after cleanup: %v", diags.Error())

	// Verify that fields have the expected replacements
	config := b.Config.Value()
	for _, f := range recordedFields {
		val, err := dyn.GetByPath(config, f.Path)
		require.NoError(t, err, "failed to get path %s", f.Path)
		assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
	}

	// Stage 4: Check completeness
	checkCompleteness(t, recordedFields)
}
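
Note: the recordedField helper type referenced throughout this test is not part of this hunk. A minimal sketch consistent with its usage above (field names and types inferred from the test, not copied from the source) would be:

	// recordedField captures one config field that contained a
	// <catalog>/<schema> placeholder during the initial scan.
	type recordedField struct {
		Path        dyn.Path // structured path to the field in the bundle config
		PathString  string   // Path rendered as a string; used as a set key
		Placeholder string   // original value containing the placeholder(s)
		Expected    string   // value expected after presets are applied
	}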

func verifyNoChangesBeforeCleanup(t *testing.T, rootVal dyn.Value, recordedFields []recordedField) {
	t.Helper()

	for _, f := range recordedFields {
		val, err := dyn.GetByPath(rootVal, f.Path)
		require.NoError(t, err, "failed to get path %s", f.Path)
		require.Equal(t, f.Placeholder, val.MustString(),
			"expected placeholder '%s' at %s to remain unchanged before cleanup", f.Placeholder, f.Path)
	}
}

func checkCompleteness(t *testing.T, recordedFields []recordedField) {
	t.Helper()

	// Build a set for recorded fields
	recordedSet := make(map[string]struct{})
	for _, field := range recordedFields {
		recordedSet[field.PathString] = struct{}{}
	}

	// Obtain the type of config.Resources
	var r config.Resources
	resourcesType := reflect.TypeOf(r)

	// Track missing fields
	var missingFields []string

	// Keep track of visited types to prevent infinite loops (cycles)
	visited := make(map[reflect.Type]struct{})

	// Helper function to handle maps, slices, arrays, and nested pointers/interfaces
	verifyFieldType := func(fieldType reflect.Type, path string, fn func(reflect.Type, string)) {
		switch fieldType.Kind() {
		case reflect.Slice, reflect.Array:
			// For arrays/slices, inspect the element type
			fn(fieldType.Elem(), path+"[0]")
		case reflect.Map:
			// For maps, inspect the value type
			fn(fieldType.Elem(), path+".key")
		case reflect.Ptr, reflect.Interface:
			// For pointers/interfaces, inspect the element if it's a pointer
			if fieldType.Kind() == reflect.Ptr {
				fn(fieldType.Elem(), path)
			}
		case reflect.Struct:
			// For structs, directly recurse into their fields
			fn(fieldType, path)
		default:
			// For basic or unknown kinds, do nothing
		}
	}

	// Recursive function to verify the fields of a given type.
	var verifyTypeFields func(rt reflect.Type, path string)
	verifyTypeFields = func(rt reflect.Type, path string) {
		// Avoid cycles by skipping already visited types
		if _, seen := visited[rt]; seen {
			return
		}
		visited[rt] = struct{}{}

		switch rt.Kind() {
		case reflect.Ptr, reflect.Interface:
			// For pointers/interfaces, inspect the element type if available
			if rt.Kind() == reflect.Ptr {
				verifyTypeFields(rt.Elem(), path)
			}
		case reflect.Struct:
			for i := 0; i < rt.NumField(); i++ {
				ft := rt.Field(i)
				jsonTag := ft.Tag.Get("json")
				if jsonTag == "" || jsonTag == "-" {
					// Ignore field names when there's no JSON tag,
					// e.g. for Jobs.JobSettings
					verifyFieldType(ft.Type, path, verifyTypeFields)
					continue
				}

				fieldName := strings.Split(jsonTag, ",")[0]
				fieldPath := path + "." + fieldName

				if isCatalogOrSchemaField(fieldName) {
					// Only check if the field is a string
					if ft.Type.Kind() == reflect.String {
						if _, recorded := recordedSet[fieldPath]; !recorded {
							if _, ignored := PresetsIgnoredFields[fieldPath]; !ignored {
								missingFields = append(missingFields, fieldPath)
							}
						}
					}
				}

				verifyFieldType(ft.Type, fieldPath, verifyTypeFields)
			}
		default:
			// For other kinds at this level, do nothing
		}
	}

	// Start from "resources"
	verifyTypeFields(resourcesType, "resources")

	// Report all missing fields
	for _, field := range missingFields {
		t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to PresetsMock or PresetsIgnoredFields and add support for it as appropriate.", field)
	}

	// Fail the test if there were any missing fields
	if len(missingFields) > 0 {
		t.FailNow()
	}
}
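
For intuition: the reflection walk above derives paths from the json struct tags of config.Resources, appending ".key" for map entries and "[0]" for slice elements, so a flagged field would look like one of the following (illustrative paths, derived from the mock above):

	resources.jobs.key.tasks[0].dbt_task.catalog
	resources.pipelines.key.ingestion_definition.objects[0].report.destination_catalog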

// isCatalogOrSchemaField returns true for field names in config.Resources that we suspect could contain a catalog or schema name
func isCatalogOrSchemaField(name string) bool {
	return strings.Contains(name, "catalog") || strings.Contains(name, "schema")
}

func removePlaceholders(value string) string {
	value = strings.ReplaceAll(value, "<catalog>.", "")
	value = strings.ReplaceAll(value, "<schema>.", "")
	value = strings.ReplaceAll(value, "<catalog>", "")
	value = strings.ReplaceAll(value, "<schema>", "")
	return value
}

func replacePlaceholders(placeholder, catalog, schema string) string {
	expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
	expected = strings.ReplaceAll(expected, "<schema>", schema)
	return expected
}
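
To make the placeholder semantics concrete, here is a hypothetical check (not part of the diff) that would pass given the two helpers above, assuming it lives in the same package:

	func TestPlaceholderHelpers(t *testing.T) {
		// Cleanup strips placeholders, including the separator dot when present.
		require.Equal(t, "table", removePlaceholders("<catalog>.<schema>.table"))
		require.Equal(t, "", removePlaceholders("<schema>"))
		// Replacement substitutes the configured preset values.
		require.Equal(t, "my_catalog.my_schema.table",
			replacePlaceholders("<catalog>.<schema>.table", "my_catalog", "my_schema"))
	}
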
@@ -70,7 +70,6 @@ func Initialize() bundle.Mutator {
		mutator.ConfigureDashboardDefaults(),
		mutator.ProcessTargetMode(),
		mutator.ApplyPresets(),
		mutator.ApplyPresetsCatalogSchema(),
		mutator.DefaultQueueing(),
		mutator.ExpandPipelineGlobPaths(),