mirror of https://github.com/databricks/cli.git
Rewrite relative paths using `dyn.Location` of the underlying value (#1273)
## Changes This change addresses the path resolution behavior in resource definitions. Previously, all paths were resolved relative to where the resource was first defined, which could lead to confusion and errors when paths were specified in different directories. The new behavior is to resolve paths relative to where they are defined, making it more intuitive. However, to avoid breaking existing configurations, compatibility with the old behavior is maintained. ## Tests * Existing unit tests for path translation pass. * Additional test to cover both the nominal and the fallback behavior.
This commit is contained in:
parent
d216404f27
commit
7c4b34945c
|
@ -11,6 +11,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/notebook"
|
||||
)
|
||||
|
||||
|
@ -150,55 +151,55 @@ func translateNoOp(literal, localFullPath, localRelPath, remotePath string) (str
|
|||
return localRelPath, nil
|
||||
}
|
||||
|
||||
type transformer struct {
|
||||
// A directory path relative to which `path` will be transformed
|
||||
dir string
|
||||
// A path to transform
|
||||
path *string
|
||||
// Name of the config property where the path string is coming from
|
||||
configPath string
|
||||
// A function that performs the actual rewriting logic.
|
||||
fn rewriteFunc
|
||||
}
|
||||
|
||||
type transformFunc func(resource any, dir string) *transformer
|
||||
|
||||
// Apply all matches transformers for the given resource
|
||||
func (m *translatePaths) applyTransformers(funcs []transformFunc, b *bundle.Bundle, resource any, dir string) error {
|
||||
for _, transformFn := range funcs {
|
||||
transformer := transformFn(resource, dir)
|
||||
if transformer == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
err := m.rewritePath(transformer.dir, b, transformer.path, transformer.fn)
|
||||
func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) {
|
||||
out := v.MustString()
|
||||
err := m.rewritePath(dir, b, &out, fn)
|
||||
if err != nil {
|
||||
if target := (&ErrIsNotebook{}); errors.As(err, target) {
|
||||
return fmt.Errorf(`expected a file for "%s" but got a notebook: %w`, transformer.configPath, target)
|
||||
return dyn.InvalidValue, fmt.Errorf(`expected a file for "%s" but got a notebook: %w`, p, target)
|
||||
}
|
||||
if target := (&ErrIsNotNotebook{}); errors.As(err, target) {
|
||||
return fmt.Errorf(`expected a notebook for "%s" but got a file: %w`, transformer.configPath, target)
|
||||
return dyn.InvalidValue, fmt.Errorf(`expected a notebook for "%s" but got a file: %w`, p, target)
|
||||
}
|
||||
return err
|
||||
return dyn.InvalidValue, err
|
||||
}
|
||||
|
||||
return dyn.NewValue(out, v.Location()), nil
|
||||
}
|
||||
|
||||
func (m *translatePaths) rewriteRelativeTo(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir, fallback string) (dyn.Value, error) {
|
||||
nv, err := m.rewriteValue(b, p, v, fn, dir)
|
||||
if err == nil {
|
||||
return nv, nil
|
||||
}
|
||||
|
||||
// If we failed to rewrite the path, try to rewrite it relative to the fallback directory.
|
||||
if fallback != "" {
|
||||
nv, nerr := m.rewriteValue(b, p, v, fn, fallback)
|
||||
if nerr == nil {
|
||||
// TODO: Emit a warning that this path should be rewritten.
|
||||
return nv, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return dyn.InvalidValue, err
|
||||
}
|
||||
|
||||
func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) error {
|
||||
m.seen = make(map[string]string)
|
||||
|
||||
for _, fn := range []func(*translatePaths, *bundle.Bundle) error{
|
||||
applyJobTransformers,
|
||||
applyPipelineTransformers,
|
||||
applyArtifactTransformers,
|
||||
return b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
|
||||
var err error
|
||||
for _, fn := range []func(*bundle.Bundle, dyn.Value) (dyn.Value, error){
|
||||
m.applyJobTranslations,
|
||||
m.applyPipelineTranslations,
|
||||
m.applyArtifactTranslations,
|
||||
} {
|
||||
err := fn(m, b)
|
||||
v, err = fn(b, v)
|
||||
if err != nil {
|
||||
return err
|
||||
return dyn.InvalidValue, err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return v, nil
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,39 +4,40 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
)
|
||||
|
||||
func transformArtifactPath(resource any, dir string) *transformer {
|
||||
artifact, ok := resource.(*config.Artifact)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
func (m *translatePaths) applyArtifactTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) {
|
||||
var err error
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&artifact.Path,
|
||||
"artifacts.path",
|
||||
// Base pattern to match all artifacts.
|
||||
base := dyn.NewPattern(
|
||||
dyn.Key("artifacts"),
|
||||
dyn.AnyKey(),
|
||||
)
|
||||
|
||||
for _, t := range []struct {
|
||||
pattern dyn.Pattern
|
||||
fn rewriteFunc
|
||||
}{
|
||||
{
|
||||
base.Append(dyn.Key("path")),
|
||||
translateNoOp,
|
||||
}
|
||||
}
|
||||
|
||||
func applyArtifactTransformers(m *translatePaths, b *bundle.Bundle) error {
|
||||
artifactTransformers := []transformFunc{
|
||||
transformArtifactPath,
|
||||
}
|
||||
|
||||
for key, artifact := range b.Config.Artifacts {
|
||||
dir, err := artifact.ConfigFileDirectory()
|
||||
},
|
||||
} {
|
||||
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
key := p[1].Key()
|
||||
dir, err := v.Location().Directory()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine directory for artifact %s: %w", key, err)
|
||||
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for artifact %s: %w", key, err)
|
||||
}
|
||||
|
||||
err = m.applyTransformers(artifactTransformers, b, artifact, dir)
|
||||
return m.rewriteRelativeTo(b, p, v, t.fn, dir, "")
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
return dyn.InvalidValue, err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return v, nil
|
||||
}
|
||||
|
|
|
@ -2,132 +2,101 @@ package mutator
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
)
|
||||
|
||||
func transformNotebookTask(resource any, dir string) *transformer {
|
||||
task, ok := resource.(*jobs.Task)
|
||||
if !ok || task.NotebookTask == nil {
|
||||
return nil
|
||||
type jobTaskRewritePattern struct {
|
||||
pattern dyn.Pattern
|
||||
fn rewriteFunc
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&task.NotebookTask.NotebookPath,
|
||||
"tasks.notebook_task.notebook_path",
|
||||
func rewritePatterns(base dyn.Pattern) []jobTaskRewritePattern {
|
||||
return []jobTaskRewritePattern{
|
||||
{
|
||||
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
|
||||
translateNotebookPath,
|
||||
}
|
||||
}
|
||||
|
||||
func transformSparkTask(resource any, dir string) *transformer {
|
||||
task, ok := resource.(*jobs.Task)
|
||||
if !ok || task.SparkPythonTask == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&task.SparkPythonTask.PythonFile,
|
||||
"tasks.spark_python_task.python_file",
|
||||
},
|
||||
{
|
||||
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
|
||||
translateFilePath,
|
||||
}
|
||||
}
|
||||
|
||||
func transformWhlLibrary(resource any, dir string) *transformer {
|
||||
library, ok := resource.(*compute.Library)
|
||||
if !ok || library.Whl == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&library.Whl,
|
||||
"libraries.whl",
|
||||
translateNoOp, // Does not convert to remote path but makes sure that nested paths resolved correctly
|
||||
}
|
||||
}
|
||||
|
||||
func transformDbtTask(resource any, dir string) *transformer {
|
||||
task, ok := resource.(*jobs.Task)
|
||||
if !ok || task.DbtTask == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&task.DbtTask.ProjectDirectory,
|
||||
"tasks.dbt_task.project_directory",
|
||||
},
|
||||
{
|
||||
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
|
||||
translateDirectoryPath,
|
||||
}
|
||||
}
|
||||
|
||||
func transformSqlFileTask(resource any, dir string) *transformer {
|
||||
task, ok := resource.(*jobs.Task)
|
||||
if !ok || task.SqlTask == nil || task.SqlTask.File == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&task.SqlTask.File.Path,
|
||||
"tasks.sql_task.file.path",
|
||||
},
|
||||
{
|
||||
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
|
||||
translateFilePath,
|
||||
},
|
||||
{
|
||||
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
|
||||
translateNoOp,
|
||||
},
|
||||
{
|
||||
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
|
||||
translateNoOp,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func transformJarLibrary(resource any, dir string) *transformer {
|
||||
library, ok := resource.(*compute.Library)
|
||||
if !ok || library.Jar == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&library.Jar,
|
||||
"libraries.jar",
|
||||
translateNoOp, // Does not convert to remote path but makes sure that nested paths resolved correctly
|
||||
}
|
||||
}
|
||||
|
||||
func applyJobTransformers(m *translatePaths, b *bundle.Bundle) error {
|
||||
jobTransformers := []transformFunc{
|
||||
transformNotebookTask,
|
||||
transformSparkTask,
|
||||
transformWhlLibrary,
|
||||
transformJarLibrary,
|
||||
transformDbtTask,
|
||||
transformSqlFileTask,
|
||||
}
|
||||
func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) {
|
||||
var fallback = make(map[string]string)
|
||||
var ignore []string
|
||||
var err error
|
||||
|
||||
for key, job := range b.Config.Resources.Jobs {
|
||||
dir, err := job.ConfigFileDirectory()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine directory for job %s: %w", key, err)
|
||||
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
|
||||
}
|
||||
|
||||
// If we cannot resolve the relative path using the [dyn.Value] location itself,
|
||||
// use the job's location as fallback. This is necessary for backwards compatibility.
|
||||
fallback[key] = dir
|
||||
|
||||
// Do not translate job task paths if using git source
|
||||
if job.GitSource != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
for i := 0; i < len(job.Tasks); i++ {
|
||||
task := &job.Tasks[i]
|
||||
err := m.applyTransformers(jobTransformers, b, task, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for j := 0; j < len(task.Libraries); j++ {
|
||||
library := &task.Libraries[j]
|
||||
err := m.applyTransformers(jobTransformers, b, library, dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
ignore = append(ignore, key)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
// Base pattern to match all tasks in all jobs.
|
||||
base := dyn.NewPattern(
|
||||
dyn.Key("resources"),
|
||||
dyn.Key("jobs"),
|
||||
dyn.AnyKey(),
|
||||
dyn.Key("tasks"),
|
||||
dyn.AnyIndex(),
|
||||
)
|
||||
|
||||
// Compile list of patterns and their respective rewrite functions.
|
||||
taskPatterns := rewritePatterns(base)
|
||||
forEachPatterns := rewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
|
||||
allPatterns := append(taskPatterns, forEachPatterns...)
|
||||
|
||||
for _, t := range allPatterns {
|
||||
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
key := p[2].Key()
|
||||
|
||||
// Skip path translation if the job is using git source.
|
||||
if slices.Contains(ignore, key) {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
dir, err := v.Location().Directory()
|
||||
if err != nil {
|
||||
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
|
||||
}
|
||||
|
||||
return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key])
|
||||
})
|
||||
if err != nil {
|
||||
return dyn.InvalidValue, err
|
||||
}
|
||||
}
|
||||
|
||||
return v, nil
|
||||
}
|
||||
|
|
|
@ -4,57 +4,59 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
)
|
||||
|
||||
func transformLibraryNotebook(resource any, dir string) *transformer {
|
||||
library, ok := resource.(*pipelines.PipelineLibrary)
|
||||
if !ok || library.Notebook == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&library.Notebook.Path,
|
||||
"libraries.notebook.path",
|
||||
translateNotebookPath,
|
||||
}
|
||||
}
|
||||
|
||||
func transformLibraryFile(resource any, dir string) *transformer {
|
||||
library, ok := resource.(*pipelines.PipelineLibrary)
|
||||
if !ok || library.File == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &transformer{
|
||||
dir,
|
||||
&library.File.Path,
|
||||
"libraries.file.path",
|
||||
translateFilePath,
|
||||
}
|
||||
}
|
||||
|
||||
func applyPipelineTransformers(m *translatePaths, b *bundle.Bundle) error {
|
||||
pipelineTransformers := []transformFunc{
|
||||
transformLibraryNotebook,
|
||||
transformLibraryFile,
|
||||
}
|
||||
func (m *translatePaths) applyPipelineTranslations(b *bundle.Bundle, v dyn.Value) (dyn.Value, error) {
|
||||
var fallback = make(map[string]string)
|
||||
var err error
|
||||
|
||||
for key, pipeline := range b.Config.Resources.Pipelines {
|
||||
dir, err := pipeline.ConfigFileDirectory()
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err)
|
||||
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err)
|
||||
}
|
||||
|
||||
for i := 0; i < len(pipeline.Libraries); i++ {
|
||||
library := &pipeline.Libraries[i]
|
||||
err := m.applyTransformers(pipelineTransformers, b, library, dir)
|
||||
// If we cannot resolve the relative path using the [dyn.Value] location itself,
|
||||
// use the pipeline's location as fallback. This is necessary for backwards compatibility.
|
||||
fallback[key] = dir
|
||||
}
|
||||
|
||||
// Base pattern to match all libraries in all pipelines.
|
||||
base := dyn.NewPattern(
|
||||
dyn.Key("resources"),
|
||||
dyn.Key("pipelines"),
|
||||
dyn.AnyKey(),
|
||||
dyn.Key("libraries"),
|
||||
dyn.AnyIndex(),
|
||||
)
|
||||
|
||||
for _, t := range []struct {
|
||||
pattern dyn.Pattern
|
||||
fn rewriteFunc
|
||||
}{
|
||||
{
|
||||
base.Append(dyn.Key("notebook"), dyn.Key("path")),
|
||||
translateNotebookPath,
|
||||
},
|
||||
{
|
||||
base.Append(dyn.Key("file"), dyn.Key("path")),
|
||||
translateFilePath,
|
||||
},
|
||||
} {
|
||||
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
key := p[2].Key()
|
||||
dir, err := v.Location().Directory()
|
||||
if err != nil {
|
||||
return err
|
||||
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err)
|
||||
}
|
||||
|
||||
return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key])
|
||||
})
|
||||
if err != nil {
|
||||
return dyn.InvalidValue, err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return v, nil
|
||||
}
|
||||
|
|
|
@ -547,7 +547,7 @@ func TestJobSparkPythonTaskWithNotebookSourceError(t *testing.T) {
|
|||
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
|
||||
|
||||
err := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
|
||||
assert.ErrorContains(t, err, `expected a file for "tasks.spark_python_task.python_file" but got a notebook`)
|
||||
assert.ErrorContains(t, err, `expected a file for "resources.jobs.job.tasks[0].spark_python_task.python_file" but got a notebook`)
|
||||
}
|
||||
|
||||
func TestJobNotebookTaskWithFileSourceError(t *testing.T) {
|
||||
|
@ -581,7 +581,7 @@ func TestJobNotebookTaskWithFileSourceError(t *testing.T) {
|
|||
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
|
||||
|
||||
err := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
|
||||
assert.ErrorContains(t, err, `expected a notebook for "tasks.notebook_task.notebook_path" but got a file`)
|
||||
assert.ErrorContains(t, err, `expected a notebook for "resources.jobs.job.tasks[0].notebook_task.notebook_path" but got a file`)
|
||||
}
|
||||
|
||||
func TestPipelineNotebookLibraryWithFileSourceError(t *testing.T) {
|
||||
|
@ -615,7 +615,7 @@ func TestPipelineNotebookLibraryWithFileSourceError(t *testing.T) {
|
|||
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
|
||||
|
||||
err := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
|
||||
assert.ErrorContains(t, err, `expected a notebook for "libraries.notebook.path" but got a file`)
|
||||
assert.ErrorContains(t, err, `expected a notebook for "resources.pipelines.pipeline.libraries[0].notebook.path" but got a file`)
|
||||
}
|
||||
|
||||
func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) {
|
||||
|
@ -649,5 +649,5 @@ func TestPipelineFileLibraryWithNotebookSourceError(t *testing.T) {
|
|||
bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml"))
|
||||
|
||||
err := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
|
||||
assert.ErrorContains(t, err, `expected a file for "libraries.file.path" but got a notebook`)
|
||||
assert.ErrorContains(t, err, `expected a file for "resources.pipelines.pipeline.libraries[0].file.path" but got a notebook`)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config/mutator"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func loadTarget(t *testing.T, path, env string) *bundle.Bundle {
|
||||
ctx := context.Background()
|
||||
b, err := bundle.Load(ctx, path)
|
||||
require.NoError(t, err)
|
||||
err = bundle.Apply(ctx, b, bundle.Seq(mutator.DefaultMutatorsForTarget(env)...))
|
||||
require.NoError(t, err)
|
||||
err = bundle.Apply(ctx, b, bundle.Seq(
|
||||
mutator.RewriteSyncPaths(),
|
||||
mutator.MergeJobClusters(),
|
||||
mutator.MergeJobTasks(),
|
||||
mutator.MergePipelineClusters(),
|
||||
))
|
||||
require.NoError(t, err)
|
||||
return b
|
||||
}
|
|
@ -8,5 +8,17 @@ resources:
|
|||
libraries:
|
||||
- notebook:
|
||||
path: ./dlt/*
|
||||
|
||||
targets:
|
||||
default:
|
||||
default: true
|
||||
|
||||
error:
|
||||
default: false
|
||||
|
||||
resources:
|
||||
pipelines:
|
||||
nyc_taxi_pipeline:
|
||||
libraries:
|
||||
- notebook:
|
||||
path: ./non-existent
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config/mutator"
|
||||
"github.com/databricks/cli/bundle/phases"
|
||||
"github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||
|
@ -14,13 +13,8 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExpandPipelineGlobPathsWithNonExistent(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
b, err := bundle.Load(ctx, "./pipeline_glob_paths")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = bundle.Apply(ctx, b, bundle.Seq(mutator.DefaultMutatorsForTarget("default")...))
|
||||
require.NoError(t, err)
|
||||
func TestExpandPipelineGlobPaths(t *testing.T) {
|
||||
b := loadTarget(t, "./pipeline_glob_paths", "default")
|
||||
|
||||
// Configure mock workspace client
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
|
@ -32,13 +26,30 @@ func TestExpandPipelineGlobPathsWithNonExistent(t *testing.T) {
|
|||
}, nil)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
err = bundle.Apply(ctx, b, phases.Initialize())
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "notebook ./non-existent not found")
|
||||
|
||||
ctx := context.Background()
|
||||
err := bundle.Apply(ctx, b, phases.Initialize())
|
||||
require.NoError(t, err)
|
||||
require.Equal(
|
||||
t,
|
||||
b.Config.Resources.Pipelines["nyc_taxi_pipeline"].Libraries[0].Notebook.Path,
|
||||
"/Users/user@domain.com/.bundle/pipeline_glob_paths/default/files/dlt/nyc_taxi_loader",
|
||||
b.Config.Resources.Pipelines["nyc_taxi_pipeline"].Libraries[0].Notebook.Path,
|
||||
)
|
||||
}
|
||||
|
||||
func TestExpandPipelineGlobPathsWithNonExistent(t *testing.T) {
|
||||
b := loadTarget(t, "./pipeline_glob_paths", "error")
|
||||
|
||||
// Configure mock workspace client
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &config.Config{
|
||||
Host: "https://mock.databricks.workspace.com",
|
||||
}
|
||||
m.GetMockCurrentUserAPI().EXPECT().Me(mock.Anything).Return(&iam.User{
|
||||
UserName: "user@domain.com",
|
||||
}, nil)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
ctx := context.Background()
|
||||
err := bundle.Apply(ctx, b, phases.Initialize())
|
||||
require.ErrorContains(t, err, "notebook ./non-existent not found")
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
# Test path translation (with fallback to previous behavior)
|
||||
|
||||
As of v0.214.0, all paths in a resource definition were resolved relative to the path
|
||||
where that resource was first defined. If those paths were specified in the same file,
|
||||
or in a different file in the same directory, this would be intuitive.
|
||||
|
||||
If those paths were specified in a different file in a different directory, they would
|
||||
still be resolved relative to the original file.
|
||||
|
||||
For example, a job defined in `./resources/my_job.yml` with an override
|
||||
in `./override.yml` would have to use paths relative to `./resources`.
|
||||
This is counter-intuitive and error-prone, and we changed this behavior
|
||||
in https://github.com/databricks/cli/pull/1273.
|
||||
|
||||
## Appendix
|
||||
|
||||
Q: Why did this behavior apply as of v0.214.0?
|
||||
|
||||
A: With the introduction of dynamic configuration loading, we keep track
|
||||
of the location (file, line, column) where a resource is defined.
|
||||
This location information is used to perform path translation, but upon
|
||||
introduction in v0.214.0, the code still used only a single path per resource.
|
||||
Due to the semantics of merging two `dyn.Value` objects, the location
|
||||
information of the first existing value is used for the merged value.
|
||||
This meant that all paths for a resource were resolved relative to the
|
||||
location where the resource was first defined.
|
||||
|
||||
Q: What was the behavior before v0.214.0?
|
||||
|
||||
A: Before we relied on dynamic configuration loading, all configuration was
|
||||
maintained in a typed struct. The path for a resource was an unexported field on the
|
||||
resource and was set right after loading the configuration file that contains it.
|
||||
Target overrides contained the same path field, and applying a target override
|
||||
would set the path for the resource to the path of the target override.
|
||||
This meant that all paths for a resource were resolved relative to the
|
||||
location where the resource was last defined.
|
||||
|
||||
Q: Why are we maintaining compatibility with the old behavior?
|
||||
|
||||
A: We want to avoid breaking existing configurations that depend on this behavior.
|
||||
Use of the old behavior should trigger warnings with a call to action to update.
|
||||
We can include a deprecation timeline to remove the old behavior in the future.
|
|
@ -0,0 +1,13 @@
|
|||
bundle:
|
||||
name: path_translation_fallback
|
||||
|
||||
include:
|
||||
- "resources/*.yml"
|
||||
- "override_*.yml"
|
||||
|
||||
targets:
|
||||
development:
|
||||
default: true
|
||||
|
||||
error:
|
||||
default: false
|
|
@ -0,0 +1,41 @@
|
|||
targets:
|
||||
development:
|
||||
resources:
|
||||
jobs:
|
||||
my_job:
|
||||
tasks:
|
||||
- task_key: notebook_example
|
||||
notebook_task:
|
||||
notebook_path: ../src/notebook.py
|
||||
|
||||
- task_key: spark_python_example
|
||||
spark_python_task:
|
||||
python_file: ../src/file.py
|
||||
|
||||
- task_key: dbt_example
|
||||
dbt_task:
|
||||
project_directory: ../src/dbt_project
|
||||
commands:
|
||||
- "dbt run"
|
||||
|
||||
- task_key: sql_example
|
||||
sql_task:
|
||||
file:
|
||||
path: ../src/sql.sql
|
||||
warehouse_id: cafef00d
|
||||
|
||||
- task_key: python_wheel_example
|
||||
python_wheel_task:
|
||||
package_name: my_package
|
||||
|
||||
# Append library; the path is resolved relative to the job's directory.
|
||||
libraries:
|
||||
- whl: ../dist/wheel2.whl
|
||||
|
||||
- task_key: spark_jar_example
|
||||
spark_jar_task:
|
||||
main_class_name: com.example.Main
|
||||
|
||||
# Append library; the path is resolved relative to the job's directory.
|
||||
libraries:
|
||||
- jar: ../target/jar2.jar
|
|
@ -0,0 +1,13 @@
|
|||
targets:
|
||||
development:
|
||||
resources:
|
||||
pipelines:
|
||||
my_pipeline:
|
||||
|
||||
# Append library; the path is resolved relative to the pipeline's directory.
|
||||
libraries:
|
||||
- file:
|
||||
path: ../src/file2.py
|
||||
|
||||
- notebook:
|
||||
path: ../src/notebook2.py
|
|
@ -0,0 +1,36 @@
|
|||
resources:
|
||||
jobs:
|
||||
my_job:
|
||||
name: "placeholder"
|
||||
tasks:
|
||||
- task_key: notebook_example
|
||||
notebook_task:
|
||||
notebook_path: "this value is overridden"
|
||||
|
||||
- task_key: spark_python_example
|
||||
spark_python_task:
|
||||
python_file: "this value is overridden"
|
||||
|
||||
- task_key: dbt_example
|
||||
dbt_task:
|
||||
project_directory: "this value is overridden"
|
||||
commands:
|
||||
- "dbt run"
|
||||
|
||||
- task_key: sql_example
|
||||
sql_task:
|
||||
file:
|
||||
path: "this value is overridden"
|
||||
warehouse_id: cafef00d
|
||||
|
||||
- task_key: python_wheel_example
|
||||
python_wheel_task:
|
||||
package_name: my_package
|
||||
libraries:
|
||||
- whl: ../dist/wheel1.whl
|
||||
|
||||
- task_key: spark_jar_example
|
||||
spark_jar_task:
|
||||
main_class_name: com.example.Main
|
||||
libraries:
|
||||
- jar: ../target/jar1.jar
|
|
@ -0,0 +1,9 @@
|
|||
resources:
|
||||
pipelines:
|
||||
my_pipeline:
|
||||
name: "placeholder"
|
||||
libraries:
|
||||
- file:
|
||||
path: ../src/file1.py
|
||||
- notebook:
|
||||
path: ../src/notebook1.py
|
|
@ -0,0 +1 @@
|
|||
print("Hello, World!")
|
|
@ -0,0 +1 @@
|
|||
print("Hello, World!")
|
|
@ -0,0 +1 @@
|
|||
print("Hello, World!")
|
|
@ -0,0 +1,2 @@
|
|||
# Databricks notebook source
|
||||
print("Hello, World!")
|
|
@ -0,0 +1,2 @@
|
|||
# Databricks notebook source
|
||||
print("Hello, World!")
|
|
@ -0,0 +1,2 @@
|
|||
# Databricks notebook source
|
||||
print("Hello, World!")
|
|
@ -0,0 +1 @@
|
|||
select "Hello, World!"
|
|
@ -0,0 +1,6 @@
|
|||
# Test path translation (nominal behavior)
|
||||
|
||||
As of v0.216.0 (PR at https://github.com/databricks/cli/pull/1273), all paths in a resource
|
||||
definition are resolved relative to the directory of the file where they are defined.
|
||||
|
||||
This is more intuitive than the previous behavior (see `../fallback/README.md` for details).
|
|
@ -0,0 +1,13 @@
|
|||
bundle:
|
||||
name: path_translation_nominal
|
||||
|
||||
include:
|
||||
- "resources/*.yml"
|
||||
- "override_*.yml"
|
||||
|
||||
targets:
|
||||
development:
|
||||
default: true
|
||||
|
||||
error:
|
||||
default: false
|
|
@ -0,0 +1,53 @@
|
|||
targets:
|
||||
development:
|
||||
resources:
|
||||
jobs:
|
||||
my_job:
|
||||
tasks:
|
||||
- task_key: notebook_example
|
||||
notebook_task:
|
||||
notebook_path: ./src/notebook.py
|
||||
|
||||
- task_key: spark_python_example
|
||||
spark_python_task:
|
||||
python_file: ./src/file.py
|
||||
|
||||
- task_key: dbt_example
|
||||
dbt_task:
|
||||
project_directory: ./src/dbt_project
|
||||
commands:
|
||||
- "dbt run"
|
||||
|
||||
- task_key: sql_example
|
||||
sql_task:
|
||||
file:
|
||||
path: ./src/sql.sql
|
||||
warehouse_id: cafef00d
|
||||
|
||||
- task_key: python_wheel_example
|
||||
python_wheel_task:
|
||||
package_name: my_package
|
||||
|
||||
# Append library; the path is resolved relative to this file's directory.
|
||||
libraries:
|
||||
- whl: ./dist/wheel2.whl
|
||||
|
||||
- task_key: spark_jar_example
|
||||
spark_jar_task:
|
||||
main_class_name: com.example.Main
|
||||
|
||||
# Append library; the path is resolved relative to this file's directory.
|
||||
libraries:
|
||||
- jar: ./target/jar2.jar
|
||||
|
||||
- task_key: for_each_notebook_example
|
||||
for_each_task:
|
||||
task:
|
||||
notebook_task:
|
||||
notebook_path: ./src/notebook.py
|
||||
|
||||
- task_key: for_each_spark_python_example
|
||||
for_each_task:
|
||||
task:
|
||||
spark_python_task:
|
||||
python_file: ./src/file.py
|
|
@ -0,0 +1,13 @@
|
|||
targets:
|
||||
development:
|
||||
resources:
|
||||
pipelines:
|
||||
my_pipeline:
|
||||
|
||||
# Append library; the path is resolved relative to this file's directory.
|
||||
libraries:
|
||||
- file:
|
||||
path: src/file2.py
|
||||
|
||||
- notebook:
|
||||
path: src/notebook2.py
|
|
@ -0,0 +1,48 @@
|
|||
resources:
|
||||
jobs:
|
||||
my_job:
|
||||
name: "placeholder"
|
||||
tasks:
|
||||
- task_key: notebook_example
|
||||
notebook_task:
|
||||
notebook_path: "this value is overridden"
|
||||
|
||||
- task_key: spark_python_example
|
||||
spark_python_task:
|
||||
python_file: "this value is overridden"
|
||||
|
||||
- task_key: dbt_example
|
||||
dbt_task:
|
||||
project_directory: "this value is overridden"
|
||||
commands:
|
||||
- "dbt run"
|
||||
|
||||
- task_key: sql_example
|
||||
sql_task:
|
||||
file:
|
||||
path: "this value is overridden"
|
||||
warehouse_id: cafef00d
|
||||
|
||||
- task_key: python_wheel_example
|
||||
python_wheel_task:
|
||||
package_name: my_package
|
||||
libraries:
|
||||
- whl: ../dist/wheel1.whl
|
||||
|
||||
- task_key: spark_jar_example
|
||||
spark_jar_task:
|
||||
main_class_name: com.example.Main
|
||||
libraries:
|
||||
- jar: ../target/jar1.jar
|
||||
|
||||
- task_key: for_each_notebook_example
|
||||
for_each_task:
|
||||
task:
|
||||
notebook_task:
|
||||
notebook_path: "this value is overridden"
|
||||
|
||||
- task_key: for_each_spark_python_example
|
||||
for_each_task:
|
||||
task:
|
||||
spark_python_task:
|
||||
python_file: "this value is overridden"
|
|
@ -0,0 +1,9 @@
|
|||
resources:
|
||||
pipelines:
|
||||
my_pipeline:
|
||||
name: "placeholder"
|
||||
libraries:
|
||||
- file:
|
||||
path: ../src/file1.py
|
||||
- notebook:
|
||||
path: ../src/notebook1.py
|
|
@ -0,0 +1 @@
|
|||
print("Hello, World!")
|
|
@ -0,0 +1 @@
|
|||
print("Hello, World!")
|
|
@ -0,0 +1 @@
|
|||
print("Hello, World!")
|
|
@ -0,0 +1,2 @@
|
|||
# Databricks notebook source
|
||||
print("Hello, World!")
|
|
@ -0,0 +1,2 @@
|
|||
# Databricks notebook source
|
||||
print("Hello, World!")
|
|
@ -0,0 +1,2 @@
|
|||
# Databricks notebook source
|
||||
print("Hello, World!")
|
|
@ -0,0 +1 @@
|
|||
select "Hello, World!"
|
|
@ -0,0 +1,112 @@
|
|||
package config_tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config/mutator"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPathTranslationFallback(t *testing.T) {
|
||||
b := loadTarget(t, "./path_translation/fallback", "development")
|
||||
|
||||
m := mutator.TranslatePaths()
|
||||
err := bundle.Apply(context.Background(), b, m)
|
||||
require.NoError(t, err)
|
||||
|
||||
j := b.Config.Resources.Jobs["my_job"]
|
||||
assert.Len(t, j.Tasks, 6)
|
||||
|
||||
assert.Equal(t, "notebook_example", filepath.ToSlash(j.Tasks[0].TaskKey))
|
||||
assert.Equal(t, "src/notebook", filepath.ToSlash(j.Tasks[0].NotebookTask.NotebookPath))
|
||||
|
||||
assert.Equal(t, "spark_python_example", filepath.ToSlash(j.Tasks[1].TaskKey))
|
||||
assert.Equal(t, "src/file.py", filepath.ToSlash(j.Tasks[1].SparkPythonTask.PythonFile))
|
||||
|
||||
assert.Equal(t, "dbt_example", filepath.ToSlash(j.Tasks[2].TaskKey))
|
||||
assert.Equal(t, "src/dbt_project", filepath.ToSlash(j.Tasks[2].DbtTask.ProjectDirectory))
|
||||
|
||||
assert.Equal(t, "sql_example", filepath.ToSlash(j.Tasks[3].TaskKey))
|
||||
assert.Equal(t, "src/sql.sql", filepath.ToSlash(j.Tasks[3].SqlTask.File.Path))
|
||||
|
||||
assert.Equal(t, "python_wheel_example", filepath.ToSlash(j.Tasks[4].TaskKey))
|
||||
assert.Equal(t, "dist/wheel1.whl", filepath.ToSlash(j.Tasks[4].Libraries[0].Whl))
|
||||
assert.Equal(t, "dist/wheel2.whl", filepath.ToSlash(j.Tasks[4].Libraries[1].Whl))
|
||||
|
||||
assert.Equal(t, "spark_jar_example", filepath.ToSlash(j.Tasks[5].TaskKey))
|
||||
assert.Equal(t, "target/jar1.jar", filepath.ToSlash(j.Tasks[5].Libraries[0].Jar))
|
||||
assert.Equal(t, "target/jar2.jar", filepath.ToSlash(j.Tasks[5].Libraries[1].Jar))
|
||||
|
||||
p := b.Config.Resources.Pipelines["my_pipeline"]
|
||||
assert.Len(t, p.Libraries, 4)
|
||||
|
||||
assert.Equal(t, "src/file1.py", filepath.ToSlash(p.Libraries[0].File.Path))
|
||||
assert.Equal(t, "src/notebook1", filepath.ToSlash(p.Libraries[1].Notebook.Path))
|
||||
assert.Equal(t, "src/file2.py", filepath.ToSlash(p.Libraries[2].File.Path))
|
||||
assert.Equal(t, "src/notebook2", filepath.ToSlash(p.Libraries[3].Notebook.Path))
|
||||
}
|
||||
|
||||
func TestPathTranslationFallbackError(t *testing.T) {
|
||||
b := loadTarget(t, "./path_translation/fallback", "error")
|
||||
|
||||
m := mutator.TranslatePaths()
|
||||
err := bundle.Apply(context.Background(), b, m)
|
||||
assert.ErrorContains(t, err, `notebook this value is overridden not found`)
|
||||
}
|
||||
|
||||
func TestPathTranslationNominal(t *testing.T) {
|
||||
b := loadTarget(t, "./path_translation/nominal", "development")
|
||||
|
||||
m := mutator.TranslatePaths()
|
||||
err := bundle.Apply(context.Background(), b, m)
|
||||
assert.NoError(t, err)
|
||||
|
||||
j := b.Config.Resources.Jobs["my_job"]
|
||||
assert.Len(t, j.Tasks, 8)
|
||||
|
||||
assert.Equal(t, "notebook_example", filepath.ToSlash(j.Tasks[0].TaskKey))
|
||||
assert.Equal(t, "src/notebook", filepath.ToSlash(j.Tasks[0].NotebookTask.NotebookPath))
|
||||
|
||||
assert.Equal(t, "spark_python_example", filepath.ToSlash(j.Tasks[1].TaskKey))
|
||||
assert.Equal(t, "src/file.py", filepath.ToSlash(j.Tasks[1].SparkPythonTask.PythonFile))
|
||||
|
||||
assert.Equal(t, "dbt_example", filepath.ToSlash(j.Tasks[2].TaskKey))
|
||||
assert.Equal(t, "src/dbt_project", filepath.ToSlash(j.Tasks[2].DbtTask.ProjectDirectory))
|
||||
|
||||
assert.Equal(t, "sql_example", filepath.ToSlash(j.Tasks[3].TaskKey))
|
||||
assert.Equal(t, "src/sql.sql", filepath.ToSlash(j.Tasks[3].SqlTask.File.Path))
|
||||
|
||||
assert.Equal(t, "python_wheel_example", filepath.ToSlash(j.Tasks[4].TaskKey))
|
||||
assert.Equal(t, "dist/wheel1.whl", filepath.ToSlash(j.Tasks[4].Libraries[0].Whl))
|
||||
assert.Equal(t, "dist/wheel2.whl", filepath.ToSlash(j.Tasks[4].Libraries[1].Whl))
|
||||
|
||||
assert.Equal(t, "spark_jar_example", filepath.ToSlash(j.Tasks[5].TaskKey))
|
||||
assert.Equal(t, "target/jar1.jar", filepath.ToSlash(j.Tasks[5].Libraries[0].Jar))
|
||||
assert.Equal(t, "target/jar2.jar", filepath.ToSlash(j.Tasks[5].Libraries[1].Jar))
|
||||
|
||||
assert.Equal(t, "for_each_notebook_example", filepath.ToSlash(j.Tasks[6].TaskKey))
|
||||
assert.Equal(t, "src/notebook", filepath.ToSlash(j.Tasks[6].ForEachTask.Task.NotebookTask.NotebookPath))
|
||||
|
||||
assert.Equal(t, "for_each_spark_python_example", filepath.ToSlash(j.Tasks[7].TaskKey))
|
||||
assert.Equal(t, "src/file.py", filepath.ToSlash(j.Tasks[7].ForEachTask.Task.SparkPythonTask.PythonFile))
|
||||
|
||||
p := b.Config.Resources.Pipelines["my_pipeline"]
|
||||
assert.Len(t, p.Libraries, 4)
|
||||
|
||||
assert.Equal(t, "src/file1.py", filepath.ToSlash(p.Libraries[0].File.Path))
|
||||
assert.Equal(t, "src/notebook1", filepath.ToSlash(p.Libraries[1].Notebook.Path))
|
||||
assert.Equal(t, "src/file2.py", filepath.ToSlash(p.Libraries[2].File.Path))
|
||||
assert.Equal(t, "src/notebook2", filepath.ToSlash(p.Libraries[3].Notebook.Path))
|
||||
}
|
||||
|
||||
func TestPathTranslationNominalError(t *testing.T) {
|
||||
b := loadTarget(t, "./path_translation/nominal", "error")
|
||||
|
||||
m := mutator.TranslatePaths()
|
||||
err := bundle.Apply(context.Background(), b, m)
|
||||
assert.ErrorContains(t, err, `notebook this value is overridden not found`)
|
||||
}
|
|
@ -33,6 +33,14 @@ func NewPatternFromPath(p Path) Pattern {
|
|||
return cs
|
||||
}
|
||||
|
||||
// Append appends the given components to the pattern.
|
||||
func (p Pattern) Append(cs ...patternComponent) Pattern {
|
||||
out := make(Pattern, len(p)+len(cs))
|
||||
copy(out, p)
|
||||
copy(out[len(p):], cs)
|
||||
return out
|
||||
}
|
||||
|
||||
type anyKeyComponent struct{}
|
||||
|
||||
// AnyKey returns a pattern component that matches any key.
|
||||
|
|
|
@ -26,3 +26,18 @@ func TestNewPatternFromPath(t *testing.T) {
|
|||
pat2 := dyn.NewPatternFromPath(path)
|
||||
assert.Equal(t, pat1, pat2)
|
||||
}
|
||||
|
||||
func TestPatternAppend(t *testing.T) {
|
||||
p1 := dyn.NewPattern(dyn.Key("foo"), dyn.Index(1))
|
||||
p2 := dyn.NewPattern(dyn.Key("foo")).Append(dyn.Index(1))
|
||||
assert.Equal(t, p1, p2)
|
||||
}
|
||||
|
||||
func TestPatternAppendAlwaysNew(t *testing.T) {
|
||||
p := make(dyn.Pattern, 0, 2).Append(dyn.Key("foo"))
|
||||
|
||||
// There is room for a second element in the slice.
|
||||
p1 := p.Append(dyn.Index(1))
|
||||
p2 := p.Append(dyn.Index(2))
|
||||
assert.NotEqual(t, p1, p2)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue