mirror of https://github.com/databricks/cli.git
Refactor jobs path translation (#1782)
## Changes

Extract a package that other modules can use to transform the different kinds of paths in job resources.

## Tests

Unit tests
This commit is contained in:
parent
0cc35ca056
commit
490259a14a
|
@ -0,0 +1,115 @@
|
||||||
|
package paths
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/databricks/cli/bundle/libraries"
|
||||||
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
)
|
||||||
|
|
||||||
|
type jobRewritePattern struct {
|
||||||
|
pattern dyn.Pattern
|
||||||
|
kind PathKind
|
||||||
|
skipRewrite func(string) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// noSkipRewrite is the default skip predicate: it never vetoes a rewrite.
func noSkipRewrite(_ string) bool {
	return false
}
|
||||||
|
|
||||||
|
func jobTaskRewritePatterns(base dyn.Pattern) []jobRewritePattern {
|
||||||
|
return []jobRewritePattern{
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
|
||||||
|
PathKindNotebook,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
|
||||||
|
PathKindWorkspaceFile,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
|
||||||
|
PathKindDirectory,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
|
||||||
|
PathKindWorkspaceFile,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
|
||||||
|
PathKindLibrary,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
|
||||||
|
PathKindLibrary,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("requirements")),
|
||||||
|
PathKindWorkspaceFile,
|
||||||
|
noSkipRewrite,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func jobRewritePatterns() []jobRewritePattern {
|
||||||
|
// Base pattern to match all tasks in all jobs.
|
||||||
|
base := dyn.NewPattern(
|
||||||
|
dyn.Key("resources"),
|
||||||
|
dyn.Key("jobs"),
|
||||||
|
dyn.AnyKey(),
|
||||||
|
dyn.Key("tasks"),
|
||||||
|
dyn.AnyIndex(),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Compile list of patterns and their respective rewrite functions.
|
||||||
|
jobEnvironmentsPatterns := []jobRewritePattern{
|
||||||
|
{
|
||||||
|
dyn.NewPattern(
|
||||||
|
dyn.Key("resources"),
|
||||||
|
dyn.Key("jobs"),
|
||||||
|
dyn.AnyKey(),
|
||||||
|
dyn.Key("environments"),
|
||||||
|
dyn.AnyIndex(),
|
||||||
|
dyn.Key("spec"),
|
||||||
|
dyn.Key("dependencies"),
|
||||||
|
dyn.AnyIndex(),
|
||||||
|
),
|
||||||
|
PathKindWithPrefix,
|
||||||
|
func(s string) bool {
|
||||||
|
return !libraries.IsLibraryLocal(s)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
taskPatterns := jobTaskRewritePatterns(base)
|
||||||
|
forEachPatterns := jobTaskRewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
|
||||||
|
allPatterns := append(taskPatterns, jobEnvironmentsPatterns...)
|
||||||
|
allPatterns = append(allPatterns, forEachPatterns...)
|
||||||
|
return allPatterns
|
||||||
|
}
|
||||||
|
|
||||||
|
// VisitJobPaths visits all paths in job resources and applies a function to each path.
|
||||||
|
func VisitJobPaths(value dyn.Value, fn VisitFunc) (dyn.Value, error) {
|
||||||
|
var err error
|
||||||
|
var newValue = value
|
||||||
|
|
||||||
|
for _, rewritePattern := range jobRewritePatterns() {
|
||||||
|
newValue, err = dyn.MapByPattern(newValue, rewritePattern.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||||
|
if rewritePattern.skipRewrite(v.MustString()) {
|
||||||
|
return v, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fn(p, rewritePattern.kind, v)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return newValue, nil
|
||||||
|
}
|
|
@ -0,0 +1,168 @@
|
||||||
|
package paths
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle/config"
|
||||||
|
"github.com/databricks/cli/bundle/config/resources"
|
||||||
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
assert "github.com/databricks/cli/libs/dyn/dynassert"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestVisitJobPaths(t *testing.T) {
|
||||||
|
task0 := jobs.Task{
|
||||||
|
NotebookTask: &jobs.NotebookTask{
|
||||||
|
NotebookPath: "abc",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
task1 := jobs.Task{
|
||||||
|
SparkPythonTask: &jobs.SparkPythonTask{
|
||||||
|
PythonFile: "abc",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
task2 := jobs.Task{
|
||||||
|
DbtTask: &jobs.DbtTask{
|
||||||
|
ProjectDirectory: "abc",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
task3 := jobs.Task{
|
||||||
|
SqlTask: &jobs.SqlTask{
|
||||||
|
File: &jobs.SqlTaskFile{
|
||||||
|
Path: "abc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
task4 := jobs.Task{
|
||||||
|
Libraries: []compute.Library{
|
||||||
|
{Whl: "dist/foo.whl"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
task5 := jobs.Task{
|
||||||
|
Libraries: []compute.Library{
|
||||||
|
{Jar: "dist/foo.jar"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
task6 := jobs.Task{
|
||||||
|
Libraries: []compute.Library{
|
||||||
|
{Requirements: "requirements.txt"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
job0 := &resources.Job{
|
||||||
|
JobSettings: &jobs.JobSettings{
|
||||||
|
Tasks: []jobs.Task{
|
||||||
|
task0,
|
||||||
|
task1,
|
||||||
|
task2,
|
||||||
|
task3,
|
||||||
|
task4,
|
||||||
|
task5,
|
||||||
|
task6,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
root := config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Jobs: map[string]*resources.Job{
|
||||||
|
"job0": job0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := visitJobPaths(t, root)
|
||||||
|
expected := []dyn.Path{
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[0].notebook_task.notebook_path"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[1].spark_python_task.python_file"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[2].dbt_task.project_directory"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[3].sql_task.file.path"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[4].libraries[0].whl"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[5].libraries[0].jar"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[6].libraries[0].requirements"),
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, expected, actual)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVisitJobPaths_environments(t *testing.T) {
|
||||||
|
environment0 := jobs.JobEnvironment{
|
||||||
|
Spec: &compute.Environment{
|
||||||
|
Dependencies: []string{
|
||||||
|
"dist_0/*.whl",
|
||||||
|
"dist_1/*.whl",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
job0 := &resources.Job{
|
||||||
|
JobSettings: &jobs.JobSettings{
|
||||||
|
Environments: []jobs.JobEnvironment{
|
||||||
|
environment0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
root := config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Jobs: map[string]*resources.Job{
|
||||||
|
"job0": job0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := visitJobPaths(t, root)
|
||||||
|
expected := []dyn.Path{
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.environments[0].spec.dependencies[0]"),
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.environments[0].spec.dependencies[1]"),
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, expected, actual)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestVisitJobPaths_foreach(t *testing.T) {
|
||||||
|
task0 := jobs.Task{
|
||||||
|
ForEachTask: &jobs.ForEachTask{
|
||||||
|
Task: jobs.Task{
|
||||||
|
NotebookTask: &jobs.NotebookTask{
|
||||||
|
NotebookPath: "abc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
job0 := &resources.Job{
|
||||||
|
JobSettings: &jobs.JobSettings{
|
||||||
|
Tasks: []jobs.Task{
|
||||||
|
task0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
root := config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Jobs: map[string]*resources.Job{
|
||||||
|
"job0": job0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
actual := visitJobPaths(t, root)
|
||||||
|
expected := []dyn.Path{
|
||||||
|
dyn.MustPathFromString("resources.jobs.job0.tasks[0].for_each_task.task.notebook_task.notebook_path"),
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, expected, actual)
|
||||||
|
}
|
||||||
|
|
||||||
|
func visitJobPaths(t *testing.T, root config.Root) []dyn.Path {
|
||||||
|
var actual []dyn.Path
|
||||||
|
err := root.Mutate(func(value dyn.Value) (dyn.Value, error) {
|
||||||
|
return VisitJobPaths(value, func(p dyn.Path, kind PathKind, v dyn.Value) (dyn.Value, error) {
|
||||||
|
actual = append(actual, p)
|
||||||
|
return v, nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return actual
|
||||||
|
}
|
|
@ -0,0 +1,26 @@
|
||||||
|
package paths
|
||||||
|
|
||||||
|
import "github.com/databricks/cli/libs/dyn"
|
||||||
|
|
||||||
|
// PathKind describes what kind of path a matched value represents, so
// callers can decide how it should be translated.
type PathKind int

// The constants are explicitly typed as PathKind (rather than relying on
// an untyped iota, which would make them plain ints) so the compiler
// enforces correct usage at call sites.
const (
	// PathKindLibrary is a path to a library file
	PathKindLibrary PathKind = iota

	// PathKindNotebook is a path to a notebook file
	PathKindNotebook

	// PathKindWorkspaceFile is a path to a regular workspace file,
	// notebooks are not allowed because they are uploaded as a special
	// kind of workspace object.
	PathKindWorkspaceFile

	// PathKindWithPrefix is a path that starts with './'
	PathKindWithPrefix

	// PathKindDirectory is a path to directory
	PathKindDirectory
)
|
||||||
|
|
||||||
|
// VisitFunc is the callback invoked by VisitJobPaths for each matched path.
// It receives the location of the value, the kind of path it holds, and the
// value itself, and returns the (possibly rewritten) value or an error.
type VisitFunc func(path dyn.Path, kind PathKind, value dyn.Value) (dyn.Value, error)
|
|
@ -4,97 +4,11 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
|
||||||
"github.com/databricks/cli/bundle/libraries"
|
"github.com/databricks/cli/bundle/config/mutator/paths"
|
||||||
|
|
||||||
"github.com/databricks/cli/libs/dyn"
|
"github.com/databricks/cli/libs/dyn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type jobRewritePattern struct {
|
|
||||||
pattern dyn.Pattern
|
|
||||||
fn rewriteFunc
|
|
||||||
skipRewrite func(string) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func noSkipRewrite(string) bool {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func rewritePatterns(t *translateContext, base dyn.Pattern) []jobRewritePattern {
|
|
||||||
return []jobRewritePattern{
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
|
|
||||||
t.translateNotebookPath,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
|
|
||||||
t.translateFilePath,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
|
|
||||||
t.translateDirectoryPath,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
|
|
||||||
t.translateFilePath,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
|
|
||||||
t.translateNoOp,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
|
|
||||||
t.translateNoOp,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("requirements")),
|
|
||||||
t.translateFilePath,
|
|
||||||
noSkipRewrite,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *translateContext) jobRewritePatterns() []jobRewritePattern {
|
|
||||||
// Base pattern to match all tasks in all jobs.
|
|
||||||
base := dyn.NewPattern(
|
|
||||||
dyn.Key("resources"),
|
|
||||||
dyn.Key("jobs"),
|
|
||||||
dyn.AnyKey(),
|
|
||||||
dyn.Key("tasks"),
|
|
||||||
dyn.AnyIndex(),
|
|
||||||
)
|
|
||||||
|
|
||||||
// Compile list of patterns and their respective rewrite functions.
|
|
||||||
jobEnvironmentsPatterns := []jobRewritePattern{
|
|
||||||
{
|
|
||||||
dyn.NewPattern(
|
|
||||||
dyn.Key("resources"),
|
|
||||||
dyn.Key("jobs"),
|
|
||||||
dyn.AnyKey(),
|
|
||||||
dyn.Key("environments"),
|
|
||||||
dyn.AnyIndex(),
|
|
||||||
dyn.Key("spec"),
|
|
||||||
dyn.Key("dependencies"),
|
|
||||||
dyn.AnyIndex(),
|
|
||||||
),
|
|
||||||
t.translateNoOpWithPrefix,
|
|
||||||
func(s string) bool {
|
|
||||||
return !libraries.IsLibraryLocal(s)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
taskPatterns := rewritePatterns(t, base)
|
|
||||||
forEachPatterns := rewritePatterns(t, base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
|
|
||||||
allPatterns := append(taskPatterns, jobEnvironmentsPatterns...)
|
|
||||||
allPatterns = append(allPatterns, forEachPatterns...)
|
|
||||||
return allPatterns
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error) {
|
func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error) {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -111,30 +25,41 @@ func (t *translateContext) applyJobTranslations(v dyn.Value) (dyn.Value, error)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, rewritePattern := range t.jobRewritePatterns() {
|
return paths.VisitJobPaths(v, func(p dyn.Path, kind paths.PathKind, v dyn.Value) (dyn.Value, error) {
|
||||||
v, err = dyn.MapByPattern(v, rewritePattern.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
key := p[2].Key()
|
||||||
key := p[2].Key()
|
|
||||||
|
|
||||||
// Skip path translation if the job is using git source.
|
// Skip path translation if the job is using git source.
|
||||||
if slices.Contains(ignore, key) {
|
if slices.Contains(ignore, key) {
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
dir, err := v.Location().Directory()
|
dir, err := v.Location().Directory()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
|
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
sv := v.MustString()
|
rewritePatternFn, err := t.getRewritePatternFn(kind)
|
||||||
if rewritePattern.skipRewrite(sv) {
|
|
||||||
return v, nil
|
|
||||||
}
|
|
||||||
return t.rewriteRelativeTo(p, v, rewritePattern.fn, dir, fallback[key])
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return dyn.InvalidValue, err
|
return dyn.InvalidValue, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return t.rewriteRelativeTo(p, v, rewritePatternFn, dir, fallback[key])
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *translateContext) getRewritePatternFn(kind paths.PathKind) (rewriteFunc, error) {
|
||||||
|
switch kind {
|
||||||
|
case paths.PathKindLibrary:
|
||||||
|
return t.translateNoOp, nil
|
||||||
|
case paths.PathKindNotebook:
|
||||||
|
return t.translateNotebookPath, nil
|
||||||
|
case paths.PathKindWorkspaceFile:
|
||||||
|
return t.translateFilePath, nil
|
||||||
|
case paths.PathKindDirectory:
|
||||||
|
return t.translateDirectoryPath, nil
|
||||||
|
case paths.PathKindWithPrefix:
|
||||||
|
return t.translateNoOpWithPrefix, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return v, nil
|
return nil, fmt.Errorf("unsupported path kind: %d", kind)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue