mirror of https://github.com/databricks/cli.git
502 lines
12 KiB
Go
502 lines
12 KiB
Go
package filer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"slices"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/databricks/databricks-sdk-go"
|
|
"github.com/databricks/databricks-sdk-go/apierr"
|
|
"github.com/databricks/databricks-sdk-go/client"
|
|
"github.com/databricks/databricks-sdk-go/listing"
|
|
"github.com/databricks/databricks-sdk-go/service/files"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// As of 19th Feb 2024, the Files API backend has a rate limit of 10 concurrent
|
|
// requests and 100 QPS. We limit the number of concurrent requests to 5 to
|
|
// avoid hitting the rate limit.
|
|
const maxFilesRequestsInFlight = 5
|
|
|
|
// Type that implements fs.FileInfo for the Files API.
|
|
// This is required for the filer.Stat() method.
|
|
type filesApiFileInfo struct {
|
|
absPath string
|
|
isDir bool
|
|
fileSize int64
|
|
lastModified int64
|
|
}
|
|
|
|
func (info filesApiFileInfo) Name() string {
|
|
return path.Base(info.absPath)
|
|
}
|
|
|
|
func (info filesApiFileInfo) Size() int64 {
|
|
return info.fileSize
|
|
}
|
|
|
|
func (info filesApiFileInfo) Mode() fs.FileMode {
|
|
mode := fs.ModePerm
|
|
if info.isDir {
|
|
mode |= fs.ModeDir
|
|
}
|
|
return mode
|
|
}
|
|
|
|
func (info filesApiFileInfo) ModTime() time.Time {
|
|
return time.UnixMilli(info.lastModified)
|
|
}
|
|
|
|
func (info filesApiFileInfo) IsDir() bool {
|
|
return info.isDir
|
|
}
|
|
|
|
func (info filesApiFileInfo) Sys() any {
|
|
return nil
|
|
}
|
|
|
|
// Type that implements fs.DirEntry for the Files API.
|
|
// This is required for the filer.ReadDir() method.
|
|
type filesApiDirEntry struct {
|
|
i filesApiFileInfo
|
|
}
|
|
|
|
func (e filesApiDirEntry) Name() string {
|
|
return e.i.Name()
|
|
}
|
|
|
|
func (e filesApiDirEntry) IsDir() bool {
|
|
return e.i.IsDir()
|
|
}
|
|
|
|
func (e filesApiDirEntry) Type() fs.FileMode {
|
|
return e.i.Mode()
|
|
}
|
|
|
|
func (e filesApiDirEntry) Info() (fs.FileInfo, error) {
|
|
return e.i, nil
|
|
}
|
|
|
|
// FilesClient implements the [Filer] interface for the Files API backend.
|
|
type FilesClient struct {
|
|
workspaceClient *databricks.WorkspaceClient
|
|
apiClient *client.DatabricksClient
|
|
|
|
// File operations will be relative to this path.
|
|
root WorkspaceRootPath
|
|
}
|
|
|
|
func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
|
apiClient, err := client.New(w.Config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &FilesClient{
|
|
workspaceClient: w,
|
|
apiClient: apiClient,
|
|
|
|
root: NewWorkspaceRootPath(root),
|
|
}, nil
|
|
}
|
|
|
|
func (w *FilesClient) urlPath(name string) (string, string, error) {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
|
|
// The user specified part of the path must be escaped.
|
|
urlPath := "/api/2.0/fs/files/" + url.PathEscape(strings.TrimLeft(absPath, "/"))
|
|
|
|
return absPath, urlPath, nil
|
|
}
|
|
|
|
func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
|
absPath, urlPath, err := w.urlPath(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check that target path exists if CreateParentDirectories mode is not set
|
|
if !slices.Contains(mode, CreateParentDirectories) {
|
|
err := w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, path.Dir(absPath))
|
|
if err != nil {
|
|
var aerr *apierr.APIError
|
|
if !errors.As(err, &aerr) {
|
|
return err
|
|
}
|
|
|
|
// This API returns a 404 if the file doesn't exist.
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
return NoSuchDirectoryError{path.Dir(absPath)}
|
|
}
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
overwrite := slices.Contains(mode, OverwriteIfExists)
|
|
urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
|
|
headers := map[string]string{"Content-Type": "application/octet-stream"}
|
|
err = w.apiClient.Do(ctx, http.MethodPut, urlPath, headers, reader, nil)
|
|
|
|
// Return early on success.
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
var aerr *apierr.APIError
|
|
if !errors.As(err, &aerr) {
|
|
return err
|
|
}
|
|
|
|
// This API returns 409 if the file already exists, when the object type is file
|
|
if aerr.StatusCode == http.StatusConflict && aerr.ErrorCode == "ALREADY_EXISTS" {
|
|
return FileAlreadyExistsError{absPath}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
|
absPath, urlPath, err := w.urlPath(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var reader io.ReadCloser
|
|
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, nil, &reader)
|
|
|
|
// Return early on success.
|
|
if err == nil {
|
|
return reader, nil
|
|
}
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
var aerr *apierr.APIError
|
|
if !errors.As(err, &aerr) {
|
|
return nil, err
|
|
}
|
|
|
|
// This API returns a 404 if the specified path does not exist.
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
// Check if the path is a directory. If so, return not a file error.
|
|
if _, err := w.statDir(ctx, name); err == nil {
|
|
return nil, NotAFile{absPath}
|
|
}
|
|
|
|
// No file or directory exists at the specified path. Return no such file error.
|
|
return nil, FileDoesNotExistError{absPath}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
func (w *FilesClient) deleteFile(ctx context.Context, name string) error {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Illegal to delete the root path.
|
|
if absPath == w.root.rootPath {
|
|
return CannotDeleteRootError{}
|
|
}
|
|
|
|
err = w.workspaceClient.Files.DeleteByFilePath(ctx, absPath)
|
|
|
|
// Return early on success.
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
var aerr *apierr.APIError
|
|
// Special handling of this error only if it is an API error.
|
|
if !errors.As(err, &aerr) {
|
|
return err
|
|
}
|
|
|
|
// This files delete API returns a 404 if the specified path does not exist.
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
return FileDoesNotExistError{absPath}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (w *FilesClient) deleteDirectory(ctx context.Context, name string) error {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Illegal to delete the root path.
|
|
if absPath == w.root.rootPath {
|
|
return CannotDeleteRootError{}
|
|
}
|
|
|
|
err = w.workspaceClient.Files.DeleteDirectoryByDirectoryPath(ctx, absPath)
|
|
|
|
var aerr *apierr.APIError
|
|
// Special handling of this error only if it is an API error.
|
|
if !errors.As(err, &aerr) {
|
|
return err
|
|
}
|
|
|
|
// The directory delete API returns a 400 if the directory is not empty
|
|
if aerr.StatusCode == http.StatusBadRequest {
|
|
reasons := []string{}
|
|
for _, detail := range aerr.Details {
|
|
reasons = append(reasons, detail.Reason)
|
|
}
|
|
// Error code 400 is generic and can be returned for other reasons. Make
|
|
// sure one of the reasons for the error is that the directory is not empty.
|
|
if !slices.Contains(reasons, "FILES_API_DIRECTORY_IS_NOT_EMPTY") {
|
|
return err
|
|
}
|
|
return DirectoryNotEmptyError{absPath}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (w *FilesClient) recursiveDelete(ctx context.Context, name string) error {
|
|
filerFS := NewFS(ctx, w)
|
|
dirsToDelete := make([]string, 0)
|
|
filesToDelete := make([]string, 0)
|
|
callback := func(path string, d fs.DirEntry, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Files API does not allowing deleting non-empty directories. We instead
|
|
// collect the directories to delete and delete them once all the files have
|
|
// been deleted.
|
|
if d.IsDir() {
|
|
dirsToDelete = append(dirsToDelete, path)
|
|
return nil
|
|
}
|
|
|
|
filesToDelete = append(filesToDelete, path)
|
|
return nil
|
|
}
|
|
|
|
// Walk the directory and accumulate the files and directories to delete.
|
|
err := fs.WalkDir(filerFS, name, callback)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete the files in parallel.
|
|
group, groupCtx := errgroup.WithContext(ctx)
|
|
group.SetLimit(maxFilesRequestsInFlight)
|
|
|
|
for _, file := range filesToDelete {
|
|
// Skip the file if the context has already been cancelled.
|
|
select {
|
|
case <-groupCtx.Done():
|
|
continue
|
|
default:
|
|
// Proceed.
|
|
}
|
|
|
|
group.Go(func() error {
|
|
return w.deleteFile(groupCtx, file)
|
|
})
|
|
}
|
|
|
|
// Wait for the files to be deleted and return the first non-nil error.
|
|
err = group.Wait()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete the directories in reverse order to ensure that the parent
|
|
// directories are deleted after the children. This is possible because
|
|
// fs.WalkDir walks the directories in lexicographical order.
|
|
for i := len(dirsToDelete) - 1; i >= 0; i-- {
|
|
err := w.deleteDirectory(ctx, dirsToDelete[i])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
|
if slices.Contains(mode, DeleteRecursively) {
|
|
return w.recursiveDelete(ctx, name)
|
|
}
|
|
|
|
// Issue a stat call to determine if the path is a file or directory.
|
|
info, err := w.Stat(ctx, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Issue the delete call for a directory
|
|
if info.IsDir() {
|
|
return w.deleteDirectory(ctx, name)
|
|
}
|
|
|
|
return w.deleteFile(ctx, name)
|
|
}
|
|
|
|
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
iter := w.workspaceClient.Files.ListDirectoryContents(ctx, files.ListDirectoryContentsRequest{
|
|
DirectoryPath: absPath,
|
|
})
|
|
|
|
files, err := listing.ToSlice(ctx, iter)
|
|
|
|
// Return early on success.
|
|
if err == nil {
|
|
entries := make([]fs.DirEntry, len(files))
|
|
for i, file := range files {
|
|
entries[i] = filesApiDirEntry{
|
|
i: filesApiFileInfo{
|
|
absPath: file.Path,
|
|
isDir: file.IsDirectory,
|
|
fileSize: file.FileSize,
|
|
lastModified: file.LastModified,
|
|
},
|
|
}
|
|
}
|
|
|
|
// Sort by name for parity with os.ReadDir.
|
|
sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
|
|
return entries, nil
|
|
}
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
var apierr *apierr.APIError
|
|
if !errors.As(err, &apierr) {
|
|
return nil, err
|
|
}
|
|
|
|
// This API returns a 404 if the specified path does not exist.
|
|
if apierr.StatusCode == http.StatusNotFound {
|
|
// Check if the path is a file. If so, return not a directory error.
|
|
if _, err := w.statFile(ctx, name); err == nil {
|
|
return nil, NotADirectory{absPath}
|
|
}
|
|
|
|
// No file or directory exists at the specified path. Return no such directory error.
|
|
return nil, NoSuchDirectoryError{absPath}
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = w.workspaceClient.Files.CreateDirectory(ctx, files.CreateDirectoryRequest{
|
|
DirectoryPath: absPath,
|
|
})
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
var aerr *apierr.APIError
|
|
if errors.As(err, &aerr) && aerr.StatusCode == http.StatusConflict {
|
|
return FileAlreadyExistsError{absPath}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Get file metadata for a file using the Files API.
|
|
func (w *FilesClient) statFile(ctx context.Context, name string) (fs.FileInfo, error) {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
fileInfo, err := w.workspaceClient.Files.GetMetadataByFilePath(ctx, absPath)
|
|
|
|
// If the HEAD requests succeeds, the file exists.
|
|
if err == nil {
|
|
return filesApiFileInfo{
|
|
absPath: absPath,
|
|
isDir: false,
|
|
fileSize: fileInfo.ContentLength,
|
|
}, nil
|
|
}
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
var aerr *apierr.APIError
|
|
if !errors.As(err, &aerr) {
|
|
return nil, err
|
|
}
|
|
|
|
// This API returns a 404 if the specified path does not exist.
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
return nil, FileDoesNotExistError{absPath}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Get file metadata for a directory using the Files API.
|
|
func (w *FilesClient) statDir(ctx context.Context, name string) (fs.FileInfo, error) {
|
|
absPath, err := w.root.Join(name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, absPath)
|
|
|
|
// If the HEAD requests succeeds, the directory exists.
|
|
if err == nil {
|
|
return filesApiFileInfo{absPath: absPath, isDir: true}, nil
|
|
}
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
var aerr *apierr.APIError
|
|
if !errors.As(err, &aerr) {
|
|
return nil, err
|
|
}
|
|
|
|
// The directory metadata API returns a 404 if the specified path does not exist.
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
return nil, NoSuchDirectoryError{absPath}
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
|
// Assume that the path is a directory and issue a stat call.
|
|
dirInfo, err := w.statDir(ctx, name)
|
|
|
|
// If the file exists, return early.
|
|
if err == nil {
|
|
return dirInfo, nil
|
|
}
|
|
|
|
// Return early if the error is not a NoSuchDirectoryError.
|
|
if !errors.As(err, &NoSuchDirectoryError{}) {
|
|
return nil, err
|
|
}
|
|
|
|
// Since the path is not a directory, assume that it is a file and issue a stat call.
|
|
return w.statFile(ctx, name)
|
|
}
|