This commit is contained in:
Lennart Kats 2024-12-20 09:28:38 +01:00
parent feb23ddefb
commit 65f10d187d
No known key found for this signature in database
GPG Key ID: 1EB8B57673197023
1 changed files with 101 additions and 116 deletions

View File

@ -28,20 +28,27 @@ func (m *applyPresetsCatalogSchema) Name() string {
}
func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
diags := validateCatalogAndSchema(b)
if diags.HasError() {
diags := diag.Diagnostics{}
p := b.Config.Presets
r := b.Config.Resources
if p.Catalog == "" && p.Schema == "" {
return diags
}
r := b.Config.Resources
p := b.Config.Presets
if (p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
return diag.Diagnostics{{
Summary: "presets.catalog and presets.schema must always be set together",
Severity: diag.Error,
Locations: []dyn.Location{b.Config.GetLocation("presets")},
}}
}
// Jobs
for key, j := range r.Jobs {
if j.JobSettings == nil {
continue
}
if p.Catalog != "" || p.Schema != "" {
for _, task := range j.Tasks {
if task.DbtTask != nil {
if task.DbtTask.Catalog == "" {
@ -56,20 +63,15 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
diags = diags.Extend(addCatalogSchemaParameters(b, key, j, p))
diags = diags.Extend(recommendCatalogSchemaUsage(b, ctx, key, j))
}
}
// Pipelines
for _, pl := range r.Pipelines {
if pl.PipelineSpec == nil {
continue
}
if p.Catalog != "" && p.Schema != "" {
if pl.Catalog == "" {
pl.Catalog = p.Catalog
}
if pl.Target == "" {
pl.Target = p.Schema
}
if pl.GatewayDefinition != nil {
if pl.GatewayDefinition.GatewayStorageCatalog == "" {
pl.GatewayDefinition.GatewayStorageCatalog = p.Catalog
@ -119,7 +121,6 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
}
}
}
}
// Model serving endpoints
for _, e := range r.ModelServingEndpoints {
@ -127,21 +128,20 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
continue
}
if p.Catalog != "" || p.Schema != "" {
if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil {
if p.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
if e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = p.Catalog
}
if p.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
if e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = p.Schema
}
}
if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil {
if p.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
if e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = p.Catalog
}
if p.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
if e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = p.Schema
}
}
@ -157,17 +157,16 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
)
}
}
}
// Registered models
for _, m := range r.RegisteredModels {
if m.CreateRegisteredModelRequest == nil {
continue
}
if p.Catalog != "" && m.CatalogName == "" {
if m.CatalogName == "" {
m.CatalogName = p.Catalog
}
if p.Schema != "" && m.SchemaName == "" {
if m.SchemaName == "" {
m.SchemaName = p.Schema
}
}
@ -177,23 +176,21 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
if q.CreateMonitor == nil {
continue
}
if p.Catalog != "" && p.Schema != "" {
q.TableName = fullyQualifyName(q.TableName, p.Catalog, p.Schema)
if q.OutputSchemaName == "" {
q.OutputSchemaName = p.Catalog + "." + p.Schema
}
}
}
// Schemas
for _, s := range r.Schemas {
if s.CreateSchema == nil {
continue
}
if p.Catalog != "" && s.CatalogName == "" {
if s.CatalogName == "" {
s.CatalogName = p.Catalog
}
if p.Schema != "" && s.Name == "" {
if s.Name == "" {
// If there is a schema preset such as 'dev', we directly
// use that name and don't add any prefix (which might result in dev_dev).
s.Name = p.Schema
@ -203,21 +200,9 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
return diags
}
func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
p := b.Config.Presets
if (p.Catalog != "" && p.Schema == "") || (p.Catalog == "" && p.Schema != "") {
return diag.Diagnostics{{
Summary: "presets.catalog and presets.schema must always be set together",
Severity: diag.Error,
Locations: []dyn.Location{b.Config.GetLocation("presets")},
}}
}
return diag.Diagnostics{}
}
// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
// Returns any warning diagnostics for existing parameters.
func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics {
func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, p config.Presets) diag.Diagnostics {
var diags diag.Diagnostics
// Check for existing catalog/schema parameters
@ -250,18 +235,18 @@ func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job
}
// Add catalog parameter if not already present
if !hasCatalog && t.Catalog != "" {
if !hasCatalog {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "catalog",
Default: t.Catalog,
Default: p.Catalog,
})
}
// Add schema parameter if not already present
if !hasSchema && t.Schema != "" {
if !hasSchema {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "schema",
Default: t.Schema,
Default: p.Schema,
})
}