some cleanup

Shreyas Goenka 2024-12-30 22:43:45 +05:30
parent da46c142e0
commit 78b9788bc6
1 changed file with 7 additions and 12 deletions


@@ -98,7 +98,7 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
 	}, nil
 }
 
-func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error {
+func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error {
 	overwriteField := "False"
 	if overwrite {
 		overwriteField = "True"
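The renamed putFile helper is the one-shot /dbfs/put upload: the surrounding code builds an overwrite form field ("True"/"False") and posts buf.Bytes(). As a rough illustration of that pattern, here is a minimal standard-library sketch of assembling such a multipart body; the field names ("path", "overwrite", "contents") follow the public DBFS put API documentation and are assumptions, not copied from this diff:

package example

import (
	"bytes"
	"io"
	"mime/multipart"
	"os"
)

// buildPutBody sketches how a small file plus an overwrite flag could be
// packed into a multipart form for a /dbfs/put-style request. Field names
// are assumptions, not taken from the commit.
func buildPutBody(path string, overwrite bool, file *os.File) (*bytes.Buffer, string, error) {
	var buf bytes.Buffer
	writer := multipart.NewWriter(&buf)

	// The API expects the flag as a string field, matching the
	// overwriteField = "True"/"False" logic in the hunk above.
	overwriteField := "False"
	if overwrite {
		overwriteField = "True"
	}
	if err := writer.WriteField("path", path); err != nil {
		return nil, "", err
	}
	if err := writer.WriteField("overwrite", overwriteField); err != nil {
		return nil, "", err
	}

	// Stream the file contents into the "contents" part.
	part, err := writer.CreateFormFile("contents", path)
	if err != nil {
		return nil, "", err
	}
	if _, err := io.Copy(part, file); err != nil {
		return nil, "", err
	}
	if err := writer.Close(); err != nil {
		return nil, "", err
	}
	return &buf, writer.FormDataContentType(), nil
}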
@@ -136,7 +136,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove
 	}, buf.Bytes(), nil)
 }
 
-func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
+func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
 	fileMode := files.FileModeWrite
 	if overwrite {
 		fileMode |= files.FileModeOverwrite
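The streaming counterpart, now streamFile, OR-s the SDK's file-mode bits together before opening a write handle. A minimal standalone sketch of that flow; the Dbfs.Open call and its io.WriteCloser-style handle are assumptions based on databricks-sdk-go, not visible in this hunk:

package example

import (
	"context"
	"io"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/files"
)

// streamToDbfs sketches the streaming path: the mode constants form a
// bitmask, so overwrite is OR-ed in, mirroring the hunk above.
func streamToDbfs(ctx context.Context, w *databricks.WorkspaceClient, path string, overwrite bool, reader io.Reader) error {
	fileMode := files.FileModeWrite
	if overwrite {
		fileMode |= files.FileModeOverwrite
	}

	// Assumed SDK call: open a DBFS write handle and copy the reader into it.
	handle, err := w.Dbfs.Open(ctx, path, fileMode)
	if err != nil {
		return err
	}

	_, copyErr := io.Copy(handle, reader)
	closeErr := handle.Close()
	if copyErr != nil {
		return copyErr
	}
	return closeErr
}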
@@ -178,7 +178,7 @@ func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path strin
 // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded
 // using the /dbfs/put API. If the file is larger than this limit, the streaming
 // API (/dbfs/create and /dbfs/add-block) will be used instead.
-var MaxDbfsUploadLimitForPutApi int64 = 2 * 1024 * 1024
+var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024
 
 func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
 	absPath, err := w.root.Join(name)
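The renamed limit works out to 2 MiB (2 * 1024 * 1024 = 2,097,152 bytes). Since it stays a package-level var rather than a const, it can presumably still be overridden, for example by a test that wants to force the streaming path; a hypothetical sketch, with the package path and test name assumed:

package filer_test

import (
	"testing"

	"github.com/databricks/cli/libs/filer" // assumed import path
)

func TestWriteLargeFileTakesStreamingPath(t *testing.T) {
	// Hypothetical override: shrink the cutoff so even a tiny fixture
	// exercises the /dbfs/create + /dbfs/add-block path.
	old := filer.MaxDbfsPutFileSize
	filer.MaxDbfsPutFileSize = 1
	t.Cleanup(func() { filer.MaxDbfsPutFileSize = old })

	// ... call Write with a few bytes and assert the streaming endpoint
	// was used (fixture and assertion elided).
}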
@@ -186,11 +186,6 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
 		return err
 	}
 
-	fileMode := files.FileModeWrite
-	if slices.Contains(mode, OverwriteIfExists) {
-		fileMode |= files.FileModeOverwrite
-	}
-
 	// Issue info call before write because it automatically creates parent directories.
 	//
 	// For discussion: we could decide this is actually convenient, remove the call below,
@@ -219,7 +214,7 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
 
 	// If the source is not a local file, we'll always use the streaming API endpoint.
 	if !ok {
-		return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
+		return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
 	}
 
 	stat, err := localFile.Stat()
@@ -228,14 +223,14 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
 	}
 
 	// If the source is a local file, but is too large then we'll use the streaming API endpoint.
-	if stat.Size() > MaxDbfsUploadLimitForPutApi {
-		return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
+	if stat.Size() > MaxDbfsPutFileSize {
+		return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
 	}
 
 	// Use the /dbfs/put API when the file is on the local filesystem
 	// and is small enough. This is the most common case when users use the
 	// `databricks fs cp` command.
-	return w.uploadUsingDbfsPutApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
+	return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
 }
 
 func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
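Read together, the hunks leave Write with a three-way dispatch between the renamed helpers. A condensed sketch of that logic; the reader.(*os.File) type assertion sits between the hunks shown, so it is inferred from `if !ok` and localFile rather than copied from the diff:

	// Condensed sketch of Write's dispatch after this commit.
	localFile, ok := reader.(*os.File) // inferred; not visible in the hunks above

	// Not a local file: the size is unknown up front, so always stream.
	if !ok {
		return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
	}

	stat, err := localFile.Stat()
	if err != nil {
		return err
	}

	// Local but past the 2 MiB put cutoff: stream instead.
	if stat.Size() > MaxDbfsPutFileSize {
		return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
	}

	// Small local file: one-shot /dbfs/put upload.
	return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)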