mirror of https://github.com/databricks/cli.git
[DECO-396] Post mortem followups on sync deletes repo (#119)
This PR: - Implements safeguards for not accidentally/maliciously deleting repos by sanitizing relative paths - Adds versioning for snapshot schemas to allow invalidation if needed - Adds logic to delete preexisting remote artifacts that might not have been cleaned up properly if they conflict with an upload - A bunch of tests for the changes here Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
This commit is contained in:
parent
c255bd686a
commit
b42768801d
|
@ -0,0 +1,154 @@
|
|||
package repofiles
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/client"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
)
|
||||
|
||||
// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent
// accidental deletion of repos and more robust methods to overwrite workspace files
type RepoFiles struct {
	// repoRoot is the workspace path all remote reads/writes/deletes are
	// scoped under (e.g. /Repos/user/project).
	repoRoot string
	// localRoot is the local filesystem directory that mirrors repoRoot.
	localRoot string
	// workspaceClient is used for workspace API calls (import/delete/mkdirs).
	workspaceClient *databricks.WorkspaceClient
}
|
||||
|
||||
func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles {
|
||||
return &RepoFiles{
|
||||
repoRoot: repoRoot,
|
||||
localRoot: localRoot,
|
||||
workspaceClient: workspaceClient,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RepoFiles) remotePath(relativePath string) (string, error) {
|
||||
fullPath := path.Join(r.repoRoot, relativePath)
|
||||
cleanFullPath := path.Clean(fullPath)
|
||||
if !strings.HasPrefix(cleanFullPath, r.repoRoot) {
|
||||
return "", fmt.Errorf("relative file path is not inside repo root: %s", relativePath)
|
||||
}
|
||||
// path.Clean will remove any trailing / so it's enough to check cleanFullPath == r.repoRoot
|
||||
if cleanFullPath == r.repoRoot {
|
||||
return "", fmt.Errorf("file path relative to repo root cannot be empty: %s", relativePath)
|
||||
}
|
||||
return cleanFullPath, nil
|
||||
}
|
||||
|
||||
func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) {
|
||||
localPath := filepath.Join(r.localRoot, relativePath)
|
||||
return os.ReadFile(localPath)
|
||||
}
|
||||
|
||||
// writeRemote uploads content to relativePath (resolved against the repo
// root) in the workspace, overwriting any existing file. If the initial
// upload fails it deletes any conflicting remote artifact, creates the parent
// directories, and retries the upload once.
func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error {
	apiClient, err := client.New(r.workspaceClient.Config)
	if err != nil {
		return err
	}
	remotePath, err := r.remotePath(relativePath)
	if err != nil {
		return err
	}
	apiPath := fmt.Sprintf(
		"/api/2.0/workspace-files/import-file/%s?overwrite=true",
		strings.TrimLeft(remotePath, "/"))

	err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)

	// Handling some edge cases when an upload might fail
	//
	// We cannot do more precise error scoping here because the API does not
	// provide descriptive errors yet
	//
	// TODO: narrow down the error condition scope of this "if" block to only
	// trigger for the specific edge cases instead of all errors once the API
	// implements them
	if err != nil {
		// Delete any artifact at remotePath in case it is not overwriteable
		// by the current file type and is thus failing the PUT request.
		// Files, folders and notebooks might not have been cleaned up and they
		// can't overwrite each other. If a folder `foo` exists, then attempts to
		// PUT a file `foo` will fail.
		err := r.workspaceClient.Workspace.Delete(ctx,
			workspace.Delete{
				Path:      remotePath,
				Recursive: true,
			},
		)
		// Ignore RESOURCE_DOES_NOT_EXIST here in case nothing existed at remotePath.
		// NOTE(review): this value-type assertion only matches if the SDK
		// returns apierr.APIError by value — confirm it does not return
		// *apierr.APIError, in which case errors.As would be needed.
		if val, ok := err.(apierr.APIError); ok && val.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
			err = nil
		}
		if err != nil {
			return err
		}

		// Mkdir parent dirs in case they are what's causing the PUT request
		// to fail.
		err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath))
		if err != nil {
			return fmt.Errorf("could not mkdir to put file: %s", err)
		}

		// Attempt to upload file again after cleanup/setup
		err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)
		if err != nil {
			return err
		}
	}
	return nil
}
|
||||
|
||||
func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error {
|
||||
remotePath, err := r.remotePath(relativePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return r.workspaceClient.Workspace.Delete(ctx,
|
||||
workspace.Delete{
|
||||
Path: remotePath,
|
||||
Recursive: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// PutFile reads the file at relativePath from the local root and uploads it
// to the corresponding path in the remote repo.
//
// The API calls for a python script foo.py would be
// `PUT foo.py`
// `DELETE foo.py`
//
// The API calls for a python notebook foo.py would be
// `PUT foo.py`
// `DELETE foo`
//
// The workspace file system backend strips .py from the file name if the python
// file is a notebook
func (r *RepoFiles) PutFile(ctx context.Context, relativePath string) error {
	content, err := r.readLocal(relativePath)
	if err != nil {
		return err
	}

	return r.writeRemote(ctx, relativePath, content)
}
|
||||
|
||||
func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error {
|
||||
err := r.deleteRemote(ctx, relativePath)
|
||||
|
||||
// We explictly ignore RESOURCE_DOES_NOT_EXIST error to make delete idempotent
|
||||
if val, ok := err.(apierr.APIError); ok && val.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: write integration tests for all non happy path cases that rely on
|
||||
// specific behaviour of the workspace apis
|
|
@ -0,0 +1,88 @@
|
|||
package repofiles
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestRepoFilesRemotePath exercises remotePath: valid relative paths must
// resolve to cleaned paths under the repo root; paths escaping the root or
// resolving to the root itself must error.
func TestRepoFilesRemotePath(t *testing.T) {
	repoRoot := "/Repos/doraemon/bar"
	repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil)

	// Happy path: relative paths (including ".." and "." segments that stay
	// inside the root) resolve to cleaned absolute workspace paths.
	remotePath, err := repoFiles.remotePath("a/b/c")
	assert.NoError(t, err)
	assert.Equal(t, repoRoot+"/a/b/c", remotePath)

	remotePath, err = repoFiles.remotePath("a/b/../d")
	assert.NoError(t, err)
	assert.Equal(t, repoRoot+"/a/d", remotePath)

	remotePath, err = repoFiles.remotePath("a/../c")
	assert.NoError(t, err)
	assert.Equal(t, repoRoot+"/c", remotePath)

	remotePath, err = repoFiles.remotePath("a/b/c/.")
	assert.NoError(t, err)
	assert.Equal(t, repoRoot+"/a/b/c", remotePath)

	remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g")
	assert.NoError(t, err)
	assert.Equal(t, repoRoot+"/a/b/f/g", remotePath)

	// Paths that escape the repo root must be rejected.
	_, err = repoFiles.remotePath("..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`)

	_, err = repoFiles.remotePath("a/../..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`)

	_, err = repoFiles.remotePath("./../.")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`)

	_, err = repoFiles.remotePath("/./.././..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`)

	// NOTE(review): duplicate of the "./../." case above — one can be removed.
	_, err = repoFiles.remotePath("./../.")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`)

	_, err = repoFiles.remotePath("./..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`)

	_, err = repoFiles.remotePath("./../../..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`)

	_, err = repoFiles.remotePath("./../a/./b../../..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`)

	_, err = repoFiles.remotePath("../..")
	assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`)

	// Paths that resolve to the repo root itself must be rejected.
	_, err = repoFiles.remotePath(".//a/..//./b/..")
	assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`)

	_, err = repoFiles.remotePath("a/b/../..")
	assert.ErrorContains(t, err, "file path relative to repo root cannot be empty")

	_, err = repoFiles.remotePath("")
	assert.ErrorContains(t, err, "file path relative to repo root cannot be empty")

	_, err = repoFiles.remotePath(".")
	assert.ErrorContains(t, err, "file path relative to repo root cannot be empty")

	_, err = repoFiles.remotePath("/")
	assert.ErrorContains(t, err, "file path relative to repo root cannot be empty")
}
|
||||
|
||||
// TestRepoReadLocal verifies readLocal resolves a relative path (including
// ".." segments) against the local root and returns the file contents.
func TestRepoReadLocal(t *testing.T) {
	tempDir := t.TempDir()
	helloPath := filepath.Join(tempDir, "hello.txt")
	err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm)
	assert.NoError(t, err)

	repoFiles := Create("/Repos/doraemon/bar", tempDir, nil)
	bytes, err := repoFiles.readLocal("./a/../hello.txt")
	assert.NoError(t, err)
	assert.Equal(t, "my name is doraemon :P", string(bytes))
}
|
|
@ -5,7 +5,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
|
@ -19,6 +19,9 @@ import (
|
|||
"github.com/databricks/bricks/project"
|
||||
)
|
||||
|
||||
// Bump it up every time a potentially breaking change is made to the snapshot schema
|
||||
const LatestSnapshotVersion = "v1"
|
||||
|
||||
// A snapshot is a persistent store of knowledge bricks cli has about state of files
|
||||
// in the remote repo. We use the last modified times (mtime) of files to determine
|
||||
// whether a file needs to be updated in the remote repo.
|
||||
|
@ -32,18 +35,26 @@ import (
|
|||
// local files are being synced to will make bricks cli switch to a different
|
||||
// snapshot for persisting/loading sync state
|
||||
type Snapshot struct {
|
||||
// version for snapshot schema. Only snapshots matching the latest snapshot
|
||||
// schema version are used and older ones are invalidated (by deleting them)
|
||||
Version string `json:"version"`
|
||||
|
||||
// hostname of the workspace this snapshot is for
|
||||
Host string `json:"host"`
|
||||
|
||||
// Path in workspace for project repo
|
||||
RemotePath string `json:"remote_path"`
|
||||
|
||||
// Map of all files present in the remote repo with the:
|
||||
// key: relative file path from project root
|
||||
// value: last time the remote instance of this file was updated
|
||||
LastUpdatedTimes map[string]time.Time `json:"last_modified_times"`
|
||||
|
||||
// This map maps local file names to their remote names
|
||||
// eg. notebook named "foo.py" locally would be stored as "foo", thus this
|
||||
// map will contain an entry "foo.py" -> "foo"
|
||||
LocalToRemoteNames map[string]string `json:"local_to_remote_names"`
|
||||
|
||||
// Inverse of localToRemoteNames. Together the form a bijective mapping (ie
|
||||
// there is a 1:1 unique mapping between local and remote name)
|
||||
RemoteToLocalNames map[string]string `json:"remote_to_local_names"`
|
||||
|
@ -96,6 +107,7 @@ func newSnapshot(ctx context.Context, remotePath string) (*Snapshot, error) {
|
|||
}
|
||||
|
||||
return &Snapshot{
|
||||
Version: LatestSnapshotVersion,
|
||||
Host: host,
|
||||
RemotePath: remotePath,
|
||||
LastUpdatedTimes: make(map[string]time.Time),
|
||||
|
@ -137,20 +149,23 @@ func (s *Snapshot) loadSnapshot(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
f, err := os.Open(snapshotPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open persisted sync snapshot file: %s", err)
|
||||
}
|
||||
defer f.Close()
|
||||
snapshotCopy := Snapshot{}
|
||||
|
||||
bytes, err := io.ReadAll(f)
|
||||
bytes, err := os.ReadFile(snapshotPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read sync snapshot from disk: %s", err)
|
||||
}
|
||||
err = json.Unmarshal(bytes, &s)
|
||||
err = json.Unmarshal(bytes, &snapshotCopy)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to json unmarshal persisted snapshot: %s", err)
|
||||
}
|
||||
// invalidate old snapshot with schema versions
|
||||
if snapshotCopy.Version != LatestSnapshotVersion {
|
||||
|
||||
log.Printf("Did not load existing snapshot because its version is %s while the latest version is %s", s.Version, LatestSnapshotVersion)
|
||||
return nil
|
||||
}
|
||||
*s = snapshotCopy
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -198,7 +213,7 @@ func getNotebookDetails(path string) (isNotebook bool, typeOfNotebook string, er
|
|||
return false, "", nil
|
||||
}
|
||||
|
||||
func (s Snapshot) diff(all []git.File) (change diff, err error) {
|
||||
func (s *Snapshot) diff(all []git.File) (change diff, err error) {
|
||||
currentFilenames := map[string]bool{}
|
||||
lastModifiedTimes := s.LastUpdatedTimes
|
||||
remoteToLocalNames := s.RemoteToLocalNames
|
||||
|
@ -250,12 +265,23 @@ func (s Snapshot) diff(all []git.File) (change diff, err error) {
|
|||
if exists {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: https://databricks.atlassian.net/browse/DECO-429
|
||||
// Add error wrapper giving instructions like this for all errors here :)
|
||||
remoteName, ok := localToRemoteNames[localName]
|
||||
if !ok {
|
||||
return change, fmt.Errorf("missing remote path for local path: %s. Please try syncing again after deleting .databricks/sync-snapshots dir from your project root", localName)
|
||||
}
|
||||
|
||||
// add them to a delete batch
|
||||
change.delete = append(change.delete, localToRemoteNames[localName])
|
||||
change.delete = append(change.delete, remoteName)
|
||||
}
|
||||
// and remove them from the snapshot
|
||||
for _, remoteName := range change.delete {
|
||||
// we do not assert that remoteName exists in remoteToLocalNames since it
|
||||
// will be missing for files with remote name changed
|
||||
localName := remoteToLocalNames[remoteName]
|
||||
|
||||
delete(lastModifiedTimes, localName)
|
||||
delete(remoteToLocalNames, remoteName)
|
||||
delete(localToRemoteNames, localName)
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/bricks/git"
|
||||
"github.com/databricks/bricks/project"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
|
@ -234,3 +237,116 @@ func TestErrorWhenIdenticalRemoteName(t *testing.T) {
|
|||
change, err = state.diff(files)
|
||||
assert.ErrorContains(t, err, "both foo and foo.py point to the same remote file location foo. Please remove one of them from your local project")
|
||||
}
|
||||
|
||||
// TestNewSnapshotDefaults asserts that a freshly created snapshot carries the
// latest schema version, the project's host and remote path, and empty maps.
func TestNewSnapshotDefaults(t *testing.T) {
	ctx := setupProject(t)
	snapshot, err := newSnapshot(ctx, "/Repos/foo/bar")
	prj := project.Get(ctx)
	assert.NoError(t, err)

	assert.Equal(t, LatestSnapshotVersion, snapshot.Version)
	assert.Equal(t, "/Repos/foo/bar", snapshot.RemotePath)
	assert.Equal(t, prj.WorkspacesClient().Config.Host, snapshot.Host)
	assert.Empty(t, snapshot.LastUpdatedTimes)
	assert.Empty(t, snapshot.RemoteToLocalNames)
	assert.Empty(t, snapshot.LocalToRemoteNames)
}
|
||||
|
||||
// getEmptySnapshot returns a Snapshot with all maps initialized and no
// version/host/path set, used as a baseline in the invalidation tests.
func getEmptySnapshot() Snapshot {
	return Snapshot{
		LastUpdatedTimes:   make(map[string]time.Time),
		LocalToRemoteNames: make(map[string]string),
		RemoteToLocalNames: make(map[string]string),
	}
}
|
||||
|
||||
// setupProject initializes a bricks project in a fresh temp directory against
// a fake host and returns the resulting context.
func setupProject(t *testing.T) context.Context {
	projectDir := t.TempDir()
	ctx := context.TODO()
	t.Setenv("DATABRICKS_HOST", "www.foobar.com")
	ctx, err := project.Initialize(ctx, projectDir, "development")
	assert.NoError(t, err)
	return ctx
}
|
||||
|
||||
// TestOldSnapshotInvalidation verifies that a persisted snapshot with an
// outdated schema version ("v0") is ignored by loadSnapshot: no error is
// returned and the in-memory snapshot stays unchanged.
func TestOldSnapshotInvalidation(t *testing.T) {
	oldVersionSnapshot := `{
		"version": "v0",
		"host": "www.foobar.com",
		"remote_path": "/Repos/foo/bar",
		"last_modified_times": {},
		"local_to_remote_names": {},
		"remote_to_local_names": {}
	}`
	ctx := setupProject(t)
	emptySnapshot := getEmptySnapshot()
	snapshotPath, err := emptySnapshot.getPath(ctx)
	assert.NoError(t, err)

	// Persist the stale snapshot where loadSnapshot will look for it.
	snapshotFile := createFile(t, snapshotPath)
	snapshotFile.overwrite(t, oldVersionSnapshot)
	snapshotFile.close(t)

	assert.FileExists(t, snapshotPath)
	snapshot := emptySnapshot
	err = snapshot.loadSnapshot(ctx)
	assert.NoError(t, err)
	// assert snapshot did not get loaded
	assert.Equal(t, emptySnapshot, snapshot)
}
|
||||
|
||||
// TestNoVersionSnapshotInvalidation verifies that a persisted snapshot with
// no "version" field at all is treated as outdated and ignored by
// loadSnapshot, leaving the in-memory snapshot unchanged.
func TestNoVersionSnapshotInvalidation(t *testing.T) {
	noVersionSnapshot := `{
		"host": "www.foobar.com",
		"remote_path": "/Repos/foo/bar",
		"last_modified_times": {},
		"local_to_remote_names": {},
		"remote_to_local_names": {}
	}`
	ctx := setupProject(t)
	emptySnapshot := getEmptySnapshot()
	snapshotPath, err := emptySnapshot.getPath(ctx)
	assert.NoError(t, err)

	// Persist the version-less snapshot where loadSnapshot will look for it.
	snapshotFile := createFile(t, snapshotPath)
	snapshotFile.overwrite(t, noVersionSnapshot)
	snapshotFile.close(t)

	assert.FileExists(t, snapshotPath)
	snapshot := emptySnapshot
	err = snapshot.loadSnapshot(ctx)
	assert.NoError(t, err)
	// assert snapshot did not get loaded
	assert.Equal(t, emptySnapshot, snapshot)
}
|
||||
|
||||
// TestLatestVersionSnapshotGetsLoaded verifies that a persisted snapshot
// whose version matches LatestSnapshotVersion IS loaded by loadSnapshot and
// its fields populate the in-memory snapshot.
func TestLatestVersionSnapshotGetsLoaded(t *testing.T) {
	latestVersionSnapshot := fmt.Sprintf(`{
		"version": "%s",
		"host": "www.foobar.com",
		"remote_path": "/Repos/foo/bar",
		"last_modified_times": {},
		"local_to_remote_names": {},
		"remote_to_local_names": {}
	}`, LatestSnapshotVersion)

	ctx := setupProject(t)
	emptySnapshot := getEmptySnapshot()
	snapshotPath, err := emptySnapshot.getPath(ctx)
	assert.NoError(t, err)

	// Persist a current-version snapshot where loadSnapshot will look for it.
	snapshotFile := createFile(t, snapshotPath)
	snapshotFile.overwrite(t, latestVersionSnapshot)
	snapshotFile.close(t)

	assert.FileExists(t, snapshotPath)
	snapshot := emptySnapshot
	err = snapshot.loadSnapshot(ctx)
	assert.NoError(t, err)

	// assert snapshot gets loaded
	assert.NotEqual(t, emptySnapshot, snapshot)
	assert.Equal(t, LatestSnapshotVersion, snapshot.Version)
	assert.Equal(t, "www.foobar.com", snapshot.Host)
	assert.Equal(t, "/Repos/foo/bar", snapshot.RemotePath)
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/databricks/bricks/cmd/root"
|
||||
"github.com/databricks/bricks/cmd/sync/repofiles"
|
||||
"github.com/databricks/bricks/git"
|
||||
"github.com/databricks/bricks/project"
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -44,8 +45,9 @@ var syncCmd = &cobra.Command{
|
|||
}
|
||||
|
||||
root := prj.Root()
|
||||
syncCallback := getRemoteSyncCallback(ctx, root, *remotePath, wsc)
|
||||
err = spawnSyncRoutine(ctx, *interval, syncCallback, *remotePath)
|
||||
repoFiles := repofiles.Create(*remotePath, root, wsc)
|
||||
syncCallback := syncCallback(ctx, repoFiles)
|
||||
err = spawnWatchdog(ctx, *interval, syncCallback, *remotePath)
|
||||
return err
|
||||
},
|
||||
}
|
||||
|
|
|
@ -2,22 +2,12 @@ package sync
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/bricks/cmd/sync/repofiles"
|
||||
"github.com/databricks/bricks/project"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/client"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
|
@ -32,56 +22,8 @@ type watchdog struct {
|
|||
// rate limits
|
||||
const MaxRequestsInFlight = 20
|
||||
|
||||
// path: The local path of the file in the local file system
|
||||
//
|
||||
// The API calls for a python script foo.py would be
|
||||
// `PUT foo.py`
|
||||
// `DELETE foo.py`
|
||||
//
|
||||
// The API calls for a python notebook foo.py would be
|
||||
// `PUT foo.py`
|
||||
// `DELETE foo`
|
||||
//
|
||||
// The workspace file system backend strips .py from the file name if the python
|
||||
// file is a notebook
|
||||
func putFile(ctx context.Context, remotePath string, content io.Reader) error {
|
||||
wsc := project.Get(ctx).WorkspacesClient()
|
||||
// workspace mkdirs is idempotent
|
||||
err := wsc.Workspace.MkdirsByPath(ctx, path.Dir(remotePath))
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not mkdir to put file: %s", err)
|
||||
}
|
||||
apiClient, err := client.New(wsc.Config)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
apiPath := fmt.Sprintf(
|
||||
"/api/2.0/workspace-files/import-file/%s?overwrite=true",
|
||||
strings.TrimLeft(remotePath, "/"))
|
||||
return apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)
|
||||
}
|
||||
|
||||
// path: The remote path of the file in the workspace
|
||||
func deleteFile(ctx context.Context, path string, w *databricks.WorkspaceClient) error {
|
||||
err := w.Workspace.Delete(ctx,
|
||||
workspace.Delete{
|
||||
Path: path,
|
||||
Recursive: false,
|
||||
},
|
||||
)
|
||||
// We explictly ignore RESOURCE_DOES_NOT_EXIST errors for deletion of files
|
||||
// This makes deletion operation idempotent and allows us to not crash syncing on
|
||||
// edge cases for eg: this api fails to delete notebooks, and returns a
|
||||
// RESOURCE_DOES_NOT_EXIST error instead
|
||||
if val, ok := err.(apierr.APIError); ok && val.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, w *databricks.WorkspaceClient) func(localDiff diff) error {
|
||||
func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(localDiff diff) error {
|
||||
return func(d diff) error {
|
||||
|
||||
// Abstraction over wait groups which allows you to get the errors
|
||||
// returned in goroutines
|
||||
var g errgroup.Group
|
||||
|
@ -96,7 +38,7 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, w *datab
|
|||
// is evaluated
|
||||
remoteNameCopy := remoteName
|
||||
g.Go(func() error {
|
||||
err := deleteFile(ctx, path.Join(remoteDir, remoteNameCopy), w)
|
||||
err := repoFiles.DeleteFile(ctx, remoteNameCopy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -104,23 +46,15 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, w *datab
|
|||
return nil
|
||||
})
|
||||
}
|
||||
for _, localName := range d.put {
|
||||
for _, localRelativePath := range d.put {
|
||||
// Copy of localName created to make this safe for concurrent use.
|
||||
localNameCopy := localName
|
||||
localRelativePathCopy := localRelativePath
|
||||
g.Go(func() error {
|
||||
f, err := os.Open(filepath.Join(root, localNameCopy))
|
||||
err := repoFiles.PutFile(ctx, localRelativePathCopy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = putFile(ctx, path.Join(remoteDir, localNameCopy), f)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to upload file: %s", err)
|
||||
}
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("[INFO] Uploaded %s", localNameCopy)
|
||||
log.Printf("[INFO] Uploaded %s", localRelativePathCopy)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
@ -133,7 +67,7 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, w *datab
|
|||
}
|
||||
}
|
||||
|
||||
func spawnSyncRoutine(ctx context.Context,
|
||||
func spawnWatchdog(ctx context.Context,
|
||||
interval time.Duration,
|
||||
applyDiff func(diff) error,
|
||||
remotePath string) error {
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -18,6 +19,10 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TODO: these tests are bloated, refactor these, and make write down tests for
|
||||
// all edge cases with interop between files, directory and notebooks during syncing
|
||||
// https://databricks.atlassian.net/browse/DECO-416
|
||||
|
||||
// This test needs auth env vars to run.
|
||||
// Please run using the deco env test or deco env shell
|
||||
func TestAccFullSync(t *testing.T) {
|
||||
|
@ -266,4 +271,88 @@ func TestAccIncrementalSync(t *testing.T) {
|
|||
assert.Contains(t, files3, "amsterdam.txt")
|
||||
assert.Contains(t, files3, ".gitignore")
|
||||
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{"amsterdam.txt", ".gitignore"})
|
||||
|
||||
// new file in dir upload assertion
|
||||
fooPath := filepath.Join(projectDir, "bar/foo.txt")
|
||||
err = os.MkdirAll(filepath.Dir(fooPath), os.ModePerm)
|
||||
assert.NoError(t, err)
|
||||
f, err = os.Create(fooPath)
|
||||
assert.NoError(t, err)
|
||||
defer f.Close()
|
||||
assert.Eventually(t, func() bool {
|
||||
objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
|
||||
Path: repoPath,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
return len(objects) == 3
|
||||
}, 30*time.Second, 5*time.Second)
|
||||
objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
|
||||
Path: repoPath,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
var files4 []string
|
||||
for _, v := range objects {
|
||||
files4 = append(files4, filepath.Base(v.Path))
|
||||
}
|
||||
assert.Len(t, files4, 3)
|
||||
assert.Contains(t, files4, "amsterdam.txt")
|
||||
assert.Contains(t, files4, ".gitignore")
|
||||
assert.Contains(t, files4, "bar")
|
||||
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{"amsterdam.txt", "bar/foo.txt", ".gitignore"})
|
||||
|
||||
// delete dir
|
||||
err = os.RemoveAll(filepath.Dir(fooPath))
|
||||
assert.NoError(t, err)
|
||||
assert.Eventually(t, func() bool {
|
||||
objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
|
||||
Path: repoPath,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
return len(objects) == 3
|
||||
}, 30*time.Second, 5*time.Second)
|
||||
objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
|
||||
Path: repoPath,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
var files5 []string
|
||||
for _, v := range objects {
|
||||
files5 = append(files5, filepath.Base(v.Path))
|
||||
if strings.Contains(v.Path, "bar") {
|
||||
assert.Equal(t, workspace.ObjectType("DIRECTORY"), v.ObjectType)
|
||||
}
|
||||
}
|
||||
assert.Len(t, files5, 3)
|
||||
assert.Contains(t, files5, "bar")
|
||||
assert.Contains(t, files5, "amsterdam.txt")
|
||||
assert.Contains(t, files5, ".gitignore")
|
||||
// workspace still contains `bar` directory but it has been deleted from snapshot
|
||||
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{"amsterdam.txt", ".gitignore"})
|
||||
|
||||
// file called bar should overwrite the directory
|
||||
err = os.WriteFile(filepath.Join(projectDir, "bar"), []byte("Kal ho na ho is a cool movie"), os.ModePerm)
|
||||
assert.NoError(t, err)
|
||||
assert.Eventually(t, func() bool {
|
||||
objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
|
||||
Path: repoPath,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
return len(objects) == 3
|
||||
}, 30*time.Second, 5*time.Second)
|
||||
objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
|
||||
Path: repoPath,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
var files6 []string
|
||||
for _, v := range objects {
|
||||
files6 = append(files6, filepath.Base(v.Path))
|
||||
if strings.Contains(v.Path, "bar") {
|
||||
assert.Equal(t, workspace.ObjectType("FILE"), v.ObjectType)
|
||||
}
|
||||
}
|
||||
assert.Len(t, files6, 3)
|
||||
assert.Contains(t, files6, "amsterdam.txt")
|
||||
assert.Contains(t, files6, ".gitignore")
|
||||
// workspace still contains `bar` directory but it has been deleted from snapshot
|
||||
assert.Contains(t, files6, "bar")
|
||||
assertSnapshotContents(t, wsc.Config.Host, repoPath, projectDir, []string{"amsterdam.txt", "bar", ".gitignore"})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue