From df0a98066acb8346b5902eb6a80224160b45feff Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 18 Nov 2024 15:51:58 +0100 Subject: [PATCH 1/3] Add validation for single node clusters --- bundle/config/validate/single_node_cluster.go | 174 +++++++ .../validate/single_node_cluster_test.go | 470 ++++++++++++++++++ bundle/config/validate/validate.go | 1 + 3 files changed, 645 insertions(+) create mode 100644 bundle/config/validate/single_node_cluster.go create mode 100644 bundle/config/validate/single_node_cluster_test.go diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go new file mode 100644 index 00000000..f5b8befa --- /dev/null +++ b/bundle/config/validate/single_node_cluster.go @@ -0,0 +1,174 @@ +package validate + +import ( + "context" + "fmt" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/pipelines" +) + +// Validates that any single node clusters defined in the bundle are correctly configured. +func SingleNodeCluster() bundle.ReadOnlyMutator { + return &singleNodeCluster{} +} + +type singleNodeCluster struct{} + +func (m *singleNodeCluster) Name() string { + return "validate:SingleNodeCluster" +} + +const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a +valid single node cluster please ensure that the following properties +are correctly set in the cluster specification: + + spark_conf: + spark.databricks.cluster.profile: singleNode + spark.master: local[*] + + custom_tags: + ResourceClass: SingleNode + ` + +const singleNodeWarningSummary = `Single node cluster is not correctly configured` + +func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic { + if spec == nil { + return nil + } + + if spec.NumWorkers > 0 || spec.Autoscale != nil { + return nil + } + + if spec.PolicyId != "" { + return nil + } + + invalidSingleNodeWarning := &diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: l, + Paths: []dyn.Path{p}, + } + profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + if !ok { + return invalidSingleNodeWarning + } + master, ok := spec.SparkConf["spark.master"] + if !ok { + return invalidSingleNodeWarning + } + resourceClass, ok := spec.CustomTags["ResourceClass"] + if !ok { + return invalidSingleNodeWarning + } + + if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { + return nil + } + + return invalidSingleNodeWarning +} + +func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic { + if spec.NumWorkers > 0 || spec.Autoscale != nil { + return nil + } + + if spec.PolicyId != "" { + return nil + } + + invalidSingleNodeWarning := &diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: l, + Paths: []dyn.Path{p}, + } + profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + if !ok { + return invalidSingleNodeWarning + } + master, ok := spec.SparkConf["spark.master"] + if !ok { + return invalidSingleNodeWarning + } + resourceClass, ok := spec.CustomTags["ResourceClass"] + if !ok { + return invalidSingleNodeWarning + } + + if profile == "singleNode" && 
strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { + return nil + } + + return invalidSingleNodeWarning +} + +func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + diags := diag.Diagnostics{} + + // Interactive clusters + for k, r := range rb.Config().Resources.Clusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k)) + l := rb.Config().GetLocations("resources.clusters." + k) + + d := validateSingleNodeCluster(r.ClusterSpec, l, p) + if d != nil { + diags = append(diags, *d) + } + } + + // Job clusters + for jobK, jobV := range rb.Config().Resources.Jobs { + for i, clusterV := range jobV.JobSettings.JobClusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i)) + l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i)) + + d := validateSingleNodeCluster(&clusterV.NewCluster, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + // Job task clusters + for jobK, jobV := range rb.Config().Resources.Jobs { + for i, taskV := range jobV.JobSettings.Tasks { + if taskV.NewCluster == nil { + continue + } + + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster")) + l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i)) + + d := validateSingleNodeCluster(taskV.NewCluster, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + // Pipeline clusters + for pipelineK, pipelineV := range rb.Config().Resources.Pipelines { + for i, clusterV := range pipelineV.PipelineSpec.Clusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i)) + l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i)) + + d := validateSingleNodePipelineCluster(clusterV, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + return diags +} diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go new file mode 100644 index 00000000..d9fea6cf --- /dev/null +++ b/bundle/config/validate/single_node_cluster_test.go @@ -0,0 +1,470 @@ +package validate + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" +) + +func TestValidateSingleNodeClusterFail(t *testing.T) { + failCases := []struct { + name string + spec *compute.ClusterSpec + }{ + { + name: "no tags or conf", + spec: &compute.ClusterSpec{ + ClusterName: "foo", + }, + }, + { + name: "no tags", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + }, + { + name: "no conf", + spec: &compute.ClusterSpec{ + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark cluster profile", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": 
"invalid", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark.master", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid tags", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "invalid", + }, + }, + }, + } + + ctx := context.Background() + + // Test interactive clusters. + for _, tc := range failCases { + t.Run("interactive_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: tc.spec, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))}, + }, + }, diags) + }) + } + + // Test new job clusters. + for _, tc := range failCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: *tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")}, + }, + }, diags) + + }) + } + + // Test job task clusters. 
+ for _, tc := range failCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodeClusterPass(t *testing.T) { + passCases := []struct { + name string + spec *compute.ClusterSpec + }{ + { + name: "single node cluster", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "num workers is not zero", + spec: &compute.ClusterSpec{ + NumWorkers: 1, + }, + }, + { + name: "autoscale is not nil", + spec: &compute.ClusterSpec{ + Autoscale: &compute.AutoScale{ + MinWorkers: 1, + }, + }, + }, + { + name: "policy id is not empty", + spec: &compute.ClusterSpec{ + PolicyId: "policy-abc", + }, + }, + } + + ctx := context.Background() + + // Test interactive clusters. + for _, tc := range passCases { + t.Run("interactive_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: tc.spec, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Test new job clusters. + for _, tc := range passCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: *tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Test job task clusters. 
+ for _, tc := range passCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodePipelineClustersFail(t *testing.T) { + failCases := []struct { + name string + spec pipelines.PipelineCluster + }{ + { + name: "no tags or conf", + spec: pipelines.PipelineCluster{ + DriverInstancePoolId: "abcd", + }, + }, + { + name: "no tags", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + }, + { + name: "no conf", + spec: pipelines.PipelineCluster{ + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark cluster profile", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark.master", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid tags", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "invalid", + }, + }, + }, + } + + ctx := context.Background() + + for _, tc := range failCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + tc.spec, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodePipelineClustersPass(t *testing.T) { + passCases := []struct { + name string + spec pipelines.PipelineCluster + }{ + { + name: "single node cluster", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "num workers is not zero", + spec: pipelines.PipelineCluster{ + NumWorkers: 1, + }, + }, + { + name: "autoscale is not nil", + spec: pipelines.PipelineCluster{ + Autoscale: &pipelines.PipelineClusterAutoscale{ + MaxWorkers: 3, + }, + }, + }, + { + name: "policy id is not empty", + spec: pipelines.PipelineCluster{ + PolicyId: "policy-abc", + }, + }, + } + + ctx := context.Background() + + for _, tc := range 
passCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + tc.spec, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 440477e6..eb4c3c3c 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics ValidateSyncPatterns(), JobTaskClusterSpec(), ValidateFolderPermissions(), + SingleNodeCluster(), )) } From 96a0a3ec272a19f82f53bd99f637c88151f54627 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 19 Nov 2024 23:06:43 +0100 Subject: [PATCH 2/3] address comments --- bundle/config/validate/single_node_cluster.go | 185 +++---- .../validate/single_node_cluster_test.go | 498 ++++++++---------- 2 files changed, 307 insertions(+), 376 deletions(-) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index f5b8befa..e1104f11 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -2,14 +2,13 @@ package validate import ( "context" - "fmt" "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" ) // Validates that any single node clusters defined in the bundle are correctly configured. @@ -37,138 +36,100 @@ are correctly set in the cluster specification: const singleNodeWarningSummary = `Single node cluster is not correctly configured` -func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic { - if spec == nil { - return nil +func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { + // Check if the user has explicitly set the num_workers to 0. Skip the warning + // if that's not the case. + numWorkers, ok := v.Get("num_workers").AsInt() + if !ok || numWorkers > 0 { + return false } - if spec.NumWorkers > 0 || spec.Autoscale != nil { - return nil + // Convenient type that contains the common fields from compute.ClusterSpec and + // pipelines.PipelineCluster that we are interested in. + type ClusterConf struct { + SparkConf map[string]string `json:"spark_conf"` + CustomTags map[string]string `json:"custom_tags"` + PolicyId string `json:"policy_id"` } - if spec.PolicyId != "" { - return nil + conf := &ClusterConf{} + err := convert.ToTyped(conf, v) + if err != nil { + return false } - invalidSingleNodeWarning := &diag.Diagnostic{ - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: l, - Paths: []dyn.Path{p}, + // If the policy id is set, we don't want to show the warning. This is because + // the user might have configured `spark_conf` and `custom_tags` correctly + // in their cluster policy. 
+ if conf.PolicyId != "" { + return false } - profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + + profile, ok := conf.SparkConf["spark.databricks.cluster.profile"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec") + return true } - master, ok := spec.SparkConf["spark.master"] + if profile != "singleNode" { + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec") + return true + } + + master, ok := conf.SparkConf["spark.master"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec") + return true } - resourceClass, ok := spec.CustomTags["ResourceClass"] + if !strings.HasPrefix(master, "local") { + log.Warnf(ctx, "spark_conf spark.master is not local in single-node cluster spec") + return true + } + + resourceClass, ok := conf.CustomTags["ResourceClass"] if !ok { - return invalidSingleNodeWarning + log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec") + return true + } + if resourceClass != "SingleNode" { + log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec") + return true } - if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { - return nil - } - - return invalidSingleNodeWarning -} - -func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic { - if spec.NumWorkers > 0 || spec.Autoscale != nil { - return nil - } - - if spec.PolicyId != "" { - return nil - } - - invalidSingleNodeWarning := &diag.Diagnostic{ - Severity: diag.Warning, - Summary: singleNodeWarningSummary, - Detail: singleNodeWarningDetail, - Locations: l, - Paths: []dyn.Path{p}, - } - profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] - if !ok { - return invalidSingleNodeWarning - } - master, ok := spec.SparkConf["spark.master"] - if !ok { - return invalidSingleNodeWarning - } - resourceClass, ok := spec.CustomTags["ResourceClass"] - if !ok { - return invalidSingleNodeWarning - } - - if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { - return nil - } - - return invalidSingleNodeWarning + return false } func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { diags := diag.Diagnostics{} - // Interactive clusters - for k, r := range rb.Config().Resources.Clusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k)) - l := rb.Config().GetLocations("resources.clusters." 
+ k) - - d := validateSingleNodeCluster(r.ClusterSpec, l, p) - if d != nil { - diags = append(diags, *d) - } + patterns := []dyn.Pattern{ + // Interactive clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()), + // Job clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Pipeline clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()), } - // Job clusters - for jobK, jobV := range rb.Config().Resources.Jobs { - for i, clusterV := range jobV.JobSettings.JobClusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i)) - l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i)) - - d := validateSingleNodeCluster(&clusterV.NewCluster, l, p) - if d != nil { - diags = append(diags, *d) + for _, p := range patterns { + _, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + warning := diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: v.Locations(), + Paths: []dyn.Path{p}, } + + if showSingleNodeClusterWarning(ctx, v) { + diags = append(diags, warning) + } + return v, nil + }) + if err != nil { + log.Debugf(ctx, "Error while applying single node cluster validation: %s", err) } } - - // Job task clusters - for jobK, jobV := range rb.Config().Resources.Jobs { - for i, taskV := range jobV.JobSettings.Tasks { - if taskV.NewCluster == nil { - continue - } - - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster")) - l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i)) - - d := validateSingleNodeCluster(taskV.NewCluster, l, p) - if d != nil { - diags = append(diags, *d) - } - } - } - - // Pipeline clusters - for pipelineK, pipelineV := range rb.Config().Resources.Pipelines { - for i, clusterV := range pipelineV.PipelineSpec.Clusters { - p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i)) - l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i)) - - d := validateSingleNodePipelineCluster(clusterV, l, p) - if d != nil { - diags = append(diags, *d) - } - } - } - return diags } diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index d9fea6cf..9f76c968 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -18,73 +18,75 @@ import ( func TestValidateSingleNodeClusterFail(t *testing.T) { failCases := []struct { - name string - spec *compute.ClusterSpec + name string + sparkConf map[string]string + customTags map[string]string }{ { name: "no tags or conf", - spec: &compute.ClusterSpec{ - ClusterName: "foo", - }, }, { name: "no tags", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, }, { - name: "no 
conf", - spec: &compute.ClusterSpec{ - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, + name: "no conf", + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark cluster profile", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "invalid", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark.master", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "invalid", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid tags", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "invalid", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, + customTags: map[string]string{"ResourceClass": "invalid"}, + }, + { + name: "missing ResourceClass tag", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{"what": "ever"}, + }, + { + name: "missing spark.master", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "missing spark.databricks.cluster.profile", + sparkConf: map[string]string{ + "spark.master": "local[*]", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, }, } ctx := context.Background() - // Test interactive clusters. + // Interactive clusters. for _, tc := range failCases { t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -92,7 +94,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { Resources: config.Resources{ Clusters: map[string]*resources.Cluster{ "foo": { - ClusterSpec: tc.spec, + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -101,6 +106,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { @@ -114,7 +124,7 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }) } - // Test new job clusters. + // Job clusters. 
for _, tc := range failCases { t.Run("job_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -125,7 +135,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { JobSettings: &jobs.JobSettings{ JobClusters: []jobs.JobCluster{ { - NewCluster: *tc.spec, + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -135,7 +149,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, } - bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ @@ -144,14 +164,14 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, - Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")}, }, }, diags) }) } - // Test job task clusters. + // Job task clusters. for _, tc := range failCases { t.Run("task_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ @@ -162,7 +182,11 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { - NewCluster: tc.spec, + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -172,7 +196,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, } - bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0)) + }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ @@ -186,192 +216,10 @@ func TestValidateSingleNodeClusterFail(t *testing.T) { }, diags) }) } -} - -func TestValidateSingleNodeClusterPass(t *testing.T) { - passCases := []struct { - name string - spec *compute.ClusterSpec - }{ - { - name: "single node cluster", - spec: &compute.ClusterSpec{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "num workers is not zero", - spec: &compute.ClusterSpec{ - NumWorkers: 1, - }, - }, - { - name: "autoscale is not nil", - spec: &compute.ClusterSpec{ - Autoscale: &compute.AutoScale{ - MinWorkers: 1, - }, - }, - }, - { - name: "policy id is not empty", - spec: &compute.ClusterSpec{ - PolicyId: "policy-abc", - }, - }, - } - - ctx := context.Background() - - // Test interactive clusters. 
- for _, tc := range passCases { - t.Run("interactive_"+tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Clusters: map[string]*resources.Cluster{ - "foo": { - ClusterSpec: tc.spec, - }, - }, - }, - }, - } - - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } - - // Test new job clusters. - for _, tc := range passCases { - t.Run("job_"+tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "foo": { - JobSettings: &jobs.JobSettings{ - JobClusters: []jobs.JobCluster{ - { - NewCluster: *tc.spec, - }, - }, - }, - }, - }, - }, - }, - } - - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } - - // Test job task clusters. - for _, tc := range passCases { - t.Run("task_"+tc.name, func(t *testing.T) { - b := &bundle.Bundle{ - Config: config.Root{ - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "foo": { - JobSettings: &jobs.JobSettings{ - Tasks: []jobs.Task{ - { - NewCluster: tc.spec, - }, - }, - }, - }, - }, - }, - }, - } - - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) - assert.Empty(t, diags) - }) - } -} - -func TestValidateSingleNodePipelineClustersFail(t *testing.T) { - failCases := []struct { - name string - spec pipelines.PipelineCluster - }{ - { - name: "no tags or conf", - spec: pipelines.PipelineCluster{ - DriverInstancePoolId: "abcd", - }, - }, - { - name: "no tags", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - }, - }, - { - name: "no conf", - spec: pipelines.PipelineCluster{ - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid spark cluster profile", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "invalid", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid spark.master", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "invalid", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, - }, - }, - { - name: "invalid tags", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "invalid", - }, - }, - }, - } - - ctx := context.Background() + // Pipeline clusters. for _, tc := range failCases { - t.Run(tc.name, func(t *testing.T) { + t.Run("pipeline_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ @@ -379,7 +227,10 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) { "foo": { PipelineSpec: &pipelines.PipelineSpec{ Clusters: []pipelines.PipelineCluster{ - tc.spec, + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, }, }, }, @@ -390,6 +241,12 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) { bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. 
+ bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0)) + }) + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { @@ -402,59 +259,95 @@ func TestValidateSingleNodePipelineClustersFail(t *testing.T) { }, diags) }) } + } -func TestValidateSingleNodePipelineClustersPass(t *testing.T) { +func TestValidateSingleNodeClusterPass(t *testing.T) { + zero := 0 + one := 1 + passCases := []struct { - name string - spec pipelines.PipelineCluster + name string + numWorkers *int + sparkConf map[string]string + customTags map[string]string + policyId string }{ { name: "single node cluster", - spec: pipelines.PipelineCluster{ - SparkConf: map[string]string{ - "spark.databricks.cluster.profile": "singleNode", - "spark.master": "local[*]", - }, - CustomTags: map[string]string{ - "ResourceClass": "SingleNode", - }, + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", }, + customTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + numWorkers: &zero, }, { - name: "num workers is not zero", - spec: pipelines.PipelineCluster{ - NumWorkers: 1, - }, + name: "num workers is not zero", + numWorkers: &one, }, { - name: "autoscale is not nil", - spec: pipelines.PipelineCluster{ - Autoscale: &pipelines.PipelineClusterAutoscale{ - MaxWorkers: 3, - }, - }, + name: "num workers is not set", }, { - name: "policy id is not empty", - spec: pipelines.PipelineCluster{ - PolicyId: "policy-abc", - }, + name: "policy id is not empty", + policyId: "policy-abc", + numWorkers: &zero, }, } ctx := context.Background() + // Interactive clusters. for _, tc := range passCases { - t.Run(tc.name, func(t *testing.T) { + t.Run("interactive_"+tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ - Pipelines: map[string]*resources.Pipeline{ + Clusters: map[string]*resources.Cluster{ "foo": { - PipelineSpec: &pipelines.PipelineSpec{ - Clusters: []pipelines.PipelineCluster{ - tc.spec, + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Job clusters. + for _, tc := range passCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, }, }, }, @@ -463,6 +356,83 @@ func TestValidateSingleNodePipelineClustersPass(t *testing.T) { }, } + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Job task clusters. 
+ for _, tc := range passCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Pipeline clusters. + for _, tc := range passCases { + t.Run("pipeline_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers)) + }) + } + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) From 4233a7c2923c3acc5fc81e7cb6f0e45c4967dc2a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 19 Nov 2024 23:11:03 +0100 Subject: [PATCH 3/3] better warn --- bundle/config/validate/single_node_cluster.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index e1104f11..c4fc3a23 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -71,7 +71,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return true } if profile != "singleNode" { - log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec") + log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile) return true } @@ -81,7 +81,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return true } if !strings.HasPrefix(master, "local") { - log.Warnf(ctx, "spark_conf spark.master is not local in single-node cluster spec") + log.Warnf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master) return true } @@ -91,7 +91,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return true } if resourceClass != "SingleNode" { - log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec") + log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass) return true }
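
A note on the resulting behavior, for reference: the validator added by this series warns only when num_workers is explicitly set to 0 while the single node spark_conf and custom_tags properties are missing or wrong, and it skips any cluster that references a policy_id, since the policy may supply those properties. As a rough illustration (not part of the patches), a bundle definition that passes the new validation could look like the following, where the spark_version and node_type_id values are placeholders chosen for the example:

    # Hypothetical databricks.yml snippet. spark_version and node_type_id are
    # illustrative placeholders; the spark_conf and custom_tags entries are the
    # exact properties the validator checks for.
    resources:
      jobs:
        example_job:
          name: example-job
          job_clusters:
            - job_cluster_key: single_node
              new_cluster:
                spark_version: 15.4.x-scala2.12   # placeholder
                node_type_id: i3.xlarge           # placeholder
                num_workers: 0
                spark_conf:
                  spark.databricks.cluster.profile: singleNode
                  spark.master: local[*]
                custom_tags:
                  ResourceClass: SingleNode

Dropping the spark_conf or custom_tags block above, while keeping num_workers: 0 and no policy_id, would produce the "Single node cluster is not correctly configured" warning, reported at that cluster's location and path in the bundle configuration.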