mirror of https://github.com/databricks/cli.git
Enable environment overrides for job clusters (#658)
## Changes

While they are a slice, we can identify a job cluster by its job cluster key. A job definition with multiple job clusters with the same key is always invalid. We can therefore merge definitions with the same key into one. This is compatible with how environment overrides are applied; merging a slice means appending to it. The override will end up in the job cluster slice of the original, which gives us a deterministic way to merge them. Since the alternative is an invalid configuration, this doesn't change behavior.

## Tests

New test coverage.
This commit is contained in:
parent
6ea70c82a9
commit
97699b849f
|
@ -113,3 +113,14 @@ func (r *Resources) SetConfigFilePath(path string) {
|
|||
e.ConfigFilePath = path
|
||||
}
|
||||
}
|
||||
|
||||
// MergeJobClusters iterates over all jobs and merges their job clusters.
|
||||
// This is called after applying the environment overrides.
|
||||
func (r *Resources) MergeJobClusters() error {
|
||||
for _, job := range r.Jobs {
|
||||
if err := job.MergeJobClusters(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
package resources
|
||||
|
||||
import "github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
import (
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/imdario/mergo"
|
||||
)
|
||||
|
||||
type Job struct {
|
||||
ID string `json:"id,omitempty" bundle:"readonly"`
|
||||
|
@ -10,3 +13,36 @@ type Job struct {
|
|||
|
||||
*jobs.JobSettings
|
||||
}
|
||||
|
||||
// MergeJobClusters merges job clusters with the same key.
|
||||
// The job clusters field is a slice, and as such, overrides are appended to it.
|
||||
// We can identify a job cluster by its key, however, so we can use this key
|
||||
// to figure out which definitions are actually overrides and merge them.
|
||||
func (j *Job) MergeJobClusters() error {
|
||||
keys := make(map[string]*jobs.JobCluster)
|
||||
output := make([]jobs.JobCluster, 0, len(j.JobClusters))
|
||||
|
||||
// Environment overrides are always appended, so we can iterate in natural order to
|
||||
// first find the base definition, and merge instances we encounter later.
|
||||
for i := range j.JobClusters {
|
||||
key := j.JobClusters[i].JobClusterKey
|
||||
|
||||
// Register job cluster with key if not yet seen before.
|
||||
ref, ok := keys[key]
|
||||
if !ok {
|
||||
output = append(output, j.JobClusters[i])
|
||||
keys[key] = &j.JobClusters[i]
|
||||
continue
|
||||
}
|
||||
|
||||
// Merge this instance into the reference.
|
||||
err := mergo.Merge(ref, &j.JobClusters[i], mergo.WithOverride, mergo.WithAppendSlice)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Overwrite resulting slice.
|
||||
j.JobClusters = output
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
package resources
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestJobMergeJobClusters(t *testing.T) {
|
||||
j := &Job{
|
||||
JobSettings: &jobs.JobSettings{
|
||||
JobClusters: []jobs.JobCluster{
|
||||
{
|
||||
JobClusterKey: "foo",
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
SparkVersion: "13.3.x-scala2.12",
|
||||
NodeTypeId: "i3.xlarge",
|
||||
NumWorkers: 2,
|
||||
},
|
||||
},
|
||||
{
|
||||
JobClusterKey: "bar",
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
SparkVersion: "10.4.x-scala2.12",
|
||||
},
|
||||
},
|
||||
{
|
||||
JobClusterKey: "foo",
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
NodeTypeId: "i3.2xlarge",
|
||||
NumWorkers: 4,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := j.MergeJobClusters()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Len(t, j.JobClusters, 2)
|
||||
assert.Equal(t, "foo", j.JobClusters[0].JobClusterKey)
|
||||
assert.Equal(t, "bar", j.JobClusters[1].JobClusterKey)
|
||||
|
||||
// This job cluster was merged with a subsequent one.
|
||||
jc0 := j.JobClusters[0].NewCluster
|
||||
assert.Equal(t, "13.3.x-scala2.12", jc0.SparkVersion)
|
||||
assert.Equal(t, "i3.2xlarge", jc0.NodeTypeId)
|
||||
assert.Equal(t, 4, jc0.NumWorkers)
|
||||
|
||||
// This job cluster was left untouched.
|
||||
jc1 := j.JobClusters[1].NewCluster
|
||||
assert.Equal(t, "10.4.x-scala2.12", jc1.SparkVersion)
|
||||
}
|
|
@ -203,6 +203,11 @@ func (r *Root) MergeEnvironment(env *Environment) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.Resources.MergeJobClusters()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if env.Variables != nil {
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
# Test fixture: a bundle with a single job whose job cluster is overridden
# per environment. Each environment re-declares the same job_cluster_key so
# the override is merged into the base definition.
bundle:
  name: override_job_cluster

workspace:
  host: https://acme.cloud.databricks.com/

resources:
  jobs:
    foo:
      name: job
      job_clusters:
        # Base definition: only the Spark version is set here.
        - job_cluster_key: key
          new_cluster:
            spark_version: 13.3.x-scala2.12

environments:
  development:
    resources:
      jobs:
        foo:
          job_clusters:
            # Same key as the base definition above; node type and worker
            # count are merged into it for the development environment.
            - job_cluster_key: key
              new_cluster:
                node_type_id: i3.xlarge
                num_workers: 1

  staging:
    resources:
      jobs:
        foo:
          job_clusters:
            # Same key as the base definition above; node type and worker
            # count are merged into it for the staging environment.
            - job_cluster_key: key
              new_cluster:
                node_type_id: i3.2xlarge
                num_workers: 4
|
@ -0,0 +1,29 @@
|
|||
package config_tests
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestOverrideJobClusterDev(t *testing.T) {
|
||||
b := loadEnvironment(t, "./override_job_cluster", "development")
|
||||
assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name)
|
||||
assert.Len(t, b.Config.Resources.Jobs["foo"].JobClusters, 1)
|
||||
|
||||
c := b.Config.Resources.Jobs["foo"].JobClusters[0]
|
||||
assert.Equal(t, "13.3.x-scala2.12", c.NewCluster.SparkVersion)
|
||||
assert.Equal(t, "i3.xlarge", c.NewCluster.NodeTypeId)
|
||||
assert.Equal(t, 1, c.NewCluster.NumWorkers)
|
||||
}
|
||||
|
||||
func TestOverrideJobClusterStaging(t *testing.T) {
|
||||
b := loadEnvironment(t, "./override_job_cluster", "staging")
|
||||
assert.Equal(t, "job", b.Config.Resources.Jobs["foo"].Name)
|
||||
assert.Len(t, b.Config.Resources.Jobs["foo"].JobClusters, 1)
|
||||
|
||||
c := b.Config.Resources.Jobs["foo"].JobClusters[0]
|
||||
assert.Equal(t, "13.3.x-scala2.12", c.NewCluster.SparkVersion)
|
||||
assert.Equal(t, "i3.2xlarge", c.NewCluster.NodeTypeId)
|
||||
assert.Equal(t, 4, c.NewCluster.NumWorkers)
|
||||
}
|
Loading…
Reference in New Issue