Add job and pipeline structs (#94)

This commit is contained in:
Pieter Noordhuis 2022-11-18 11:12:24 +01:00 committed by GitHub
parent e47fa61951
commit 195eb7f0f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 1 deletions

View File

@ -1,5 +1,9 @@
package resources
import "github.com/databricks/databricks-sdk-go/service/jobs"
// Job describes a job resource. It combines the job settings type from the
// Databricks SDK with an ID field.
type Job struct {
// ID is serialized as "id" and omitted from JSON output when empty.
// NOTE(review): presumably the identifier of the deployed job — confirm.
ID string `json:"id,omitempty"`
// JobSettings is embedded by pointer, so its fields (for example Name and
// Tasks) are promoted onto Job. Reading a promoted field while this
// pointer is nil panics.
*jobs.JobSettings
}

View File

@ -1,5 +1,9 @@
package resources
import "github.com/databricks/databricks-sdk-go/service/pipelines"
// Pipeline describes a pipeline resource. It combines the pipeline spec type
// from the Databricks SDK with an ID field.
type Pipeline struct {
// ID is serialized as "id" and omitted from JSON output when empty.
// NOTE(review): presumably the identifier of the deployed pipeline — confirm.
ID string `json:"id,omitempty"`
// PipelineSpec is embedded by pointer, so its fields (for example
// Development, Libraries and Target) are promoted onto Pipeline. Reading a
// promoted field while this pointer is nil panics.
*pipelines.PipelineSpec
}

View File

@ -99,7 +99,7 @@ func (r *Root) MergeEnvironment(env *Environment) error {
}
if env.Resources != nil {
err = mergo.MergeWithOverwrite(&r.Resources, env.Resources)
err = mergo.Merge(&r.Resources, env.Resources, mergo.WithAppendSlice)
if err != nil {
return err
}

View File

@ -0,0 +1,42 @@
# Test fixture: a pipeline defined once at the top level and specialised per
# environment. The tests load it with an environment name and check the merged
# result.
resources:
pipelines:
nyc_taxi_pipeline:
name: "nyc taxi loader"
libraries:
- notebook:
path: ./dlt/nyc_taxi_loader
# Per-environment settings. The selected environment's resources are merged
# into the top-level resources above.
environments:
development:
resources:
pipelines:
nyc_taxi_pipeline:
target: nyc_taxi_development
development: true
staging:
resources:
pipelines:
nyc_taxi_pipeline:
target: nyc_taxi_staging
development: false
production:
resources:
pipelines:
nyc_taxi_pipeline:
target: nyc_taxi_production
development: false
photon: true
# The tests expect a job only in the production environment.
jobs:
pipeline_schedule:
name: Daily refresh of production pipeline
schedule:
quartz_cron_expression: 6 6 11 * * ?
timezone_id: UTC
tasks:
- pipeline_task:
# Placeholder value; the test only checks that it is non-empty.
pipeline_id: "to be interpolated"

View File

@ -0,0 +1,49 @@
package config_tests
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestJobAndPipelineDevelopment loads the job_and_pipeline fixture for the
// "development" environment and checks that it defines no jobs and exactly
// one pipeline, that the pipeline is in development mode, that it keeps the
// notebook library from the top-level definition, and that it uses the
// development target.
func TestJobAndPipelineDevelopment(t *testing.T) {
	root := loadEnvironment(t, "./job_and_pipeline", "development")
	assert.Len(t, root.Resources.Jobs, 0)
	assert.Len(t, root.Resources.Pipelines, 1)

	// Stop here if the pipeline is missing: looking up an absent key yields a
	// zero value, and reading its promoted fields would panic on the nil
	// embedded spec instead of reporting a test failure.
	require.Contains(t, root.Resources.Pipelines, "nyc_taxi_pipeline")
	p := root.Resources.Pipelines["nyc_taxi_pipeline"]
	assert.True(t, p.Development)
	require.Len(t, p.Libraries, 1)
	// Guard the nested access below so a library without a notebook fails the
	// test rather than panicking (a no-op if Notebook is not a pointer).
	require.NotNil(t, p.Libraries[0].Notebook)
	assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
	assert.Equal(t, "nyc_taxi_development", p.Target)
}
// TestJobAndPipelineStaging loads the job_and_pipeline fixture for the
// "staging" environment and checks that it defines no jobs and exactly one
// pipeline, that the pipeline is not in development mode, that it keeps the
// notebook library from the top-level definition, and that it uses the
// staging target.
func TestJobAndPipelineStaging(t *testing.T) {
	root := loadEnvironment(t, "./job_and_pipeline", "staging")
	assert.Len(t, root.Resources.Jobs, 0)
	assert.Len(t, root.Resources.Pipelines, 1)

	// Stop here if the pipeline is missing: looking up an absent key yields a
	// zero value, and reading its promoted fields would panic on the nil
	// embedded spec instead of reporting a test failure.
	require.Contains(t, root.Resources.Pipelines, "nyc_taxi_pipeline")
	p := root.Resources.Pipelines["nyc_taxi_pipeline"]
	assert.False(t, p.Development)
	require.Len(t, p.Libraries, 1)
	// Guard the nested access below so a library without a notebook fails the
	// test rather than panicking (a no-op if Notebook is not a pointer).
	require.NotNil(t, p.Libraries[0].Notebook)
	assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
	assert.Equal(t, "nyc_taxi_staging", p.Target)
}
// TestJobAndPipelineProduction loads the job_and_pipeline fixture for the
// "production" environment and checks that it defines exactly one job and one
// pipeline. The pipeline must not be in development mode, must keep the
// notebook library from the top-level definition, and must use the production
// target. The job must carry the expected name and a single task with a
// non-empty pipeline ID.
func TestJobAndPipelineProduction(t *testing.T) {
	root := loadEnvironment(t, "./job_and_pipeline", "production")
	assert.Len(t, root.Resources.Jobs, 1)
	assert.Len(t, root.Resources.Pipelines, 1)

	// Stop here if the pipeline is missing: looking up an absent key yields a
	// zero value, and reading its promoted fields would panic on the nil
	// embedded spec instead of reporting a test failure.
	require.Contains(t, root.Resources.Pipelines, "nyc_taxi_pipeline")
	p := root.Resources.Pipelines["nyc_taxi_pipeline"]
	assert.False(t, p.Development)
	require.Len(t, p.Libraries, 1)
	// Guard the nested access below so a library without a notebook fails the
	// test rather than panicking (a no-op if Notebook is not a pointer).
	require.NotNil(t, p.Libraries[0].Notebook)
	assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
	assert.Equal(t, "nyc_taxi_production", p.Target)

	// Same reasoning for the job: its settings are embedded by pointer, so a
	// missing entry must end the test before any promoted field is read.
	require.Contains(t, root.Resources.Jobs, "pipeline_schedule")
	j := root.Resources.Jobs["pipeline_schedule"]
	assert.Equal(t, "Daily refresh of production pipeline", j.Name)
	require.Len(t, j.Tasks, 1)
	// Guard the nested access below so a task without a pipeline task fails
	// the test rather than panicking (a no-op if PipelineTask is not a pointer).
	require.NotNil(t, j.Tasks[0].PipelineTask)
	assert.NotEmpty(t, j.Tasks[0].PipelineTask.PipelineId)
}