From 03c863f49bb7f57d350893c33326d5b6dcc2e6e6 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 24 Jan 2023 15:06:59 +0100 Subject: [PATCH] Update sync defaults (#177) By default the command runs an incremental, one-time sync, similar to the behavior of rsync. The `--persist-snapshot` flag has been removed and the command now always saves a synchronization snapshot. * Add `--full` flag to force full synchronization * Add `--watch` flag to run continuously and watch the local file system for changes This builds on #176. --- cmd/sync/sync.go | 18 ++++++--- internal/sync_test.go | 20 +++++----- libs/sync/sync.go | 90 +++++++++++++++++++++++++++++++++++++++---- libs/sync/watchdog.go | 86 ----------------------------------------- 4 files changed, 104 insertions(+), 110 deletions(-) diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index ea268be1..3872f101 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -15,7 +15,7 @@ import ( // syncCmd represents the sync command var syncCmd = &cobra.Command{ Use: "sync", - Short: "run syncs for the project", + Short: "Synchronize a local directory to a workspace directory", PreRunE: project.Configure, RunE: func(cmd *cobra.Command, args []string) error { @@ -46,7 +46,7 @@ var syncCmd = &cobra.Command{ opts := sync.SyncOptions{ LocalPath: prj.Root(), RemotePath: *remotePath, - PersistSnapshot: *persistSnapshot, + Full: *full, SnapshotBasePath: cacheDir, PollInterval: *interval, WorkspaceClient: wsc, @@ -57,7 +57,11 @@ var syncCmd = &cobra.Command{ return err } - return s.RunWatchdog(ctx) + if *watch { + return s.RunContinuous(ctx) + } + + return s.RunOnce(ctx) }, } @@ -66,11 +70,13 @@ var interval *time.Duration var remotePath *string -var persistSnapshot *bool +var full *bool +var watch *bool func init() { root.RootCmd.AddCommand(syncCmd) - interval = syncCmd.Flags().Duration("interval", 1*time.Second, "project files polling interval") + interval = syncCmd.Flags().Duration("interval", 1*time.Second, "file system polling interval (for --watch)") remotePath = syncCmd.Flags().String("remote-path", "", "remote path to store repo in. eg: /Repos/me@example.com/test-repo") - persistSnapshot = syncCmd.Flags().Bool("persist-snapshot", true, "whether to store local snapshots of sync state") + full = syncCmd.Flags().Bool("full", false, "perform full synchronization (default is incremental)") + watch = syncCmd.Flags().Bool("watch", false, "watch local file system for changes") } diff --git a/internal/sync_test.go b/internal/sync_test.go index 1c5ac6f8..8d0ba86f 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -29,7 +29,7 @@ import ( // Please run using the deco env test or deco env shell func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Context) (localRoot, remoteRoot string) { me, err := wsc.CurrentUser.Me(ctx) - assert.NoError(t, err) + require.NoError(t, err) repoUrl := "https://github.com/shreyas-goenka/empty-repo.git" repoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("empty-repo-sync-integration-")) @@ -38,7 +38,7 @@ func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Contex Url: repoUrl, Provider: "gitHub", }) - assert.NoError(t, err) + require.NoError(t, err) t.Cleanup(func() { err := wsc.Repos.DeleteByRepoId(ctx, repoInfo.Id) @@ -50,7 +50,7 @@ func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Contex cmd := exec.Command("git", "clone", repoUrl) cmd.Dir = tempDir err = cmd.Run() - assert.NoError(t, err) + require.NoError(t, err) localRoot = filepath.Join(tempDir, "empty-repo") remoteRoot = repoPath @@ -168,7 +168,7 @@ func TestAccFullFileSync(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=false") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--full", "--watch") c.RunBackground() assertSync := assertSync{ @@ -214,7 +214,7 @@ func TestAccIncrementalFileSync(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch") c.RunBackground() assertSync := assertSync{ @@ -262,7 +262,7 @@ func TestAccNestedFolderSync(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch") c.RunBackground() assertSync := assertSync{ @@ -316,7 +316,7 @@ func TestAccIncrementalFileOverwritesFolder(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch") c.RunBackground() assertSync := assertSync{ @@ -367,7 +367,7 @@ func TestAccIncrementalSyncPythonNotebookToFile(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch") c.RunBackground() assertSync := assertSync{ @@ -406,7 +406,7 @@ func TestAccIncrementalSyncFileToPythonNotebook(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch") c.RunBackground() assertSync := assertSync{ @@ -451,7 +451,7 @@ func TestAccIncrementalSyncPythonNotebookDelete(t *testing.T) { // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", localRepoPath) - c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--persist-snapshot=true") + c := NewCobraTestRunner(t, "sync", "--remote-path", remoteRepoPath, "--watch") c.RunBackground() assertSync := assertSync{ diff --git a/libs/sync/sync.go b/libs/sync/sync.go index ec67c6cc..f1198365 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -3,6 +3,8 @@ package sync import ( "context" "fmt" + "log" + "sync" "time" "github.com/databricks/bricks/git" @@ -14,7 +16,7 @@ type SyncOptions struct { LocalPath string RemotePath string - PersistSnapshot bool + Full bool SnapshotBasePath string @@ -28,7 +30,9 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet + fileSet *git.FileSet + snapshot *Snapshot + repoFiles *repofiles.RepoFiles } // New initializes and returns a new [Sync] instance. @@ -52,16 +56,86 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, fmt.Errorf("failed to resolve host for snapshot") } + // For full sync, we start with an empty snapshot. + // For incremental sync, we try to load an existing snapshot to start from. + var snapshot *Snapshot + if opts.Full { + snapshot, err = newSnapshot(&opts) + if err != nil { + return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err) + } + } else { + snapshot, err = loadOrNewSnapshot(&opts) + if err != nil { + return nil, fmt.Errorf("unable to load sync snapshot: %w", err) + } + } + + repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + return &Sync{ SyncOptions: &opts, - fileSet: fileSet, + + fileSet: fileSet, + snapshot: snapshot, + repoFiles: repoFiles, }, nil } -// RunWatchdog kicks off a polling loop to monitor local changes and synchronize -// them to the remote workspace path. -func (s *Sync) RunWatchdog(ctx context.Context) error { +func (s *Sync) RunOnce(ctx context.Context) error { repoFiles := repofiles.Create(s.RemotePath, s.LocalPath, s.WorkspaceClient) - syncCallback := syncCallback(ctx, repoFiles) - return spawnWatchdog(ctx, syncCallback, s) + applyDiff := syncCallback(ctx, repoFiles) + + // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement + // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 + all, err := s.fileSet.All() + if err != nil { + log.Printf("[ERROR] cannot list files: %s", err) + return err + } + + change, err := s.snapshot.diff(all) + if err != nil { + return err + } + if change.IsEmpty() { + return nil + } + + log.Printf("[INFO] Action: %v", change) + err = applyDiff(change) + if err != nil { + return err + } + + err = s.snapshot.Save(ctx) + if err != nil { + log.Printf("[ERROR] cannot store snapshot: %s", err) + return err + } + + return nil +} + +func (s *Sync) RunContinuous(ctx context.Context) error { + var once sync.Once + + ticker := time.NewTicker(s.PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + err := s.RunOnce(ctx) + if err != nil { + return err + } + + once.Do(func() { + log.Printf("[INFO] Initial Sync Complete") + }) + } + } } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index bf8b7717..4ac86456 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -3,22 +3,11 @@ package sync import ( "context" "log" - "sync" - "time" "github.com/databricks/bricks/libs/sync/repofiles" "golang.org/x/sync/errgroup" ) -// TODO: add .databricks to .gitignore on bricks init -type watchdog struct { - ticker *time.Ticker - wg sync.WaitGroup - failure error // data race? make channel? - - sync *Sync -} - // See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api // rate limits const MaxRequestsInFlight = 20 @@ -67,78 +56,3 @@ func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(loca return nil } } - -func spawnWatchdog(ctx context.Context, - applyDiff func(diff) error, - sync *Sync) error { - w := &watchdog{ - ticker: time.NewTicker(sync.PollInterval), - sync: sync, - } - w.wg.Add(1) - go w.main(ctx, applyDiff, sync.RemotePath) - w.wg.Wait() - return w.failure -} - -// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement -// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 -func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remotePath string) { - defer w.wg.Done() - snapshot, err := newSnapshot(w.sync.SyncOptions) - if err != nil { - log.Printf("[ERROR] cannot create snapshot: %s", err) - w.failure = err - return - } - if w.sync.PersistSnapshot { - snapshot, err = loadOrNewSnapshot(w.sync.SyncOptions) - if err != nil { - log.Printf("[ERROR] cannot load snapshot: %s", err) - w.failure = err - return - } - } - var onlyOnceInitLog sync.Once - for { - select { - case <-ctx.Done(): - return - case <-w.ticker.C: - all, err := w.sync.fileSet.All() - if err != nil { - log.Printf("[ERROR] cannot list files: %s", err) - w.failure = err - return - } - change, err := snapshot.diff(all) - if err != nil { - w.failure = err - return - } - if change.IsEmpty() { - onlyOnceInitLog.Do(func() { - log.Printf("[INFO] Initial Sync Complete") - }) - continue - } - log.Printf("[INFO] Action: %v", change) - err = applyDiff(change) - if err != nil { - w.failure = err - return - } - if w.sync.PersistSnapshot { - err = snapshot.Save(ctx) - if err != nil { - log.Printf("[ERROR] cannot store snapshot: %s", err) - w.failure = err - return - } - } - onlyOnceInitLog.Do(func() { - log.Printf("[INFO] Initial Sync Complete") - }) - } - } -}