[DECO-79][DECO-165] Incremental sync with support for multiple profiles (#82)

This PR does several things:

1. Creates the .databricks dir according to the outcomes concluded in "bricks
configuration principles"
2. Puts the sync snapshots into a file whose name is tagged with
md5(concat(host, remote-path)); see the sketch below this list
3. Saves both the host and the username in the bricks snapshot for debuggability
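
For reviewers, here is a minimal, self-contained sketch of the naming scheme from item 2. The host and remote path values are hypothetical examples; the real implementation is the GetFileName function added to the sync package below.

package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// snapshotFileName mirrors the scheme described above: the snapshot file is keyed
// by the first 16 hex characters of md5(host + remotePath), so every
// (host, remote path) pair gets its own snapshot file.
func snapshotFileName(host, remotePath string) string {
	hash := md5.Sum([]byte(host + remotePath))
	return hex.EncodeToString(hash[:])[:16] + ".json"
}

func main() {
	// Hypothetical values for illustration only.
	fmt.Println(snapshotFileName("https://adb-1234567890123456.7.azuredatabricks.net", "/Repos/someone@example.com/my-repo"))
}

Because the file name depends on both values, switching either the workspace host or the remote path makes the cli read and write a different snapshot, which is what allows multiple profiles to sync side by side.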

Tested manually:


https://user-images.githubusercontent.com/88374338/195672267-9dd90230-570f-49b7-847f-05a5a6fd8986.mov
shreyas-goenka 2022-10-19 16:22:55 +02:00 committed by GitHub
parent 38a9dabcbe
commit 18dae73505
9 changed files with 400 additions and 50 deletions

View File

@ -1,6 +1,7 @@
package sync
import (
"context"
"encoding/json"
"fmt"
"io"
@ -9,30 +10,95 @@ import (
"strings"
"time"
"crypto/md5"
"encoding/hex"
"github.com/databricks/bricks/git"
"github.com/databricks/bricks/project"
)
type snapshot map[string]time.Time
// A snapshot is a persistent store of the knowledge the bricks cli has about the state
// of files in the remote repo. We use the last modified times (mtime) of files to
// determine whether a file needs to be updated in the remote repo.
//
// 1. Any stale files in the remote repo are updated. That is, if the last modified
// time recorded in the snapshot is earlier than the actual last modified time of the file
//
// 2. Any files present in the snapshot but absent locally are deleted from the remote path
//
// Changing either the databricks workspace (ie Host) or the remote path (ie RemotePath)
// that local files are being synced to will make the bricks cli switch to a different
// snapshot for persisting/loading sync state
type Snapshot struct {
// hostname of the workspace this snapshot is for
Host string `json:"host"`
// Path in workspace for project repo
RemotePath string `json:"remote_path"`
// Map of all files present in the remote repo with the:
// key: relative file path from project root
// value: last time the remote instance of this file was updated
LastUpdatedTimes map[string]time.Time `json:"last_modified_times"`
}
type diff struct {
put []string
delete []string
}
const SyncSnapshotFile = "repo_snapshot.json"
const BricksDir = ".bricks"
const syncSnapshotDirName = "sync-snapshots"
func (s *snapshot) storeSnapshot(root string) error {
// create snapshot file
configDir := filepath.Join(root, BricksDir)
if _, err := os.Stat(configDir); os.IsNotExist(err) {
err = os.Mkdir(configDir, os.ModeDir|os.ModePerm)
func GetFileName(host, remotePath string) string {
hash := md5.Sum([]byte(host + remotePath))
hashString := hex.EncodeToString(hash[:])
return hashString[:16] + ".json"
}
// Compute the path of the snapshot file on the local machine.
// The file name is unique for a tuple of (host, remotePath);
// precisely, it's the first 16 characters of md5(concat(host, remotePath))
func (s *Snapshot) getPath(ctx context.Context) (string, error) {
prj := project.Get(ctx)
cacheDir, err := prj.CacheDir()
if err != nil {
return "", err
}
snapshotDir := filepath.Join(cacheDir, syncSnapshotDirName)
if _, err := os.Stat(snapshotDir); os.IsNotExist(err) {
err = os.Mkdir(snapshotDir, os.ModeDir|os.ModePerm)
if err != nil {
return fmt.Errorf("failed to create config directory: %s", err)
return "", fmt.Errorf("failed to create config directory: %s", err)
}
}
persistedSnapshotPath := filepath.Join(configDir, SyncSnapshotFile)
f, err := os.OpenFile(persistedSnapshotPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0755)
fileName := GetFileName(s.Host, s.RemotePath)
return filepath.Join(snapshotDir, fileName), nil
}
func newSnapshot(ctx context.Context, remotePath string) (*Snapshot, error) {
prj := project.Get(ctx)
// Get host this snapshot is for
wsc := prj.WorkspacesClient()
// TODO: The host may be late-initialized in certain Azure setups where we
// specify the workspace by its resource ID. tracked in: https://databricks.atlassian.net/browse/DECO-194
host := wsc.Config.Host
if host == "" {
return nil, fmt.Errorf("failed to resolve host for snapshot")
}
return &Snapshot{
Host: host,
RemotePath: remotePath,
LastUpdatedTimes: make(map[string]time.Time),
}, nil
}
func (s *Snapshot) storeSnapshot(ctx context.Context) error {
snapshotPath, err := s.getPath(ctx)
if err != nil {
return err
}
f, err := os.OpenFile(snapshotPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create/open persisted sync snapshot file: %s", err)
}
@ -50,13 +116,17 @@ func (s *snapshot) storeSnapshot(root string) error {
return nil
}
func (s *snapshot) loadSnapshot(root string) error {
persistedSnapshotPath := filepath.Join(root, BricksDir, SyncSnapshotFile)
if _, err := os.Stat(persistedSnapshotPath); os.IsNotExist(err) {
func (s *Snapshot) loadSnapshot(ctx context.Context) error {
snapshotPath, err := s.getPath(ctx)
if err != nil {
return err
}
// Snapshot file not found. We do not load anything
if _, err := os.Stat(snapshotPath); os.IsNotExist(err) {
return nil
}
f, err := os.Open(persistedSnapshotPath)
f, err := os.Open(snapshotPath)
if err != nil {
return fmt.Errorf("failed to open persisted sync snapshot file: %s", err)
}
@ -64,10 +134,9 @@ func (s *snapshot) loadSnapshot(root string) error {
bytes, err := io.ReadAll(f)
if err != nil {
// clean up these error messages a bit
return fmt.Errorf("failed to read sync snapshot from disk: %s", err)
}
err = json.Unmarshal(bytes, s)
err = json.Unmarshal(bytes, &s)
if err != nil {
return fmt.Errorf("failed to json unmarshal persisted snapshot: %s", err)
}
@ -92,22 +161,23 @@ func (d diff) String() string {
return strings.Join(changes, ", ")
}
func (s snapshot) diff(all []git.File) (change diff) {
func (s Snapshot) diff(all []git.File) (change diff) {
currentFilenames := map[string]bool{}
lastModifiedTimes := s.LastUpdatedTimes
for _, f := range all {
// create set of current files to figure out if removals are needed
currentFilenames[f.Relative] = true
// get current modified timestamp
modified := f.Modified()
lastSeenModified, seen := s[f.Relative]
lastSeenModified, seen := lastModifiedTimes[f.Relative]
if !seen || modified.After(lastSeenModified) {
change.put = append(change.put, f.Relative)
s[f.Relative] = modified
lastModifiedTimes[f.Relative] = modified
}
}
// figure out files in the snapshot, but not on local filesystem
for relative := range s {
for relative := range lastModifiedTimes {
_, exists := currentFilenames[relative]
if exists {
continue
@ -115,11 +185,11 @@ func (s snapshot) diff(all []git.File) (change diff) {
// add them to a delete batch
change.delete = append(change.delete, relative)
// remove the file from snapshot
delete(s, relative)
delete(lastModifiedTimes, relative)
}
// and remove them from the snapshot
for _, v := range change.delete {
delete(s, v)
delete(lastModifiedTimes, v)
}
return
}
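
To make the put/delete semantics of diff easier to review, here is a small self-contained toy version of the same logic. It operates on plain path-to-mtime maps instead of the real git.File and Snapshot types, so it only illustrates the rules stated in the doc comment above; it is not the actual implementation.

package main

import (
	"fmt"
	"time"
)

// toyDiff mirrors the semantics of Snapshot.diff: a file is "put" when it is new
// or its mtime is newer than the recorded one, and "deleted" when it is recorded
// in the snapshot but no longer present locally. The snapshot map is updated in place.
func toyDiff(lastUpdated, local map[string]time.Time) (put, del []string) {
	for path, mtime := range local {
		if recorded, seen := lastUpdated[path]; !seen || mtime.After(recorded) {
			put = append(put, path)
			lastUpdated[path] = mtime
		}
	}
	for path := range lastUpdated {
		if _, exists := local[path]; !exists {
			del = append(del, path)
			delete(lastUpdated, path)
		}
	}
	return put, del
}

func main() {
	now := time.Now()
	snapshot := map[string]time.Time{"a.py": now.Add(-time.Hour), "b.py": now.Add(-time.Hour)}
	local := map[string]time.Time{"a.py": now, "c.py": now} // b.py was removed locally
	put, del := toyDiff(snapshot, local)
	fmt.Println("put:", put, "delete:", del) // a.py and c.py are put, b.py is deleted
}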

View File

@ -25,7 +25,9 @@ func TestDiff(t *testing.T) {
fileSet := git.NewFileSet(projectDir)
files, err := fileSet.All()
assert.NoError(t, err)
state := snapshot{}
state := Snapshot{
LastUpdatedTimes: make(map[string]time.Time),
}
change := state.diff(files)
// New files are added to put

View File

@ -44,12 +44,8 @@ var syncCmd = &cobra.Command{
}
root := prj.Root()
fileSet := git.NewFileSet(root)
if err != nil {
return err
}
syncCallback := getRemoteSyncCallback(ctx, root, *remotePath, wsc)
err = spawnSyncRoutine(ctx, fileSet, *interval, syncCallback)
err = spawnSyncRoutine(ctx, *interval, syncCallback, *remotePath)
return err
},
}

View File

@ -12,7 +12,6 @@ import (
"sync"
"time"
"github.com/databricks/bricks/git"
"github.com/databricks/bricks/project"
"github.com/databricks/databricks-sdk-go/databricks/client"
"github.com/databricks/databricks-sdk-go/service/workspace"
@ -20,8 +19,8 @@ import (
"golang.org/x/sync/errgroup"
)
// TODO: add .databricks to .gitignore on bricks init
type watchdog struct {
files git.FileSet
ticker *time.Ticker
wg sync.WaitGroup
failure error // data race? make channel?
@ -104,46 +103,49 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *wor
}
func spawnSyncRoutine(ctx context.Context,
files git.FileSet,
interval time.Duration,
applyDiff func(diff) error) error {
applyDiff func(diff) error,
remotePath string) error {
w := &watchdog{
files: files,
ticker: time.NewTicker(interval),
}
w.wg.Add(1)
go w.main(ctx, applyDiff)
go w.main(ctx, applyDiff, remotePath)
w.wg.Wait()
return w.failure
}
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error) {
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remotePath string) {
defer w.wg.Done()
// load from json or sync it every time there's an action
state := snapshot{}
root := w.files.Root()
snapshot, err := newSnapshot(ctx, remotePath)
if err != nil {
log.Printf("[ERROR] cannot create snapshot: %s", err)
w.failure = err
return
}
if *persistSnapshot {
err := state.loadSnapshot(root)
err := snapshot.loadSnapshot(ctx)
if err != nil {
log.Printf("[ERROR] cannot load snapshot: %s", err)
w.failure = err
return
}
}
prj := project.Get(ctx)
for {
select {
case <-ctx.Done():
return
case <-w.ticker.C:
all, err := w.files.All()
all, err := prj.GetFileSet().All()
if err != nil {
log.Printf("[ERROR] cannot list files: %s", err)
w.failure = err
return
}
change := state.diff(all)
change := snapshot.diff(all)
if change.IsEmpty() {
continue
}
@ -154,7 +156,7 @@ func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error) {
return
}
if *persistSnapshot {
err = state.storeSnapshot(root)
err = snapshot.storeSnapshot(ctx)
if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err)
w.failure = err

View File

@ -71,6 +71,10 @@ func (w *FileSet) All() ([]File, error) {
return w.RecursiveListFiles(w.root)
}
func (w *FileSet) IsGitIgnored(pattern string) bool {
return w.ignore.MatchesPath(pattern)
}
// Recursively traverses dir in a depth first manner and returns a list of all files
// that are being tracked in the FileSet (ie not being ignored for matching one of the
// patterns in w.ignore)

View File

@ -3,7 +3,9 @@ package internal
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
@ -11,6 +13,7 @@ import (
"testing"
"time"
"github.com/databricks/bricks/cmd/sync"
"github.com/databricks/bricks/folders"
"github.com/databricks/databricks-sdk-go/service/repos"
"github.com/databricks/databricks-sdk-go/service/workspace"
@ -18,9 +21,6 @@ import (
"github.com/stretchr/testify/assert"
)
// TODO: Write an integration tests for incremental bricks sync once its complete
// with support for different profiles (go/jira/DECO-118)
// This test needs auth env vars to run.
// Please run using the deco env test or deco env shell
func TestAccFullSync(t *testing.T) {
@ -72,7 +72,7 @@ func TestAccFullSync(t *testing.T) {
defer f.Close()
// start bricks sync process
cmd = exec.Command("go", "run", "main.go", "sync", "--remote-path", repoPath, "--persist-snapshot", "false")
cmd = exec.Command("go", "run", "main.go", "sync", "--remote-path", repoPath, "--persist-snapshot=false")
var cmdOut, cmdErr bytes.Buffer
cmd.Stdout = &cmdOut
@ -89,7 +89,8 @@ func TestAccFullSync(t *testing.T) {
// terminate the bricks sync process
cmd.Process.Kill()
// Print the stdout and stderr logs from the bricks sync process
t.Log("[INFO] bricks sync logs: ")
t.Log("\n\n\n\n\n\n")
t.Logf("bricks sync logs for command: %s", cmd.String())
if err != nil {
t.Logf("error in bricks sync process: %s\n", err)
}
@ -167,3 +168,181 @@ func TestAccFullSync(t *testing.T) {
assert.Contains(t, files3, ".gitkeep")
assert.Contains(t, files3, "world.txt")
}
func assertSnapshotContents(t *testing.T, host, repoPath, projectDir string, listOfSyncedFiles []string) {
snapshotPath := filepath.Join(projectDir, ".databricks/sync-snapshots", sync.GetFileName(host, repoPath))
assert.FileExists(t, snapshotPath)
var s *sync.Snapshot
f, err := os.Open(snapshotPath)
assert.NoError(t, err)
defer f.Close()
bytes, err := io.ReadAll(f)
assert.NoError(t, err)
err = json.Unmarshal(bytes, &s)
assert.NoError(t, err)
assert.Equal(t, s.Host, host)
assert.Equal(t, s.RemotePath, repoPath)
for _, filePath := range listOfSyncedFiles {
_, ok := s.LastUpdatedTimes[filePath]
assert.True(t, ok, fmt.Sprintf("%s not in snapshot file: %v", filePath, s.LastUpdatedTimes))
}
assert.Equal(t, len(listOfSyncedFiles), len(s.LastUpdatedTimes))
}
func TestAccIncrementalSync(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
// We assume cwd is in the bricks repo
wd, err := os.Getwd()
if err != nil {
t.Log("[WARN] error fetching current working dir: ", err)
}
t.Log("test run dir: ", wd)
bricksRepo, err := folders.FindDirWithLeaf(wd, ".git")
if err != nil {
t.Log("[ERROR] error finding git repo root in : ", wd)
}
t.Log("bricks repo location: : ", bricksRepo)
assert.Equal(t, "bricks", filepath.Base(bricksRepo))
wsc := workspaces.New()
ctx := context.Background()
me, err := wsc.CurrentUser.Me(ctx)
assert.NoError(t, err)
repoUrl := "https://github.com/shreyas-goenka/empty-repo.git"
repoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("empty-repo-sync-integration-"))
repoInfo, err := wsc.Repos.Create(ctx, repos.CreateRepo{
Path: repoPath,
Url: repoUrl,
Provider: "gitHub",
})
assert.NoError(t, err)
t.Cleanup(func() {
err := wsc.Repos.DeleteByRepoId(ctx, repoInfo.Id)
assert.NoError(t, err)
})
// clone public empty remote repo
tempDir := t.TempDir()
cmd := exec.Command("git", "clone", repoUrl)
cmd.Dir = tempDir
err = cmd.Run()
assert.NoError(t, err)
projectDir := filepath.Join(tempDir, "empty-repo")
// Add .databricks to .gitignore
content := []byte("/.databricks/")
f2, err := os.Create(filepath.Join(projectDir, ".gitignore"))
assert.NoError(t, err)
defer f2.Close()
_, err = f2.Write(content)
assert.NoError(t, err)
// start bricks sync process
cmd = exec.Command("go", "run", "main.go", "sync", "--remote-path", repoPath, "--persist-snapshot=true")
var cmdOut, cmdErr bytes.Buffer
cmd.Stdout = &cmdOut
cmd.Stderr = &cmdErr
cmd.Dir = bricksRepo
// bricks sync command will inherit the env vars from process
t.Setenv("BRICKS_ROOT", projectDir)
err = cmd.Start()
assert.NoError(t, err)
t.Cleanup(func() {
// We wait three seconds to allow the bricks sync process to flush its
// stdout buffer
time.Sleep(3 * time.Second)
// terminate the bricks sync process
cmd.Process.Kill()
// Print the stdout and stderr logs from the bricks sync process
// TODO: modify logs to suit multiple sync processes
t.Log("\n\n\n\n\n\n")
t.Logf("bricks sync logs for command: %s", cmd.String())
if err != nil {
t.Logf("error in bricks sync process: %s\n", err)
}
for _, line := range strings.Split(strings.TrimSuffix(cmdOut.String(), "\n"), "\n") {
t.Log("[bricks sync stdout]", line)
}
for _, line := range strings.Split(strings.TrimSuffix(cmdErr.String(), "\n"), "\n") {
t.Log("[bricks sync stderr]", line)
}
})
// First upload assertion
assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{
Path: repoPath,
})
assert.NoError(t, err)
return len(repoContent.Objects) == 2
}, 30*time.Second, 5*time.Second)
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{
Path: repoPath,
})
assert.NoError(t, err)
var files1 []string
for _, v := range repoContent.Objects {
files1 = append(files1, filepath.Base(v.Path))
}
assert.Len(t, files1, 2)
assert.Contains(t, files1, ".gitignore")
assert.Contains(t, files1, ".gitkeep")
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{".gitkeep", ".gitignore"})
// Create amsterdam.txt file
f, err := os.Create(filepath.Join(projectDir, "amsterdam.txt"))
assert.NoError(t, err)
defer f.Close()
// new file upload assertion
assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{
Path: repoPath,
})
assert.NoError(t, err)
return len(repoContent.Objects) == 3
}, 30*time.Second, 5*time.Second)
repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{
Path: repoPath,
})
assert.NoError(t, err)
var files2 []string
for _, v := range repoContent.Objects {
files2 = append(files2, filepath.Base(v.Path))
}
assert.Len(t, files2, 3)
assert.Contains(t, files2, "amsterdam.txt")
assert.Contains(t, files2, ".gitkeep")
assert.Contains(t, files2, ".gitignore")
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{"amsterdam.txt", ".gitkeep", ".gitignore"})
// delete a file and assert
os.Remove(filepath.Join(projectDir, ".gitkeep"))
assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{
Path: repoPath,
})
assert.NoError(t, err)
return len(repoContent.Objects) == 2
}, 30*time.Second, 5*time.Second)
repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{
Path: repoPath,
})
assert.NoError(t, err)
var files3 []string
for _, v := range repoContent.Objects {
files3 = append(files3, filepath.Base(v.Path))
}
assert.Len(t, files3, 2)
assert.Contains(t, files3, "amsterdam.txt")
assert.Contains(t, files3, ".gitignore")
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{"amsterdam.txt", ".gitignore"})
}

View File

@ -31,8 +31,7 @@ type Assertions struct {
}
type Config struct {
Name string `json:"name"` // or do default from folder name?..
Profile string `json:"profile,omitempty"` // rename?
Name string `json:"name"` // or do default from folder name?..
Isolation Isolation `json:"isolation,omitempty"`
// development-time vs deployment-time resources

View File

@ -3,8 +3,11 @@ package project
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/databricks/bricks/git"
"github.com/databricks/databricks-sdk-go/databricks"
"github.com/databricks/databricks-sdk-go/service/clusters"
"github.com/databricks/databricks-sdk-go/service/commands"
@ -13,6 +16,8 @@ import (
"github.com/spf13/cobra"
)
const CacheDirName = ".databricks"
type project struct {
mu sync.Mutex
@ -23,6 +28,7 @@ type project struct {
environment *Environment
wsc *workspaces.WorkspacesClient
me *scim.User
fileSet *git.FileSet
}
// Configure is used as a PreRunE function for all commands that
@ -61,12 +67,15 @@ func Initialize(ctx context.Context, root, env string) (context.Context, error)
return nil, fmt.Errorf("environment [%s] not defined", env)
}
fileSet := git.NewFileSet(root)
p := project{
root: root,
env: env,
config: &config,
environment: &environment,
fileSet: &fileSet,
}
p.initializeWorkspacesClient(ctx)
@ -105,6 +114,34 @@ func (p *project) Root() string {
return p.root
}
func (p *project) GetFileSet() *git.FileSet {
return p.fileSet
}
// This cache dir will contain any state, state overrides (per user overrides
// to the project config) or any generated artifacts (eg: sync snapshots)
// that should never be checked into Git.
//
// We enforce that the cache dir (.databricks) is added to .gitignore
// because it contains per-user overrides that we do not want users to
// accidentally check into git
func (p *project) CacheDir() (string, error) {
// assert cache dir is present in git ignore
if !p.fileSet.IsGitIgnored(fmt.Sprintf("/%s/", CacheDirName)) {
return "", fmt.Errorf("please add /%s/ to .gitignore", CacheDirName)
}
cacheDirPath := filepath.Join(p.root, CacheDirName)
// create cache dir if it does not exist
if _, err := os.Stat(cacheDirPath); os.IsNotExist(err) {
err = os.Mkdir(cacheDirPath, os.ModeDir|os.ModePerm)
if err != nil {
return "", fmt.Errorf("failed to create cache directory %s with error: %s", cacheDirPath, err)
}
}
return cacheDirPath, nil
}
func (p *project) Config() Config {
return *p.config
}

View File

@ -2,6 +2,8 @@ package project
import (
"context"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
@ -13,3 +15,62 @@ func TestProjectInitialize(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, Get(ctx).config.Name, "dev")
}
func TestProjectCacheDirErrorsIfNoGitIgnoreFile(t *testing.T) {
// create project root with databricks.yml
projectDir := t.TempDir()
f1, err := os.Create(filepath.Join(projectDir, "databricks.yml"))
assert.NoError(t, err)
defer f1.Close()
ctx, err := Initialize(context.Background(), projectDir, DefaultEnvironment)
assert.NoError(t, err)
prj := Get(ctx)
_, err = prj.CacheDir()
assert.Error(t, err, "please add /.databricks/ to .gitignore")
}
func TestProjectCacheDirErrorsIfGitIgnoreEntryAbsent(t *testing.T) {
// create project root with databricks.yml
projectDir := t.TempDir()
f1, err := os.Create(filepath.Join(projectDir, "databricks.yml"))
assert.NoError(t, err)
defer f1.Close()
// create empty .gitignore
f2, err := os.Create(filepath.Join(projectDir, ".gitignore"))
assert.NoError(t, err)
defer f2.Close()
ctx, err := Initialize(context.Background(), projectDir, DefaultEnvironment)
assert.NoError(t, err)
prj := Get(ctx)
_, err = prj.CacheDir()
assert.Error(t, err, "please add /.databricks/ to .gitignore")
}
func TestProjectCacheDir(t *testing.T) {
// create project root with databricks.yml
projectDir := t.TempDir()
f1, err := os.Create(filepath.Join(projectDir, "databricks.yml"))
assert.NoError(t, err)
defer f1.Close()
// create .gitignore with the .databricks dir in it
f2, err := os.Create(filepath.Join(projectDir, ".gitignore"))
assert.NoError(t, err)
defer f2.Close()
content := []byte("/.databricks/")
_, err = f2.Write(content)
assert.NoError(t, err)
ctx, err := Initialize(context.Background(), projectDir, DefaultEnvironment)
assert.NoError(t, err)
prj := Get(ctx)
cacheDir, err := prj.CacheDir()
assert.NoError(t, err)
assert.Equal(t, filepath.Join(projectDir, ".databricks"), cacheDir)
}