diff --git a/bundle/config/mutator/process_target_mode_test.go b/bundle/config/mutator/process_target_mode_test.go index 583efcfe..cf8229bf 100644 --- a/bundle/config/mutator/process_target_mode_test.go +++ b/bundle/config/mutator/process_target_mode_test.go @@ -97,6 +97,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle { RegisteredModels: map[string]*resources.RegisteredModel{ "registeredmodel1": {CreateRegisteredModelRequest: &catalog.CreateRegisteredModelRequest{Name: "registeredmodel1"}}, }, + QualityMonitors: map[string]*resources.QualityMonitor{ + "qualityMonitor1": {CreateMonitor: &catalog.CreateMonitor{TableName: "qualityMonitor1"}}, + }, }, }, // Use AWS implementation for testing. @@ -145,6 +148,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) { // Registered model 1 assert.Equal(t, "dev_lennart_registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) + + // Quality Monitor 1 + assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) } func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) { @@ -200,6 +206,7 @@ func TestProcessTargetModeDefault(t *testing.T) { assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) + assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) } func TestProcessTargetModeProduction(t *testing.T) { @@ -240,6 +247,7 @@ func TestProcessTargetModeProduction(t *testing.T) { assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) + assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) } func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) { diff --git a/bundle/config/mutator/run_as.go b/bundle/config/mutator/run_as.go index c5b294b2..aecd1d17 100644 --- a/bundle/config/mutator/run_as.go +++ b/bundle/config/mutator/run_as.go @@ -100,6 +100,16 @@ func validateRunAs(b *bundle.Bundle) error { } } + // Monitors do not support run_as in the API. + if len(b.Config.Resources.QualityMonitors) > 0 { + return errUnsupportedResourceTypeForRunAs{ + resourceType: "quality_monitors", + resourceLocation: b.Config.GetLocation("resources.quality_monitors"), + currentUser: b.Config.Workspace.CurrentUser.UserName, + runAsUser: identity, + } + } + return nil } diff --git a/bundle/config/mutator/run_as_test.go b/bundle/config/mutator/run_as_test.go index d6fb2939..c57de847 100644 --- a/bundle/config/mutator/run_as_test.go +++ b/bundle/config/mutator/run_as_test.go @@ -37,6 +37,7 @@ func allResourceTypes(t *testing.T) []string { "model_serving_endpoints", "models", "pipelines", + "quality_monitors", "registered_models", }, resourceTypes, diff --git a/bundle/config/resources.go b/bundle/config/resources.go index 41ffc25c..f70052ec 100644 --- a/bundle/config/resources.go +++ b/bundle/config/resources.go @@ -17,6 +17,7 @@ type Resources struct { Experiments map[string]*resources.MlflowExperiment `json:"experiments,omitempty"` ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"` RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"` + QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"` } type UniqueResourceIdTracker struct { @@ -123,6 +124,19 @@ func (r *Resources) VerifyUniqueResourceIdentifiers() (*UniqueResourceIdTracker, tracker.Type[k] = "registered_model" tracker.ConfigPath[k] = r.RegisteredModels[k].ConfigFilePath } + for k := range r.QualityMonitors { + if _, ok := tracker.Type[k]; ok { + return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)", + k, + tracker.Type[k], + tracker.ConfigPath[k], + "quality_monitor", + r.QualityMonitors[k].ConfigFilePath, + ) + } + tracker.Type[k] = "quality_monitor" + tracker.ConfigPath[k] = r.QualityMonitors[k].ConfigFilePath + } return tracker, nil } @@ -152,6 +166,9 @@ func (r *Resources) allResources() []resource { for k, e := range r.RegisteredModels { all = append(all, resource{resource_type: "registered model", resource: e, key: k}) } + for k, e := range r.QualityMonitors { + all = append(all, resource{resource_type: "quality monitor", resource: e, key: k}) + } return all } @@ -189,6 +206,9 @@ func (r *Resources) ConfigureConfigFilePath() { for _, e := range r.RegisteredModels { e.ConfigureConfigFilePath() } + for _, e := range r.QualityMonitors { + e.ConfigureConfigFilePath() + } } type ConfigResource interface { diff --git a/bundle/config/resources/quality_monitor.go b/bundle/config/resources/quality_monitor.go new file mode 100644 index 00000000..0d13e58f --- /dev/null +++ b/bundle/config/resources/quality_monitor.go @@ -0,0 +1,60 @@ +package resources + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle/config/paths" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/marshal" + "github.com/databricks/databricks-sdk-go/service/catalog" +) + +type QualityMonitor struct { + // Represents the Input Arguments for Terraform and will get + // converted to a HCL representation for CRUD + *catalog.CreateMonitor + + // This represents the id which is the full name of the monitor + // (catalog_name.schema_name.table_name) that can be used + // as a reference in other resources. This value is returned by terraform. + ID string `json:"id,omitempty" bundle:"readonly"` + + // Path to config file where the resource is defined. All bundle resources + // include this for interpolation purposes. + paths.Paths + + ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"` +} + +func (s *QualityMonitor) UnmarshalJSON(b []byte) error { + return marshal.Unmarshal(b, s) +} + +func (s QualityMonitor) MarshalJSON() ([]byte, error) { + return marshal.Marshal(s) +} + +func (s *QualityMonitor) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) { + _, err := w.QualityMonitors.Get(ctx, catalog.GetQualityMonitorRequest{ + TableName: id, + }) + if err != nil { + log.Debugf(ctx, "quality monitor %s does not exist", id) + return false, err + } + return true, nil +} + +func (s *QualityMonitor) TerraformResourceName() string { + return "databricks_quality_monitor" +} + +func (s *QualityMonitor) Validate() error { + if s == nil || !s.DynamicValue.IsValid() { + return fmt.Errorf("quality monitor is not defined") + } + + return nil +} diff --git a/bundle/deploy/terraform/convert.go b/bundle/deploy/terraform/convert.go index d0b63358..a6ec04d9 100644 --- a/bundle/deploy/terraform/convert.go +++ b/bundle/deploy/terraform/convert.go @@ -222,6 +222,13 @@ func BundleToTerraform(config *config.Root) *schema.Root { } } + for k, src := range config.Resources.QualityMonitors { + noResources = false + var dst schema.ResourceQualityMonitor + conv(src, &dst) + tfroot.Resource.QualityMonitor[k] = &dst + } + // We explicitly set "resource" to nil to omit it from a JSON encoding. // This is required because the terraform CLI requires >= 1 resources defined // if the "resource" property is used in a .tf.json file. @@ -365,6 +372,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error { } cur.ID = instance.Attributes.ID config.Resources.RegisteredModels[resource.Name] = cur + case "databricks_quality_monitor": + if config.Resources.QualityMonitors == nil { + config.Resources.QualityMonitors = make(map[string]*resources.QualityMonitor) + } + cur := config.Resources.QualityMonitors[resource.Name] + if cur == nil { + cur = &resources.QualityMonitor{ModifiedStatus: resources.ModifiedStatusDeleted} + } + cur.ID = instance.Attributes.ID + config.Resources.QualityMonitors[resource.Name] = cur case "databricks_permissions": case "databricks_grants": // Ignore; no need to pull these back into the configuration. @@ -404,6 +421,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error { src.ModifiedStatus = resources.ModifiedStatusCreated } } + for _, src := range config.Resources.QualityMonitors { + if src.ModifiedStatus == "" && src.ID == "" { + src.ModifiedStatus = resources.ModifiedStatusCreated + } + } return nil } diff --git a/bundle/deploy/terraform/convert_test.go b/bundle/deploy/terraform/convert_test.go index 58523bb4..e1f73be2 100644 --- a/bundle/deploy/terraform/convert_test.go +++ b/bundle/deploy/terraform/convert_test.go @@ -629,6 +629,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "1"}}, }, }, + { + Type: "databricks_quality_monitor", + Mode: "managed", + Name: "test_monitor", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "1"}}, + }, + }, }, } err := TerraformToBundle(&tfState, &config) @@ -652,6 +660,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) { assert.Equal(t, "1", config.Resources.RegisteredModels["test_registered_model"].ID) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.RegisteredModels["test_registered_model"].ModifiedStatus) + assert.Equal(t, "1", config.Resources.QualityMonitors["test_monitor"].ID) + assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus) + AssertFullResourceCoverage(t, &config) } @@ -700,6 +711,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) { }, }, }, + QualityMonitors: map[string]*resources.QualityMonitor{ + "test_monitor": { + CreateMonitor: &catalog.CreateMonitor{ + TableName: "test_monitor", + }, + }, + }, }, } var tfState = resourcesState{ @@ -726,6 +744,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) { assert.Equal(t, "", config.Resources.RegisteredModels["test_registered_model"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.RegisteredModels["test_registered_model"].ModifiedStatus) + assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor"].ID) + assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor"].ModifiedStatus) + AssertFullResourceCoverage(t, &config) } @@ -804,6 +825,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { }, }, }, + QualityMonitors: map[string]*resources.QualityMonitor{ + "test_monitor": { + CreateMonitor: &catalog.CreateMonitor{ + TableName: "test_monitor", + }, + }, + "test_monitor_new": { + CreateMonitor: &catalog.CreateMonitor{ + TableName: "test_monitor_new", + }, + }, + }, }, } var tfState = resourcesState{ @@ -904,6 +937,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { {Attributes: stateInstanceAttributes{ID: "2"}}, }, }, + { + Type: "databricks_quality_monitor", + Mode: "managed", + Name: "test_monitor", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "test_monitor"}}, + }, + }, + { + Type: "databricks_quality_monitor", + Mode: "managed", + Name: "test_monitor_old", + Instances: []stateResourceInstance{ + {Attributes: stateInstanceAttributes{ID: "test_monitor_old"}}, + }, + }, }, } err := TerraformToBundle(&tfState, &config) @@ -951,6 +1000,12 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { assert.Equal(t, "", config.Resources.ModelServingEndpoints["test_model_serving_new"].ID) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.ModelServingEndpoints["test_model_serving_new"].ModifiedStatus) + assert.Equal(t, "test_monitor", config.Resources.QualityMonitors["test_monitor"].ID) + assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor"].ModifiedStatus) + assert.Equal(t, "test_monitor_old", config.Resources.QualityMonitors["test_monitor_old"].ID) + assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.QualityMonitors["test_monitor_old"].ModifiedStatus) + assert.Equal(t, "", config.Resources.QualityMonitors["test_monitor_new"].ID) + assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.QualityMonitors["test_monitor_new"].ModifiedStatus) AssertFullResourceCoverage(t, &config) } diff --git a/bundle/deploy/terraform/interpolate.go b/bundle/deploy/terraform/interpolate.go index 358279a7..608f1c79 100644 --- a/bundle/deploy/terraform/interpolate.go +++ b/bundle/deploy/terraform/interpolate.go @@ -54,6 +54,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D path = dyn.NewPath(dyn.Key("databricks_model_serving")).Append(path[2:]...) case dyn.Key("registered_models"): path = dyn.NewPath(dyn.Key("databricks_registered_model")).Append(path[2:]...) + case dyn.Key("quality_monitors"): + path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...) default: // Trigger "key not found" for unknown resource types. return dyn.GetByPath(root, path) diff --git a/bundle/deploy/terraform/tfdyn/convert_quality_monitor.go b/bundle/deploy/terraform/tfdyn/convert_quality_monitor.go new file mode 100644 index 00000000..341df7c2 --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_quality_monitor.go @@ -0,0 +1,37 @@ +package tfdyn + +import ( + "context" + + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" +) + +func convertQualityMonitorResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) { + // Normalize the output value to the target schema. + vout, diags := convert.Normalize(schema.ResourceQualityMonitor{}, vin) + for _, diag := range diags { + log.Debugf(ctx, "monitor normalization diagnostic: %s", diag.Summary) + } + return vout, nil +} + +type qualityMonitorConverter struct{} + +func (qualityMonitorConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error { + vout, err := convertQualityMonitorResource(ctx, vin) + if err != nil { + return err + } + + // Add the converted resource to the output. + out.QualityMonitor[key] = vout.AsAny() + + return nil +} + +func init() { + registerConverter("quality_monitors", qualityMonitorConverter{}) +} diff --git a/bundle/deploy/terraform/tfdyn/convert_quality_monitor_test.go b/bundle/deploy/terraform/tfdyn/convert_quality_monitor_test.go new file mode 100644 index 00000000..50bfce7a --- /dev/null +++ b/bundle/deploy/terraform/tfdyn/convert_quality_monitor_test.go @@ -0,0 +1,46 @@ +package tfdyn + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/tf/schema" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConvertQualityMonitor(t *testing.T) { + var src = resources.QualityMonitor{ + CreateMonitor: &catalog.CreateMonitor{ + TableName: "test_table_name", + AssetsDir: "assets_dir", + OutputSchemaName: "output_schema_name", + InferenceLog: &catalog.MonitorInferenceLog{ + ModelIdCol: "model_id", + PredictionCol: "test_prediction_col", + ProblemType: "PROBLEM_TYPE_CLASSIFICATION", + }, + }, + } + vin, err := convert.FromTyped(src, dyn.NilValue) + require.NoError(t, err) + ctx := context.Background() + out := schema.NewResources() + err = qualityMonitorConverter{}.Convert(ctx, "my_monitor", vin, out) + + require.NoError(t, err) + assert.Equal(t, map[string]any{ + "assets_dir": "assets_dir", + "output_schema_name": "output_schema_name", + "table_name": "test_table_name", + "inference_log": map[string]any{ + "model_id_col": "model_id", + "prediction_col": "test_prediction_col", + "problem_type": "PROBLEM_TYPE_CLASSIFICATION", + }, + }, out.QualityMonitor["my_monitor"]) +} diff --git a/bundle/tests/quality_monitor/databricks.yml b/bundle/tests/quality_monitor/databricks.yml new file mode 100644 index 00000000..3abcdfdd --- /dev/null +++ b/bundle/tests/quality_monitor/databricks.yml @@ -0,0 +1,40 @@ +resources: + quality_monitors: + my_monitor: + table_name: "main.test.thing1" + assets_dir: "/Shared/provider-test/databricks_monitoring/main.test.thing1" + output_schema_name: "test" + inference_log: + granularities: ["1 day"] + timestamp_col: "timestamp" + prediction_col: "prediction" + model_id_col: "model_id" + problem_type: "PROBLEM_TYPE_REGRESSION" + +targets: + development: + mode: development + resources: + quality_monitors: + my_monitor: + table_name: "main.test.dev" + + staging: + resources: + quality_monitors: + my_monitor: + table_name: "main.test.staging" + output_schema_name: "staging" + + production: + resources: + quality_monitors: + my_monitor: + table_name: "main.test.prod" + output_schema_name: "prod" + inference_log: + granularities: ["1 hour"] + timestamp_col: "timestamp_prod" + prediction_col: "prediction_prod" + model_id_col: "model_id_prod" + problem_type: "PROBLEM_TYPE_REGRESSION" diff --git a/bundle/tests/quality_monitor_test.go b/bundle/tests/quality_monitor_test.go new file mode 100644 index 00000000..d5db0519 --- /dev/null +++ b/bundle/tests/quality_monitor_test.go @@ -0,0 +1,59 @@ +package config_tests + +import ( + "testing" + + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/catalog" + "github.com/stretchr/testify/assert" +) + +func assertExpectedMonitor(t *testing.T, p *resources.QualityMonitor) { + assert.Equal(t, "timestamp", p.InferenceLog.TimestampCol) + assert.Equal(t, "prediction", p.InferenceLog.PredictionCol) + assert.Equal(t, "model_id", p.InferenceLog.ModelIdCol) + assert.Equal(t, catalog.MonitorInferenceLogProblemType("PROBLEM_TYPE_REGRESSION"), p.InferenceLog.ProblemType) +} + +func TestMonitorTableNames(t *testing.T) { + b := loadTarget(t, "./quality_monitor", "development") + assert.Len(t, b.Config.Resources.QualityMonitors, 1) + assert.Equal(t, b.Config.Bundle.Mode, config.Development) + + p := b.Config.Resources.QualityMonitors["my_monitor"] + assert.Equal(t, "main.test.dev", p.TableName) + assert.Equal(t, "/Shared/provider-test/databricks_monitoring/main.test.thing1", p.AssetsDir) + assert.Equal(t, "test", p.OutputSchemaName) + + assertExpectedMonitor(t, p) +} + +func TestMonitorStaging(t *testing.T) { + b := loadTarget(t, "./quality_monitor", "staging") + assert.Len(t, b.Config.Resources.QualityMonitors, 1) + + p := b.Config.Resources.QualityMonitors["my_monitor"] + assert.Equal(t, "main.test.staging", p.TableName) + assert.Equal(t, "/Shared/provider-test/databricks_monitoring/main.test.thing1", p.AssetsDir) + assert.Equal(t, "staging", p.OutputSchemaName) + + assertExpectedMonitor(t, p) +} + +func TestMonitorProduction(t *testing.T) { + b := loadTarget(t, "./quality_monitor", "production") + assert.Len(t, b.Config.Resources.QualityMonitors, 1) + + p := b.Config.Resources.QualityMonitors["my_monitor"] + assert.Equal(t, "main.test.prod", p.TableName) + assert.Equal(t, "/Shared/provider-test/databricks_monitoring/main.test.thing1", p.AssetsDir) + assert.Equal(t, "prod", p.OutputSchemaName) + + inferenceLog := p.InferenceLog + assert.Equal(t, []string{"1 day", "1 hour"}, inferenceLog.Granularities) + assert.Equal(t, "timestamp_prod", p.InferenceLog.TimestampCol) + assert.Equal(t, "prediction_prod", p.InferenceLog.PredictionCol) + assert.Equal(t, "model_id_prod", p.InferenceLog.ModelIdCol) + assert.Equal(t, catalog.MonitorInferenceLogProblemType("PROBLEM_TYPE_REGRESSION"), p.InferenceLog.ProblemType) +} diff --git a/libs/dyn/convert/struct_info.go b/libs/dyn/convert/struct_info.go index dc3ed4da..595e52ed 100644 --- a/libs/dyn/convert/struct_info.go +++ b/libs/dyn/convert/struct_info.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/textutil" ) // structInfo holds the type information we need to efficiently @@ -84,6 +85,14 @@ func buildStructInfo(typ reflect.Type) structInfo { } name, _, _ := strings.Cut(sf.Tag.Get("json"), ",") + if typ.Name() == "QualityMonitor" && name == "-" { + urlName, _, _ := strings.Cut(sf.Tag.Get("url"), ",") + if urlName == "" || urlName == "-" { + name = textutil.CamelToSnakeCase(sf.Name) + } else { + name = urlName + } + } if name == "" || name == "-" { continue } diff --git a/libs/textutil/case.go b/libs/textutil/case.go new file mode 100644 index 00000000..a8c78059 --- /dev/null +++ b/libs/textutil/case.go @@ -0,0 +1,14 @@ +package textutil + +import "unicode" + +func CamelToSnakeCase(name string) string { + var out []rune = make([]rune, 0, len(name)*2) + for i, r := range name { + if i > 0 && unicode.IsUpper(r) { + out = append(out, '_') + } + out = append(out, unicode.ToLower(r)) + } + return string(out) +} diff --git a/libs/textutil/case_test.go b/libs/textutil/case_test.go new file mode 100644 index 00000000..77b3e067 --- /dev/null +++ b/libs/textutil/case_test.go @@ -0,0 +1,40 @@ +package textutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCamelToSnakeCase(t *testing.T) { + cases := []struct { + input string + expected string + }{ + { + input: "test", + expected: "test", + }, + { + input: "testTest", + expected: "test_test", + }, + { + input: "testTestTest", + expected: "test_test_test", + }, + { + input: "TestTest", + expected: "test_test", + }, + { + input: "TestTestTest", + expected: "test_test_test", + }, + } + + for _, c := range cases { + output := CamelToSnakeCase(c.input) + assert.Equal(t, c.expected, output) + } +}