Use CreatePipeline instead of PipelineSpec for resources.Pipeline struct (#2287)

## Changes
`CreatePipeline` is a more complete structure (superset of PipelineSpec
one) which enables support of additional fields such as `run_as` and
`allow_duplicate_names` in DABs configuration. Note: these fields are
subject to support in TF in order to correctly work.

## Tests
Existing tests pass + no fields are removed from JSON schema
This commit is contained in:
Andrew Nester 2025-02-07 17:22:51 +00:00 committed by GitHub
parent ff4a5c2269
commit 5aa89230e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 208 additions and 69 deletions

View File

@ -84,7 +84,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
// Pipelines presets: Prefix, PipelinesDevelopment // Pipelines presets: Prefix, PipelinesDevelopment
for key, p := range r.Pipelines { for key, p := range r.Pipelines {
if p.PipelineSpec == nil { if p.CreatePipeline == nil {
diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key)) diags = diags.Extend(diag.Errorf("pipeline %s is not defined", key))
continue continue
} }

View File

@ -56,7 +56,7 @@ func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
} }
func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) { func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) {
if p == nil || p.PipelineSpec == nil { if p == nil || p.CreatePipeline == nil {
return return
} }
if p.Schema == "" { if p.Schema == "" {
@ -71,7 +71,7 @@ func resolvePipelineSchema(p *resources.Pipeline, b *bundle.Bundle) {
} }
func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) { func resolvePipelineTarget(p *resources.Pipeline, b *bundle.Bundle) {
if p == nil || p.PipelineSpec == nil { if p == nil || p.CreatePipeline == nil {
return return
} }
if p.Target == "" { if p.Target == "" {

View File

@ -118,43 +118,43 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline1": { "pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog1", Catalog: "catalog1",
Schema: "foobar", Schema: "foobar",
}, },
}, },
"pipeline2": { "pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog2", Catalog: "catalog2",
Schema: "foobar", Schema: "foobar",
}, },
}, },
"pipeline3": { "pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog1", Catalog: "catalog1",
Schema: "barfoo", Schema: "barfoo",
}, },
}, },
"pipeline4": { "pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalogX", Catalog: "catalogX",
Schema: "foobar", Schema: "foobar",
}, },
}, },
"pipeline5": { "pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog1", Catalog: "catalog1",
Schema: "schemaX", Schema: "schemaX",
}, },
}, },
"pipeline6": { "pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "", Catalog: "",
Schema: "foobar", Schema: "foobar",
}, },
}, },
"pipeline7": { "pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "", Catalog: "",
Schema: "", Schema: "",
Name: "whatever", Name: "whatever",
@ -179,7 +179,7 @@ func TestCaptureSchemaDependencyForPipelinesWithTarget(t *testing.T) {
assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema) assert.Equal(t, "", b.Config.Resources.Pipelines["pipeline7"].Schema)
assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"]) assert.Nil(t, b.Config.Resources.Pipelines["nilPipeline"])
assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].PipelineSpec) assert.Nil(t, b.Config.Resources.Pipelines["emptyPipeline"].CreatePipeline)
for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} { for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Target) assert.Empty(t, b.Config.Resources.Pipelines[k].Target)
@ -214,43 +214,43 @@ func TestCaptureSchemaDependencyForPipelinesWithSchema(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline1": { "pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog1", Catalog: "catalog1",
Target: "foobar", Target: "foobar",
}, },
}, },
"pipeline2": { "pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog2", Catalog: "catalog2",
Target: "foobar", Target: "foobar",
}, },
}, },
"pipeline3": { "pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog1", Catalog: "catalog1",
Target: "barfoo", Target: "barfoo",
}, },
}, },
"pipeline4": { "pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalogX", Catalog: "catalogX",
Target: "foobar", Target: "foobar",
}, },
}, },
"pipeline5": { "pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "catalog1", Catalog: "catalog1",
Target: "schemaX", Target: "schemaX",
}, },
}, },
"pipeline6": { "pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "", Catalog: "",
Target: "foobar", Target: "foobar",
}, },
}, },
"pipeline7": { "pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Catalog: "", Catalog: "",
Target: "", Target: "",
Name: "whatever", Name: "whatever",

View File

@ -47,7 +47,7 @@ func TestExpandGlobPathsInPipelines(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{

View File

@ -31,8 +31,8 @@ func TestInitializeURLs(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline1": { "pipeline1": {
ID: "3", ID: "3",
PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}, CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline1"},
}, },
}, },
Experiments: map[string]*resources.MlflowExperiment{ Experiments: map[string]*resources.MlflowExperiment{

View File

@ -19,7 +19,7 @@ func TestMergePipelineClusters(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"foo": { "foo": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Clusters: []pipelines.PipelineCluster{ Clusters: []pipelines.PipelineCluster{
{ {
NodeTypeId: "i3.xlarge", NodeTypeId: "i3.xlarge",
@ -68,7 +68,7 @@ func TestMergePipelineClustersCaseInsensitive(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"foo": { "foo": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Clusters: []pipelines.PipelineCluster{ Clusters: []pipelines.PipelineCluster{
{ {
Label: "default", Label: "default",

View File

@ -88,7 +88,7 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
}, },
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1", Continuous: true}}, "pipeline1": {CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline1", Continuous: true}},
}, },
Experiments: map[string]*resources.MlflowExperiment{ Experiments: map[string]*resources.MlflowExperiment{
"experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}}, "experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}},
@ -181,7 +181,7 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
// Pipeline 1 // Pipeline 1
assert.Equal(t, "[dev lennart] pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) assert.Equal(t, "[dev lennart] pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name)
assert.False(t, b.Config.Resources.Pipelines["pipeline1"].Continuous) assert.False(t, b.Config.Resources.Pipelines["pipeline1"].Continuous)
assert.True(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) assert.True(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development)
// Experiment 1 // Experiment 1
assert.Equal(t, "/Users/lennart.kats@databricks.com/[dev lennart] experiment1", b.Config.Resources.Experiments["experiment1"].Name) assert.Equal(t, "/Users/lennart.kats@databricks.com/[dev lennart] experiment1", b.Config.Resources.Experiments["experiment1"].Name)
@ -316,7 +316,7 @@ func TestProcessTargetModeDefault(t *testing.T) {
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name) assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) assert.Equal(t, "pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name)
assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) assert.False(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development)
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
@ -362,7 +362,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name) assert.Equal(t, "job1", b.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name) assert.Equal(t, "pipeline1", b.Config.Resources.Pipelines["pipeline1"].Name)
assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) assert.False(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development)
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
@ -568,5 +568,5 @@ func TestPipelinesDevelopmentDisabled(t *testing.T) {
diags := bundle.Apply(context.Background(), b, m) diags := bundle.Apply(context.Background(), b, m)
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) assert.False(t, b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Development)
} }

View File

@ -20,7 +20,7 @@ func TestResolveVariableReferencesWithSourceLinkedDeployment(t *testing.T) {
true, true,
func(t *testing.T, b *bundle.Bundle) { func(t *testing.T, b *bundle.Bundle) {
// Variables that use workspace file path should have SyncRootValue during resolution phase // Variables that use workspace file path should have SyncRootValue during resolution phase
require.Equal(t, "sync/root/path", b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Configuration["source"]) require.Equal(t, "sync/root/path", b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Configuration["source"])
// The file path itself should remain the same // The file path itself should remain the same
require.Equal(t, "file/path", b.Config.Workspace.FilePath) require.Equal(t, "file/path", b.Config.Workspace.FilePath)
@ -29,7 +29,7 @@ func TestResolveVariableReferencesWithSourceLinkedDeployment(t *testing.T) {
{ {
false, false,
func(t *testing.T, b *bundle.Bundle) { func(t *testing.T, b *bundle.Bundle) {
require.Equal(t, "file/path", b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Configuration["source"]) require.Equal(t, "file/path", b.Config.Resources.Pipelines["pipeline1"].CreatePipeline.Configuration["source"])
require.Equal(t, "file/path", b.Config.Workspace.FilePath) require.Equal(t, "file/path", b.Config.Workspace.FilePath)
}, },
}, },
@ -48,7 +48,7 @@ func TestResolveVariableReferencesWithSourceLinkedDeployment(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline1": { "pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Configuration: map[string]string{ Configuration: map[string]string{
"source": "${workspace.file_path}", "source": "${workspace.file_path}",
}, },

View File

@ -179,7 +179,7 @@ func TestTranslatePaths(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{
@ -333,7 +333,7 @@ func TestTranslatePathsInSubdirectories(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
File: &pipelines.FileLibrary{ File: &pipelines.FileLibrary{
@ -488,7 +488,7 @@ func TestPipelineNotebookDoesNotExistError(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{
@ -532,7 +532,7 @@ func TestPipelineNotebookDoesNotExistErrorWithoutExtension(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{
@ -572,7 +572,7 @@ func TestPipelineFileDoesNotExistError(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
File: &pipelines.FileLibrary{ File: &pipelines.FileLibrary{
@ -677,7 +677,7 @@ func TestPipelineNotebookLibraryWithFileSourceError(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{
@ -712,7 +712,7 @@ func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
File: &pipelines.FileLibrary{ File: &pipelines.FileLibrary{
@ -916,7 +916,7 @@ func TestTranslatePathsWithSourceLinkedDeployment(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline": { "pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{

View File

@ -16,7 +16,7 @@ type Pipeline struct {
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"` ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
URL string `json:"url,omitempty" bundle:"internal"` URL string `json:"url,omitempty" bundle:"internal"`
*pipelines.PipelineSpec *pipelines.CreatePipeline
} }
func (s *Pipeline) UnmarshalJSON(b []byte) error { func (s *Pipeline) UnmarshalJSON(b []byte) error {
@ -59,5 +59,5 @@ func (s *Pipeline) GetURL() string {
} }
func (s *Pipeline) IsNil() bool { func (s *Pipeline) IsNil() bool {
return s.PipelineSpec == nil return s.CreatePipeline == nil
} }

View File

@ -238,7 +238,7 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"foo": { "foo": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Clusters: []pipelines.PipelineCluster{ Clusters: []pipelines.PipelineCluster{
{ {
SparkConf: tc.sparkConf, SparkConf: tc.sparkConf,
@ -493,7 +493,7 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"foo": { "foo": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Clusters: []pipelines.PipelineCluster{ Clusters: []pipelines.PipelineCluster{
{ {
SparkConf: tc.sparkConf, SparkConf: tc.sparkConf,

View File

@ -20,11 +20,11 @@ func (m *annotatePipelines) Name() string {
func (m *annotatePipelines) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnostics { func (m *annotatePipelines) Apply(_ context.Context, b *bundle.Bundle) diag.Diagnostics {
for _, pipeline := range b.Config.Resources.Pipelines { for _, pipeline := range b.Config.Resources.Pipelines {
if pipeline.PipelineSpec == nil { if pipeline.CreatePipeline == nil {
continue continue
} }
pipeline.PipelineSpec.Deployment = &pipelines.PipelineDeployment{ pipeline.CreatePipeline.Deployment = &pipelines.PipelineDeployment{
Kind: pipelines.DeploymentKindBundle, Kind: pipelines.DeploymentKindBundle,
MetadataFilePath: metadataFilePath(b), MetadataFilePath: metadataFilePath(b),
} }

View File

@ -21,12 +21,12 @@ func TestAnnotatePipelinesMutator(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"my-pipeline-1": { "my-pipeline-1": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "My Pipeline One", Name: "My Pipeline One",
}, },
}, },
"my-pipeline-2": { "my-pipeline-2": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "My Pipeline Two", Name: "My Pipeline Two",
}, },
}, },
@ -43,14 +43,14 @@ func TestAnnotatePipelinesMutator(t *testing.T) {
Kind: pipelines.DeploymentKindBundle, Kind: pipelines.DeploymentKindBundle,
MetadataFilePath: "/a/b/c/metadata.json", MetadataFilePath: "/a/b/c/metadata.json",
}, },
b.Config.Resources.Pipelines["my-pipeline-1"].PipelineSpec.Deployment) b.Config.Resources.Pipelines["my-pipeline-1"].CreatePipeline.Deployment)
assert.Equal(t, assert.Equal(t,
&pipelines.PipelineDeployment{ &pipelines.PipelineDeployment{
Kind: pipelines.DeploymentKindBundle, Kind: pipelines.DeploymentKindBundle,
MetadataFilePath: "/a/b/c/metadata.json", MetadataFilePath: "/a/b/c/metadata.json",
}, },
b.Config.Resources.Pipelines["my-pipeline-2"].PipelineSpec.Deployment) b.Config.Resources.Pipelines["my-pipeline-2"].CreatePipeline.Deployment)
} }
func TestAnnotatePipelinesMutatorPipelineWithoutASpec(t *testing.T) { func TestAnnotatePipelinesMutatorPipelineWithoutASpec(t *testing.T) {

View File

@ -203,7 +203,7 @@ func TestBundleToTerraformForEachTaskLibraries(t *testing.T) {
func TestBundleToTerraformPipeline(t *testing.T) { func TestBundleToTerraformPipeline(t *testing.T) {
src := resources.Pipeline{ src := resources.Pipeline{
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "my pipeline", Name: "my pipeline",
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
@ -759,7 +759,7 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"test_pipeline": { "test_pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "test_pipeline", Name: "test_pipeline",
}, },
}, },
@ -898,12 +898,12 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"test_pipeline": { "test_pipeline": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "test_pipeline", Name: "test_pipeline",
}, },
}, },
"test_pipeline_new": { "test_pipeline_new": {
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "test_pipeline_new", Name: "test_pipeline_new",
}, },
}, },

View File

@ -21,6 +21,11 @@ func convertPipelineResource(ctx context.Context, vin dyn.Value) (dyn.Value, err
return dyn.InvalidValue, err return dyn.InvalidValue, err
} }
vout, err = dyn.DropKeys(vout, []string{"allow_duplicate_names", "dry_run"})
if err != nil {
return dyn.InvalidValue, err
}
// Normalize the output value to the target schema. // Normalize the output value to the target schema.
vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout) vout, diags := convert.Normalize(schema.ResourcePipeline{}, vout)
for _, diag := range diags { for _, diag := range diags {

View File

@ -15,8 +15,17 @@ import (
func TestConvertPipeline(t *testing.T) { func TestConvertPipeline(t *testing.T) {
src := resources.Pipeline{ src := resources.Pipeline{
PipelineSpec: &pipelines.PipelineSpec{ CreatePipeline: &pipelines.CreatePipeline{
Name: "my pipeline", Name: "my pipeline",
// This fields is not part of TF schema yet, but once we upgrade to TF version that supports it, this test will fail because run_as
// will be exposed which is expected and test will need to be updated.
RunAs: &pipelines.RunAs{
UserName: "foo@bar.com",
},
// We expect AllowDuplicateNames and DryRun to be ignored and not passed to the TF output.
// This is not supported by TF now, so we don't want to expose it.
AllowDuplicateNames: true,
DryRun: true,
Libraries: []pipelines.PipelineLibrary{ Libraries: []pipelines.PipelineLibrary{
{ {
Notebook: &pipelines.NotebookLibrary{ Notebook: &pipelines.NotebookLibrary{

View File

@ -414,6 +414,16 @@ github.com/databricks/cli/bundle/config/resources.Permission:
"user_name": "user_name":
"description": |- "description": |-
The name of the user that has the permission set in level. The name of the user that has the permission set in level.
github.com/databricks/cli/bundle/config/resources.Pipeline:
"allow_duplicate_names":
"description": |-
PLACEHOLDER
"dry_run":
"description": |-
PLACEHOLDER
"run_as":
"description": |-
PLACEHOLDER
github.com/databricks/cli/bundle/config/variable.Lookup: github.com/databricks/cli/bundle/config/variable.Lookup:
"alert": "alert":
"description": |- "description": |-

View File

@ -371,6 +371,9 @@ github.com/databricks/cli/bundle/config/resources.ModelServingEndpoint:
"description": |- "description": |-
Tags to be attached to the serving endpoint and automatically propagated to billing logs. Tags to be attached to the serving endpoint and automatically propagated to billing logs.
github.com/databricks/cli/bundle/config/resources.Pipeline: github.com/databricks/cli/bundle/config/resources.Pipeline:
"allow_duplicate_names":
"description": |-
If false, deployment will fail if name conflicts with that of another pipeline.
"budget_policy_id": "budget_policy_id":
"description": |- "description": |-
Budget policy of this pipeline. Budget policy of this pipeline.
@ -395,6 +398,7 @@ github.com/databricks/cli/bundle/config/resources.Pipeline:
"development": "development":
"description": |- "description": |-
Whether the pipeline is in Development mode. Defaults to false. Whether the pipeline is in Development mode. Defaults to false.
"dry_run": {}
"edition": "edition":
"description": |- "description": |-
Pipeline product edition. Pipeline product edition.
@ -425,6 +429,7 @@ github.com/databricks/cli/bundle/config/resources.Pipeline:
"restart_window": "restart_window":
"description": |- "description": |-
Restart window of this pipeline. Restart window of this pipeline.
"run_as": {}
"schema": "schema":
"description": |- "description": |-
The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode. The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode.
@ -2624,6 +2629,18 @@ github.com/databricks/databricks-sdk-go/service/pipelines.RestartWindow:
"description": |- "description": |-
Time zone id of restart window. See https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-conf-mgmt-set-timezone.html for details. Time zone id of restart window. See https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-conf-mgmt-set-timezone.html for details.
If not specified, UTC will be used. If not specified, UTC will be used.
github.com/databricks/databricks-sdk-go/service/pipelines.RunAs:
"_":
"description": |-
Write-only setting, available only in Create/Update calls. Specifies the user or service principal that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline.
Only `user_name` or `service_principal_name` can be specified. If both are specified, an error is thrown.
"service_principal_name":
"description": |-
Application ID of an active service principal. Setting this field requires the `servicePrincipal/user` role.
"user_name":
"description": |-
The email of an active workspace user. Users can only set this field to their own email.
github.com/databricks/databricks-sdk-go/service/pipelines.SchemaSpec: github.com/databricks/databricks-sdk-go/service/pipelines.SchemaSpec:
"destination_catalog": "destination_catalog":
"description": |- "description": |-

View File

@ -239,9 +239,15 @@ github.com/databricks/cli/bundle/config/resources.Pipeline:
- notebook: - notebook:
path: ./pipeline.py path: ./pipeline.py
``` ```
"dry_run":
"description": |-
PLACEHOLDER
"permissions": "permissions":
"description": |- "description": |-
PLACEHOLDER PLACEHOLDER
"run_as":
"description": |-
PLACEHOLDER
github.com/databricks/cli/bundle/config/resources.QualityMonitor: github.com/databricks/cli/bundle/config/resources.QualityMonitor:
"_": "_":
"markdown_description": |- "markdown_description": |-

View File

@ -109,6 +109,20 @@ func removeJobsFields(typ reflect.Type, s jsonschema.Schema) jsonschema.Schema {
return s return s
} }
func removePipelineFields(typ reflect.Type, s jsonschema.Schema) jsonschema.Schema {
switch typ {
case reflect.TypeOf(resources.Pipeline{}):
// Even though DABs supports this field, TF provider does not. Thus, we
// should not expose it to the user.
delete(s.Properties, "dry_run")
delete(s.Properties, "allow_duplicate_names")
default:
// Do nothing
}
return s
}
// While volume_type is required in the volume create API, DABs automatically sets // While volume_type is required in the volume create API, DABs automatically sets
// it's value to "MANAGED" if it's not provided. Thus, we make it optional // it's value to "MANAGED" if it's not provided. Thus, we make it optional
// in the bundle schema. // in the bundle schema.
@ -168,6 +182,7 @@ func generateSchema(workdir, outputFile string) {
// Generate the JSON schema from the bundle Go struct. // Generate the JSON schema from the bundle Go struct.
s, err := jsonschema.FromType(reflect.TypeOf(config.Root{}), []func(reflect.Type, jsonschema.Schema) jsonschema.Schema{ s, err := jsonschema.FromType(reflect.TypeOf(config.Root{}), []func(reflect.Type, jsonschema.Schema) jsonschema.Schema{
removeJobsFields, removeJobsFields,
removePipelineFields,
makeVolumeTypeOptional, makeVolumeTypeOptional,
a.addAnnotations, a.addAnnotations,
addInterpolationPatterns, addInterpolationPatterns,

View File

@ -38,8 +38,8 @@ func TestApplyWorkspaceRootPermissions(t *testing.T) {
"job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}}, "job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}},
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline_1": {PipelineSpec: &pipelines.PipelineSpec{}}, "pipeline_1": {CreatePipeline: &pipelines.CreatePipeline{}},
"pipeline_2": {PipelineSpec: &pipelines.PipelineSpec{}}, "pipeline_2": {CreatePipeline: &pipelines.CreatePipeline{}},
}, },
Models: map[string]*resources.MlflowModel{ Models: map[string]*resources.MlflowModel{
"model_1": {Model: &ml.Model{}}, "model_1": {Model: &ml.Model{}},
@ -98,8 +98,8 @@ func TestApplyWorkspaceRootPermissionsForAllPaths(t *testing.T) {
"job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}}, "job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}},
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline_1": {PipelineSpec: &pipelines.PipelineSpec{}}, "pipeline_1": {CreatePipeline: &pipelines.CreatePipeline{}},
"pipeline_2": {PipelineSpec: &pipelines.PipelineSpec{}}, "pipeline_2": {CreatePipeline: &pipelines.CreatePipeline{}},
}, },
Models: map[string]*resources.MlflowModel{ Models: map[string]*resources.MlflowModel{
"model_1": {Model: &ml.Model{}}, "model_1": {Model: &ml.Model{}},

View File

@ -530,12 +530,12 @@ func TestRenderSummary(t *testing.T) {
"pipeline2": { "pipeline2": {
ID: "4", ID: "4",
// no URL // no URL
PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline2-name"}, CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline2-name"},
}, },
"pipeline1": { "pipeline1": {
ID: "3", ID: "3",
URL: "https://url3", URL: "https://url3",
PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1-name"}, CreatePipeline: &pipelines.CreatePipeline{Name: "pipeline1-name"},
}, },
}, },
Schemas: map[string]*resources.Schema{ Schemas: map[string]*resources.Schema{

View File

@ -25,7 +25,7 @@ func TestCompletions_SkipDuplicates(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"foo": { "foo": {
PipelineSpec: &pipelines.PipelineSpec{}, CreatePipeline: &pipelines.CreatePipeline{},
}, },
}, },
}, },
@ -50,7 +50,7 @@ func TestCompletions_Filter(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"bar": { "bar": {
PipelineSpec: &pipelines.PipelineSpec{}, CreatePipeline: &pipelines.CreatePipeline{},
}, },
}, },
}, },

View File

@ -56,7 +56,7 @@ func TestLookup_MultipleFound(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"foo": { "foo": {
PipelineSpec: &pipelines.PipelineSpec{}, CreatePipeline: &pipelines.CreatePipeline{},
}, },
}, },
}, },
@ -107,7 +107,7 @@ func TestLookup_NominalWithFilters(t *testing.T) {
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"bar": { "bar": {
PipelineSpec: &pipelines.PipelineSpec{}, CreatePipeline: &pipelines.CreatePipeline{},
}, },
}, },
}, },

View File

@ -79,10 +79,10 @@ type pipelineRunner struct {
} }
func (r *pipelineRunner) Name() string { func (r *pipelineRunner) Name() string {
if r.pipeline == nil || r.pipeline.PipelineSpec == nil { if r.pipeline == nil || r.pipeline.CreatePipeline == nil {
return "" return ""
} }
return r.pipeline.PipelineSpec.Name return r.pipeline.CreatePipeline.Name
} }
func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) { func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) {

View File

@ -703,6 +703,9 @@
"description": "Restart window of this pipeline.", "description": "Restart window of this pipeline.",
"$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.RestartWindow" "$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.RestartWindow"
}, },
"run_as": {
"$ref": "#/$defs/github.com/databricks/databricks-sdk-go/service/pipelines.RunAs"
},
"schema": { "schema": {
"description": "The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode.", "description": "The default schema (database) where tables are read from or published to. The presence of this field implies that the pipeline is in direct publishing mode.",
"$ref": "#/$defs/string" "$ref": "#/$defs/string"
@ -5385,6 +5388,29 @@
} }
] ]
}, },
"pipelines.RunAs": {
"oneOf": [
{
"type": "object",
"description": "Write-only setting, available only in Create/Update calls. Specifies the user or service principal that the pipeline runs as. If not specified, the pipeline runs as the user who created the pipeline.\n\nOnly `user_name` or `service_principal_name` can be specified. If both are specified, an error is thrown.",
"properties": {
"service_principal_name": {
"description": "Application ID of an active service principal. Setting this field requires the `servicePrincipal/user` role.",
"$ref": "#/$defs/string"
},
"user_name": {
"description": "The email of an active workspace user. Users can only set this field to their own email.",
"$ref": "#/$defs/string"
}
},
"additionalProperties": false
},
{
"type": "string",
"pattern": "\\$\\{(var(\\.[a-zA-Z]+([-_]?[a-zA-Z0-9]+)*(\\[[0-9]+\\])*)+)\\}"
}
]
},
"pipelines.SchemaSpec": { "pipelines.SchemaSpec": {
"oneOf": [ "oneOf": [
{ {

View File

@ -92,7 +92,7 @@ func NewGeneratePipelineCommand() *cobra.Command {
} }
saver := yamlsaver.NewSaverWithStyle( saver := yamlsaver.NewSaverWithStyle(
// Including all PipelineSpec and nested fields which are map[string]string type // Including all CreatePipeline and nested fields which are map[string]string type
map[string]yaml.Style{ map[string]yaml.Style{
"spark_conf": yaml.DoubleQuotedStyle, "spark_conf": yaml.DoubleQuotedStyle,
"custom_tags": yaml.DoubleQuotedStyle, "custom_tags": yaml.DoubleQuotedStyle,

27
libs/dyn/drop_keys.go Normal file
View File

@ -0,0 +1,27 @@
package dyn
func DropKeys(v Value, drop []string) (Value, error) {
var err error
nv, err := Walk(v, func(p Path, v Value) (Value, error) {
if len(p) == 0 {
return v, nil
}
// Check if this key should be dropped.
for _, key := range drop {
if p[0].Key() != key {
continue
}
return InvalidValue, ErrDrop
}
// Pass through all other values.
return v, ErrSkip
})
if err != nil {
return InvalidValue, err
}
return nv, nil
}

View File

@ -0,0 +1,24 @@
package dyn
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestDropKeysTest(t *testing.T) {
v := V(map[string]Value{
"key1": V("value1"),
"key2": V("value2"),
"key3": V("value3"),
})
vout, err := DropKeys(v, []string{"key1", "key3"})
require.NoError(t, err)
mv := vout.MustMap()
require.Equal(t, 1, mv.Len())
v, ok := mv.GetByString("key2")
require.True(t, ok)
require.Equal(t, "value2", v.MustString())
}