Store relative path to configuration file for every resource (#322)

## Changes

If a configuration file is located in a subdirectory of the bundle root,
files referenced from that configuration file should be relative to its
configuration file's directory instead of the bundle root.

## Tests

* New tests in `bundle/config/mutator/translate_paths_test.go`.
* Existing tests under `bundle/tests` pass and are augmented to assert
on paths.

---------

Co-authored-by: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
This commit is contained in:
Pieter Noordhuis 2023-04-12 16:17:13 +02:00 committed by GitHub
parent a390271cd8
commit 31ccebd62a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 301 additions and 47 deletions

View File

@ -44,11 +44,7 @@ type Bundle struct {
}
func Load(path string) (*Bundle, error) {
bundle := &Bundle{
Config: config.Root{
Path: path,
},
}
bundle := &Bundle{}
err := bundle.Config.Load(filepath.Join(path, config.FileName))
if err != nil {
return nil, err

View File

@ -44,7 +44,7 @@ func TestBundleMustLoadSuccess(t *testing.T) {
t.Setenv(envBundleRoot, "./tests/basic")
b, err := MustLoad()
require.NoError(t, err)
assert.Equal(t, "./tests/basic", b.Config.Path)
assert.Equal(t, "tests/basic", filepath.ToSlash(b.Config.Path))
}
func TestBundleMustLoadFailureWithEnv(t *testing.T) {
@ -63,7 +63,7 @@ func TestBundleTryLoadSuccess(t *testing.T) {
t.Setenv(envBundleRoot, "./tests/basic")
b, err := TryLoad()
require.NoError(t, err)
assert.Equal(t, "./tests/basic", b.Config.Path)
assert.Equal(t, "tests/basic", filepath.ToSlash(b.Config.Path))
}
func TestBundleTryLoadFailureWithEnv(t *testing.T) {

View File

@ -15,8 +15,7 @@ import (
)
type translatePaths struct {
seen map[string]string
filePath string
seen map[string]string
}
// TranslatePaths converts paths to local notebook files into paths in the workspace file system.
@ -28,75 +27,96 @@ func (m *translatePaths) Name() string {
return "TranslatePaths"
}
func (m *translatePaths) rewritePath(b *bundle.Bundle, p *string, fn func(literal, relPath, absPath string) (string, error)) error {
// rewritePath converts a given relative path to a stable remote workspace path.
//
// It takes these arguments:
// - The argument `dir` is the directory relative to which the given relative path is.
// - The given relative path is both passed and written back through `*p`.
// - The argument `fn` is a function that performs the actual rewriting logic.
// This logic is different between regular files or notebooks.
//
// The function returns an error if it is impossible to rewrite the given relative path.
func (m *translatePaths) rewritePath(
dir string,
b *bundle.Bundle,
p *string,
fn func(literal, localPath, remotePath string) (string, error),
) error {
// We assume absolute paths point to a location in the workspace
if path.IsAbs(*p) {
if path.IsAbs(filepath.ToSlash(*p)) {
return nil
}
// Reuse value if this path has been rewritten before.
relPath := path.Clean(*p)
if interp, ok := m.seen[relPath]; ok {
// Local path is relative to the directory the resource was defined in.
localPath := filepath.Join(dir, filepath.FromSlash(*p))
if interp, ok := m.seen[localPath]; ok {
*p = interp
return nil
}
// Remote path must be relative to the bundle root.
remotePath, err := filepath.Rel(b.Config.Path, localPath)
if err != nil {
return err
}
if strings.HasPrefix(remotePath, "..") {
return fmt.Errorf("path %s is not contained in bundle root path", localPath)
}
// Prefix remote path with its remote root path.
remotePath = path.Join(b.Config.Workspace.FilePath.Workspace, filepath.ToSlash(remotePath))
// Convert local path into workspace path via specified function.
absPath := filepath.Join(b.Config.Path, relPath)
interp, err := fn(*p, relPath, absPath)
interp, err := fn(*p, localPath, filepath.ToSlash(remotePath))
if err != nil {
return err
}
*p = interp
m.seen[relPath] = interp
m.seen[localPath] = interp
return nil
}
func (m *translatePaths) translateNotebookPath(literal, relPath, absPath string) (string, error) {
nb, _, err := notebook.Detect(absPath)
func (m *translatePaths) translateNotebookPath(literal, localPath, remotePath string) (string, error) {
nb, _, err := notebook.Detect(localPath)
if os.IsNotExist(err) {
return "", fmt.Errorf("notebook %s not found", literal)
}
if err != nil {
return "", fmt.Errorf("unable to determine if %s is a notebook: %w", relPath, err)
return "", fmt.Errorf("unable to determine if %s is a notebook: %w", localPath, err)
}
if !nb {
return "", fmt.Errorf("file at %s is not a notebook", relPath)
return "", fmt.Errorf("file at %s is not a notebook", localPath)
}
// Upon import, notebooks are stripped of their extension.
withoutExt := strings.TrimSuffix(relPath, filepath.Ext(relPath))
// We have a notebook on our hands! It will be available under the file path.
return path.Join(m.filePath, withoutExt), nil
return strings.TrimSuffix(remotePath, filepath.Ext(localPath)), nil
}
func (m *translatePaths) translateFilePath(literal, relPath, absPath string) (string, error) {
_, err := os.Stat(absPath)
func (m *translatePaths) translateFilePath(literal, localPath, remotePath string) (string, error) {
_, err := os.Stat(localPath)
if os.IsNotExist(err) {
return "", fmt.Errorf("file %s not found", literal)
}
if err != nil {
return "", fmt.Errorf("unable to access %s: %w", relPath, err)
return "", fmt.Errorf("unable to access %s: %w", localPath, err)
}
// The file will be available under the file path.
return path.Join(m.filePath, relPath), nil
return remotePath, nil
}
func (m *translatePaths) translateJobTask(b *bundle.Bundle, task *jobs.JobTaskSettings) error {
func (m *translatePaths) translateJobTask(dir string, b *bundle.Bundle, task *jobs.JobTaskSettings) error {
var err error
if task.NotebookTask != nil {
err = m.rewritePath(b, &task.NotebookTask.NotebookPath, m.translateNotebookPath)
err = m.rewritePath(dir, b, &task.NotebookTask.NotebookPath, m.translateNotebookPath)
if err != nil {
return err
}
}
if task.SparkPythonTask != nil {
err = m.rewritePath(b, &task.SparkPythonTask.PythonFile, m.translateFilePath)
err = m.rewritePath(dir, b, &task.SparkPythonTask.PythonFile, m.translateFilePath)
if err != nil {
return err
}
@ -105,18 +125,18 @@ func (m *translatePaths) translateJobTask(b *bundle.Bundle, task *jobs.JobTaskSe
return nil
}
func (m *translatePaths) translatePipelineLibrary(b *bundle.Bundle, library *pipelines.PipelineLibrary) error {
func (m *translatePaths) translatePipelineLibrary(dir string, b *bundle.Bundle, library *pipelines.PipelineLibrary) error {
var err error
if library.Notebook != nil {
err = m.rewritePath(b, &library.Notebook.Path, m.translateNotebookPath)
err = m.rewritePath(dir, b, &library.Notebook.Path, m.translateNotebookPath)
if err != nil {
return err
}
}
if library.File != nil {
err = m.rewritePath(b, &library.File.Path, m.translateFilePath)
err = m.rewritePath(dir, b, &library.File.Path, m.translateFilePath)
if err != nil {
return err
}
@ -127,20 +147,29 @@ func (m *translatePaths) translatePipelineLibrary(b *bundle.Bundle, library *pip
func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
m.seen = make(map[string]string)
m.filePath = b.Config.Workspace.FilePath.Workspace
for _, job := range b.Config.Resources.Jobs {
for key, job := range b.Config.Resources.Jobs {
dir, err := job.ConfigFileDirectory()
if err != nil {
return nil, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
}
for i := 0; i < len(job.Tasks); i++ {
err := m.translateJobTask(b, &job.Tasks[i])
err := m.translateJobTask(dir, b, &job.Tasks[i])
if err != nil {
return nil, err
}
}
}
for _, pipeline := range b.Config.Resources.Pipelines {
for key, pipeline := range b.Config.Resources.Pipelines {
dir, err := pipeline.ConfigFileDirectory()
if err != nil {
return nil, fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err)
}
for i := 0; i < len(pipeline.Libraries); i++ {
err := m.translatePipelineLibrary(b, &pipeline.Libraries[i])
err := m.translatePipelineLibrary(dir, b, &pipeline.Libraries[i])
if err != nil {
return nil, err
}

View File

@ -24,6 +24,8 @@ func touchNotebookFile(t *testing.T, path string) {
}
func touchEmptyFile(t *testing.T, path string) {
err := os.MkdirAll(filepath.Dir(path), 0700)
require.NoError(t, err)
f, err := os.Create(path)
require.NoError(t, err)
f.Close()
@ -46,6 +48,9 @@ func TestTranslatePaths(t *testing.T) {
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "resource.yml"),
},
JobSettings: &jobs.JobSettings{
Tasks: []jobs.JobTaskSettings{
{
@ -79,6 +84,9 @@ func TestTranslatePaths(t *testing.T) {
},
Pipelines: map[string]*resources.Pipeline{
"pipeline": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "resource.yml"),
},
PipelineSpec: &pipelines.PipelineSpec{
Libraries: []pipelines.PipelineLibrary{
{
@ -160,6 +168,109 @@ func TestTranslatePaths(t *testing.T) {
)
}
func TestTranslatePathsInSubdirectories(t *testing.T) {
dir := t.TempDir()
touchEmptyFile(t, filepath.Join(dir, "job", "my_python_file.py"))
touchEmptyFile(t, filepath.Join(dir, "pipeline", "my_python_file.py"))
bundle := &bundle.Bundle{
Config: config.Root{
Path: dir,
Workspace: config.Workspace{
FilePath: config.PathLike{
Workspace: "/bundle",
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "job/resource.yml"),
},
JobSettings: &jobs.JobSettings{
Tasks: []jobs.JobTaskSettings{
{
SparkPythonTask: &jobs.SparkPythonTask{
PythonFile: "./my_python_file.py",
},
},
},
},
},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "pipeline/resource.yml"),
},
PipelineSpec: &pipelines.PipelineSpec{
Libraries: []pipelines.PipelineLibrary{
{
File: &pipelines.FileLibrary{
Path: "./my_python_file.py",
},
},
},
},
},
},
},
},
}
_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(
t,
"/bundle/job/my_python_file.py",
bundle.Config.Resources.Jobs["job"].Tasks[0].SparkPythonTask.PythonFile,
)
assert.Equal(
t,
"/bundle/pipeline/my_python_file.py",
bundle.Config.Resources.Pipelines["pipeline"].Libraries[0].File.Path,
)
}
func TestTranslatePathsOutsideBundleRoot(t *testing.T) {
dir := t.TempDir()
bundle := &bundle.Bundle{
Config: config.Root{
Path: dir,
Workspace: config.Workspace{
FilePath: config.PathLike{
Workspace: "/bundle",
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "../resource.yml"),
},
JobSettings: &jobs.JobSettings{
Tasks: []jobs.JobTaskSettings{
{
SparkPythonTask: &jobs.SparkPythonTask{
PythonFile: "./my_python_file.py",
},
},
},
},
},
},
},
},
}
_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
assert.ErrorContains(t, err, "is not contained in bundle root")
}
func TestJobNotebookDoesNotExistError(t *testing.T) {
dir := t.TempDir()
@ -169,6 +280,9 @@ func TestJobNotebookDoesNotExistError(t *testing.T) {
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "fake.yml"),
},
JobSettings: &jobs.JobSettings{
Tasks: []jobs.JobTaskSettings{
{
@ -197,6 +311,9 @@ func TestJobFileDoesNotExistError(t *testing.T) {
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "fake.yml"),
},
JobSettings: &jobs.JobSettings{
Tasks: []jobs.JobTaskSettings{
{
@ -225,6 +342,9 @@ func TestPipelineNotebookDoesNotExistError(t *testing.T) {
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"pipeline": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "fake.yml"),
},
PipelineSpec: &pipelines.PipelineSpec{
Libraries: []pipelines.PipelineLibrary{
{
@ -243,3 +363,34 @@ func TestPipelineNotebookDoesNotExistError(t *testing.T) {
_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
assert.EqualError(t, err, "notebook ./doesnt_exist.py not found")
}
func TestPipelineFileDoesNotExistError(t *testing.T) {
dir := t.TempDir()
bundle := &bundle.Bundle{
Config: config.Root{
Path: dir,
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"pipeline": {
Paths: resources.Paths{
ConfigFilePath: filepath.Join(dir, "fake.yml"),
},
PipelineSpec: &pipelines.PipelineSpec{
Libraries: []pipelines.PipelineLibrary{
{
File: &pipelines.FileLibrary{
Path: "./doesnt_exist.py",
},
},
},
},
},
},
},
},
}
_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
assert.EqualError(t, err, "file ./doesnt_exist.py not found")
}

View File

@ -12,3 +12,21 @@ type Resources struct {
Models map[string]*resources.MlflowModel `json:"models,omitempty"`
Experiments map[string]*resources.MlflowExperiment `json:"experiments,omitempty"`
}
// SetConfigFilePath sets the specified path for all resources contained in this instance.
// This property is used to correctly resolve paths relative to the path
// of the configuration file they were defined in.
func (r *Resources) SetConfigFilePath(path string) {
for _, e := range r.Jobs {
e.ConfigFilePath = path
}
for _, e := range r.Pipelines {
e.ConfigFilePath = path
}
for _, e := range r.Models {
e.ConfigFilePath = path
}
for _, e := range r.Experiments {
e.ConfigFilePath = path
}
}

View File

@ -6,5 +6,7 @@ type Job struct {
ID string `json:"id,omitempty" bundle:"readonly"`
Permissions []Permission `json:"permissions,omitempty"`
Paths
*jobs.JobSettings
}

View File

@ -5,5 +5,7 @@ import "github.com/databricks/databricks-sdk-go/service/mlflow"
type MlflowExperiment struct {
Permissions []Permission `json:"permissions,omitempty"`
Paths
*mlflow.Experiment
}

View File

@ -5,5 +5,7 @@ import "github.com/databricks/databricks-sdk-go/service/mlflow"
type MlflowModel struct {
Permissions []Permission `json:"permissions,omitempty"`
Paths
*mlflow.RegisteredModel
}

View File

@ -6,5 +6,7 @@ type Pipeline struct {
ID string `json:"id,omitempty" bundle:"readonly"`
Permissions []Permission `json:"permissions,omitempty"`
Paths
*pipelines.PipelineSpec
}

View File

@ -0,0 +1,19 @@
package resources
import (
"fmt"
"path/filepath"
)
type Paths struct {
// ConfigFilePath holds the path to the configuration file that
// described the resource that this type is embedded in.
ConfigFilePath string `json:"-" bundle:"readonly"`
}
func (p *Paths) ConfigFileDirectory() (string, error) {
if p.ConfigFilePath == "" {
return "", fmt.Errorf("config file path not configured")
}
return filepath.Dir(p.ConfigFilePath), nil
}

View File

@ -14,7 +14,7 @@ const FileName = "bundle.yml"
type Root struct {
// Path contains the directory path to the root of the bundle.
// It is set when loading `bundle.yml`.
Path string `json:"-"`
Path string `json:"-" bundle:"readonly"`
// Bundle contains details about this bundle, such as its name,
// version of the spec (TODO), default cluster, default warehouse, etc.
@ -56,7 +56,6 @@ func Load(path string) (*Root, error) {
// If we were given a directory, assume this is the bundle root.
if stat.IsDir() {
r.Path = path
path = filepath.Join(path, FileName)
}
@ -67,8 +66,21 @@ func Load(path string) (*Root, error) {
return &r, nil
}
func (r *Root) Load(file string) error {
raw, err := os.ReadFile(file)
// SetConfigFilePath configures the path that its configuration
// was loaded from in configuration leafs that require it.
func (r *Root) SetConfigFilePath(path string) {
r.Resources.SetConfigFilePath(path)
if r.Environments != nil {
for _, env := range r.Environments {
if env.Resources != nil {
env.Resources.SetConfigFilePath(path)
}
}
}
}
func (r *Root) Load(path string) error {
raw, err := os.ReadFile(path)
if err != nil {
return err
}
@ -76,10 +88,15 @@ func (r *Root) Load(file string) error {
if err != nil {
return err
}
r.Path = filepath.Dir(path)
r.SetConfigFilePath(path)
return nil
}
func (r *Root) Merge(other *Root) error {
// TODO: when hooking into merge semantics, disallow setting path on the target instance.
other.Path = ""
// TODO: define and test semantics for merging.
return mergo.MergeWithOverwrite(r, other)
}

View File

@ -32,12 +32,14 @@ func TestRootLoad(t *testing.T) {
func TestRootMergeStruct(t *testing.T) {
root := &Root{
Path: "path",
Workspace: Workspace{
Host: "foo",
Profile: "profile",
},
}
other := &Root{
Path: "path",
Workspace: Workspace{
Host: "bar",
},
@ -49,6 +51,7 @@ func TestRootMergeStruct(t *testing.T) {
func TestRootMergeMap(t *testing.T) {
root := &Root{
Path: "path",
Environments: map[string]*Environment{
"development": {
Workspace: &Workspace{
@ -59,6 +62,7 @@ func TestRootMergeMap(t *testing.T) {
},
}
other := &Root{
Path: "path",
Environments: map[string]*Environment{
"development": {
Workspace: &Workspace{

View File

@ -1,6 +1,7 @@
package config_tests
import (
"path/filepath"
"sort"
"testing"
@ -15,6 +16,12 @@ func TestIncludeDefault(t *testing.T) {
keys := maps.Keys(b.Config.Resources.Jobs)
sort.Strings(keys)
assert.Equal(t, []string{"my_first_job", "my_second_job"}, keys)
assert.Equal(t, "1", b.Config.Resources.Jobs["my_first_job"].ID)
assert.Equal(t, "2", b.Config.Resources.Jobs["my_second_job"].ID)
first := b.Config.Resources.Jobs["my_first_job"]
assert.Equal(t, "1", first.ID)
assert.Equal(t, "include_default/my_first_job/resource.yml", filepath.ToSlash(first.ConfigFilePath))
second := b.Config.Resources.Jobs["my_second_job"]
assert.Equal(t, "2", second.ID)
assert.Equal(t, "include_default/my_second_job/resource.yml", filepath.ToSlash(second.ConfigFilePath))
}

View File

@ -1,6 +1,7 @@
package config_tests
import (
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@ -13,6 +14,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) {
assert.Len(t, b.Config.Resources.Pipelines, 1)
p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.True(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
@ -25,6 +27,7 @@ func TestJobAndPipelineStaging(t *testing.T) {
assert.Len(t, b.Config.Resources.Pipelines, 1)
p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.False(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
@ -37,12 +40,14 @@ func TestJobAndPipelineProduction(t *testing.T) {
assert.Len(t, b.Config.Resources.Pipelines, 1)
p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.False(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
assert.Equal(t, "nyc_taxi_production", p.Target)
j := b.Config.Resources.Jobs["pipeline_schedule"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(j.ConfigFilePath))
assert.Equal(t, "Daily refresh of production pipeline", j.Name)
require.Len(t, j.Tasks, 1)
assert.NotEmpty(t, j.Tasks[0].PipelineTask.PipelineId)