diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index 8e0ad6da..7c159f61 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -108,6 +108,8 @@ func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")), // Job task clusters dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job for each task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")), // Pipeline clusters dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()), } diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index 4fff3f8a..18771cc0 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -93,7 +93,7 @@ func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("interactive_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -134,7 +134,7 @@ func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("job_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -184,7 +184,7 @@ func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("task_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -233,7 +233,7 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { - t.Run("pipeline_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -275,6 +275,59 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { } } +func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")}, + }, + }, diags) + }) + } +} + func passCases() []struct { name string numWorkers *int @@ -322,7 +375,7 @@ func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("interactive_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -355,7 +408,7 @@ func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("job_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -395,7 +448,7 @@ func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("task_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -435,7 +488,7 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { - t.Run("pipeline_"+tc.name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -467,3 +520,47 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { }) } } + +func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +}