mirror of https://github.com/databricks/cli.git
Compare commits
3 Commits
4233a7c292
...
382a4efd6e
Author | SHA1 | Date |
---|---|---|
Shreyas Goenka | 382a4efd6e | |
Shreyas Goenka | 8128cc390c | |
Shreyas Goenka | 3ac1bb1853 |
|
@ -67,31 +67,31 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
|
||||||
|
|
||||||
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
|
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
|
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if profile != "singleNode" {
|
if profile != "singleNode" {
|
||||||
log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
|
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
master, ok := conf.SparkConf["spark.master"]
|
master, ok := conf.SparkConf["spark.master"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec")
|
log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if !strings.HasPrefix(master, "local") {
|
if !strings.HasPrefix(master, "local") {
|
||||||
log.Warnf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
|
log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
resourceClass, ok := conf.CustomTags["ResourceClass"]
|
resourceClass, ok := conf.CustomTags["ResourceClass"]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
|
log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if resourceClass != "SingleNode" {
|
if resourceClass != "SingleNode" {
|
||||||
log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
|
log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +108,8 @@ func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle)
|
||||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
||||||
// Job task clusters
|
// Job task clusters
|
||||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
||||||
|
// Job for each task clusters
|
||||||
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
|
||||||
// Pipeline clusters
|
// Pipeline clusters
|
||||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,8 +16,12 @@ import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestValidateSingleNodeClusterFail(t *testing.T) {
|
func failCases() []struct {
|
||||||
failCases := []struct {
|
name string
|
||||||
|
sparkConf map[string]string
|
||||||
|
customTags map[string]string
|
||||||
|
} {
|
||||||
|
return []struct {
|
||||||
name string
|
name string
|
||||||
sparkConf map[string]string
|
sparkConf map[string]string
|
||||||
customTags map[string]string
|
customTags map[string]string
|
||||||
|
@ -83,12 +87,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
|
||||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Interactive clusters.
|
for _, tc := range failCases() {
|
||||||
for _, tc := range failCases {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
t.Run("interactive_"+tc.name, func(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -123,10 +128,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
|
||||||
}, diags)
|
}, diags)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Job clusters.
|
func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
|
||||||
for _, tc := range failCases {
|
ctx := context.Background()
|
||||||
t.Run("job_"+tc.name, func(t *testing.T) {
|
|
||||||
|
for _, tc := range failCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -170,10 +178,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
|
||||||
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Job task clusters.
|
func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
|
||||||
for _, tc := range failCases {
|
ctx := context.Background()
|
||||||
t.Run("task_"+tc.name, func(t *testing.T) {
|
|
||||||
|
for _, tc := range failCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -216,10 +227,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
|
||||||
}, diags)
|
}, diags)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Pipeline clusters.
|
func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
|
||||||
for _, tc := range failCases {
|
ctx := context.Background()
|
||||||
t.Run("pipeline_"+tc.name, func(t *testing.T) {
|
|
||||||
|
for _, tc := range failCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -259,14 +273,72 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
|
||||||
}, diags)
|
}, diags)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestValidateSingleNodeClusterPass(t *testing.T) {
|
func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
for _, tc := range failCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Jobs: map[string]*resources.Job{
|
||||||
|
"foo": {
|
||||||
|
JobSettings: &jobs.JobSettings{
|
||||||
|
Tasks: []jobs.Task{
|
||||||
|
{
|
||||||
|
ForEachTask: &jobs.ForEachTask{
|
||||||
|
Task: jobs.Task{
|
||||||
|
NewCluster: &compute.ClusterSpec{
|
||||||
|
ClusterName: "my_cluster",
|
||||||
|
SparkConf: tc.sparkConf,
|
||||||
|
CustomTags: tc.customTags,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
|
||||||
|
|
||||||
|
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||||
|
// Do it on the dyn.Value directly.
|
||||||
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||||
|
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
|
||||||
|
})
|
||||||
|
|
||||||
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||||
|
assert.Equal(t, diag.Diagnostics{
|
||||||
|
{
|
||||||
|
Severity: diag.Warning,
|
||||||
|
Summary: singleNodeWarningSummary,
|
||||||
|
Detail: singleNodeWarningDetail,
|
||||||
|
Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
|
||||||
|
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
|
||||||
|
},
|
||||||
|
}, diags)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func passCases() []struct {
|
||||||
|
name string
|
||||||
|
numWorkers *int
|
||||||
|
sparkConf map[string]string
|
||||||
|
customTags map[string]string
|
||||||
|
policyId string
|
||||||
|
} {
|
||||||
zero := 0
|
zero := 0
|
||||||
one := 1
|
one := 1
|
||||||
|
|
||||||
passCases := []struct {
|
return []struct {
|
||||||
name string
|
name string
|
||||||
numWorkers *int
|
numWorkers *int
|
||||||
sparkConf map[string]string
|
sparkConf map[string]string
|
||||||
|
@ -297,12 +369,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
|
||||||
numWorkers: &zero,
|
numWorkers: &zero,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Interactive clusters.
|
for _, tc := range passCases() {
|
||||||
for _, tc := range passCases {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
t.Run("interactive_"+tc.name, func(t *testing.T) {
|
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -329,10 +402,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
|
||||||
assert.Empty(t, diags)
|
assert.Empty(t, diags)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Job clusters.
|
func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
|
||||||
for _, tc := range passCases {
|
ctx := context.Background()
|
||||||
t.Run("job_"+tc.name, func(t *testing.T) {
|
|
||||||
|
for _, tc := range passCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -366,10 +442,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
|
||||||
assert.Empty(t, diags)
|
assert.Empty(t, diags)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Job task clusters.
|
func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
|
||||||
for _, tc := range passCases {
|
ctx := context.Background()
|
||||||
t.Run("task_"+tc.name, func(t *testing.T) {
|
|
||||||
|
for _, tc := range passCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -403,10 +482,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
|
||||||
assert.Empty(t, diags)
|
assert.Empty(t, diags)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Pipeline clusters.
|
func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
|
||||||
for _, tc := range passCases {
|
ctx := context.Background()
|
||||||
t.Run("pipeline_"+tc.name, func(t *testing.T) {
|
|
||||||
|
for _, tc := range passCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
b := &bundle.Bundle{
|
b := &bundle.Bundle{
|
||||||
Config: config.Root{
|
Config: config.Root{
|
||||||
Resources: config.Resources{
|
Resources: config.Resources{
|
||||||
|
@ -438,3 +520,47 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
for _, tc := range passCases() {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Jobs: map[string]*resources.Job{
|
||||||
|
"foo": {
|
||||||
|
JobSettings: &jobs.JobSettings{
|
||||||
|
Tasks: []jobs.Task{
|
||||||
|
{
|
||||||
|
ForEachTask: &jobs.ForEachTask{
|
||||||
|
Task: jobs.Task{
|
||||||
|
NewCluster: &compute.ClusterSpec{
|
||||||
|
ClusterName: "my_cluster",
|
||||||
|
SparkConf: tc.sparkConf,
|
||||||
|
CustomTags: tc.customTags,
|
||||||
|
PolicyId: tc.policyId,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.numWorkers != nil {
|
||||||
|
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||||
|
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||||
|
assert.Empty(t, diags)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue