diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go
new file mode 100644
index 00000000..7c159f61
--- /dev/null
+++ b/bundle/config/validate/single_node_cluster.go
@@ -0,0 +1,137 @@
+package validate
+
+import (
+	"context"
+	"strings"
+
+	"github.com/databricks/cli/bundle"
+	"github.com/databricks/cli/libs/diag"
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/dyn/convert"
+	"github.com/databricks/cli/libs/log"
+)
+
+// Validates that any single-node clusters defined in the bundle are correctly configured.
+func SingleNodeCluster() bundle.ReadOnlyMutator {
+	return &singleNodeCluster{}
+}
+
+type singleNodeCluster struct{}
+
+func (m *singleNodeCluster) Name() string {
+	return "validate:SingleNodeCluster"
+}
+
+const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
+valid single-node cluster, please ensure that the following properties
+are correctly set in the cluster specification:
+
+  spark_conf:
+    spark.databricks.cluster.profile: singleNode
+    spark.master: local[*]
+
+  custom_tags:
+    ResourceClass: SingleNode
+`
+
+const singleNodeWarningSummary = `Single node cluster is not correctly configured`
+
+func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
+	// Check if the user has explicitly set the num_workers to 0. Skip the warning
+	// if that's not the case.
+	numWorkers, ok := v.Get("num_workers").AsInt()
+	if !ok || numWorkers > 0 {
+		return false
+	}
+
+	// Convenient type that contains the common fields from compute.ClusterSpec and
+	// pipelines.PipelineCluster that we are interested in.
+	type ClusterConf struct {
+		SparkConf  map[string]string `json:"spark_conf"`
+		CustomTags map[string]string `json:"custom_tags"`
+		PolicyId   string            `json:"policy_id"`
+	}
+
+	conf := &ClusterConf{}
+	err := convert.ToTyped(conf, v)
+	if err != nil {
+		return false
+	}
+
+	// If the policy id is set, we don't want to show the warning. This is because
+	// the user might have configured `spark_conf` and `custom_tags` correctly
+	// in their cluster policy.
+	if conf.PolicyId != "" {
+		return false
+	}
+
+	profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
+	if !ok {
+		log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
+		return true
+	}
+	if profile != "singleNode" {
+		log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
+		return true
+	}
+
+	master, ok := conf.SparkConf["spark.master"]
+	if !ok {
+		log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
+		return true
+	}
+	if !strings.HasPrefix(master, "local") {
+		log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
+		return true
+	}
+
+	resourceClass, ok := conf.CustomTags["ResourceClass"]
+	if !ok {
+		log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
+		return true
+	}
+	if resourceClass != "SingleNode" {
+		log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
+		return true
+	}
+
+	return false
+}
+
+func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
+	diags := diag.Diagnostics{}
+
+	patterns := []dyn.Pattern{
+		// Interactive clusters
+		dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
+		// Job clusters
+		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
+		// Job task clusters
+		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
+		// Job for each task clusters
+		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
+		// Pipeline clusters
+		dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
+	}
+
+	for _, p := range patterns {
+		_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
+			warning := diag.Diagnostic{
+				Severity:  diag.Warning,
+				Summary:   singleNodeWarningSummary,
+				Detail:    singleNodeWarningDetail,
+				Locations: v.Locations(),
+				Paths:     []dyn.Path{p},
+			}
+
+			if showSingleNodeClusterWarning(ctx, v) {
+				diags = append(diags, warning)
+			}
+			return v, nil
+		})
+		if err != nil {
+			log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
+		}
+	}
+	return diags
+}
diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go
new file mode 100644
index 00000000..18771cc0
--- /dev/null
+++ b/bundle/config/validate/single_node_cluster_test.go
@@ -0,0 +1,566 @@
+package validate
+
+import (
+	"context"
+	"testing"
+
+	"github.com/databricks/cli/bundle"
+	"github.com/databricks/cli/bundle/config"
+	"github.com/databricks/cli/bundle/config/resources"
+	"github.com/databricks/cli/bundle/internal/bundletest"
+	"github.com/databricks/cli/libs/diag"
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/databricks-sdk-go/service/compute"
+	"github.com/databricks/databricks-sdk-go/service/jobs"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/stretchr/testify/assert"
+)
+
+func failCases() []struct {
+	name       string
+	sparkConf  map[string]string
+	customTags map[string]string
+} {
+	return []struct {
+		name       string
+		sparkConf  map[string]string
+		customTags map[string]string
+	}{
+		{
+			name: "no tags or conf",
+		},
+		{
+			name: "no tags",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "singleNode",
+				"spark.master":                     "local[*]",
+			},
+		},
+		{
+			name:       "no conf",
+			customTags: map[string]string{"ResourceClass": "SingleNode"},
+		},
+		{
+			name: "invalid spark cluster profile",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "invalid",
+				"spark.master":                     "local[*]",
+			},
+			customTags: map[string]string{"ResourceClass": "SingleNode"},
+		},
+		{
+			name: "invalid spark.master",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "singleNode",
+				"spark.master":                     "invalid",
+			},
+			customTags: map[string]string{"ResourceClass": "SingleNode"},
+		},
+		{
+			name: "invalid tags",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "singleNode",
+				"spark.master":                     "local[*]",
+			},
+			customTags: map[string]string{"ResourceClass": "invalid"},
+		},
+		{
+			name: "missing ResourceClass tag",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "singleNode",
+				"spark.master":                     "local[*]",
+			},
+			customTags: map[string]string{"what": "ever"},
+		},
+		{
+			name: "missing spark.master",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "singleNode",
+			},
+			customTags: map[string]string{"ResourceClass": "SingleNode"},
+		},
+		{
+			name: "missing spark.databricks.cluster.profile",
+			sparkConf: map[string]string{
+				"spark.master": "local[*]",
+			},
+			customTags: map[string]string{"ResourceClass": "SingleNode"},
+		},
+	}
+}
+
+func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Clusters: map[string]*resources.Cluster{
+							"foo": {
+								ClusterSpec: &compute.ClusterSpec{
+									SparkConf:  tc.sparkConf,
+									CustomTags: tc.customTags,
+								},
+							},
+						},
+					},
+				},
+			}
+
+			bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
+
+			// We can't set num_workers to 0 explicitly in the typed configuration.
+			// Do it on the dyn.Value directly.
+			bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+				return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
+			})
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Equal(t, diag.Diagnostics{
+				{
+					Severity:  diag.Warning,
+					Summary:   singleNodeWarningSummary,
+					Detail:    singleNodeWarningDetail,
+					Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
+					Paths:     []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
+				},
+			}, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									JobClusters: []jobs.JobCluster{
+										{
+											NewCluster: compute.ClusterSpec{
+												ClusterName: "my_cluster",
+												SparkConf:   tc.sparkConf,
+												CustomTags:  tc.customTags,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
+
+			// We can't set num_workers to 0 explicitly in the typed configuration.
+			// Do it on the dyn.Value directly.
+			bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+				return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
+			})
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Equal(t, diag.Diagnostics{
+				{
+					Severity:  diag.Warning,
+					Summary:   singleNodeWarningSummary,
+					Detail:    singleNodeWarningDetail,
+					Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
+					Paths:     []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
+				},
+			}, diags)
+
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									Tasks: []jobs.Task{
+										{
+											NewCluster: &compute.ClusterSpec{
+												ClusterName: "my_cluster",
+												SparkConf:   tc.sparkConf,
+												CustomTags:  tc.customTags,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
+
+			// We can't set num_workers to 0 explicitly in the typed configuration.
+			// Do it on the dyn.Value directly.
+			bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+				return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
+			})
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Equal(t, diag.Diagnostics{
+				{
+					Severity:  diag.Warning,
+					Summary:   singleNodeWarningSummary,
+					Detail:    singleNodeWarningDetail,
+					Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
+					Paths:     []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
+				},
+			}, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Pipelines: map[string]*resources.Pipeline{
+							"foo": {
+								PipelineSpec: &pipelines.PipelineSpec{
+									Clusters: []pipelines.PipelineCluster{
+										{
+											SparkConf:  tc.sparkConf,
+											CustomTags: tc.customTags,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
+
+			// We can't set num_workers to 0 explicitly in the typed configuration.
+			// Do it on the dyn.Value directly.
+			bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+				return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
+			})
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Equal(t, diag.Diagnostics{
+				{
+					Severity:  diag.Warning,
+					Summary:   singleNodeWarningSummary,
+					Detail:    singleNodeWarningDetail,
+					Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
+					Paths:     []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
+				},
+			}, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									Tasks: []jobs.Task{
+										{
+											ForEachTask: &jobs.ForEachTask{
+												Task: jobs.Task{
+													NewCluster: &compute.ClusterSpec{
+														ClusterName: "my_cluster",
+														SparkConf:   tc.sparkConf,
+														CustomTags:  tc.customTags,
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
+
+			// We can't set num_workers to 0 explicitly in the typed configuration.
+			// Do it on the dyn.Value directly.
+			bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+				return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
+			})
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Equal(t, diag.Diagnostics{
+				{
+					Severity:  diag.Warning,
+					Summary:   singleNodeWarningSummary,
+					Detail:    singleNodeWarningDetail,
+					Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
+					Paths:     []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
+				},
+			}, diags)
+		})
+	}
+}
+
+func passCases() []struct {
+	name       string
+	numWorkers *int
+	sparkConf  map[string]string
+	customTags map[string]string
+	policyId   string
+} {
+	zero := 0
+	one := 1
+
+	return []struct {
+		name       string
+		numWorkers *int
+		sparkConf  map[string]string
+		customTags map[string]string
+		policyId   string
+	}{
+		{
+			name: "single node cluster",
+			sparkConf: map[string]string{
+				"spark.databricks.cluster.profile": "singleNode",
+				"spark.master":                     "local[*]",
+			},
+			customTags: map[string]string{
+				"ResourceClass": "SingleNode",
+			},
+			numWorkers: &zero,
+		},
+		{
+			name:       "num workers is not zero",
+			numWorkers: &one,
+		},
+		{
+			name: "num workers is not set",
+		},
+		{
+			name:       "policy id is not empty",
+			policyId:   "policy-abc",
+			numWorkers: &zero,
+		},
+	}
+}
+
+func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Clusters: map[string]*resources.Cluster{
+							"foo": {
+								ClusterSpec: &compute.ClusterSpec{
+									SparkConf:  tc.sparkConf,
+									CustomTags: tc.customTags,
+									PolicyId:   tc.policyId,
+								},
+							},
+						},
+					},
+				},
+			}
+
+			if tc.numWorkers != nil {
+				bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+					return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
+				})
+			}
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Empty(t, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									JobClusters: []jobs.JobCluster{
+										{
+											NewCluster: compute.ClusterSpec{
+												ClusterName: "my_cluster",
+												SparkConf:   tc.sparkConf,
+												CustomTags:  tc.customTags,
+												PolicyId:    tc.policyId,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			if tc.numWorkers != nil {
+				bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+					return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
+				})
+			}
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Empty(t, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									Tasks: []jobs.Task{
+										{
+											NewCluster: &compute.ClusterSpec{
+												ClusterName: "my_cluster",
+												SparkConf:   tc.sparkConf,
+												CustomTags:  tc.customTags,
+												PolicyId:    tc.policyId,
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			if tc.numWorkers != nil {
+				bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+					return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
+				})
+			}
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Empty(t, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Pipelines: map[string]*resources.Pipeline{
+							"foo": {
+								PipelineSpec: &pipelines.PipelineSpec{
+									Clusters: []pipelines.PipelineCluster{
+										{
+											SparkConf:  tc.sparkConf,
+											CustomTags: tc.customTags,
+											PolicyId:   tc.policyId,
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			if tc.numWorkers != nil {
+				bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+					return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
+				})
+			}
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Empty(t, diags)
+		})
+	}
+}
+
+func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									Tasks: []jobs.Task{
+										{
+											ForEachTask: &jobs.ForEachTask{
+												Task: jobs.Task{
+													NewCluster: &compute.ClusterSpec{
+														ClusterName: "my_cluster",
+														SparkConf:   tc.sparkConf,
+														CustomTags:  tc.customTags,
+														PolicyId:    tc.policyId,
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			if tc.numWorkers != nil {
+				bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+					return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
+				})
+			}
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Empty(t, diags)
+		})
+	}
+}
diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go
index 440477e6..eb4c3c3c 100644
--- a/bundle/config/validate/validate.go
+++ b/bundle/config/validate/validate.go
@@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
 		ValidateSyncPatterns(),
 		JobTaskClusterSpec(),
 		ValidateFolderPermissions(),
+		SingleNodeCluster(),
 	))
 }
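
For context, a bundle cluster definition that this validator accepts without a warning might look like the following sketch; the resource name, spark_version, and node_type_id are illustrative values, not part of this diff:

    resources:
      clusters:
        my_single_node_cluster:
          spark_version: 15.4.x-scala2.12   # illustrative runtime version
          node_type_id: i3.xlarge           # illustrative node type
          num_workers: 0
          spark_conf:
            spark.databricks.cluster.profile: singleNode
            spark.master: local[*]
          custom_tags:
            ResourceClass: SingleNode

Setting policy_id on the cluster also suppresses the warning, since the referenced policy may supply spark_conf and custom_tags.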