Update sync defaults (#177)

By default the command runs an incremental, one-time sync, similar to the
behavior of rsync. The `--persist-snapshot` flag has been removed and the
command now always saves a synchronization snapshot.

* Add `--full` flag to force full synchronization
* Add `--watch` flag to run continuously and watch the local file system for changes

This builds on #176.
This commit is contained in:
Pieter Noordhuis 2023-01-24 15:06:59 +01:00 committed by GitHub
parent 077304ffa1
commit 03c863f49b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 110 deletions

View File

@ -15,7 +15,7 @@ import (
// syncCmd represents the sync command // syncCmd represents the sync command
var syncCmd = &cobra.Command{ var syncCmd = &cobra.Command{
Use: "sync", Use: "sync",
Short: "run syncs for the project", Short: "Synchronize a local directory to a workspace directory",
PreRunE: project.Configure, PreRunE: project.Configure,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
@ -46,7 +46,7 @@ var syncCmd = &cobra.Command{
opts := sync.SyncOptions{ opts := sync.SyncOptions{
LocalPath: prj.Root(), LocalPath: prj.Root(),
RemotePath: *remotePath, RemotePath: *remotePath,
PersistSnapshot: *persistSnapshot, Full: *full,
SnapshotBasePath: cacheDir, SnapshotBasePath: cacheDir,
PollInterval: *interval, PollInterval: *interval,
WorkspaceClient: wsc, WorkspaceClient: wsc,
@ -57,7 +57,11 @@ var syncCmd = &cobra.Command{
return err return err
} }
return s.RunWatchdog(ctx) if *watch {
return s.RunContinuous(ctx)
}
return s.RunOnce(ctx)
}, },
} }
@ -66,11 +70,13 @@ var interval *time.Duration
var remotePath *string var remotePath *string
var persistSnapshot *bool var full *bool
var watch *bool
func init() { func init() {
root.RootCmd.AddCommand(syncCmd) root.RootCmd.AddCommand(syncCmd)
interval = syncCmd.Flags().Duration("interval", 1*time.Second, "project files polling interval") interval = syncCmd.Flags().Duration("interval", 1*time.Second, "file system polling interval (for --watch)")
remotePath = syncCmd.Flags().String("remote-path", "", "remote path to store repo in. eg: /Repos/me@example.com/test-repo") remotePath = syncCmd.Flags().String("remote-path", "", "remote path to store repo in. eg: /Repos/me@example.com/test-repo")
persistSnapshot = syncCmd.Flags().Bool("persist-snapshot", true, "whether to store local snapshots of sync state") full = syncCmd.Flags().Bool("full", false, "perform full synchronization (default is incremental)")
watch = syncCmd.Flags().Bool("watch", false, "watch local file system for changes")
} }

View File

@ -29,7 +29,7 @@ import (
// Please run using the deco env test or deco env shell // Please run using the deco env test or deco env shell
func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Context) (localRoot, remoteRoot string) { func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Context) (localRoot, remoteRoot string) {
me, err := wsc.CurrentUser.Me(ctx) me, err := wsc.CurrentUser.Me(ctx)
assert.NoError(t, err) require.NoError(t, err)
repoUrl := "https://github.com/shreyas-goenka/empty-repo.git" repoUrl := "https://github.com/shreyas-goenka/empty-repo.git"
repoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("empty-repo-sync-integration-")) repoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("empty-repo-sync-integration-"))
@ -38,7 +38,7 @@ func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Contex
Url: repoUrl, Url: repoUrl,
Provider: "gitHub", Provider: "gitHub",
}) })
assert.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
err := wsc.Repos.DeleteByRepoId(ctx, repoInfo.Id) err := wsc.Repos.DeleteByRepoId(ctx, repoInfo.Id)
@ -50,7 +50,7 @@ func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Contex
cmd := exec.Command("git", "clone", repoUrl) cmd := exec.Command("git", "clone", repoUrl)
cmd.Dir = tempDir cmd.Dir = tempDir
err = cmd.Run() err = cmd.Run()
assert.NoError(t, err) require.NoError(t, err)
localRoot = filepath.Join(tempDir, "empty-repo") localRoot = filepath.Join(tempDir, "empty-repo")
remoteRoot = repoPath remoteRoot = repoPath
@ -168,7 +168,7 @@ func TestAccFullFileSync(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=false") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--full", "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{
@ -214,7 +214,7 @@ func TestAccIncrementalFileSync(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{
@ -262,7 +262,7 @@ func TestAccNestedFolderSync(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{
@ -316,7 +316,7 @@ func TestAccIncrementalFileOverwritesFolder(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{
@ -367,7 +367,7 @@ func TestAccIncrementalSyncPythonNotebookToFile(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{
@ -406,7 +406,7 @@ func TestAccIncrementalSyncFileToPythonNotebook(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{
@ -451,7 +451,7 @@ func TestAccIncrementalSyncPythonNotebookDelete(t *testing.T) {
// Run `bricks sync` in the background. // Run `bricks sync` in the background.
t.Setenv("BRICKS_ROOT", localRepoPath) t.Setenv("BRICKS_ROOT", localRepoPath)
c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch")
c.RunBackground() c.RunBackground()
assertSync := assertSync{ assertSync := assertSync{

View File

@ -3,6 +3,8 @@ package sync
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"sync"
"time" "time"
"github.com/databricks/bricks/git" "github.com/databricks/bricks/git"
@ -14,7 +16,7 @@ type SyncOptions struct {
LocalPath string LocalPath string
RemotePath string RemotePath string
PersistSnapshot bool Full bool
SnapshotBasePath string SnapshotBasePath string
@ -29,6 +31,8 @@ type Sync struct {
*SyncOptions *SyncOptions
fileSet *git.FileSet fileSet *git.FileSet
snapshot *Snapshot
repoFiles *repofiles.RepoFiles
} }
// New initializes and returns a new [Sync] instance. // New initializes and returns a new [Sync] instance.
@ -52,16 +56,86 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
return nil, fmt.Errorf("failed to resolve host for snapshot") return nil, fmt.Errorf("failed to resolve host for snapshot")
} }
// For full sync, we start with an empty snapshot.
// For incremental sync, we try to load an existing snapshot to start from.
var snapshot *Snapshot
if opts.Full {
snapshot, err = newSnapshot(&opts)
if err != nil {
return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err)
}
} else {
snapshot, err = loadOrNewSnapshot(&opts)
if err != nil {
return nil, fmt.Errorf("unable to load sync snapshot: %w", err)
}
}
repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient)
return &Sync{ return &Sync{
SyncOptions: &opts, SyncOptions: &opts,
fileSet: fileSet, fileSet: fileSet,
snapshot: snapshot,
repoFiles: repoFiles,
}, nil }, nil
} }
// RunWatchdog kicks off a polling loop to monitor local changes and synchronize func (s *Sync) RunOnce(ctx context.Context) error {
// them to the remote workspace path.
func (s *Sync) RunWatchdog(ctx context.Context) error {
repoFiles := repofiles.Create(s.RemotePath, s.LocalPath, s.WorkspaceClient) repoFiles := repofiles.Create(s.RemotePath, s.LocalPath, s.WorkspaceClient)
syncCallback := syncCallback(ctx, repoFiles) applyDiff := syncCallback(ctx, repoFiles)
return spawnWatchdog(ctx, syncCallback, s)
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
all, err := s.fileSet.All()
if err != nil {
log.Printf("[ERROR] cannot list files: %s", err)
return err
}
change, err := s.snapshot.diff(all)
if err != nil {
return err
}
if change.IsEmpty() {
return nil
}
log.Printf("[INFO] Action: %v", change)
err = applyDiff(change)
if err != nil {
return err
}
err = s.snapshot.Save(ctx)
if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err)
return err
}
return nil
}
func (s *Sync) RunContinuous(ctx context.Context) error {
var once sync.Once
ticker := time.NewTicker(s.PollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err := s.RunOnce(ctx)
if err != nil {
return err
}
once.Do(func() {
log.Printf("[INFO] Initial Sync Complete")
})
}
}
} }

View File

@ -3,22 +3,11 @@ package sync
import ( import (
"context" "context"
"log" "log"
"sync"
"time"
"github.com/databricks/bricks/libs/sync/repofiles" "github.com/databricks/bricks/libs/sync/repofiles"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
// TODO: add .databricks to .gitignore on bricks init
type watchdog struct {
ticker *time.Ticker
wg sync.WaitGroup
failure error // data race? make channel?
sync *Sync
}
// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api // See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api
// rate limits // rate limits
const MaxRequestsInFlight = 20 const MaxRequestsInFlight = 20
@ -67,78 +56,3 @@ func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(loca
return nil return nil
} }
} }
func spawnWatchdog(ctx context.Context,
applyDiff func(diff) error,
sync *Sync) error {
w := &watchdog{
ticker: time.NewTicker(sync.PollInterval),
sync: sync,
}
w.wg.Add(1)
go w.main(ctx, applyDiff, sync.RemotePath)
w.wg.Wait()
return w.failure
}
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remotePath string) {
defer w.wg.Done()
snapshot, err := newSnapshot(w.sync.SyncOptions)
if err != nil {
log.Printf("[ERROR] cannot create snapshot: %s", err)
w.failure = err
return
}
if w.sync.PersistSnapshot {
snapshot, err = loadOrNewSnapshot(w.sync.SyncOptions)
if err != nil {
log.Printf("[ERROR] cannot load snapshot: %s", err)
w.failure = err
return
}
}
var onlyOnceInitLog sync.Once
for {
select {
case <-ctx.Done():
return
case <-w.ticker.C:
all, err := w.sync.fileSet.All()
if err != nil {
log.Printf("[ERROR] cannot list files: %s", err)
w.failure = err
return
}
change, err := snapshot.diff(all)
if err != nil {
w.failure = err
return
}
if change.IsEmpty() {
onlyOnceInitLog.Do(func() {
log.Printf("[INFO] Initial Sync Complete")
})
continue
}
log.Printf("[INFO] Action: %v", change)
err = applyDiff(change)
if err != nil {
w.failure = err
return
}
if w.sync.PersistSnapshot {
err = snapshot.Save(ctx)
if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err)
w.failure = err
return
}
}
onlyOnceInitLog.Do(func() {
log.Printf("[INFO] Initial Sync Complete")
})
}
}
}