This commit is contained in:
Lennart Kats 2024-12-20 20:38:55 +01:00
parent fbd476093f
commit f3923da6a2
No known key found for this signature in database
GPG Key ID: 1EB8B57673197023
2 changed files with 12 additions and 21 deletions

View File

@ -22,7 +22,8 @@ type applyPresets struct{}
// Apply all presets, e.g. the prefix presets that // Apply all presets, e.g. the prefix presets that
// adds a prefix to all names of all resources. // adds a prefix to all names of all resources.
// Note the catalog/schema presets are applied in ApplyPresetsCatalogSchema. //
// Note that the catalog/schema presets are applied in ApplyPresetsCatalogSchema.
func ApplyPresets() *applyPresets { func ApplyPresets() *applyPresets {
return &applyPresets{} return &applyPresets{}
} }

View File

@ -35,7 +35,7 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
if p.Catalog == "" && p.Schema == "" { if p.Catalog == "" && p.Schema == "" {
return diags return diags
} }
if (p.Schema == "") || (p.Catalog == "" && p.Schema != "") { if (p.Schema == "" && p.Catalog != "") || (p.Catalog == "" && p.Schema != "") {
return diag.Diagnostics{{ return diag.Diagnostics{{
Summary: "presets.catalog and presets.schema must always be set together", Summary: "presets.catalog and presets.schema must always be set together",
Severity: diag.Error, Severity: diag.Error,
@ -164,12 +164,12 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
for i := range e.CreateServingEndpoint.Config.ServedEntities { for i := range e.CreateServingEndpoint.Config.ServedEntities {
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName( e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName(
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, p.Catalog, p.Schema, e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, p,
) )
} }
for i := range e.CreateServingEndpoint.Config.ServedModels { for i := range e.CreateServingEndpoint.Config.ServedModels {
e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName( e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName(
e.CreateServingEndpoint.Config.ServedModels[i].ModelName, p.Catalog, p.Schema, e.CreateServingEndpoint.Config.ServedModels[i].ModelName, p,
) )
} }
} }
@ -192,7 +192,7 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
if q.CreateMonitor == nil { if q.CreateMonitor == nil {
continue continue
} }
q.TableName = fullyQualifyName(q.TableName, p.Catalog, p.Schema) q.TableName = fullyQualifyName(q.TableName, p)
if q.OutputSchemaName == "" { if q.OutputSchemaName == "" {
q.OutputSchemaName = p.Catalog + "." + p.Schema q.OutputSchemaName = p.Catalog + "." + p.Schema
} }
@ -207,8 +207,6 @@ func (m *applyPresetsCatalogSchema) Apply(ctx context.Context, b *bundle.Bundle)
s.CatalogName = p.Catalog s.CatalogName = p.Catalog
} }
if s.Name == "" { if s.Name == "" {
// If there is a schema preset such as 'dev', we directly
// use that name and don't add any prefix (which might result in dev_dev).
s.Name = p.Schema s.Name = p.Schema
} }
} }
@ -239,26 +237,19 @@ func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job
diags = diags.Extend(diag.Diagnostics{{ diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key), Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
Severity: diag.Warning, Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key + ".parameters")},
}}) }})
} }
} }
} }
// Initialize parameters if nil // Add catalog/schema parameters
if job.Parameters == nil {
job.Parameters = []jobs.JobParameterDefinition{}
}
// Add catalog parameter if not already present
if !hasCatalog { if !hasCatalog {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "catalog", Name: "catalog",
Default: p.Catalog, Default: p.Catalog,
}) })
} }
// Add schema parameter if not already present
if !hasSchema { if !hasSchema {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "schema", Name: "schema",
@ -329,14 +320,13 @@ func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key stri
} }
return diags return diags
} }
// fullyQualifyName checks if the given name is already qualified with a catalog and schema. // fullyQualifyName checks if the given name is already qualified with a catalog and schema.
// If not, and both catalog and schema are available, it prefixes the name with catalog.schema. // If not, and both catalog and schema are available, it prefixes the name with catalog.schema.
// If name is empty, returns name as-is. // If name is empty, returns name as-is.
func fullyQualifyName(name, catalog, schema string) string { func fullyQualifyName(name string, p config.Presets) string {
if name == "" || catalog == "" || schema == "" { if name == "" || p.Catalog == "" || p.Schema == "" {
return name return name
} }
// If it's already qualified (contains at least two '.'), we assume it's fully qualified. // If it's already qualified (contains at least two '.'), we assume it's fully qualified.
@ -346,10 +336,10 @@ func fullyQualifyName(name, catalog, schema string) string {
return name return name
} }
// Otherwise, fully qualify it // Otherwise, fully qualify it
return fmt.Sprintf("%s.%s.%s", catalog, schema, name) return fmt.Sprintf("%s.%s.%s", p.Catalog, p.Schema, name)
} }
func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool { func fileIncludesPattern(ctx context.Context, filePath, expected string) bool {
content, err := os.ReadFile(filePath) content, err := os.ReadFile(filePath)
if err != nil { if err != nil {
log.Warnf(ctx, "failed to check file %s: %v", filePath, err) log.Warnf(ctx, "failed to check file %s: %v", filePath, err)