package validate import ( "context" "testing" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/bundletest" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/stretchr/testify/assert" ) func failCases() []struct { name string sparkConf map[string]string customTags map[string]string } { return []struct { name string sparkConf map[string]string customTags map[string]string }{ { name: "no tags or conf", }, { name: "no tags", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]", }, }, { name: "no conf", customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark cluster profile", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "invalid", "spark.master": "local[*]", }, customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid spark.master", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "singleNode", "spark.master": "invalid", }, customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "invalid tags", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]", }, customTags: map[string]string{"ResourceClass": "invalid"}, }, { name: "missing ResourceClass tag", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]", }, customTags: map[string]string{"what": "ever"}, }, { name: "missing spark.master", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "singleNode", }, customTags: map[string]string{"ResourceClass": "SingleNode"}, }, { name: "missing spark.databricks.cluster.profile", sparkConf: map[string]string{ "spark.master": "local[*]", }, customTags: map[string]string{"ResourceClass": "SingleNode"}, }, } } func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Clusters: map[string]*resources.Cluster{ "foo": { ClusterSpec: &compute.ClusterSpec{ SparkConf: tc.sparkConf, CustomTags: tc.customTags, }, }, }, }, }, } bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) // We can't set num_workers to 0 explicitly in the typed configuration. // Do it on the dyn.Value directly. bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0)) }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}, Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))}, }, }, diags) }) } } func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ "foo": { JobSettings: &jobs.JobSettings{ JobClusters: []jobs.JobCluster{ { NewCluster: compute.ClusterSpec{ ClusterName: "my_cluster", SparkConf: tc.sparkConf, CustomTags: tc.customTags, }, }, }, }, }, }, }, }, } bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) // We can't set num_workers to 0 explicitly in the typed configuration. // Do it on the dyn.Value directly. bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0)) }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")}, }, }, diags) }) } } func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ "foo": { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { NewCluster: &compute.ClusterSpec{ ClusterName: "my_cluster", SparkConf: tc.sparkConf, CustomTags: tc.customTags, }, }, }, }, }, }, }, }, } bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) // We can't set num_workers to 0 explicitly in the typed configuration. // Do it on the dyn.Value directly. bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0)) }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}, Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")}, }, }, diags) }) } } func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "foo": { PipelineSpec: &pipelines.PipelineSpec{ Clusters: []pipelines.PipelineCluster{ { SparkConf: tc.sparkConf, CustomTags: tc.customTags, }, }, }, }, }, }, }, } bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) // We can't set num_workers to 0 explicitly in the typed configuration. // Do it on the dyn.Value directly. bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0)) }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, }, }, diags) }) } } func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) { ctx := context.Background() for _, tc := range failCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ "foo": { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { ForEachTask: &jobs.ForEachTask{ Task: jobs.Task{ NewCluster: &compute.ClusterSpec{ ClusterName: "my_cluster", SparkConf: tc.sparkConf, CustomTags: tc.customTags, }, }, }, }, }, }, }, }, }, }, } bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}) // We can't set num_workers to 0 explicitly in the typed configuration. // Do it on the dyn.Value directly. bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0)) }) diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, Summary: singleNodeWarningSummary, Detail: singleNodeWarningDetail, Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}, Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")}, }, }, diags) }) } } func passCases() []struct { name string numWorkers *int sparkConf map[string]string customTags map[string]string policyId string } { zero := 0 one := 1 return []struct { name string numWorkers *int sparkConf map[string]string customTags map[string]string policyId string }{ { name: "single node cluster", sparkConf: map[string]string{ "spark.databricks.cluster.profile": "singleNode", "spark.master": "local[*]", }, customTags: map[string]string{ "ResourceClass": "SingleNode", }, numWorkers: &zero, }, { name: "num workers is not zero", numWorkers: &one, }, { name: "num workers is not set", }, { name: "policy id is not empty", policyId: "policy-abc", numWorkers: &zero, }, } } func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Clusters: map[string]*resources.Cluster{ "foo": { ClusterSpec: &compute.ClusterSpec{ SparkConf: tc.sparkConf, CustomTags: tc.customTags, PolicyId: tc.policyId, }, }, }, }, }, } if tc.numWorkers != nil { bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers)) }) } diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } } func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ "foo": { JobSettings: &jobs.JobSettings{ JobClusters: []jobs.JobCluster{ { NewCluster: compute.ClusterSpec{ ClusterName: "my_cluster", SparkConf: tc.sparkConf, CustomTags: tc.customTags, PolicyId: tc.policyId, }, }, }, }, }, }, }, }, } if tc.numWorkers != nil { bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) }) } diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } } func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ "foo": { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { NewCluster: &compute.ClusterSpec{ ClusterName: "my_cluster", SparkConf: tc.sparkConf, CustomTags: tc.customTags, PolicyId: tc.policyId, }, }, }, }, }, }, }, }, } if tc.numWorkers != nil { bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) }) } diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } } func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Pipelines: map[string]*resources.Pipeline{ "foo": { PipelineSpec: &pipelines.PipelineSpec{ Clusters: []pipelines.PipelineCluster{ { SparkConf: tc.sparkConf, CustomTags: tc.customTags, PolicyId: tc.policyId, }, }, }, }, }, }, }, } if tc.numWorkers != nil { bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers)) }) } diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } } func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) { ctx := context.Background() for _, tc := range passCases() { t.Run(tc.name, func(t *testing.T) { b := &bundle.Bundle{ Config: config.Root{ Resources: config.Resources{ Jobs: map[string]*resources.Job{ "foo": { JobSettings: &jobs.JobSettings{ Tasks: []jobs.Task{ { ForEachTask: &jobs.ForEachTask{ Task: jobs.Task{ NewCluster: &compute.ClusterSpec{ ClusterName: "my_cluster", SparkConf: tc.sparkConf, CustomTags: tc.customTags, PolicyId: tc.policyId, }, }, }, }, }, }, }, }, }, }, } if tc.numWorkers != nil { bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers)) }) } diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) assert.Empty(t, diags) }) } }