diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 977102173..9a6428a16 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -1,19 +1,18 @@ package repofiles import ( + "bytes" "context" "errors" "fmt" - "net/http" - "net/url" "os" "path" "path/filepath" "strings" + "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/workspace" ) @@ -22,22 +21,32 @@ type RepoFileOptions struct { } // RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspace files +// accidental deletion of repos and more robust methods to overwrite workspac e files type RepoFiles struct { *RepoFileOptions repoRoot string localRoot string workspaceClient *databricks.WorkspaceClient + f filer.Filer } -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient, opts *RepoFileOptions) *RepoFiles { +func Create(repoRoot, localRoot string, w *databricks.WorkspaceClient, opts *RepoFileOptions) (*RepoFiles, error) { + // override default timeout to support uploading larger files + w.Config.HTTPTimeoutSeconds = 600 + + // create filer to interact with WSFS + f, err := filer.NewWorkspaceFilesClient(w, repoRoot) + if err != nil { + return nil, err + } return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, - workspaceClient: workspaceClient, + workspaceClient: w, RepoFileOptions: opts, - } + f: f, + }, nil } func (r *RepoFiles) RemotePath(relativePath string) (string, error) { @@ -59,36 +68,25 @@ func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { } func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err + if !r.OverwriteIfExists { + return r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories) } - remotePath, err := r.RemotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists) - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) + err := r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories, filer.OverwriteIfExists) - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all errors once the API - // implements them + // TODO(pietern): Use the new FS interface to avoid needing to make a recursive + // delete call here. This call is dangerous if err != nil { // Delete any artifact files incase non overwriteable by the current file // type and thus are failing the PUT request. // files, folders and notebooks might not have been cleaned up and they // can't overwrite each other. If a folder `foo` exists, then attempts to // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, + remotePath, err := r.RemotePath(relativePath) + if err != nil { + return err + } + err = r.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: remotePath, Recursive: true, @@ -103,33 +101,15 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten return err } - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } + // Attempt to write the file again, this time without the CreateParentDirectories and + // OverwriteIfExists flags + return r.f.Write(ctx, relativePath, bytes.NewReader(content)) } return nil } func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.RemotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: false, - }, - ) + return r.f.Delete(ctx, relativePath) } // The API calls for a python script foo.py would be @@ -161,6 +141,3 @@ func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { } return nil } - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index e71f26abf..ce2a14c2c 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -6,11 +6,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil, nil) + repoFiles, err := Create(repoRoot, "/doraemon/foo/bar", nil, nil) + require.NoError(t, err) remotePath, err := repoFiles.RemotePath("a/b/c") assert.NoError(t, err) @@ -81,7 +83,8 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil, nil) + repoFiles, err := Create("/Repos/doraemon/bar", tempDir, nil, nil) + require.NoError(t, err) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) assert.Equal(t, "my name is doraemon :P", string(bytes))