Use the `/dbfs/put` API endpoint to upload smaller DBFS files

This commit is contained in:
Shreyas Goenka 2024-12-02 23:47:31 +01:00
parent e86a949d99
commit c4cea1aeff
1 changed file with 95 additions and 19 deletions


@@ -1,11 +1,15 @@
package filer
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"mime/multipart"
"net/http"
"os"
"path"
"slices"
"sort"
@@ -14,6 +18,7 @@ import (
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/files"
)
@@ -79,6 +84,80 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
}, nil
}
func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error {
apiClient, err := client.New(w.workspaceClient.Config)
if err != nil {
return fmt.Errorf("failed to create API client: %w", err)
}
buf := &bytes.Buffer{}
writer := multipart.NewWriter(buf)
err = writer.WriteField("path", path)
if err != nil {
return err
}
err = writer.WriteField("overwrite", "True")
if err != nil {
return err
}
contents, err := writer.CreateFormFile("contents", "")
if err != nil {
return err
}
_, err = io.Copy(contents, file)
if err != nil {
return err
}
err = writer.Close()
if err != nil {
return err
}
// Request bodies of Content-Type multipart/form-data are not supported by
// the Go SDK for DBFS, so we use the Do method directly.
return apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{
"Content-Type": writer.FormDataContentType(),
}, buf.Bytes(), nil)
}
func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
fileMode := files.FileModeWrite
if overwrite {
fileMode |= files.FileModeOverwrite
}
handle, err := w.workspaceClient.Dbfs.Open(ctx, path, fileMode)
if err != nil {
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns a 400 if the file already exists.
if aerr.StatusCode == http.StatusBadRequest {
if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
return FileAlreadyExistsError{path}
}
}
return err
}
_, err = io.Copy(handle, reader)
cerr := handle.Close()
if err == nil {
err = cerr
}
return err
}
// MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded
// using the /dbfs/put API. If the file is larger than this limit, the streaming
// API (/dbfs/create and /dbfs/add-block) will be used instead.
var MaxUploadLimitForPutApi int64 = 2 * 1024 * 1024
func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
@@ -114,30 +193,27 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
}
}
-handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode)
+localFile, ok := reader.(*os.File)
+// If the source is not a local file, we'll always use the streaming API endpoint.
+if !ok {
+return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
+}
+stat, err := localFile.Stat()
if err != nil {
-var aerr *apierr.APIError
-if !errors.As(err, &aerr) {
-return err
-}
-// This API returns a 400 if the file already exists.
-if aerr.StatusCode == http.StatusBadRequest {
-if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
-return FileAlreadyExistsError{absPath}
-}
-}
-return err
+return fmt.Errorf("failed to stat file: %w", err)
}
-_, err = io.Copy(handle, reader)
-cerr := handle.Close()
-if err == nil {
-err = cerr
+// If the source is a local file but is too large, we'll use the streaming API endpoint.
+if stat.Size() > MaxUploadLimitForPutApi {
+return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
}
-return err
+// Use the /dbfs/put API when the file is on the local filesystem
+// and is small enough. This is the most common case when users use the
+// `databricks fs cp` command.
+return w.uploadUsingDbfsPutApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
}
func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
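
For context, the sketch below shows roughly what the request built by uploadUsingDbfsPutApi looks like when issued with nothing but the Go standard library: a single multipart/form-data POST to /api/2.0/dbfs/put carrying the path, overwrite, and contents fields used in the diff above. It is an illustrative standalone program, not part of the commit; the DATABRICKS_HOST and DATABRICKS_TOKEN environment variables, the putSmallFile helper, and the bearer-token header are assumptions made for the example, while the endpoint and form field names mirror the code in the commit.

// smallput.go: minimal standalone sketch of a small-file upload via the
// /api/2.0/dbfs/put endpoint. Not part of the commit; host/token handling
// below is an assumption for illustration.
package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func putSmallFile(host, token, dbfsPath, localPath string, overwrite bool) error {
	f, err := os.Open(localPath)
	if err != nil {
		return err
	}
	defer f.Close()

	// Build the multipart/form-data body: path, overwrite, and the file contents.
	buf := &bytes.Buffer{}
	w := multipart.NewWriter(buf)
	if err := w.WriteField("path", dbfsPath); err != nil {
		return err
	}
	if err := w.WriteField("overwrite", fmt.Sprintf("%t", overwrite)); err != nil {
		return err
	}
	part, err := w.CreateFormFile("contents", "")
	if err != nil {
		return err
	}
	if _, err := io.Copy(part, f); err != nil {
		return err
	}
	if err := w.Close(); err != nil {
		return err
	}

	// Send the request with the multipart boundary in the Content-Type header.
	req, err := http.NewRequest(http.MethodPost, host+"/api/2.0/dbfs/put", buf)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", w.FormDataContentType())

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("dbfs/put failed: %s: %s", resp.Status, body)
	}
	return nil
}

func main() {
	// Hypothetical invocation; DATABRICKS_HOST is assumed to include the
	// https:// scheme and DATABRICKS_TOKEN to hold a personal access token.
	err := putSmallFile(os.Getenv("DATABRICKS_HOST"), os.Getenv("DATABRICKS_TOKEN"),
		"/tmp/example.txt", "example.txt", true)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

Keeping a small upload in one request like this is the point of the change: sources that are not local files, or that exceed MaxUploadLimitForPutApi, still go through the handle-based streaming path (/dbfs/create and /dbfs/add-block) shown in uploadUsingDbfsStreamingApi.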