mirror of https://github.com/databricks/cli.git
567 lines
15 KiB
Go
567 lines
15 KiB
Go
package validate
|
|
|
|
import (
|
|
"context"
|
|
"testing"
|
|
|
|
"github.com/databricks/cli/bundle"
|
|
"github.com/databricks/cli/bundle/config"
|
|
"github.com/databricks/cli/bundle/config/resources"
|
|
"github.com/databricks/cli/bundle/internal/bundletest"
|
|
"github.com/databricks/cli/libs/diag"
|
|
"github.com/databricks/cli/libs/dyn"
|
|
"github.com/databricks/databricks-sdk-go/service/compute"
|
|
"github.com/databricks/databricks-sdk-go/service/jobs"
|
|
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
// failCases returns the table of cluster settings shared by the
// TestValidateSingleNodeClusterFail* tests in this file. Those tests force
// num_workers to 0 and expect the single-node warning for every entry: each
// entry omits, or carries an invalid value for, at least one of the spark conf
// keys or the ResourceClass custom tag.
func failCases() []struct {
	name       string
	sparkConf  map[string]string
	customTags map[string]string
} {
	// Keys repeated throughout the table.
	const (
		profileKey = "spark.databricks.cluster.profile"
		masterKey  = "spark.master"
		classTag   = "ResourceClass"
	)

	// Alias of the anonymous result element type, so the literal below does
	// not have to restate the struct shape.
	type failCase = struct {
		name       string
		sparkConf  map[string]string
		customTags map[string]string
	}

	table := []failCase{
		{name: "no tags or conf"},
		{
			name:      "no tags",
			sparkConf: map[string]string{profileKey: "singleNode", masterKey: "local[*]"},
		},
		{
			name:       "no conf",
			customTags: map[string]string{classTag: "SingleNode"},
		},
		{
			name:       "invalid spark cluster profile",
			sparkConf:  map[string]string{profileKey: "invalid", masterKey: "local[*]"},
			customTags: map[string]string{classTag: "SingleNode"},
		},
		{
			name:       "invalid spark.master",
			sparkConf:  map[string]string{profileKey: "singleNode", masterKey: "invalid"},
			customTags: map[string]string{classTag: "SingleNode"},
		},
		{
			name:       "invalid tags",
			sparkConf:  map[string]string{profileKey: "singleNode", masterKey: "local[*]"},
			customTags: map[string]string{classTag: "invalid"},
		},
		{
			name:       "missing ResourceClass tag",
			sparkConf:  map[string]string{profileKey: "singleNode", masterKey: "local[*]"},
			customTags: map[string]string{"what": "ever"},
		},
		{
			name:       "missing spark.master",
			sparkConf:  map[string]string{profileKey: "singleNode"},
			customTags: map[string]string{classTag: "SingleNode"},
		},
		{
			name:       "missing spark.databricks.cluster.profile",
			sparkConf:  map[string]string{masterKey: "local[*]"},
			customTags: map[string]string{classTag: "SingleNode"},
		},
	}
	return table
}
|
|
|
|
func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range failCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Clusters: map[string]*resources.Cluster{
|
|
"foo": {
|
|
ClusterSpec: &compute.ClusterSpec{
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
|
|
|
|
// We can't set num_workers to 0 explicitly in the typed configuration.
|
|
// Do it on the dyn.Value directly.
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
|
|
})
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Equal(t, diag.Diagnostics{
|
|
{
|
|
Severity: diag.Warning,
|
|
Summary: singleNodeWarningSummary,
|
|
Detail: singleNodeWarningDetail,
|
|
Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
|
|
Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
|
|
},
|
|
}, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range failCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Jobs: map[string]*resources.Job{
|
|
"foo": {
|
|
JobSettings: &jobs.JobSettings{
|
|
JobClusters: []jobs.JobCluster{
|
|
{
|
|
NewCluster: compute.ClusterSpec{
|
|
ClusterName: "my_cluster",
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
|
|
|
|
// We can't set num_workers to 0 explicitly in the typed configuration.
|
|
// Do it on the dyn.Value directly.
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
|
|
})
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Equal(t, diag.Diagnostics{
|
|
{
|
|
Severity: diag.Warning,
|
|
Summary: singleNodeWarningSummary,
|
|
Detail: singleNodeWarningDetail,
|
|
Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
|
|
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
|
|
},
|
|
}, diags)
|
|
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range failCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Jobs: map[string]*resources.Job{
|
|
"foo": {
|
|
JobSettings: &jobs.JobSettings{
|
|
Tasks: []jobs.Task{
|
|
{
|
|
NewCluster: &compute.ClusterSpec{
|
|
ClusterName: "my_cluster",
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
|
|
|
|
// We can't set num_workers to 0 explicitly in the typed configuration.
|
|
// Do it on the dyn.Value directly.
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
|
|
})
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Equal(t, diag.Diagnostics{
|
|
{
|
|
Severity: diag.Warning,
|
|
Summary: singleNodeWarningSummary,
|
|
Detail: singleNodeWarningDetail,
|
|
Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
|
|
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
|
|
},
|
|
}, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range failCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Pipelines: map[string]*resources.Pipeline{
|
|
"foo": {
|
|
PipelineSpec: &pipelines.PipelineSpec{
|
|
Clusters: []pipelines.PipelineCluster{
|
|
{
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
|
|
|
|
// We can't set num_workers to 0 explicitly in the typed configuration.
|
|
// Do it on the dyn.Value directly.
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
|
|
})
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Equal(t, diag.Diagnostics{
|
|
{
|
|
Severity: diag.Warning,
|
|
Summary: singleNodeWarningSummary,
|
|
Detail: singleNodeWarningDetail,
|
|
Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
|
|
Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
|
|
},
|
|
}, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range failCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Jobs: map[string]*resources.Job{
|
|
"foo": {
|
|
JobSettings: &jobs.JobSettings{
|
|
Tasks: []jobs.Task{
|
|
{
|
|
ForEachTask: &jobs.ForEachTask{
|
|
Task: jobs.Task{
|
|
NewCluster: &compute.ClusterSpec{
|
|
ClusterName: "my_cluster",
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
|
|
|
|
// We can't set num_workers to 0 explicitly in the typed configuration.
|
|
// Do it on the dyn.Value directly.
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
|
|
})
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Equal(t, diag.Diagnostics{
|
|
{
|
|
Severity: diag.Warning,
|
|
Summary: singleNodeWarningSummary,
|
|
Detail: singleNodeWarningDetail,
|
|
Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
|
|
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
|
|
},
|
|
}, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
// passCases returns the table of cluster settings shared by the
// TestValidateSingleNodeClusterPass* tests in this file, which expect no
// diagnostics for any entry. A nil numWorkers means the calling test leaves
// num_workers unset; otherwise the pointed-to value is written as num_workers.
func passCases() []struct {
	name       string
	numWorkers *int
	sparkConf  map[string]string
	customTags map[string]string
	policyId   string
} {
	// Alias of the anonymous result element type, so the literal below does
	// not have to restate the struct shape.
	type passCase = struct {
		name       string
		numWorkers *int
		sparkConf  map[string]string
		customTags map[string]string
		policyId   string
	}

	// Addressable worker counts; entries take pointers to these.
	zero, one := 0, 1

	table := []passCase{
		{
			name:       "single node cluster",
			numWorkers: &zero,
			sparkConf: map[string]string{
				"spark.databricks.cluster.profile": "singleNode",
				"spark.master":                     "local[*]",
			},
			customTags: map[string]string{"ResourceClass": "SingleNode"},
		},
		{
			name:       "num workers is not zero",
			numWorkers: &one,
		},
		{
			name: "num workers is not set",
		},
		{
			name:       "policy id is not empty",
			numWorkers: &zero,
			policyId:   "policy-abc",
		},
	}
	return table
}
|
|
|
|
func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range passCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Clusters: map[string]*resources.Cluster{
|
|
"foo": {
|
|
ClusterSpec: &compute.ClusterSpec{
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
PolicyId: tc.policyId,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if tc.numWorkers != nil {
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
|
|
})
|
|
}
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Empty(t, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range passCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Jobs: map[string]*resources.Job{
|
|
"foo": {
|
|
JobSettings: &jobs.JobSettings{
|
|
JobClusters: []jobs.JobCluster{
|
|
{
|
|
NewCluster: compute.ClusterSpec{
|
|
ClusterName: "my_cluster",
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
PolicyId: tc.policyId,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if tc.numWorkers != nil {
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
|
})
|
|
}
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Empty(t, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range passCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Jobs: map[string]*resources.Job{
|
|
"foo": {
|
|
JobSettings: &jobs.JobSettings{
|
|
Tasks: []jobs.Task{
|
|
{
|
|
NewCluster: &compute.ClusterSpec{
|
|
ClusterName: "my_cluster",
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
PolicyId: tc.policyId,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if tc.numWorkers != nil {
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
|
})
|
|
}
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Empty(t, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range passCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Pipelines: map[string]*resources.Pipeline{
|
|
"foo": {
|
|
PipelineSpec: &pipelines.PipelineSpec{
|
|
Clusters: []pipelines.PipelineCluster{
|
|
{
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
PolicyId: tc.policyId,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if tc.numWorkers != nil {
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
|
|
})
|
|
}
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Empty(t, diags)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
for _, tc := range passCases() {
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
b := &bundle.Bundle{
|
|
Config: config.Root{
|
|
Resources: config.Resources{
|
|
Jobs: map[string]*resources.Job{
|
|
"foo": {
|
|
JobSettings: &jobs.JobSettings{
|
|
Tasks: []jobs.Task{
|
|
{
|
|
ForEachTask: &jobs.ForEachTask{
|
|
Task: jobs.Task{
|
|
NewCluster: &compute.ClusterSpec{
|
|
ClusterName: "my_cluster",
|
|
SparkConf: tc.sparkConf,
|
|
CustomTags: tc.customTags,
|
|
PolicyId: tc.policyId,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
if tc.numWorkers != nil {
|
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
|
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
|
})
|
|
}
|
|
|
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
|
assert.Empty(t, diags)
|
|
})
|
|
}
|
|
}
|