diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index ce122bc03..5e007674d 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -68,28 +68,37 @@ func (info dbfsFileInfo) Sys() any { return info.fi } +// Interface to allow mocking of the Databricks API client. +type databricksClient interface { + Do(ctx context.Context, method, path string, headers map[string]string, + requestBody any, responseBody any, visitors ...func(*http.Request) error) error +} + // DbfsClient implements the [Filer] interface for the DBFS backend. type DbfsClient struct { workspaceClient *databricks.WorkspaceClient + apiClient databricksClient + // File operations will be relative to this path. root WorkspaceRootPath } func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, fmt.Errorf("failed to create API client: %w", err) + } + return &DbfsClient{ workspaceClient: w, + apiClient: apiClient, root: NewWorkspaceRootPath(root), }, nil } func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error { - apiClient, err := client.New(w.workspaceClient.Config) - if err != nil { - return fmt.Errorf("failed to create API client: %w", err) - } - overwriteField := "False" if overwrite { overwriteField = "True" @@ -97,7 +106,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove buf := &bytes.Buffer{} writer := multipart.NewWriter(buf) - err = writer.WriteField("path", path) + err := writer.WriteField("path", path) if err != nil { return err } @@ -122,7 +131,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove // Request bodies of Content-Type multipart/form-data must are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - return apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + return w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), }, buf.Bytes(), nil) } @@ -161,7 +170,7 @@ func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path strin // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded // using the /dbfs/put API. If the file is larger than this limit, the streaming // API (/dbfs/create and /dbfs/add-block) will be used instead. -var MaxUploadLimitForPutApi int64 = 2 * 1024 * 1024 +var MaxDbfsUploadLimitForPutApi int64 = 2 * 1024 * 1024 func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) @@ -211,7 +220,7 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m } // If the source is a local file, but is too large then we'll use the streaming API endpoint. - if stat.Size() > MaxUploadLimitForPutApi { + if stat.Size() > MaxDbfsUploadLimitForPutApi { return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) }