mirror of https://github.com/databricks/cli.git
Compare commits
5 Commits
a08b59d4dd
...
70f54dca12
Author | SHA1 | Date |
---|---|---|
|
70f54dca12 | |
|
3e1e169225 | |
|
35bed3f549 | |
|
6b5948cef9 | |
|
7d1f8ad19b |
|
@ -122,11 +122,61 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
|||
if t.TriggerPauseStatus == config.Paused {
|
||||
p.Continuous = false
|
||||
}
|
||||
if t.Catalog != "" && p.Catalog == "" && p.Catalog != "hive_metastore" {
|
||||
p.Catalog = t.Catalog
|
||||
}
|
||||
if t.Schema != "" && p.Target == "" {
|
||||
p.Target = t.Schema
|
||||
if t.Catalog != "" && t.Schema != "" {
|
||||
if p.Catalog == "" {
|
||||
p.Catalog = t.Catalog
|
||||
}
|
||||
if p.Target == "" {
|
||||
p.Target = t.Schema
|
||||
}
|
||||
if p.GatewayDefinition != nil {
|
||||
if p.GatewayDefinition.GatewayStorageCatalog == "" {
|
||||
p.GatewayDefinition.GatewayStorageCatalog = t.Catalog
|
||||
}
|
||||
if p.GatewayDefinition.GatewayStorageSchema == "" {
|
||||
p.GatewayDefinition.GatewayStorageSchema = t.Schema
|
||||
}
|
||||
}
|
||||
if p.IngestionDefinition != nil {
|
||||
for _, obj := range p.IngestionDefinition.Objects {
|
||||
if obj.Report != nil {
|
||||
if obj.Report.DestinationCatalog == "" {
|
||||
obj.Report.DestinationCatalog = t.Catalog
|
||||
}
|
||||
if obj.Report.DestinationSchema == "" {
|
||||
obj.Report.DestinationSchema = t.Schema
|
||||
}
|
||||
}
|
||||
if obj.Schema != nil {
|
||||
if obj.Schema.SourceCatalog == "" {
|
||||
obj.Schema.SourceCatalog = t.Catalog
|
||||
}
|
||||
if obj.Schema.SourceSchema == "" {
|
||||
obj.Schema.SourceSchema = t.Schema
|
||||
}
|
||||
if obj.Schema.DestinationCatalog == "" {
|
||||
obj.Schema.DestinationCatalog = t.Catalog
|
||||
}
|
||||
if obj.Schema.DestinationSchema == "" {
|
||||
obj.Schema.DestinationSchema = t.Schema
|
||||
}
|
||||
}
|
||||
if obj.Table != nil {
|
||||
if obj.Table.SourceCatalog == "" {
|
||||
obj.Table.SourceCatalog = t.Catalog
|
||||
}
|
||||
if obj.Table.SourceSchema == "" {
|
||||
obj.Table.SourceSchema = t.Schema
|
||||
}
|
||||
if obj.Table.DestinationCatalog == "" {
|
||||
obj.Table.DestinationCatalog = t.Catalog
|
||||
}
|
||||
if obj.Table.DestinationSchema == "" {
|
||||
obj.Table.DestinationSchema = t.Schema
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -189,16 +239,38 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
|||
e.Name = normalizePrefix(prefix) + e.Name
|
||||
|
||||
if t.Catalog != "" || t.Schema != "" {
|
||||
// TODO:
|
||||
// - e.AiGateway.InferenceTableConfig.CatalogName
|
||||
// - e.AiGateway.InferenceTableConfig.SchemaName
|
||||
// - e.Config.AutoCaptureConfig.SchemaName
|
||||
// - e.Config.AutoCaptureConfig.CatalogName
|
||||
// - e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.)
|
||||
// - e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.)
|
||||
diags = diags.Extend(diag.Errorf("model serving endpoints are not supported with catalog/schema presets"))
|
||||
}
|
||||
// Apply catalog & schema to inference table config if not set
|
||||
if e.CreateServingEndpoint.AiGateway != nil && e.CreateServingEndpoint.AiGateway.InferenceTableConfig != nil {
|
||||
if t.Catalog != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName == "" {
|
||||
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.CatalogName = t.Catalog
|
||||
}
|
||||
if t.Schema != "" && e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName == "" {
|
||||
e.CreateServingEndpoint.AiGateway.InferenceTableConfig.SchemaName = t.Schema
|
||||
}
|
||||
}
|
||||
|
||||
// Apply catalog & schema to auto capture config if not set
|
||||
if e.CreateServingEndpoint.Config.AutoCaptureConfig != nil {
|
||||
if t.Catalog != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName == "" {
|
||||
e.CreateServingEndpoint.Config.AutoCaptureConfig.CatalogName = t.Catalog
|
||||
}
|
||||
if t.Schema != "" && e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName == "" {
|
||||
e.CreateServingEndpoint.Config.AutoCaptureConfig.SchemaName = t.Schema
|
||||
}
|
||||
}
|
||||
|
||||
// Fully qualify served entities and models if they are not already qualified
|
||||
for i := range e.CreateServingEndpoint.Config.ServedEntities {
|
||||
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName = fullyQualifyName(
|
||||
e.CreateServingEndpoint.Config.ServedEntities[i].EntityName, t.Catalog, t.Schema,
|
||||
)
|
||||
}
|
||||
for i := range e.CreateServingEndpoint.Config.ServedModels {
|
||||
e.CreateServingEndpoint.Config.ServedModels[i].ModelName = fullyQualifyName(
|
||||
e.CreateServingEndpoint.Config.ServedModels[i].ModelName, t.Catalog, t.Schema,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Registered models presets
|
||||
|
@ -221,23 +293,23 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
|||
// Quality monitors presets
|
||||
// Supported: Schedule, Catalog, Schema
|
||||
// Not supported: Tags (not in API as of 2024-10)
|
||||
if t.TriggerPauseStatus == config.Paused {
|
||||
for key, q := range r.QualityMonitors {
|
||||
if q.CreateMonitor == nil {
|
||||
diags = diags.Extend(diag.Errorf("quality monitor %s is not defined", key))
|
||||
continue
|
||||
}
|
||||
// Remove all schedules from monitors, since they don't support pausing/unpausing.
|
||||
// Quality monitors might support the "pause" property in the future, so at the
|
||||
// CLI level we do respect that property if it is set to "unpaused."
|
||||
for key, q := range r.QualityMonitors {
|
||||
if q.CreateMonitor == nil {
|
||||
diags = diags.Extend(diag.Errorf("quality monitor %s is not defined", key))
|
||||
continue
|
||||
}
|
||||
// Remove all schedules from monitors, since they don't support pausing/unpausing.
|
||||
// Quality monitors might support the "pause" property in the future, so at the
|
||||
// CLI level we do respect that property if it is set to "unpaused."
|
||||
if t.TriggerPauseStatus == config.Paused {
|
||||
if q.Schedule != nil && q.Schedule.PauseStatus != catalog.MonitorCronSchedulePauseStatusUnpaused {
|
||||
q.Schedule = nil
|
||||
}
|
||||
if t.Catalog != "" && t.Schema != "" {
|
||||
parts := strings.Split(q.TableName, ".")
|
||||
if len(parts) != 3 {
|
||||
q.TableName = fmt.Sprintf("%s.%s.%s", t.Catalog, t.Schema, q.TableName)
|
||||
}
|
||||
}
|
||||
if t.Catalog != "" && t.Schema != "" {
|
||||
q.TableName = fullyQualifyName(q.TableName, t.Catalog, t.Schema)
|
||||
if q.OutputSchemaName == "" {
|
||||
q.OutputSchemaName = t.Catalog + "." + t.Schema
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -470,8 +542,7 @@ func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key stri
|
|||
|
||||
if !fileIncludesPattern(ctx, localPath, expected) {
|
||||
diags = diags.Extend(diag.Diagnostics{{
|
||||
Summary: fmt.Sprintf("Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" +
|
||||
fix),
|
||||
Summary: "Use the 'catalog' and 'schema' parameters provided via 'presets.catalog' and 'presets.schema' using\n\n" + fix,
|
||||
Severity: diag.Recommendation,
|
||||
Locations: []dyn.Location{{
|
||||
File: localPath,
|
||||
|
@ -483,6 +554,24 @@ func recommendCatalogSchemaUsage(b *bundle.Bundle, ctx context.Context, key stri
|
|||
}
|
||||
|
||||
return diags
|
||||
|
||||
}
|
||||
|
||||
// fullyQualifyName checks if the given name is already qualified with a catalog and schema.
|
||||
// If not, and both catalog and schema are available, it prefixes the name with catalog.schema.
|
||||
// If name is empty, returns name as-is.
|
||||
func fullyQualifyName(name, catalog, schema string) string {
|
||||
if name == "" || catalog == "" || schema == "" {
|
||||
return name
|
||||
}
|
||||
// If it's already qualified (contains at least two '.'), we assume it's fully qualified.
|
||||
parts := strings.Split(name, ".")
|
||||
if len(parts) >= 3 {
|
||||
// Already fully qualified
|
||||
return name
|
||||
}
|
||||
// Otherwise, fully qualify it
|
||||
return fmt.Sprintf("%s.%s.%s", catalog, schema, name)
|
||||
}
|
||||
|
||||
func fileIncludesPattern(ctx context.Context, filePath string, expected string) bool {
|
||||
|
|
|
@ -2,7 +2,7 @@ package mutator_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -16,9 +16,19 @@ import (
|
|||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
"github.com/databricks/databricks-sdk-go/service/serving"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type recordedField struct {
|
||||
Path dyn.Path
|
||||
PathString string
|
||||
Placeholder string
|
||||
Expected string
|
||||
}
|
||||
|
||||
func TestApplyPresetsPrefix(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
@ -456,89 +466,334 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
|
|||
|
||||
}
|
||||
|
||||
func PresetsMock() *bundle.Bundle {
|
||||
return &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"key": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Name: "job",
|
||||
Parameters: []jobs.JobParameterDefinition{
|
||||
{Name: "catalog", Default: "<catalog>"},
|
||||
{Name: "schema", Default: "<schema>"},
|
||||
},
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
DbtTask: &jobs.DbtTask{
|
||||
Catalog: "<catalog>",
|
||||
Schema: "<schema>",
|
||||
},
|
||||
},
|
||||
{
|
||||
SparkPythonTask: &jobs.SparkPythonTask{
|
||||
PythonFile: "/file",
|
||||
},
|
||||
},
|
||||
{
|
||||
NotebookTask: &jobs.NotebookTask{
|
||||
NotebookPath: "/notebook",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"key": {
|
||||
PipelineSpec: &pipelines.PipelineSpec{
|
||||
Name: "pipeline",
|
||||
Catalog: "<catalog>",
|
||||
Target: "<schema>",
|
||||
GatewayDefinition: &pipelines.IngestionGatewayPipelineDefinition{
|
||||
GatewayStorageCatalog: "<catalog>",
|
||||
GatewayStorageSchema: "<schema>",
|
||||
},
|
||||
IngestionDefinition: &pipelines.IngestionPipelineDefinition{
|
||||
Objects: []pipelines.IngestionConfig{
|
||||
{
|
||||
Report: &pipelines.ReportSpec{
|
||||
DestinationCatalog: "<catalog>",
|
||||
DestinationSchema: "<schema>",
|
||||
},
|
||||
Schema: &pipelines.SchemaSpec{
|
||||
SourceCatalog: "<catalog>",
|
||||
SourceSchema: "<schema>",
|
||||
DestinationCatalog: "<catalog>",
|
||||
DestinationSchema: "<schema>",
|
||||
},
|
||||
Table: &pipelines.TableSpec{
|
||||
SourceCatalog: "<catalog>",
|
||||
SourceSchema: "<schema>",
|
||||
DestinationCatalog: "<catalog>",
|
||||
DestinationSchema: "<schema>",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
|
||||
"key": {
|
||||
CreateServingEndpoint: &serving.CreateServingEndpoint{
|
||||
Name: "serving",
|
||||
AiGateway: &serving.AiGatewayConfig{
|
||||
InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
|
||||
CatalogName: "<catalog>",
|
||||
SchemaName: "<schema>",
|
||||
},
|
||||
},
|
||||
Config: serving.EndpointCoreConfigInput{
|
||||
AutoCaptureConfig: &serving.AutoCaptureConfigInput{
|
||||
CatalogName: "<catalog>",
|
||||
SchemaName: "<schema>",
|
||||
},
|
||||
ServedEntities: []serving.ServedEntityInput{
|
||||
{EntityName: "<catalog>.<schema>.entity"},
|
||||
},
|
||||
ServedModels: []serving.ServedModelInput{
|
||||
{ModelName: "<catalog>.<schema>.model"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
RegisteredModels: map[string]*resources.RegisteredModel{
|
||||
"key": {
|
||||
CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
|
||||
Name: "registered_model",
|
||||
CatalogName: "<catalog>",
|
||||
SchemaName: "<schema>",
|
||||
},
|
||||
},
|
||||
},
|
||||
QualityMonitors: map[string]*resources.QualityMonitor{
|
||||
"key": {
|
||||
TableName: "<catalog>.<schema>.table",
|
||||
CreateMonitor: &catalog.CreateMonitor{
|
||||
OutputSchemaName: "<catalog>.<schema>",
|
||||
},
|
||||
},
|
||||
},
|
||||
Schemas: map[string]*resources.Schema{
|
||||
"key": {
|
||||
CreateSchema: &catalog.CreateSchema{
|
||||
Name: "<schema>",
|
||||
CatalogName: "<catalog>",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var PresetsIgnoredFields = map[string]string{
|
||||
// Any fields that should be ignored in the completeness check
|
||||
// Example:
|
||||
// "resources.jobs.object.schema_something": "this property doesn't relate to the catalog/schema",
|
||||
"resources.pipelines.key.schema": "schema is still in private preview",
|
||||
}
|
||||
|
||||
func TestApplyPresetsCatalogSchema(t *testing.T) {
|
||||
// Create a bundle in a known mode, e.g. development or production doesn't matter much here.
|
||||
b := mockBundle(config.Development)
|
||||
// Set the catalog and schema in presets.
|
||||
b.Config.Presets.Catalog = "my_catalog"
|
||||
b.Config.Presets.Schema = "my_schema"
|
||||
|
||||
b := PresetsMock()
|
||||
b.Config.Presets = config.Presets{
|
||||
Catalog: "my_catalog",
|
||||
Schema: "my_schema",
|
||||
}
|
||||
ctx := context.Background()
|
||||
diags := bundle.Apply(ctx, b, mutator.ApplyPresets())
|
||||
require.NoError(t, diags.Error())
|
||||
|
||||
// Verify that jobs got catalog/schema if they support it.
|
||||
// For DBT tasks in jobs:
|
||||
for _, job := range b.Config.Resources.Jobs {
|
||||
if job.JobSettings != nil && job.Tasks != nil {
|
||||
for _, task := range job.Tasks {
|
||||
if task.DbtTask != nil {
|
||||
require.Equal(t, "my_catalog", task.DbtTask.Catalog, "dbt catalog should be set")
|
||||
require.Equal(t, "my_schema", task.DbtTask.Schema, "dbt schema should be set")
|
||||
// Initial scan: record all fields that contain placeholders.
|
||||
// We do this before the first apply so we can verify no changes occur.
|
||||
var recordedFields []recordedField
|
||||
require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
|
||||
_, err := dyn.Walk(b.Config.Value(), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
if v.Kind() == dyn.KindString {
|
||||
val := v.MustString()
|
||||
if strings.Contains(val, "<catalog>") || strings.Contains(val, "<schema>") {
|
||||
pathCopy := make(dyn.Path, len(p))
|
||||
copy(pathCopy, p)
|
||||
recordedFields = append(recordedFields, recordedField{
|
||||
Path: pathCopy,
|
||||
PathString: pathCopy.String(),
|
||||
Placeholder: val,
|
||||
Expected: replacePlaceholders(val, "my_catalog", "my_schema"),
|
||||
})
|
||||
}
|
||||
}
|
||||
return v, nil
|
||||
})
|
||||
return root, err
|
||||
}))
|
||||
|
||||
// Stage 1: Apply presets before cleanup, should be no-op.
|
||||
diags := bundle.Apply(ctx, b, mutator.ApplyPresets())
|
||||
require.False(t, diags.HasError(), "unexpected error before cleanup: %v", diags.Error())
|
||||
|
||||
// Verify that no recorded fields changed
|
||||
verifyNoChangesBeforeCleanup(t, b.Config.Value(), recordedFields)
|
||||
|
||||
// Stage 2: Cleanup: Walk over rootVal and remove placeholders, adjusting recordedFields Expected values.
|
||||
require.NoError(t, b.Config.Mutate(func(root dyn.Value) (dyn.Value, error) {
|
||||
for _, f := range recordedFields {
|
||||
value, err := dyn.GetByPath(root, f.Path)
|
||||
require.NoError(t, err)
|
||||
|
||||
val := value.MustString()
|
||||
cleanedVal := removePlaceholders(val)
|
||||
root, err = dyn.SetByPath(root, f.Path, dyn.V(cleanedVal))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
root, err := dyn.Set(root, "resources.jobs.key.parameters", dyn.NilValue)
|
||||
require.NoError(t, err)
|
||||
return root, nil
|
||||
}))
|
||||
|
||||
// Stage 3: Apply presets after cleanup.
|
||||
diags = bundle.Apply(ctx, b, mutator.ApplyPresets())
|
||||
require.False(t, diags.HasError(), "unexpected error after cleanup: %v", diags.Error())
|
||||
|
||||
// Verify that fields have the expected replacements
|
||||
config := b.Config.Value()
|
||||
for _, f := range recordedFields {
|
||||
val, err := dyn.GetByPath(config, f.Path)
|
||||
require.NoError(t, err, "failed to get path %s", f.Path)
|
||||
assert.Equal(t, f.Expected, val.MustString(), "preset value expected for %s based on placeholder %s", f.Path, f.Placeholder)
|
||||
}
|
||||
|
||||
// Pipelines: Catalog/Schema
|
||||
for _, p := range b.Config.Resources.Pipelines {
|
||||
if p.PipelineSpec != nil {
|
||||
// pipeline catalog and schema
|
||||
if p.Catalog == "" || p.Catalog == "hive_metastore" {
|
||||
require.Equal(t, "my_catalog", p.Catalog, "pipeline catalog should be set")
|
||||
}
|
||||
require.Equal(t, "my_schema", p.Target, "pipeline schema (target) should be set")
|
||||
}
|
||||
}
|
||||
|
||||
// Registered models: Catalog/Schema
|
||||
for _, rm := range b.Config.Resources.RegisteredModels {
|
||||
if rm.CreateRegisteredModelRequest != nil {
|
||||
require.Equal(t, "my_catalog", rm.CatalogName, "registered model catalog should be set")
|
||||
require.Equal(t, "my_schema", rm.SchemaName, "registered model schema should be set")
|
||||
}
|
||||
}
|
||||
|
||||
// Quality monitors: If paused, we rewrite tableName to include catalog.schema.
|
||||
// In our code, if paused, we prepend catalog/schema if tableName wasn't already fully qualified.
|
||||
// Let's verify that:
|
||||
for _, qm := range b.Config.Resources.QualityMonitors {
|
||||
// If not fully qualified (3 parts), it should have been rewritten.
|
||||
parts := strings.Split(qm.TableName, ".")
|
||||
if len(parts) != 3 {
|
||||
require.Equal(t, fmt.Sprintf("my_catalog.my_schema.%s", parts[0]), qm.TableName, "quality monitor tableName should include catalog and schema")
|
||||
}
|
||||
}
|
||||
|
||||
// Schemas: If there's a schema preset, we might replace the schema name or catalog name.
|
||||
for _, s := range b.Config.Resources.Schemas {
|
||||
if s.CreateSchema != nil {
|
||||
// If catalog was empty before, now should be set:
|
||||
require.Equal(t, "my_catalog", s.CatalogName, "schema catalog should be set")
|
||||
// If schema was empty before, it should be set, but we did have "schema1",
|
||||
// so let's verify that if schema had a name, prefix logic may apply:
|
||||
// The code attempts to handle schema naming carefully. If t.Schema != "" and s.Name == "",
|
||||
// s.Name is set to t.Schema. Since s.Name was originally "schema1", it should remain "schema1" with prefix applied.
|
||||
// If you want to verify behavior, do so explicitly if changed code logic.
|
||||
}
|
||||
}
|
||||
|
||||
// Model serving endpoints currently return a warning that they don't support catalog/schema presets.
|
||||
// We can just verify that the warning is generated or that no fields were set since they are not supported.
|
||||
// The ApplyPresets code emits a diag error if we attempt to use catalog/schema with model serving endpoints.
|
||||
// Let's check that we got an error diagnostic:
|
||||
// The code currently returns a diag error if model serving endpoints are present and catalog/schema are set.
|
||||
// So we verify diags here:
|
||||
foundEndpointError := false
|
||||
for _, d := range diags {
|
||||
if strings.Contains(d.Summary, "model serving endpoints are not supported with catalog/schema presets") {
|
||||
foundEndpointError = true
|
||||
break
|
||||
}
|
||||
}
|
||||
require.True(t, foundEndpointError, "should have diag error for model serving endpoints")
|
||||
|
||||
// Add assertions for any other resources that support catalog/schema if needed.
|
||||
// This list is maintained manually. If you add new resource types that support catalog/schema,
|
||||
// add them here as well.
|
||||
// Stage 4: Check completeness
|
||||
checkCompleteness(t, recordedFields)
|
||||
}
|
||||
|
||||
func verifyNoChangesBeforeCleanup(t *testing.T, rootVal dyn.Value, recordedFields []recordedField) {
|
||||
t.Helper()
|
||||
|
||||
for _, f := range recordedFields {
|
||||
val, err := dyn.GetByPath(rootVal, f.Path)
|
||||
require.NoError(t, err, "failed to get path %s", f.Path)
|
||||
require.Equal(t, f.Placeholder, val.MustString(),
|
||||
"expected placeholder '%s' at %s to remain unchanged before cleanup", f.Placeholder, f.Path)
|
||||
}
|
||||
}
|
||||
|
||||
func checkCompleteness(t *testing.T, recordedFields []recordedField) {
|
||||
t.Helper()
|
||||
|
||||
// Build a set for recorded fields
|
||||
recordedSet := make(map[string]struct{})
|
||||
for _, field := range recordedFields {
|
||||
recordedSet[field.PathString] = struct{}{}
|
||||
}
|
||||
|
||||
// Obtain the type of config.Resources
|
||||
var r config.Resources
|
||||
resourcesType := reflect.TypeOf(r)
|
||||
|
||||
// Track missing fields
|
||||
var missingFields []string
|
||||
|
||||
// Keep track of visited types to prevent infinite loops (cycles)
|
||||
visited := make(map[reflect.Type]struct{})
|
||||
|
||||
// Helper function to handle maps, slices, arrays, and nested pointers/interfaces
|
||||
verifyFieldType := func(fieldType reflect.Type, path string, fn func(reflect.Type, string)) {
|
||||
switch fieldType.Kind() {
|
||||
case reflect.Slice, reflect.Array:
|
||||
// For arrays/slices, inspect the element type
|
||||
fn(fieldType.Elem(), path+"[0]")
|
||||
case reflect.Map:
|
||||
// For maps, inspect the value type
|
||||
fn(fieldType.Elem(), path+".key")
|
||||
case reflect.Ptr, reflect.Interface:
|
||||
// For pointers/interfaces, inspect the element if it's a pointer
|
||||
if fieldType.Kind() == reflect.Ptr {
|
||||
fn(fieldType.Elem(), path)
|
||||
}
|
||||
case reflect.Struct:
|
||||
// For structs, directly recurse into their fields
|
||||
fn(fieldType, path)
|
||||
default:
|
||||
// For basic or unknown kinds, do nothing
|
||||
}
|
||||
}
|
||||
|
||||
// Recursive function to verify the fields of a given type.
|
||||
var verifyTypeFields func(rt reflect.Type, path string)
|
||||
verifyTypeFields = func(rt reflect.Type, path string) {
|
||||
// Avoid cycles by skipping already visited types
|
||||
if _, seen := visited[rt]; seen {
|
||||
return
|
||||
}
|
||||
visited[rt] = struct{}{}
|
||||
|
||||
switch rt.Kind() {
|
||||
case reflect.Ptr, reflect.Interface:
|
||||
// For pointers/interfaces, inspect the element type if available
|
||||
if rt.Kind() == reflect.Ptr {
|
||||
verifyTypeFields(rt.Elem(), path)
|
||||
}
|
||||
case reflect.Struct:
|
||||
for i := 0; i < rt.NumField(); i++ {
|
||||
ft := rt.Field(i)
|
||||
jsonTag := ft.Tag.Get("json")
|
||||
if jsonTag == "" || jsonTag == "-" {
|
||||
// Ignore field names when there's no JSON tag,
|
||||
// e.g. for Jobs.JobSettings
|
||||
verifyFieldType(ft.Type, path, verifyTypeFields)
|
||||
continue
|
||||
}
|
||||
|
||||
fieldName := strings.Split(jsonTag, ",")[0]
|
||||
fieldPath := path + "." + fieldName
|
||||
|
||||
if isCatalogOrSchemaField(fieldName) {
|
||||
// Only check if the field is a string
|
||||
if ft.Type.Kind() == reflect.String {
|
||||
if _, recorded := recordedSet[fieldPath]; !recorded {
|
||||
if _, ignored := PresetsIgnoredFields[fieldPath]; !ignored {
|
||||
missingFields = append(missingFields, fieldPath)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
verifyFieldType(ft.Type, fieldPath, verifyTypeFields)
|
||||
}
|
||||
default:
|
||||
// For other kinds at this level, do nothing
|
||||
}
|
||||
}
|
||||
|
||||
// Start from "resources"
|
||||
verifyTypeFields(resourcesType, "resources")
|
||||
|
||||
// Report all missing fields
|
||||
for _, field := range missingFields {
|
||||
t.Errorf("Field %s was not included in the catalog/schema presets test. If this is a new field, please add it to PresetsMock or PresetsIgnoredFields and add support for it as appropriate.", field)
|
||||
}
|
||||
|
||||
// Fail the test if there were any missing fields
|
||||
if len(missingFields) > 0 {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
// isCatalogOrSchemaField returns true for a field names in config.Resources that we suspect could contain a catalog or schema name
|
||||
func isCatalogOrSchemaField(name string) bool {
|
||||
return strings.Contains(name, "catalog") || strings.Contains(name, "schema")
|
||||
}
|
||||
|
||||
func removePlaceholders(value string) string {
|
||||
value = strings.ReplaceAll(value, "<catalog>.", "")
|
||||
value = strings.ReplaceAll(value, "<schema>.", "")
|
||||
value = strings.ReplaceAll(value, "<catalog>", "")
|
||||
value = strings.ReplaceAll(value, "<schema>", "")
|
||||
return value
|
||||
}
|
||||
|
||||
func replacePlaceholders(placeholder, catalog, schema string) string {
|
||||
expected := strings.ReplaceAll(placeholder, "<catalog>", catalog)
|
||||
expected = strings.ReplaceAll(expected, "<schema>", schema)
|
||||
return expected
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue