Compare commits


3 Commits

Author          SHA1        Message                                            Date
Shreyas Goenka  382a4efd6e  add validation for foreach task clusters as well   2024-11-22 15:28:00 +01:00
Shreyas Goenka  8128cc390c  separate test function                             2024-11-22 15:10:12 +01:00
Shreyas Goenka  3ac1bb1853  warn -> debug log                                  2024-11-22 14:53:58 +01:00
2 changed files with 163 additions and 35 deletions

File 1 of 2

@@ -67,31 +67,31 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
 	profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
 	if !ok {
-		log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
+		log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
 		return true
 	}
 
 	if profile != "singleNode" {
-		log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
+		log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
 		return true
 	}
 
 	master, ok := conf.SparkConf["spark.master"]
 	if !ok {
-		log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec")
+		log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
 		return true
 	}
 
 	if !strings.HasPrefix(master, "local") {
-		log.Warnf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
+		log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
 		return true
 	}
 
 	resourceClass, ok := conf.CustomTags["ResourceClass"]
 	if !ok {
-		log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
+		log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
 		return true
 	}
 
 	if resourceClass != "SingleNode" {
-		log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
+		log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
 		return true
 	}
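Note on the hunk above: showSingleNodeClusterWarning returns true when a zero-worker cluster is missing any of the canonical single-node markers, and this commit demotes the per-check messages from warn to debug so users only see the aggregate diagnostic. A minimal standalone sketch of the same checks, using only the standard library (looksLikeSingleNode is a hypothetical name, not part of this PR):

package main

import (
	"fmt"
	"strings"
)

// looksLikeSingleNode condenses the checks from showSingleNodeClusterWarning:
// a zero-worker cluster spec only counts as single-node when the spark conf
// and custom tags carry the canonical single-node markers. A missing key
// yields "" and fails the comparison, matching the "not found" branches above.
func looksLikeSingleNode(sparkConf, customTags map[string]string) bool {
	if sparkConf["spark.databricks.cluster.profile"] != "singleNode" {
		return false
	}
	if !strings.HasPrefix(sparkConf["spark.master"], "local") {
		return false
	}
	return customTags["ResourceClass"] == "SingleNode"
}

func main() {
	fmt.Println(looksLikeSingleNode(
		map[string]string{
			"spark.databricks.cluster.profile": "singleNode",
			"spark.master":                     "local[*]",
		},
		map[string]string{"ResourceClass": "SingleNode"},
	)) // true
}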
@@ -108,6 +108,8 @@ func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle)
 		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
 		// Job task clusters
 		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
+		// Job for each task clusters
+		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
 		// Pipeline clusters
 		dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
 	}
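For context: the added pattern matches clusters declared under a for-each task, i.e. resources.jobs.<name>.tasks[<i>].for_each_task.task.new_cluster in bundle configuration. In the typed model this is the nesting the new tests below construct; a self-contained sketch using the SDK types (a hypothetical standalone program with placeholder values):

package main

import (
	"fmt"

	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)

func main() {
	// The nesting the new pattern walks: tasks[i].for_each_task.task.new_cluster.
	// Field values are placeholders.
	task := jobs.Task{
		ForEachTask: &jobs.ForEachTask{
			Task: jobs.Task{
				NewCluster: &compute.ClusterSpec{
					ClusterName: "my_cluster",
					SparkConf: map[string]string{
						"spark.databricks.cluster.profile": "singleNode",
						"spark.master":                     "local[*]",
					},
					CustomTags: map[string]string{"ResourceClass": "SingleNode"},
				},
			},
		},
	}
	fmt.Printf("%+v\n", task.ForEachTask.Task.NewCluster)
}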

File 2 of 2

@@ -16,8 +16,12 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
-func TestValidateSingleNodeClusterFail(t *testing.T) {
-	failCases := []struct {
+func failCases() []struct {
+	name       string
+	sparkConf  map[string]string
+	customTags map[string]string
+} {
+	return []struct {
 		name       string
 		sparkConf  map[string]string
 		customTags map[string]string
@@ -83,12 +87,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
 			customTags: map[string]string{"ResourceClass": "SingleNode"},
 		},
 	}
+}
 
+func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
 	ctx := context.Background()
 
-	// Interactive clusters.
-	for _, tc := range failCases {
-		t.Run("interactive_"+tc.name, func(t *testing.T) {
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -123,10 +128,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
 			}, diags)
 		})
 	}
+}
 
-	// Job clusters.
-	for _, tc := range failCases {
-		t.Run("job_"+tc.name, func(t *testing.T) {
+func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -170,10 +178,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
 			}, diags)
 		})
 	}
+}
 
-	// Job task clusters.
-	for _, tc := range failCases {
-		t.Run("task_"+tc.name, func(t *testing.T) {
+func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -216,10 +227,13 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
 			}, diags)
 		})
 	}
+}
 
-	// Pipeline clusters.
-	for _, tc := range failCases {
-		t.Run("pipeline_"+tc.name, func(t *testing.T) {
+func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -259,14 +273,72 @@ func TestValidateSingleNodeClusterFail(t *testing.T) {
 			}, diags)
 		})
 	}
 }
 
-func TestValidateSingleNodeClusterPass(t *testing.T) {
+func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range failCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									Tasks: []jobs.Task{
+										{
+											ForEachTask: &jobs.ForEachTask{
+												Task: jobs.Task{
+													NewCluster: &compute.ClusterSpec{
+														ClusterName: "my_cluster",
+														SparkConf:   tc.sparkConf,
+														CustomTags:  tc.customTags,
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
+			// We can't set num_workers to 0 explicitly in the typed configuration.
+			// Do it on the dyn.Value directly.
+			bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+				return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
+			})
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Equal(t, diag.Diagnostics{
+				{
+					Severity:  diag.Warning,
+					Summary:   singleNodeWarningSummary,
+					Detail:    singleNodeWarningDetail,
+					Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
+					Paths:     []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
+				},
+			}, diags)
+		})
+	}
+}
+
+func passCases() []struct {
+	name       string
+	numWorkers *int
+	sparkConf  map[string]string
+	customTags map[string]string
+	policyId   string
+} {
 	zero := 0
 	one := 1
 
-	passCases := []struct {
+	return []struct {
 		name       string
 		numWorkers *int
 		sparkConf  map[string]string
 		customTags map[string]string
@@ -297,12 +369,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
 			numWorkers: &zero,
 		},
 	}
+}
 
+func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
 	ctx := context.Background()
 
-	// Interactive clusters.
-	for _, tc := range passCases {
-		t.Run("interactive_"+tc.name, func(t *testing.T) {
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -329,10 +402,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
 			assert.Empty(t, diags)
 		})
 	}
+}
 
-	// Job clusters.
-	for _, tc := range passCases {
-		t.Run("job_"+tc.name, func(t *testing.T) {
+func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -366,10 +442,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
 			assert.Empty(t, diags)
 		})
 	}
+}
 
-	// Job task clusters.
-	for _, tc := range passCases {
-		t.Run("task_"+tc.name, func(t *testing.T) {
+func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -403,10 +482,13 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
 			assert.Empty(t, diags)
 		})
 	}
+}
 
-	// Pipeline clusters.
-	for _, tc := range passCases {
-		t.Run("pipeline_"+tc.name, func(t *testing.T) {
+func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
 			b := &bundle.Bundle{
 				Config: config.Root{
 					Resources: config.Resources{
@@ -438,3 +520,47 @@ func TestValidateSingleNodeClusterPass(t *testing.T) {
 		})
 	}
 }
+
+func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
+	ctx := context.Background()
+
+	for _, tc := range passCases() {
+		t.Run(tc.name, func(t *testing.T) {
+			b := &bundle.Bundle{
+				Config: config.Root{
+					Resources: config.Resources{
+						Jobs: map[string]*resources.Job{
+							"foo": {
+								JobSettings: &jobs.JobSettings{
+									Tasks: []jobs.Task{
+										{
+											ForEachTask: &jobs.ForEachTask{
+												Task: jobs.Task{
+													NewCluster: &compute.ClusterSpec{
+														ClusterName: "my_cluster",
+														SparkConf:   tc.sparkConf,
+														CustomTags:  tc.customTags,
+														PolicyId:    tc.policyId,
+													},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			}
+
+			if tc.numWorkers != nil {
+				bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
+					return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
+				})
+			}
+
+			diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
+			assert.Empty(t, diags)
+		})
+	}
+}
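A closing note on the test refactor: the "separate test function" commit splits one monolithic table-driven test into per-resource tests that share their case tables through the failCases()/passCases() helpers, so each resource type passes or fails independently and subtest names no longer need prefixes like "interactive_". A reduced, self-contained sketch of the idiom (names illustrative, not from this PR; place in a _test.go file):

package demo

import "testing"

// cases returns a fresh copy of the shared table, so each top-level test
// iterates identical, independent cases.
func cases() []struct {
	name string
	in   int
	want int
} {
	return []struct {
		name string
		in   int
		want int
	}{
		{name: "zero", in: 0, want: 0},
		{name: "one", in: 1, want: 2},
	}
}

func TestDoubleForVariantA(t *testing.T) {
	for _, tc := range cases() {
		t.Run(tc.name, func(t *testing.T) {
			if got := tc.in * 2; got != tc.want {
				t.Errorf("got %d, want %d", got, tc.want)
			}
		})
	}
}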