databricks-cli/bundle/config/validate/single_node_cluster_test.go

567 lines
15 KiB
Go

package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/bundletest"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)
func failCases() []struct {
name string
sparkConf map[string]string
customTags map[string]string
} {
return []struct {
name string
sparkConf map[string]string
customTags map[string]string
}{
{
name: "no tags or conf",
},
{
name: "no tags",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
},
{
name: "no conf",
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid spark cluster profile",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "invalid",
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid spark.master",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "invalid",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid tags",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "invalid"},
},
{
name: "missing ResourceClass tag",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{"what": "ever"},
},
{
name: "missing spark.master",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "missing spark.databricks.cluster.profile",
sparkConf: map[string]string{
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
}
}
func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Clusters: map[string]*resources.Cluster{
"foo": {
ClusterSpec: &compute.ClusterSpec{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
NewCluster: compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
PipelineSpec: &pipelines.PipelineSpec{
Clusters: []pipelines.PipelineCluster{
{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
},
}, diags)
})
}
}
func passCases() []struct {
name string
numWorkers *int
sparkConf map[string]string
customTags map[string]string
policyId string
} {
zero := 0
one := 1
return []struct {
name string
numWorkers *int
sparkConf map[string]string
customTags map[string]string
policyId string
}{
{
name: "single node cluster",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{
"ResourceClass": "SingleNode",
},
numWorkers: &zero,
},
{
name: "num workers is not zero",
numWorkers: &one,
},
{
name: "num workers is not set",
},
{
name: "policy id is not empty",
policyId: "policy-abc",
numWorkers: &zero,
},
}
}
func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Clusters: map[string]*resources.Cluster{
"foo": {
ClusterSpec: &compute.ClusterSpec{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
NewCluster: compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
PipelineSpec: &pipelines.PipelineSpec{
Clusters: []pipelines.PipelineCluster{
{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}