Add JobTaskClusterSpec validate mutator

This commit is contained in:
Gleb Kanterov 2024-09-23 12:17:25 +02:00
parent cf989a7e10
commit d7da8f146e
No known key found for this signature in database
GPG Key ID: 4D87C640DBD00176
3 changed files with 357 additions and 0 deletions

View File

@ -0,0 +1,155 @@
package validate
import (
"context"
"fmt"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
// JobTaskClusterSpec validates that job tasks have cluster spec defined
// if task requires a cluster
func JobTaskClusterSpec() bundle.ReadOnlyMutator {
return &jobTaskClusterSpec{}
}
type jobTaskClusterSpec struct {
}
func (v *jobTaskClusterSpec) Name() string {
return "validate:job_task_cluster_spec"
}
func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))
for resourceName, job := range rb.Config().Resources.Jobs {
resourcePath := jobsPath.Append(dyn.Key(resourceName))
for taskIndex, task := range job.Tasks {
taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))
diags = diags.Extend(validateJobTask(rb, task, taskPath))
}
}
return diags
}
func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
diags := diag.Diagnostics{}
var specified []string
var unspecified []string
if task.JobClusterKey != "" {
specified = append(specified, "job_cluster_key")
} else {
unspecified = append(unspecified, "job_cluster_key")
}
if task.EnvironmentKey != "" {
specified = append(specified, "environment_key")
} else {
unspecified = append(unspecified, "environment_key")
}
if task.ExistingClusterId != "" {
specified = append(specified, "existing_cluster_id")
} else {
unspecified = append(unspecified, "existing_cluster_id")
}
if task.NewCluster != nil {
specified = append(specified, "new_cluster")
} else {
unspecified = append(unspecified, "new_cluster")
}
if task.ForEachTask != nil {
forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task"))
diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath))
}
if isComputeTask(task) && len(specified) == 0 {
if task.NotebookTask != nil {
// notebook tasks without cluster spec will use notebook environment
} else {
// path might be not very helpful, adding user-specified task key clarifies the context
detail := fmt.Sprintf("Task %q has a task type that requires a cluster, but no cluster is specified", task.TaskKey)
diags = diags.Append(diag.Diagnostic{
Severity: diag.Error,
Summary: fmt.Sprintf("One of the following fields must be set: %s", strings.Join(unspecified, ", ")),
Detail: detail,
Locations: rb.Config().GetLocations(taskPath.String()),
Paths: []dyn.Path{taskPath},
})
}
}
return diags
}
// isComputeTask returns true if the task requires a cluster
func isComputeTask(task jobs.Task) bool {
if task.NotebookTask != nil {
// if warehouse_id is set, it's SQL notebook that doesn't need cluster
if task.NotebookTask.WarehouseId != "" {
return false
} else {
return true
}
}
if task.PythonWheelTask != nil {
return true
}
if task.DbtTask != nil {
return true
}
if task.SparkJarTask != nil {
return true
}
if task.SparkSubmitTask != nil {
return true
}
if task.SparkPythonTask != nil {
return true
}
if task.SqlTask != nil {
return false
}
if task.PipelineTask != nil {
// while pipelines use clusters, pipeline tasks don't, they only trigger pipelines
return false
}
if task.RunJobTask != nil {
return false
}
if task.ConditionTask != nil {
return false
}
// for each task doesn't use clusters, underlying task(s) can though
if task.ForEachTask != nil {
return false
}
return false
}

View File

@ -0,0 +1,201 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)
func TestJobTaskClusterSpec(t *testing.T) {
expectedSummary := "One of the following fields must be set: job_cluster_key, environment_key, existing_cluster_id, new_cluster"
type testCase struct {
name string
task jobs.Task
errorPath string
errorDetail string
errorSummary string
}
testCases := []testCase{
{
name: "valid notebook task",
task: jobs.Task{
// while a cluster is needed, it will use notebook environment to create one
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (job_cluster_key)",
task: jobs.Task{
JobClusterKey: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (new_cluster)",
task: jobs.Task{
NewCluster: &compute.ClusterSpec{},
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (existing_cluster_id)",
task: jobs.Task{
ExistingClusterId: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid SQL notebook task",
task: jobs.Task{
NotebookTask: &jobs.NotebookTask{
WarehouseId: "warehouse1",
},
},
},
{
name: "valid python wheel task",
task: jobs.Task{
JobClusterKey: "cluster1",
PythonWheelTask: &jobs.PythonWheelTask{},
},
},
{
name: "valid python wheel task (environment_key)",
task: jobs.Task{
EnvironmentKey: "environment1",
PythonWheelTask: &jobs.PythonWheelTask{},
},
},
{
name: "valid dbt task",
task: jobs.Task{
JobClusterKey: "cluster1",
DbtTask: &jobs.DbtTask{},
},
},
{
name: "valid spark jar task",
task: jobs.Task{
JobClusterKey: "cluster1",
SparkJarTask: &jobs.SparkJarTask{},
},
},
{
name: "valid spark submit",
task: jobs.Task{
NewCluster: &compute.ClusterSpec{},
SparkSubmitTask: &jobs.SparkSubmitTask{},
},
},
{
name: "valid spark python task",
task: jobs.Task{
JobClusterKey: "cluster1",
SparkPythonTask: &jobs.SparkPythonTask{},
},
},
{
name: "valid SQL task",
task: jobs.Task{
SqlTask: &jobs.SqlTask{},
},
},
{
name: "valid pipeline task",
task: jobs.Task{
PipelineTask: &jobs.PipelineTask{},
},
},
{
name: "valid run job task",
task: jobs.Task{
RunJobTask: &jobs.RunJobTask{},
},
},
{
name: "valid condition task",
task: jobs.Task{
ConditionTask: &jobs.ConditionTask{},
},
},
{
name: "valid for each task",
task: jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
JobClusterKey: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
},
},
{
name: "invalid python wheel task",
task: jobs.Task{
PythonWheelTask: &jobs.PythonWheelTask{},
TaskKey: "my_task",
},
errorPath: "resources.jobs.job1.tasks[0]",
errorDetail: "Task \"my_task\" has a task type that requires a cluster, but no cluster is specified",
errorSummary: expectedSummary,
},
{
name: "invalid for each task",
task: jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
PythonWheelTask: &jobs.PythonWheelTask{},
TaskKey: "my_task",
},
},
},
errorPath: "resources.jobs.job1.tasks[0].for_each_task.task",
errorDetail: "Task \"my_task\" has a task type that requires a cluster, but no cluster is specified",
errorSummary: expectedSummary,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
job := &resources.Job{
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{tc.task},
},
}
b := createBundle(map[string]*resources.Job{"job1": job})
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec())
if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" {
assert.Len(t, diags, 1)
assert.Len(t, diags[0].Paths, 1)
diag := diags[0]
assert.Equal(t, tc.errorPath, diag.Paths[0].String())
assert.Equal(t, tc.errorSummary, diag.Summary)
assert.Equal(t, tc.errorDetail, diag.Detail)
} else {
assert.ElementsMatch(t, []string{}, diags)
}
})
}
}
func createBundle(jobs map[string]*resources.Job) *bundle.Bundle {
return &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: jobs,
},
},
}
}

View File

@ -34,6 +34,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
JobClusterKeyDefined(),
FilesToSync(),
ValidateSyncPatterns(),
JobTaskClusterSpec(),
))
}