Merge remote-tracking branch 'origin' into detect-convention

This commit is contained in:
Shreyas Goenka 2024-09-26 17:33:44 +02:00
commit 4cc0dd1fa1
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
29 changed files with 924 additions and 149 deletions

View File

@ -33,7 +33,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.22.7
- name: Setup Python
uses: actions/setup-python@v5
@ -68,7 +68,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.22.7
# No need to download cached dependencies when running gofmt.
cache: false
@ -100,7 +100,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.22.7
# Github repo: https://github.com/ajv-validator/ajv-cli
- name: Install ajv-cli

View File

@ -21,7 +21,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.22.7
# The default cache key for this action considers only the `go.sum` file.
# We include .goreleaser.yaml here to differentiate from the cache used by the push action

View File

@ -22,7 +22,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v5
with:
go-version: 1.22.x
go-version: 1.22.7
# The default cache key for this action considers only the `go.sum` file.
# We include .goreleaser.yaml here to differentiate from the cache used by the push action

View File

@ -0,0 +1,115 @@
package paths
import (
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/dyn"
)
type jobRewritePattern struct {
pattern dyn.Pattern
kind PathKind
skipRewrite func(string) bool
}
func noSkipRewrite(string) bool {
return false
}
func jobTaskRewritePatterns(base dyn.Pattern) []jobRewritePattern {
return []jobRewritePattern{
{
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
PathKindNotebook,
noSkipRewrite,
},
{
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
PathKindWorkspaceFile,
noSkipRewrite,
},
{
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
PathKindDirectory,
noSkipRewrite,
},
{
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
PathKindWorkspaceFile,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
PathKindLibrary,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
PathKindLibrary,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("requirements")),
PathKindWorkspaceFile,
noSkipRewrite,
},
}
}
func jobRewritePatterns() []jobRewritePattern {
// Base pattern to match all tasks in all jobs.
base := dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("tasks"),
dyn.AnyIndex(),
)
// Compile list of patterns and their respective rewrite functions.
jobEnvironmentsPatterns := []jobRewritePattern{
{
dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("environments"),
dyn.AnyIndex(),
dyn.Key("spec"),
dyn.Key("dependencies"),
dyn.AnyIndex(),
),
PathKindWithPrefix,
func(s string) bool {
return !libraries.IsLibraryLocal(s)
},
},
}
taskPatterns := jobTaskRewritePatterns(base)
forEachPatterns := jobTaskRewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
allPatterns := append(taskPatterns, jobEnvironmentsPatterns...)
allPatterns = append(allPatterns, forEachPatterns...)
return allPatterns
}
// VisitJobPaths visits all paths in job resources and applies a function to each path.
func VisitJobPaths(value dyn.Value, fn VisitFunc) (dyn.Value, error) {
var err error
var newValue = value
for _, rewritePattern := range jobRewritePatterns() {
newValue, err = dyn.MapByPattern(newValue, rewritePattern.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
if rewritePattern.skipRewrite(v.MustString()) {
return v, nil
}
return fn(p, rewritePattern.kind, v)
})
if err != nil {
return dyn.InvalidValue, err
}
}
return newValue, nil
}

View File

@ -0,0 +1,168 @@
package paths
import (
"testing"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/dyn"
assert "github.com/databricks/cli/libs/dyn/dynassert"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
func TestVisitJobPaths(t *testing.T) {
task0 := jobs.Task{
NotebookTask: &jobs.NotebookTask{
NotebookPath: "abc",
},
}
task1 := jobs.Task{
SparkPythonTask: &jobs.SparkPythonTask{
PythonFile: "abc",
},
}
task2 := jobs.Task{
DbtTask: &jobs.DbtTask{
ProjectDirectory: "abc",
},
}
task3 := jobs.Task{
SqlTask: &jobs.SqlTask{
File: &jobs.SqlTaskFile{
Path: "abc",
},
},
}
task4 := jobs.Task{
Libraries: []compute.Library{
{Whl: "dist/foo.whl"},
},
}
task5 := jobs.Task{
Libraries: []compute.Library{
{Jar: "dist/foo.jar"},
},
}
task6 := jobs.Task{
Libraries: []compute.Library{
{Requirements: "requirements.txt"},
},
}
job0 := &resources.Job{
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
task0,
task1,
task2,
task3,
task4,
task5,
task6,
},
},
}
root := config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job0": job0,
},
},
}
actual := visitJobPaths(t, root)
expected := []dyn.Path{
dyn.MustPathFromString("resources.jobs.job0.tasks[0].notebook_task.notebook_path"),
dyn.MustPathFromString("resources.jobs.job0.tasks[1].spark_python_task.python_file"),
dyn.MustPathFromString("resources.jobs.job0.tasks[2].dbt_task.project_directory"),
dyn.MustPathFromString("resources.jobs.job0.tasks[3].sql_task.file.path"),
dyn.MustPathFromString("resources.jobs.job0.tasks[4].libraries[0].whl"),
dyn.MustPathFromString("resources.jobs.job0.tasks[5].libraries[0].jar"),
dyn.MustPathFromString("resources.jobs.job0.tasks[6].libraries[0].requirements"),
}
assert.ElementsMatch(t, expected, actual)
}
func TestVisitJobPaths_environments(t *testing.T) {
environment0 := jobs.JobEnvironment{
Spec: &compute.Environment{
Dependencies: []string{
"dist_0/*.whl",
"dist_1/*.whl",
},
},
}
job0 := &resources.Job{
JobSettings: &jobs.JobSettings{
Environments: []jobs.JobEnvironment{
environment0,
},
},
}
root := config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job0": job0,
},
},
}
actual := visitJobPaths(t, root)
expected := []dyn.Path{
dyn.MustPathFromString("resources.jobs.job0.environments[0].spec.dependencies[0]"),
dyn.MustPathFromString("resources.jobs.job0.environments[0].spec.dependencies[1]"),
}
assert.ElementsMatch(t, expected, actual)
}
func TestVisitJobPaths_foreach(t *testing.T) {
task0 := jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NotebookTask: &jobs.NotebookTask{
NotebookPath: "abc",
},
},
},
}
job0 := &resources.Job{
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
task0,
},
},
}
root := config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job0": job0,
},
},
}
actual := visitJobPaths(t, root)
expected := []dyn.Path{
dyn.MustPathFromString("resources.jobs.job0.tasks[0].for_each_task.task.notebook_task.notebook_path"),
}
assert.ElementsMatch(t, expected, actual)
}
func visitJobPaths(t *testing.T, root config.Root) []dyn.Path {
var actual []dyn.Path
err := root.Mutate(func(value dyn.Value) (dyn.Value, error) {
return VisitJobPaths(value, func(p dyn.Path, kind PathKind, v dyn.Value) (dyn.Value, error) {
actual = append(actual, p)
return v, nil
})
})
require.NoError(t, err)
return actual
}

View File

@ -0,0 +1,26 @@
package paths
import "github.com/databricks/cli/libs/dyn"
type PathKind int
const (
// PathKindLibrary is a path to a library file
PathKindLibrary = iota
// PathKindNotebook is a path to a notebook file
PathKindNotebook
// PathKindWorkspaceFile is a path to a regular workspace file,
// notebooks are not allowed because they are uploaded a special
// kind of workspace object.
PathKindWorkspaceFile
// PathKindWithPrefix is a path that starts with './'
PathKindWithPrefix
// PathKindDirectory is a path to directory
PathKindDirectory
)
type VisitFunc func(path dyn.Path, kind PathKind, value dyn.Value) (dyn.Value, error)

View File

@ -4,97 +4,11 @@ import (
"fmt"
"slices"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/bundle/config/mutator/paths"
"github.com/databricks/cli/libs/dyn"
)
type jobRewritePattern struct {
pattern dyn.Pattern
fn rewriteFunc
skipRewrite func(string) bool
}
func noSkipRewrite(string) bool {
return false
}
func rewritePatterns(t *translateContext, base dyn.Pattern) []jobRewritePattern {
return []jobRewritePattern{
{
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
t.translateNotebookPath,
noSkipRewrite,
},
{
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
t.translateFilePath,
noSkipRewrite,
},
{
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
t.translateDirectoryPath,
noSkipRewrite,
},
{
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
t.translateFilePath,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
t.translateNoOp,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
t.translateNoOp,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("requirements")),
t.translateFilePath,
noSkipRewrite,
},
}
}
func (t *translateContext) jobRewritePatterns() []jobRewritePattern {
// Base pattern to match all tasks in all jobs.
base := dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("tasks"),
dyn.AnyIndex(),
)
// Compile list of patterns and their respective rewrite functions.
jobEnvironmentsPatterns := []jobRewritePattern{
{
dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("environments"),
dyn.AnyIndex(),
dyn.Key("spec"),
dyn.Key("dependencies"),
dyn.AnyIndex(),
),
t.translateNoOpWithPrefix,
func(s string) bool {
return !libraries.IsLibraryLocal(s)
},
},
}
taskPatterns := rewritePatterns(t, base)
forEachPatterns := rewritePatterns(t, base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
allPatterns := append(taskPatterns, jobEnvironmentsPatterns...)
allPatterns = append(allPatterns, forEachPatterns...)
return allPatterns
}
func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error) {
var err error
@ -111,8 +25,7 @@ func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error)
}
}
for _, rewritePattern := range t.jobRewritePatterns() {
v, err = dyn.MapByPattern(v, rewritePattern.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
return paths.VisitJobPaths(v, func(p dyn.Path, kind paths.PathKind, v dyn.Value) (dyn.Value, error) {
key := p[2].Key()
// Skip path translation if the job is using git source.
@ -125,16 +38,28 @@ func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error)
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
}
sv := v.MustString()
if rewritePattern.skipRewrite(sv) {
return v, nil
}
return t.rewriteRelativeTo(p, v, rewritePattern.fn, dir, fallback[key])
})
rewritePatternFn, err := t.getRewritePatternFn(kind)
if err != nil {
return dyn.InvalidValue, err
}
return t.rewriteRelativeTo(p, v, rewritePatternFn, dir, fallback[key])
})
}
func (t *translateContext) getRewritePatternFn(kind paths.PathKind) (rewriteFunc, error) {
switch kind {
case paths.PathKindLibrary:
return t.translateNoOp, nil
case paths.PathKindNotebook:
return t.translateNotebookPath, nil
case paths.PathKindWorkspaceFile:
return t.translateFilePath, nil
case paths.PathKindDirectory:
return t.translateDirectoryPath, nil
case paths.PathKindWithPrefix:
return t.translateNoOpWithPrefix, nil
}
return v, nil
return nil, fmt.Errorf("unsupported path kind: %d", kind)
}

View File

@ -406,7 +406,14 @@ func (r *Root) MergeTargetOverrides(name string) error {
return r.updateWithDynamicValue(root)
}
var variableKeywords = []string{"default", "lookup"}
var allowedVariableDefinitions = []([]string){
{"default", "type", "description"},
{"default", "type"},
{"default", "description"},
{"lookup", "description"},
{"default"},
{"lookup"},
}
// isFullVariableOverrideDef checks if the given value is a full syntax varaible override.
// A full syntax variable override is a map with either 1 of 2 keys.
@ -418,26 +425,26 @@ func isFullVariableOverrideDef(v dyn.Value) bool {
return false
}
// If the map has more than 2 keys, it is not a full variable override.
if mv.Len() > 2 {
// If the map has more than 3 keys, it is not a full variable override.
if mv.Len() > 3 {
return false
}
// If the map has 2 keys, one of them should be "default" and the other is "type"
if mv.Len() == 2 {
if _, ok := mv.GetByString("type"); !ok {
return false
for _, keys := range allowedVariableDefinitions {
if len(keys) != mv.Len() {
continue
}
if _, ok := mv.GetByString("default"); !ok {
return false
// Check if the keys are the same.
match := true
for _, key := range keys {
if _, ok := mv.GetByString(key); !ok {
match = false
break
}
}
return true
}
for _, keyword := range variableKeywords {
if _, ok := mv.GetByString(keyword); ok {
if match {
return true
}
}

View File

@ -6,6 +6,7 @@ import (
"testing"
"github.com/databricks/cli/bundle/config/variable"
"github.com/databricks/cli/libs/dyn"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -169,3 +170,87 @@ func TestRootMergeTargetOverridesWithVariables(t *testing.T) {
assert.Equal(t, "complex var", root.Variables["complex"].Description)
}
func TestIsFullVariableOverrideDef(t *testing.T) {
testCases := []struct {
value dyn.Value
expected bool
}{
{
value: dyn.V(map[string]dyn.Value{
"type": dyn.V("string"),
"default": dyn.V("foo"),
"description": dyn.V("foo var"),
}),
expected: true,
},
{
value: dyn.V(map[string]dyn.Value{
"type": dyn.V("string"),
"lookup": dyn.V("foo"),
"description": dyn.V("foo var"),
}),
expected: false,
},
{
value: dyn.V(map[string]dyn.Value{
"type": dyn.V("string"),
"default": dyn.V("foo"),
}),
expected: true,
},
{
value: dyn.V(map[string]dyn.Value{
"type": dyn.V("string"),
"lookup": dyn.V("foo"),
}),
expected: false,
},
{
value: dyn.V(map[string]dyn.Value{
"description": dyn.V("string"),
"default": dyn.V("foo"),
}),
expected: true,
},
{
value: dyn.V(map[string]dyn.Value{
"description": dyn.V("string"),
"lookup": dyn.V("foo"),
}),
expected: true,
},
{
value: dyn.V(map[string]dyn.Value{
"default": dyn.V("foo"),
}),
expected: true,
},
{
value: dyn.V(map[string]dyn.Value{
"lookup": dyn.V("foo"),
}),
expected: true,
},
{
value: dyn.V(map[string]dyn.Value{
"type": dyn.V("string"),
}),
expected: false,
},
{
value: dyn.V(map[string]dyn.Value{
"type": dyn.V("string"),
"default": dyn.V("foo"),
"description": dyn.V("foo var"),
"lookup": dyn.V("foo"),
}),
expected: false,
},
}
for i, tc := range testCases {
assert.Equal(t, tc.expected, isFullVariableOverrideDef(tc.value), "test case %d", i)
}
}

View File

@ -0,0 +1,161 @@
package validate
import (
"context"
"fmt"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
// JobTaskClusterSpec validates that job tasks have cluster spec defined
// if task requires a cluster
func JobTaskClusterSpec() bundle.ReadOnlyMutator {
return &jobTaskClusterSpec{}
}
type jobTaskClusterSpec struct {
}
func (v *jobTaskClusterSpec) Name() string {
return "validate:job_task_cluster_spec"
}
func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))
for resourceName, job := range rb.Config().Resources.Jobs {
resourcePath := jobsPath.Append(dyn.Key(resourceName))
for taskIndex, task := range job.Tasks {
taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))
diags = diags.Extend(validateJobTask(rb, task, taskPath))
}
}
return diags
}
func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
diags := diag.Diagnostics{}
var specified []string
var unspecified []string
if task.JobClusterKey != "" {
specified = append(specified, "job_cluster_key")
} else {
unspecified = append(unspecified, "job_cluster_key")
}
if task.EnvironmentKey != "" {
specified = append(specified, "environment_key")
} else {
unspecified = append(unspecified, "environment_key")
}
if task.ExistingClusterId != "" {
specified = append(specified, "existing_cluster_id")
} else {
unspecified = append(unspecified, "existing_cluster_id")
}
if task.NewCluster != nil {
specified = append(specified, "new_cluster")
} else {
unspecified = append(unspecified, "new_cluster")
}
if task.ForEachTask != nil {
forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task"))
diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath))
}
if isComputeTask(task) && len(specified) == 0 {
if task.NotebookTask != nil {
// notebook tasks without cluster spec will use notebook environment
} else {
// path might be not very helpful, adding user-specified task key clarifies the context
detail := fmt.Sprintf(
"Task %q requires a cluster or an environment to run.\nSpecify one of the following fields: %s.",
task.TaskKey,
strings.Join(unspecified, ", "),
)
diags = diags.Append(diag.Diagnostic{
Severity: diag.Error,
Summary: "Missing required cluster or environment settings",
Detail: detail,
Locations: rb.Config().GetLocations(taskPath.String()),
Paths: []dyn.Path{taskPath},
})
}
}
return diags
}
// isComputeTask returns true if the task runs on a cluster or serverless GC
func isComputeTask(task jobs.Task) bool {
if task.NotebookTask != nil {
// if warehouse_id is set, it's SQL notebook that doesn't need cluster or serverless GC
if task.NotebookTask.WarehouseId != "" {
return false
} else {
// task settings don't require specifying a cluster/serverless GC, but task itself can run on one
// we handle that case separately in validateJobTask
return true
}
}
if task.PythonWheelTask != nil {
return true
}
if task.DbtTask != nil {
return true
}
if task.SparkJarTask != nil {
return true
}
if task.SparkSubmitTask != nil {
return true
}
if task.SparkPythonTask != nil {
return true
}
if task.SqlTask != nil {
return false
}
if task.PipelineTask != nil {
// while pipelines use clusters, pipeline tasks don't, they only trigger pipelines
return false
}
if task.RunJobTask != nil {
return false
}
if task.ConditionTask != nil {
return false
}
// for each task doesn't use clusters, underlying task(s) can though
if task.ForEachTask != nil {
return false
}
return false
}

View File

@ -0,0 +1,203 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)
func TestJobTaskClusterSpec(t *testing.T) {
expectedSummary := "Missing required cluster or environment settings"
type testCase struct {
name string
task jobs.Task
errorPath string
errorDetail string
errorSummary string
}
testCases := []testCase{
{
name: "valid notebook task",
task: jobs.Task{
// while a cluster is needed, it will use notebook environment to create one
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (job_cluster_key)",
task: jobs.Task{
JobClusterKey: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (new_cluster)",
task: jobs.Task{
NewCluster: &compute.ClusterSpec{},
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid notebook task (existing_cluster_id)",
task: jobs.Task{
ExistingClusterId: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
{
name: "valid SQL notebook task",
task: jobs.Task{
NotebookTask: &jobs.NotebookTask{
WarehouseId: "warehouse1",
},
},
},
{
name: "valid python wheel task",
task: jobs.Task{
JobClusterKey: "cluster1",
PythonWheelTask: &jobs.PythonWheelTask{},
},
},
{
name: "valid python wheel task (environment_key)",
task: jobs.Task{
EnvironmentKey: "environment1",
PythonWheelTask: &jobs.PythonWheelTask{},
},
},
{
name: "valid dbt task",
task: jobs.Task{
JobClusterKey: "cluster1",
DbtTask: &jobs.DbtTask{},
},
},
{
name: "valid spark jar task",
task: jobs.Task{
JobClusterKey: "cluster1",
SparkJarTask: &jobs.SparkJarTask{},
},
},
{
name: "valid spark submit",
task: jobs.Task{
NewCluster: &compute.ClusterSpec{},
SparkSubmitTask: &jobs.SparkSubmitTask{},
},
},
{
name: "valid spark python task",
task: jobs.Task{
JobClusterKey: "cluster1",
SparkPythonTask: &jobs.SparkPythonTask{},
},
},
{
name: "valid SQL task",
task: jobs.Task{
SqlTask: &jobs.SqlTask{},
},
},
{
name: "valid pipeline task",
task: jobs.Task{
PipelineTask: &jobs.PipelineTask{},
},
},
{
name: "valid run job task",
task: jobs.Task{
RunJobTask: &jobs.RunJobTask{},
},
},
{
name: "valid condition task",
task: jobs.Task{
ConditionTask: &jobs.ConditionTask{},
},
},
{
name: "valid for each task",
task: jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
JobClusterKey: "cluster1",
NotebookTask: &jobs.NotebookTask{},
},
},
},
},
{
name: "invalid python wheel task",
task: jobs.Task{
PythonWheelTask: &jobs.PythonWheelTask{},
TaskKey: "my_task",
},
errorPath: "resources.jobs.job1.tasks[0]",
errorDetail: `Task "my_task" requires a cluster or an environment to run.
Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.`,
errorSummary: expectedSummary,
},
{
name: "invalid for each task",
task: jobs.Task{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
PythonWheelTask: &jobs.PythonWheelTask{},
TaskKey: "my_task",
},
},
},
errorPath: "resources.jobs.job1.tasks[0].for_each_task.task",
errorDetail: `Task "my_task" requires a cluster or an environment to run.
Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.`,
errorSummary: expectedSummary,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
job := &resources.Job{
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{tc.task},
},
}
b := createBundle(map[string]*resources.Job{"job1": job})
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec())
if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" {
assert.Len(t, diags, 1)
assert.Len(t, diags[0].Paths, 1)
diag := diags[0]
assert.Equal(t, tc.errorPath, diag.Paths[0].String())
assert.Equal(t, tc.errorSummary, diag.Summary)
assert.Equal(t, tc.errorDetail, diag.Detail)
} else {
assert.ElementsMatch(t, []string{}, diags)
}
})
}
}
func createBundle(jobs map[string]*resources.Job) *bundle.Bundle {
return &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: jobs,
},
},
}
}

View File

@ -34,6 +34,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
JobClusterKeyDefined(),
FilesToSync(),
ValidateSyncPatterns(),
JobTaskClusterSpec(),
))
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
@ -82,6 +83,10 @@ func BundleToTerraform(config *config.Root) *schema.Root {
conv(src, &dst)
if src.JobSettings != nil {
sort.Slice(src.JobSettings.Tasks, func(i, j int) bool {
return src.JobSettings.Tasks[i].TaskKey < src.JobSettings.Tasks[j].TaskKey
})
for _, v := range src.Tasks {
var t schema.ResourceJobTask
conv(v, &t)

View File

@ -3,6 +3,7 @@ package tfdyn
import (
"context"
"fmt"
"sort"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
@ -19,8 +20,38 @@ func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
log.Debugf(ctx, "job normalization diagnostic: %s", diag.Summary)
}
// Sort the tasks of each job in the bundle by task key. Sorting
// the task keys ensures that the diff computed by terraform is correct and avoids
// recreates. For more details see the NOTE at
// https://registry.terraform.io/providers/databricks/databricks/latest/docs/resources/job#example-usage
// and https://github.com/databricks/terraform-provider-databricks/issues/4011
// and https://github.com/databricks/cli/pull/1776
vout := vin
var err error
tasks, ok := vin.Get("tasks").AsSequence()
if ok {
sort.Slice(tasks, func(i, j int) bool {
// We sort the tasks by their task key. Tasks without task keys are ordered
// before tasks with task keys. We do not error for those tasks
// since presence of a task_key is validated for in the Jobs backend.
tk1, ok := tasks[i].Get("task_key").AsString()
if !ok {
return true
}
tk2, ok := tasks[j].Get("task_key").AsString()
if !ok {
return false
}
return tk1 < tk2
})
vout, err = dyn.Set(vin, "tasks", dyn.V(tasks))
if err != nil {
return dyn.InvalidValue, err
}
}
// Modify top-level keys.
vout, err := renameKeys(vin, map[string]string{
vout, err = renameKeys(vout, map[string]string{
"tasks": "task",
"job_clusters": "job_cluster",
"parameters": "parameter",

View File

@ -42,8 +42,8 @@ func TestConvertJob(t *testing.T) {
},
Tasks: []jobs.Task{
{
TaskKey: "task_key",
JobClusterKey: "job_cluster_key",
TaskKey: "task_key_b",
JobClusterKey: "job_cluster_key_b",
Libraries: []compute.Library{
{
Pypi: &compute.PythonPyPiLibrary{
@ -55,6 +55,17 @@ func TestConvertJob(t *testing.T) {
},
},
},
{
TaskKey: "task_key_a",
JobClusterKey: "job_cluster_key_a",
},
{
TaskKey: "task_key_c",
JobClusterKey: "job_cluster_key_c",
},
{
Description: "missing task key 😱",
},
},
},
Permissions: []resources.Permission{
@ -100,8 +111,15 @@ func TestConvertJob(t *testing.T) {
},
"task": []any{
map[string]any{
"task_key": "task_key",
"job_cluster_key": "job_cluster_key",
"description": "missing task key 😱",
},
map[string]any{
"task_key": "task_key_a",
"job_cluster_key": "job_cluster_key_a",
},
map[string]any{
"task_key": "task_key_b",
"job_cluster_key": "job_cluster_key_b",
"library": []any{
map[string]any{
"pypi": map[string]any{
@ -113,6 +131,10 @@ func TestConvertJob(t *testing.T) {
},
},
},
map[string]any{
"task_key": "task_key_c",
"job_cluster_key": "job_cluster_key_c",
},
},
}, out.Job["my_job"])

9
go.mod
View File

@ -1,6 +1,8 @@
module github.com/databricks/cli
go 1.22
go 1.22.0
toolchain go1.22.7
require (
github.com/Masterminds/semver/v3 v3.3.0 // MIT
@ -10,7 +12,7 @@ require (
github.com/ghodss/yaml v1.0.0 // MIT + NOTICE
github.com/google/uuid v1.6.0 // BSD-3-Clause
github.com/hashicorp/go-version v1.7.0 // MPL 2.0
github.com/hashicorp/hc-install v0.7.0 // MPL 2.0
github.com/hashicorp/hc-install v0.9.0 // MPL 2.0
github.com/hashicorp/terraform-exec v0.21.0 // MPL 2.0
github.com/hashicorp/terraform-json v0.22.1 // MPL 2.0
github.com/manifoldco/promptui v0.9.0 // BSD-3-Clause
@ -22,7 +24,7 @@ require (
github.com/spf13/pflag v1.0.5 // BSD-3-Clause
github.com/stretchr/testify v1.9.0 // MIT
golang.org/x/exp v0.0.0-20240222234643-814bf88cf225
golang.org/x/mod v0.20.0
golang.org/x/mod v0.21.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/term v0.24.0
@ -49,6 +51,7 @@ require (
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect

12
go.sum generated
View File

@ -99,10 +99,14 @@ github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU=
github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/hc-install v0.7.0 h1:Uu9edVqjKQxxuD28mR5TikkKDd/p55S8vzPC1659aBk=
github.com/hashicorp/hc-install v0.7.0/go.mod h1:ELmmzZlGnEcqoUMKUuykHaPCIR1sYLYX+KSggWSKZuA=
github.com/hashicorp/hc-install v0.9.0 h1:2dIk8LcvANwtv3QZLckxcjyF5w8KVtiMxu6G6eLhghE=
github.com/hashicorp/hc-install v0.9.0/go.mod h1:+6vOP+mf3tuGgMApVYtmsnDoKWMDcFXeTxCACYZ8SFg=
github.com/hashicorp/terraform-exec v0.21.0 h1:uNkLAe95ey5Uux6KJdua6+cv8asgILFVWkd/RG0D2XQ=
github.com/hashicorp/terraform-exec v0.21.0/go.mod h1:1PPeMYou+KDUSSeRE9szMZ/oHf4fYUmB923Wzbq1ICg=
github.com/hashicorp/terraform-json v0.22.1 h1:xft84GZR0QzjPVWs4lRUwvTcPnegqlyS7orfb5Ltvec=
@ -180,8 +184,8 @@ golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View File

@ -209,7 +209,26 @@ func TestRepositoryGitConfigWhenNotARepo(t *testing.T) {
}
func TestRepositoryOriginUrlRemovesUserCreds(t *testing.T) {
tcases := []struct {
url string
expected string
}{
{
url: "https://username:token@github.com/databricks/foobar.git",
expected: "https://github.com/databricks/foobar.git",
},
{
// Note: The token is still considered and parsed as a username here.
// However credentials integrations by Git providers like GitHub
// allow for setting a PAT token as a username.
url: "https://token@github.com/databricks/foobar.git",
expected: "https://github.com/databricks/foobar.git",
},
}
for _, tc := range tcases {
repo := newTestRepository(t)
repo.addOriginUrl("https://username:token@github.com/databricks/foobar.git")
repo.assertOriginUrl("https://github.com/databricks/foobar.git")
repo.addOriginUrl(tc.url)
repo.assertOriginUrl(tc.expected)
}
}

View File

@ -121,7 +121,7 @@ You can find that job by opening your workpace and clicking on **Workflows**.
You can also deploy to your production target directly from the command-line.
The warehouse, catalog, and schema for that target are configured in databricks.yml.
When deploying to this target, note that the default job at resources/{{.project_name}}_job.yml
When deploying to this target, note that the default job at resources/{{.project_name}}.job.yml
has a schedule set that runs every day. The schedule is paused when deploying in development mode
(see https://docs.databricks.com/dev-tools/bundles/deployment-modes.html).

View File

@ -18,7 +18,7 @@ This file only template directives; it is skipped for the actual output.
{{if $notDLT}}
{{skip "{{.project_name}}/src/dlt_pipeline.ipynb"}}
{{skip "{{.project_name}}/resources/{{.project_name}}_pipeline.yml"}}
{{skip "{{.project_name}}/resources/{{.project_name}}.pipeline.yml"}}
{{end}}
{{if $notNotebook}}
@ -26,7 +26,7 @@ This file only template directives; it is skipped for the actual output.
{{end}}
{{if (and $notDLT $notNotebook $notPython)}}
{{skip "{{.project_name}}/resources/{{.project_name}}_job.yml"}}
{{skip "{{.project_name}}/resources/{{.project_name}}.job.yml"}}
{{else}}
{{skip "{{.project_name}}/resources/.gitkeep"}}
{{end}}

View File

@ -29,7 +29,7 @@ The '{{.project_name}}' project was generated by using the default-python templa
```
Note that the default job from the template has a schedule that runs every day
(defined in resources/{{.project_name}}_job.yml). The schedule
(defined in resources/{{.project_name}}.job.yml). The schedule
is paused when deploying in development mode (see
https://docs.databricks.com/dev-tools/bundles/deployment-modes.html).

View File

@ -40,7 +40,7 @@ resources:
- task_key: notebook_task
{{- end}}
pipeline_task:
{{- /* TODO: we should find a way that doesn't use magics for the below, like ./{{project_name}}_pipeline.yml */}}
{{- /* TODO: we should find a way that doesn't use magics for the below, like ./{{project_name}}.pipeline.yml */}}
pipeline_id: ${resources.pipelines.{{.project_name}}_pipeline.id}
{{end -}}
{{- if (eq .include_python "yes") }}

View File

@ -14,7 +14,7 @@
"source": [
"# DLT pipeline\n",
"\n",
"This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/{{.project_name}}_pipeline.yml."
"This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/{{.project_name}}.pipeline.yml."
]
},
{

View File

@ -14,7 +14,7 @@
"source": [
"# Default notebook\n",
"\n",
"This default notebook is executed using Databricks Workflows as defined in resources/{{.project_name}}_job.yml."
"This default notebook is executed using Databricks Workflows as defined in resources/{{.project_name}}.job.yml."
]
},
{

View File

@ -1,4 +1,4 @@
-- This query is executed using Databricks Workflows (see resources/{{.project_name}}_sql_job.yml)
-- This query is executed using Databricks Workflows (see resources/{{.project_name}}_sql.job.yml)
USE CATALOG {{"{{"}}catalog{{"}}"}};
USE IDENTIFIER({{"{{"}}schema{{"}}"}});

View File

@ -1,4 +1,4 @@
-- This query is executed using Databricks Workflows (see resources/{{.project_name}}_sql_job.yml)
-- This query is executed using Databricks Workflows (see resources/{{.project_name}}_sql.job.yml)
--
-- The streaming table below ingests all JSON files in /databricks-datasets/retail-org/sales_orders/
-- See also https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-streaming-table.html