diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 6ea9f6bd..cfbf762f 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -1,18 +1,19 @@ package repofiles import ( - "bytes" "context" "errors" "fmt" + "net/http" + "net/url" "os" "path" "path/filepath" "strings" - "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/workspace" ) @@ -21,32 +22,21 @@ type RepoFileOptions struct { } // RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspac e files +// accidental deletion of repos and more robust methods to overwrite workspace files type RepoFiles struct { *RepoFileOptions repoRoot string localRoot string workspaceClient *databricks.WorkspaceClient - f filer.Filer } -func Create(repoRoot, localRoot string, w *databricks.WorkspaceClient, opts *RepoFileOptions) (*RepoFiles, error) { - // override default timeout to support uploading larger files - w.Config.HTTPTimeoutSeconds = 600 - - // create filer to interact with WSFS - f, err := filer.NewWorkspaceFilesClient(w, repoRoot) - if err != nil { - return nil, err - } +func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, - workspaceClient: w, - RepoFileOptions: opts, - f: f, - }, nil + workspaceClient: workspaceClient, + } } func (r *RepoFiles) remotePath(relativePath string) (string, error) { @@ -68,25 +58,36 @@ func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { } func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - if !r.OverwriteIfExists { - return r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories) + apiClientConfig := r.workspaceClient.Config + apiClientConfig.HTTPTimeoutSeconds = 600 + apiClient, err := client.New(apiClientConfig) + if err != nil { + return err } + remotePath, err := r.remotePath(relativePath) + if err != nil { + return err + } + escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) + apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists) - err := r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories, filer.OverwriteIfExists) + err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - // TODO(pietern): Use the new FS interface to avoid needing to make a recursive - // delete call here. This call is dangerous + // Handling some edge cases when an upload might fail + // + // We cannot do more precise error scoping here because the API does not + // provide descriptive errors yet + // + // TODO: narrow down the error condition scope of this "if" block to only + // trigger for the specific edge cases instead of all errors once the API + // implements them if err != nil { // Delete any artifact files incase non overwriteable by the current file // type and thus are failing the PUT request. // files, folders and notebooks might not have been cleaned up and they // can't overwrite each other. If a folder `foo` exists, then attempts to // PUT a file `foo` will fail - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - err = r.workspaceClient.Workspace.Delete(ctx, + err := r.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: remotePath, Recursive: true, @@ -101,15 +102,33 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten return err } - // Attempt to write the file again, this time without the CreateParentDirectories and - // OverwriteIfExists flags - return r.f.Write(ctx, relativePath, bytes.NewReader(content)) + // Mkdir parent dirs incase they are what's causing the PUT request to + // fail + err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) + if err != nil { + return fmt.Errorf("could not mkdir to put file: %s", err) + } + + // Attempt to upload file again after cleanup/setup + err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) + if err != nil { + return err + } } return nil } func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - return r.f.Delete(ctx, relativePath) + remotePath, err := r.remotePath(relativePath) + if err != nil { + return err + } + return r.workspaceClient.Workspace.Delete(ctx, + workspace.Delete{ + Path: remotePath, + Recursive: false, + }, + ) } // The API calls for a python script foo.py would be @@ -141,3 +160,6 @@ func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { } return nil } + +// TODO: write integration tests for all non happy path cases that rely on +// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index dc9abbcd..2a881d90 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -6,13 +6,11 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles, err := Create(repoRoot, "/doraemon/foo/bar", nil, nil) - require.NoError(t, err) + repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) remotePath, err := repoFiles.remotePath("a/b/c") assert.NoError(t, err) @@ -83,8 +81,7 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles, err := Create("/Repos/doraemon/bar", tempDir, nil, nil) - require.NoError(t, err) + repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) assert.Equal(t, "my name is doraemon :P", string(bytes))