mirror of https://github.com/databricks/cli.git
Add JobTaskClusterSpec validate mutator (#1784)
## Changes

Add JobTaskClusterSpec validate mutator. It catches the case when tasks don't specify which cluster to use.

For example, we can get this error with minor modifications to the `default-python` template:

```yaml
tasks:
  - task_key: python_file_task
    spark_python_task:
      python_file: ../src/my_project_10/main.py
```

```
% databricks bundle validate
Error: Missing required cluster or environment settings
  at resources.jobs.my_project_10_job.tasks[0]
  in resources/my_project_10_job.yml:17:11

Task "print_github_stars" requires a cluster or an environment to run.
Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.
```

We implicitly rely on "one of" validation, which does not exist. Many bundle fields can't co-exist, for instance: `JobTask.{existing_cluster_id,job_cluster_key}`, `Library.{whl,pypi}`, `JobTask.{notebook_task,python_wheel_task}`, etc.

## Tests

Unit tests

---------

Co-authored-by: Pieter Noordhuis <pcnoordhuis@gmail.com>
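For illustration, the error above is resolved by setting any one of the listed fields on the task. A minimal sketch using `job_cluster_key`; the `default` key name and the `job_clusters` entry (including its `spark_version` and `num_workers` values) are assumptions for this example, not part of this change:

```yaml
job_clusters:
  - job_cluster_key: default            # assumed cluster definition, for illustration only
    new_cluster:
      spark_version: 15.4.x-scala2.12   # hypothetical values
      num_workers: 1

tasks:
  - task_key: python_file_task
    job_cluster_key: default            # any of the four listed fields would satisfy the validator
    spark_python_task:
      python_file: ../src/my_project_10/main.py
```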
This commit is contained in:
parent 490259a14a
commit 3d9decdda9
@@ -0,0 +1,161 @@
```go
package validate

import (
	"context"
	"fmt"
	"strings"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)

// JobTaskClusterSpec validates that job tasks have a cluster spec defined
// if the task requires a cluster.
func JobTaskClusterSpec() bundle.ReadOnlyMutator {
	return &jobTaskClusterSpec{}
}

type jobTaskClusterSpec struct {
}

func (v *jobTaskClusterSpec) Name() string {
	return "validate:job_task_cluster_spec"
}

func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
	diags := diag.Diagnostics{}

	jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))

	for resourceName, job := range rb.Config().Resources.Jobs {
		resourcePath := jobsPath.Append(dyn.Key(resourceName))

		for taskIndex, task := range job.Tasks {
			taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))

			diags = diags.Extend(validateJobTask(rb, task, taskPath))
		}
	}

	return diags
}

func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
	diags := diag.Diagnostics{}

	var specified []string
	var unspecified []string

	if task.JobClusterKey != "" {
		specified = append(specified, "job_cluster_key")
	} else {
		unspecified = append(unspecified, "job_cluster_key")
	}

	if task.EnvironmentKey != "" {
		specified = append(specified, "environment_key")
	} else {
		unspecified = append(unspecified, "environment_key")
	}

	if task.ExistingClusterId != "" {
		specified = append(specified, "existing_cluster_id")
	} else {
		unspecified = append(unspecified, "existing_cluster_id")
	}

	if task.NewCluster != nil {
		specified = append(specified, "new_cluster")
	} else {
		unspecified = append(unspecified, "new_cluster")
	}

	if task.ForEachTask != nil {
		forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task"))

		diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath))
	}

	if isComputeTask(task) && len(specified) == 0 {
		if task.NotebookTask != nil {
			// notebook tasks without a cluster spec will use the notebook environment
		} else {
			// the path alone might not be very helpful; adding the user-specified task key clarifies the context
			detail := fmt.Sprintf(
				"Task %q requires a cluster or an environment to run.\nSpecify one of the following fields: %s.",
				task.TaskKey,
				strings.Join(unspecified, ", "),
			)

			diags = diags.Append(diag.Diagnostic{
				Severity:  diag.Error,
				Summary:   "Missing required cluster or environment settings",
				Detail:    detail,
				Locations: rb.Config().GetLocations(taskPath.String()),
				Paths:     []dyn.Path{taskPath},
			})
		}
	}

	return diags
}

// isComputeTask returns true if the task runs on a cluster or serverless GC
func isComputeTask(task jobs.Task) bool {
	if task.NotebookTask != nil {
		// if warehouse_id is set, it's a SQL notebook that doesn't need a cluster or serverless GC
		if task.NotebookTask.WarehouseId != "" {
			return false
		} else {
			// the task settings don't require specifying a cluster/serverless GC, but the task itself can run on one;
			// we handle that case separately in validateJobTask
			return true
		}
	}

	if task.PythonWheelTask != nil {
		return true
	}

	if task.DbtTask != nil {
		return true
	}

	if task.SparkJarTask != nil {
		return true
	}

	if task.SparkSubmitTask != nil {
		return true
	}

	if task.SparkPythonTask != nil {
		return true
	}

	if task.SqlTask != nil {
		return false
	}

	if task.PipelineTask != nil {
		// while pipelines use clusters, pipeline tasks don't; they only trigger pipelines
		return false
	}

	if task.RunJobTask != nil {
		return false
	}

	if task.ConditionTask != nil {
		return false
	}

	// a for_each task doesn't use clusters itself, though the underlying task(s) can
	if task.ForEachTask != nil {
		return false
	}

	return false
}
```
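To make the `isComputeTask` rules concrete, here is a sketch of a task that passes this validation with no cluster fields at all: a notebook task with `warehouse_id` set is treated as a SQL notebook and needs neither a cluster nor serverless compute. The path and warehouse ID below are hypothetical:

```yaml
tasks:
  - task_key: sql_notebook_task
    notebook_task:
      notebook_path: ./notebooks/report.sql   # hypothetical notebook path
      warehouse_id: "1234567890abcdef"        # warehouse-backed notebook; isComputeTask returns false
```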
@@ -0,0 +1,203 @@
```go
package validate

import (
	"context"
	"testing"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/config"
	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/databricks/databricks-sdk-go/service/jobs"
	"github.com/stretchr/testify/assert"
)

func TestJobTaskClusterSpec(t *testing.T) {
	expectedSummary := "Missing required cluster or environment settings"

	type testCase struct {
		name         string
		task         jobs.Task
		errorPath    string
		errorDetail  string
		errorSummary string
	}

	testCases := []testCase{
		{
			name: "valid notebook task",
			task: jobs.Task{
				// while a cluster is needed, it will use notebook environment to create one
				NotebookTask: &jobs.NotebookTask{},
			},
		},
		{
			name: "valid notebook task (job_cluster_key)",
			task: jobs.Task{
				JobClusterKey: "cluster1",
				NotebookTask:  &jobs.NotebookTask{},
			},
		},
		{
			name: "valid notebook task (new_cluster)",
			task: jobs.Task{
				NewCluster:   &compute.ClusterSpec{},
				NotebookTask: &jobs.NotebookTask{},
			},
		},
		{
			name: "valid notebook task (existing_cluster_id)",
			task: jobs.Task{
				ExistingClusterId: "cluster1",
				NotebookTask:      &jobs.NotebookTask{},
			},
		},
		{
			name: "valid SQL notebook task",
			task: jobs.Task{
				NotebookTask: &jobs.NotebookTask{
					WarehouseId: "warehouse1",
				},
			},
		},
		{
			name: "valid python wheel task",
			task: jobs.Task{
				JobClusterKey:   "cluster1",
				PythonWheelTask: &jobs.PythonWheelTask{},
			},
		},
		{
			name: "valid python wheel task (environment_key)",
			task: jobs.Task{
				EnvironmentKey:  "environment1",
				PythonWheelTask: &jobs.PythonWheelTask{},
			},
		},
		{
			name: "valid dbt task",
			task: jobs.Task{
				JobClusterKey: "cluster1",
				DbtTask:       &jobs.DbtTask{},
			},
		},
		{
			name: "valid spark jar task",
			task: jobs.Task{
				JobClusterKey: "cluster1",
				SparkJarTask:  &jobs.SparkJarTask{},
			},
		},
		{
			name: "valid spark submit",
			task: jobs.Task{
				NewCluster:      &compute.ClusterSpec{},
				SparkSubmitTask: &jobs.SparkSubmitTask{},
			},
		},
		{
			name: "valid spark python task",
			task: jobs.Task{
				JobClusterKey:   "cluster1",
				SparkPythonTask: &jobs.SparkPythonTask{},
			},
		},
		{
			name: "valid SQL task",
			task: jobs.Task{
				SqlTask: &jobs.SqlTask{},
			},
		},
		{
			name: "valid pipeline task",
			task: jobs.Task{
				PipelineTask: &jobs.PipelineTask{},
			},
		},
		{
			name: "valid run job task",
			task: jobs.Task{
				RunJobTask: &jobs.RunJobTask{},
			},
		},
		{
			name: "valid condition task",
			task: jobs.Task{
				ConditionTask: &jobs.ConditionTask{},
			},
		},
		{
			name: "valid for each task",
			task: jobs.Task{
				ForEachTask: &jobs.ForEachTask{
					Task: jobs.Task{
						JobClusterKey: "cluster1",
						NotebookTask:  &jobs.NotebookTask{},
					},
				},
			},
		},
		{
			name: "invalid python wheel task",
			task: jobs.Task{
				PythonWheelTask: &jobs.PythonWheelTask{},
				TaskKey:         "my_task",
			},
			errorPath: "resources.jobs.job1.tasks[0]",
			errorDetail: `Task "my_task" requires a cluster or an environment to run.
Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.`,
			errorSummary: expectedSummary,
		},
		{
			name: "invalid for each task",
			task: jobs.Task{
				ForEachTask: &jobs.ForEachTask{
					Task: jobs.Task{
						PythonWheelTask: &jobs.PythonWheelTask{},
						TaskKey:         "my_task",
					},
				},
			},
			errorPath: "resources.jobs.job1.tasks[0].for_each_task.task",
			errorDetail: `Task "my_task" requires a cluster or an environment to run.
Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.`,
			errorSummary: expectedSummary,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			job := &resources.Job{
				JobSettings: &jobs.JobSettings{
					Tasks: []jobs.Task{tc.task},
				},
			}

			b := createBundle(map[string]*resources.Job{"job1": job})
			diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec())

			if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" {
				assert.Len(t, diags, 1)
				assert.Len(t, diags[0].Paths, 1)

				diag := diags[0]

				assert.Equal(t, tc.errorPath, diag.Paths[0].String())
				assert.Equal(t, tc.errorSummary, diag.Summary)
				assert.Equal(t, tc.errorDetail, diag.Detail)
			} else {
				assert.ElementsMatch(t, []string{}, diags)
			}
		})
	}
}

func createBundle(jobs map[string]*resources.Job) *bundle.Bundle {
	return &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: jobs,
			},
		},
	}
}
```
```diff
@@ -34,6 +34,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
 		JobClusterKeyDefined(),
 		FilesToSync(),
 		ValidateSyncPatterns(),
+		JobTaskClusterSpec(),
 	))
 }
```