From 16bb2241087d8a003d6bc650ea81516fa893b90f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 12 Jun 2023 13:44:00 +0200 Subject: [PATCH] Add directory tracking to sync (#425) ## Changes This change replaces usage of the `repofiles` package with the `filer` package to consolidate WSFS code paths. The `repofiles` package implemented the following behavior. If a file at `foo/bar.txt` was created and removed, the directory `foo` was kept around because we do not perform directory tracking. If subsequently, a file at `foo` was created, it resulted in an `fs.ErrExist` because it is impossible to overwrite a directory. It would then perform a recursive delete of the path if this happened and retry the file write. To make this use case work without resorting to a recursive delete on conflict, we need to implement directory tracking as part of sync. The approach in this commit is as follows: 1. Maintain set of directories needed for current set of files. Compare to previous set of files. This results in mkdir of added directories and rmdir of removed directories. 2. Creation of new directories should happen prior to writing files. Otherwise, many file writes may race to create the same parent directories, resulting in additional API calls. Removal of existing directories should happen after removing files. 3. Making new directories can be deduped across common prefixes where only the longest prefix is created recursively. 4. Removing existing directories must happen sequentially, starting with the longest prefix. 5. Removal of directories is a best effort. It fails only if the directory is not empty, and if this happens we know something placed a file or directory manually, outside of sync. 
## Tests * Existing integration tests pass (modified where it used to assert directories weren't cleaned up) * New integration test to confirm the inability to remove a directory doesn't fail the sync run --- internal/sync_test.go | 64 +++++++++-- libs/sync/diff.go | 92 ++++++++++++++- libs/sync/diff_test.go | 73 ++++++++++++ libs/sync/dirset.go | 54 +++++++++ libs/sync/dirset_test.go | 37 ++++++ libs/sync/repofiles/repofiles.go | 159 -------------------------- libs/sync/repofiles/repofiles_test.go | 88 -------------- libs/sync/snapshot.go | 15 +++ libs/sync/snapshot_test.go | 6 + libs/sync/sync.go | 23 ++-- libs/sync/watchdog.go | 130 +++++++++++++++------ 11 files changed, 444 insertions(+), 297 deletions(-) create mode 100644 libs/sync/diff_test.go create mode 100644 libs/sync/dirset.go create mode 100644 libs/sync/dirset_test.go delete mode 100644 libs/sync/repofiles/repofiles.go delete mode 100644 libs/sync/repofiles/repofiles_test.go diff --git a/internal/sync_test.go b/internal/sync_test.go index 89b318a9..09418a85 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "io/fs" "net/http" "os" "os/exec" @@ -15,6 +16,7 @@ import ( "time" _ "github.com/databricks/cli/cmd/sync" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/sync" "github.com/databricks/cli/libs/testfile" "github.com/databricks/databricks-sdk-go" @@ -63,6 +65,7 @@ type syncTest struct { t *testing.T c *cobraTestRunner w *databricks.WorkspaceClient + f filer.Filer localRoot string remoteRoot string } @@ -73,6 +76,8 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest { w := databricks.Must(databricks.NewWorkspaceClient()) localRoot := t.TempDir() remoteRoot := temporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, remoteRoot) + require.NoError(t, err) // Prepend common arguments. 
args = append([]string{ @@ -90,6 +95,7 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest { t: t, c: c, w: w, + f: f, localRoot: localRoot, remoteRoot: remoteRoot, } @@ -160,6 +166,21 @@ func (a *syncTest) remoteFileContent(ctx context.Context, relativePath string, e }, 30*time.Second, 5*time.Second) } +func (a *syncTest) remoteNotExist(ctx context.Context, relativePath string) { + _, err := a.f.Stat(ctx, relativePath) + require.ErrorIs(a.t, err, fs.ErrNotExist) +} + +func (a *syncTest) remoteExists(ctx context.Context, relativePath string) { + _, err := a.f.Stat(ctx, relativePath) + require.NoError(a.t, err) +} + +func (a *syncTest) touchFile(ctx context.Context, path string) { + err := a.f.Write(ctx, path, strings.NewReader("contents"), filer.CreateParentDirectories) + require.NoError(a.t, err) +} + func (a *syncTest) objectType(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) @@ -297,11 +318,43 @@ func TestAccSyncNestedFolderSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - // directories are not cleaned up right now. 
This is not ideal - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) + assertSync.remoteNotExist(ctx, "dir1") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } +func TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory(t *testing.T) { + ctx := context.Background() + assertSync := setupSyncTest(t, "--watch") + + // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + + // New file + localFilePath := filepath.Join(assertSync.localRoot, "dir1/dir2/dir3/foo.txt") + err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) + assert.NoError(t, err) + f := testfile.CreateFile(t, localFilePath) + defer f.Close(t) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{"foo.txt"}) + + // Add file to dir1 to simulate a user writing to the workspace directly. + assertSync.touchFile(ctx, "dir1/foo.txt") + + // Remove original file. + f.Remove(t) + assertSync.waitForCompletionMarker() + + // Sync should have removed these directories. + assertSync.remoteNotExist(ctx, "dir1/dir2/dir3") + assertSync.remoteNotExist(ctx, "dir1/dir2") + + // Sync should have ignored not being able to delete dir1. + assertSync.remoteExists(ctx, "dir1/foo.txt") + assertSync.remoteExists(ctx, "dir1") +} + func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { ctx := context.Background() assertSync := setupSyncTest(t, "--watch") @@ -326,12 +379,10 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - // directories are not cleaned up right now. 
This is not ideal - assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{}) + assertSync.remoteNotExist(ctx, "dir1/a b+c/c+d e") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } -// sync does not clean up empty directories from the workspace file system. // This is a check for the edge case when a user does the following: // // 1. Add file foo/bar.txt @@ -359,8 +410,7 @@ func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { f.Remove(t) os.Remove(filepath.Join(assertSync.localRoot, "foo")) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "foo", []string{}) - assertSync.objectType(ctx, "foo", "DIRECTORY") + assertSync.remoteNotExist(ctx, "foo") assertSync.snapshotContains(append(repoFiles, ".gitignore")) f2 := testfile.CreateFile(t, filepath.Join(assertSync.localRoot, "foo")) diff --git a/libs/sync/diff.go b/libs/sync/diff.go index 72c1d5aa..26e99b34 100644 --- a/libs/sync/diff.go +++ b/libs/sync/diff.go @@ -1,10 +1,100 @@ package sync +import ( + "path" +) + type diff struct { - put []string delete []string + rmdir []string + mkdir []string + put []string } func (d diff) IsEmpty() bool { return len(d.put) == 0 && len(d.delete) == 0 } + +// groupedMkdir returns a slice of slices of paths to create. +// Because the underlying mkdir calls create intermediate directories, +// we can group them together to reduce the total number of calls. +// This returns a slice of a slice for parity with [groupedRmdir]. +func (d diff) groupedMkdir() [][]string { + // Compute the set of prefixes of all paths to create. + prefixes := make(map[string]bool) + for _, name := range d.mkdir { + dir := path.Dir(name) + for dir != "." && dir != "/" { + prefixes[dir] = true + dir = path.Dir(dir) + } + } + + var out []string + + // Collect all paths that are not a prefix of another path. 
+	for _, name := range d.mkdir { +		if !prefixes[name] { +			out = append(out, name) +		} +	} + +	return [][]string{out} +} + +// groupedRmdir returns a slice of slices of paths to delete. +// The outer slice is ordered such that each inner slice can be +// deleted in parallel, as long as it is processed in order. +// The first entry will contain leaf directories, the second entry +// will contain intermediate directories, and so on. +func (d diff) groupedRmdir() [][]string { +	// Compute the number of times each directory is a prefix of another directory. +	prefixes := make(map[string]int) +	for _, dir := range d.rmdir { +		prefixes[dir] = 0 +	} +	for _, dir := range d.rmdir { +		dir = path.Dir(dir) +		for dir != "." && dir != "/" { +			// Increment the prefix count for this directory, only if it +			// is one of the directories we are deleting. +			if _, ok := prefixes[dir]; ok { +				prefixes[dir]++ +			} +			dir = path.Dir(dir) +		} +	} + +	var out [][]string + +	for len(prefixes) > 0 { +		var toDelete []string + +		// Find directories which are not a prefix of another directory. +		// These are the directories we can delete. +		for dir, count := range prefixes { +			if count == 0 { +				toDelete = append(toDelete, dir) +				delete(prefixes, dir) +			} +		} + +		// Remove these directories from the prefixes map. +		for _, dir := range toDelete { +			dir = path.Dir(dir) +			for dir != "." && dir != "/" { +				// Decrement the prefix count for this directory, only if it +				// is one of the directories we are deleting. +				if _, ok := prefixes[dir]; ok { +					prefixes[dir]-- +				} +				dir = path.Dir(dir) +			} +		} + +		// Add these directories to the output. 
+ out = append(out, toDelete) + } + + return out +} diff --git a/libs/sync/diff_test.go b/libs/sync/diff_test.go new file mode 100644 index 00000000..ff448872 --- /dev/null +++ b/libs/sync/diff_test.go @@ -0,0 +1,73 @@ +package sync + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDiffGroupedMkdir(t *testing.T) { + d := diff{ + mkdir: []string{ + "foo", + "foo/bar", + "foo/bar/baz1", + "foo/bar/baz2", + "foo1", + "a/b", + "a/b/c/d/e/f", + }, + } + + // Expect only leaf directories to be included. + out := d.groupedMkdir() + assert.Len(t, out, 1) + assert.ElementsMatch(t, []string{ + "foo/bar/baz1", + "foo/bar/baz2", + "foo1", + "a/b/c/d/e/f", + }, out[0]) +} + +func TestDiffGroupedRmdir(t *testing.T) { + d := diff{ + rmdir: []string{ + "a/b/c/d/e/f", + "a/b/c/d/e", + "a/b/c/d", + "a/b/c", + "a/b/e/f/g/h", + "a/b/e/f/g", + "a/b/e/f", + "a/b/e", + "a/b", + }, + } + + out := d.groupedRmdir() + assert.Len(t, out, 5) + assert.ElementsMatch(t, []string{"a/b/c/d/e/f", "a/b/e/f/g/h"}, out[0]) + assert.ElementsMatch(t, []string{"a/b/c/d/e", "a/b/e/f/g"}, out[1]) + assert.ElementsMatch(t, []string{"a/b/c/d", "a/b/e/f"}, out[2]) + assert.ElementsMatch(t, []string{"a/b/c", "a/b/e"}, out[3]) + assert.ElementsMatch(t, []string{"a/b"}, out[4]) +} + +func TestDiffGroupedRmdirWithLeafsOnly(t *testing.T) { + d := diff{ + rmdir: []string{ + "foo/bar/baz1", + "foo/bar1", + "foo/bar/baz2", + "foo/bar2", + "foo1", + "foo2", + }, + } + + // Expect all directories to be included. + out := d.groupedRmdir() + assert.Len(t, out, 1) + assert.ElementsMatch(t, d.rmdir, out[0]) +} diff --git a/libs/sync/dirset.go b/libs/sync/dirset.go new file mode 100644 index 00000000..3c37c97c --- /dev/null +++ b/libs/sync/dirset.go @@ -0,0 +1,54 @@ +package sync + +import ( + "path" + "path/filepath" + "sort" +) + +// DirSet is a set of directories. 
+type DirSet map[string]struct{} + +// MakeDirSet turns a list of file paths into the complete set of directories +// that is needed to store them (including parent directories). +func MakeDirSet(files []string) DirSet { + out := map[string]struct{}{} + + // Iterate over all files. + for _, f := range files { + // Get the directory of the file in /-separated form. + dir := filepath.ToSlash(filepath.Dir(f)) + + // Add this directory and its parents until it is either "." or already in the set. + for dir != "." { + if _, ok := out[dir]; ok { + break + } + out[dir] = struct{}{} + dir = path.Dir(dir) + } + } + + return out +} + +// Slice returns a sorted copy of the dirset elements as a slice. +func (dirset DirSet) Slice() []string { + out := make([]string, 0, len(dirset)) + for dir := range dirset { + out = append(out, dir) + } + sort.Strings(out) + return out +} + +// Remove returns the set difference of two DirSets. +func (dirset DirSet) Remove(other DirSet) DirSet { + out := map[string]struct{}{} + for dir := range dirset { + if _, ok := other[dir]; !ok { + out[dir] = struct{}{} + } + } + return out +} diff --git a/libs/sync/dirset_test.go b/libs/sync/dirset_test.go new file mode 100644 index 00000000..7e920819 --- /dev/null +++ b/libs/sync/dirset_test.go @@ -0,0 +1,37 @@ +package sync + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/exp/maps" +) + +func TestMakeDirSet(t *testing.T) { + assert.ElementsMatch(t, + []string{ + "a", + "a/b", + "a/b/c", + "a/b/d", + "a/e", + "b", + }, + maps.Keys( + MakeDirSet([]string{ + "./a/b/c/file1", + "./a/b/c/file2", + "./a/b/d/file", + "./a/e/file", + "b/file", + }), + ), + ) +} + +func TestDirSetRemove(t *testing.T) { + a := MakeDirSet([]string{"./a/b/c/file1"}) + b := MakeDirSet([]string{"./a/b/d/file2"}) + assert.ElementsMatch(t, []string{"a/b/c"}, a.Remove(b).Slice()) + assert.ElementsMatch(t, []string{"a/b/d"}, b.Remove(a).Slice()) +} diff --git a/libs/sync/repofiles/repofiles.go 
b/libs/sync/repofiles/repofiles.go deleted file mode 100644 index 8fcabc11..00000000 --- a/libs/sync/repofiles/repofiles.go +++ /dev/null @@ -1,159 +0,0 @@ -package repofiles - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strings" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/service/workspace" -) - -// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspace files -type RepoFiles struct { - repoRoot string - localRoot string - workspaceClient *databricks.WorkspaceClient -} - -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { - return &RepoFiles{ - repoRoot: repoRoot, - localRoot: localRoot, - workspaceClient: workspaceClient, - } -} - -func (r *RepoFiles) remotePath(relativePath string) (string, error) { - fullPath := path.Join(r.repoRoot, relativePath) - cleanFullPath := path.Clean(fullPath) - if !strings.HasPrefix(cleanFullPath, r.repoRoot) { - return "", fmt.Errorf("relative file path is not inside repo root: %s", relativePath) - } - // path.Clean will remove any trailing / so it's enough to check cleanFullPath == r.repoRoot - if cleanFullPath == r.repoRoot { - return "", fmt.Errorf("file path relative to repo root cannot be empty: %s", relativePath) - } - return cleanFullPath, nil -} - -func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { - localPath := filepath.Join(r.localRoot, relativePath) - return os.ReadFile(localPath) -} - -func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err - } 
- remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all errors once the API - // implements them - if err != nil { - // Delete any artifact files incase non overwriteable by the current file - // type and thus are failing the PUT request. - // files, folders and notebooks might not have been cleaned up and they - // can't overwrite each other. If a folder `foo` exists, then attempts to - // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: true, - }, - ) - // ignore RESOURCE_DOES_NOT_EXIST here incase nothing existed at remotePath - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - if err != nil { - return err - } - - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } - } - return nil -} - -func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - 
Path: remotePath, - Recursive: false, - }, - ) -} - -// The API calls for a python script foo.py would be -// `PUT foo.py` -// `DELETE foo.py` -// -// The API calls for a python notebook foo.py would be -// `PUT foo.py` -// `DELETE foo` -// -// The workspace file system backend strips .py from the file name if the python -// file is a notebook -func (r *RepoFiles) PutFile(ctx context.Context, relativePath string) error { - content, err := r.readLocal(relativePath) - if err != nil { - return err - } - - return r.writeRemote(ctx, relativePath, content) -} - -func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { - err := r.deleteRemote(ctx, relativePath) - - // We explictly ignore RESOURCE_DOES_NOT_EXIST error to make delete idempotent - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - return nil -} - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go deleted file mode 100644 index 2a881d90..00000000 --- a/libs/sync/repofiles/repofiles_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package repofiles - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRepoFilesRemotePath(t *testing.T) { - repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) - - remotePath, err := repoFiles.remotePath("a/b/c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/../d") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/d", remotePath) - - remotePath, err = repoFiles.remotePath("a/../c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/.") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", 
remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - - _, err = repoFiles.remotePath("..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - - _, err = repoFiles.remotePath("a/../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("/./.././..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("./..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - - _, err = repoFiles.remotePath("./../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - - _, err = repoFiles.remotePath("./../a/./b../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - - _, err = repoFiles.remotePath("../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - - _, err = repoFiles.remotePath(".//a/..//./b/..") - assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - - _, err = repoFiles.remotePath("a/b/../..") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath(".") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("/") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") -} - -func TestRepoReadLocal(t 
*testing.T) { - tempDir := t.TempDir() - helloPath := filepath.Join(tempDir, "hello.txt") - err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) - assert.NoError(t, err) - - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) - bytes, err := repoFiles.readLocal("./a/../hello.txt") - assert.NoError(t, err) - assert.Equal(t, "my name is doraemon :P", string(bytes)) -} diff --git a/libs/sync/snapshot.go b/libs/sync/snapshot.go index 1ea7b18b..1680f046 100644 --- a/libs/sync/snapshot.go +++ b/libs/sync/snapshot.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/libs/fileset" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/notebook" + "golang.org/x/exp/maps" ) // Bump it up every time a potentially breaking change is made to the snapshot schema @@ -183,6 +184,18 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e localFileSet[f.Relative] = struct{}{} } + // Capture both previous and current set of files. + previousFiles := maps.Keys(lastModifiedTimes) + currentFiles := maps.Keys(localFileSet) + + // Build directory sets to figure out which directories to create and which to remove. + previousDirectories := MakeDirSet(previousFiles) + currentDirectories := MakeDirSet(currentFiles) + + // Create new directories; remove stale directories. 
+ change.mkdir = currentDirectories.Remove(previousDirectories).Slice() + change.rmdir = previousDirectories.Remove(currentDirectories).Slice() + for _, f := range all { // get current modified timestamp modified := f.Modified() @@ -252,6 +265,7 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e // add them to a delete batch change.delete = append(change.delete, remoteName) } + // and remove them from the snapshot for _, remoteName := range change.delete { // we do note assert that remoteName exists in remoteToLocalNames since it @@ -262,5 +276,6 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e delete(remoteToLocalNames, remoteName) delete(localToRemoteNames, localName) } + return } diff --git a/libs/sync/snapshot_test.go b/libs/sync/snapshot_test.go index 8154b791..c2e8f6b8 100644 --- a/libs/sync/snapshot_test.go +++ b/libs/sync/snapshot_test.go @@ -139,7 +139,10 @@ func TestFolderDiff(t *testing.T) { change, err := state.diff(ctx, files) assert.NoError(t, err) assert.Len(t, change.delete, 0) + assert.Len(t, change.rmdir, 0) + assert.Len(t, change.mkdir, 1) assert.Len(t, change.put, 1) + assert.Contains(t, change.mkdir, "foo") assert.Contains(t, change.put, "foo/bar.py") f1.Remove(t) @@ -148,8 +151,11 @@ func TestFolderDiff(t *testing.T) { change, err = state.diff(ctx, files) assert.NoError(t, err) assert.Len(t, change.delete, 1) + assert.Len(t, change.rmdir, 1) + assert.Len(t, change.mkdir, 0) assert.Len(t, change.put, 0) assert.Contains(t, change.delete, "foo/bar") + assert.Contains(t, change.rmdir, "foo") } func TestPythonNotebookDiff(t *testing.T) { diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e..5c4c9d8f 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" - "github.com/databricks/cli/libs/sync/repofiles" 
"github.com/databricks/databricks-sdk-go" ) @@ -29,9 +29,9 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - snapshot *Snapshot - repoFiles *repofiles.RepoFiles + fileSet *git.FileSet + snapshot *Snapshot + filer filer.Filer // Synchronization progress events are sent to this event notifier. notifier EventNotifier @@ -77,16 +77,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { } } - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return nil, err + } return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - repoFiles: repoFiles, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 3e7acccc..b0c96e01 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -2,36 +2,81 @@ package sync import ( "context" + "errors" + "io/fs" + "os" + "path/filepath" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" "golang.org/x/sync/errgroup" ) // Maximum number of concurrent requests during sync. const MaxRequestsInFlight = 20 -// Perform a DELETE of the specified remote path. -func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteName string) { - // Return early if the context has already been cancelled. - select { - case <-ctx.Done(): - return - default: - // Proceed. +// Delete the specified path. 
+func (s *Sync) applyDelete(ctx context.Context, remoteName string) error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + + err := s.filer.Delete(ctx, remoteName) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return err } - group.Go(func() error { - s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) - err := s.repoFiles.DeleteFile(ctx, remoteName) - if err != nil { - return err - } - s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) - return nil - }) + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) + return nil +} + +// Remove the directory at the specified path. +func (s *Sync) applyRmdir(ctx context.Context, remoteName string) error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + + err := s.filer.Delete(ctx, remoteName) + if err != nil { + // Directory deletion is opportunistic, so we ignore errors. + log.Debugf(ctx, "error removing directory %s: %s", remoteName, err) + } + + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) + return nil +} + +// Create a directory at the specified path. +func (s *Sync) applyMkdir(ctx context.Context, localName string) error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + + err := s.filer.Mkdir(ctx, localName) + if err != nil { + return err + } + + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil } // Perform a PUT of the specified local path. -func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName string) { +func (s *Sync) applyPut(ctx context.Context, localName string) error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + + localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) + if err != nil { + return err + } + + defer localFile.Close() + + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} + err = s.filer.Write(ctx, localName, localFile, opts...) 
+ if err != nil { + return err + } + + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil +} + +func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.Context, string) error, path string) { // Return early if the context has already been cancelled. select { case <-ctx.Done(): @@ -41,28 +86,49 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st } group.Go(func() error { - s.notifyProgress(ctx, EventActionPut, localName, 0.0) - err := s.repoFiles.PutFile(ctx, localName) - if err != nil { - return err - } - s.notifyProgress(ctx, EventActionPut, localName, 1.0) - return nil + return fn(ctx, path) }) } -func (s *Sync) applyDiff(ctx context.Context, d diff) error { +func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error { group, ctx := errgroup.WithContext(ctx) group.SetLimit(MaxRequestsInFlight) - for _, remoteName := range d.delete { - s.applyDelete(ctx, group, remoteName) - } - - for _, localName := range d.put { - s.applyPut(ctx, group, localName) + for _, path := range paths { + groupRunSingle(ctx, group, fn, path) } // Wait for goroutines to finish and return first non-nil error return if any. return group.Wait() } + +func (s *Sync) applyDiff(ctx context.Context, d diff) error { + var err error + + // Delete files in parallel. + err = groupRunParallel(ctx, d.delete, s.applyDelete) + if err != nil { + return err + } + + // Delete directories ordered by depth from leaf to root. + for _, group := range d.groupedRmdir() { + err = groupRunParallel(ctx, group, s.applyRmdir) + if err != nil { + return err + } + } + + // Create directories (leafs only because intermediates are created automatically). + for _, group := range d.groupedMkdir() { + err = groupRunParallel(ctx, group, s.applyMkdir) + if err != nil { + return err + } + } + + // Put files in parallel. + err = groupRunParallel(ctx, d.put, s.applyPut) + + return err +}