From 78b75f66efec34708efb3be5b70480dc6cfc3c19 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 3 Aug 2023 15:59:08 +0200 Subject: [PATCH 1/6] Added transformation mutator for Python wheel task for them to work on DBR <13.1 --- bundle/artifacts/artifacts.go | 33 +++++++++++ bundle/libraries/libraries.go | 50 +++++++++++----- bundle/phases/deploy.go | 2 + bundle/python/transform.go | 106 ++++++++++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 14 deletions(-) create mode 100644 bundle/python/transform.go diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index c54131217..73a6d4471 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -165,3 +165,36 @@ func getUploadBasePath(b *bundle.Bundle) (string, error) { return path.Join(artifactPath, ".internal"), nil } + +func UploadNotebook(ctx context.Context, notebook string, b *bundle.Bundle) (string, error) { + raw, err := os.ReadFile(notebook) + if err != nil { + return "", fmt.Errorf("unable to read %s: %w", notebook, errors.Unwrap(err)) + } + + uploadPath, err := getUploadBasePath(b) + if err != nil { + return "", err + } + + remotePath := path.Join(uploadPath, path.Base(notebook)) + // Make sure target directory exists. + err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) + if err != nil { + return "", fmt.Errorf("unable to create directory for %s: %w", remotePath, err) + } + + // Import to workspace. + err = b.WorkspaceClient().Workspace.Import(ctx, workspace.Import{ + Path: remotePath, + Overwrite: true, + Format: workspace.ImportFormatSource, + Content: base64.StdEncoding.EncodeToString(raw), + Language: workspace.LanguagePython, + }) + if err != nil { + return "", fmt.Errorf("unable to import %s: %w", remotePath, err) + } + + return remotePath, nil +} diff --git a/bundle/libraries/libraries.go b/bundle/libraries/libraries.go index f7a2574ad..0afaf6d43 100644 --- a/bundle/libraries/libraries.go +++ b/bundle/libraries/libraries.go @@ -24,26 +24,48 @@ func (a *match) Name() string { } func (a *match) Apply(ctx context.Context, b *bundle.Bundle) error { - r := b.Config.Resources - for k := range b.Config.Resources.Jobs { - tasks := r.Jobs[k].JobSettings.Tasks - for i := range tasks { - task := &tasks[i] - if isMissingRequiredLibraries(task) { - return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey) - } - for j := range task.Libraries { - lib := &task.Libraries[j] - err := findArtifactsAndMarkForUpload(ctx, lib, b) - if err != nil { - return err - } + tasks := findAllTasks(b) + for _, task := range tasks { + if isMissingRequiredLibraries(task) { + return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey) + } + for j := range task.Libraries { + lib := &task.Libraries[j] + err := findArtifactsAndMarkForUpload(ctx, lib, b) + if err != nil { + return err } } } return nil } +func findAllTasks(b *bundle.Bundle) []*jobs.Task { + r := b.Config.Resources + result := make([]*jobs.Task, 0) + for k := range b.Config.Resources.Jobs { + tasks := r.Jobs[k].JobSettings.Tasks + for i := range tasks { + task := &tasks[i] + result = append(result, task) + } + } + + return result +} + +func FindAllWheelTasks(b *bundle.Bundle) []*jobs.Task { + tasks := findAllTasks(b) + wheelTasks := make([]*jobs.Task, 0) + for _, task := range tasks { + if task.PythonWheelTask != nil { + wheelTasks = append(wheelTasks, task) + } + } + + return wheelTasks +} + func isMissingRequiredLibraries(task *jobs.Task) bool { if task.Libraries != nil { return false diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 011bb4b2b..9d9b746e1 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -8,6 +8,7 @@ import ( "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/cli/bundle/python" ) // The deploy phase deploys artifacts and resources. @@ -21,6 +22,7 @@ func Deploy() bundle.Mutator { libraries.MatchWithArtifacts(), artifacts.CleanUp(), artifacts.UploadAll(), + python.TransformWheelTask(), terraform.Interpolate(), terraform.Write(), terraform.StatePull(), diff --git a/bundle/python/transform.go b/bundle/python/transform.go new file mode 100644 index 000000000..401bce7f5 --- /dev/null +++ b/bundle/python/transform.go @@ -0,0 +1,106 @@ +package python + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/artifacts" + "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +// This mutator takes the wheel task and trasnforms it into notebook +// which installs uploaded wheels using %pip and then calling corresponding +// entry point. +func TransformWheelTask() bundle.Mutator { + return &transform{} +} + +type transform struct { +} + +func (m *transform) Name() string { + return "python.TransformWheelTask" +} + +const INSTALL_WHEEL_CODE = `%%pip install --force-reinstall %s` + +const NOTEBOOK_CODE = ` +%%python +%s + +from contextlib import redirect_stdout +import io +import sys +sys.argv = [%s] + +import pkg_resources +_func = pkg_resources.load_entry_point("%s", "console_scripts", "%s") + +f = io.StringIO() +with redirect_stdout(f): + _func() +s = f.getvalue() +dbutils.notebook.exit(s) +` + +func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { + // TODO: do the transformaton only for DBR < 13.1 and (maybe?) existing clusters + wheelTasks := libraries.FindAllWheelTasks(b) + for _, wheelTask := range wheelTasks { + taskDefinition := wheelTask.PythonWheelTask + libraries := wheelTask.Libraries + + wheelTask.PythonWheelTask = nil + wheelTask.Libraries = nil + + path, err := generateNotebookWrapper(taskDefinition, libraries) + if err != nil { + return err + } + + remotePath, err := artifacts.UploadNotebook(context.Background(), path, b) + if err != nil { + return err + } + + os.Remove(path) + + wheelTask.NotebookTask = &jobs.NotebookTask{ + NotebookPath: remotePath, + } + } + return nil +} + +func generateNotebookWrapper(task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { + pipInstall := "" + for _, lib := range libraries { + pipInstall = pipInstall + "\n" + fmt.Sprintf(INSTALL_WHEEL_CODE, lib.Whl) + } + content := fmt.Sprintf(NOTEBOOK_CODE, pipInstall, generateParameters(task), task.PackageName, task.EntryPoint) + + tmpDir := os.TempDir() + filename := fmt.Sprintf("notebook_%s_%s.ipynb", task.PackageName, task.EntryPoint) + path := filepath.Join(tmpDir, filename) + + err := os.WriteFile(path, bytes.NewBufferString(content).Bytes(), 0644) + return path, err +} + +func generateParameters(task *jobs.PythonWheelTask) string { + params := append([]string{"python"}, task.Parameters...) + for k, v := range task.NamedParameters { + params = append(params, fmt.Sprintf("%s=%s", k, v)) + } + for i := range params { + params[i] = `"` + params[i] + `"` + } + return strings.Join(params, ", ") +} From 2be69af604b58ffd932421d2524231f01d9d0a0f Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Mon, 14 Aug 2023 17:36:37 +0200 Subject: [PATCH 2/6] use sync include and templates --- bundle/artifacts/artifacts.go | 33 ----------- bundle/bundle.go | 17 ++++++ bundle/config/root.go | 3 + bundle/config/sync.go | 6 ++ bundle/deploy/files/sync.go | 13 ++++- bundle/phases/deploy.go | 2 +- bundle/python/transform.go | 101 +++++++++++++++++++++------------- cmd/bundle/sync.go | 8 +++ cmd/sync/sync.go | 13 ++++- libs/fileset/glob.go | 45 +++++++++++++++ libs/sync/sync.go | 73 ++++++++++++++++++++---- 11 files changed, 226 insertions(+), 88 deletions(-) create mode 100644 bundle/config/sync.go create mode 100644 libs/fileset/glob.go diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index 73a6d4471..c54131217 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -165,36 +165,3 @@ func getUploadBasePath(b *bundle.Bundle) (string, error) { return path.Join(artifactPath, ".internal"), nil } - -func UploadNotebook(ctx context.Context, notebook string, b *bundle.Bundle) (string, error) { - raw, err := os.ReadFile(notebook) - if err != nil { - return "", fmt.Errorf("unable to read %s: %w", notebook, errors.Unwrap(err)) - } - - uploadPath, err := getUploadBasePath(b) - if err != nil { - return "", err - } - - remotePath := path.Join(uploadPath, path.Base(notebook)) - // Make sure target directory exists. - err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return "", fmt.Errorf("unable to create directory for %s: %w", remotePath, err) - } - - // Import to workspace. - err = b.WorkspaceClient().Workspace.Import(ctx, workspace.Import{ - Path: remotePath, - Overwrite: true, - Format: workspace.ImportFormatSource, - Content: base64.StdEncoding.EncodeToString(raw), - Language: workspace.LanguagePython, - }) - if err != nil { - return "", fmt.Errorf("unable to import %s: %w", remotePath, err) - } - - return remotePath, nil -} diff --git a/bundle/bundle.go b/bundle/bundle.go index 0147883ca..1281b4f47 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -22,6 +22,8 @@ import ( "github.com/hashicorp/terraform-exec/tfexec" ) +const InternalFolder = ".internal" + type Bundle struct { Config config.Root @@ -151,6 +153,21 @@ func (b *Bundle) CacheDir(paths ...string) (string, error) { return dir, nil } +func (b *Bundle) InternalDir() (string, error) { + cacheDir, err := b.CacheDir() + if err != nil { + return "", err + } + + dir := filepath.Join(cacheDir, InternalFolder) + err = os.MkdirAll(dir, 0700) + if err != nil { + return dir, err + } + + return dir, nil +} + func (b *Bundle) GitRepository() (*git.Repository, error) { rootPath, err := folders.FindDirWithLeaf(b.Config.Path, ".git") if err != nil { diff --git a/bundle/config/root.go b/bundle/config/root.go index 52f887378..7db6771da 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -74,6 +74,9 @@ type Root struct { // If not specified, the code below initializes this field with a // single default-initialized environment called "default". Environments map[string]*Environment `json:"environments,omitempty"` + + // Sync section specifies options for files syncronisation + Sync Sync `json:"sync,omitempty"` } func Load(path string) (*Root, error) { diff --git a/bundle/config/sync.go b/bundle/config/sync.go new file mode 100644 index 000000000..ce420f3be --- /dev/null +++ b/bundle/config/sync.go @@ -0,0 +1,6 @@ +package config + +type Sync struct { + Include []string `json:"include,omitempty"` + Exclude []string `json:"exclude,omitempty"` +} diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index 84d79dc81..90ea22520 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -14,12 +14,21 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } + internalDir, err := b.InternalDir() + if err != nil { + return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) + } + opts := sync.SyncOptions{ - LocalPath: b.Config.Path, - RemotePath: b.Config.Workspace.FilesPath, + LocalPath: b.Config.Path, + RemotePath: b.Config.Workspace.FilesPath, + Include: b.Config.Sync.Include, + Exclude: b.Config.Sync.Exclude, + Full: false, CurrentUser: b.Config.Workspace.CurrentUser.User, + InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 9d9b746e1..5a9a7f2fe 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -18,11 +18,11 @@ func Deploy() bundle.Mutator { bundle.Defer( bundle.Seq( mutator.ValidateGitDetails(), - files.Upload(), libraries.MatchWithArtifacts(), artifacts.CleanUp(), artifacts.UploadAll(), python.TransformWheelTask(), + files.Upload(), terraform.Interpolate(), terraform.Write(), terraform.StatePull(), diff --git a/bundle/python/transform.go b/bundle/python/transform.go index 401bce7f5..659a892e7 100644 --- a/bundle/python/transform.go +++ b/bundle/python/transform.go @@ -1,21 +1,42 @@ package python import ( - "bytes" "context" "fmt" "os" + "path" "path/filepath" "strings" + "text/template" "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" ) -// This mutator takes the wheel task and trasnforms it into notebook +const NOTEBOOK_TEMPLATE = `# Databricks notebook source +%python +{{range .Libraries}} +%pip install --force-reinstall {{.Whl}} +{{end}} + +from contextlib import redirect_stdout +import io +import sys +sys.argv = [{{.Params}}] + +import pkg_resources +_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}") + +f = io.StringIO() +with redirect_stdout(f): + _func() +s = f.getvalue() +dbutils.notebook.exit(s) +` + +// This mutator takes the wheel task and transforms it into notebook // which installs uploaded wheels using %pip and then calling corresponding // entry point. func TransformWheelTask() bundle.Mutator { @@ -29,29 +50,7 @@ func (m *transform) Name() string { return "python.TransformWheelTask" } -const INSTALL_WHEEL_CODE = `%%pip install --force-reinstall %s` - -const NOTEBOOK_CODE = ` -%%python -%s - -from contextlib import redirect_stdout -import io -import sys -sys.argv = [%s] - -import pkg_resources -_func = pkg_resources.load_entry_point("%s", "console_scripts", "%s") - -f = io.StringIO() -with redirect_stdout(f): - _func() -s = f.getvalue() -dbutils.notebook.exit(s) -` - func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { - // TODO: do the transformaton only for DBR < 13.1 and (maybe?) existing clusters wheelTasks := libraries.FindAllWheelTasks(b) for _, wheelTask := range wheelTasks { taskDefinition := wheelTask.PythonWheelTask @@ -60,38 +59,62 @@ func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { wheelTask.PythonWheelTask = nil wheelTask.Libraries = nil - path, err := generateNotebookWrapper(taskDefinition, libraries) + filename, err := generateNotebookWrapper(b, taskDefinition, libraries) if err != nil { return err } - remotePath, err := artifacts.UploadNotebook(context.Background(), path, b) + internalDir, err := b.InternalDir() if err != nil { return err } - os.Remove(path) + internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) + if err != nil { + return err + } + + parts := []string{b.Config.Workspace.FilesPath} + parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...) + parts = append(parts, filename) wheelTask.NotebookTask = &jobs.NotebookTask{ - NotebookPath: remotePath, + NotebookPath: path.Join(parts...), } } return nil } -func generateNotebookWrapper(task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { - pipInstall := "" - for _, lib := range libraries { - pipInstall = pipInstall + "\n" + fmt.Sprintf(INSTALL_WHEEL_CODE, lib.Whl) +func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { + internalDir, err := b.InternalDir() + if err != nil { + return "", err } - content := fmt.Sprintf(NOTEBOOK_CODE, pipInstall, generateParameters(task), task.PackageName, task.EntryPoint) + notebookName := fmt.Sprintf("notebook_%s_%s", task.PackageName, task.EntryPoint) + path := filepath.Join(internalDir, notebookName+".py") - tmpDir := os.TempDir() - filename := fmt.Sprintf("notebook_%s_%s.ipynb", task.PackageName, task.EntryPoint) - path := filepath.Join(tmpDir, filename) + err = os.MkdirAll(filepath.Dir(path), 0755) + if err != nil { + return "", err + } - err := os.WriteFile(path, bytes.NewBufferString(content).Bytes(), 0644) - return path, err + f, err := os.Create(path) + if err != nil { + return "", err + } + defer f.Close() + + data := map[string]any{ + "Libraries": libraries, + "Params": generateParameters(task), + "Task": task, + } + + t, err := template.New("notebook").Parse(NOTEBOOK_TEMPLATE) + if err != nil { + return "", err + } + return notebookName, t.Execute(f, data) } func generateParameters(task *jobs.PythonWheelTask) string { diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 2fff7baf5..3c503c6b4 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -23,12 +23,20 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } + internalDir, err := b.InternalDir() + if err != nil { + return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) + } + opts := sync.SyncOptions{ LocalPath: b.Config.Path, RemotePath: b.Config.Workspace.FilesPath, + Include: b.Config.Sync.Include, + Exclude: b.Config.Sync.Exclude, Full: f.full, PollInterval: f.interval, + InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index d2aad0c3f..8a08b30d5 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -35,12 +35,21 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } + internalDir, err := b.InternalDir() + if err != nil { + return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) + } + opts := sync.SyncOptions{ - LocalPath: b.Config.Path, - RemotePath: b.Config.Workspace.FilesPath, + LocalPath: b.Config.Path, + RemotePath: b.Config.Workspace.FilesPath, + Include: b.Config.Sync.Include, + Exclude: b.Config.Sync.Exclude, + Full: f.full, PollInterval: f.interval, + InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/libs/fileset/glob.go b/libs/fileset/glob.go new file mode 100644 index 000000000..8593ae86b --- /dev/null +++ b/libs/fileset/glob.go @@ -0,0 +1,45 @@ +package fileset + +import ( + "io/fs" + "os" + "path/filepath" +) + +type GlobSet struct { + root string + patterns []string +} + +func NewGlobSet(root string, includes []string) *GlobSet { + return &GlobSet{root, includes} +} + +// Return all tracked files for Repo +func (s *GlobSet) All() ([]File, error) { + files := make([]File, 0) + for _, pattern := range s.patterns { + matches, err := filepath.Glob(pattern) + if err != nil { + return files, err + } + + for _, match := range matches { + if !filepath.IsAbs(match) { + match = filepath.Join(s.root, match) + } + matchRel, err := filepath.Rel(s.root, match) + if err != nil { + return files, err + } + + stat, err := os.Stat(match) + if err != nil { + return files, err + } + files = append(files, File{fs.FileInfoToDirEntry(stat), match, matchRel}) + } + } + + return files, nil +} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index a299214d0..3a00a192d 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -3,21 +3,27 @@ package sync import ( "context" "fmt" + "path/filepath" "time" "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/fileset" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/iam" + "golang.org/x/exp/slices" ) type SyncOptions struct { LocalPath string RemotePath string + Include []string + Exclude []string Full bool + InternalDir string SnapshotBasePath string PollInterval time.Duration @@ -32,7 +38,10 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet + fileSet *git.FileSet + includeFileSet *fileset.GlobSet + excludeFileSet *fileset.GlobSet + snapshot *Snapshot filer filer.Filer @@ -52,6 +61,17 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, err } + includes := []string{} + if opts.InternalDir != "" { + includes = append(includes, filepath.Join(opts.InternalDir, "*.*")) + } + if opts.Include != nil { + includes = append(includes, opts.Include...) + } + + includeFileSet := fileset.NewGlobSet(opts.LocalPath, includes) + excludeFileSet := fileset.NewGlobSet(opts.LocalPath, opts.Exclude) + // Verify that the remote path we're about to synchronize to is valid and allowed. err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath, opts.CurrentUser) if err != nil { @@ -88,11 +108,13 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - filer: filer, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + includeFileSet: includeFileSet, + excludeFileSet: excludeFileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } @@ -132,15 +154,44 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) { } func (s *Sync) RunOnce(ctx context.Context) error { - // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement - // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 - all, err := s.fileSet.All() + all := make([]fileset.File, 0) + if s.SyncOptions.Include == nil { + // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement + // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 + gitFiles, err := s.fileSet.All() + if err != nil { + log.Errorf(ctx, "cannot list files: %s", err) + return err + } + all = append(all, gitFiles...) + } + + include, err := s.includeFileSet.All() if err != nil { - log.Errorf(ctx, "cannot list files: %s", err) + log.Errorf(ctx, "cannot list include files: %s", err) return err } - change, err := s.snapshot.diff(ctx, all) + all = append(all, include...) + + exclude, err := s.excludeFileSet.All() + if err != nil { + log.Errorf(ctx, "cannot list exclude files: %s", err) + return err + } + + files := make([]fileset.File, 0) + for _, f := range all { + if slices.ContainsFunc(exclude, func(a fileset.File) bool { + return a.Absolute == f.Absolute + }) { + continue + } + + files = append(files, f) + } + + change, err := s.snapshot.diff(ctx, files) if err != nil { return err } From 2e3c5ca9ea080fce34d05b40c9b53002daac444d Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Wed, 16 Aug 2023 15:50:00 +0200 Subject: [PATCH 3/6] removed sync include code --- bundle/bundle.go | 17 --------- bundle/config/root.go | 3 -- bundle/config/sync.go | 6 ---- bundle/deploy/files/sync.go | 8 ----- bundle/python/transform.go | 16 +++++++-- cmd/bundle/sync.go | 8 ----- cmd/sync/sync.go | 8 ----- libs/sync/sync.go | 72 ++++++------------------------------- 8 files changed, 24 insertions(+), 114 deletions(-) delete mode 100644 bundle/config/sync.go diff --git a/bundle/bundle.go b/bundle/bundle.go index 1281b4f47..0147883ca 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -22,8 +22,6 @@ import ( "github.com/hashicorp/terraform-exec/tfexec" ) -const InternalFolder = ".internal" - type Bundle struct { Config config.Root @@ -153,21 +151,6 @@ func (b *Bundle) CacheDir(paths ...string) (string, error) { return dir, nil } -func (b *Bundle) InternalDir() (string, error) { - cacheDir, err := b.CacheDir() - if err != nil { - return "", err - } - - dir := filepath.Join(cacheDir, InternalFolder) - err = os.MkdirAll(dir, 0700) - if err != nil { - return dir, err - } - - return dir, nil -} - func (b *Bundle) GitRepository() (*git.Repository, error) { rootPath, err := folders.FindDirWithLeaf(b.Config.Path, ".git") if err != nil { diff --git a/bundle/config/root.go b/bundle/config/root.go index 7db6771da..52f887378 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -74,9 +74,6 @@ type Root struct { // If not specified, the code below initializes this field with a // single default-initialized environment called "default". Environments map[string]*Environment `json:"environments,omitempty"` - - // Sync section specifies options for files syncronisation - Sync Sync `json:"sync,omitempty"` } func Load(path string) (*Root, error) { diff --git a/bundle/config/sync.go b/bundle/config/sync.go deleted file mode 100644 index ce420f3be..000000000 --- a/bundle/config/sync.go +++ /dev/null @@ -1,6 +0,0 @@ -package config - -type Sync struct { - Include []string `json:"include,omitempty"` - Exclude []string `json:"exclude,omitempty"` -} diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index 90ea22520..d0987d632 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -14,21 +14,13 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } - internalDir, err := b.InternalDir() - if err != nil { - return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) - } - opts := sync.SyncOptions{ LocalPath: b.Config.Path, RemotePath: b.Config.Workspace.FilesPath, - Include: b.Config.Sync.Include, - Exclude: b.Config.Sync.Exclude, Full: false, CurrentUser: b.Config.Workspace.CurrentUser.User, - InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/bundle/python/transform.go b/bundle/python/transform.go index 659a892e7..42a5cf6e5 100644 --- a/bundle/python/transform.go +++ b/bundle/python/transform.go @@ -64,7 +64,7 @@ func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { return err } - internalDir, err := b.InternalDir() + internalDir, err := getInternalDir(b) if err != nil { return err } @@ -85,11 +85,21 @@ func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { return nil } -func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { - internalDir, err := b.InternalDir() +func getInternalDir(b *bundle.Bundle) (string, error) { + cacheDir, err := b.CacheDir() if err != nil { return "", err } + internalDir := filepath.Join(cacheDir, ".internal") + return internalDir, nil +} + +func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { + internalDir, err := getInternalDir(b) + if err != nil { + return "", err + } + notebookName := fmt.Sprintf("notebook_%s_%s", task.PackageName, task.EntryPoint) path := filepath.Join(internalDir, notebookName+".py") diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 3c503c6b4..2fff7baf5 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -23,20 +23,12 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } - internalDir, err := b.InternalDir() - if err != nil { - return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) - } - opts := sync.SyncOptions{ LocalPath: b.Config.Path, RemotePath: b.Config.Workspace.FilesPath, - Include: b.Config.Sync.Include, - Exclude: b.Config.Sync.Exclude, Full: f.full, PollInterval: f.interval, - InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 8a08b30d5..a9380b756 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -35,21 +35,13 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } - internalDir, err := b.InternalDir() - if err != nil { - return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) - } - opts := sync.SyncOptions{ LocalPath: b.Config.Path, RemotePath: b.Config.Workspace.FilesPath, - Include: b.Config.Sync.Include, - Exclude: b.Config.Sync.Exclude, Full: f.full, PollInterval: f.interval, - InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 3a00a192d..4dc03c57e 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -3,27 +3,21 @@ package sync import ( "context" "fmt" - "path/filepath" "time" "github.com/databricks/cli/libs/filer" - "github.com/databricks/cli/libs/fileset" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/iam" - "golang.org/x/exp/slices" ) type SyncOptions struct { LocalPath string RemotePath string - Include []string - Exclude []string Full bool - InternalDir string SnapshotBasePath string PollInterval time.Duration @@ -38,9 +32,7 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - includeFileSet *fileset.GlobSet - excludeFileSet *fileset.GlobSet + fileSet *git.FileSet snapshot *Snapshot filer filer.Filer @@ -61,17 +53,6 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, err } - includes := []string{} - if opts.InternalDir != "" { - includes = append(includes, filepath.Join(opts.InternalDir, "*.*")) - } - if opts.Include != nil { - includes = append(includes, opts.Include...) - } - - includeFileSet := fileset.NewGlobSet(opts.LocalPath, includes) - excludeFileSet := fileset.NewGlobSet(opts.LocalPath, opts.Exclude) - // Verify that the remote path we're about to synchronize to is valid and allowed. err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath, opts.CurrentUser) if err != nil { @@ -108,13 +89,11 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - includeFileSet: includeFileSet, - excludeFileSet: excludeFileSet, - snapshot: snapshot, - filer: filer, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } @@ -154,44 +133,15 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) { } func (s *Sync) RunOnce(ctx context.Context) error { - all := make([]fileset.File, 0) - if s.SyncOptions.Include == nil { - // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement - // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 - gitFiles, err := s.fileSet.All() - if err != nil { - log.Errorf(ctx, "cannot list files: %s", err) - return err - } - all = append(all, gitFiles...) - } - - include, err := s.includeFileSet.All() + // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement + // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 + all, err := s.fileSet.All() if err != nil { - log.Errorf(ctx, "cannot list include files: %s", err) + log.Errorf(ctx, "cannot list files: %s", err) return err } - all = append(all, include...) - - exclude, err := s.excludeFileSet.All() - if err != nil { - log.Errorf(ctx, "cannot list exclude files: %s", err) - return err - } - - files := make([]fileset.File, 0) - for _, f := range all { - if slices.ContainsFunc(exclude, func(a fileset.File) bool { - return a.Absolute == f.Absolute - }) { - continue - } - - files = append(files, f) - } - - change, err := s.snapshot.diff(ctx, files) + change, err := s.snapshot.diff(ctx, all) if err != nil { return err } From a9ed5df9fed671276b776e18f622b8d5716d997d Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Wed, 16 Aug 2023 16:34:46 +0200 Subject: [PATCH 4/6] add tests --- bundle/deploy/files/sync.go | 5 ++- bundle/python/transform.go | 14 +++++++-- bundle/python/transform_test.go | 55 +++++++++++++++++++++++++++++++++ cmd/sync/sync.go | 5 ++- libs/fileset/glob.go | 45 --------------------------- libs/sync/sync.go | 3 +- 6 files changed, 71 insertions(+), 56 deletions(-) create mode 100644 bundle/python/transform_test.go delete mode 100644 libs/fileset/glob.go diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index d0987d632..84d79dc81 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -15,9 +15,8 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { } opts := sync.SyncOptions{ - LocalPath: b.Config.Path, - RemotePath: b.Config.Workspace.FilesPath, - + LocalPath: b.Config.Path, + RemotePath: b.Config.Workspace.FilesPath, Full: false, CurrentUser: b.Config.Workspace.CurrentUser.User, diff --git a/bundle/python/transform.go b/bundle/python/transform.go index 42a5cf6e5..aa1a3db52 100644 --- a/bundle/python/transform.go +++ b/bundle/python/transform.go @@ -114,9 +114,14 @@ func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libra } defer f.Close() + params, err := generateParameters(task) + if err != nil { + return "", err + } + data := map[string]any{ "Libraries": libraries, - "Params": generateParameters(task), + "Params": params, "Task": task, } @@ -127,7 +132,10 @@ func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libra return notebookName, t.Execute(f, data) } -func generateParameters(task *jobs.PythonWheelTask) string { +func generateParameters(task *jobs.PythonWheelTask) (string, error) { + if task.Parameters != nil && task.NamedParameters != nil { + return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters") + } params := append([]string{"python"}, task.Parameters...) for k, v := range task.NamedParameters { params = append(params, fmt.Sprintf("%s=%s", k, v)) @@ -135,5 +143,5 @@ func generateParameters(task *jobs.PythonWheelTask) string { for i := range params { params[i] = `"` + params[i] + `"` } - return strings.Join(params, ", ") + return strings.Join(params, ", "), nil } diff --git a/bundle/python/transform_test.go b/bundle/python/transform_test.go new file mode 100644 index 000000000..6b3c5f551 --- /dev/null +++ b/bundle/python/transform_test.go @@ -0,0 +1,55 @@ +package python + +import ( + "testing" + + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +type testCase struct { + Actual []string + Expected string +} +type NamedParams map[string]string +type testCaseNamed struct { + Actual NamedParams + Expected string +} + +var paramsTestCases []testCase = []testCase{ + {[]string{}, `"python"`}, + {[]string{"a"}, `"python", "a"`}, + {[]string{"a", "b"}, `"python", "a", "b"`}, + {[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`}, +} + +var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{ + {NamedParams{}, `"python"`}, + {NamedParams{"a": "1"}, `"python", "a=1"`}, + {NamedParams{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`}, +} + +func TestGenerateParameters(t *testing.T) { + for _, c := range paramsTestCases { + task := &jobs.PythonWheelTask{Parameters: c.Actual} + result, err := generateParameters(task) + require.NoError(t, err) + require.Equal(t, c.Expected, result) + } +} + +func TestGenerateNamedParameters(t *testing.T) { + for _, c := range paramsTestCasesNamed { + task := &jobs.PythonWheelTask{NamedParameters: c.Actual} + result, err := generateParameters(task) + require.NoError(t, err) + require.Equal(t, c.Expected, result) + } +} + +func TestGenerateBoth(t *testing.T) { + task := &jobs.PythonWheelTask{NamedParameters: map[string]string{"a": "1"}, Parameters: []string{"b"}} + _, err := generateParameters(task) + require.Error(t, err) +} diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index a9380b756..d2aad0c3f 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -36,9 +36,8 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * } opts := sync.SyncOptions{ - LocalPath: b.Config.Path, - RemotePath: b.Config.Workspace.FilesPath, - + LocalPath: b.Config.Path, + RemotePath: b.Config.Workspace.FilesPath, Full: f.full, PollInterval: f.interval, diff --git a/libs/fileset/glob.go b/libs/fileset/glob.go deleted file mode 100644 index 8593ae86b..000000000 --- a/libs/fileset/glob.go +++ /dev/null @@ -1,45 +0,0 @@ -package fileset - -import ( - "io/fs" - "os" - "path/filepath" -) - -type GlobSet struct { - root string - patterns []string -} - -func NewGlobSet(root string, includes []string) *GlobSet { - return &GlobSet{root, includes} -} - -// Return all tracked files for Repo -func (s *GlobSet) All() ([]File, error) { - files := make([]File, 0) - for _, pattern := range s.patterns { - matches, err := filepath.Glob(pattern) - if err != nil { - return files, err - } - - for _, match := range matches { - if !filepath.IsAbs(match) { - match = filepath.Join(s.root, match) - } - matchRel, err := filepath.Rel(s.root, match) - if err != nil { - return files, err - } - - stat, err := os.Stat(match) - if err != nil { - return files, err - } - files = append(files, File{fs.FileInfoToDirEntry(stat), match, matchRel}) - } - } - - return files, nil -} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 4dc03c57e..a299214d0 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -32,8 +32,7 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - + fileSet *git.FileSet snapshot *Snapshot filer filer.Filer From 476439c48ef14ceb433d399687080582b022775c Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 17 Aug 2023 17:50:00 +0200 Subject: [PATCH 5/6] moved to separate function --- bundle/python/transform.go | 59 ++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/bundle/python/transform.go b/bundle/python/transform.go index aa1a3db52..cd7c826f9 100644 --- a/bundle/python/transform.go +++ b/bundle/python/transform.go @@ -53,38 +53,47 @@ func (m *transform) Name() string { func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { wheelTasks := libraries.FindAllWheelTasks(b) for _, wheelTask := range wheelTasks { - taskDefinition := wheelTask.PythonWheelTask - libraries := wheelTask.Libraries - - wheelTask.PythonWheelTask = nil - wheelTask.Libraries = nil - - filename, err := generateNotebookWrapper(b, taskDefinition, libraries) + err := generateNotebookTrampoline(b, wheelTask) if err != nil { return err } - - internalDir, err := getInternalDir(b) - if err != nil { - return err - } - - internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) - if err != nil { - return err - } - - parts := []string{b.Config.Workspace.FilesPath} - parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...) - parts = append(parts, filename) - - wheelTask.NotebookTask = &jobs.NotebookTask{ - NotebookPath: path.Join(parts...), - } } return nil } +func generateNotebookTrampoline(b *bundle.Bundle, wheelTask *jobs.Task) error { + taskDefinition := wheelTask.PythonWheelTask + libraries := wheelTask.Libraries + + wheelTask.PythonWheelTask = nil + wheelTask.Libraries = nil + + filename, err := generateNotebookWrapper(b, taskDefinition, libraries) + if err != nil { + return err + } + + internalDir, err := getInternalDir(b) + if err != nil { + return err + } + + internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) + if err != nil { + return err + } + + parts := []string{b.Config.Workspace.FilesPath} + parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...) + parts = append(parts, filename) + + wheelTask.NotebookTask = &jobs.NotebookTask{ + NotebookPath: path.Join(parts...), + } + + return nil +} + func getInternalDir(b *bundle.Bundle) (string, error) { cacheDir, err := b.CacheDir() if err != nil { From aeb77d94c3d35eb8ccf1b50bb62cce091f687032 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Fri, 18 Aug 2023 11:23:45 +0200 Subject: [PATCH 6/6] more reusable code --- bundle/config/mutator/trampoline.go | 95 ++++++++++++++++++ bundle/config/mutator/trampoline_test.go | 90 +++++++++++++++++ bundle/python/transform.go | 119 +++++------------------ bundle/python/transform_test.go | 2 + 4 files changed, 211 insertions(+), 95 deletions(-) create mode 100644 bundle/config/mutator/trampoline.go create mode 100644 bundle/config/mutator/trampoline_test.go diff --git a/bundle/config/mutator/trampoline.go b/bundle/config/mutator/trampoline.go new file mode 100644 index 000000000..6265c28d5 --- /dev/null +++ b/bundle/config/mutator/trampoline.go @@ -0,0 +1,95 @@ +package mutator + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "text/template" + + "github.com/databricks/cli/bundle" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type fnTemplateData func(task *jobs.Task) (map[string]any, error) +type fnCleanUp func(task *jobs.Task) +type fnTasks func(b *bundle.Bundle) []*jobs.Task + +type trampoline struct { + name string + getTasks fnTasks + templateData fnTemplateData + cleanUp fnCleanUp + template string +} + +func NewTrampoline( + name string, + tasks fnTasks, + templateData fnTemplateData, + cleanUp fnCleanUp, + template string, +) *trampoline { + return &trampoline{name, tasks, templateData, cleanUp, template} +} + +func (m *trampoline) Name() string { + return fmt.Sprintf("trampoline(%s)", m.name) +} + +func (m *trampoline) Apply(ctx context.Context, b *bundle.Bundle) error { + tasks := m.getTasks(b) + for _, task := range tasks { + err := m.generateNotebookWrapper(b, task) + if err != nil { + return err + } + } + return nil +} + +func (m *trampoline) generateNotebookWrapper(b *bundle.Bundle, task *jobs.Task) error { + internalDir, err := b.InternalDir() + if err != nil { + return err + } + + notebookName := fmt.Sprintf("notebook_%s", task.TaskKey) + localNotebookPath := filepath.Join(internalDir, notebookName+".py") + + err = os.MkdirAll(filepath.Dir(localNotebookPath), 0755) + if err != nil { + return err + } + + f, err := os.Create(localNotebookPath) + if err != nil { + return err + } + defer f.Close() + + data, err := m.templateData(task) + if err != nil { + return err + } + + t, err := template.New(notebookName).Parse(m.template) + if err != nil { + return err + } + + internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) + if err != nil { + return err + } + + m.cleanUp(task) + remotePath := path.Join(b.Config.Workspace.FilesPath, filepath.ToSlash(internalDirRel), notebookName) + + task.NotebookTask = &jobs.NotebookTask{ + NotebookPath: remotePath, + } + + return t.Execute(f, data) +} diff --git a/bundle/config/mutator/trampoline_test.go b/bundle/config/mutator/trampoline_test.go new file mode 100644 index 000000000..9c176ff06 --- /dev/null +++ b/bundle/config/mutator/trampoline_test.go @@ -0,0 +1,90 @@ +package mutator + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +func getTasks(b *bundle.Bundle) []*jobs.Task { + tasks := make([]*jobs.Task, 0) + for k := range b.Config.Resources.Jobs["test"].Tasks { + tasks = append(tasks, &b.Config.Resources.Jobs["test"].Tasks[k]) + } + + return tasks +} + +func templateData(task *jobs.Task) (map[string]any, error) { + if task.PythonWheelTask == nil { + return nil, fmt.Errorf("PythonWheelTask cannot be nil") + } + + data := make(map[string]any) + data["MyName"] = "Trampoline" + return data, nil +} + +func cleanUp(task *jobs.Task) { + task.PythonWheelTask = nil +} + +func TestGenerateTrampoline(t *testing.T) { + tmpDir := t.TempDir() + + tasks := []jobs.Task{ + { + TaskKey: "to_trampoline", + PythonWheelTask: &jobs.PythonWheelTask{ + PackageName: "test", + EntryPoint: "run", + }}, + } + + b := &bundle.Bundle{ + Config: config.Root{ + Path: tmpDir, + Bundle: config.Bundle{ + Target: "development", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "test": { + Paths: resources.Paths{ + ConfigFilePath: tmpDir, + }, + JobSettings: &jobs.JobSettings{ + Tasks: tasks, + }, + }, + }, + }, + }, + } + ctx := context.Background() + + trampoline := NewTrampoline("test_trampoline", getTasks, templateData, cleanUp, "Hello from {{.MyName}}") + err := bundle.Apply(ctx, b, trampoline) + require.NoError(t, err) + + dir, err := b.InternalDir() + require.NoError(t, err) + filename := filepath.Join(dir, "notebook_to_trampoline.py") + + bytes, err := os.ReadFile(filename) + require.NoError(t, err) + + require.Equal(t, "Hello from Trampoline", string(bytes)) + + task := b.Config.Resources.Jobs["test"].Tasks[0] + require.Equal(t, task.NotebookTask.NotebookPath, ".databricks/bundle/development/.internal/notebook_to_trampoline") + require.Nil(t, task.PythonWheelTask) +} diff --git a/bundle/python/transform.go b/bundle/python/transform.go index cd7c826f9..c7938bab6 100644 --- a/bundle/python/transform.go +++ b/bundle/python/transform.go @@ -1,17 +1,13 @@ package python import ( - "context" "fmt" - "os" - "path" - "path/filepath" + "strconv" "strings" - "text/template" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/libraries" - "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" ) @@ -40,105 +36,32 @@ dbutils.notebook.exit(s) // which installs uploaded wheels using %pip and then calling corresponding // entry point. func TransformWheelTask() bundle.Mutator { - return &transform{} + return mutator.NewTrampoline( + "python_wheel", + getTasks, + generateTemplateData, + cleanUpTask, + NOTEBOOK_TEMPLATE, + ) } -type transform struct { +func getTasks(b *bundle.Bundle) []*jobs.Task { + return libraries.FindAllWheelTasks(b) } -func (m *transform) Name() string { - return "python.TransformWheelTask" -} - -func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { - wheelTasks := libraries.FindAllWheelTasks(b) - for _, wheelTask := range wheelTasks { - err := generateNotebookTrampoline(b, wheelTask) - if err != nil { - return err - } - } - return nil -} - -func generateNotebookTrampoline(b *bundle.Bundle, wheelTask *jobs.Task) error { - taskDefinition := wheelTask.PythonWheelTask - libraries := wheelTask.Libraries - - wheelTask.PythonWheelTask = nil - wheelTask.Libraries = nil - - filename, err := generateNotebookWrapper(b, taskDefinition, libraries) +func generateTemplateData(task *jobs.Task) (map[string]any, error) { + params, err := generateParameters(task.PythonWheelTask) if err != nil { - return err - } - - internalDir, err := getInternalDir(b) - if err != nil { - return err - } - - internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) - if err != nil { - return err - } - - parts := []string{b.Config.Workspace.FilesPath} - parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...) - parts = append(parts, filename) - - wheelTask.NotebookTask = &jobs.NotebookTask{ - NotebookPath: path.Join(parts...), - } - - return nil -} - -func getInternalDir(b *bundle.Bundle) (string, error) { - cacheDir, err := b.CacheDir() - if err != nil { - return "", err - } - internalDir := filepath.Join(cacheDir, ".internal") - return internalDir, nil -} - -func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { - internalDir, err := getInternalDir(b) - if err != nil { - return "", err - } - - notebookName := fmt.Sprintf("notebook_%s_%s", task.PackageName, task.EntryPoint) - path := filepath.Join(internalDir, notebookName+".py") - - err = os.MkdirAll(filepath.Dir(path), 0755) - if err != nil { - return "", err - } - - f, err := os.Create(path) - if err != nil { - return "", err - } - defer f.Close() - - params, err := generateParameters(task) - if err != nil { - return "", err + return nil, err } data := map[string]any{ - "Libraries": libraries, + "Libraries": task.Libraries, "Params": params, - "Task": task, + "Task": task.PythonWheelTask, } - t, err := template.New("notebook").Parse(NOTEBOOK_TEMPLATE) - if err != nil { - return "", err - } - return notebookName, t.Execute(f, data) + return data, nil } func generateParameters(task *jobs.PythonWheelTask) (string, error) { @@ -149,8 +72,14 @@ func generateParameters(task *jobs.PythonWheelTask) (string, error) { for k, v := range task.NamedParameters { params = append(params, fmt.Sprintf("%s=%s", k, v)) } + for i := range params { - params[i] = `"` + params[i] + `"` + params[i] = strconv.Quote(params[i]) } return strings.Join(params, ", "), nil } + +func cleanUpTask(task *jobs.Task) { + task.PythonWheelTask = nil + task.Libraries = nil +} diff --git a/bundle/python/transform_test.go b/bundle/python/transform_test.go index 6b3c5f551..ada64e7a9 100644 --- a/bundle/python/transform_test.go +++ b/bundle/python/transform_test.go @@ -22,12 +22,14 @@ var paramsTestCases []testCase = []testCase{ {[]string{"a"}, `"python", "a"`}, {[]string{"a", "b"}, `"python", "a", "b"`}, {[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`}, + {[]string{`{"a": 1}`}, `"python", "{\"a\": 1}"`}, } var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{ {NamedParams{}, `"python"`}, {NamedParams{"a": "1"}, `"python", "a=1"`}, {NamedParams{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`}, + {NamedParams{"data": `{"a": 1}`}, `"python", "data={\"a\": 1}"`}, } func TestGenerateParameters(t *testing.T) {