diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index f5b8befac..e1104f116 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -2,14 +2,13 @@ package validate import ( "context" - "fmt" "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" ) // Validates that any single node clusters defined in the bundle are correctly configured. @@ -37,138 +36,100 @@ are correctly set in the cluster specification: const singleNodeWarningSummary = `Single node cluster is not correctly configured` -func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic { - if spec == nil { - return nil +func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { + // Check if the user has explicitly set the num_workers to 0. Skip the warning + // if that's not the case. + numWorkers, ok := v.Get("num_workers").AsInt() + if !ok || numWorkers > 0 { + return false } - if spec.NumWorkers > 0 || spec.Autoscale != nil { - return nil + // Convenient type that contains the common fields from compute.ClusterSpec and + // pipelines.PipelineCluster that we are interested in. + type ClusterConf struct { + SparkConf map[string]string `json:"spark_conf"` + CustomTags map[string]string `json:"custom_tags"` + PolicyId string `json:"policy_id"` } - if spec.PolicyId != "" { - return nil + conf := &ClusterConf{} + err := convert.ToTyped(conf, v) + if err != nil { + return false } - invalidSingleNodeWarning := &diag.Diagnostic{ - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: l, - Paths: []dyn.Path{p}, + // If the policy id is set, we don't want to show the warning. This is because + // the user might have configured `spark_conf` and `custom_tags` correctly + // in their cluster policy. + if conf.PolicyId != "" { + return false } - profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + + profile, ok := conf.SparkConf["spark.databricks.cluster.profile"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec") + return true } - master, ok := spec.SparkConf["spark.master"] + if profile != "singleNode" { + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec") + return true + } + + master, ok := conf.SparkConf["spark.master"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec") + return true } - resourceClass, ok := spec.CustomTags["ResourceClass"] + if !strings.HasPrefix(master, "local") { + log.Warnf(ctx, "spark_conf spark.master is not local in single-node cluster spec") + return true + } + + resourceClass, ok := conf.CustomTags["ResourceClass"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec") + return true + } + if resourceClass != "SingleNode" { + log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec") + return true } - if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { - return nil - } - - return invalidSingleNodeWarning -} - -func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic { - if spec.NumWorkers > 0 || spec.Autoscale != nil { - return nil - } - - if spec.PolicyId != "" { - return nil - } - - invalidSingleNodeWarning := &diag.Diagnostic{ - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: l, - Paths: []dyn.Path{p}, - } - profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] - if !ok { - return invalidSingleNodeWarning - } - master, ok := spec.SparkConf["spark.master"] - if !ok { - return invalidSingleNodeWarning - } - resourceClass, ok := spec.CustomTags["ResourceClass"] - if !ok { - return invalidSingleNodeWarning - } - - if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { - return nil - } - - return invalidSingleNodeWarning + return false } func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { diags := diag.Diagnostics{} - // Interactive clusters - for k, r := range rb.Config().Resources.Clusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k)) - l := rb.Config().GetLocations("resources.clusters." + k) - - d := validateSingleNodeCluster(r.ClusterSpec, l, p) - if d != nil { - diags = append(diags, *d) - } + patterns := []dyn.Pattern{ + // Interactive clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()), + // Job clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Pipeline clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()), } - // Job clusters - for jobK, jobV := range rb.Config().Resources.Jobs { - for i, clusterV := range jobV.JobSettings.JobClusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i)) - l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i)) - - d := validateSingleNodeCluster(&clusterV.NewCluster, l, p) - if d != nil { - diags = append(diags, *d) + for _, p := range patterns { + _, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + warning := diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: v.Locations(), + Paths: []dyn.Path{p}, } + + if showSingleNodeClusterWarning(ctx, v) { + diags = append(diags, warning) + } + return v, nil + }) + if err != nil { + log.Debugf(ctx, "Error while applying single node cluster validation: %s", err) } } - - // Job task clusters - for jobK, jobV := range rb.Config().Resources.Jobs { - for i, taskV := range jobV.JobSettings.Tasks { - if taskV.NewCluster == nil { - continue - } - - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster")) - l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i)) - - d := validateSingleNodeCluster(taskV.NewCluster, l, p) - if d != nil { - diags = append(diags, *d) - } - } - } - - // Pipeline clusters - for pipelineK, pipelineV := range rb.Config().Resources.Pipelines { - for i, clusterV := range pipelineV.PipelineSpec.Clusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i)) - l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i)) - - d := validateSingleNodePipelineCluster(clusterV, l, p) - if d != nil { - diags = append(diags, *d) - } - } - } - return diags } diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index d9fea6cfd..9f76c968d 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -18,73 +18,75 @@ import ( func TestValidateSingleNodeClusterFail(t *testing.T) { failCases := []struct { - name string - spec *compute.ClusterSpec + name string + sparkConf map[string]string + customTags map[string]string }{ { name: "no tags or conf", - spec: &compute.ClusterSpec{ - ClusterName: "foo", - }, }, { name: "no tags", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, }, { - name: "no conf", - spec: &compute.ClusterSpec{ - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, + name: "no conf", + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark cluster profile", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "invalid", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark.master", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "invalid", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid tags", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "invalid", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, + customTags: map[string]string{"ResourceClass": "invalid"}, + }, + { + name: "missing ResourceClass tag", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{"what": "ever"}, + }, + { + name: "missing spark.master", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "missing spark.databricks.cluster.profile", + sparkConf: map[string]string{ + "spark.master": "local[*]", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, } ctx := context.Background() - // Test interactive clusters. + // Interactive clusters. for _, tc := range failCases { t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -92,7 +94,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { Resources: config.Resources{ Clusters: map[string]*resources.Cluster{ "foo": { - ClusterSpec: tc.spec, + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -101,6 +106,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { @@ -114,7 +124,7 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }) } - // Test new job clusters. + // Job clusters. for _, tc := range failCases { t.Run("job_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -125,7 +135,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { JobSettings: &jobs.JobSettings{ JobClusters: []jobs.JobCluster{ { - NewCluster: *tc.spec, + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -135,7 +149,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, } - bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ @@ -144,14 +164,14 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, - Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")}, }, }, diags) }) } - // Test job task clusters. + // Job task clusters. for _, tc := range failCases { t.Run("task_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -162,7 +182,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { - NewCluster: tc.spec, + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -172,7 +196,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, } - bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ @@ -186,192 +216,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, diags) }) } -} - -func TestValidateSingleNodeClusterPass(t *testing.T) { - passCases := []struct { - name string - spec *compute.ClusterSpec - }{ - { - name: "single node cluster", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "num workers is not zero", - spec: &compute.ClusterSpec{ - NumWorkers: 1, - }, - }, - { - name: "autoscale is not nil", - spec: &compute.ClusterSpec{ - Autoscale: &compute.AutoScale{ - MinWorkers: 1, - }, - }, - }, - { - name: "policy id is not empty", - spec: &compute.ClusterSpec{ - PolicyId: "policy-abc", - }, - }, - } - - ctx := context.Background() - - // Test interactive clusters. - for _, tc := range passCases { - t.Run("interactive_"+tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Clusters: map[string]*resources.Cluster{ - "foo": { - ClusterSpec: tc.spec, - }, - }, - }, - }, - } - - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } - - // Test new job clusters. - for _, tc := range passCases { - t.Run("job_"+tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "foo": { - JobSettings: &jobs.JobSettings{ - JobClusters: []jobs.JobCluster{ - { - NewCluster: *tc.spec, - }, - }, - }, - }, - }, - }, - }, - } - - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } - - // Test job task clusters. - for _, tc := range passCases { - t.Run("task_"+tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "foo": { - JobSettings: &jobs.JobSettings{ - Tasks: []jobs.Task{ - { - NewCluster: tc.spec, - }, - }, - }, - }, - }, - }, - }, - } - - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } -} - -func TestValidateSingleNodePipelineClustersFail(t *testing.T) { - failCases := []struct { - name string - spec pipelines.PipelineCluster - }{ - { - name: "no tags or conf", - spec: pipelines.PipelineCluster{ - DriverInstancePoolId: "abcd", - }, - }, - { - name: "no tags", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - }, - }, - { - name: "no conf", - spec: pipelines.PipelineCluster{ - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid spark cluster profile", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "invalid", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid spark.master", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "invalid", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid tags", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "invalid", - }, - }, - }, - } - - ctx := context.Background() + // Pipeline clusters. for _, tc := range failCases { - t.Run(tc.name, func(t *testing.T) { + t.Run("pipeline_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -379,7 +227,10 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) { "foo": { PipelineSpec: &pipelines.PipelineSpec{ Clusters: []pipelines.PipelineCluster{ - tc.spec, + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -390,6 +241,12 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) { bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0)) + }) + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { @@ -402,59 +259,95 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) { }, diags) }) } + } -func TestValidateSingleNodePipelineClustersPass(t *testing.T) { +func TestValidateSingleNodeClusterPass(t *testing.T) { + zero := 0 + one := 1 + passCases := []struct { - name string - spec pipelines.PipelineCluster + name string + numWorkers *int + sparkConf map[string]string + customTags map[string]string + policyId string }{ { name: "single node cluster", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, + customTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + numWorkers: &zero, }, { - name: "num workers is not zero", - spec: pipelines.PipelineCluster{ - NumWorkers: 1, - }, + name: "num workers is not zero", + numWorkers: &one, }, { - name: "autoscale is not nil", - spec: pipelines.PipelineCluster{ - Autoscale: &pipelines.PipelineClusterAutoscale{ - MaxWorkers: 3, - }, - }, + name: "num workers is not set", }, { - name: "policy id is not empty", - spec: pipelines.PipelineCluster{ - PolicyId: "policy-abc", - }, + name: "policy id is not empty", + policyId: "policy-abc", + numWorkers: &zero, }, } ctx := context.Background() + // Interactive clusters. for _, tc := range passCases { - t.Run(tc.name, func(t *testing.T) { + t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ - Pipelines: map[string]*resources.Pipeline{ + Clusters: map[string]*resources.Cluster{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{ - Clusters: []pipelines.PipelineCluster{ - tc.spec, + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Job clusters. + for _, tc := range passCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, }, }, }, @@ -463,6 +356,83 @@ func TestValidateSingleNodePipelineClustersPass(t *testing.T) { }, } + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Job task clusters. + for _, tc := range passCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Pipeline clusters. + for _, tc := range passCases { + t.Run("pipeline_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers)) + }) + } + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) })