This commit is contained in:
Shreyas Goenka 2024-05-02 16:16:17 +02:00
parent d67484203d
commit 557223e165
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
9 changed files with 141 additions and 267 deletions

View File

@ -17,114 +17,6 @@ type Resources struct {
Experiments map[string]*resources.MlflowExperiment `json:"experiments,omitempty"` Experiments map[string]*resources.MlflowExperiment `json:"experiments,omitempty"`
ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"` ModelServingEndpoints map[string]*resources.ModelServingEndpoint `json:"model_serving_endpoints,omitempty"`
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"` RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
}
type UniqueResourceIdTracker struct {
Type map[string]string
ConfigPath map[string]string
}
// verifies merging is safe by checking no duplicate identifiers exist
func (r *Resources) VerifySafeMerge(other *Resources) error {
rootTracker, err := r.VerifyUniqueResourceIdentifiers()
if err != nil {
return err
}
otherTracker, err := other.VerifyUniqueResourceIdentifiers()
if err != nil {
return err
}
for k := range otherTracker.Type {
if _, ok := rootTracker.Type[k]; ok {
return fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
rootTracker.Type[k],
rootTracker.ConfigPath[k],
otherTracker.Type[k],
otherTracker.ConfigPath[k],
)
}
}
return nil
}
// This function verifies there are no duplicate names used for the resource definations
func (r *Resources) VerifyUniqueResourceIdentifiers() (*UniqueResourceIdTracker, error) {
tracker := &UniqueResourceIdTracker{
Type: make(map[string]string),
ConfigPath: make(map[string]string),
}
for k := range r.Jobs {
tracker.Type[k] = "job"
tracker.ConfigPath[k] = r.Jobs[k].ConfigFilePath
}
for k := range r.Pipelines {
if _, ok := tracker.Type[k]; ok {
return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
tracker.Type[k],
tracker.ConfigPath[k],
"pipeline",
r.Pipelines[k].ConfigFilePath,
)
}
tracker.Type[k] = "pipeline"
tracker.ConfigPath[k] = r.Pipelines[k].ConfigFilePath
}
for k := range r.Models {
if _, ok := tracker.Type[k]; ok {
return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
tracker.Type[k],
tracker.ConfigPath[k],
"mlflow_model",
r.Models[k].ConfigFilePath,
)
}
tracker.Type[k] = "mlflow_model"
tracker.ConfigPath[k] = r.Models[k].ConfigFilePath
}
for k := range r.Experiments {
if _, ok := tracker.Type[k]; ok {
return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
tracker.Type[k],
tracker.ConfigPath[k],
"mlflow_experiment",
r.Experiments[k].ConfigFilePath,
)
}
tracker.Type[k] = "mlflow_experiment"
tracker.ConfigPath[k] = r.Experiments[k].ConfigFilePath
}
for k := range r.ModelServingEndpoints {
if _, ok := tracker.Type[k]; ok {
return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
tracker.Type[k],
tracker.ConfigPath[k],
"model_serving_endpoint",
r.ModelServingEndpoints[k].ConfigFilePath,
)
}
tracker.Type[k] = "model_serving_endpoint"
tracker.ConfigPath[k] = r.ModelServingEndpoints[k].ConfigFilePath
}
for k := range r.RegisteredModels {
if _, ok := tracker.Type[k]; ok {
return tracker, fmt.Errorf("multiple resources named %s (%s at %s, %s at %s)",
k,
tracker.Type[k],
tracker.ConfigPath[k],
"registered_model",
r.RegisteredModels[k].ConfigFilePath,
)
}
tracker.Type[k] = "registered_model"
tracker.ConfigPath[k] = r.RegisteredModels[k].ConfigFilePath
}
return tracker, nil
} }
// ConfigureConfigFilePath sets the specified path for all resources contained in this instance. // ConfigureConfigFilePath sets the specified path for all resources contained in this instance.

View File

@ -1,25 +0,0 @@
package resources
import (
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/databricks-sdk-go/service/catalog"
)
type Schema struct {
// List of grants to apply on this schema.
Grants []Grant `json:"grants,omitempty"`
// This represents the id which is the full name of the schema
// (catalog_name.schema_name) that can be used
// as a reference in other resources. This value is returned by terraform.
// TODO: verify the accuracy of this comment
ID string `json:"id,omitempty" bundle:"readonly"`
// Path to config file where the resource is defined. All bundle resources
// include this for interpolation purposes.
paths.Paths
*catalog.CreateSchema
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
}

View File

@ -1,127 +0,0 @@
package config
import (
"testing"
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/bundle/config/resources"
"github.com/stretchr/testify/assert"
)
func TestVerifyUniqueResourceIdentifiers(t *testing.T) {
r := Resources{
Jobs: map[string]*resources.Job{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo.yml",
},
},
},
Models: map[string]*resources.MlflowModel{
"bar": {
Paths: paths.Paths{
ConfigFilePath: "bar.yml",
},
},
},
Experiments: map[string]*resources.MlflowExperiment{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo2.yml",
},
},
},
}
_, err := r.VerifyUniqueResourceIdentifiers()
assert.ErrorContains(t, err, "multiple resources named foo (job at foo.yml, mlflow_experiment at foo2.yml)")
}
func TestVerifySafeMerge(t *testing.T) {
r := Resources{
Jobs: map[string]*resources.Job{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo.yml",
},
},
},
Models: map[string]*resources.MlflowModel{
"bar": {
Paths: paths.Paths{
ConfigFilePath: "bar.yml",
},
},
},
}
other := Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo2.yml",
},
},
},
}
err := r.VerifySafeMerge(&other)
assert.ErrorContains(t, err, "multiple resources named foo (job at foo.yml, pipeline at foo2.yml)")
}
func TestVerifySafeMergeForSameResourceType(t *testing.T) {
r := Resources{
Jobs: map[string]*resources.Job{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo.yml",
},
},
},
Models: map[string]*resources.MlflowModel{
"bar": {
Paths: paths.Paths{
ConfigFilePath: "bar.yml",
},
},
},
}
other := Resources{
Jobs: map[string]*resources.Job{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo2.yml",
},
},
},
}
err := r.VerifySafeMerge(&other)
assert.ErrorContains(t, err, "multiple resources named foo (job at foo.yml, job at foo2.yml)")
}
func TestVerifySafeMergeForRegisteredModels(t *testing.T) {
r := Resources{
Jobs: map[string]*resources.Job{
"foo": {
Paths: paths.Paths{
ConfigFilePath: "foo.yml",
},
},
},
RegisteredModels: map[string]*resources.RegisteredModel{
"bar": {
Paths: paths.Paths{
ConfigFilePath: "bar.yml",
},
},
},
}
other := Resources{
RegisteredModels: map[string]*resources.RegisteredModel{
"bar": {
Paths: paths.Paths{
ConfigFilePath: "bar2.yml",
},
},
},
}
err := r.VerifySafeMerge(&other)
assert.ErrorContains(t, err, "multiple resources named bar (registered_model at bar.yml, registered_model at bar2.yml)")
}

View File

@ -97,7 +97,8 @@ func Load(path string) (*Root, diag.Diagnostics) {
return nil, diag.Errorf("failed to load %s: %v", path, err) return nil, diag.Errorf("failed to load %s: %v", path, err)
} }
_, err = r.Resources.VerifyUniqueResourceIdentifiers() // Error if there are duplicate resource identifiers in the config file.
_, err = r.gatherResourceIdentifiers()
if err != nil { if err != nil {
diags = diags.Extend(diag.FromErr(err)) diags = diags.Extend(diag.FromErr(err))
} }
@ -265,7 +266,7 @@ func (r *Root) InitializeVariables(vars []string) error {
func (r *Root) Merge(other *Root) error { func (r *Root) Merge(other *Root) error {
// Check for safe merge, protecting against duplicate resource identifiers // Check for safe merge, protecting against duplicate resource identifiers
err := r.Resources.VerifySafeMerge(&other.Resources) err := r.verifySafeMerge(*other)
if err != nil { if err != nil {
return err return err
} }
@ -463,3 +464,84 @@ func (r Root) GetLocation(path string) dyn.Location {
} }
return v.Location() return v.Location()
} }
// This function verifies that the merge of two Root objects is safe. It checks
// there will be no duplicate resource identifiers in the merged configuration.
func (r Root) verifySafeMerge(other Root) error {
paths, err := r.gatherResourceIdentifiers()
if err != nil {
return err
}
otherPaths, err := other.gatherResourceIdentifiers()
if err != nil {
return err
}
// If duplicate keys exist, return
for k, p := range paths {
if _, ok := otherPaths[k]; ok {
// Type and location of the existing resource in the map.
ot := strings.TrimSuffix(paths[k][0].Key(), "s")
ov, _ := dyn.GetByPath(r.value.Get("resources"), paths[k])
ol := ov.Location()
// Type and location of the newly encountered resource with a duplicate name.
nt := strings.TrimSuffix(p[0].Key(), "s")
nv, _ := dyn.GetByPath(r.value.Get("resources"), p)
nl := nv.Location()
// Error, encountered a duplicate resource identifier.
return fmt.Errorf("multiple resources named %s (%s at %v, %s at %v)", k, ot, ol, nt, nl)
}
}
return nil
}
// This function gathers the resource identifiers, which exist in the bundle configuration
// in the form: resources.<resource_type>.<resource_identifiers>.
//
// It returns an error if it encounters a duplicate resource identifiers.
//
// Otherwise it returns a map of resource identifiers to their paths in the configuration tree
// relative to the resources key.
func (r Root) gatherResourceIdentifiers() (map[string]dyn.Path, error) {
paths := make(map[string]dyn.Path)
rrv := r.value.Get("resources")
// Walk the resources tree and accumulate the resource identifiers.
_, err := dyn.Walk(rrv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
// The path is expected to be of length 2, and of the form <resource_type>.<resource_identifier>.
// Eg: jobs.my_job, pipelines.my_pipeline, etc.
if len(p) < 2 {
return v, nil
}
if len(p) > 2 {
return v, dyn.ErrSkip
}
// TODO: Add validation that the resource is a map.
// If the resource identifier already exists in the map, return an error.
k := p[1].Key()
if _, ok := paths[k]; ok {
// Type and location of the existing resource in the map.
ot := strings.TrimSuffix(paths[k][0].Key(), "s")
ov, _ := dyn.GetByPath(rrv, paths[k])
ol := ov.Location()
// Type and location of the newly encountered with a duplicate name.
nt := strings.TrimSuffix(p[0].Key(), "s")
nv, _ := dyn.GetByPath(rrv, p)
nl := nv.Location()
// Error, encountered a duplicate resource identifier.
return v, fmt.Errorf("multiple resources named %s (%s at %v, %s at %v)", k, ot, ol, nt, nl)
}
// Accumulate the resource identifier and its path.
paths[k] = p
return v, nil
})
return paths, err
}

View File

@ -30,12 +30,17 @@ func TestRootLoad(t *testing.T) {
assert.Equal(t, "basic", root.Bundle.Name) assert.Equal(t, "basic", root.Bundle.Name)
} }
func TestDuplicateIdOnLoadReturnsError(t *testing.T) { func TestDuplicateIdOnLoadReturnsErrorForJobAndPipeline(t *testing.T) {
_, diags := Load("./testdata/duplicate_resource_names_in_root/databricks.yml") _, diags := Load("./testdata/duplicate_resource_names_in_root_job_and_pipeline/databricks.yml")
assert.ErrorContains(t, diags.Error(), "multiple resources named foo (job at ./testdata/duplicate_resource_names_in_root/databricks.yml, pipeline at ./testdata/duplicate_resource_names_in_root/databricks.yml)") assert.ErrorContains(t, diags.Error(), "multiple resources named foo (job at ./testdata/duplicate_resource_names_in_root_job_and_pipeline/databricks.yml:10:7, pipeline at ./testdata/duplicate_resource_names_in_root_job_and_pipeline/databricks.yml:15:7)")
} }
func TestDuplicateIdOnMergeReturnsError(t *testing.T) { func TestDuplicateIdOnLoadReturnsErrorForJobsAndExperiments(t *testing.T) {
_, diags := Load("./testdata/duplicate_resource_names_in_root_job_and_experiment/databricks.yml")
assert.ErrorContains(t, diags.Error(), "multiple resources named foo (job at ./testdata/duplicate_resource_names_in_root_jobs_and_experiments/databricks.yml:10:7, experiment at ./testdata/duplicate_resource_names_in_root_jobs_and_experiments/databricks.yml:18:7)")
}
func TestDuplicateIdOnMergeReturnsErrorForJobAndPipeline(t *testing.T) {
root, diags := Load("./testdata/duplicate_resource_name_in_subconfiguration/databricks.yml") root, diags := Load("./testdata/duplicate_resource_name_in_subconfiguration/databricks.yml")
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
@ -43,7 +48,18 @@ func TestDuplicateIdOnMergeReturnsError(t *testing.T) {
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
err := root.Merge(other) err := root.Merge(other)
assert.ErrorContains(t, err, "multiple resources named foo (job at ./testdata/duplicate_resource_name_in_subconfiguration/databricks.yml, pipeline at ./testdata/duplicate_resource_name_in_subconfiguration/resources.yml)") assert.ErrorContains(t, err, "multiple resources named foo (job at ./testdata/duplicate_resource_name_in_subconfiguration/databricks.yml:10:7, job at ./testdata/duplicate_resource_name_in_subconfiguration/databricks.yml:10:7)")
}
func TestDuplicateIdOnMergeReturnsErrorForJobAndJob(t *testing.T) {
root, diags := Load("./testdata/duplicate_resource_name_in_subconfiguration_job_and_job/databricks.yml")
require.NoError(t, diags.Error())
other, diags := Load("./testdata/duplicate_resource_name_in_subconfiguration_job_and_job/resources.yml")
require.NoError(t, diags.Error())
err := root.Merge(other)
assert.ErrorContains(t, err, "multiple resources named foo (job at ./testdata/duplicate_resource_name_in_subconfiguration_job_and_job/databricks.yml:10:7, job at ./testdata/duplicate_resource_name_in_subconfiguration_job_and_job/databricks.yml:10:7)")
} }
func TestInitializeVariables(t *testing.T) { func TestInitializeVariables(t *testing.T) {

View File

@ -0,0 +1,10 @@
bundle:
name: test
workspace:
profile: test
resources:
jobs:
foo:
name: job foo

View File

@ -0,0 +1,4 @@
resources:
jobs:
foo:
name: pipeline foo

View File

@ -0,0 +1,18 @@
bundle:
name: test
workspace:
profile: test
resources:
jobs:
foo:
name: job foo
bar:
name: job bar
pipelines:
baz:
name: pipeline baz
experiments:
foo:
name: experiment foo

View File

@ -8,6 +8,10 @@ resources:
jobs: jobs:
foo: foo:
name: job foo name: job foo
bar:
name: job bar
pipelines: pipelines:
foo: foo:
name: pipeline foo name: pipeline foo
baz:
name: pipeline baz