Interpolate paths for job tasks that reference files (#306)

## Changes

The notebook-only `TranslateNotebookPaths` mutator is replaced by a more general `TranslatePaths` mutator that rewrites both notebook paths and plain file paths (e.g. the `python_file` of a Spark Python task) from local, relative paths into workspace file system paths.

This change also swaps the order of mutators such that interpolation happens before path translation. This makes it possible to use variables (e.g. `${bundle.environment}`) in notebook or file paths.
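For illustration, a minimal sketch of the resulting mutator ordering in the initialize phase. The function name `initializeMutators` and the import paths of the `mutator` and `interpolation` packages are assumptions for this example, not taken from this change; the individual mutator constructors are the ones used in `Initialize()` below.

```go
package phases

import (
	"github.com/databricks/bricks/bundle"
	"github.com/databricks/bricks/bundle/config/interpolation"
	"github.com/databricks/bricks/bundle/config/mutator"
)

// initializeMutators sketches the ordering after this change: variable
// interpolation runs first, so a path such as
// "./notebooks/${bundle.environment}.py" is resolved to a concrete relative
// path before TranslatePaths rewrites it into a workspace path.
func initializeMutators() []bundle.Mutator {
	return []bundle.Mutator{
		mutator.DefineDefaultWorkspaceRoot(),
		mutator.ExpandWorkspaceRoot(),
		mutator.DefineDefaultWorkspacePaths(),
		interpolation.Interpolate(
			interpolation.IncludeLookupsInPath("bundle"),
			interpolation.IncludeLookupsInPath("workspace"),
		),
		mutator.TranslatePaths(), // moved after interpolation
	}
}
```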

## Tests

New tests pass and the change was verified manually.
Pieter Noordhuis 2023-04-05 16:02:17 +02:00 committed by GitHub
parent 7427ceba6c
commit 4e4c0658db
4 changed files with 208 additions and 107 deletions


@@ -1,93 +0,0 @@
package mutator

import (
	"context"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"

	"github.com/databricks/bricks/bundle"
	"github.com/databricks/bricks/libs/notebook"
)

type translateNotebookPaths struct {
	seen map[string]string
}

// TranslateNotebookPaths converts paths to local notebook files into paths in the workspace file system.
func TranslateNotebookPaths() bundle.Mutator {
	return &translateNotebookPaths{}
}

func (m *translateNotebookPaths) Name() string {
	return "TranslateNotebookPaths"
}

func (m *translateNotebookPaths) rewritePath(b *bundle.Bundle, p *string) error {
	// We assume absolute paths point to a location in the workspace
	if path.IsAbs(*p) {
		return nil
	}

	relPath := path.Clean(*p)
	if interp, ok := m.seen[relPath]; ok {
		*p = interp
		return nil
	}

	absPath := filepath.Join(b.Config.Path, relPath)
	nb, _, err := notebook.Detect(absPath)
	if os.IsNotExist(err) {
		return fmt.Errorf("notebook %s not found: %w", *p, err)
	}
	if err != nil {
		return fmt.Errorf("unable to determine if %s is a notebook: %w", relPath, err)
	}
	if !nb {
		return fmt.Errorf("file at %s is not a notebook", relPath)
	}

	// Upon import, notebooks are stripped of their extension.
	withoutExt := strings.TrimSuffix(relPath, filepath.Ext(relPath))

	// We have a notebook on our hands! It will be available under the file path.
	interp := fmt.Sprintf("${workspace.file_path.workspace}/%s", withoutExt)
	*p = interp
	m.seen[relPath] = interp
	return nil
}

func (m *translateNotebookPaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	m.seen = make(map[string]string)

	for _, job := range b.Config.Resources.Jobs {
		for i := 0; i < len(job.Tasks); i++ {
			task := &job.Tasks[i]
			if task.NotebookTask == nil {
				continue
			}

			err := m.rewritePath(b, &task.NotebookTask.NotebookPath)
			if err != nil {
				return nil, err
			}
		}
	}

	for _, pipeline := range b.Config.Resources.Pipelines {
		for i := 0; i < len(pipeline.Libraries); i++ {
			library := &pipeline.Libraries[i]
			if library.Notebook == nil {
				continue
			}

			err := m.rewritePath(b, &library.Notebook.Path)
			if err != nil {
				return nil, err
			}
		}
	}

	return nil, nil
}


@@ -0,0 +1,144 @@
package mutator

import (
	"context"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"

	"github.com/databricks/bricks/bundle"
	"github.com/databricks/bricks/libs/notebook"
	"github.com/databricks/databricks-sdk-go/service/jobs"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

type translatePaths struct {
	seen     map[string]string
	filePath string
}

// TranslatePaths converts paths to local notebook files into paths in the workspace file system.
func TranslatePaths() bundle.Mutator {
	return &translatePaths{}
}

func (m *translatePaths) Name() string {
	return "TranslatePaths"
}

func (m *translatePaths) rewritePath(b *bundle.Bundle, p *string, fn func(literal, relPath, absPath string) (string, error)) error {
	// We assume absolute paths point to a location in the workspace
	if path.IsAbs(*p) {
		return nil
	}

	// Reuse value if this path has been rewritten before.
	relPath := path.Clean(*p)
	if interp, ok := m.seen[relPath]; ok {
		*p = interp
		return nil
	}

	// Convert local path into workspace path via specified function.
	absPath := filepath.Join(b.Config.Path, relPath)
	interp, err := fn(*p, relPath, absPath)
	if err != nil {
		return err
	}

	*p = interp
	m.seen[relPath] = interp
	return nil
}

func (m *translatePaths) translateNotebookPath(literal, relPath, absPath string) (string, error) {
	nb, _, err := notebook.Detect(absPath)
	if os.IsNotExist(err) {
		return "", fmt.Errorf("notebook %s not found", literal)
	}
	if err != nil {
		return "", fmt.Errorf("unable to determine if %s is a notebook: %w", relPath, err)
	}
	if !nb {
		return "", fmt.Errorf("file at %s is not a notebook", relPath)
	}

	// Upon import, notebooks are stripped of their extension.
	withoutExt := strings.TrimSuffix(relPath, filepath.Ext(relPath))

	// We have a notebook on our hands! It will be available under the file path.
	return path.Join(m.filePath, withoutExt), nil
}

func (m *translatePaths) translateFilePath(literal, relPath, absPath string) (string, error) {
	_, err := os.Stat(absPath)
	if os.IsNotExist(err) {
		return "", fmt.Errorf("file %s not found", literal)
	}
	if err != nil {
		return "", fmt.Errorf("unable to access %s: %w", relPath, err)
	}

	// The file will be available under the file path.
	return path.Join(m.filePath, relPath), nil
}

func (m *translatePaths) translateJobTask(b *bundle.Bundle, task *jobs.JobTaskSettings) error {
	var err error

	if task.NotebookTask != nil {
		err = m.rewritePath(b, &task.NotebookTask.NotebookPath, m.translateNotebookPath)
		if err != nil {
			return err
		}
	}

	if task.SparkPythonTask != nil {
		err = m.rewritePath(b, &task.SparkPythonTask.PythonFile, m.translateFilePath)
		if err != nil {
			return err
		}
	}

	return nil
}

func (m *translatePaths) translatePipelineLibrary(b *bundle.Bundle, library *pipelines.PipelineLibrary) error {
	var err error

	if library.Notebook != nil {
		err = m.rewritePath(b, &library.Notebook.Path, m.translateNotebookPath)
		if err != nil {
			return err
		}
	}

	return nil
}

func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	m.seen = make(map[string]string)
	m.filePath = b.Config.Workspace.FilePath.Workspace

	for _, job := range b.Config.Resources.Jobs {
		for i := 0; i < len(job.Tasks); i++ {
			err := m.translateJobTask(b, &job.Tasks[i])
			if err != nil {
				return nil, err
			}
		}
	}

	for _, pipeline := range b.Config.Resources.Pipelines {
		for i := 0; i < len(pipeline.Libraries); i++ {
			err := m.translatePipelineLibrary(b, &pipeline.Libraries[i])
			if err != nil {
				return nil, err
			}
		}
	}

	return nil, nil
}
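The new mutator funnels every path through a single `rewritePath` helper and passes in a per-kind translation callback, so notebook paths and plain file paths share the relative-path check and the `seen` cache while differing only in validation and extension handling. Below is a self-contained sketch of that callback pattern under simplified assumptions (no existence checks, no bundle type); the names are illustrative and not part of the bricks API.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// rewrite resolves a local path to a workspace path via a kind-specific
// callback, caching results so repeated references translate identically.
func rewrite(seen map[string]string, p *string, fn func(rel string) (string, error)) error {
	rel := path.Clean(*p)
	if out, ok := seen[rel]; ok {
		*p = out
		return nil
	}
	out, err := fn(rel)
	if err != nil {
		return err
	}
	*p = out
	seen[rel] = out
	return nil
}

func main() {
	seen := map[string]string{}
	filePath := "/bundle" // workspace file path root, as in the test below

	notebookPath := "./my_job_notebook.py"
	pythonFile := "./my_python_file.py"

	// Notebooks lose their extension on import; plain files keep theirs.
	_ = rewrite(seen, &notebookPath, func(rel string) (string, error) {
		return path.Join(filePath, strings.TrimSuffix(rel, path.Ext(rel))), nil
	})
	_ = rewrite(seen, &pythonFile, func(rel string) (string, error) {
		return path.Join(filePath, rel), nil
	})

	fmt.Println(notebookPath) // /bundle/my_job_notebook
	fmt.Println(pythonFile)   // /bundle/my_python_file.py
}
```

Running it prints `/bundle/my_job_notebook` and `/bundle/my_python_file.py`, mirroring the expectations in the updated test below.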


@@ -16,21 +16,33 @@ import (
 	"github.com/stretchr/testify/require"
 )
-func touchFile(t *testing.T, path string) {
+func touchNotebookFile(t *testing.T, path string) {
 	f, err := os.Create(path)
 	require.NoError(t, err)
 	f.WriteString("# Databricks notebook source\n")
 	f.Close()
 }
-func TestNotebookPaths(t *testing.T) {
+func touchEmptyFile(t *testing.T, path string) {
+	f, err := os.Create(path)
+	require.NoError(t, err)
+	f.Close()
+}
+func TestTranslatePaths(t *testing.T) {
 	dir := t.TempDir()
-	touchFile(t, filepath.Join(dir, "my_job_notebook.py"))
-	touchFile(t, filepath.Join(dir, "my_pipeline_notebook.py"))
+	touchNotebookFile(t, filepath.Join(dir, "my_job_notebook.py"))
+	touchNotebookFile(t, filepath.Join(dir, "my_pipeline_notebook.py"))
+	touchEmptyFile(t, filepath.Join(dir, "my_python_file.py"))
 	bundle := &bundle.Bundle{
 		Config: config.Root{
 			Path: dir,
+			Workspace: config.Workspace{
+				FilePath: config.PathLike{
+					Workspace: "/bundle",
+				},
+			},
 			Resources: config.Resources{
 				Jobs: map[string]*resources.Job{
 					"job": {
@@ -56,6 +68,11 @@ func TestNotebookPaths(t *testing.T) {
 										PackageName: "foo",
 									},
 								},
+								{
+									SparkPythonTask: &jobs.SparkPythonTask{
+										PythonFile: "./my_python_file.py",
+									},
+								},
 							},
 						},
 					},
@@ -90,13 +107,13 @@ func TestNotebookPaths(t *testing.T) {
 		},
 	}
-	_, err := mutator.TranslateNotebookPaths().Apply(context.Background(), bundle)
+	_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
 	require.NoError(t, err)
 	// Assert that the path in the tasks now refer to the artifact.
 	assert.Equal(
 		t,
-		"${workspace.file_path.workspace}/my_job_notebook",
+		"/bundle/my_job_notebook",
 		bundle.Config.Resources.Jobs["job"].Tasks[0].NotebookTask.NotebookPath,
 	)
 	assert.Equal(
@@ -106,14 +123,19 @@ func TestNotebookPaths(t *testing.T) {
 	)
 	assert.Equal(
 		t,
-		"${workspace.file_path.workspace}/my_job_notebook",
+		"/bundle/my_job_notebook",
 		bundle.Config.Resources.Jobs["job"].Tasks[2].NotebookTask.NotebookPath,
 	)
+	assert.Equal(
+		t,
+		"/bundle/my_python_file.py",
+		bundle.Config.Resources.Jobs["job"].Tasks[4].SparkPythonTask.PythonFile,
+	)
 	// Assert that the path in the libraries now refer to the artifact.
 	assert.Equal(
 		t,
-		"${workspace.file_path.workspace}/my_pipeline_notebook",
+		"/bundle/my_pipeline_notebook",
 		bundle.Config.Resources.Pipelines["pipeline"].Libraries[0].Notebook.Path,
 	)
 	assert.Equal(
@@ -123,7 +145,7 @@ func TestNotebookPaths(t *testing.T) {
 	)
 	assert.Equal(
 		t,
-		"${workspace.file_path.workspace}/my_pipeline_notebook",
+		"/bundle/my_pipeline_notebook",
 		bundle.Config.Resources.Pipelines["pipeline"].Libraries[2].Notebook.Path,
 	)
 }
@@ -152,8 +174,36 @@ func TestJobNotebookDoesNotExistError(t *testing.T) {
 		},
 	}
-	_, err := mutator.TranslateNotebookPaths().Apply(context.Background(), bundle)
-	assert.ErrorContains(t, err, "notebook ./doesnt_exist.py not found")
+	_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
+	assert.EqualError(t, err, "notebook ./doesnt_exist.py not found")
+}
+
+func TestJobFileDoesNotExistError(t *testing.T) {
+	dir := t.TempDir()
+
+	bundle := &bundle.Bundle{
+		Config: config.Root{
+			Path: dir,
+			Resources: config.Resources{
+				Jobs: map[string]*resources.Job{
+					"job": {
+						JobSettings: &jobs.JobSettings{
+							Tasks: []jobs.JobTaskSettings{
+								{
+									SparkPythonTask: &jobs.SparkPythonTask{
+										PythonFile: "./doesnt_exist.py",
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
+	assert.EqualError(t, err, "file ./doesnt_exist.py not found")
 }
 func TestPipelineNotebookDoesNotExistError(t *testing.T) {
@@ -180,6 +230,6 @@ func TestPipelineNotebookDoesNotExistError(t *testing.T) {
 		},
 	}
-	_, err := mutator.TranslateNotebookPaths().Apply(context.Background(), bundle)
-	assert.ErrorContains(t, err, "notebook ./doesnt_exist.py not found")
+	_, err := mutator.TranslatePaths().Apply(context.Background(), bundle)
+	assert.EqualError(t, err, "notebook ./doesnt_exist.py not found")
 }


@@ -18,11 +18,11 @@ func Initialize() bundle.Mutator {
 			mutator.DefineDefaultWorkspaceRoot(),
 			mutator.ExpandWorkspaceRoot(),
 			mutator.DefineDefaultWorkspacePaths(),
-			mutator.TranslateNotebookPaths(),
 			interpolation.Interpolate(
 				interpolation.IncludeLookupsInPath("bundle"),
 				interpolation.IncludeLookupsInPath("workspace"),
 			),
+			mutator.TranslatePaths(),
 			terraform.Initialize(),
 		},
 	)