2023-05-31 11:24:20 +00:00
|
|
|
package filer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
2024-12-02 22:47:31 +00:00
|
|
|
"fmt"
|
2023-05-31 11:24:20 +00:00
|
|
|
"io"
|
2023-05-31 12:22:26 +00:00
|
|
|
"io/fs"
|
2024-12-02 22:47:31 +00:00
|
|
|
"mime/multipart"
|
2023-05-31 11:24:20 +00:00
|
|
|
"net/http"
|
2024-12-02 22:47:31 +00:00
|
|
|
"os"
|
2023-05-31 11:24:20 +00:00
|
|
|
"path"
|
2023-08-15 13:50:40 +00:00
|
|
|
"slices"
|
2023-05-31 11:24:20 +00:00
|
|
|
"sort"
|
2023-06-23 13:08:22 +00:00
|
|
|
"strings"
|
2023-05-31 11:24:20 +00:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/databricks/databricks-sdk-go"
|
|
|
|
"github.com/databricks/databricks-sdk-go/apierr"
|
2024-12-02 22:47:31 +00:00
|
|
|
"github.com/databricks/databricks-sdk-go/client"
|
2023-05-31 11:24:20 +00:00
|
|
|
"github.com/databricks/databricks-sdk-go/service/files"
|
|
|
|
)
|
|
|
|
|
2023-05-31 12:22:26 +00:00
|
|
|
// Type that implements fs.DirEntry for DBFS.
// It embeds dbfsFileInfo so a single value satisfies both fs.DirEntry and
// fs.FileInfo for entries returned by ReadDir.
type dbfsDirEntry struct {
	dbfsFileInfo
}
|
|
|
|
|
|
|
|
func (entry dbfsDirEntry) Type() fs.FileMode {
|
2023-06-01 18:23:22 +00:00
|
|
|
return entry.Mode()
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (entry dbfsDirEntry) Info() (fs.FileInfo, error) {
|
|
|
|
return entry.dbfsFileInfo, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Type that implements fs.FileInfo for DBFS.
type dbfsFileInfo struct {
	// fi holds the raw file metadata as returned by the DBFS API.
	fi files.FileInfo
}
|
|
|
|
|
|
|
|
func (info dbfsFileInfo) Name() string {
|
|
|
|
return path.Base(info.fi.Path)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (info dbfsFileInfo) Size() int64 {
|
|
|
|
return info.fi.FileSize
|
|
|
|
}
|
|
|
|
|
|
|
|
func (info dbfsFileInfo) Mode() fs.FileMode {
|
2023-06-01 18:23:22 +00:00
|
|
|
mode := fs.ModePerm
|
|
|
|
if info.fi.IsDir {
|
|
|
|
mode |= fs.ModeDir
|
|
|
|
}
|
|
|
|
return mode
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (info dbfsFileInfo) ModTime() time.Time {
|
|
|
|
return time.UnixMilli(info.fi.ModificationTime)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (info dbfsFileInfo) IsDir() bool {
|
|
|
|
return info.fi.IsDir
|
|
|
|
}
|
|
|
|
|
|
|
|
func (info dbfsFileInfo) Sys() any {
|
2023-06-08 16:15:12 +00:00
|
|
|
return info.fi
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
2024-12-02 23:05:04 +00:00
|
|
|
// Interface to allow mocking of the Databricks API client.
// Only the Do method is needed; it issues a raw HTTP request against the
// workspace API.
//
//nolint:gofumpt
type databricksClient interface {
	Do(ctx context.Context, method, path string, headers map[string]string,
		requestBody any, responseBody any, visitors ...func(*http.Request) error) error
}
|
|
|
|
|
2023-05-31 11:24:20 +00:00
|
|
|
// DbfsClient implements the [Filer] interface for the DBFS backend.
type DbfsClient struct {
	// workspaceClient is used for the typed DBFS SDK calls (open, list,
	// delete, mkdirs, get-status).
	workspaceClient *databricks.WorkspaceClient

	// apiClient is the raw REST client; it is needed for the multipart
	// /dbfs/put endpoint, which the typed SDK does not support.
	apiClient databricksClient

	// File operations will be relative to this path.
	root WorkspaceRootPath
}
|
|
|
|
|
|
|
|
func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
2024-12-02 23:05:04 +00:00
|
|
|
apiClient, err := client.New(w.Config)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to create API client: %w", err)
|
|
|
|
}
|
|
|
|
|
2023-05-31 11:24:20 +00:00
|
|
|
return &DbfsClient{
|
|
|
|
workspaceClient: w,
|
2024-12-02 23:05:04 +00:00
|
|
|
apiClient: apiClient,
|
2023-05-31 11:24:20 +00:00
|
|
|
|
2023-06-23 14:07:09 +00:00
|
|
|
root: NewWorkspaceRootPath(root),
|
2023-05-31 11:24:20 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2024-12-30 17:13:45 +00:00
|
|
|
func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error {
|
2024-12-02 22:49:32 +00:00
|
|
|
overwriteField := "False"
|
|
|
|
if overwrite {
|
|
|
|
overwriteField = "True"
|
|
|
|
}
|
|
|
|
|
2025-01-02 11:25:56 +00:00
|
|
|
pr, pw := io.Pipe()
|
|
|
|
writer := multipart.NewWriter(pw)
|
|
|
|
go func() {
|
|
|
|
defer pw.Close()
|
|
|
|
|
|
|
|
err := writer.WriteField("path", path)
|
|
|
|
if err != nil {
|
|
|
|
pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
err = writer.WriteField("overwrite", overwriteField)
|
|
|
|
if err != nil {
|
|
|
|
pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
contents, err := writer.CreateFormFile("contents", "")
|
|
|
|
if err != nil {
|
|
|
|
pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
for {
|
|
|
|
// Copy the file in 10 MB chunks. This reduces the memory usage of the
|
|
|
|
// program when copying large files.
|
|
|
|
_, err := io.CopyN(contents, file, 10*1024*1024)
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
err = writer.Close()
|
|
|
|
if err != nil {
|
|
|
|
pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}()
|
2024-12-02 22:47:31 +00:00
|
|
|
|
2024-12-31 07:58:36 +00:00
|
|
|
// Request bodies of Content-Type multipart/form-data are not supported by
|
2024-12-02 22:47:31 +00:00
|
|
|
// the Go SDK directly for DBFS. So we use the Do method directly.
|
2025-01-02 11:25:56 +00:00
|
|
|
err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{
|
2024-12-02 22:47:31 +00:00
|
|
|
"Content-Type": writer.FormDataContentType(),
|
2025-01-02 11:25:56 +00:00
|
|
|
}, pr, nil)
|
2024-12-31 07:58:36 +00:00
|
|
|
var aerr *apierr.APIError
|
|
|
|
if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
|
|
|
|
return FileAlreadyExistsError{path}
|
|
|
|
}
|
|
|
|
return err
|
2024-12-02 22:47:31 +00:00
|
|
|
}
|
|
|
|
|
2024-12-31 11:40:19 +00:00
|
|
|
// MaxDbfsPutFileSize is the maximum size in bytes of a file that can be uploaded
// using the /dbfs/put API. If the file is larger than this limit, the streaming
// API (/dbfs/create and /dbfs/add-block) will be used instead.
var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 * 1024
|
|
|
|
|
2023-05-31 11:24:20 +00:00
|
|
|
func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Issue info call before write because it automatically creates parent directories.
|
|
|
|
//
|
|
|
|
// For discussion: we could decide this is actually convenient, remove the call below,
|
|
|
|
// and apply the same semantics for the WSFS filer.
|
|
|
|
//
|
|
|
|
if !slices.Contains(mode, CreateParentDirectories) {
|
|
|
|
_, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, path.Dir(absPath))
|
|
|
|
if err != nil {
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the file doesn't exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
|
|
|
return NoSuchDirectoryError{path.Dir(absPath)}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-12-02 22:47:31 +00:00
|
|
|
localFile, ok := reader.(*os.File)
|
2023-05-31 11:24:20 +00:00
|
|
|
|
2024-12-02 22:47:31 +00:00
|
|
|
// If the source is not a local file, we'll always use the streaming API endpoint.
|
|
|
|
if !ok {
|
2024-12-30 17:13:45 +00:00
|
|
|
return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
|
2024-12-02 22:47:31 +00:00
|
|
|
}
|
2023-05-31 11:24:20 +00:00
|
|
|
|
2024-12-02 22:47:31 +00:00
|
|
|
stat, err := localFile.Stat()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to stat file: %w", err)
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
2024-12-02 22:47:31 +00:00
|
|
|
// If the source is a local file, but is too large then we'll use the streaming API endpoint.
|
2024-12-30 17:13:45 +00:00
|
|
|
if stat.Size() > MaxDbfsPutFileSize {
|
|
|
|
return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
2024-12-02 22:47:31 +00:00
|
|
|
// Use the /dbfs/put API when the file is on the local filesystem
|
|
|
|
// and is small enough. This is the most common case when users use the
|
|
|
|
// `databricks fs cp` command.
|
2024-12-30 17:13:45 +00:00
|
|
|
return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
2024-12-31 11:41:39 +00:00
|
|
|
func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
|
|
|
|
fileMode := files.FileModeWrite
|
|
|
|
if overwrite {
|
|
|
|
fileMode |= files.FileModeOverwrite
|
|
|
|
}
|
|
|
|
|
|
|
|
handle, err := w.workspaceClient.Dbfs.Open(ctx, path, fileMode)
|
|
|
|
if err != nil {
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 400 if the file already exists.
|
|
|
|
if aerr.StatusCode == http.StatusBadRequest {
|
|
|
|
if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
|
|
|
|
return FileAlreadyExistsError{path}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
_, err = io.Copy(handle, reader)
|
|
|
|
cerr := handle.Close()
|
|
|
|
if err == nil {
|
|
|
|
err = cerr
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-12 13:53:58 +00:00
|
|
|
func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
2023-05-31 11:24:20 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-06-12 13:53:58 +00:00
|
|
|
handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead)
|
|
|
|
if err != nil {
|
2023-06-23 13:08:22 +00:00
|
|
|
// Return error if file is a directory
|
|
|
|
if strings.Contains(err.Error(), "cannot open directory for reading") {
|
|
|
|
return nil, NotAFile{absPath}
|
|
|
|
}
|
|
|
|
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the file doesn't exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
|
|
|
return nil, FileDoesNotExistError{absPath}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-06-12 13:53:58 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// A DBFS handle open for reading does not need to be closed.
|
|
|
|
return io.NopCloser(handle), nil
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
2023-06-06 06:27:47 +00:00
|
|
|
func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
2023-05-31 11:24:20 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-06 06:27:47 +00:00
|
|
|
// Illegal to delete the root path.
|
|
|
|
if absPath == w.root.rootPath {
|
|
|
|
return CannotDeleteRootError{}
|
|
|
|
}
|
|
|
|
|
2023-05-31 11:24:20 +00:00
|
|
|
// Issue info call before delete because delete succeeds if the specified path doesn't exist.
|
|
|
|
//
|
|
|
|
// For discussion: we could decide this is actually convenient, remove the call below,
|
|
|
|
// and apply the same semantics for the WSFS filer.
|
|
|
|
//
|
|
|
|
_, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath)
|
|
|
|
if err != nil {
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the file doesn't exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
|
|
|
return FileDoesNotExistError{absPath}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-06 06:27:47 +00:00
|
|
|
recursive := false
|
|
|
|
if slices.Contains(mode, DeleteRecursively) {
|
|
|
|
recursive = true
|
|
|
|
}
|
|
|
|
|
|
|
|
err = w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
|
2023-05-31 11:24:20 +00:00
|
|
|
Path: absPath,
|
2023-06-06 06:27:47 +00:00
|
|
|
Recursive: recursive,
|
2023-05-31 11:24:20 +00:00
|
|
|
})
|
2023-06-06 06:27:47 +00:00
|
|
|
|
|
|
|
// Return early on success.
|
|
|
|
if err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
switch aerr.StatusCode {
|
|
|
|
case http.StatusBadRequest:
|
|
|
|
// Anecdotally, this error is returned when attempting to delete a non-empty directory.
|
|
|
|
if aerr.ErrorCode == "IO_ERROR" {
|
|
|
|
return DirectoryNotEmptyError{absPath}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
2023-05-31 12:22:26 +00:00
|
|
|
func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
2023-05-31 11:24:20 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
res, err := w.workspaceClient.Dbfs.ListByPath(ctx, absPath)
|
|
|
|
if err != nil {
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the file doesn't exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
|
|
|
return nil, NoSuchDirectoryError{absPath}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-06-02 10:28:35 +00:00
|
|
|
if len(res.Files) == 1 && res.Files[0].Path == absPath {
|
|
|
|
return nil, NotADirectory{absPath}
|
|
|
|
}
|
|
|
|
|
2023-05-31 12:22:26 +00:00
|
|
|
info := make([]fs.DirEntry, len(res.Files))
|
2023-05-31 11:24:20 +00:00
|
|
|
for i, v := range res.Files {
|
2023-05-31 12:22:26 +00:00
|
|
|
info[i] = dbfsDirEntry{dbfsFileInfo: dbfsFileInfo{fi: v}}
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Sort by name for parity with os.ReadDir.
|
2023-05-31 12:22:26 +00:00
|
|
|
sort.Slice(info, func(i, j int) bool { return info[i].Name() < info[j].Name() })
|
2023-05-31 11:24:20 +00:00
|
|
|
return info, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *DbfsClient) Mkdir(ctx context.Context, name string) error {
|
|
|
|
dirPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return w.workspaceClient.Dbfs.MkdirsByPath(ctx, dirPath)
|
|
|
|
}
|
2023-06-01 18:23:22 +00:00
|
|
|
|
|
|
|
func (w *DbfsClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
info, err := w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath)
|
|
|
|
if err != nil {
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the file doesn't exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
|
|
|
return nil, FileDoesNotExistError{absPath}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return dbfsFileInfo{*info}, nil
|
|
|
|
}
|