databricks-cli/libs/filer/files_client.go

508 lines
12 KiB
Go
Raw Permalink Normal View History

package filer
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"path"
Upgraded Go version to 1.21 (#664) ## Changes Upgraded Go version to 1.21 Upgraded to use `slices` and `slog` from core instead of experimental. Still use `exp/maps` as our code relies on `maps.Keys` which is not part of core package and therefore refactoring required. ### Tests Integration tests passed ``` [DEBUG] Test execution command: /opt/homebrew/opt/go@1.21/bin/go test ./... -json -timeout 1h -run ^TestAcc [DEBUG] Test execution directory: /Users/andrew.nester/cli 2023/08/15 13:20:51 [INFO] ✅ TestAccAlertsCreateErrWhenNoArguments (2.150s) 2023/08/15 13:20:52 [INFO] ✅ TestAccApiGet (0.580s) 2023/08/15 13:20:53 [INFO] ✅ TestAccClustersList (0.900s) 2023/08/15 13:20:54 [INFO] ✅ TestAccClustersGet (0.870s) 2023/08/15 13:21:06 [INFO] ✅ TestAccFilerWorkspaceFilesReadWrite (11.980s) 2023/08/15 13:21:13 [INFO] ✅ TestAccFilerWorkspaceFilesReadDir (7.060s) 2023/08/15 13:21:25 [INFO] ✅ TestAccFilerDbfsReadWrite (12.810s) 2023/08/15 13:21:33 [INFO] ✅ TestAccFilerDbfsReadDir (7.380s) 2023/08/15 13:21:41 [INFO] ✅ TestAccFilerWorkspaceNotebookConflict (7.760s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerWorkspaceNotebookWithOverwriteFlag (8.660s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerLocalReadWrite (0.020s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerLocalReadDir (0.010s) 2023/08/15 13:21:52 [INFO] ✅ TestAccFsCatForDbfs (3.190s) 2023/08/15 13:21:53 [INFO] ✅ TestAccFsCatForDbfsOnNonExistentFile (0.890s) 2023/08/15 13:21:54 [INFO] ✅ TestAccFsCatForDbfsInvalidScheme (0.600s) 2023/08/15 13:21:57 [INFO] ✅ TestAccFsCatDoesNotSupportOutputModeJson (2.960s) 2023/08/15 13:22:28 [INFO] ✅ TestAccFsCpDir (31.480s) 2023/08/15 13:22:43 [INFO] ✅ TestAccFsCpFileToFile (14.530s) 2023/08/15 13:22:58 [INFO] ✅ TestAccFsCpFileToDir (14.610s) 2023/08/15 13:23:29 [INFO] ✅ TestAccFsCpDirToDirFileNotOverwritten (31.810s) 2023/08/15 13:23:47 [INFO] ✅ TestAccFsCpFileToDirFileNotOverwritten (17.500s) 2023/08/15 13:24:04 [INFO] ✅ TestAccFsCpFileToFileFileNotOverwritten (17.260s) 2023/08/15 13:24:37 [INFO] ✅ TestAccFsCpDirToDirWithOverwriteFlag (32.690s) 2023/08/15 13:24:56 [INFO] ✅ TestAccFsCpFileToFileWithOverwriteFlag (19.290s) 2023/08/15 13:25:15 [INFO] ✅ TestAccFsCpFileToDirWithOverwriteFlag (19.230s) 2023/08/15 13:25:17 [INFO] ✅ TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag (2.010s) 2023/08/15 13:25:18 [INFO] ✅ TestAccFsCpErrorsOnInvalidScheme (0.610s) 2023/08/15 13:25:33 [INFO] ✅ TestAccFsCpSourceIsDirectoryButTargetIsFile (14.900s) 2023/08/15 13:25:37 [INFO] ✅ TestAccFsLsForDbfs (3.770s) 2023/08/15 13:25:41 [INFO] ✅ TestAccFsLsForDbfsWithAbsolutePaths (4.160s) 2023/08/15 13:25:44 [INFO] ✅ TestAccFsLsForDbfsOnFile (2.990s) 2023/08/15 13:25:46 [INFO] ✅ TestAccFsLsForDbfsOnEmptyDir (1.870s) 2023/08/15 13:25:46 [INFO] ✅ TestAccFsLsForDbfsForNonexistingDir (0.850s) 2023/08/15 13:25:47 [INFO] ✅ TestAccFsLsWithoutScheme (0.560s) 2023/08/15 13:25:49 [INFO] ✅ TestAccFsMkdirCreatesDirectory (2.310s) 2023/08/15 13:25:52 [INFO] ✅ TestAccFsMkdirCreatesMultipleDirectories (2.920s) 2023/08/15 13:25:55 [INFO] ✅ TestAccFsMkdirWhenDirectoryAlreadyExists (2.320s) 2023/08/15 13:25:57 [INFO] ✅ TestAccFsMkdirWhenFileExistsAtPath (2.820s) 2023/08/15 13:26:01 [INFO] ✅ TestAccFsRmForFile (4.030s) 2023/08/15 13:26:05 [INFO] ✅ TestAccFsRmForEmptyDirectory (3.530s) 2023/08/15 13:26:08 [INFO] ✅ TestAccFsRmForNonEmptyDirectory (3.190s) 2023/08/15 13:26:09 [INFO] ✅ TestAccFsRmForNonExistentFile (0.830s) 2023/08/15 13:26:13 [INFO] ✅ TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag (3.580s) 2023/08/15 13:26:13 [INFO] ✅ TestAccGitClone (0.800s) 2023/08/15 13:26:14 [INFO] ✅ TestAccGitCloneWithOnlyRepoNameOnAlternateBranch (0.790s) 2023/08/15 13:26:15 [INFO] ✅ TestAccGitCloneErrorsWhenRepositoryDoesNotExist (0.540s) 2023/08/15 13:26:23 [INFO] ✅ TestAccLock (8.630s) 2023/08/15 13:26:27 [INFO] ✅ TestAccLockUnlockWithoutAllowsLockFileNotExist (3.490s) 2023/08/15 13:26:30 [INFO] ✅ TestAccLockUnlockWithAllowsLockFileNotExist (3.130s) 2023/08/15 13:26:39 [INFO] ✅ TestAccSyncFullFileSync (9.370s) 2023/08/15 13:26:50 [INFO] ✅ TestAccSyncIncrementalFileSync (10.390s) 2023/08/15 13:27:00 [INFO] ✅ TestAccSyncNestedFolderSync (10.680s) 2023/08/15 13:27:11 [INFO] ✅ TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory (10.970s) 2023/08/15 13:27:22 [INFO] ✅ TestAccSyncNestedSpacePlusAndHashAreEscapedSync (10.930s) 2023/08/15 13:27:29 [INFO] ✅ TestAccSyncIncrementalFileOverwritesFolder (7.020s) 2023/08/15 13:27:37 [INFO] ✅ TestAccSyncIncrementalSyncPythonNotebookToFile (7.380s) 2023/08/15 13:27:43 [INFO] ✅ TestAccSyncIncrementalSyncFileToPythonNotebook (6.050s) 2023/08/15 13:27:48 [INFO] ✅ TestAccSyncIncrementalSyncPythonNotebookDelete (5.390s) 2023/08/15 13:27:51 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist (2.570s) 2023/08/15 13:27:56 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableIfRepoExists (5.540s) 2023/08/15 13:27:58 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableInWorkspace (1.840s) 2023/08/15 13:27:59 [INFO] ✅ TestAccWorkspaceList (0.790s) 2023/08/15 13:28:08 [INFO] ✅ TestAccExportDir (8.860s) 2023/08/15 13:28:11 [INFO] ✅ TestAccExportDirDoesNotOverwrite (3.090s) 2023/08/15 13:28:14 [INFO] ✅ TestAccExportDirWithOverwriteFlag (3.500s) 2023/08/15 13:28:23 [INFO] ✅ TestAccImportDir (8.330s) 2023/08/15 13:28:34 [INFO] ✅ TestAccImportDirDoesNotOverwrite (10.970s) 2023/08/15 13:28:44 [INFO] ✅ TestAccImportDirWithOverwriteFlag (10.130s) 2023/08/15 13:28:44 [INFO] ✅ 68/68 passed, 0 failed, 3 skipped ```
2023-08-15 13:50:40 +00:00
"slices"
"sort"
"strings"
"time"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/listing"
"github.com/databricks/databricks-sdk-go/service/files"
"golang.org/x/sync/errgroup"
)
// As of 19th Feb 2024, the Files API backend has a rate limit of 10 concurrent
// requests and 100 QPS. We limit the number of concurrent requests to 5 to
// avoid hitting the rate limit.
const maxFilesRequestsInFlight = 5
// Type that implements fs.FileInfo for the Files API.
// This is required for the filer.Stat() method.
type filesApiFileInfo struct {
absPath string
isDir bool
fileSize int64
lastModified int64
}
func (info filesApiFileInfo) Name() string {
return path.Base(info.absPath)
}
func (info filesApiFileInfo) Size() int64 {
return info.fileSize
}
func (info filesApiFileInfo) Mode() fs.FileMode {
mode := fs.ModePerm
if info.isDir {
mode |= fs.ModeDir
}
return mode
}
func (info filesApiFileInfo) ModTime() time.Time {
return time.UnixMilli(info.lastModified)
}
func (info filesApiFileInfo) IsDir() bool {
return info.isDir
}
func (info filesApiFileInfo) Sys() any {
return nil
}
// Type that implements fs.DirEntry for the Files API.
// This is required for the filer.ReadDir() method.
type filesApiDirEntry struct {
i filesApiFileInfo
}
func (e filesApiDirEntry) Name() string {
return e.i.Name()
}
func (e filesApiDirEntry) IsDir() bool {
return e.i.IsDir()
}
func (e filesApiDirEntry) Type() fs.FileMode {
return e.i.Mode()
}
func (e filesApiDirEntry) Info() (fs.FileInfo, error) {
return e.i, nil
}
// FilesClient implements the [Filer] interface for the Files API backend.
type FilesClient struct {
workspaceClient *databricks.WorkspaceClient
apiClient *client.DatabricksClient
// File operations will be relative to this path.
root WorkspaceRootPath
}
func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
apiClient, err := client.New(w.Config)
if err != nil {
return nil, err
}
return &FilesClient{
workspaceClient: w,
apiClient: apiClient,
root: NewWorkspaceRootPath(root),
}, nil
}
func (w *FilesClient) urlPath(name string) (string, string, error) {
absPath, err := w.root.Join(name)
if err != nil {
return "", "", err
}
// The user specified part of the path must be escaped.
urlPath := fmt.Sprintf(
"/api/2.0/fs/files/%s",
url.PathEscape(strings.TrimLeft(absPath, "/")),
)
return absPath, urlPath, nil
}
func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
absPath, urlPath, err := w.urlPath(name)
if err != nil {
return err
}
// Check that target path exists if CreateParentDirectories mode is not set
if !slices.Contains(mode, CreateParentDirectories) {
err := w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, path.Dir(absPath))
if err != nil {
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns a 404 if the file doesn't exist.
if aerr.StatusCode == http.StatusNotFound {
return NoSuchDirectoryError{path.Dir(absPath)}
}
return err
}
}
overwrite := slices.Contains(mode, OverwriteIfExists)
urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
headers := map[string]string{"Content-Type": "application/octet-stream"}
err = w.apiClient.Do(ctx, http.MethodPut, urlPath, headers, reader, nil)
// Return early on success.
if err == nil {
return nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns 409 if the file already exists, when the object type is file
if aerr.StatusCode == http.StatusConflict && aerr.ErrorCode == "ALREADY_EXISTS" {
return FileAlreadyExistsError{absPath}
}
return err
}
func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
absPath, urlPath, err := w.urlPath(name)
if err != nil {
return nil, err
}
var buf bytes.Buffer
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, nil, &buf)
// Return early on success.
if err == nil {
return io.NopCloser(&buf), nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
// Check if the path is a directory. If so, return not a file error.
if _, err := w.statDir(ctx, name); err == nil {
return nil, NotAFile{absPath}
}
// No file or directory exists at the specified path. Return no such file error.
return nil, FileDoesNotExistError{absPath}
}
return nil, err
}
func (w *FilesClient) deleteFile(ctx context.Context, name string) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}
err = w.workspaceClient.Files.DeleteByFilePath(ctx, absPath)
// Return early on success.
if err == nil {
return nil
}
var aerr *apierr.APIError
// Special handling of this error only if it is an API error.
if !errors.As(err, &aerr) {
return err
}
// This files delete API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return FileDoesNotExistError{absPath}
}
return err
}
func (w *FilesClient) deleteDirectory(ctx context.Context, name string) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}
err = w.workspaceClient.Files.DeleteDirectoryByDirectoryPath(ctx, absPath)
var aerr *apierr.APIError
// Special handling of this error only if it is an API error.
if !errors.As(err, &aerr) {
return err
}
// The directory delete API returns a 400 if the directory is not empty
if aerr.StatusCode == http.StatusBadRequest {
reasons := []string{}
for _, detail := range aerr.Details {
reasons = append(reasons, detail.Reason)
}
// Error code 400 is generic and can be returned for other reasons. Make
// sure one of the reasons for the error is that the directory is not empty.
if !slices.Contains(reasons, "FILES_API_DIRECTORY_IS_NOT_EMPTY") {
return err
}
return DirectoryNotEmptyError{absPath}
}
return err
}
func (w *FilesClient) recursiveDelete(ctx context.Context, name string) error {
filerFS := NewFS(ctx, w)
dirsToDelete := make([]string, 0)
filesToDelete := make([]string, 0)
callback := func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
// Files API does not allowing deleting non-empty directories. We instead
// collect the directories to delete and delete them once all the files have
// been deleted.
if d.IsDir() {
dirsToDelete = append(dirsToDelete, path)
return nil
}
filesToDelete = append(filesToDelete, path)
return nil
}
// Walk the directory and accumulate the files and directories to delete.
err := fs.WalkDir(filerFS, name, callback)
if err != nil {
return err
}
// Delete the files in parallel.
group, groupCtx := errgroup.WithContext(ctx)
group.SetLimit(maxFilesRequestsInFlight)
for _, file := range filesToDelete {
file := file
// Skip the file if the context has already been cancelled.
select {
case <-groupCtx.Done():
continue
default:
// Proceed.
}
group.Go(func() error {
return w.deleteFile(groupCtx, file)
})
}
// Wait for the files to be deleted and return the first non-nil error.
err = group.Wait()
if err != nil {
return err
}
// Delete the directories in reverse order to ensure that the parent
// directories are deleted after the children. This is possible because
// fs.WalkDir walks the directories in lexicographical order.
for i := len(dirsToDelete) - 1; i >= 0; i-- {
err := w.deleteDirectory(ctx, dirsToDelete[i])
if err != nil {
return err
}
}
return nil
}
func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
if slices.Contains(mode, DeleteRecursively) {
return w.recursiveDelete(ctx, name)
}
// Issue a stat call to determine if the path is a file or directory.
info, err := w.Stat(ctx, name)
if err != nil {
return err
}
// Issue the delete call for a directory
if info.IsDir() {
return w.deleteDirectory(ctx, name)
}
return w.deleteFile(ctx, name)
}
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
absPath, err := w.root.Join(name)
if err != nil {
return nil, err
}
iter := w.workspaceClient.Files.ListDirectoryContents(ctx, files.ListDirectoryContentsRequest{
DirectoryPath: absPath,
})
files, err := listing.ToSlice(ctx, iter)
// Return early on success.
if err == nil {
entries := make([]fs.DirEntry, len(files))
for i, file := range files {
entries[i] = filesApiDirEntry{
i: filesApiFileInfo{
absPath: file.Path,
isDir: file.IsDirectory,
fileSize: file.FileSize,
lastModified: file.LastModified,
},
}
}
// Sort by name for parity with os.ReadDir.
sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
return entries, nil
}
// Special handling of this error only if it is an API error.
var apierr *apierr.APIError
if !errors.As(err, &apierr) {
return nil, err
}
// This API returns a 404 if the specified path does not exist.
if apierr.StatusCode == http.StatusNotFound {
// Check if the path is a file. If so, return not a directory error.
if _, err := w.statFile(ctx, name); err == nil {
return nil, NotADirectory{absPath}
}
// No file or directory exists at the specified path. Return no such directory error.
return nil, NoSuchDirectoryError{absPath}
}
return nil, err
}
func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
err = w.workspaceClient.Files.CreateDirectory(ctx, files.CreateDirectoryRequest{
DirectoryPath: absPath,
})
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.StatusCode == http.StatusConflict {
return FileAlreadyExistsError{absPath}
}
return err
}
// Get file metadata for a file using the Files API.
func (w *FilesClient) statFile(ctx context.Context, name string) (fs.FileInfo, error) {
absPath, err := w.root.Join(name)
if err != nil {
return nil, err
}
fileInfo, err := w.workspaceClient.Files.GetMetadataByFilePath(ctx, absPath)
// If the HEAD requests succeeds, the file exists.
if err == nil {
return filesApiFileInfo{
absPath: absPath,
isDir: false,
fileSize: fileInfo.ContentLength,
}, nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return nil, FileDoesNotExistError{absPath}
}
return nil, err
}
// Get file metadata for a directory using the Files API.
func (w *FilesClient) statDir(ctx context.Context, name string) (fs.FileInfo, error) {
absPath, err := w.root.Join(name)
if err != nil {
return nil, err
}
err = w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, absPath)
// If the HEAD requests succeeds, the directory exists.
if err == nil {
return filesApiFileInfo{absPath: absPath, isDir: true}, nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// The directory metadata API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return nil, NoSuchDirectoryError{absPath}
}
return nil, err
}
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
// Assume that the path is a directory and issue a stat call.
dirInfo, err := w.statDir(ctx, name)
// If the file exists, return early.
if err == nil {
return dirInfo, nil
}
// Return early if the error is not a NoSuchDirectoryError.
if !errors.As(err, &NoSuchDirectoryError{}) {
return nil, err
}
// Since the path is not a directory, assume that it is a file and issue a stat call.
return w.statFile(ctx, name)
}