From 2be69af604b58ffd932421d2524231f01d9d0a0f Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Mon, 14 Aug 2023 17:36:37 +0200 Subject: [PATCH] use sync include and templates --- bundle/artifacts/artifacts.go | 33 ----------- bundle/bundle.go | 17 ++++++ bundle/config/root.go | 3 + bundle/config/sync.go | 6 ++ bundle/deploy/files/sync.go | 13 ++++- bundle/phases/deploy.go | 2 +- bundle/python/transform.go | 101 +++++++++++++++++++++------------- cmd/bundle/sync.go | 8 +++ cmd/sync/sync.go | 13 ++++- libs/fileset/glob.go | 45 +++++++++++++++ libs/sync/sync.go | 73 ++++++++++++++++++++---- 11 files changed, 226 insertions(+), 88 deletions(-) create mode 100644 bundle/config/sync.go create mode 100644 libs/fileset/glob.go diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index 73a6d447..c5413121 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -165,36 +165,3 @@ func getUploadBasePath(b *bundle.Bundle) (string, error) { return path.Join(artifactPath, ".internal"), nil } - -func UploadNotebook(ctx context.Context, notebook string, b *bundle.Bundle) (string, error) { - raw, err := os.ReadFile(notebook) - if err != nil { - return "", fmt.Errorf("unable to read %s: %w", notebook, errors.Unwrap(err)) - } - - uploadPath, err := getUploadBasePath(b) - if err != nil { - return "", err - } - - remotePath := path.Join(uploadPath, path.Base(notebook)) - // Make sure target directory exists. - err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return "", fmt.Errorf("unable to create directory for %s: %w", remotePath, err) - } - - // Import to workspace. 
- err = b.WorkspaceClient().Workspace.Import(ctx, workspace.Import{ - Path: remotePath, - Overwrite: true, - Format: workspace.ImportFormatSource, - Content: base64.StdEncoding.EncodeToString(raw), - Language: workspace.LanguagePython, - }) - if err != nil { - return "", fmt.Errorf("unable to import %s: %w", remotePath, err) - } - - return remotePath, nil -} diff --git a/bundle/bundle.go b/bundle/bundle.go index 0147883c..1281b4f4 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -22,6 +22,8 @@ import ( "github.com/hashicorp/terraform-exec/tfexec" ) +const InternalFolder = ".internal" + type Bundle struct { Config config.Root @@ -151,6 +153,21 @@ func (b *Bundle) CacheDir(paths ...string) (string, error) { return dir, nil } +func (b *Bundle) InternalDir() (string, error) { + cacheDir, err := b.CacheDir() + if err != nil { + return "", err + } + + dir := filepath.Join(cacheDir, InternalFolder) + err = os.MkdirAll(dir, 0700) + if err != nil { + return dir, err + } + + return dir, nil +} + func (b *Bundle) GitRepository() (*git.Repository, error) { rootPath, err := folders.FindDirWithLeaf(b.Config.Path, ".git") if err != nil { diff --git a/bundle/config/root.go b/bundle/config/root.go index 52f88737..7db6771d 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -74,6 +74,9 @@ type Root struct { // If not specified, the code below initializes this field with a // single default-initialized environment called "default". 
Environments map[string]*Environment `json:"environments,omitempty"` + + // Sync section specifies options for files synchronisation + Sync Sync `json:"sync,omitempty"` } func Load(path string) (*Root, error) { diff --git a/bundle/config/sync.go b/bundle/config/sync.go new file mode 100644 index 00000000..ce420f3b --- /dev/null +++ b/bundle/config/sync.go @@ -0,0 +1,6 @@ +package config + +type Sync struct { + Include []string `json:"include,omitempty"` + Exclude []string `json:"exclude,omitempty"` +} diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index 84d79dc8..90ea2252 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -14,12 +14,21 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } + internalDir, err := b.InternalDir() + if err != nil { + return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) + } + opts := sync.SyncOptions{ - LocalPath: b.Config.Path, - RemotePath: b.Config.Workspace.FilesPath, + LocalPath: b.Config.Path, + RemotePath: b.Config.Workspace.FilesPath, + Include: b.Config.Sync.Include, + Exclude: b.Config.Sync.Exclude, + Full: false, CurrentUser: b.Config.Workspace.CurrentUser.User, + InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 9d9b746e..5a9a7f2f 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -18,11 +18,11 @@ func Deploy() bundle.Mutator { bundle.Defer( bundle.Seq( mutator.ValidateGitDetails(), - files.Upload(), libraries.MatchWithArtifacts(), artifacts.CleanUp(), artifacts.UploadAll(), python.TransformWheelTask(), + files.Upload(), terraform.Interpolate(), terraform.Write(), terraform.StatePull(), diff --git a/bundle/python/transform.go b/bundle/python/transform.go index 401bce7f..659a892e 100644 --- a/bundle/python/transform.go +++
b/bundle/python/transform.go @@ -1,21 +1,42 @@ package python import ( - "bytes" "context" "fmt" "os" + "path" "path/filepath" "strings" + "text/template" "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" ) -// This mutator takes the wheel task and trasnforms it into notebook +const NOTEBOOK_TEMPLATE = `# Databricks notebook source +%python +{{range .Libraries}} +%pip install --force-reinstall {{.Whl}} +{{end}} + +from contextlib import redirect_stdout +import io +import sys +sys.argv = [{{.Params}}] + +import pkg_resources +_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}") + +f = io.StringIO() +with redirect_stdout(f): + _func() +s = f.getvalue() +dbutils.notebook.exit(s) +` + +// This mutator takes the wheel task and transforms it into notebook // which installs uploaded wheels using %pip and then calling corresponding // entry point. func TransformWheelTask() bundle.Mutator { @@ -29,29 +50,7 @@ func (m *transform) Name() string { return "python.TransformWheelTask" } -const INSTALL_WHEEL_CODE = `%%pip install --force-reinstall %s` - -const NOTEBOOK_CODE = ` -%%python -%s - -from contextlib import redirect_stdout -import io -import sys -sys.argv = [%s] - -import pkg_resources -_func = pkg_resources.load_entry_point("%s", "console_scripts", "%s") - -f = io.StringIO() -with redirect_stdout(f): - _func() -s = f.getvalue() -dbutils.notebook.exit(s) -` - func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { - // TODO: do the transformaton only for DBR < 13.1 and (maybe?) 
existing clusters wheelTasks := libraries.FindAllWheelTasks(b) for _, wheelTask := range wheelTasks { taskDefinition := wheelTask.PythonWheelTask @@ -60,38 +59,62 @@ func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error { wheelTask.PythonWheelTask = nil wheelTask.Libraries = nil - path, err := generateNotebookWrapper(taskDefinition, libraries) + filename, err := generateNotebookWrapper(b, taskDefinition, libraries) if err != nil { return err } - remotePath, err := artifacts.UploadNotebook(context.Background(), path, b) + internalDir, err := b.InternalDir() if err != nil { return err } - os.Remove(path) + internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) + if err != nil { + return err + } + + parts := []string{b.Config.Workspace.FilesPath} + parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...) + parts = append(parts, filename) wheelTask.NotebookTask = &jobs.NotebookTask{ - NotebookPath: remotePath, + NotebookPath: path.Join(parts...), } } return nil } -func generateNotebookWrapper(task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { - pipInstall := "" - for _, lib := range libraries { - pipInstall = pipInstall + "\n" + fmt.Sprintf(INSTALL_WHEEL_CODE, lib.Whl) +func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) { + internalDir, err := b.InternalDir() + if err != nil { + return "", err } - content := fmt.Sprintf(NOTEBOOK_CODE, pipInstall, generateParameters(task), task.PackageName, task.EntryPoint) + notebookName := fmt.Sprintf("notebook_%s_%s", task.PackageName, task.EntryPoint) + path := filepath.Join(internalDir, notebookName+".py") - tmpDir := os.TempDir() - filename := fmt.Sprintf("notebook_%s_%s.ipynb", task.PackageName, task.EntryPoint) - path := filepath.Join(tmpDir, filename) + err = os.MkdirAll(filepath.Dir(path), 0755) + if err != nil { + return "", err + } - err := os.WriteFile(path, 
bytes.NewBufferString(content).Bytes(), 0644) - return path, err + f, err := os.Create(path) + if err != nil { + return "", err + } + defer f.Close() + + data := map[string]any{ + "Libraries": libraries, + "Params": generateParameters(task), + "Task": task, + } + + t, err := template.New("notebook").Parse(NOTEBOOK_TEMPLATE) + if err != nil { + return "", err + } + return notebookName, t.Execute(f, data) } func generateParameters(task *jobs.PythonWheelTask) string { diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 2fff7baf..3c503c6b 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -23,12 +23,20 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } + internalDir, err := b.InternalDir() + if err != nil { + return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) + } + opts := sync.SyncOptions{ LocalPath: b.Config.Path, RemotePath: b.Config.Workspace.FilesPath, + Include: b.Config.Sync.Include, + Exclude: b.Config.Sync.Exclude, Full: f.full, PollInterval: f.interval, + InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), } diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index d2aad0c3..8a08b30d 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -35,12 +35,21 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } + internalDir, err := b.InternalDir() + if err != nil { + return nil, fmt.Errorf("cannot get bundle internal directory: %w", err) + } + opts := sync.SyncOptions{ - LocalPath: b.Config.Path, - RemotePath: b.Config.Workspace.FilesPath, + LocalPath: b.Config.Path, + RemotePath: b.Config.Workspace.FilesPath, + Include: b.Config.Sync.Include, + Exclude: b.Config.Sync.Exclude, + Full: f.full, PollInterval: f.interval, + InternalDir: internalDir, SnapshotBasePath: cacheDir, WorkspaceClient: 
b.WorkspaceClient(), } diff --git a/libs/fileset/glob.go b/libs/fileset/glob.go new file mode 100644 index 00000000..8593ae86 --- /dev/null +++ b/libs/fileset/glob.go @@ -0,0 +1,45 @@ +package fileset + +import ( + "io/fs" + "os" + "path/filepath" +) + +type GlobSet struct { + root string + patterns []string +} + +func NewGlobSet(root string, includes []string) *GlobSet { + return &GlobSet{root, includes} +} + +// Return all tracked files for Repo +func (s *GlobSet) All() ([]File, error) { + files := make([]File, 0) + for _, pattern := range s.patterns { + matches, err := filepath.Glob(pattern) + if err != nil { + return files, err + } + + for _, match := range matches { + if !filepath.IsAbs(match) { + match = filepath.Join(s.root, match) + } + matchRel, err := filepath.Rel(s.root, match) + if err != nil { + return files, err + } + + stat, err := os.Stat(match) + if err != nil { + return files, err + } + files = append(files, File{fs.FileInfoToDirEntry(stat), match, matchRel}) + } + } + + return files, nil +} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index a299214d..3a00a192 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -3,21 +3,27 @@ package sync import ( "context" "fmt" + "path/filepath" "time" "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/fileset" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/iam" + "golang.org/x/exp/slices" ) type SyncOptions struct { LocalPath string RemotePath string + Include []string + Exclude []string Full bool + InternalDir string SnapshotBasePath string PollInterval time.Duration @@ -32,7 +38,10 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet + fileSet *git.FileSet + includeFileSet *fileset.GlobSet + excludeFileSet *fileset.GlobSet + snapshot *Snapshot filer filer.Filer @@ -52,6 +61,17 @@ func New(ctx context.Context, opts SyncOptions) 
(*Sync, error) { return nil, err } + includes := []string{} + if opts.InternalDir != "" { + includes = append(includes, filepath.Join(opts.InternalDir, "*.*")) + } + if opts.Include != nil { + includes = append(includes, opts.Include...) + } + + includeFileSet := fileset.NewGlobSet(opts.LocalPath, includes) + excludeFileSet := fileset.NewGlobSet(opts.LocalPath, opts.Exclude) + // Verify that the remote path we're about to synchronize to is valid and allowed. err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath, opts.CurrentUser) if err != nil { @@ -88,11 +108,13 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - filer: filer, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + includeFileSet: includeFileSet, + excludeFileSet: excludeFileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } @@ -132,15 +154,44 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) { } func (s *Sync) RunOnce(ctx context.Context) error { - // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement - // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 - all, err := s.fileSet.All() + all := make([]fileset.File, 0) + if s.SyncOptions.Include == nil { + // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement + // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 + gitFiles, err := s.fileSet.All() + if err != nil { + log.Errorf(ctx, "cannot list files: %s", err) + return err + } + all = append(all, gitFiles...) 
+ } + + include, err := s.includeFileSet.All() if err != nil { - log.Errorf(ctx, "cannot list files: %s", err) + log.Errorf(ctx, "cannot list include files: %s", err) return err } - change, err := s.snapshot.diff(ctx, all) + all = append(all, include...) + + exclude, err := s.excludeFileSet.All() + if err != nil { + log.Errorf(ctx, "cannot list exclude files: %s", err) + return err + } + + files := make([]fileset.File, 0) + for _, f := range all { + if slices.ContainsFunc(exclude, func(a fileset.File) bool { + return a.Absolute == f.Absolute + }) { + continue + } + + files = append(files, f) + } + + change, err := s.snapshot.diff(ctx, files) if err != nil { return err }