revert changes to repofiles

This commit is contained in:
Shreyas Goenka 2023-06-06 02:00:30 +02:00
parent 764d9e93e7
commit 8166aed617
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
2 changed files with 55 additions and 36 deletions

View File

@ -1,18 +1,19 @@
package repofiles package repofiles
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"net/http"
"net/url"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/workspace" "github.com/databricks/databricks-sdk-go/service/workspace"
) )
@ -28,25 +29,14 @@ type RepoFiles struct {
repoRoot string repoRoot string
localRoot string localRoot string
workspaceClient *databricks.WorkspaceClient workspaceClient *databricks.WorkspaceClient
f filer.Filer
} }
func Create(repoRoot, localRoot string, w *databricks.WorkspaceClient, opts *RepoFileOptions) (*RepoFiles, error) { func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles {
// override default timeout to support uploading larger files
w.Config.HTTPTimeoutSeconds = 600
// create filer to interact with WSFS
f, err := filer.NewWorkspaceFilesClient(w, repoRoot)
if err != nil {
return nil, err
}
return &RepoFiles{ return &RepoFiles{
repoRoot: repoRoot, repoRoot: repoRoot,
localRoot: localRoot, localRoot: localRoot,
workspaceClient: w, workspaceClient: workspaceClient,
RepoFileOptions: opts, }
f: f,
}, nil
} }
func (r *RepoFiles) remotePath(relativePath string) (string, error) { func (r *RepoFiles) remotePath(relativePath string) (string, error) {
@ -68,25 +58,36 @@ func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) {
} }
func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error {
if !r.OverwriteIfExists { apiClientConfig := r.workspaceClient.Config
return r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories) apiClientConfig.HTTPTimeoutSeconds = 600
apiClient, err := client.New(apiClientConfig)
if err != nil {
return err
} }
remotePath, err := r.remotePath(relativePath)
if err != nil {
return err
}
escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/"))
apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists)
err := r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories, filer.OverwriteIfExists) err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)
// TODO(pietern): Use the new FS interface to avoid needing to make a recursive // Handling some edge cases when an upload might fail
// delete call here. This call is dangerous //
// We cannot do more precise error scoping here because the API does not
// provide descriptive errors yet
//
// TODO: narrow down the error condition scope of this "if" block to only
// trigger for the specific edge cases instead of all errors once the API
// implements them
if err != nil { if err != nil {
// Delete any artifact files in case they are not overwritable by the current file // Delete any artifact files in case they are not overwritable by the current file
// type and thus are failing the PUT request. // type and thus are failing the PUT request.
// Files, folders and notebooks might not have been cleaned up and they // Files, folders and notebooks might not have been cleaned up and they
// can't overwrite each other. If a folder `foo` exists, then attempts to // can't overwrite each other. If a folder `foo` exists, then attempts to
// PUT a file `foo` will fail // PUT a file `foo` will fail
remotePath, err := r.remotePath(relativePath) err := r.workspaceClient.Workspace.Delete(ctx,
if err != nil {
return err
}
err = r.workspaceClient.Workspace.Delete(ctx,
workspace.Delete{ workspace.Delete{
Path: remotePath, Path: remotePath,
Recursive: true, Recursive: true,
@ -101,15 +102,33 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten
return err return err
} }
// Attempt to write the file again, this time without the CreateParentDirectories and // Mkdir parent dirs in case they are what's causing the PUT request to
// OverwriteIfExists flags // fail
return r.f.Write(ctx, relativePath, bytes.NewReader(content)) err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath))
if err != nil {
return fmt.Errorf("could not mkdir to put file: %s", err)
}
// Attempt to upload file again after cleanup/setup
err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)
if err != nil {
return err
}
} }
return nil return nil
} }
func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error {
return r.f.Delete(ctx, relativePath) remotePath, err := r.remotePath(relativePath)
if err != nil {
return err
}
return r.workspaceClient.Workspace.Delete(ctx,
workspace.Delete{
Path: remotePath,
Recursive: false,
},
)
} }
// The API calls for a python script foo.py would be // The API calls for a python script foo.py would be
@ -141,3 +160,6 @@ func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error {
} }
return nil return nil
} }
// TODO: write integration tests for all non happy path cases that rely on
// specific behaviour of the workspace apis

View File

@ -6,13 +6,11 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestRepoFilesRemotePath(t *testing.T) { func TestRepoFilesRemotePath(t *testing.T) {
repoRoot := "/Repos/doraemon/bar" repoRoot := "/Repos/doraemon/bar"
repoFiles, err := Create(repoRoot, "/doraemon/foo/bar", nil, nil) repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil)
require.NoError(t, err)
remotePath, err := repoFiles.remotePath("a/b/c") remotePath, err := repoFiles.remotePath("a/b/c")
assert.NoError(t, err) assert.NoError(t, err)
@ -83,8 +81,7 @@ func TestRepoReadLocal(t *testing.T) {
err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm)
assert.NoError(t, err) assert.NoError(t, err)
repoFiles, err := Create("/Repos/doraemon/bar", tempDir, nil, nil) repoFiles := Create("/Repos/doraemon/bar", tempDir, nil)
require.NoError(t, err)
bytes, err := repoFiles.readLocal("./a/../hello.txt") bytes, err := repoFiles.readLocal("./a/../hello.txt")
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, "my name is doraemon :P", string(bytes)) assert.Equal(t, "my name is doraemon :P", string(bytes))