From df0a98066acb8346b5902eb6a80224160b45feff Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 18 Nov 2024 15:51:58 +0100 Subject: [PATCH] Add validation for single node clusters --- bundle/config/validate/single_node_cluster.go | 174 +++++++ .../validate/single_node_cluster_test.go | 470 ++++++++++++++++++ bundle/config/validate/validate.go | 1 + 3 files changed, 645 insertions(+) create mode 100644 bundle/config/validate/single_node_cluster.go create mode 100644 bundle/config/validate/single_node_cluster_test.go diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go new file mode 100644 index 000000000..f5b8befac --- /dev/null +++ b/bundle/config/validate/single_node_cluster.go @@ -0,0 +1,174 @@ +package validate + +import ( + "context" + "fmt" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/pipelines" +) + +// Validates that any single node clusters defined in the bundle are correctly configured. +func SingleNodeCluster() bundle.ReadOnlyMutator { + return &singleNodeCluster{} +} + +type singleNodeCluster struct{} + +func (m *singleNodeCluster) Name() string { + return "validate:SingleNodeCluster" +} + +const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a +valid single node cluster please ensure that the following properties +are correctly set in the cluster specification: + + spark_conf: + spark.databricks.cluster.profile: singleNode + spark.master: local[*] + + custom_tags: + ResourceClass: SingleNode + ` + +const singleNodeWarningSummary = `Single node cluster is not correctly configured` + +func validateSingleNodeCluster(spec *compute.ClusterSpec, l []dyn.Location, p dyn.Path) *diag.Diagnostic { + if spec == nil { + return nil + } + + if spec.NumWorkers > 0 || spec.Autoscale != nil { + return nil + } + + if spec.PolicyId != "" { + return nil + } + + invalidSingleNodeWarning := &diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: l, + Paths: []dyn.Path{p}, + } + profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + if !ok { + return invalidSingleNodeWarning + } + master, ok := spec.SparkConf["spark.master"] + if !ok { + return invalidSingleNodeWarning + } + resourceClass, ok := spec.CustomTags["ResourceClass"] + if !ok { + return invalidSingleNodeWarning + } + + if profile == "singleNode" && strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { + return nil + } + + return invalidSingleNodeWarning +} + +func validateSingleNodePipelineCluster(spec pipelines.PipelineCluster, l []dyn.Location, p dyn.Path) *diag.Diagnostic { + if spec.NumWorkers > 0 || spec.Autoscale != nil { + return nil + } + + if spec.PolicyId != "" { + return nil + } + + invalidSingleNodeWarning := &diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: l, + Paths: []dyn.Path{p}, + } + profile, ok := spec.SparkConf["spark.databricks.cluster.profile"] + if !ok { + return invalidSingleNodeWarning + } + master, ok := spec.SparkConf["spark.master"] + if !ok { + return invalidSingleNodeWarning + } + resourceClass, ok := spec.CustomTags["ResourceClass"] + if !ok { + return invalidSingleNodeWarning + } + + if profile == "singleNode" && 
strings.HasPrefix(master, "local") && resourceClass == "SingleNode" { + return nil + } + + return invalidSingleNodeWarning +} + +func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + diags := diag.Diagnostics{} + + // Interactive clusters + for k, r := range rb.Config().Resources.Clusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key(k)) + l := rb.Config().GetLocations("resources.clusters." + k) + + d := validateSingleNodeCluster(r.ClusterSpec, l, p) + if d != nil { + diags = append(diags, *d) + } + } + + // Job clusters + for jobK, jobV := range rb.Config().Resources.Jobs { + for i, clusterV := range jobV.JobSettings.JobClusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("job_clusters"), dyn.Index(i)) + l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.job_clusters[%d]", jobK, i)) + + d := validateSingleNodeCluster(&clusterV.NewCluster, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + // Job task clusters + for jobK, jobV := range rb.Config().Resources.Jobs { + for i, taskV := range jobV.JobSettings.Tasks { + if taskV.NewCluster == nil { + continue + } + + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"), dyn.Key(jobK), dyn.Key("tasks"), dyn.Index(i), dyn.Key("new_cluster")) + l := rb.Config().GetLocations(fmt.Sprintf("resources.jobs.%s.tasks[%d].new_cluster", jobK, i)) + + d := validateSingleNodeCluster(taskV.NewCluster, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + // Pipeline clusters + for pipelineK, pipelineV := range rb.Config().Resources.Pipelines { + for i, clusterV := range pipelineV.PipelineSpec.Clusters { + p := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(pipelineK), dyn.Key("clusters"), dyn.Index(i)) + l := rb.Config().GetLocations(fmt.Sprintf("resources.pipelines.%s.clusters[%d]", pipelineK, i)) + + d := validateSingleNodePipelineCluster(clusterV, l, p) + if d != nil { + diags = append(diags, *d) + } + } + } + + return diags +} diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go new file mode 100644 index 000000000..d9fea6cfd --- /dev/null +++ b/bundle/config/validate/single_node_cluster_test.go @@ -0,0 +1,470 @@ +package validate + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" +) + +func TestValidateSingleNodeClusterFail(t *testing.T) { + failCases := []struct { + name string + spec *compute.ClusterSpec + }{ + { + name: "no tags or conf", + spec: &compute.ClusterSpec{ + ClusterName: "foo", + }, + }, + { + name: "no tags", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + }, + { + name: "no conf", + spec: &compute.ClusterSpec{ + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark cluster profile", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": 
"invalid", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark.master", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid tags", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "invalid", + }, + }, + }, + } + + ctx := context.Background() + + // Test interactive clusters. + for _, tc := range failCases { + t.Run("interactive_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: tc.spec, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))}, + }, + }, diags) + }) + } + + // Test new job clusters. + for _, tc := range failCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: *tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0]", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0]")}, + }, + }, diags) + + }) + } + + // Test job task clusters. 
+ for _, tc := range failCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0]", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodeClusterPass(t *testing.T) { + passCases := []struct { + name string + spec *compute.ClusterSpec + }{ + { + name: "single node cluster", + spec: &compute.ClusterSpec{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "num workers is not zero", + spec: &compute.ClusterSpec{ + NumWorkers: 1, + }, + }, + { + name: "autoscale is not nil", + spec: &compute.ClusterSpec{ + Autoscale: &compute.AutoScale{ + MinWorkers: 1, + }, + }, + }, + { + name: "policy id is not empty", + spec: &compute.ClusterSpec{ + PolicyId: "policy-abc", + }, + }, + } + + ctx := context.Background() + + // Test interactive clusters. + for _, tc := range passCases { + t.Run("interactive_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: tc.spec, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Test new job clusters. + for _, tc := range passCases { + t.Run("job_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: *tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } + + // Test job task clusters. 
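+ // As with job clusters, a policy ID, a nonzero num_workers, or an
+ // autoscale block must keep the task cluster warning-free.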
+ for _, tc := range passCases { + t.Run("task_"+tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: tc.spec, + }, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodePipelineClustersFail(t *testing.T) { + failCases := []struct { + name string + spec pipelines.PipelineCluster + }{ + { + name: "no tags or conf", + spec: pipelines.PipelineCluster{ + DriverInstancePoolId: "abcd", + }, + }, + { + name: "no tags", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + }, + { + name: "no conf", + spec: pipelines.PipelineCluster{ + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark cluster profile", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid spark.master", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "invalid tags", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "invalid", + }, + }, + }, + } + + ctx := context.Background() + + for _, tc := range failCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + tc.spec, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodePipelineClustersPass(t *testing.T) { + passCases := []struct { + name string + spec pipelines.PipelineCluster + }{ + { + name: "single node cluster", + spec: pipelines.PipelineCluster{ + SparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + CustomTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + }, + }, + { + name: "num workers is not zero", + spec: pipelines.PipelineCluster{ + NumWorkers: 1, + }, + }, + { + name: "autoscale is not nil", + spec: pipelines.PipelineCluster{ + Autoscale: &pipelines.PipelineClusterAutoscale{ + MaxWorkers: 3, + }, + }, + }, + { + name: "policy id is not empty", + spec: pipelines.PipelineCluster{ + PolicyId: "policy-abc", + }, + }, + } + + ctx := context.Background() + + for _, tc := range 
passCases { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + tc.spec, + }, + }, + }, + }, + }, + }, + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 440477e65..eb4c3c3cd 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics ValidateSyncPatterns(), JobTaskClusterSpec(), ValidateFolderPermissions(), + SingleNodeCluster(), )) }
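Reviewer note (not part of the diff): for reference, below is a minimal sketch of a bundle configuration that this check accepts as a correctly configured single node cluster. The resource name, spark_version, and node_type_id are illustrative; the required properties are exactly the ones listed in singleNodeWarningDetail. Setting policy_id, a nonzero num_workers, or an autoscale block on a cluster also suppresses the warning, since the validator skips such specs.

    resources:
      jobs:
        example_job:
          name: example-job
          job_clusters:
            - job_cluster_key: main
              new_cluster:
                spark_version: 15.4.x-scala2.12
                node_type_id: i3.xlarge
                num_workers: 0
                spark_conf:
                  spark.databricks.cluster.profile: singleNode
                  spark.master: local[*]
                custom_tags:
                  ResourceClass: SingleNode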