diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go index f10c36fe..17831de4 100644 --- a/bundle/config/resources/job.go +++ b/bundle/config/resources/job.go @@ -1,5 +1,9 @@ package resources +import "github.com/databricks/databricks-sdk-go/service/jobs" + type Job struct { ID string `json:"id,omitempty"` + + *jobs.JobSettings } diff --git a/bundle/config/resources/pipeline.go b/bundle/config/resources/pipeline.go index cc3df650..ee400455 100644 --- a/bundle/config/resources/pipeline.go +++ b/bundle/config/resources/pipeline.go @@ -1,5 +1,9 @@ package resources +import "github.com/databricks/databricks-sdk-go/service/pipelines" + type Pipeline struct { ID string `json:"id,omitempty"` + + *pipelines.PipelineSpec } diff --git a/bundle/config/root.go b/bundle/config/root.go index effabfdf..620e3e3f 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -99,7 +99,7 @@ func (r *Root) MergeEnvironment(env *Environment) error { } if env.Resources != nil { - err = mergo.MergeWithOverwrite(&r.Resources, env.Resources) + err = mergo.Merge(&r.Resources, env.Resources, mergo.WithAppendSlice) if err != nil { return err } diff --git a/bundle/config/tests/job_and_pipeline/bundle.yml b/bundle/config/tests/job_and_pipeline/bundle.yml new file mode 100644 index 00000000..f4a5719a --- /dev/null +++ b/bundle/config/tests/job_and_pipeline/bundle.yml @@ -0,0 +1,42 @@ +resources: + pipelines: + nyc_taxi_pipeline: + name: "nyc taxi loader" + libraries: + - notebook: + path: ./dlt/nyc_taxi_loader + +environments: + development: + resources: + pipelines: + nyc_taxi_pipeline: + target: nyc_taxi_development + development: true + + staging: + resources: + pipelines: + nyc_taxi_pipeline: + target: nyc_taxi_staging + development: false + + production: + resources: + pipelines: + nyc_taxi_pipeline: + target: nyc_taxi_production + development: false + photon: true + + jobs: + pipeline_schedule: + name: Daily refresh of production pipeline + + schedule: + quartz_cron_expression: 6 6 11 * * ? + timezone_id: UTC + + tasks: + - pipeline_task: + pipeline_id: "to be interpolated" diff --git a/bundle/config/tests/job_and_pipeline_test.go b/bundle/config/tests/job_and_pipeline_test.go new file mode 100644 index 00000000..809cb3c0 --- /dev/null +++ b/bundle/config/tests/job_and_pipeline_test.go @@ -0,0 +1,49 @@ +package config_tests + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestJobAndPipelineDevelopment(t *testing.T) { + root := loadEnvironment(t, "./job_and_pipeline", "development") + assert.Len(t, root.Resources.Jobs, 0) + assert.Len(t, root.Resources.Pipelines, 1) + + p := root.Resources.Pipelines["nyc_taxi_pipeline"] + assert.True(t, p.Development) + require.Len(t, p.Libraries, 1) + assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) + assert.Equal(t, "nyc_taxi_development", p.Target) +} + +func TestJobAndPipelineStaging(t *testing.T) { + root := loadEnvironment(t, "./job_and_pipeline", "staging") + assert.Len(t, root.Resources.Jobs, 0) + assert.Len(t, root.Resources.Pipelines, 1) + + p := root.Resources.Pipelines["nyc_taxi_pipeline"] + assert.False(t, p.Development) + require.Len(t, p.Libraries, 1) + assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) + assert.Equal(t, "nyc_taxi_staging", p.Target) +} + +func TestJobAndPipelineProduction(t *testing.T) { + root := loadEnvironment(t, "./job_and_pipeline", "production") + assert.Len(t, root.Resources.Jobs, 1) + assert.Len(t, root.Resources.Pipelines, 1) + + p := root.Resources.Pipelines["nyc_taxi_pipeline"] + assert.False(t, p.Development) + require.Len(t, p.Libraries, 1) + assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) + assert.Equal(t, "nyc_taxi_production", p.Target) + + j := root.Resources.Jobs["pipeline_schedule"] + assert.Equal(t, "Daily refresh of production pipeline", j.Name) + require.Len(t, j.Tasks, 1) + assert.NotEmpty(t, j.Tasks[0].PipelineTask.PipelineId) +}