databricks-cli/libs/filer/workspace_files_client.go

333 lines
8.1 KiB
Go
Raw Normal View History

package filer
import (
"bytes"
"context"
2023-02-20 15:00:20 +00:00
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"path"
"regexp"
Upgraded Go version to 1.21 (#664) ## Changes Upgraded Go version to 1.21 Upgraded to use `slices` and `slog` from core instead of experimental. Still use `exp/maps` as our code relies on `maps.Keys` which is not part of core package and therefore refactoring required. ### Tests Integration tests passed ``` [DEBUG] Test execution command: /opt/homebrew/opt/go@1.21/bin/go test ./... -json -timeout 1h -run ^TestAcc [DEBUG] Test execution directory: /Users/andrew.nester/cli 2023/08/15 13:20:51 [INFO] ✅ TestAccAlertsCreateErrWhenNoArguments (2.150s) 2023/08/15 13:20:52 [INFO] ✅ TestAccApiGet (0.580s) 2023/08/15 13:20:53 [INFO] ✅ TestAccClustersList (0.900s) 2023/08/15 13:20:54 [INFO] ✅ TestAccClustersGet (0.870s) 2023/08/15 13:21:06 [INFO] ✅ TestAccFilerWorkspaceFilesReadWrite (11.980s) 2023/08/15 13:21:13 [INFO] ✅ TestAccFilerWorkspaceFilesReadDir (7.060s) 2023/08/15 13:21:25 [INFO] ✅ TestAccFilerDbfsReadWrite (12.810s) 2023/08/15 13:21:33 [INFO] ✅ TestAccFilerDbfsReadDir (7.380s) 2023/08/15 13:21:41 [INFO] ✅ TestAccFilerWorkspaceNotebookConflict (7.760s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerWorkspaceNotebookWithOverwriteFlag (8.660s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerLocalReadWrite (0.020s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerLocalReadDir (0.010s) 2023/08/15 13:21:52 [INFO] ✅ TestAccFsCatForDbfs (3.190s) 2023/08/15 13:21:53 [INFO] ✅ TestAccFsCatForDbfsOnNonExistentFile (0.890s) 2023/08/15 13:21:54 [INFO] ✅ TestAccFsCatForDbfsInvalidScheme (0.600s) 2023/08/15 13:21:57 [INFO] ✅ TestAccFsCatDoesNotSupportOutputModeJson (2.960s) 2023/08/15 13:22:28 [INFO] ✅ TestAccFsCpDir (31.480s) 2023/08/15 13:22:43 [INFO] ✅ TestAccFsCpFileToFile (14.530s) 2023/08/15 13:22:58 [INFO] ✅ TestAccFsCpFileToDir (14.610s) 2023/08/15 13:23:29 [INFO] ✅ TestAccFsCpDirToDirFileNotOverwritten (31.810s) 2023/08/15 13:23:47 [INFO] ✅ TestAccFsCpFileToDirFileNotOverwritten (17.500s) 2023/08/15 13:24:04 [INFO] ✅ TestAccFsCpFileToFileFileNotOverwritten (17.260s) 2023/08/15 13:24:37 
[INFO] ✅ TestAccFsCpDirToDirWithOverwriteFlag (32.690s) 2023/08/15 13:24:56 [INFO] ✅ TestAccFsCpFileToFileWithOverwriteFlag (19.290s) 2023/08/15 13:25:15 [INFO] ✅ TestAccFsCpFileToDirWithOverwriteFlag (19.230s) 2023/08/15 13:25:17 [INFO] ✅ TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag (2.010s) 2023/08/15 13:25:18 [INFO] ✅ TestAccFsCpErrorsOnInvalidScheme (0.610s) 2023/08/15 13:25:33 [INFO] ✅ TestAccFsCpSourceIsDirectoryButTargetIsFile (14.900s) 2023/08/15 13:25:37 [INFO] ✅ TestAccFsLsForDbfs (3.770s) 2023/08/15 13:25:41 [INFO] ✅ TestAccFsLsForDbfsWithAbsolutePaths (4.160s) 2023/08/15 13:25:44 [INFO] ✅ TestAccFsLsForDbfsOnFile (2.990s) 2023/08/15 13:25:46 [INFO] ✅ TestAccFsLsForDbfsOnEmptyDir (1.870s) 2023/08/15 13:25:46 [INFO] ✅ TestAccFsLsForDbfsForNonexistingDir (0.850s) 2023/08/15 13:25:47 [INFO] ✅ TestAccFsLsWithoutScheme (0.560s) 2023/08/15 13:25:49 [INFO] ✅ TestAccFsMkdirCreatesDirectory (2.310s) 2023/08/15 13:25:52 [INFO] ✅ TestAccFsMkdirCreatesMultipleDirectories (2.920s) 2023/08/15 13:25:55 [INFO] ✅ TestAccFsMkdirWhenDirectoryAlreadyExists (2.320s) 2023/08/15 13:25:57 [INFO] ✅ TestAccFsMkdirWhenFileExistsAtPath (2.820s) 2023/08/15 13:26:01 [INFO] ✅ TestAccFsRmForFile (4.030s) 2023/08/15 13:26:05 [INFO] ✅ TestAccFsRmForEmptyDirectory (3.530s) 2023/08/15 13:26:08 [INFO] ✅ TestAccFsRmForNonEmptyDirectory (3.190s) 2023/08/15 13:26:09 [INFO] ✅ TestAccFsRmForNonExistentFile (0.830s) 2023/08/15 13:26:13 [INFO] ✅ TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag (3.580s) 2023/08/15 13:26:13 [INFO] ✅ TestAccGitClone (0.800s) 2023/08/15 13:26:14 [INFO] ✅ TestAccGitCloneWithOnlyRepoNameOnAlternateBranch (0.790s) 2023/08/15 13:26:15 [INFO] ✅ TestAccGitCloneErrorsWhenRepositoryDoesNotExist (0.540s) 2023/08/15 13:26:23 [INFO] ✅ TestAccLock (8.630s) 2023/08/15 13:26:27 [INFO] ✅ TestAccLockUnlockWithoutAllowsLockFileNotExist (3.490s) 2023/08/15 13:26:30 [INFO] ✅ TestAccLockUnlockWithAllowsLockFileNotExist (3.130s) 2023/08/15 13:26:39 [INFO] ✅ TestAccSyncFullFileSync 
(9.370s) 2023/08/15 13:26:50 [INFO] ✅ TestAccSyncIncrementalFileSync (10.390s) 2023/08/15 13:27:00 [INFO] ✅ TestAccSyncNestedFolderSync (10.680s) 2023/08/15 13:27:11 [INFO] ✅ TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory (10.970s) 2023/08/15 13:27:22 [INFO] ✅ TestAccSyncNestedSpacePlusAndHashAreEscapedSync (10.930s) 2023/08/15 13:27:29 [INFO] ✅ TestAccSyncIncrementalFileOverwritesFolder (7.020s) 2023/08/15 13:27:37 [INFO] ✅ TestAccSyncIncrementalSyncPythonNotebookToFile (7.380s) 2023/08/15 13:27:43 [INFO] ✅ TestAccSyncIncrementalSyncFileToPythonNotebook (6.050s) 2023/08/15 13:27:48 [INFO] ✅ TestAccSyncIncrementalSyncPythonNotebookDelete (5.390s) 2023/08/15 13:27:51 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist (2.570s) 2023/08/15 13:27:56 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableIfRepoExists (5.540s) 2023/08/15 13:27:58 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableInWorkspace (1.840s) 2023/08/15 13:27:59 [INFO] ✅ TestAccWorkspaceList (0.790s) 2023/08/15 13:28:08 [INFO] ✅ TestAccExportDir (8.860s) 2023/08/15 13:28:11 [INFO] ✅ TestAccExportDirDoesNotOverwrite (3.090s) 2023/08/15 13:28:14 [INFO] ✅ TestAccExportDirWithOverwriteFlag (3.500s) 2023/08/15 13:28:23 [INFO] ✅ TestAccImportDir (8.330s) 2023/08/15 13:28:34 [INFO] ✅ TestAccImportDirDoesNotOverwrite (10.970s) 2023/08/15 13:28:44 [INFO] ✅ TestAccImportDirWithOverwriteFlag (10.130s) 2023/08/15 13:28:44 [INFO] ✅ 68/68 passed, 0 failed, 3 skipped ```
2023-08-15 13:50:40 +00:00
"slices"
"sort"
"strings"
"time"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/schollz/progressbar/v3"
)
// wsfsDirEntry implements fs.DirEntry for WSFS. It embeds wsfsFileInfo, whose
// Name and IsDir methods are promoted; Type and Info are defined on this type.
type wsfsDirEntry struct {
wsfsFileInfo
}
// Type returns the type bits of the entry.
//
// fs.DirEntry requires Type to return only the type bits of the mode (the
// subset selected by fs.FileMode.Type), so permission bits reported by
// wsfsFileInfo.Mode are masked out: directories yield fs.ModeDir and every
// other object yields 0, i.e. a regular file.
func (entry wsfsDirEntry) Type() fs.FileMode {
	return entry.wsfsFileInfo.Mode().Type()
}
// Info exposes the embedded wsfsFileInfo as an fs.FileInfo. The error result
// is always nil.
func (entry wsfsDirEntry) Info() (fs.FileInfo, error) {
	var fi fs.FileInfo = entry.wsfsFileInfo
	return fi, nil
}
// wsfsFileInfo implements fs.FileInfo for WSFS on top of a workspace
// object's metadata.
type wsfsFileInfo struct {
// oi is the workspace object this file info describes.
oi workspace.ObjectInfo
}
// Name returns the last element of the object's workspace path.
func (info wsfsFileInfo) Name() string {
	fullPath := info.oi.Path
	return path.Base(fullPath)
}
// Size returns the Size field of the underlying workspace object.
func (info wsfsFileInfo) Size() int64 {
	size := info.oi.Size
	return size
}
// Mode returns fs.ModeDir for workspace directories and fs.ModePerm for
// every other object type.
func (info wsfsFileInfo) Mode() fs.FileMode {
	if info.oi.ObjectType == workspace.ObjectTypeDirectory {
		return fs.ModeDir
	}
	return fs.ModePerm
}
// ModTime converts the object's ModifiedAt value, interpreted as Unix
// milliseconds, into a time.Time.
func (info wsfsFileInfo) ModTime() time.Time {
	modifiedMillis := info.oi.ModifiedAt
	return time.UnixMilli(modifiedMillis)
}
// IsDir reports whether the underlying workspace object is a directory.
func (info wsfsFileInfo) IsDir() bool {
	objectType := info.oi.ObjectType
	return objectType == workspace.ObjectTypeDirectory
}
// Sys returns the underlying workspace.ObjectInfo by value.
func (info wsfsFileInfo) Sys() any {
	var underlying any = info.oi
	return underlying
}
// WorkspaceFilesClient implements the files-in-workspace API.
// NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled.
// It can access any workspace path if files-in-workspace is enabled.
type WorkspaceFilesClient struct {
// SDK client used for the workspace service calls (mkdirs, delete, list, get-status, download).
workspaceClient *databricks.WorkspaceClient
// Low-level API client used to call the import-file endpoint directly.
apiClient *client.DatabricksClient
// File operations will be relative to this path.
root WorkspaceRootPath
// Optional progress bar for uploads; Write skips progress reporting when it is nil.
bar *progressbar.ProgressBar
}
// NewWorkspaceFilesClient builds a Filer whose operations are rooted at root
// in the workspace reachable through w. No progress bar is attached.
//
// It returns an error if a low-level API client cannot be created from
// w.Config.
func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
	apiClient, err := client.New(w.Config)
	if err != nil {
		return nil, err
	}

	wfc := new(WorkspaceFilesClient)
	wfc.workspaceClient = w
	wfc.apiClient = apiClient
	wfc.root = NewWorkspaceRootPath(root)
	return wfc, nil
}
// NewWorkspaceFilesClientWithProgressLogging builds a Filer rooted at root
// that also carries a byte-counting progress bar. The bar starts with a
// placeholder maximum of 100; Write resets the maximum to the size it is given.
//
// It returns an error if a low-level API client cannot be created from
// w.Config.
func NewWorkspaceFilesClientWithProgressLogging(w *databricks.WorkspaceClient, root string) (Filer, error) {
	apiClient, err := client.New(w.Config)
	if err != nil {
		return nil, err
	}

	wfc := new(WorkspaceFilesClient)
	wfc.workspaceClient = w
	wfc.apiClient = apiClient
	wfc.root = NewWorkspaceRootPath(root)
	wfc.bar = progressbar.DefaultBytes(100)
	return wfc, nil
}
func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, size int64, mode ...WriteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
// Remove leading "/" so we can use it in the URL.
overwrite := slices.Contains(mode, OverwriteIfExists)
urlPath := fmt.Sprintf(
"/api/2.0/workspace-files/import-file/%s?overwrite=%t",
url.PathEscape(strings.TrimLeft(absPath, "/")),
overwrite,
)
// Buffer the file contents because we may need to retry below and we cannot read twice.
body, err := io.ReadAll(reader)
if err != nil {
return err
}
var r io.Reader = bytes.NewBuffer(body)
if w.bar != nil {
w.bar.ChangeMax64(size)
reader := progressbar.NewReader(r, w.bar)
r = &reader
}
err = w.apiClient.Do(ctx, http.MethodPost, urlPath, nil, r, nil)
// Return early on success.
if err == nil {
return nil
}
// Special handling of this error only if it is an API error.
2023-02-20 15:00:20 +00:00
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns a 404 if the parent directory does not exist.
if aerr.StatusCode == http.StatusNotFound {
if !slices.Contains(mode, CreateParentDirectories) {
return NoSuchDirectoryError{path.Dir(absPath)}
}
// Create parent directory.
err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath))
if err != nil {
return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err)
}
// Retry without CreateParentDirectories mode flag.
return w.Write(ctx, name, bytes.NewReader(body), int64(len(body)), sliceWithout(mode, CreateParentDirectories)...)
}
// This API returns 409 if the file already exists, when the object type is file
if aerr.StatusCode == http.StatusConflict {
return FileAlreadyExistsError{absPath}
}
// This API returns 400 if the file already exists, when the object type is notebook
regex := regexp.MustCompile(`Path \((.*)\) already exists.`)
if aerr.StatusCode == http.StatusBadRequest && regex.Match([]byte(aerr.Message)) {
// Parse file path from regex capture group
matches := regex.FindStringSubmatch(aerr.Message)
if len(matches) == 2 {
return FileAlreadyExistsError{matches[1]}
}
// Default to path specified to filer.Write if regex capture fails
return FileAlreadyExistsError{absPath}
}
return err
}
// Read opens the file at name, relative to the client's root, for reading.
//
// The path is stat'ed before downloading, for two reasons: a missing path
// surfaces as the error Stat returns, and a directory is rejected with
// NotAFile. The latter matters because the /workspace/export API does not fail
// on directories; it returns them as a DBC archive even when format "SOURCE"
// is requested.
//
// Note that the /workspace/export API limits the file size to 10MBs.
func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	info, statErr := w.Stat(ctx, name)
	switch {
	case statErr != nil:
		return nil, statErr
	case info.IsDir():
		return nil, NotAFile{absPath}
	}

	return w.workspaceClient.Workspace.Download(ctx, absPath)
}
// Delete removes the object at name, relative to the client's root. Passing
// DeleteRecursively asks the API to delete recursively.
//
// Errors:
//   - CannotDeleteRootError when name resolves to the root path itself.
//   - FileDoesNotExistError when the API answers 404.
//   - DirectoryNotEmptyError when the API answers 400 with error code
//     DIRECTORY_NOT_EMPTY.
//   - Any other error is returned unchanged.
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	// The root path itself must never be deleted.
	if absPath == w.root.rootPath {
		return CannotDeleteRootError{}
	}

	request := workspace.Delete{
		Path:      absPath,
		Recursive: slices.Contains(mode, DeleteRecursively),
	}
	err = w.workspaceClient.Workspace.Delete(ctx, request)
	if err == nil {
		return nil
	}

	// Only API errors are translated; everything else passes through.
	var aerr *apierr.APIError
	if !errors.As(err, &aerr) {
		return err
	}

	if aerr.StatusCode == http.StatusNotFound {
		return FileDoesNotExistError{absPath}
	}
	if aerr.StatusCode == http.StatusBadRequest && aerr.ErrorCode == "DIRECTORY_NOT_EMPTY" {
		return DirectoryNotEmptyError{absPath}
	}
	return err
}
// ReadDir lists the objects directly under the directory at name, relative to
// the client's root, sorted by name for parity with os.ReadDir.
//
// Errors:
//   - NotADirectory when the listing consists of exactly one object whose path
//     equals the requested path.
//   - NoSuchDirectoryError, carrying the parent of the requested path, when
//     the API answers 404.
//   - Any other error is returned unchanged.
func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	request := workspace.ListWorkspaceRequest{Path: absPath}
	objects, err := w.workspaceClient.Workspace.ListAll(ctx, request)

	// This check deliberately runs before the error check, as in the
	// original statement order.
	if len(objects) == 1 && objects[0].Path == absPath {
		return nil, NotADirectory{absPath}
	}

	if err != nil {
		// This API returns a 404 if the specified path does not exist.
		var aerr *apierr.APIError
		if errors.As(err, &aerr) && aerr.StatusCode == http.StatusNotFound {
			return nil, NoSuchDirectoryError{path.Dir(absPath)}
		}
		return nil, err
	}

	entries := make([]fs.DirEntry, 0, len(objects))
	for _, object := range objects {
		entries = append(entries, wsfsDirEntry{wsfsFileInfo{oi: object}})
	}

	sort.Slice(entries, func(a, b int) bool {
		return entries[a].Name() < entries[b].Name()
	})
	return entries, nil
}
// Mkdir creates the directory at name, relative to the client's root, via the
// workspace Mkdirs call. The result of that call is returned unchanged.
func (w *WorkspaceFilesClient) Mkdir(ctx context.Context, name string) error {
	dirPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	request := workspace.Mkdirs{Path: dirPath}
	return w.workspaceClient.Workspace.Mkdirs(ctx, request)
}
// Stat returns file info for the object at name, relative to the client's
// root.
//
// Errors:
//   - FileDoesNotExistError when the API answers 404.
//   - Any other error, API error or not, is returned unchanged.
func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	info, err := w.workspaceClient.Workspace.GetStatusByPath(ctx, absPath)
	if err != nil {
		// This API returns a 404 if the specified path does not exist.
		var aerr *apierr.APIError
		if errors.As(err, &aerr) && aerr.StatusCode == http.StatusNotFound {
			return nil, FileDoesNotExistError{absPath}
		}

		// Every other failure must be returned here: info cannot be relied on
		// after an error, and dereferencing it below would panic if it is nil.
		return nil, err
	}

	return wsfsFileInfo{*info}, nil
}