2023-04-05 14:02:17 +00:00
|
|
|
package mutator
|
|
|
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"

	"github.com/databricks/bricks/bundle"
	"github.com/databricks/bricks/libs/notebook"
	"github.com/databricks/databricks-sdk-go/service/jobs"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)
|
|
|
|
|
|
|
|
// translatePaths holds the state of the mutator that rewrites local paths in
// a bundle's resources into workspace paths.
type translatePaths struct {
	// seen caches rewrites that were already performed. It maps a cleaned
	// relative path to the translated path that replaced it.
	seen map[string]string

	// filePath is the workspace directory that translated paths are joined
	// onto. Apply sets it from b.Config.Workspace.FilePath.Workspace.
	filePath string
}
|
|
|
|
|
|
|
|
// TranslatePaths converts paths to local notebook files into paths in the workspace file system.
|
|
|
|
func TranslatePaths() bundle.Mutator {
|
|
|
|
return &translatePaths{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) Name() string {
|
|
|
|
return "TranslatePaths"
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) rewritePath(b *bundle.Bundle, p *string, fn func(literal, relPath, absPath string) (string, error)) error {
|
|
|
|
// We assume absolute paths point to a location in the workspace
|
|
|
|
if path.IsAbs(*p) {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reuse value if this path has been rewritten before.
|
|
|
|
relPath := path.Clean(*p)
|
|
|
|
if interp, ok := m.seen[relPath]; ok {
|
|
|
|
*p = interp
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Convert local path into workspace path via specified function.
|
|
|
|
absPath := filepath.Join(b.Config.Path, relPath)
|
|
|
|
interp, err := fn(*p, relPath, absPath)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
*p = interp
|
|
|
|
m.seen[relPath] = interp
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) translateNotebookPath(literal, relPath, absPath string) (string, error) {
|
|
|
|
nb, _, err := notebook.Detect(absPath)
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
return "", fmt.Errorf("notebook %s not found", literal)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return "", fmt.Errorf("unable to determine if %s is a notebook: %w", relPath, err)
|
|
|
|
}
|
|
|
|
if !nb {
|
|
|
|
return "", fmt.Errorf("file at %s is not a notebook", relPath)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Upon import, notebooks are stripped of their extension.
|
|
|
|
withoutExt := strings.TrimSuffix(relPath, filepath.Ext(relPath))
|
|
|
|
|
|
|
|
// We have a notebook on our hands! It will be available under the file path.
|
|
|
|
return path.Join(m.filePath, withoutExt), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) translateFilePath(literal, relPath, absPath string) (string, error) {
|
|
|
|
_, err := os.Stat(absPath)
|
|
|
|
if os.IsNotExist(err) {
|
|
|
|
return "", fmt.Errorf("file %s not found", literal)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return "", fmt.Errorf("unable to access %s: %w", relPath, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// The file will be available under the file path.
|
|
|
|
return path.Join(m.filePath, relPath), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) translateJobTask(b *bundle.Bundle, task *jobs.JobTaskSettings) error {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
if task.NotebookTask != nil {
|
|
|
|
err = m.rewritePath(b, &task.NotebookTask.NotebookPath, m.translateNotebookPath)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if task.SparkPythonTask != nil {
|
|
|
|
err = m.rewritePath(b, &task.SparkPythonTask.PythonFile, m.translateFilePath)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) translatePipelineLibrary(b *bundle.Bundle, library *pipelines.PipelineLibrary) error {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
if library.Notebook != nil {
|
|
|
|
err = m.rewritePath(b, &library.Notebook.Path, m.translateNotebookPath)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-05 14:29:42 +00:00
|
|
|
if library.File != nil {
|
|
|
|
err = m.rewritePath(b, &library.File.Path, m.translateFilePath)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-04-05 14:02:17 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *translatePaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
|
|
|
|
m.seen = make(map[string]string)
|
|
|
|
m.filePath = b.Config.Workspace.FilePath.Workspace
|
|
|
|
|
|
|
|
for _, job := range b.Config.Resources.Jobs {
|
|
|
|
for i := 0; i < len(job.Tasks); i++ {
|
|
|
|
err := m.translateJobTask(b, &job.Tasks[i])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, pipeline := range b.Config.Resources.Pipelines {
|
|
|
|
for i := 0; i < len(pipeline.Libraries); i++ {
|
|
|
|
err := m.translatePipelineLibrary(b, &pipeline.Libraries[i])
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, nil
|
|
|
|
}
|