mirror of https://github.com/databricks/cli.git
WIP
This commit is contained in:
parent
6a7d31fb27
commit
7fb646a538
|
@ -51,7 +51,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
prefix := t.NamePrefix
|
prefix := t.NamePrefix
|
||||||
tags := toTagArray(t.Tags)
|
tags := toTagArray(t.Tags)
|
||||||
|
|
||||||
// Jobs presets: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema
|
// Jobs presets.
|
||||||
|
// Supported: Prefix, Tags, JobsMaxConcurrentRuns, TriggerPauseStatus, Catalog, Schema
|
||||||
for key, j := range r.Jobs {
|
for key, j := range r.Jobs {
|
||||||
if j.JobSettings == nil {
|
if j.JobSettings == nil {
|
||||||
diags = diags.Extend(diag.Errorf("job %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("job %s is not defined", key))
|
||||||
|
@ -86,11 +87,23 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if t.Catalog != "" || t.Schema != "" {
|
if t.Catalog != "" || t.Schema != "" {
|
||||||
|
for _, task := range j.Tasks {
|
||||||
|
if task.DbtTask != nil {
|
||||||
|
if task.DbtTask.Catalog == "" {
|
||||||
|
task.DbtTask.Catalog = t.Catalog
|
||||||
|
}
|
||||||
|
if task.DbtTask.Schema == "" {
|
||||||
|
task.DbtTask.Schema = t.Catalog
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
diags = diags.Extend(validateJobUsesCatalogAndSchema(b, key, j))
|
diags = diags.Extend(validateJobUsesCatalogAndSchema(b, key, j))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pipelines presets: Prefix, PipelinesDevelopment, Catalog, Schema
|
// Pipelines presets.
|
||||||
|
// Supported: Prefix, PipelinesDevelopment, Catalog, Schema
|
||||||
|
// Not supported: Tags (as of 2024-10 not in pipelines API)
|
||||||
for key, p := range r.Pipelines {
|
for key, p := range r.Pipelines {
|
||||||
if p.PipelineSpec == nil {
|
if p.PipelineSpec == nil {
|
||||||
diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key))
|
||||||
|
@ -109,10 +122,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
if t.Schema != "" && p.Target == "" {
|
if t.Schema != "" && p.Target == "" {
|
||||||
p.Target = t.Schema
|
p.Target = t.Schema
|
||||||
}
|
}
|
||||||
// As of 2024-10, pipelines don't yet support tags
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Models presets: Prefix, Tags
|
// Models presets
|
||||||
|
// Supported: Prefix, Tags
|
||||||
for key, m := range r.Models {
|
for key, m := range r.Models {
|
||||||
if m.Model == nil {
|
if m.Model == nil {
|
||||||
diags = diags.Extend(diag.Errorf("model %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("model %s is not defined", key))
|
||||||
|
@ -130,7 +143,8 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Experiments presets: Prefix, Tags
|
// Experiments presets
|
||||||
|
// Supported: Prefix, Tags
|
||||||
for key, e := range r.Experiments {
|
for key, e := range r.Experiments {
|
||||||
if e.Experiment == nil {
|
if e.Experiment == nil {
|
||||||
diags = diags.Extend(diag.Errorf("experiment %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("experiment %s is not defined", key))
|
||||||
|
@ -158,7 +172,9 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Model serving endpoint presets: Prefix
|
// Model serving endpoint presets
|
||||||
|
// Supported: Prefix, Catalog, Schema
|
||||||
|
// Not supported: Tags (not in API as of 2024-10)
|
||||||
for key, e := range r.ModelServingEndpoints {
|
for key, e := range r.ModelServingEndpoints {
|
||||||
if e.CreateServingEndpoint == nil {
|
if e.CreateServingEndpoint == nil {
|
||||||
diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("model serving endpoint %s is not defined", key))
|
||||||
|
@ -166,10 +182,19 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
}
|
}
|
||||||
e.Name = normalizePrefix(prefix) + e.Name
|
e.Name = normalizePrefix(prefix) + e.Name
|
||||||
|
|
||||||
// As of 2024-06, model serving endpoints don't yet support tags
|
TODO:
|
||||||
|
- e.AiGateway.InferenceTableConfig.CatalogName
|
||||||
|
- e.AiGateway.InferenceTableConfig.SchemaName
|
||||||
|
- e.Config.AutoCaptureConfig.SchemaName
|
||||||
|
- e.Config.AutoCaptureConfig.CatalogName
|
||||||
|
- e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.)
|
||||||
|
- e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Registered models presets: Prefix, Catalog, Schema
|
// Registered models presets
|
||||||
|
// Supported: Prefix, Catalog, Schema
|
||||||
|
// Not supported: Tags (not in API as of 2024-10)
|
||||||
for key, m := range r.RegisteredModels {
|
for key, m := range r.RegisteredModels {
|
||||||
if m.CreateRegisteredModelRequest == nil {
|
if m.CreateRegisteredModelRequest == nil {
|
||||||
diags = diags.Extend(diag.Errorf("registered model %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("registered model %s is not defined", key))
|
||||||
|
@ -182,10 +207,11 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
if t.Schema != "" && m.SchemaName == "" {
|
if t.Schema != "" && m.SchemaName == "" {
|
||||||
m.SchemaName = t.Schema
|
m.SchemaName = t.Schema
|
||||||
}
|
}
|
||||||
// As of 2024-06, registered models don't yet support tags
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Quality monitors presets: Schedule, Catalog, Schema
|
// Quality monitors presets
|
||||||
|
// Supported: Schedule, Catalog, Schema
|
||||||
|
// Not supported: Tags (not in API as of 2024-10)
|
||||||
if t.TriggerPauseStatus == config.Paused {
|
if t.TriggerPauseStatus == config.Paused {
|
||||||
for key, q := range r.QualityMonitors {
|
for key, q := range r.QualityMonitors {
|
||||||
if q.CreateMonitor == nil {
|
if q.CreateMonitor == nil {
|
||||||
|
@ -208,6 +234,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schemas: Prefix, Catalog, Schema
|
// Schemas: Prefix, Catalog, Schema
|
||||||
|
// Not supported: Tags (as of 2024-10, only supported in Databricks UI / SQL API)
|
||||||
for key, s := range r.Schemas {
|
for key, s := range r.Schemas {
|
||||||
if s.CreateSchema == nil {
|
if s.CreateSchema == nil {
|
||||||
diags = diags.Extend(diag.Errorf("schema %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("schema %s is not defined", key))
|
||||||
|
@ -222,11 +249,10 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
||||||
// use that name and don't add any prefix (which might result in dev_dev).
|
// use that name and don't add any prefix (which might result in dev_dev).
|
||||||
s.Name = t.Schema
|
s.Name = t.Schema
|
||||||
}
|
}
|
||||||
// HTTP API for schemas doesn't yet support tags. It's only supported in
|
|
||||||
// the Databricks UI and via the SQL API.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clusters: Prefix, Tags
|
// Clusters: Prefix, Tags
|
||||||
|
// Not supported: Catalog / Schema (not applicable)
|
||||||
for key, c := range r.Clusters {
|
for key, c := range r.Clusters {
|
||||||
if c.ClusterSpec == nil {
|
if c.ClusterSpec == nil {
|
||||||
diags = diags.Extend(diag.Errorf("cluster %s is not defined", key))
|
diags = diags.Extend(diag.Errorf("cluster %s is not defined", key))
|
||||||
|
@ -273,7 +299,10 @@ func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics {
|
func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics {
|
||||||
if !hasParameter(job.Parameters, "catalog") || !hasParameter(job.Parameters, "schema") {
|
if !hasTasksRequiringParameters(job) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !hasParameter(job, "catalog") || !hasParameter(job, "schema") {
|
||||||
return diag.Diagnostics{{
|
return diag.Diagnostics{{
|
||||||
Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+
|
Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+
|
||||||
" parameters:\n"+
|
" parameters:\n"+
|
||||||
|
@ -288,11 +317,30 @@ func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resource
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func hasParameter(parameters []jobs.JobParameterDefinition, name string) bool {
|
// hasTasksRequiringParameters determines if there is a task in this job that
|
||||||
if parameters == nil {
|
// requires the 'catalog' and 'schema' parameters when they are enabled in presets.
|
||||||
|
func hasTasksRequiringParameters(job *resources.Job) bool {
|
||||||
|
for _, task := range job.Tasks {
|
||||||
|
// Allowlisted task types: these don't require catalog / schema to be passed as a paramater
|
||||||
|
if task.DbtTask != nil || task.ConditionTask != nil || task.RunJobTask != nil || task.ForEachTask != nil || task.PipelineTask != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Alert tasks, query object tasks, etc. don't require a parameter;
|
||||||
|
// the catalog / schema is set inside those objects instead.
|
||||||
|
if task.SqlTask != nil && task.SqlTask.File == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for _, p := range parameters {
|
|
||||||
|
// hasParameter determines if a job has a parameter with the given name.
|
||||||
|
func hasParameter(job *resources.Job, name string) bool {
|
||||||
|
if job.Parameters == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, p := range job.Parameters {
|
||||||
if p.Name == name {
|
if p.Name == name {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue