diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go
index 679e18798..ce4683f70 100644
--- a/libs/filer/dbfs_client.go
+++ b/libs/filer/dbfs_client.go
@@ -98,7 +98,7 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
 	}, nil
 }
 
-func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error {
+func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error {
 	overwriteField := "False"
 	if overwrite {
 		overwriteField = "True"
@@ -136,7 +136,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove
 	}, buf.Bytes(), nil)
 }
 
-func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
+func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
 	fileMode := files.FileModeWrite
 	if overwrite {
 		fileMode |= files.FileModeOverwrite
@@ -178,7 +178,7 @@ func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path strin
-// MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded
+// MaxDbfsPutFileSize is the maximum size in bytes of a file that can be uploaded
 // using the /dbfs/put API. If the file is larger than this limit, the streaming
 // API (/dbfs/create and /dbfs/add-block) will be used instead.
-var MaxDbfsUploadLimitForPutApi int64 = 2 * 1024 * 1024
+var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024
 
 func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
 	absPath, err := w.root.Join(name)
@@ -186,11 +186,6 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
 		return err
 	}
 
-	fileMode := files.FileModeWrite
-	if slices.Contains(mode, OverwriteIfExists) {
-		fileMode |= files.FileModeOverwrite
-	}
-
 	// Issue info call before write because it automatically creates parent directories.
 	//
 	// For discussion: we could decide this is actually convenient, remove the call below,
@@ -219,7 +214,7 @@
 
 	// If the source is not a local file, we'll always use the streaming API endpoint.
 	if !ok {
-		return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
+		return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
 	}
 
 	stat, err := localFile.Stat()
@@ -228,14 +223,14 @@
 	}
 
 	// If the source is a local file, but is too large then we'll use the streaming API endpoint.
-	if stat.Size() > MaxDbfsUploadLimitForPutApi {
-		return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
+	if stat.Size() > MaxDbfsPutFileSize {
+		return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
 	}
 
 	// Use the /dbfs/put API when the file is on the local filesystem
 	// and is small enough. This is the most common case when users use the
 	// `databricks fs cp` command.
-	return w.uploadUsingDbfsPutApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
+	return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
 }
 
 func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
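For reviewers who want to see the renamed helpers exercised end to end, here is a minimal usage sketch of the size-based dispatch in `Write`. It is a sketch, not part of this change: it assumes the `databricks/cli` repository layout for the import path, workspace auth configured in the environment, and illustrative root and file names.

```go
package main

import (
	"bytes"
	"context"
	"log"
	"os"

	"github.com/databricks/cli/libs/filer" // assumed import path for this package
	"github.com/databricks/databricks-sdk-go"
)

func main() {
	ctx := context.Background()

	// Assumes workspace credentials (e.g. DATABRICKS_HOST/DATABRICKS_TOKEN) are set.
	w := databricks.Must(databricks.NewWorkspaceClient())

	// Root path is illustrative.
	f, err := filer.NewDbfsClient(w, "/tmp/uploads")
	if err != nil {
		log.Fatal(err)
	}

	// A local *os.File at or below MaxDbfsPutFileSize (2 MiB) takes the
	// single-call /dbfs/put path (putFile).
	small, err := os.Open("small.txt") // hypothetical local file
	if err != nil {
		log.Fatal(err)
	}
	defer small.Close()
	if err := f.Write(ctx, "small.txt", small, filer.OverwriteIfExists); err != nil {
		log.Fatal(err)
	}

	// Any reader that is not an *os.File (or a local file larger than the
	// limit) streams through /dbfs/create and /dbfs/add-block (streamFile).
	data := bytes.NewReader([]byte("generated in memory"))
	if err := f.Write(ctx, "generated.txt", data, filer.OverwriteIfExists); err != nil {
		log.Fatal(err)
	}
}
```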