Add experimental test

This commit is contained in:
Lennart Kats 2024-12-14 20:39:22 +01:00
parent 6b5948cef9
commit 35bed3f549
No known key found for this signature in database
GPG Key ID: 1EB8B57673197023
1 changed files with 360 additions and 75 deletions

View File

@ -3,7 +3,9 @@ package mutator_test
import (
"context"
"fmt"
"reflect"
"runtime"
"strconv"
"strings"
"testing"
@ -15,10 +17,20 @@ import (
"github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/dashboards"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/serving"
"github.com/stretchr/testify/require"
)
// RecordedField describes one string field whose value was altered during
// placeholder cleanup, so that the field can be located and checked again later.
type RecordedField struct {
// Path locates the field as a dot-separated chain of struct field names and
// map keys, with slice/array elements written as "[i]".
Path string
// Value is the field's original content, captured before it was cleaned.
Value string
}
func TestApplyPresetsPrefix(t *testing.T) {
tests := []struct {
name string
@ -457,88 +469,361 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
}
// TestApplyPresetsCatalogSchema runs mutator.ApplyPresets with the Catalog and
// Schema presets set to "my_catalog" / "my_schema". It applies the mutator
// twice: once while the resources still carry "<catalog>" / "<schema>"
// placeholders, and once after those placeholders have been stripped.
//
// NOTE(review): this span appears to interleave an earlier and a newer version
// of the test body (for instance `b` is declared with `:=` twice and the loops
// over jobs further down are never closed) — confirm against the real file.
func TestApplyPresetsCatalogSchema(t *testing.T) {
// Create a bundle in a known mode, e.g. development or production doesn't matter much here.
b := mockBundle(config.Development)
// Set the catalog and schema in presets.
b.Config.Presets.Catalog = "my_catalog"
b.Config.Presets.Schema = "my_schema"
// Build a bundle with one resource of each kind, all registered under the key
// "object". Catalog/schema related string fields hold the "<catalog>" and
// "<schema>" placeholders.
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"object": {
JobSettings: &jobs.JobSettings{
Name: "job",
Parameters: []jobs.JobParameterDefinition{
{Name: "catalog", Default: "<catalog>"},
{Name: "schema", Default: "<schema>"},
},
},
},
},
Pipelines: map[string]*resources.Pipeline{
"object": {
PipelineSpec: &pipelines.PipelineSpec{
Name: "pipeline",
Catalog: "<catalog>",
Target: "<schema>",
},
},
},
ModelServingEndpoints: map[string]*resources.ModelServingEndpoint{
"object": {
CreateServingEndpoint: &serving.CreateServingEndpoint{
Name: "serving",
AiGateway: &serving.AiGatewayConfig{
InferenceTableConfig: &serving.AiGatewayInferenceTableConfig{
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
},
Config: serving.EndpointCoreConfigInput{
AutoCaptureConfig: &serving.AutoCaptureConfigInput{
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
ServedEntities: []serving.ServedEntityInput{
{EntityName: "<catalog>.<schema>.entity"},
},
ServedModels: []serving.ServedModelInput{
{ModelName: "<catalog>.<schema>.model"},
},
},
},
},
},
RegisteredModels: map[string]*resources.RegisteredModel{
"object": {
CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{
Name: "registered_model",
CatalogName: "<catalog>",
SchemaName: "<schema>",
},
},
},
QualityMonitors: map[string]*resources.QualityMonitor{
"object": {
TableName: "table",
CreateMonitor: &catalog.CreateMonitor{
OutputSchemaName: "<catalog>.<schema>",
},
},
},
Schemas: map[string]*resources.Schema{
"object": {
CreateSchema: &catalog.CreateSchema{
Name: "<schema>",
CatalogName: "<catalog>",
},
},
},
Models: map[string]*resources.MlflowModel{
"object": {
Model: &ml.Model{
Name: "<catalog>.<schema>.model",
},
},
},
Experiments: map[string]*resources.MlflowExperiment{
"object": {
Experiment: &ml.Experiment{
Name: "<catalog>.<schema>.experiment",
},
},
},
Clusters: map[string]*resources.Cluster{
"object": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "cluster",
},
},
},
Dashboards: map[string]*resources.Dashboard{
"object": {
Dashboard: &dashboards.Dashboard{
DisplayName: "dashboard",
},
},
},
},
},
}
b.Config.Presets = config.Presets{
Catalog: "my_catalog",
Schema: "my_schema",
}
// Stage 1: Apply presets BEFORE cleanup.
// Because all fields are already set to placeholders, Apply should NOT overwrite them (no-op).
ctx := context.Background()
diags := bundle.Apply(ctx, b, mutator.ApplyPresets())
require.NoError(t, diags.Error())
require.False(t, diags.HasError(), "unexpected error before cleanup: %v", diags.Error())
verifyNoChangesBeforeCleanup(t, b.Config)
// Verify that jobs got catalog/schema if they support it.
// For DBT tasks in jobs:
for _, job := range b.Config.Resources.Jobs {
if job.JobSettings != nil && job.Tasks != nil {
for _, task := range job.Tasks {
if task.DbtTask != nil {
require.Equal(t, "my_catalog", task.DbtTask.Catalog, "dbt catalog should be set")
require.Equal(t, "my_schema", task.DbtTask.Schema, "dbt schema should be set")
// Stage 2: Cleanup all "<catalog>" and "<schema>" placeholders
// and record where they were.
// NOTE(review): the MarkMutatorEntry/MarkMutatorExit pair presumably lets the
// direct struct edits below be picked up by the config — confirm.
b.Config.MarkMutatorEntry(ctx)
resources := reflect.ValueOf(&b.Config.Resources).Elem()
recordedFields := recordAndCleanupFields(resources, "Resources")
// Drop the job's parameter definitions entirely rather than keeping entries
// with emptied defaults.
b.Config.Resources.Jobs["object"].Parameters = nil
b.Config.MarkMutatorExit(ctx)
// Stage 3: Apply presets after cleanup.
diags = bundle.Apply(ctx, b, mutator.ApplyPresets())
require.False(t, diags.HasError(), "unexpected error after cleanup: %v", diags.Error())
verifyAllFields(t, b.Config, recordedFields)
// Stage 4: Verify that all known fields in config.Resources have been processed.
checkCompleteness(t, &b.Config.Resources, "Resources", recordedFields)
}
// verifyNoChangesBeforeCleanup spot-checks a few representative fields and
// requires that they still hold their placeholders: the two parameter defaults
// of the job "object" and the catalog/target of the pipeline "object".
func verifyNoChangesBeforeCleanup(t *testing.T, cfg config.Root) {
	t.Helper()

	// Shared failure message for every placeholder comparison below.
	const unchanged = "expected no changes before cleanup"

	// Job parameter defaults: exactly two entries, catalog first, schema second.
	params := cfg.Resources.Jobs["object"].Parameters
	require.Len(t, params, 2, "job parameters count mismatch")
	require.Equal(t, "<catalog>", params[0].Default, unchanged)
	require.Equal(t, "<schema>", params[1].Default, unchanged)

	// Pipeline catalog and target (the pipeline's schema field).
	p := cfg.Resources.Pipelines["object"]
	require.Equal(t, "<catalog>", p.Catalog, unchanged)
	require.Equal(t, "<schema>", p.Target, unchanged)
}
// recordAndCleanupFields walks rv via reflection and strips the "<catalog>" and
// "<schema>" placeholders (see cleanedValue) from every string field of a
// struct it visits. Each field whose value changed is reported as a
// RecordedField carrying its path and its original value.
//
// Non-nil pointers and interfaces are followed under the same path. Struct
// fields, map values and slice/array elements are handed to
// recordAndCleanupFieldsRecursive with the path extended by ".Field", ".key"
// or "[i]" respectively.
func recordAndCleanupFields(rv reflect.Value, path string) []RecordedField {
	var found []RecordedField

	// descend passes a nested value on and collects whatever it reports.
	descend := func(child reflect.Value, childPath string) {
		found = append(found, recordAndCleanupFieldsRecursive(child, childPath)...)
	}

	switch rv.Kind() {
	case reflect.Ptr, reflect.Interface:
		if rv.IsNil() {
			break
		}
		found = append(found, recordAndCleanupFields(rv.Elem(), path)...)

	case reflect.Struct:
		structType := rv.Type()
		for i := 0; i < rv.NumField(); i++ {
			field := rv.Field(i)
			fieldPath := path + "." + structType.Field(i).Name

			if field.Kind() == reflect.String {
				before := field.String()
				// Only fields that actually contained a placeholder are
				// rewritten and recorded.
				if after := cleanedValue(before); after != before {
					field.SetString(after)
					found = append(found, RecordedField{Path: fieldPath, Value: before})
				}
			}
			descend(field, fieldPath)
		}

	case reflect.Map:
		for _, key := range rv.MapKeys() {
			descend(rv.MapIndex(key), path+"."+key.String())
		}

	case reflect.Slice, reflect.Array:
		for i := 0; i < rv.Len(); i++ {
			descend(rv.Index(i), fmt.Sprintf("%s[%d]", path, i))
		}
	}

	return found
}
// verifyAllFields looks up every recorded field in cfg and requires that its
// current value equals the recorded original with the placeholders replaced by
// the preset values (see replaceCatalogSchemaPlaceholders).
func verifyAllFields(t *testing.T, cfg config.Root, recordedFields []RecordedField) {
	t.Helper()

	root := reflect.ValueOf(cfg)
	for i := range recordedFields {
		field := recordedFields[i]
		want := replaceCatalogSchemaPlaceholders(field.Value)
		actual := getStringValueAtPath(t, root, field.Path)
		require.Equal(t, want, actual, "expected catalog/schema to be replaced by preset values at %s", field.Path)
	}
}
// checkCompleteness walks root via reflection and requires that every struct
// field whose name is accepted by isCatalogOrSchemaField has its path present
// in recordedFields. The check is by field name only; the field's kind and
// current value are not inspected.
//
// root may be a value or a pointer (a pointer is dereferenced once before the
// walk); rootPath is the path prefix used for the top-level value.
//
// NOTE(review): this span appears to contain leftover statements from an older
// version of the test (the loop over b.Config.Resources.Pipelines) mixed into
// the function body — confirm against the real file.
func checkCompleteness(t *testing.T, root interface{}, rootPath string, recordedFields []RecordedField) {
t.Helper()
// Index the recorded paths for constant-time membership checks.
recordedSet := make(map[string]bool)
for _, f := range recordedFields {
recordedSet[f.Path] = true
}
// check is declared before it is assigned so the closure can call itself.
var check func(rv reflect.Value, path string)
check = func(rv reflect.Value, path string) {
switch rv.Kind() {
case reflect.Ptr, reflect.Interface:
// Follow non-nil pointers/interfaces without extending the path.
if !rv.IsNil() {
check(rv.Elem(), path)
}
case reflect.Struct:
tp := rv.Type()
for i := 0; i < rv.NumField(); i++ {
ft := tp.Field(i)
fv := rv.Field(i)
fPath := path + "." + ft.Name
if isCatalogOrSchemaField(ft.Name) {
require.Truef(t, recordedSet[fPath],
"Field %s was not recorded in recordedFields (completeness check failed)", fPath)
}
check(fv, fPath)
}
case reflect.Map:
// Map entries extend the path with ".<key>".
for _, mk := range rv.MapKeys() {
mVal := rv.MapIndex(mk)
check(mVal, path+"."+mk.String())
}
case reflect.Slice, reflect.Array:
// Slice/array elements extend the path with "[i]".
for i := 0; i < rv.Len(); i++ {
check(rv.Index(i), fmt.Sprintf("%s[%d]", path, i))
}
}
}
// Pipelines: Catalog/Schema
for _, p := range b.Config.Resources.Pipelines {
if p.PipelineSpec != nil {
// pipeline catalog and schema
if p.Catalog == "" || p.Catalog == "hive_metastore" {
require.Equal(t, "my_catalog", p.Catalog, "pipeline catalog should be set")
rv := reflect.ValueOf(root)
if rv.Kind() == reflect.Ptr {
rv = rv.Elem()
}
require.Equal(t, "my_schema", p.Target, "pipeline schema (target) should be set")
check(rv, rootPath)
}
// getStringValueAtPath splits path on "." and lets navigatePath resolve the
// resulting segments against root, returning the string it finds there.
func getStringValueAtPath(t *testing.T, root reflect.Value, path string) string {
	t.Helper()
	return navigatePath(t, root, strings.Split(path, "."))
}
// navigatePath walks rv along parts, consuming one path segment per step, and
// returns the string found at the end. Each segment names a struct field or a
// string map key and may carry one trailing slice/array index such as
// "Parameters[0]". Pointers and interfaces are dereferenced along the way.
// Any problem (nil pointer, unknown field or key, bad index, non-string leaf)
// fails the test via require or t.Fatalf.
//
// NOTE(review): this span appears to contain leftover statements from an older
// version of the test (the loop over b.Config.Resources.RegisteredModels)
// mixed into the function body — confirm against the real file.
func navigatePath(t *testing.T, rv reflect.Value, parts []string) string {
t.Helper()
// Trim empty parts if any
for len(parts) > 0 && parts[0] == "" {
parts = parts[1:]
}
for len(parts) > 0 {
part := parts[0]
parts = parts[1:]
// Dereference pointers/interfaces before proceeding
for rv.Kind() == reflect.Ptr || rv.Kind() == reflect.Interface {
require.Falsef(t, rv.IsNil(), "nil pointer or interface encountered at part '%s'", part)
rv = rv.Elem()
}
// If the part has indexing like "Parameters[0]", split it into "Parameters" and "[0]"
var indexPart string
fieldName := part
if idx := strings.IndexRune(part, '['); idx != -1 {
// e.g. part = "Parameters[0]"
fieldName = part[:idx] // "Parameters"
indexPart = part[idx:] // "[0]"
require.Truef(t, strings.HasPrefix(indexPart, "["), "expected '[' in indexing")
require.Truef(t, strings.HasSuffix(indexPart, "]"), "expected ']' at end of indexing")
}
// Navigate down structures/maps
switch rv.Kind() {
case reflect.Struct:
// Find the struct field by name
ft, ok := rv.Type().FieldByName(fieldName)
if !ok {
t.Fatalf("Could not find field '%s' in struct at path", fieldName)
}
rv = rv.FieldByIndex(ft.Index)
case reflect.Map:
// Use fieldName as map key
mapVal := rv.MapIndex(reflect.ValueOf(fieldName))
require.Truef(t, mapVal.IsValid(), "no map entry '%s' found in path", fieldName)
rv = mapVal
default:
// If we're here, maybe we expected a struct or map but got something else
t.Fatalf("Unexpected kind '%s' when looking for '%s'", rv.Kind(), fieldName)
}
// If there's an index part, apply it now
if indexPart != "" {
// Dereference again if needed
for rv.Kind() == reflect.Ptr || rv.Kind() == reflect.Interface {
require.False(t, rv.IsNil(), "nil pointer or interface when indexing")
rv = rv.Elem()
}
require.Truef(t, rv.Kind() == reflect.Slice || rv.Kind() == reflect.Array, "expected slice/array for indexing but got %s", rv.Kind())
idxStr := indexPart[1 : len(indexPart)-1] // remove [ and ]
idx, err := strconv.Atoi(idxStr)
require.NoError(t, err, "invalid slice index %s", indexPart)
// NOTE(review): only the upper bound is checked here; a negative index
// would still reach rv.Index below.
require.Truef(t, idx < rv.Len(), "index %d out of range in slice/array of length %d", idx, rv.Len())
rv = rv.Index(idx)
}
}
// Registered models: Catalog/Schema
for _, rm := range b.Config.Resources.RegisteredModels {
if rm.CreateRegisteredModelRequest != nil {
require.Equal(t, "my_catalog", rm.CatalogName, "registered model catalog should be set")
require.Equal(t, "my_schema", rm.SchemaName, "registered model schema should be set")
// Dereference if needed at the leaf
for rv.Kind() == reflect.Ptr || rv.Kind() == reflect.Interface {
require.False(t, rv.IsNil(), "nil pointer or interface at leaf")
rv = rv.Elem()
}
require.Equal(t, reflect.String, rv.Kind(), "expected a string at the final path")
return rv.String()
}
// isCatalogOrSchemaField reports whether name is exactly one of the struct
// field names treated as carrying a catalog or schema: "Catalog",
// "CatalogName", "Schema", "SchemaName" or "Target". The match is
// case-sensitive.
func isCatalogOrSchemaField(name string) bool {
	return name == "Catalog" ||
		name == "CatalogName" ||
		name == "Schema" ||
		name == "SchemaName" ||
		name == "Target"
}
// Quality monitors: If paused, we rewrite tableName to include catalog.schema.
// In our code, if paused, we prepend catalog/schema if tableName wasn't already fully qualified.
// Let's verify that:
for _, qm := range b.Config.Resources.QualityMonitors {
// If not fully qualified (3 parts), it should have been rewritten.
parts := strings.Split(qm.TableName, ".")
if len(parts) != 3 {
require.Equal(t, fmt.Sprintf("my_catalog.my_schema.%s", parts[0]), qm.TableName, "quality monitor tableName should include catalog and schema")
}
// cleanedValue returns value with all catalog/schema placeholders removed.
// The dotted forms ("<catalog>.", "<schema>.") are stripped first so that a
// qualified name such as "<catalog>.<schema>.model" collapses to "model"
// without leaving stray dots; the bare forms are stripped afterwards.
func cleanedValue(value string) string {
	// Order matters: each pass operates on the result of the previous one.
	placeholders := [...]string{"<catalog>.", "<schema>.", "<catalog>", "<schema>"}
	for _, placeholder := range placeholders {
		value = strings.ReplaceAll(value, placeholder, "")
	}
	return value
}
// Schemas: If there's a schema preset, we might replace the schema name or catalog name.
for _, s := range b.Config.Resources.Schemas {
if s.CreateSchema != nil {
// If catalog was empty before, now should be set:
require.Equal(t, "my_catalog", s.CatalogName, "schema catalog should be set")
// If schema was empty before, it should be set, but we did have "schema1",
// so let's verify that if schema had a name, prefix logic may apply:
// The code attempts to handle schema naming carefully. If t.Schema != "" and s.Name == "",
// s.Name is set to t.Schema. Since s.Name was originally "schema1", it should remain "schema1" with prefix applied.
// If the code logic changes and you want to verify this behavior, assert it explicitly here.
}
}
// Model serving endpoints currently return a warning that they don't support catalog/schema presets.
// We can just verify that the warning is generated or that no fields were set since they are not supported.
// The ApplyPresets code emits a diag error if we attempt to use catalog/schema with model serving endpoints.
// Let's check that we got an error diagnostic:
// The code currently returns a diag error if model serving endpoints are present and catalog/schema are set.
// So we verify diags here:
foundEndpointError := false
for _, d := range diags {
if strings.Contains(d.Summary, "model serving endpoints are not supported with catalog/schema presets") {
foundEndpointError = true
break
}
}
require.True(t, foundEndpointError, "should have diag error for model serving endpoints")
// Add assertions for any other resources that support catalog/schema if needed.
// This list is maintained manually. If you add new resource types that support catalog/schema,
// add them here as well.
// replaceCatalogSchemaPlaceholders substitutes the preset values used by the
// test into value: every "<catalog>" becomes "my_catalog" and every "<schema>"
// becomes "my_schema".
func replaceCatalogSchemaPlaceholders(value string) string {
	substitutions := [...][2]string{
		{"<catalog>", "my_catalog"},
		{"<schema>", "my_schema"},
	}
	for _, sub := range substitutions {
		value = strings.ReplaceAll(value, sub[0], sub[1])
	}
	return value
}