From 7fb646a5388ffe4e61ba16f38f145939c6820442 Mon Sep 17 00:00:00 2001 From: Lennart Kats Date: Tue, 22 Oct 2024 09:39:57 +0200 Subject: [PATCH] WIP --- bundle/config/mutator/apply_presets.go | 80 ++++++++++++++++++++------ 1 file changed, 64 insertions(+), 16 deletions(-) diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index 8e3a0baf3..9d265cc99 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -51,7 +51,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos prefix := t.NamePrefix tags := toTagArray(t.Tags) - // Jobs presets: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema + // Jobs presets. + // Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema for key, j := range r.Jobs { if j.JobSettings == nil { diags = diags.Extend(diag.Errorf("job %s is not defined", key)) @@ -86,11 +87,23 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } } if t.Catalog != "" || t.Schema != "" { + for _, task := range j.Tasks { + if task.DbtTask != nil { + if task.DbtTask.Catalog == "" { + task.DbtTask.Catalog = t.Catalog + } + if task.DbtTask.Schema == "" { + task.DbtTask.Schema = t.Catalog + } + } + } diags = diags.Extend(validateJobUsesCatalogAndSchema(b, key, j)) } } - // Pipelines presets: Prefix, PipelinesDevelopment, Catalog, Schema + // Pipelines presets. + // Supported: Prefix, PipelinesDevelopment, Catalog, Schema + // Not supported: Tags (as of 2024-10 not in pipelines API) for key, p := range r.Pipelines { if p.PipelineSpec == nil { diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key)) @@ -109,10 +122,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos if t.Schema != "" && p.Target == "" { p.Target = t.Schema } - // As of 2024-10, pipelines don't yet support tags } - // Models presets: Prefix, Tags + // Models presets + // Supported: Prefix, Tags for key, m := range r.Models { if m.Model == nil { diags = diags.Extend(diag.Errorf("model %s is not defined", key)) @@ -130,7 +143,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } } - // Experiments presets: Prefix, Tags + // Experiments presets + // Supported: Prefix, Tags for key, e := range r.Experiments { if e.Experiment == nil { diags = diags.Extend(diag.Errorf("experiment %s is not defined", key)) @@ -158,7 +172,9 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } } - // Model serving endpoint presets: Prefix + // Model serving endpoint presets + // Supported: Prefix, Catalog, Schema + // Not supported: Tags (not in API as of 2024-10) for key, e := range r.ModelServingEndpoints { if e.CreateServingEndpoint == nil { diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key)) @@ -166,10 +182,19 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } e.Name = normalizePrefix(prefix) + e.Name - // As of 2024-06, model serving endpoints don't yet support tags + TODO: + - e.AiGateway.InferenceTableConfig.CatalogName + - e.AiGateway.InferenceTableConfig.SchemaName + - e.Config.AutoCaptureConfig.SchemaName + - e.Config.AutoCaptureConfig.CatalogName + - e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.) + - e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.) + } - // Registered models presets: Prefix, Catalog, Schema + // Registered models presets + // Supported: Prefix, Catalog, Schema + // Not supported: Tags (not in API as of 2024-10) for key, m := range r.RegisteredModels { if m.CreateRegisteredModelRequest == nil { diags = diags.Extend(diag.Errorf("registered model %s is not defined", key)) @@ -182,10 +207,11 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos if t.Schema != "" && m.SchemaName == "" { m.SchemaName = t.Schema } - // As of 2024-06, registered models don't yet support tags } - // Quality monitors presets: Schedule, Catalog, Schema + // Quality monitors presets + // Supported: Schedule, Catalog, Schema + // Not supported: Tags (not in API as of 2024-10) if t.TriggerPauseStatus == config.Paused { for key, q := range r.QualityMonitors { if q.CreateMonitor == nil { @@ -208,6 +234,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } // Schemas: Prefix, Catalog, Schema + // Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API) for key, s := range r.Schemas { if s.CreateSchema == nil { diags = diags.Extend(diag.Errorf("schema %s is not defined", key)) @@ -222,11 +249,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos // use that name and don't add any prefix (which might result in dev_dev). s.Name = t.Schema } - // HTTP API for schemas doesn't yet support tags. It's only supported in - // the Databricks UI and via the SQL API. } // Clusters: Prefix, Tags + // Not supported: Catalog / Schema (not applicable) for key, c := range r.Clusters { if c.ClusterSpec == nil { diags = diags.Extend(diag.Errorf("cluster %s is not defined", key)) @@ -273,7 +299,10 @@ func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics { } func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics { - if !hasParameter(job.Parameters, "catalog") || !hasParameter(job.Parameters, "schema") { + if !hasTasksRequiringParameters(job) { + return nil + } + if !hasParameter(job, "catalog") || !hasParameter(job, "schema") { return diag.Diagnostics{{ Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+ " parameters:\n"+ @@ -288,11 +317,30 @@ func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resource return nil } -func hasParameter(parameters []jobs.JobParameterDefinition, name string) bool { - if parameters == nil { +// hasTasksRequiringParameters determines if there is a task in this job that +// requires the 'catalog' and 'schema' parameters when they are enabled in presets. +func hasTasksRequiringParameters(job *resources.Job) bool { + for _, task := range job.Tasks { + // Allowlisted task types: these don't require catalog / schema to be passed as a paramater + if task.DbtTask != nil || task.ConditionTask != nil || task.RunJobTask != nil || task.ForEachTask != nil || task.PipelineTask != nil { + continue + } + // Alert tasks, query object tasks, etc. don't require a parameter; + // the catalog / schema is set inside those objects instead. + if task.SqlTask != nil && task.SqlTask.File == nil { + continue + } + return true + } + return false +} + +// hasParameter determines if a job has a parameter with the given name. +func hasParameter(job *resources.Job, name string) bool { + if job.Parameters == nil { return false } - for _, p := range parameters { + for _, p := range job.Parameters { if p.Name == name { return true }