address comments

Shreyas Goenka 2024-11-19 23:06:43 +01:00
parent df0a98066a
commit 96a0a3ec27
2 changed files with 307 additions and 376 deletions

View File

@@ -2,14 +2,13 @@ package validate

 import (
     "context"
-    "fmt"
     "strings"

     "github.com/databricks/cli/bundle"
     "github.com/databricks/cli/libs/diag"
     "github.com/databricks/cli/libs/dyn"
-    "github.com/databricks/databricks-sdk-go/service/compute"
-    "github.com/databricks/databricks-sdk-go/service/pipelines"
+    "github.com/databricks/cli/libs/dyn/convert"
+    "github.com/databricks/cli/libs/log"
 )

 // Validates that any single node clusters defined in the bundle are correctly configured.
@@ -37,138 +36,100 @@ are correctly set in the cluster specification:
 const singleNodeWarningSummary = `Single node cluster is not correctly configured`

-func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic {
-    if spec == nil {
-        return nil
-    }
-
-    if spec.NumWorkers > 0 || spec.Autoscale != nil {
-        return nil
-    }
-
-    if spec.PolicyId != "" {
-        return nil
-    }
-
-    invalidSingleNodeWarning := &diag.Diagnostic{
-        Severity:  diag.Warning,
-        Summary:   singleNodeWarningSummary,
-        Detail:    singleNodeWarningDetail,
-        Locations: l,
-        Paths:     []dyn.Path{p},
-    }
-
-    profile, ok := spec.SparkConf["spark.databricks.cluster.profile"]
-    if !ok {
-        return invalidSingleNodeWarning
-    }
-
-    master, ok := spec.SparkConf["spark.master"]
-    if !ok {
-        return invalidSingleNodeWarning
-    }
-
-    resourceClass, ok := spec.CustomTags["ResourceClass"]
-    if !ok {
-        return invalidSingleNodeWarning
-    }
-
-    if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
-        return nil
-    }
-
-    return invalidSingleNodeWarning
-}
-
-func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic {
-    if spec.NumWorkers > 0 || spec.Autoscale != nil {
-        return nil
-    }
-
-    if spec.PolicyId != "" {
-        return nil
-    }
-
-    invalidSingleNodeWarning := &diag.Diagnostic{
-        Severity:  diag.Warning,
-        Summary:   singleNodeWarningSummary,
-        Detail:    singleNodeWarningDetail,
-        Locations: l,
-        Paths:     []dyn.Path{p},
-    }
-
-    profile, ok := spec.SparkConf["spark.databricks.cluster.profile"]
-    if !ok {
-        return invalidSingleNodeWarning
-    }
-
-    master, ok := spec.SparkConf["spark.master"]
-    if !ok {
-        return invalidSingleNodeWarning
-    }
-
-    resourceClass, ok := spec.CustomTags["ResourceClass"]
-    if !ok {
-        return invalidSingleNodeWarning
-    }
-
-    if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" {
-        return nil
-    }
-
-    return invalidSingleNodeWarning
-}
+func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
+    // Check if the user has explicitly set the num_workers to 0. Skip the warning
+    // if that's not the case.
+    numWorkers, ok := v.Get("num_workers").AsInt()
+    if !ok || numWorkers > 0 {
+        return false
+    }
+
+    // Convenient type that contains the common fields from compute.ClusterSpec and
+    // pipelines.PipelineCluster that we are interested in.
+    type ClusterConf struct {
+        SparkConf  map[string]string `json:"spark_conf"`
+        CustomTags map[string]string `json:"custom_tags"`
+        PolicyId   string            `json:"policy_id"`
+    }
+
+    conf := &ClusterConf{}
+    err := convert.ToTyped(conf, v)
+    if err != nil {
+        return false
+    }
+
+    // If the policy id is set, we don't want to show the warning. This is because
+    // the user might have configured `spark_conf` and `custom_tags` correctly
+    // in their cluster policy.
+    if conf.PolicyId != "" {
+        return false
+    }
+
+    profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
+    if !ok {
+        log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
+        return true
+    }
+    if profile != "singleNode" {
+        log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec")
+        return true
+    }
+
+    master, ok := conf.SparkConf["spark.master"]
+    if !ok {
+        log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec")
+        return true
+    }
+    if !strings.HasPrefix(master, "local") {
+        log.Warnf(ctx, "spark_conf spark.master is not local in single-node cluster spec")
+        return true
+    }
+
+    resourceClass, ok := conf.CustomTags["ResourceClass"]
+    if !ok {
+        log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
+        return true
+    }
+    if resourceClass != "SingleNode" {
+        log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec")
+        return true
+    }
+
+    return false
+}

 func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
     diags := diag.Diagnostics{}

-    // Interactive clusters
-    for k, r := range rb.Config().Resources.Clusters {
-        p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k))
-        l := rb.Config().GetLocations("resources.clusters." + k)
-
-        d := validateSingleNodeCluster(r.ClusterSpec, l, p)
-        if d != nil {
-            diags = append(diags, *d)
-        }
-    }
-
-    // Job clusters
-    for jobK, jobV := range rb.Config().Resources.Jobs {
-        for i, clusterV := range jobV.JobSettings.JobClusters {
-            p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i))
-            l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i))
-
-            d := validateSingleNodeCluster(&clusterV.NewCluster, l, p)
-            if d != nil {
-                diags = append(diags, *d)
-            }
-        }
-    }
-
-    // Job task clusters
-    for jobK, jobV := range rb.Config().Resources.Jobs {
-        for i, taskV := range jobV.JobSettings.Tasks {
-            if taskV.NewCluster == nil {
-                continue
-            }
-            p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster"))
-            l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i))
-
-            d := validateSingleNodeCluster(taskV.NewCluster, l, p)
-            if d != nil {
-                diags = append(diags, *d)
-            }
-        }
-    }
-
-    // Pipeline clusters
-    for pipelineK, pipelineV := range rb.Config().Resources.Pipelines {
-        for i, clusterV := range pipelineV.PipelineSpec.Clusters {
-            p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i))
-            l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i))
-
-            d := validateSingleNodePipelineCluster(clusterV, l, p)
-            if d != nil {
-                diags = append(diags, *d)
-            }
-        }
-    }
+    patterns := []dyn.Pattern{
+        // Interactive clusters
+        dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
+        // Job clusters
+        dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
+        // Job task clusters
+        dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
+        // Pipeline clusters
+        dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
+    }
+
+    for _, p := range patterns {
+        _, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
+            warning := diag.Diagnostic{
+                Severity:  diag.Warning,
+                Summary:   singleNodeWarningSummary,
+                Detail:    singleNodeWarningDetail,
+                Locations: v.Locations(),
+                Paths:     []dyn.Path{p},
+            }
+
+            if showSingleNodeClusterWarning(ctx, v) {
+                diags = append(diags, warning)
+            }
+            return v, nil
+        })
+        if err != nil {
+            log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
+        }
+    }

     return diags
 }
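As an aid to reading the diff above: the new showSingleNodeClusterWarning helper only fires once num_workers has been explicitly set to 0 on the dyn.Value, and it stays silent when a policy_id is present (a cluster policy may supply the remaining settings). The rule it then checks can be restated in a small self-contained sketch. This is an illustrative rewrite over plain maps with hypothetical names (isValidSingleNodeConfig), not code from the repository.

package main

import (
    "fmt"
    "strings"
)

// isValidSingleNodeConfig is a hypothetical, standalone restatement of the checks
// made by showSingleNodeClusterWarning: once num_workers is explicitly 0 and no
// policy is attached, the spark_conf profile, spark.master, and ResourceClass tag
// must all be set as below for the cluster to count as a valid single-node cluster.
func isValidSingleNodeConfig(sparkConf, customTags map[string]string) bool {
    if sparkConf["spark.databricks.cluster.profile"] != "singleNode" {
        return false
    }
    // spark.master must point at local execution, e.g. "local[*]".
    if !strings.HasPrefix(sparkConf["spark.master"], "local") {
        return false
    }
    return customTags["ResourceClass"] == "SingleNode"
}

func main() {
    // A correctly configured single-node cluster: no warning.
    fmt.Println(isValidSingleNodeConfig(
        map[string]string{
            "spark.databricks.cluster.profile": "singleNode",
            "spark.master":                     "local[*]",
        },
        map[string]string{"ResourceClass": "SingleNode"},
    )) // true

    // Missing the ResourceClass tag: the mutator would emit the warning.
    fmt.Println(isValidSingleNodeConfig(
        map[string]string{
            "spark.databricks.cluster.profile": "singleNode",
            "spark.master":                     "local[*]",
        },
        nil,
    )) // false
}

Run against the test table below, this sketch accepts the "single node cluster" case and rejects cases such as "invalid tags" or "missing spark.master", which is the same split the mutator's warning encodes.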

View File

@@ -18,73 +18,75 @@ import (

 func TestValidateSingleNodeClusterFail(t *testing.T) {
     failCases := []struct {
         name       string
-        spec       *compute.ClusterSpec
+        sparkConf  map[string]string
+        customTags map[string]string
     }{
         {
             name: "no tags or conf",
-            spec: &compute.ClusterSpec{
-                ClusterName: "foo",
-            },
         },
         {
             name: "no tags",
-            spec: &compute.ClusterSpec{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "local[*]",
-                },
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master":                     "local[*]",
             },
         },
         {
             name: "no conf",
-            spec: &compute.ClusterSpec{
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
-            },
+            customTags: map[string]string{"ResourceClass": "SingleNode"},
         },
         {
             name: "invalid spark cluster profile",
-            spec: &compute.ClusterSpec{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "invalid",
-                    "spark.master":                     "local[*]",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "invalid",
+                "spark.master":                     "local[*]",
             },
+            customTags: map[string]string{"ResourceClass": "SingleNode"},
         },
         {
             name: "invalid spark.master",
-            spec: &compute.ClusterSpec{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "invalid",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master":                     "invalid",
             },
+            customTags: map[string]string{"ResourceClass": "SingleNode"},
         },
         {
             name: "invalid tags",
-            spec: &compute.ClusterSpec{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "local[*]",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "invalid",
-                },
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master":                     "local[*]",
             },
+            customTags: map[string]string{"ResourceClass": "invalid"},
+        },
+        {
+            name: "missing ResourceClass tag",
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master":                     "local[*]",
+            },
+            customTags: map[string]string{"what": "ever"},
+        },
+        {
+            name: "missing spark.master",
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "singleNode",
+            },
+            customTags: map[string]string{"ResourceClass": "SingleNode"},
+        },
+        {
+            name: "missing spark.databricks.cluster.profile",
+            sparkConf: map[string]string{
+                "spark.master": "local[*]",
+            },
+            customTags: map[string]string{"ResourceClass": "SingleNode"},
         },
     }

     ctx := context.Background()

-    // Test interactive clusters.
+    // Interactive clusters.
     for _, tc := range failCases {
         t.Run("interactive_"+tc.name, func(t *testing.T) {
             b := &bundle.Bundle{
@@ -92,7 +94,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
                 Resources: config.Resources{
                     Clusters: map[string]*resources.Cluster{
                         "foo": {
-                            ClusterSpec: tc.spec,
+                            ClusterSpec: &compute.ClusterSpec{
+                                SparkConf:  tc.sparkConf,
+                                CustomTags: tc.customTags,
+                            },
                         },
                     },
                 },
@@ -101,6 +106,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
             bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})

+            // We can't set num_workers to 0 explicitly in the typed configuration.
+            // Do it on the dyn.Value directly.
+            bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
+            })
+
             diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
             assert.Equal(t, diag.Diagnostics{
                 {
@@ -114,7 +124,7 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
         })
     }

-    // Test new job clusters.
+    // Job clusters.
     for _, tc := range failCases {
         t.Run("job_"+tc.name, func(t *testing.T) {
             b := &bundle.Bundle{
@@ -125,7 +135,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
                                 JobSettings: &jobs.JobSettings{
                                     JobClusters: []jobs.JobCluster{
                                         {
-                                            NewCluster: *tc.spec,
+                                            NewCluster: compute.ClusterSpec{
+                                                ClusterName: "my_cluster",
+                                                SparkConf:   tc.sparkConf,
+                                                CustomTags:  tc.customTags,
+                                            },
                                         },
                                     },
                                 },
@@ -135,7 +149,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
                 },
             }

-            bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
+            bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
+
+            // We can't set num_workers to 0 explicitly in the typed configuration.
+            // Do it on the dyn.Value directly.
+            bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
+            })

             diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
             assert.Equal(t, diag.Diagnostics{
@@ -144,14 +164,14 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
                     Summary:   singleNodeWarningSummary,
                     Detail:    singleNodeWarningDetail,
                     Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
-                    Paths:     []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")},
+                    Paths:     []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
                 },
             }, diags)
         })
     }

-    // Test job task clusters.
+    // Job task clusters.
     for _, tc := range failCases {
         t.Run("task_"+tc.name, func(t *testing.T) {
             b := &bundle.Bundle{
@@ -162,7 +182,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
                                 JobSettings: &jobs.JobSettings{
                                     Tasks: []jobs.Task{
                                         {
-                                            NewCluster: tc.spec,
+                                            NewCluster: &compute.ClusterSpec{
+                                                ClusterName: "my_cluster",
+                                                SparkConf:   tc.sparkConf,
+                                                CustomTags:  tc.customTags,
+                                            },
                                         },
                                     },
                                 },
@@ -172,7 +196,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
                 },
             }

-            bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
+            bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
+
+            // We can't set num_workers to 0 explicitly in the typed configuration.
+            // Do it on the dyn.Value directly.
+            bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
+            })

             diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
             assert.Equal(t, diag.Diagnostics{
@@ -186,192 +216,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
             }, diags)
         })
     }
-}
-
-func TestValidateSingleNodeClusterPass(t *testing.T) {
-    passCases := []struct {
-        name string
-        spec *compute.ClusterSpec
-    }{
-        {
-            name: "single node cluster",
-            spec: &compute.ClusterSpec{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "local[*]",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
-            },
-        },
-        {
-            name: "num workers is not zero",
-            spec: &compute.ClusterSpec{
-                NumWorkers: 1,
-            },
-        },
-        {
-            name: "autoscale is not nil",
-            spec: &compute.ClusterSpec{
-                Autoscale: &compute.AutoScale{
-                    MinWorkers: 1,
-                },
-            },
-        },
-        {
-            name: "policy id is not empty",
-            spec: &compute.ClusterSpec{
-                PolicyId: "policy-abc",
-            },
-        },
-    }
-
-    ctx := context.Background()
-
-    // Test interactive clusters.
-    for _, tc := range passCases {
-        t.Run("interactive_"+tc.name, func(t *testing.T) {
-            b := &bundle.Bundle{
-                Config: config.Root{
-                    Resources: config.Resources{
-                        Clusters: map[string]*resources.Cluster{
-                            "foo": {
-                                ClusterSpec: tc.spec,
-                            },
-                        },
-                    },
-                },
-            }
-
-            diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
-            assert.Empty(t, diags)
-        })
-    }
-
-    // Test new job clusters.
-    for _, tc := range passCases {
-        t.Run("job_"+tc.name, func(t *testing.T) {
-            b := &bundle.Bundle{
-                Config: config.Root{
-                    Resources: config.Resources{
-                        Jobs: map[string]*resources.Job{
-                            "foo": {
-                                JobSettings: &jobs.JobSettings{
-                                    JobClusters: []jobs.JobCluster{
-                                        {
-                                            NewCluster: *tc.spec,
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                    },
-                },
-            }
-
-            diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
-            assert.Empty(t, diags)
-        })
-    }
-
-    // Test job task clusters.
-    for _, tc := range passCases {
-        t.Run("task_"+tc.name, func(t *testing.T) {
-            b := &bundle.Bundle{
-                Config: config.Root{
-                    Resources: config.Resources{
-                        Jobs: map[string]*resources.Job{
-                            "foo": {
-                                JobSettings: &jobs.JobSettings{
-                                    Tasks: []jobs.Task{
-                                        {
-                                            NewCluster: tc.spec,
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                    },
-                },
-            }
-
-            diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
-            assert.Empty(t, diags)
-        })
-    }
-}
-
-func TestValidateSingleNodePipelineClustersFail(t *testing.T) {
-    failCases := []struct {
-        name string
-        spec pipelines.PipelineCluster
-    }{
-        {
-            name: "no tags or conf",
-            spec: pipelines.PipelineCluster{
-                DriverInstancePoolId: "abcd",
-            },
-        },
-        {
-            name: "no tags",
-            spec: pipelines.PipelineCluster{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "local[*]",
-                },
-            },
-        },
-        {
-            name: "no conf",
-            spec: pipelines.PipelineCluster{
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
-            },
-        },
-        {
-            name: "invalid spark cluster profile",
-            spec: pipelines.PipelineCluster{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "invalid",
-                    "spark.master":                     "local[*]",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
-            },
-        },
-        {
-            name: "invalid spark.master",
-            spec: pipelines.PipelineCluster{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "invalid",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
-            },
-        },
-        {
-            name: "invalid tags",
-            spec: pipelines.PipelineCluster{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "local[*]",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "invalid",
-                },
-            },
-        },
-    }
-
-    ctx := context.Background()
-
+
+    // Pipeline clusters.
     for _, tc := range failCases {
-        t.Run(tc.name, func(t *testing.T) {
+        t.Run("pipeline_"+tc.name, func(t *testing.T) {
             b := &bundle.Bundle{
                 Config: config.Root{
                     Resources: config.Resources{
@@ -379,7 +227,10 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) {
                         "foo": {
                             PipelineSpec: &pipelines.PipelineSpec{
                                 Clusters: []pipelines.PipelineCluster{
-                                    tc.spec,
+                                    {
+                                        SparkConf:  tc.sparkConf,
+                                        CustomTags: tc.customTags,
+                                    },
                                 },
                             },
                         },
@@ -390,6 +241,12 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) {
             bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})

+            // We can't set num_workers to 0 explicitly in the typed configuration.
+            // Do it on the dyn.Value directly.
+            bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
+            })
+
             diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
             assert.Equal(t, diag.Diagnostics{
                 {
@@ -402,59 +259,95 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) {
             }, diags)
         })
     }
 }

-func TestValidateSingleNodePipelineClustersPass(t *testing.T) {
+func TestValidateSingleNodeClusterPass(t *testing.T) {
+    zero := 0
+    one := 1
+
     passCases := []struct {
         name       string
-        spec       pipelines.PipelineCluster
+        numWorkers *int
+        sparkConf  map[string]string
+        customTags map[string]string
+        policyId   string
     }{
         {
             name: "single node cluster",
-            spec: pipelines.PipelineCluster{
-                SparkConf: map[string]string{
-                    "spark.databricks.cluster.profile": "singleNode",
-                    "spark.master":                     "local[*]",
-                },
-                CustomTags: map[string]string{
-                    "ResourceClass": "SingleNode",
-                },
+            sparkConf: map[string]string{
+                "spark.databricks.cluster.profile": "singleNode",
+                "spark.master":                     "local[*]",
             },
+            customTags: map[string]string{
+                "ResourceClass": "SingleNode",
+            },
+            numWorkers: &zero,
         },
         {
             name: "num workers is not zero",
-            spec: pipelines.PipelineCluster{
-                NumWorkers: 1,
-            },
+            numWorkers: &one,
         },
         {
-            name: "autoscale is not nil",
-            spec: pipelines.PipelineCluster{
-                Autoscale: &pipelines.PipelineClusterAutoscale{
-                    MaxWorkers: 3,
-                },
-            },
+            name: "num workers is not set",
         },
         {
             name: "policy id is not empty",
-            spec: pipelines.PipelineCluster{
-                PolicyId: "policy-abc",
-            },
+            policyId:   "policy-abc",
+            numWorkers: &zero,
         },
     }

     ctx := context.Background()

+    // Interactive clusters.
     for _, tc := range passCases {
-        t.Run(tc.name, func(t *testing.T) {
+        t.Run("interactive_"+tc.name, func(t *testing.T) {
             b := &bundle.Bundle{
                 Config: config.Root{
                     Resources: config.Resources{
-                        Pipelines: map[string]*resources.Pipeline{
+                        Clusters: map[string]*resources.Cluster{
                             "foo": {
-                                PipelineSpec: &pipelines.PipelineSpec{
-                                    Clusters: []pipelines.PipelineCluster{
-                                        tc.spec,
+                                ClusterSpec: &compute.ClusterSpec{
+                                    SparkConf:  tc.sparkConf,
+                                    CustomTags: tc.customTags,
+                                    PolicyId:   tc.policyId,
+                                },
+                            },
+                        },
+                    },
+                },
+            }
+
+            if tc.numWorkers != nil {
+                bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                    return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
+                })
+            }
+
+            diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+            assert.Empty(t, diags)
+        })
+    }
+
+    // Job clusters.
+    for _, tc := range passCases {
+        t.Run("job_"+tc.name, func(t *testing.T) {
+            b := &bundle.Bundle{
+                Config: config.Root{
+                    Resources: config.Resources{
+                        Jobs: map[string]*resources.Job{
+                            "foo": {
+                                JobSettings: &jobs.JobSettings{
+                                    JobClusters: []jobs.JobCluster{
+                                        {
+                                            NewCluster: compute.ClusterSpec{
+                                                ClusterName: "my_cluster",
+                                                SparkConf:   tc.sparkConf,
+                                                CustomTags:  tc.customTags,
+                                                PolicyId:    tc.policyId,
+                                            },
+                                        },
                                     },
                                 },
                             },
@@ -463,6 +356,83 @@ func TestValidateSingleNodePipelineClustersPass(t *testing.T) {
                     },
                 },
             }

+            if tc.numWorkers != nil {
+                bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                    return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
+                })
+            }
+
+            diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+            assert.Empty(t, diags)
+        })
+    }
+
+    // Job task clusters.
+    for _, tc := range passCases {
+        t.Run("task_"+tc.name, func(t *testing.T) {
+            b := &bundle.Bundle{
+                Config: config.Root{
+                    Resources: config.Resources{
+                        Jobs: map[string]*resources.Job{
+                            "foo": {
+                                JobSettings: &jobs.JobSettings{
+                                    Tasks: []jobs.Task{
+                                        {
+                                            NewCluster: &compute.ClusterSpec{
+                                                ClusterName: "my_cluster",
+                                                SparkConf:   tc.sparkConf,
+                                                CustomTags:  tc.customTags,
+                                                PolicyId:    tc.policyId,
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            }
+
+            if tc.numWorkers != nil {
+                bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                    return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
+                })
+            }
+
+            diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+            assert.Empty(t, diags)
+        })
+    }
+
+    // Pipeline clusters.
+    for _, tc := range passCases {
+        t.Run("pipeline_"+tc.name, func(t *testing.T) {
+            b := &bundle.Bundle{
+                Config: config.Root{
+                    Resources: config.Resources{
+                        Pipelines: map[string]*resources.Pipeline{
+                            "foo": {
+                                PipelineSpec: &pipelines.PipelineSpec{
+                                    Clusters: []pipelines.PipelineCluster{
+                                        {
+                                            SparkConf:  tc.sparkConf,
+                                            CustomTags: tc.customTags,
+                                            PolicyId:   tc.policyId,
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            }
+
+            if tc.numWorkers != nil {
+                bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+                    return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
+                })
+            }
+
             diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
             assert.Empty(t, diags)
         })
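One detail worth calling out in the pass cases above: numWorkers is declared as *int rather than int so a test case can distinguish "not set" (nil) from "explicitly set to 0" (&zero), which is the same distinction the validator draws on the dyn.Value before warning. A minimal sketch of that idiom follows; the names are illustrative and not taken from the repository.

package main

import "fmt"

// describe is a hypothetical helper showing the *int table-test idiom:
// a nil pointer means the field was not set at all, while a pointer to 0
// means it was explicitly set to zero. A plain int cannot tell these apart.
func describe(numWorkers *int) string {
    switch {
    case numWorkers == nil:
        return "num_workers not set"
    case *numWorkers == 0:
        return "num_workers explicitly 0"
    default:
        return fmt.Sprintf("num_workers = %d", *numWorkers)
    }
}

func main() {
    zero := 0
    one := 1

    fmt.Println(describe(nil))   // num_workers not set
    fmt.Println(describe(&zero)) // num_workers explicitly 0
    fmt.Println(describe(&one))  // num_workers = 1
}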