mirror of https://github.com/databricks/cli.git
Add validation for single node clusters (#1909)
## Changes This PR adds a warning validating that the configuration for a single node cluster is valid for interactive, job, job-task, and pipeline clusters. Note: We skip the validation if a cluster policy is configured because the policy is likely to configure `spark_conf` / `custom_tags` itself. Note: Terrform originally only had validation for interactive, job, and job-task clusters. This PR adding the validation for pipeline clusters as well is new. This PR follows the same logic as we used to have in Terraform. The validation was removed from Terraform because we had no way to demote the error to a warning: https://github.com/databricks/terraform-provider-databricks/pull/4222 ### Background Single-node clusters require `spark_conf` and `custom_tags` to be correctly set in the cluster definition for them to function optimally. The cluster will be created even if incorrectly configured, but its performance will not be great. For example, if both `spark_conf` and `custom_tags` are not set and `num_workers` is 0, then only the driver process will be launched on the cluster compute instance thus leading to sub-optimal utilization of available compute resources and no parallelization across worker processes when processing a spark query. ### Issue This PR addresses some issues reported in https://github.com/databricks/cli/issues/1546 ## Tests Unit tests and manually. Example output of the warning: ``` ➜ bundle-playground git:(master) ✗ cli bundle validate Warning: Single node cluster is not correctly configured at resources.pipelines.bar.clusters[0] in databricks.yml:29:11 num_workers should be 0 only for single-node clusters. To create a valid single node cluster please ensure that the following properties are correctly set in the cluster specification: spark_conf: spark.databricks.cluster.profile: singleNode spark.master: local[*] custom_tags: ResourceClass: SingleNode Name: foobar Target: default Workspace: User: shreyas.goenka@databricks.com Path: /Workspace/Users/shreyas.goenka@databricks.com/.bundle/foobar/default Found 1 warning ```
This commit is contained in:
parent
490dd058aa
commit
b323703c1b
|
@ -0,0 +1,137 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/dyn/convert"
|
||||
"github.com/databricks/cli/libs/log"
|
||||
)
|
||||
|
||||
// Validates that any single node clusters defined in the bundle are correctly configured.
|
||||
func SingleNodeCluster() bundle.ReadOnlyMutator {
|
||||
return &singleNodeCluster{}
|
||||
}
|
||||
|
||||
type singleNodeCluster struct{}
|
||||
|
||||
func (m *singleNodeCluster) Name() string {
|
||||
return "validate:SingleNodeCluster"
|
||||
}
|
||||
|
||||
const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
|
||||
valid single node cluster please ensure that the following properties
|
||||
are correctly set in the cluster specification:
|
||||
|
||||
spark_conf:
|
||||
spark.databricks.cluster.profile: singleNode
|
||||
spark.master: local[*]
|
||||
|
||||
custom_tags:
|
||||
ResourceClass: SingleNode
|
||||
`
|
||||
|
||||
const singleNodeWarningSummary = `Single node cluster is not correctly configured`
|
||||
|
||||
func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
|
||||
// Check if the user has explicitly set the num_workers to 0. Skip the warning
|
||||
// if that's not the case.
|
||||
numWorkers, ok := v.Get("num_workers").AsInt()
|
||||
if !ok || numWorkers > 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Convenient type that contains the common fields from compute.ClusterSpec and
|
||||
// pipelines.PipelineCluster that we are interested in.
|
||||
type ClusterConf struct {
|
||||
SparkConf map[string]string `json:"spark_conf"`
|
||||
CustomTags map[string]string `json:"custom_tags"`
|
||||
PolicyId string `json:"policy_id"`
|
||||
}
|
||||
|
||||
conf := &ClusterConf{}
|
||||
err := convert.ToTyped(conf, v)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// If the policy id is set, we don't want to show the warning. This is because
|
||||
// the user might have configured `spark_conf` and `custom_tags` correctly
|
||||
// in their cluster policy.
|
||||
if conf.PolicyId != "" {
|
||||
return false
|
||||
}
|
||||
|
||||
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
|
||||
if !ok {
|
||||
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
|
||||
return true
|
||||
}
|
||||
if profile != "singleNode" {
|
||||
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
|
||||
return true
|
||||
}
|
||||
|
||||
master, ok := conf.SparkConf["spark.master"]
|
||||
if !ok {
|
||||
log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
|
||||
return true
|
||||
}
|
||||
if !strings.HasPrefix(master, "local") {
|
||||
log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
|
||||
return true
|
||||
}
|
||||
|
||||
resourceClass, ok := conf.CustomTags["ResourceClass"]
|
||||
if !ok {
|
||||
log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
|
||||
return true
|
||||
}
|
||||
if resourceClass != "SingleNode" {
|
||||
log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
diags := diag.Diagnostics{}
|
||||
|
||||
patterns := []dyn.Pattern{
|
||||
// Interactive clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
|
||||
// Job clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
||||
// Job task clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
||||
// Job for each task clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
|
||||
// Pipeline clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
|
||||
}
|
||||
|
||||
for _, p := range patterns {
|
||||
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
warning := diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: v.Locations(),
|
||||
Paths: []dyn.Path{p},
|
||||
}
|
||||
|
||||
if showSingleNodeClusterWarning(ctx, v) {
|
||||
diags = append(diags, warning)
|
||||
}
|
||||
return v, nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
|
||||
}
|
||||
}
|
||||
return diags
|
||||
}
|
|
@ -0,0 +1,566 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/bundle/internal/bundletest"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func failCases() []struct {
|
||||
name string
|
||||
sparkConf map[string]string
|
||||
customTags map[string]string
|
||||
} {
|
||||
return []struct {
|
||||
name string
|
||||
sparkConf map[string]string
|
||||
customTags map[string]string
|
||||
}{
|
||||
{
|
||||
name: "no tags or conf",
|
||||
},
|
||||
{
|
||||
name: "no tags",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no conf",
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "invalid spark cluster profile",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "invalid",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "invalid spark.master",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "invalid",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "invalid tags",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "invalid"},
|
||||
},
|
||||
{
|
||||
name: "missing ResourceClass tag",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"what": "ever"},
|
||||
},
|
||||
{
|
||||
name: "missing spark.master",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "missing spark.databricks.cluster.profile",
|
||||
sparkConf: map[string]string{
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range failCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"foo": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
|
||||
})
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range failCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
JobClusters: []jobs.JobCluster{
|
||||
{
|
||||
NewCluster: compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
|
||||
},
|
||||
}, diags)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range failCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range failCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"foo": {
|
||||
PipelineSpec: &pipelines.PipelineSpec{
|
||||
Clusters: []pipelines.PipelineCluster{
|
||||
{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range failCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
ForEachTask: &jobs.ForEachTask{
|
||||
Task: jobs.Task{
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func passCases() []struct {
|
||||
name string
|
||||
numWorkers *int
|
||||
sparkConf map[string]string
|
||||
customTags map[string]string
|
||||
policyId string
|
||||
} {
|
||||
zero := 0
|
||||
one := 1
|
||||
|
||||
return []struct {
|
||||
name string
|
||||
numWorkers *int
|
||||
sparkConf map[string]string
|
||||
customTags map[string]string
|
||||
policyId string
|
||||
}{
|
||||
{
|
||||
name: "single node cluster",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{
|
||||
"ResourceClass": "SingleNode",
|
||||
},
|
||||
numWorkers: &zero,
|
||||
},
|
||||
{
|
||||
name: "num workers is not zero",
|
||||
numWorkers: &one,
|
||||
},
|
||||
{
|
||||
name: "num workers is not set",
|
||||
},
|
||||
{
|
||||
name: "policy id is not empty",
|
||||
policyId: "policy-abc",
|
||||
numWorkers: &zero,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range passCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"foo": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range passCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
JobClusters: []jobs.JobCluster{
|
||||
{
|
||||
NewCluster: compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range passCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range passCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"foo": {
|
||||
PipelineSpec: &pipelines.PipelineSpec{
|
||||
Clusters: []pipelines.PipelineCluster{
|
||||
{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, tc := range passCases() {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
ForEachTask: &jobs.ForEachTask{
|
||||
Task: jobs.Task{
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
|
|||
ValidateSyncPatterns(),
|
||||
JobTaskClusterSpec(),
|
||||
ValidateFolderPermissions(),
|
||||
SingleNodeCluster(),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue