Execute file synchronization on deploy (#211)

1. Perform file synchronization on deploy
2. Update the notebook file path translation logic to point to the
synchronization target rather than treating the notebook as an artifact
and uploading it separately (a minimal sketch of the new translation follows below).
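For illustration only (not part of this commit): a minimal Go sketch of the new translation behavior described in point 2, mirroring the rewrite logic in the mutator diff below. The helper name rewriteExample and the sample path are assumptions made up for this example.

// Minimal sketch, not part of the commit: mirrors the new rewritePath logic.
package main

import (
	"fmt"
	"path"
	"path/filepath"
	"strings"
)

// rewriteExample (hypothetical helper) turns a local notebook path into the
// interpolated workspace file path that deploy-time synchronization provides.
func rewriteExample(p string) string {
	relPath := path.Clean(p)
	// Notebooks are stripped of their extension on import, so drop it here too.
	withoutExt := strings.TrimSuffix(relPath, filepath.Ext(relPath))
	return fmt.Sprintf("${workspace.file_path.workspace}/%s", withoutExt)
}

func main() {
	// Prints: ${workspace.file_path.workspace}/my_job_notebook
	fmt.Println(rewriteExample("./my_job_notebook.py"))
}

Before this commit, the same path would have been rewritten to an artifact reference such as ${artifacts.my_job_notebook_py.notebook.remote_path}, as the test changes below show.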
Pieter Noordhuis 2023-02-20 19:42:55 +01:00 committed by GitHub
parent 414ea4f891
commit a0ed02281d
4 changed files with 85 additions and 38 deletions

View File

@@ -6,17 +6,17 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"regexp"
+	"strings"
 
 	"github.com/databricks/bricks/bundle"
-	"github.com/databricks/bricks/bundle/config"
+	"github.com/databricks/bricks/libs/notebook"
 )
 
 type translateNotebookPaths struct {
 	seen map[string]string
 }
 
-// TranslateNotebookPaths converts paths to local notebook files into references to artifacts.
+// TranslateNotebookPaths converts paths to local notebook files into paths in the workspace file system.
 func TranslateNotebookPaths() bundle.Mutator {
 	return &translateNotebookPaths{}
 }
@@ -25,43 +25,40 @@ func (m *translateNotebookPaths) Name() string {
 	return "TranslateNotebookPaths"
 }
 
-var nonWord = regexp.MustCompile(`[^\w]`)
-
-func (m *translateNotebookPaths) rewritePath(b *bundle.Bundle, p *string) {
+func (m *translateNotebookPaths) rewritePath(b *bundle.Bundle, p *string) error {
 	relPath := path.Clean(*p)
+	if interp, ok := m.seen[relPath]; ok {
+		*p = interp
+		return nil
+	}
+
 	absPath := filepath.Join(b.Config.Path, relPath)
-
-	// This is opportunistic. If we can't stat, continue.
-	_, err := os.Stat(absPath)
+	nb, _, err := notebook.Detect(absPath)
 	if err != nil {
-		return
+		// Ignore if this file doesn't exist. Maybe it's an absolute workspace path?
+		if os.IsNotExist(err) {
+			return nil
+		}
+
+		return fmt.Errorf("unable to determine if %s is a notebook: %w", relPath, err)
 	}
 
-	// Define artifact for this notebook.
-	id := nonWord.ReplaceAllString(relPath, "_")
-	if v, ok := m.seen[id]; ok {
-		*p = v
-		return
+	if !nb {
+		return fmt.Errorf("file at %s is not a notebook", relPath)
 	}
 
-	b.Config.Artifacts[id] = &config.Artifact{
-		Notebook: &config.NotebookArtifact{
-			Path: relPath,
-		},
-	}
-
-	interp := fmt.Sprintf("${artifacts.%s.notebook.remote_path}", id)
+	// Upon import, notebooks are stripped of their extension.
+	withoutExt := strings.TrimSuffix(relPath, filepath.Ext(relPath))
+
+	// We have a notebook on our hands! It will be available under the file path.
+	interp := fmt.Sprintf("${workspace.file_path.workspace}/%s", withoutExt)
 	*p = interp
-	m.seen[id] = interp
+	m.seen[relPath] = interp
+	return nil
 }
 
 func (m *translateNotebookPaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
 	m.seen = make(map[string]string)
-	if b.Config.Artifacts == nil {
-		b.Config.Artifacts = make(map[string]*config.Artifact)
-	}
 
 	for _, job := range b.Config.Resources.Jobs {
 		for i := 0; i < len(job.Tasks); i++ {
 			task := &job.Tasks[i]
@@ -69,7 +66,10 @@ func (m *translateNotebookPaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
 				continue
 			}
 
-			m.rewritePath(b, &task.NotebookTask.NotebookPath)
+			err := m.rewritePath(b, &task.NotebookTask.NotebookPath)
+			if err != nil {
+				return nil, err
+			}
 		}
 	}
@@ -80,7 +80,10 @@ func (m *translateNotebookPaths) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
 				continue
 			}
 
-			m.rewritePath(b, &library.Notebook.Path)
+			err := m.rewritePath(b, &library.Notebook.Path)
+			if err != nil {
+				return nil, err
+			}
 		}
 	}

View File

@@ -19,6 +19,7 @@ import (
 func touchFile(t *testing.T, path string) {
 	f, err := os.Create(path)
 	require.NoError(t, err)
+	f.WriteString("# Databricks notebook source\n")
 	f.Close()
 }
@@ -92,16 +93,10 @@ func TestNotebookPaths(t *testing.T) {
 	_, err := mutator.TranslateNotebookPaths().Apply(context.Background(), bundle)
 	require.NoError(t, err)
 
-	// Assert that the notebook artifact was defined.
-	assert.Len(t, bundle.Config.Artifacts, 2)
-	for _, artifact := range bundle.Config.Artifacts {
-		assert.Contains(t, artifact.Notebook.Path, "notebook.py")
-	}
-
 	// Assert that the path in the tasks now refer to the artifact.
 	assert.Equal(
 		t,
-		"${artifacts.my_job_notebook_py.notebook.remote_path}",
+		"${workspace.file_path.workspace}/my_job_notebook",
 		bundle.Config.Resources.Jobs["job"].Tasks[0].NotebookTask.NotebookPath,
 	)
 	assert.Equal(
@@ -111,14 +106,14 @@ func TestNotebookPaths(t *testing.T) {
 	)
 	assert.Equal(
 		t,
-		"${artifacts.my_job_notebook_py.notebook.remote_path}",
+		"${workspace.file_path.workspace}/my_job_notebook",
 		bundle.Config.Resources.Jobs["job"].Tasks[2].NotebookTask.NotebookPath,
 	)
 
 	// Assert that the path in the libraries now refer to the artifact.
 	assert.Equal(
 		t,
-		"${artifacts.my_pipeline_notebook_py.notebook.remote_path}",
+		"${workspace.file_path.workspace}/my_pipeline_notebook",
 		bundle.Config.Resources.Pipelines["pipeline"].Libraries[0].Notebook.Path,
 	)
 	assert.Equal(
@@ -128,7 +123,7 @@ func TestNotebookPaths(t *testing.T) {
 	)
 	assert.Equal(
 		t,
-		"${artifacts.my_pipeline_notebook_py.notebook.remote_path}",
+		"${workspace.file_path.workspace}/my_pipeline_notebook",
 		bundle.Config.Resources.Pipelines["pipeline"].Libraries[2].Notebook.Path,
 	)
 }

View File

@@ -0,0 +1,47 @@
+package files
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/databricks/bricks/bundle"
+	sync "github.com/databricks/bricks/libs/sync"
+)
+
+type upload struct{}
+
+func (m *upload) Name() string {
+	return "files.Upload"
+}
+
+func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
+	cacheDir, err := b.CacheDir()
+	if err != nil {
+		return nil, fmt.Errorf("cannot get bundle cache directory: %w", err)
+	}
+
+	opts := sync.SyncOptions{
+		LocalPath:        b.Config.Path,
+		RemotePath:       b.Config.Workspace.FilePath.Workspace,
+		Full:             false,
+		SnapshotBasePath: cacheDir,
+		WorkspaceClient:  b.WorkspaceClient(),
+	}
+
+	sync, err := sync.New(ctx, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	err = sync.RunOnce(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	return nil, nil
+}
+
+func Upload() bundle.Mutator {
+	return &upload{}
+}

View File

@@ -3,6 +3,7 @@ package phases
 import (
 	"github.com/databricks/bricks/bundle"
 	"github.com/databricks/bricks/bundle/artifacts"
+	"github.com/databricks/bricks/bundle/deploy/files"
 	"github.com/databricks/bricks/bundle/deploy/terraform"
 )
@@ -11,6 +12,7 @@ func Deploy() bundle.Mutator {
 	return newPhase(
 		"deploy",
 		[]bundle.Mutator{
+			files.Upload(),
 			artifacts.UploadAll(),
 			terraform.Interpolate(),
 			terraform.Write(),