From 5aa89230e96635efaec5aa32a6a06a8cab9dec2f Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Fri, 7 Feb 2025 17:22:51 +0000 Subject: [PATCH] Use CreatePipeline instead of PipelineSpec for resources.Pipeline struct (#2287) ## Changes `CreatePipeline` is a more complete structure (superset of PipelineSpec one) which enables support of additional fields such as `run_as` and `allow_duplicate_names` in DABs configuration. Note: these fields are subject to support in TF in order to correctly work. ## Tests Existing tests pass + no fields are removed from JSON schema --- bundle/config/mutator/apply_presets.go | 2 +- .../mutator/capture_schema_dependency.go | 4 +-- .../mutator/capture_schema_dependency_test.go | 30 +++++++++---------- .../expand_pipeline_glob_paths_test.go | 2 +- bundle/config/mutator/initialize_urls_test.go | 4 +-- .../mutator/merge_pipeline_clusters_test.go | 4 +-- .../mutator/process_target_mode_test.go | 10 +++---- .../resolve_variable_references_test.go | 6 ++-- bundle/config/mutator/translate_paths_test.go | 16 +++++----- bundle/config/resources/pipeline.go | 4 +-- .../validate/single_node_cluster_test.go | 4 +-- bundle/deploy/metadata/annotate_pipelines.go | 4 +-- .../metadata/annotate_pipelines_test.go | 8 ++--- bundle/deploy/terraform/convert_test.go | 8 ++--- .../terraform/tfdyn/convert_pipeline.go | 5 ++++ .../terraform/tfdyn/convert_pipeline_test.go | 11 ++++++- bundle/internal/schema/annotations.yml | 10 +++++++ .../internal/schema/annotations_openapi.yml | 17 +++++++++++ .../schema/annotations_openapi_overrides.yml | 6 ++++ bundle/internal/schema/main.go | 15 ++++++++++ bundle/permissions/workspace_root_test.go | 8 ++--- bundle/render/render_text_output_test.go | 8 ++--- bundle/resources/completion_test.go | 4 +-- bundle/resources/lookup_test.go | 4 +-- bundle/run/pipeline.go | 4 +-- bundle/schema/jsonschema.json | 26 ++++++++++++++++ cmd/bundle/generate/pipeline.go | 2 +- libs/dyn/drop_keys.go | 27 +++++++++++++++++ libs/dyn/drop_keys_test.go | 24 +++++++++++++++ 29 files changed, 208 insertions(+), 69 deletions(-) create mode 100644 libs/dyn/drop_keys.go create mode 100644 libs/dyn/drop_keys_test.go diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index b402053e7..c8e7bf9e8 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -84,7 +84,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos // Pipelines presets: Prefix, PipelinesDevelopment for key, p := range r.Pipelines { - if p.PipelineSpec == nil { + if p.CreatePipeline == nil { diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key)) continue } diff --git a/bundle/config/mutator/capture_schema_dependency.go b/bundle/config/mutator/capture_schema_dependency.go index 5025c9a0d..2e17a8175 100644 --- a/bundle/config/mutator/capture_schema_dependency.go +++ b/bundle/config/mutator/capture_schema_dependency.go @@ -56,7 +56,7 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) { } func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { - if p == nil || p.PipelineSpec == nil { + if p == nil || p.CreatePipeline == nil { return } if p.Schema == "" { @@ -71,7 +71,7 @@ func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { } func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { - if p == nil || p.PipelineSpec == nil { + if p == nil || p.CreatePipeline == nil { return } if p.Target == "" { diff --git a/bundle/config/mutator/capture_schema_dependency_test.go b/bundle/config/mutator/capture_schema_dependency_test.go index 0a94e7748..16fa636ee 100644 --- a/bundle/config/mutator/capture_schema_dependency_test.go +++ b/bundle/config/mutator/capture_schema_dependency_test.go @@ -118,43 +118,43 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog1", Schema: "foobar", }, }, "pipeline2": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog2", Schema: "foobar", }, }, "pipeline3": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog1", Schema: "barfoo", }, }, "pipeline4": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalogX", Schema: "foobar", }, }, "pipeline5": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog1", Schema: "schemaX", }, }, "pipeline6": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "", Schema: "foobar", }, }, "pipeline7": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "", Schema: "", Name: "whatever", @@ -179,7 +179,7 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) { assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema) assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"]) - assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec) + assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].CreatePipeline) for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { assert.Empty(t, b.Config.Resources.Pipelines[k].Target) @@ -214,43 +214,43 @@ func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog1", Target: "foobar", }, }, "pipeline2": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog2", Target: "foobar", }, }, "pipeline3": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog1", Target: "barfoo", }, }, "pipeline4": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalogX", Target: "foobar", }, }, "pipeline5": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "catalog1", Target: "schemaX", }, }, "pipeline6": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "", Target: "foobar", }, }, "pipeline7": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Catalog: "", Target: "", Name: "whatever", diff --git a/bundle/config/mutator/expand_pipeline_glob_paths_test.go b/bundle/config/mutator/expand_pipeline_glob_paths_test.go index 7cf3c9f3e..c5b1ad39d 100644 --- a/bundle/config/mutator/expand_pipeline_glob_paths_test.go +++ b/bundle/config/mutator/expand_pipeline_glob_paths_test.go @@ -47,7 +47,7 @@ func TestExpandGlobPathsInPipelines(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ diff --git a/bundle/config/mutator/initialize_urls_test.go b/bundle/config/mutator/initialize_urls_test.go index f07a7deb3..8c751079b 100644 --- a/bundle/config/mutator/initialize_urls_test.go +++ b/bundle/config/mutator/initialize_urls_test.go @@ -31,8 +31,8 @@ func TestInitializeURLs(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "pipeline1": { - ID: "3", - PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}, + ID: "3", + CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline1"}, }, }, Experiments: map[string]*resources.MlflowExperiment{ diff --git a/bundle/config/mutator/merge_pipeline_clusters_test.go b/bundle/config/mutator/merge_pipeline_clusters_test.go index f117d9399..97ec44eea 100644 --- a/bundle/config/mutator/merge_pipeline_clusters_test.go +++ b/bundle/config/mutator/merge_pipeline_clusters_test.go @@ -19,7 +19,7 @@ func TestMergePipelineClusters(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Clusters: []pipelines.PipelineCluster{ { NodeTypeId: "i3.xlarge", @@ -68,7 +68,7 @@ func TestMergePipelineClustersCaseInsensitive(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Clusters: []pipelines.PipelineCluster{ { Label: "default", diff --git a/bundle/config/mutator/process_target_mode_test.go b/bundle/config/mutator/process_target_mode_test.go index 723b01ee3..6a0fd8e03 100644 --- a/bundle/config/mutator/process_target_mode_test.go +++ b/bundle/config/mutator/process_target_mode_test.go @@ -88,7 +88,7 @@ func mockBundle(mode config.Mode) *bundle.Bundle { }, }, Pipelines: map[string]*resources.Pipeline{ - "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1", Continuous: true}}, + "pipeline1": {CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline1", Continuous: true}}, }, Experiments: map[string]*resources.MlflowExperiment{ "experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}}, @@ -181,7 +181,7 @@ func TestProcessTargetModeDevelopment(t *testing.T) { // Pipeline 1 assert.Equal(t, "[dev lennart] pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) assert.False(t, b.Config.Resources.Pipelines["pipeline1"].Continuous) - assert.True(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) + assert.True(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development) // Experiment 1 assert.Equal(t, "/Users/lennart.kats@databricks.com/[dev lennart] experiment1", b.Config.Resources.Experiments["experiment1"].Name) @@ -316,7 +316,7 @@ func TestProcessTargetModeDefault(t *testing.T) { require.NoError(t, diags.Error()) assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name) assert.Equal(t, "pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) - assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) + assert.False(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) @@ -362,7 +362,7 @@ func TestProcessTargetModeProduction(t *testing.T) { assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name) assert.Equal(t, "pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) - assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) + assert.False(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) @@ -568,5 +568,5 @@ func TestPipelinesDevelopmentDisabled(t *testing.T) { diags := bundle.Apply(context.Background(), b, m) require.NoError(t, diags.Error()) - assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) + assert.False(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development) } diff --git a/bundle/config/mutator/resolve_variable_references_test.go b/bundle/config/mutator/resolve_variable_references_test.go index 44f6c8dbb..30969dc49 100644 --- a/bundle/config/mutator/resolve_variable_references_test.go +++ b/bundle/config/mutator/resolve_variable_references_test.go @@ -20,7 +20,7 @@ func TestResolveVariableReferencesWithSourceLinkedDeployment(t *testing.T) { true, func(t *testing.T, b *bundle.Bundle) { // Variables that use workspace file path should have SyncRootValue during resolution phase - require.Equal(t, "sync/root/path", b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Configuration["source"]) + require.Equal(t, "sync/root/path", b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Configuration["source"]) // The file path itself should remain the same require.Equal(t, "file/path", b.Config.Workspace.FilePath) @@ -29,7 +29,7 @@ func TestResolveVariableReferencesWithSourceLinkedDeployment(t *testing.T) { { false, func(t *testing.T, b *bundle.Bundle) { - require.Equal(t, "file/path", b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Configuration["source"]) + require.Equal(t, "file/path", b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Configuration["source"]) require.Equal(t, "file/path", b.Config.Workspace.FilePath) }, }, @@ -48,7 +48,7 @@ func TestResolveVariableReferencesWithSourceLinkedDeployment(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline1": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Configuration: map[string]string{ "source": "${workspace.file_path}", }, diff --git a/bundle/config/mutator/translate_paths_test.go b/bundle/config/mutator/translate_paths_test.go index aa6488ab0..6cfe5718a 100644 --- a/bundle/config/mutator/translate_paths_test.go +++ b/bundle/config/mutator/translate_paths_test.go @@ -179,7 +179,7 @@ func TestTranslatePaths(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ @@ -333,7 +333,7 @@ func TestTranslatePathsInSubdirectories(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { File: &pipelines.FileLibrary{ @@ -488,7 +488,7 @@ func TestPipelineNotebookDoesNotExistError(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ @@ -532,7 +532,7 @@ func TestPipelineNotebookDoesNotExistErrorWithoutExtension(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ @@ -572,7 +572,7 @@ func TestPipelineFileDoesNotExistError(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { File: &pipelines.FileLibrary{ @@ -677,7 +677,7 @@ func TestPipelineNotebookLibraryWithFileSourceError(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ @@ -712,7 +712,7 @@ func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { File: &pipelines.FileLibrary{ @@ -916,7 +916,7 @@ func TestTranslatePathsWithSourceLinkedDeployment(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ diff --git a/bundle/config/resources/pipeline.go b/bundle/config/resources/pipeline.go index 5127d07ba..57d9c4f19 100644 --- a/bundle/config/resources/pipeline.go +++ b/bundle/config/resources/pipeline.go @@ -16,7 +16,7 @@ type Pipeline struct { ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"` URL string `json:"url,omitempty" bundle:"internal"` - *pipelines.PipelineSpec + *pipelines.CreatePipeline } func (s *Pipeline) UnmarshalJSON(b []byte) error { @@ -59,5 +59,5 @@ func (s *Pipeline) GetURL() string { } func (s *Pipeline) IsNil() bool { - return s.PipelineSpec == nil + return s.CreatePipeline == nil } diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index c3ead8ef6..be93420c6 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -238,7 +238,7 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Clusters: []pipelines.PipelineCluster{ { SparkConf: tc.sparkConf, @@ -493,7 +493,7 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Clusters: []pipelines.PipelineCluster{ { SparkConf: tc.sparkConf, diff --git a/bundle/deploy/metadata/annotate_pipelines.go b/bundle/deploy/metadata/annotate_pipelines.go index 990f48907..407aaea6e 100644 --- a/bundle/deploy/metadata/annotate_pipelines.go +++ b/bundle/deploy/metadata/annotate_pipelines.go @@ -20,11 +20,11 @@ func (m *annotatePipelines) Name() string { func (m *annotatePipelines) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnostics { for _, pipeline := range b.Config.Resources.Pipelines { - if pipeline.PipelineSpec == nil { + if pipeline.CreatePipeline == nil { continue } - pipeline.PipelineSpec.Deployment = &pipelines.PipelineDeployment{ + pipeline.CreatePipeline.Deployment = &pipelines.PipelineDeployment{ Kind: pipelines.DeploymentKindBundle, MetadataFilePath: metadataFilePath(b), } diff --git a/bundle/deploy/metadata/annotate_pipelines_test.go b/bundle/deploy/metadata/annotate_pipelines_test.go index 448a022d0..606292724 100644 --- a/bundle/deploy/metadata/annotate_pipelines_test.go +++ b/bundle/deploy/metadata/annotate_pipelines_test.go @@ -21,12 +21,12 @@ func TestAnnotatePipelinesMutator(t *testing.T) { Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "my-pipeline-1": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "My Pipeline One", }, }, "my-pipeline-2": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "My Pipeline Two", }, }, @@ -43,14 +43,14 @@ func TestAnnotatePipelinesMutator(t *testing.T) { Kind: pipelines.DeploymentKindBundle, MetadataFilePath: "/a/b/c/metadata.json", }, - b.Config.Resources.Pipelines["my-pipeline-1"].PipelineSpec.Deployment) + b.Config.Resources.Pipelines["my-pipeline-1"].CreatePipeline.Deployment) assert.Equal(t, &pipelines.PipelineDeployment{ Kind: pipelines.DeploymentKindBundle, MetadataFilePath: "/a/b/c/metadata.json", }, - b.Config.Resources.Pipelines["my-pipeline-2"].PipelineSpec.Deployment) + b.Config.Resources.Pipelines["my-pipeline-2"].CreatePipeline.Deployment) } func TestAnnotatePipelinesMutatorPipelineWithoutASpec(t *testing.T) { diff --git a/bundle/deploy/terraform/convert_test.go b/bundle/deploy/terraform/convert_test.go index afc1fb22a..53d861b32 100644 --- a/bundle/deploy/terraform/convert_test.go +++ b/bundle/deploy/terraform/convert_test.go @@ -203,7 +203,7 @@ func TestBundleToTerraformForEachTaskLibraries(t *testing.T) { func TestBundleToTerraformPipeline(t *testing.T) { src := resources.Pipeline{ - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "my pipeline", Libraries: []pipelines.PipelineLibrary{ { @@ -759,7 +759,7 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "test_pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "test_pipeline", }, }, @@ -898,12 +898,12 @@ func TestTerraformToBundleModifiedResources(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "test_pipeline": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "test_pipeline", }, }, "test_pipeline_new": { - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "test_pipeline_new", }, }, diff --git a/bundle/deploy/terraform/tfdyn/convert_pipeline.go b/bundle/deploy/terraform/tfdyn/convert_pipeline.go index ea0c94d66..53a986864 100644 --- a/bundle/deploy/terraform/tfdyn/convert_pipeline.go +++ b/bundle/deploy/terraform/tfdyn/convert_pipeline.go @@ -21,6 +21,11 @@ func convertPipelineResource(ctx context.Context, vin dyn.Value) (dyn.Value, err return dyn.InvalidValue, err } + vout, err = dyn.DropKeys(vout, []string{"allow_duplicate_names", "dry_run"}) + if err != nil { + return dyn.InvalidValue, err + } + // Normalize the output value to the target schema. vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout) for _, diag := range diags { diff --git a/bundle/deploy/terraform/tfdyn/convert_pipeline_test.go b/bundle/deploy/terraform/tfdyn/convert_pipeline_test.go index 0239bad18..d8de55bf0 100644 --- a/bundle/deploy/terraform/tfdyn/convert_pipeline_test.go +++ b/bundle/deploy/terraform/tfdyn/convert_pipeline_test.go @@ -15,8 +15,17 @@ import ( func TestConvertPipeline(t *testing.T) { src := resources.Pipeline{ - PipelineSpec: &pipelines.PipelineSpec{ + CreatePipeline: &pipelines.CreatePipeline{ Name: "my pipeline", + // This fields is not part of TF schema yet, but once we upgrade to TF version that supports it, this test will fail because run_as + // will be exposed which is expected and test will need to be updated. + RunAs: &pipelines.RunAs{ + UserName: "foo@bar.com", + }, + // We expect AllowDuplicateNames and DryRun to be ignored and not passed to the TF output. + // This is not supported by TF now, so we don't want to expose it. + AllowDuplicateNames: true, + DryRun: true, Libraries: []pipelines.PipelineLibrary{ { Notebook: &pipelines.NotebookLibrary{ diff --git a/bundle/internal/schema/annotations.yml b/bundle/internal/schema/annotations.yml index c10f43b04..2d1a6a3d8 100644 --- a/bundle/internal/schema/annotations.yml +++ b/bundle/internal/schema/annotations.yml @@ -414,6 +414,16 @@ github.com/databricks/cli/bundle/config/resources.Permission: "user_name": "description": |- The name of the user that has the permission set in level. +github.com/databricks/cli/bundle/config/resources.Pipeline: + "allow_duplicate_names": + "description": |- + PLACEHOLDER + "dry_run": + "description": |- + PLACEHOLDER + "run_as": + "description": |- + PLACEHOLDER github.com/databricks/cli/bundle/config/variable.Lookup: "alert": "description": |- diff --git a/bundle/internal/schema/annotations_openapi.yml b/bundle/internal/schema/annotations_openapi.yml index d5a9bf69e..d9a0be50e 100644 --- a/bundle/internal/schema/annotations_openapi.yml +++ b/bundle/internal/schema/annotations_openapi.yml @@ -371,6 +371,9 @@ github.com/databricks/cli/bundle/config/resources.ModelServingEndpoint: "description": |- Tags to be attached to the serving endpoint and automatically propagated to billing logs. github.com/databricks/cli/bundle/config/resources.Pipeline: + "allow_duplicate_names": + "description": |- + If false, deployment will fail if name conflicts with that of another pipeline. "budget_policy_id": "description": |- Budget policy of this pipeline. @@ -395,6 +398,7 @@ github.com/databricks/cli/bundle/config/resources.Pipeline: "development": "description": |- Whether the pipeline is in Development mode. Defaults to false. + "dry_run": {} "edition": "description": |- Pipeline product edition. @@ -425,6 +429,7 @@ github.com/databricks/cli/bundle/config/resources.Pipeline: "restart_window": "description": |- Restart window of this pipeline. + "run_as": {} "schema": "description": |- The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode. @@ -2624,6 +2629,18 @@ github.com/databricks/databricks-sdk-go/service/pipelines.RestartWindow: "description": |- Time zone id of restart window. See https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-conf-mgmt-set-timezone.html for details. If not specified, UTC will be used. +github.com/databricks/databricks-sdk-go/service/pipelines.RunAs: + "_": + "description": |- + Write-only setting, available only in Create/Update calls. Specifies the user or service principal that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline. + + Only `user_name` or `service_principal_name` can be specified. If both are specified, an error is thrown. + "service_principal_name": + "description": |- + Application ID of an active service principal. Setting this field requires the `servicePrincipal/user` role. + "user_name": + "description": |- + The email of an active workspace user. Users can only set this field to their own email. github.com/databricks/databricks-sdk-go/service/pipelines.SchemaSpec: "destination_catalog": "description": |- diff --git a/bundle/internal/schema/annotations_openapi_overrides.yml b/bundle/internal/schema/annotations_openapi_overrides.yml index 585886313..be83af2d1 100644 --- a/bundle/internal/schema/annotations_openapi_overrides.yml +++ b/bundle/internal/schema/annotations_openapi_overrides.yml @@ -239,9 +239,15 @@ github.com/databricks/cli/bundle/config/resources.Pipeline: - notebook: path: ./pipeline.py ``` + "dry_run": + "description": |- + PLACEHOLDER "permissions": "description": |- PLACEHOLDER + "run_as": + "description": |- + PLACEHOLDER github.com/databricks/cli/bundle/config/resources.QualityMonitor: "_": "markdown_description": |- diff --git a/bundle/internal/schema/main.go b/bundle/internal/schema/main.go index 38e099ece..2e0120e62 100644 --- a/bundle/internal/schema/main.go +++ b/bundle/internal/schema/main.go @@ -109,6 +109,20 @@ func removeJobsFields(typ reflect.Type, s jsonschema.Schema) jsonschema.Schema { return s } +func removePipelineFields(typ reflect.Type, s jsonschema.Schema) jsonschema.Schema { + switch typ { + case reflect.TypeOf(resources.Pipeline{}): + // Even though DABs supports this field, TF provider does not. Thus, we + // should not expose it to the user. + delete(s.Properties, "dry_run") + delete(s.Properties, "allow_duplicate_names") + default: + // Do nothing + } + + return s +} + // While volume_type is required in the volume create API, DABs automatically sets // it's value to "MANAGED" if it's not provided. Thus, we make it optional // in the bundle schema. @@ -168,6 +182,7 @@ func generateSchema(workdir, outputFile string) { // Generate the JSON schema from the bundle Go struct. s, err := jsonschema.FromType(reflect.TypeOf(config.Root{}), []func(reflect.Type, jsonschema.Schema) jsonschema.Schema{ removeJobsFields, + removePipelineFields, makeVolumeTypeOptional, a.addAnnotations, addInterpolationPatterns, diff --git a/bundle/permissions/workspace_root_test.go b/bundle/permissions/workspace_root_test.go index c48704a63..3e5f9c61b 100644 --- a/bundle/permissions/workspace_root_test.go +++ b/bundle/permissions/workspace_root_test.go @@ -38,8 +38,8 @@ func TestApplyWorkspaceRootPermissions(t *testing.T) { "job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}}, }, Pipelines: map[string]*resources.Pipeline{ - "pipeline_1": {PipelineSpec: &pipelines.PipelineSpec{}}, - "pipeline_2": {PipelineSpec: &pipelines.PipelineSpec{}}, + "pipeline_1": {CreatePipeline: &pipelines.CreatePipeline{}}, + "pipeline_2": {CreatePipeline: &pipelines.CreatePipeline{}}, }, Models: map[string]*resources.MlflowModel{ "model_1": {Model: &ml.Model{}}, @@ -98,8 +98,8 @@ func TestApplyWorkspaceRootPermissionsForAllPaths(t *testing.T) { "job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}}, }, Pipelines: map[string]*resources.Pipeline{ - "pipeline_1": {PipelineSpec: &pipelines.PipelineSpec{}}, - "pipeline_2": {PipelineSpec: &pipelines.PipelineSpec{}}, + "pipeline_1": {CreatePipeline: &pipelines.CreatePipeline{}}, + "pipeline_2": {CreatePipeline: &pipelines.CreatePipeline{}}, }, Models: map[string]*resources.MlflowModel{ "model_1": {Model: &ml.Model{}}, diff --git a/bundle/render/render_text_output_test.go b/bundle/render/render_text_output_test.go index 506756f70..d092e77c8 100644 --- a/bundle/render/render_text_output_test.go +++ b/bundle/render/render_text_output_test.go @@ -530,12 +530,12 @@ func TestRenderSummary(t *testing.T) { "pipeline2": { ID: "4", // no URL - PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline2-name"}, + CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline2-name"}, }, "pipeline1": { - ID: "3", - URL: "https://url3", - PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1-name"}, + ID: "3", + URL: "https://url3", + CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline1-name"}, }, }, Schemas: map[string]*resources.Schema{ diff --git a/bundle/resources/completion_test.go b/bundle/resources/completion_test.go index 80412b6f1..56559f18c 100644 --- a/bundle/resources/completion_test.go +++ b/bundle/resources/completion_test.go @@ -25,7 +25,7 @@ func TestCompletions_SkipDuplicates(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{}, + CreatePipeline: &pipelines.CreatePipeline{}, }, }, }, @@ -50,7 +50,7 @@ func TestCompletions_Filter(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "bar": { - PipelineSpec: &pipelines.PipelineSpec{}, + CreatePipeline: &pipelines.CreatePipeline{}, }, }, }, diff --git a/bundle/resources/lookup_test.go b/bundle/resources/lookup_test.go index 0ea5af7a2..d95da977a 100644 --- a/bundle/resources/lookup_test.go +++ b/bundle/resources/lookup_test.go @@ -56,7 +56,7 @@ func TestLookup_MultipleFound(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{}, + CreatePipeline: &pipelines.CreatePipeline{}, }, }, }, @@ -107,7 +107,7 @@ func TestLookup_NominalWithFilters(t *testing.T) { }, Pipelines: map[string]*resources.Pipeline{ "bar": { - PipelineSpec: &pipelines.PipelineSpec{}, + CreatePipeline: &pipelines.CreatePipeline{}, }, }, }, diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index bdcf0f142..1cd6e8743 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -79,10 +79,10 @@ type pipelineRunner struct { } func (r *pipelineRunner) Name() string { - if r.pipeline == nil || r.pipeline.PipelineSpec == nil { + if r.pipeline == nil || r.pipeline.CreatePipeline == nil { return "" } - return r.pipeline.PipelineSpec.Name + return r.pipeline.CreatePipeline.Name } func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) { diff --git a/bundle/schema/jsonschema.json b/bundle/schema/jsonschema.json index 7c72c440e..9d4304cd8 100644 --- a/bundle/schema/jsonschema.json +++ b/bundle/schema/jsonschema.json @@ -703,6 +703,9 @@ "description": "Restart window of this pipeline.", "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.RestartWindow" }, + "run_as": { + "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.RunAs" + }, "schema": { "description": "The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode.", "$ref": "#/$defs/string" @@ -5385,6 +5388,29 @@ } ] }, + "pipelines.RunAs": { + "oneOf": [ + { + "type": "object", + "description": "Write-only setting, available only in Create/Update calls. Specifies the user or service principal that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline.\n\nOnly `user_name` or `service_principal_name` can be specified. If both are specified, an error is thrown.", + "properties": { + "service_principal_name": { + "description": "Application ID of an active service principal. Setting this field requires the `servicePrincipal/user` role.", + "$ref": "#/$defs/string" + }, + "user_name": { + "description": "The email of an active workspace user. Users can only set this field to their own email.", + "$ref": "#/$defs/string" + } + }, + "additionalProperties": false + }, + { + "type": "string", + "pattern": "\\$\\{(var(\\.[a-zA-Z]+([-_]?[a-zA-Z0-9]+)*(\\[[0-9]+\\])*)+)\\}" + } + ] + }, "pipelines.SchemaSpec": { "oneOf": [ { diff --git a/cmd/bundle/generate/pipeline.go b/cmd/bundle/generate/pipeline.go index 1d2c345d6..9bf9e9947 100644 --- a/cmd/bundle/generate/pipeline.go +++ b/cmd/bundle/generate/pipeline.go @@ -92,7 +92,7 @@ func NewGeneratePipelineCommand() *cobra.Command { } saver := yamlsaver.NewSaverWithStyle( - // Including all PipelineSpec and nested fields which are map[string]string type + // Including all CreatePipeline and nested fields which are map[string]string type map[string]yaml.Style{ "spark_conf": yaml.DoubleQuotedStyle, "custom_tags": yaml.DoubleQuotedStyle, diff --git a/libs/dyn/drop_keys.go b/libs/dyn/drop_keys.go new file mode 100644 index 000000000..494f9b9cd --- /dev/null +++ b/libs/dyn/drop_keys.go @@ -0,0 +1,27 @@ +package dyn + +func DropKeys(v Value, drop []string) (Value, error) { + var err error + nv, err := Walk(v, func(p Path, v Value) (Value, error) { + if len(p) == 0 { + return v, nil + } + + // Check if this key should be dropped. + for _, key := range drop { + if p[0].Key() != key { + continue + } + + return InvalidValue, ErrDrop + } + + // Pass through all other values. + return v, ErrSkip + }) + if err != nil { + return InvalidValue, err + } + + return nv, nil +} diff --git a/libs/dyn/drop_keys_test.go b/libs/dyn/drop_keys_test.go new file mode 100644 index 000000000..83a9744ca --- /dev/null +++ b/libs/dyn/drop_keys_test.go @@ -0,0 +1,24 @@ +package dyn + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDropKeysTest(t *testing.T) { + v := V(map[string]Value{ + "key1": V("value1"), + "key2": V("value2"), + "key3": V("value3"), + }) + + vout, err := DropKeys(v, []string{"key1", "key3"}) + require.NoError(t, err) + + mv := vout.MustMap() + require.Equal(t, 1, mv.Len()) + v, ok := mv.GetByString("key2") + require.True(t, ok) + require.Equal(t, "value2", v.MustString()) +}