From c4cea1aeff03f74ec83780a5bc8121646e130b0a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 2 Dec 2024 23:47:31 +0100 Subject: [PATCH 01/28] Use the `/dbfs/put` API endpoint to upload smaller DBFS files --- libs/filer/dbfs_client.go | 114 +++++++++++++++++++++++++++++++------- 1 file changed, 95 insertions(+), 19 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 38e8f9f3f..efe51f07f 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,11 +1,15 @@ package filer import ( + "bytes" "context" "errors" + "fmt" "io" "io/fs" + "mime/multipart" "net/http" + "os" "path" "slices" "sort" @@ -14,6 +18,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/files" ) @@ -79,6 +84,80 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { }, nil } +func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error { + apiClient, err := client.New(w.workspaceClient.Config) + if err != nil { + return fmt.Errorf("failed to create API client: %w", err) + } + + buf := &bytes.Buffer{} + writer := multipart.NewWriter(buf) + err = writer.WriteField("path", path) + if err != nil { + return err + } + err = writer.WriteField("overwrite", "True") + if err != nil { + return err + } + contents, err := writer.CreateFormFile("contents", "") + if err != nil { + return err + } + + _, err = io.Copy(contents, file) + if err != nil { + return err + } + + err = writer.Close() + if err != nil { + return err + } + + // Request bodies of Content-Type multipart/form-data must are not supported by + // the Go SDK directly for DBFS. So we use the Do method directly. + return apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + "Content-Type": writer.FormDataContentType(), + }, buf.Bytes(), nil) +} + +func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path string, overwrite bool, reader io.Reader) error { + fileMode := files.FileModeWrite + if overwrite { + fileMode |= files.FileModeOverwrite + } + + handle, err := w.workspaceClient.Dbfs.Open(ctx, path, fileMode) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 400 if the file already exists. + if aerr.StatusCode == http.StatusBadRequest { + if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { + return FileAlreadyExistsError{path} + } + } + + return err + } + + _, err = io.Copy(handle, reader) + cerr := handle.Close() + if err == nil { + err = cerr + } + return err +} + +// MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded +// using the /dbfs/put API. If the file is larger than this limit, the streaming +// API (/dbfs/create and /dbfs/add-block) will be used instead. +var MaxUploadLimitForPutApi int64 = 2 * 1024 * 1024 + func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { @@ -114,30 +193,27 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m } } - handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode) + localFile, ok := reader.(*os.File) + + // If the source is not a local file, we'll always use the streaming API endpoint. + if !ok { + return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader) + } + + stat, err := localFile.Stat() if err != nil { - var aerr *apierr.APIError - if !errors.As(err, &aerr) { - return err - } - - // This API returns a 400 if the file already exists. - if aerr.StatusCode == http.StatusBadRequest { - if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { - return FileAlreadyExistsError{absPath} - } - } - - return err + return fmt.Errorf("failed to stat file: %w", err) } - _, err = io.Copy(handle, reader) - cerr := handle.Close() - if err == nil { - err = cerr + // If the source is a local file, but is too large then we'll use the streaming API endpoint. + if stat.Size() > MaxUploadLimitForPutApi { + return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) } - return err + // Use the /dbfs/put API when the file is on the local filesystem + // and is small enough. This is the most common case when users use the + // `databricks fs cp` command. + return w.uploadUsingDbfsPutApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) } func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { From 91a2dfa0eda28d708a696e32cb0a375e51e47bd4 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 2 Dec 2024 23:49:32 +0100 Subject: [PATCH 02/28] - --- libs/filer/dbfs_client.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index efe51f07f..ce122bc03 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -90,13 +90,18 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove return fmt.Errorf("failed to create API client: %w", err) } + overwriteField := "False" + if overwrite { + overwriteField = "True" + } + buf := &bytes.Buffer{} writer := multipart.NewWriter(buf) err = writer.WriteField("path", path) if err != nil { return err } - err = writer.WriteField("overwrite", "True") + err = writer.WriteField("overwrite", overwriteField) if err != nil { return err } From 06af01c8f6246734f271b44be5ed722d534d2581 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 3 Dec 2024 00:05:04 +0100 Subject: [PATCH 03/28] refactor to make tests easier --- libs/filer/dbfs_client.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index ce122bc03..5e007674d 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -68,28 +68,37 @@ func (info dbfsFileInfo) Sys() any { return info.fi } +// Interface to allow mocking of the Databricks API client. +type databricksClient interface { + Do(ctx context.Context, method, path string, headers map[string]string, + requestBody any, responseBody any, visitors ...func(*http.Request) error) error +} + // DbfsClient implements the [Filer] interface for the DBFS backend. type DbfsClient struct { workspaceClient *databricks.WorkspaceClient + apiClient databricksClient + // File operations will be relative to this path. root WorkspaceRootPath } func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, fmt.Errorf("failed to create API client: %w", err) + } + return &DbfsClient{ workspaceClient: w, + apiClient: apiClient, root: NewWorkspaceRootPath(root), }, nil } func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error { - apiClient, err := client.New(w.workspaceClient.Config) - if err != nil { - return fmt.Errorf("failed to create API client: %w", err) - } - overwriteField := "False" if overwrite { overwriteField = "True" @@ -97,7 +106,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove buf := &bytes.Buffer{} writer := multipart.NewWriter(buf) - err = writer.WriteField("path", path) + err := writer.WriteField("path", path) if err != nil { return err } @@ -122,7 +131,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove // Request bodies of Content-Type multipart/form-data must are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - return apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + return w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), }, buf.Bytes(), nil) } @@ -161,7 +170,7 @@ func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path strin // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded // using the /dbfs/put API. If the file is larger than this limit, the streaming // API (/dbfs/create and /dbfs/add-block) will be used instead. -var MaxUploadLimitForPutApi int64 = 2 * 1024 * 1024 +var MaxDbfsUploadLimitForPutApi int64 = 2 * 1024 * 1024 func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) @@ -211,7 +220,7 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m } // If the source is a local file, but is too large then we'll use the streaming API endpoint. - if stat.Size() > MaxUploadLimitForPutApi { + if stat.Size() > MaxDbfsUploadLimitForPutApi { return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) } From 4b484fdcdc72d6623145a240a93fb0808d61385b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 3 Dec 2024 00:07:14 +0100 Subject: [PATCH 04/28] todo --- libs/filer/dbfs_client.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 5e007674d..679e18798 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -167,6 +167,14 @@ func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path strin return err } +// TODO CONTINUE: +// 1. Write the unit tests that make sure the filer write method works correctly +// in either case. +// 2. Write a intergration test that asserts write continues works for big file +// uploads. Also test the overwrite flag in the integration test. +// We can change MaxDbfsUploadLimitForPutApi in the test to avoid creating +// massive test fixtures. + // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded // using the /dbfs/put API. If the file is larger than this limit, the streaming // API (/dbfs/create and /dbfs/add-block) will be used instead. From 78b9788bc6e45753afbd2991caba31f67e193b67 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 30 Dec 2024 22:43:45 +0530 Subject: [PATCH 05/28] some cleanup --- libs/filer/dbfs_client.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 679e18798..ce4683f70 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -98,7 +98,7 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { }, nil } -func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, overwrite bool, file *os.File) error { +func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error { overwriteField := "False" if overwrite { overwriteField = "True" @@ -136,7 +136,7 @@ func (w *DbfsClient) uploadUsingDbfsPutApi(ctx context.Context, path string, ove }, buf.Bytes(), nil) } -func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path string, overwrite bool, reader io.Reader) error { +func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error { fileMode := files.FileModeWrite if overwrite { fileMode |= files.FileModeOverwrite @@ -178,7 +178,7 @@ func (w *DbfsClient) uploadUsingDbfsStreamingApi(ctx context.Context, path strin // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded // using the /dbfs/put API. If the file is larger than this limit, the streaming // API (/dbfs/create and /dbfs/add-block) will be used instead. -var MaxDbfsUploadLimitForPutApi int64 = 2 * 1024 * 1024 +var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) @@ -186,11 +186,6 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return err } - fileMode := files.FileModeWrite - if slices.Contains(mode, OverwriteIfExists) { - fileMode |= files.FileModeOverwrite - } - // Issue info call before write because it automatically creates parent directories. // // For discussion: we could decide this is actually convenient, remove the call below, @@ -219,7 +214,7 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m // If the source is not a local file, we'll always use the streaming API endpoint. if !ok { - return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader) + return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader) } stat, err := localFile.Stat() @@ -228,14 +223,14 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m } // If the source is a local file, but is too large then we'll use the streaming API endpoint. - if stat.Size() > MaxDbfsUploadLimitForPutApi { - return w.uploadUsingDbfsStreamingApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) + if stat.Size() > MaxDbfsPutFileSize { + return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) } // Use the /dbfs/put API when the file is on the local filesystem // and is small enough. This is the most common case when users use the // `databricks fs cp` command. - return w.uploadUsingDbfsPutApi(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) + return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) } func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { From 0ce50fadf02ba90abceb3c01f7f1fd5baef6b962 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 12:09:24 +0530 Subject: [PATCH 06/28] add unit test --- libs/filer/dbfs_client_test.go | 124 +++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 libs/filer/dbfs_client_test.go diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go new file mode 100644 index 000000000..2e8ad86d0 --- /dev/null +++ b/libs/filer/dbfs_client_test.go @@ -0,0 +1,124 @@ +package filer + +import ( + "context" + "io" + "net/http" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/databricks/cli/internal/testutil" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/files" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockDbfsApiClient struct { + t testutil.TestingT + isCalled bool +} + +func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string, + headers map[string]string, request, response any, + visitors ...func(*http.Request) error, +) error { + m.isCalled = true + + require.Equal(m.t, "POST", method) + require.Equal(m.t, "/api/2.0/dbfs/put", path) + require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") + require.Contains(m.t, string(request.([]byte)), "hello world") + return nil +} + +func TestDbfsClientForSmallFiles(t *testing.T) { + // write file to local disk + tmp := t.TempDir() + localPath := filepath.Join(tmp, "hello.txt") + err := os.WriteFile(localPath, []byte("hello world"), 0o644) + require.NoError(t, err) + + // setup DBFS client with mocks + m := mocks.NewMockWorkspaceClient(t) + mockApiClient := &mockDbfsApiClient{t: t} + dbfsClient := DbfsClient{ + apiClient: mockApiClient, + workspaceClient: m.WorkspaceClient, + root: NewWorkspaceRootPath("dbfs:/a/b/c"), + } + + m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil) + + // write file to DBFS + fd, err := os.Open(localPath) + require.NoError(t, err) + err = dbfsClient.Write(context.Background(), "hello.txt", fd) + require.NoError(t, err) + + // verify mock API client is called + require.True(t, mockApiClient.isCalled) +} + +type mockDbfsHandle struct { + builder strings.Builder +} + +func (h *mockDbfsHandle) Write(data []byte) (n int, err error) { return 0, nil } +func (h *mockDbfsHandle) Read(data []byte) (n int, err error) { return 0, nil } +func (h *mockDbfsHandle) Close() error { return nil } +func (h *mockDbfsHandle) WriteTo(w io.Writer) (n int64, err error) { return 0, nil } + +func (h *mockDbfsHandle) ReadFrom(r io.Reader) (n int64, err error) { + b, err := io.ReadAll(r) + if err != nil { + return 0, err + } + num, err := h.builder.Write(b) + return int64(num), err +} + +func TestDbfsClientForLargerFiles(t *testing.T) { + // write file to local disk + tmp := t.TempDir() + localPath := filepath.Join(tmp, "hello.txt") + err := os.WriteFile(localPath, []byte("hello world"), 0o644) + require.NoError(t, err) + + // Modify the max file size to 1 byte to simulate + // a large file that needs to be uploaded in chunks. + oldV := MaxDbfsPutFileSize + MaxDbfsPutFileSize = 1 + t.Cleanup(func() { + MaxDbfsPutFileSize = oldV + }) + + // setup DBFS client with mocks + m := mocks.NewMockWorkspaceClient(t) + mockApiClient := &mockDbfsApiClient{t: t} + dbfsClient := DbfsClient{ + apiClient: mockApiClient, + workspaceClient: m.WorkspaceClient, + root: NewWorkspaceRootPath("dbfs:/a/b/c"), + } + + h := &mockDbfsHandle{} + + m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil) + m.GetMockDbfsAPI().EXPECT().Open(mock.Anything, "dbfs:/a/b/c/hello.txt", files.FileModeWrite).Return(h, nil) + + // write file to DBFS + fd, err := os.Open(localPath) + require.NoError(t, err) + err = dbfsClient.Write(context.Background(), "hello.txt", fd) + require.NoError(t, err) + + // verify mock API client is NOT called + require.False(t, mockApiClient.isCalled) + + // verify the file content was written to the mock handle + assert.Equal(t, "hello world", h.builder.String()) +} From 63e599ccb26230dce714f39261ddae2ce0b21dfe Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 12:31:37 +0530 Subject: [PATCH 07/28] added integration test --- integration/libs/filer/filer_test.go | 26 +++++++++++++++++++++++ libs/filer/dbfs_client_test.go | 31 ++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/integration/libs/filer/filer_test.go b/integration/libs/filer/filer_test.go index 766f9817b..6fbd6d6ef 100644 --- a/integration/libs/filer/filer_test.go +++ b/integration/libs/filer/filer_test.go @@ -7,7 +7,9 @@ import ( "errors" "io" "io/fs" + "os" "path" + "path/filepath" "regexp" "strings" "testing" @@ -895,3 +897,27 @@ func TestWorkspaceFilesExtensions_ExportFormatIsPreserved(t *testing.T) { }) } } + +func TestDbfsFilerForStreamingUploads(t *testing.T) { + ctx := context.Background() + f, _ := setupDbfsFiler(t) + + // Set MaxDbfsPutFileSize to 1 to force streaming uploads + prevV := filer.MaxDbfsPutFileSize + filer.MaxDbfsPutFileSize = 1 + t.Cleanup(func() { + filer.MaxDbfsPutFileSize = prevV + }) + + // Write a file to local disk. + tmpDir := t.TempDir() + err := os.WriteFile(filepath.Join(tmpDir, "foo.txt"), []byte("foobar"), 0o644) + require.NoError(t, err) + + // Write a file with streaming upload + err = f.Write(ctx, "foo.txt", strings.NewReader("foo")) + require.NoError(t, err) + + // Assert contents + filerTest{t, f}.assertContents(ctx, "foo.txt", "foo") +} diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index 2e8ad86d0..d08c73cf7 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -67,7 +67,6 @@ type mockDbfsHandle struct { builder strings.Builder } -func (h *mockDbfsHandle) Write(data []byte) (n int, err error) { return 0, nil } func (h *mockDbfsHandle) Read(data []byte) (n int, err error) { return 0, nil } func (h *mockDbfsHandle) Close() error { return nil } func (h *mockDbfsHandle) WriteTo(w io.Writer) (n int64, err error) { return 0, nil } @@ -81,6 +80,10 @@ func (h *mockDbfsHandle) ReadFrom(r io.Reader) (n int64, err error) { return int64(num), err } +func (h *mockDbfsHandle) Write(data []byte) (n int, err error) { + return h.builder.Write(data) +} + func TestDbfsClientForLargerFiles(t *testing.T) { // write file to local disk tmp := t.TempDir() @@ -106,7 +109,6 @@ func TestDbfsClientForLargerFiles(t *testing.T) { } h := &mockDbfsHandle{} - m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil) m.GetMockDbfsAPI().EXPECT().Open(mock.Anything, "dbfs:/a/b/c/hello.txt", files.FileModeWrite).Return(h, nil) @@ -122,3 +124,28 @@ func TestDbfsClientForLargerFiles(t *testing.T) { // verify the file content was written to the mock handle assert.Equal(t, "hello world", h.builder.String()) } + +func TestDbfsClientForNonLocalFiles(t *testing.T) { + // setup DBFS client with mocks + m := mocks.NewMockWorkspaceClient(t) + mockApiClient := &mockDbfsApiClient{t: t} + dbfsClient := DbfsClient{ + apiClient: mockApiClient, + workspaceClient: m.WorkspaceClient, + root: NewWorkspaceRootPath("dbfs:/a/b/c"), + } + + h := &mockDbfsHandle{} + m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil) + m.GetMockDbfsAPI().EXPECT().Open(mock.Anything, "dbfs:/a/b/c/hello.txt", files.FileModeWrite).Return(h, nil) + + // write file to DBFS + err := dbfsClient.Write(context.Background(), "hello.txt", strings.NewReader("hello world")) + require.NoError(t, err) + + // verify mock API client is NOT called + require.False(t, mockApiClient.isCalled) + + // verify the file content was written to the mock handle + assert.Equal(t, "hello world", h.builder.String()) +} From 932aeee349e0b3c26d7ee77d7182d4099c15467b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 12:46:48 +0530 Subject: [PATCH 08/28] ignore linter --- libs/filer/dbfs_client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index ce4683f70..7996bc89e 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -69,6 +69,8 @@ func (info dbfsFileInfo) Sys() any { } // Interface to allow mocking of the Databricks API client. +// +//nolint:gofumpt type databricksClient interface { Do(ctx context.Context, method, path string, headers map[string]string, requestBody any, responseBody any, visitors ...func(*http.Request) error) error From 9d8ba099bae7946d047b3906be8abe71a059efd3 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 12:47:47 +0530 Subject: [PATCH 09/28] fix fd lingering' --- libs/filer/dbfs_client_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index d08c73cf7..a95a9e4bc 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -55,6 +55,7 @@ func TestDbfsClientForSmallFiles(t *testing.T) { // write file to DBFS fd, err := os.Open(localPath) + defer fd.Close() require.NoError(t, err) err = dbfsClient.Write(context.Background(), "hello.txt", fd) require.NoError(t, err) @@ -114,6 +115,7 @@ func TestDbfsClientForLargerFiles(t *testing.T) { // write file to DBFS fd, err := os.Open(localPath) + defer fd.Close() require.NoError(t, err) err = dbfsClient.Write(context.Background(), "hello.txt", fd) require.NoError(t, err) From 09bf4fa90c5236aa7e7819214b31418bc6873288 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 12:53:25 +0530 Subject: [PATCH 10/28] add unit test --- integration/libs/filer/filer_test.go | 29 ++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/integration/libs/filer/filer_test.go b/integration/libs/filer/filer_test.go index 6fbd6d6ef..3fe6f9147 100644 --- a/integration/libs/filer/filer_test.go +++ b/integration/libs/filer/filer_test.go @@ -914,10 +914,35 @@ func TestDbfsFilerForStreamingUploads(t *testing.T) { err := os.WriteFile(filepath.Join(tmpDir, "foo.txt"), []byte("foobar"), 0o644) require.NoError(t, err) + fd, err := os.Open(filepath.Join(tmpDir, "foo.txt")) + require.NoError(t, err) + defer fd.Close() + // Write a file with streaming upload - err = f.Write(ctx, "foo.txt", strings.NewReader("foo")) + err = f.Write(ctx, "foo.txt", fd) require.NoError(t, err) // Assert contents - filerTest{t, f}.assertContents(ctx, "foo.txt", "foo") + filerTest{t, f}.assertContents(ctx, "foo.txt", "foobar") +} + +func TestDbfsFilerForPutUploads(t *testing.T) { + ctx := context.Background() + f, _ := setupDbfsFiler(t) + + // Write a file to local disk. + tmpDir := t.TempDir() + err := os.WriteFile(filepath.Join(tmpDir, "foo.txt"), []byte("foobar"), 0o644) + require.NoError(t, err) + + fd, err := os.Open(filepath.Join(tmpDir, "foo.txt")) + require.NoError(t, err) + defer fd.Close() + + // Write a file with PUT upload + err = f.Write(ctx, "foo.txt", fd) + require.NoError(t, err) + + // Assert contents + filerTest{t, f}.assertContents(ctx, "foo.txt", "foobar") } From cf51636faab1191b4e26b83057bb0af575dc8598 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 13:28:36 +0530 Subject: [PATCH 11/28] overwrite fix --- integration/libs/filer/filer_test.go | 40 +++++++++++++++++++++++++--- libs/filer/dbfs_client.go | 9 +++++-- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/integration/libs/filer/filer_test.go b/integration/libs/filer/filer_test.go index 3fe6f9147..d484c3e62 100644 --- a/integration/libs/filer/filer_test.go +++ b/integration/libs/filer/filer_test.go @@ -924,6 +924,17 @@ func TestDbfsFilerForStreamingUploads(t *testing.T) { // Assert contents filerTest{t, f}.assertContents(ctx, "foo.txt", "foobar") + + // Overwrite the file with streaming upload, and fail + err = f.Write(ctx, "foo.txt", strings.NewReader("barfoo")) + require.ErrorIs(t, err, fs.ErrExist) + + // Overwrite the file with streaming upload, and succeed + err = f.Write(ctx, "foo.txt", strings.NewReader("barfoo"), filer.OverwriteIfExists) + require.NoError(t, err) + + // Assert contents + filerTest{t, f}.assertContents(ctx, "foo.txt", "barfoo") } func TestDbfsFilerForPutUploads(t *testing.T) { @@ -934,15 +945,36 @@ func TestDbfsFilerForPutUploads(t *testing.T) { tmpDir := t.TempDir() err := os.WriteFile(filepath.Join(tmpDir, "foo.txt"), []byte("foobar"), 0o644) require.NoError(t, err) - - fd, err := os.Open(filepath.Join(tmpDir, "foo.txt")) + err = os.WriteFile(filepath.Join(tmpDir, "bar.txt"), []byte("barfoo"), 0o644) require.NoError(t, err) - defer fd.Close() + + fdFoo, err := os.Open(filepath.Join(tmpDir, "foo.txt")) + require.NoError(t, err) + defer fdFoo.Close() + + fdBar, err := os.Open(filepath.Join(tmpDir, "bar.txt")) + require.NoError(t, err) + defer fdBar.Close() // Write a file with PUT upload - err = f.Write(ctx, "foo.txt", fd) + err = f.Write(ctx, "foo.txt", fdFoo) require.NoError(t, err) // Assert contents filerTest{t, f}.assertContents(ctx, "foo.txt", "foobar") + + // Try to overwrite the file, and fail. + err = f.Write(ctx, "foo.txt", fdBar) + require.ErrorIs(t, err, fs.ErrExist) + + // Reset the file descriptor. + _, err = fdBar.Seek(0, io.SeekStart) + require.NoError(t, err) + + // Overwrite the file with OverwriteIfExists flag + err = f.Write(ctx, "foo.txt", fdBar, filer.OverwriteIfExists) + require.NoError(t, err) + + // Assert contents + filerTest{t, f}.assertContents(ctx, "foo.txt", "barfoo") } diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 7996bc89e..eae04921c 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -131,11 +131,16 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f return err } - // Request bodies of Content-Type multipart/form-data must are not supported by + // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - return w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), }, buf.Bytes(), nil) + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { + return FileAlreadyExistsError{path} + } + return err } func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error { From be62ead7be7d9a4a5e27d40d70c184a81ab0a060 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 13:31:38 +0530 Subject: [PATCH 12/28] lint --- libs/filer/dbfs_client_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index a95a9e4bc..7bf840958 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -55,8 +55,9 @@ func TestDbfsClientForSmallFiles(t *testing.T) { // write file to DBFS fd, err := os.Open(localPath) - defer fd.Close() require.NoError(t, err) + defer fd.Close() + err = dbfsClient.Write(context.Background(), "hello.txt", fd) require.NoError(t, err) @@ -115,8 +116,9 @@ func TestDbfsClientForLargerFiles(t *testing.T) { // write file to DBFS fd, err := os.Open(localPath) - defer fd.Close() require.NoError(t, err) + defer fd.Close() + err = dbfsClient.Write(context.Background(), "hello.txt", fd) require.NoError(t, err) From 7084392a0fd2fef882d7bcf9b1327b069f5fc771 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 13:40:10 +0530 Subject: [PATCH 13/28] cleanup code --- libs/filer/dbfs_client.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index eae04921c..fc8c134e2 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -174,14 +174,6 @@ func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool return err } -// TODO CONTINUE: -// 1. Write the unit tests that make sure the filer write method works correctly -// in either case. -// 2. Write a intergration test that asserts write continues works for big file -// uploads. Also test the overwrite flag in the integration test. -// We can change MaxDbfsUploadLimitForPutApi in the test to avoid creating -// massive test fixtures. - // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded // using the /dbfs/put API. If the file is larger than this limit, the streaming // API (/dbfs/create and /dbfs/add-block) will be used instead. From ee8017357e55c434bc77c2f7fefda416c5a314c0 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 16:07:53 +0530 Subject: [PATCH 14/28] fix size --- libs/filer/dbfs_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index fc8c134e2..5fefd13c6 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -177,7 +177,7 @@ func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool // MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded // using the /dbfs/put API. If the file is larger than this limit, the streaming // API (/dbfs/create and /dbfs/add-block) will be used instead. -var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 +var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 * 1024 func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) From 69fdd9736b9ccb37ff8bc5a87643b0394b1c227c Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 17:10:19 +0530 Subject: [PATCH 15/28] reduce diff --- libs/filer/dbfs_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 5fefd13c6..b09012ca9 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -143,6 +143,11 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f return err } +// MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded +// using the /dbfs/put API. If the file is larger than this limit, the streaming +// API (/dbfs/create and /dbfs/add-block) will be used instead. +var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 * 1024 + func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error { fileMode := files.FileModeWrite if overwrite { @@ -174,11 +179,6 @@ func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool return err } -// MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded -// using the /dbfs/put API. If the file is larger than this limit, the streaming -// API (/dbfs/create and /dbfs/add-block) will be used instead. -var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 * 1024 - func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { From 92e97ad4137ad735e162fd87be8962a0e7e37619 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 31 Dec 2024 17:11:39 +0530 Subject: [PATCH 16/28] - --- libs/filer/dbfs_client.go | 62 +++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index b09012ca9..251cde26a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -148,37 +148,6 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f // API (/dbfs/create and /dbfs/add-block) will be used instead. var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 * 1024 -func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error { - fileMode := files.FileModeWrite - if overwrite { - fileMode |= files.FileModeOverwrite - } - - handle, err := w.workspaceClient.Dbfs.Open(ctx, path, fileMode) - if err != nil { - var aerr *apierr.APIError - if !errors.As(err, &aerr) { - return err - } - - // This API returns a 400 if the file already exists. - if aerr.StatusCode == http.StatusBadRequest { - if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { - return FileAlreadyExistsError{path} - } - } - - return err - } - - _, err = io.Copy(handle, reader) - cerr := handle.Close() - if err == nil { - err = cerr - } - return err -} - func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { @@ -232,6 +201,37 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile) } +func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error { + fileMode := files.FileModeWrite + if overwrite { + fileMode |= files.FileModeOverwrite + } + + handle, err := w.workspaceClient.Dbfs.Open(ctx, path, fileMode) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 400 if the file already exists. + if aerr.StatusCode == http.StatusBadRequest { + if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { + return FileAlreadyExistsError{path} + } + } + + return err + } + + _, err = io.Copy(handle, reader) + cerr := handle.Close() + if err == nil { + err = cerr + } + return err +} + func (w *DbfsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { absPath, err := w.root.Join(name) if err != nil { From 95f41b1f30e02c80d286876624275e55f8b134a3 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 11:59:26 +0530 Subject: [PATCH 17/28] add streaming uploads --- libs/filer/dbfs_client.go | 65 ++++++++++++++++++++-------------- libs/filer/dbfs_client_test.go | 4 ++- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 251cde26a..268b23d5a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,7 +1,6 @@ package filer import ( - "bytes" "context" "errors" "fmt" @@ -106,36 +105,50 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f overwriteField = "True" } - buf := &bytes.Buffer{} - writer := multipart.NewWriter(buf) - err := writer.WriteField("path", path) - if err != nil { - return err - } - err = writer.WriteField("overwrite", overwriteField) - if err != nil { - return err - } - contents, err := writer.CreateFormFile("contents", "") - if err != nil { - return err - } + pr, pw := io.Pipe() + writer := multipart.NewWriter(pw) + go func() { + defer pw.Close() - _, err = io.Copy(contents, file) - if err != nil { - return err - } - - err = writer.Close() - if err != nil { - return err - } + err := writer.WriteField("path", path) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) + return + } + err = writer.WriteField("overwrite", overwriteField) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) + return + } + contents, err := writer.CreateFormFile("contents", "") + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) + return + } + for { + // Copy the file in 10 MB chunks. This reduces the memory usage of the + // program when copying large files. + _, err := io.CopyN(contents, file, 10*1024*1024) + if err == io.EOF { + break + } + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) + return + } + } + err = writer.Close() + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) + return + } + }() // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), - }, buf.Bytes(), nil) + }, pr, nil) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { return FileAlreadyExistsError{path} diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index 7bf840958..a63ed856c 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -31,7 +31,9 @@ func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string, require.Equal(m.t, "POST", method) require.Equal(m.t, "/api/2.0/dbfs/put", path) require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") - require.Contains(m.t, string(request.([]byte)), "hello world") + contents, err := io.ReadAll(request.(io.Reader)) + require.NoError(m.t, err) + require.Contains(m.t, string(contents), "hello world") return nil } From 8ec1e0746d4dad9bb8ff56f2e0e8ef262bcd8d3b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 12:11:38 +0530 Subject: [PATCH 18/28] Revert "add streaming uploads" This reverts commit 95f41b1f30e02c80d286876624275e55f8b134a3. --- libs/filer/dbfs_client.go | 65 ++++++++++++++-------------------- libs/filer/dbfs_client_test.go | 4 +-- 2 files changed, 27 insertions(+), 42 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 268b23d5a..251cde26a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,6 +1,7 @@ package filer import ( + "bytes" "context" "errors" "fmt" @@ -105,50 +106,36 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f overwriteField = "True" } - pr, pw := io.Pipe() - writer := multipart.NewWriter(pw) - go func() { - defer pw.Close() + buf := &bytes.Buffer{} + writer := multipart.NewWriter(buf) + err := writer.WriteField("path", path) + if err != nil { + return err + } + err = writer.WriteField("overwrite", overwriteField) + if err != nil { + return err + } + contents, err := writer.CreateFormFile("contents", "") + if err != nil { + return err + } - err := writer.WriteField("path", path) - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) - return - } - err = writer.WriteField("overwrite", overwriteField) - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) - return - } - contents, err := writer.CreateFormFile("contents", "") - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) - return - } - for { - // Copy the file in 10 MB chunks. This reduces the memory usage of the - // program when copying large files. - _, err := io.CopyN(contents, file, 10*1024*1024) - if err == io.EOF { - break - } - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) - return - } - } - err = writer.Close() - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) - return - } - }() + _, err = io.Copy(contents, file) + if err != nil { + return err + } + + err = writer.Close() + if err != nil { + return err + } // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), - }, pr, nil) + }, buf.Bytes(), nil) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { return FileAlreadyExistsError{path} diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index a63ed856c..7bf840958 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -31,9 +31,7 @@ func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string, require.Equal(m.t, "POST", method) require.Equal(m.t, "/api/2.0/dbfs/put", path) require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") - contents, err := io.ReadAll(request.(io.Reader)) - require.NoError(m.t, err) - require.Contains(m.t, string(contents), "hello world") + require.Contains(m.t, string(request.([]byte)), "hello world") return nil } From 890b48f70df7cfe38edd950feebb8551ade6a69c Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 16:55:56 +0530 Subject: [PATCH 19/28] Reapply "add streaming uploads" This reverts commit 8ec1e0746d4dad9bb8ff56f2e0e8ef262bcd8d3b. --- libs/filer/dbfs_client.go | 65 ++++++++++++++++++++-------------- libs/filer/dbfs_client_test.go | 4 ++- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 251cde26a..268b23d5a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,7 +1,6 @@ package filer import ( - "bytes" "context" "errors" "fmt" @@ -106,36 +105,50 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f overwriteField = "True" } - buf := &bytes.Buffer{} - writer := multipart.NewWriter(buf) - err := writer.WriteField("path", path) - if err != nil { - return err - } - err = writer.WriteField("overwrite", overwriteField) - if err != nil { - return err - } - contents, err := writer.CreateFormFile("contents", "") - if err != nil { - return err - } + pr, pw := io.Pipe() + writer := multipart.NewWriter(pw) + go func() { + defer pw.Close() - _, err = io.Copy(contents, file) - if err != nil { - return err - } - - err = writer.Close() - if err != nil { - return err - } + err := writer.WriteField("path", path) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) + return + } + err = writer.WriteField("overwrite", overwriteField) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) + return + } + contents, err := writer.CreateFormFile("contents", "") + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) + return + } + for { + // Copy the file in 10 MB chunks. This reduces the memory usage of the + // program when copying large files. + _, err := io.CopyN(contents, file, 10*1024*1024) + if err == io.EOF { + break + } + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) + return + } + } + err = writer.Close() + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) + return + } + }() // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), - }, buf.Bytes(), nil) + }, pr, nil) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { return FileAlreadyExistsError{path} diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index 7bf840958..a63ed856c 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -31,7 +31,9 @@ func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string, require.Equal(m.t, "POST", method) require.Equal(m.t, "/api/2.0/dbfs/put", path) require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") - require.Contains(m.t, string(request.([]byte)), "hello world") + contents, err := io.ReadAll(request.(io.Reader)) + require.NoError(m.t, err) + require.Contains(m.t, string(contents), "hello world") return nil } From f70c47253e24f2c094acd6f4a15e7266af93a4cb Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 17:20:22 +0530 Subject: [PATCH 20/28] calculate content length before upload --- libs/filer/dbfs_client.go | 51 +++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 268b23d5a..3f6d56b36 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,6 +1,7 @@ package filer import ( + "bytes" "context" "errors" "fmt" @@ -99,6 +100,36 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { }, nil } +// The PUT API for DBFS requires setting the content length header beforehand in the HTTP +// request. +func putContentLength(path string, overwriteField string, file *os.File) (int64, error) { + buf := &bytes.Buffer{} + writer := multipart.NewWriter(buf) + err := writer.WriteField("path", path) + if err != nil { + return 0, fmt.Errorf("failed to write field path field in multipart form: %w", err) + } + err = writer.WriteField("overwrite", overwriteField) + if err != nil { + return 0, fmt.Errorf("failed to write field overwrite field in multipart form: %w", err) + } + _, err = writer.CreateFormFile("contents", "") + if err != nil { + return 0, fmt.Errorf("failed to write contents field in multipart form: %w", err) + } + err = writer.Close() + if err != nil { + return 0, fmt.Errorf("failed to close multipart form writer: %w", err) + } + + stat, err := file.Stat() + if err != nil { + return 0, fmt.Errorf("failed to stat file %s: %w", path, err) + } + + return int64(buf.Len()) + stat.Size(), nil +} + func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error { overwriteField := "False" if overwrite { @@ -112,17 +143,17 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f err := writer.WriteField("path", path) if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) + pw.CloseWithError(fmt.Errorf("failed to write field path field in multipart form: %w", err)) return } err = writer.WriteField("overwrite", overwriteField) if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) + pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipart form: %w", err)) return } contents, err := writer.CreateFormFile("contents", "") if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) + pw.CloseWithError(fmt.Errorf("failed to write contents field in multipart form: %w", err)) return } for { @@ -133,21 +164,27 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f break } if err != nil { - pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) + pw.CloseWithError(fmt.Errorf("failed to copy file in multipart form: %w", err)) return } } err = writer.Close() if err != nil { - pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) + pw.CloseWithError(fmt.Errorf("failed to close multipart form writer: %w", err)) return } }() + cl, err := putContentLength(path, overwriteField, file) + if err != nil { + return fmt.Errorf("failed to calculate content length: %w", err) + } + // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ - "Content-Type": writer.FormDataContentType(), + err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + "Content-Type": writer.FormDataContentType(), + "Content-Length": fmt.Sprintf("%d", cl), }, pr, nil) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { From 6991dea00bb100d0adccae5ca11d6b86aae8040a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 17:59:53 +0530 Subject: [PATCH 21/28] make content length work --- libs/filer/dbfs_client.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 3f6d56b36..2869c6f0a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -130,6 +130,17 @@ func putContentLength(path string, overwriteField string, file *os.File) (int64, return int64(buf.Len()) + stat.Size(), nil } +func contentLengthVisitor(path string, overwriteField string, file *os.File) func(*http.Request) error { + return func(r *http.Request) error { + cl, err := putContentLength(path, overwriteField, file) + if err != nil { + return fmt.Errorf("failed to calculate content length: %w", err) + } + r.ContentLength = cl + return nil + } +} + func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error { overwriteField := "False" if overwrite { @@ -175,17 +186,15 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f } }() - cl, err := putContentLength(path, overwriteField, file) - if err != nil { - return fmt.Errorf("failed to calculate content length: %w", err) - } - // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ - "Content-Type": writer.FormDataContentType(), - "Content-Length": fmt.Sprintf("%d", cl), - }, pr, nil) + err := w.apiClient.Do(ctx, + http.MethodPost, + "/api/2.0/dbfs/put", + map[string]string{"Content-Type": writer.FormDataContentType()}, + pr, + nil, + contentLengthVisitor(path, overwriteField, file)) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { return FileAlreadyExistsError{path} From 583637aed6f82d9aaa50bc184e7cfe6a78c24bc1 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 18:08:14 +0530 Subject: [PATCH 22/28] lint --- libs/filer/dbfs_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 2869c6f0a..88e17b8ae 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -130,7 +130,7 @@ func putContentLength(path string, overwriteField string, file *os.File) (int64, return int64(buf.Len()) + stat.Size(), nil } -func contentLengthVisitor(path string, overwriteField string, file *os.File) func(*http.Request) error { +func contentLengthVisitor(path, overwriteField string, file *os.File) func(*http.Request) error { return func(r *http.Request) error { cl, err := putContentLength(path, overwriteField, file) if err != nil { From 9552131a2a2153dc43530f15ca003332dc351a43 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 18:11:35 +0530 Subject: [PATCH 23/28] lint --- libs/filer/dbfs_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 88e17b8ae..c90814aa0 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -102,7 +102,7 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { // The PUT API for DBFS requires setting the content length header beforehand in the HTTP // request. -func putContentLength(path string, overwriteField string, file *os.File) (int64, error) { +func putContentLength(path, overwriteField string, file *os.File) (int64, error) { buf := &bytes.Buffer{} writer := multipart.NewWriter(buf) err := writer.WriteField("path", path) From 7ab9fb7cec23dfbb76fc90367f908bd6c1fc8c4a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 18:53:14 +0530 Subject: [PATCH 24/28] simplify copy --- libs/filer/dbfs_client.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index c90814aa0..6a7f90600 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -167,18 +167,7 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f pw.CloseWithError(fmt.Errorf("failed to write contents field in multipart form: %w", err)) return } - for { - // Copy the file in 10 MB chunks. This reduces the memory usage of the - // program when copying large files. - _, err := io.CopyN(contents, file, 10*1024*1024) - if err == io.EOF { - break - } - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to copy file in multipart form: %w", err)) - return - } - } + _, err = io.Copy(contents, file) err = writer.Close() if err != nil { pw.CloseWithError(fmt.Errorf("failed to close multipart form writer: %w", err)) From ac37ca0d9825297bf28644caab4206a32563cf84 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 18:54:35 +0530 Subject: [PATCH 25/28] - --- libs/filer/dbfs_client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 6a7f90600..0a799e7ad 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -168,6 +168,10 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f return } _, err = io.Copy(contents, file) + if err != nil { + pw.CloseWithError(fmt.Errorf("error while streaming file to dbfs: %w", err)) + return + } err = writer.Close() if err != nil { pw.CloseWithError(fmt.Errorf("failed to close multipart form writer: %w", err)) From f4623ebbb966aadac5f6b8c157e8e8cc17af7124 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 23:56:58 +0530 Subject: [PATCH 26/28] cleanup --- libs/filer/dbfs_client.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 0a799e7ad..6f3b2fec0 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -69,11 +69,9 @@ func (info dbfsFileInfo) Sys() any { } // Interface to allow mocking of the Databricks API client. -// -//nolint:gofumpt type databricksClient interface { Do(ctx context.Context, method, path string, headers map[string]string, - requestBody any, responseBody any, visitors ...func(*http.Request) error) error + requestBody, responseBody any, visitors ...func(*http.Request) error) error } // DbfsClient implements the [Filer] interface for the DBFS backend. @@ -102,7 +100,7 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { // The PUT API for DBFS requires setting the content length header beforehand in the HTTP // request. -func putContentLength(path, overwriteField string, file *os.File) (int64, error) { +func contentLength(path, overwriteField string, file *os.File) (int64, error) { buf := &bytes.Buffer{} writer := multipart.NewWriter(buf) err := writer.WriteField("path", path) @@ -132,7 +130,7 @@ func putContentLength(path, overwriteField string, file *os.File) (int64, error) func contentLengthVisitor(path, overwriteField string, file *os.File) func(*http.Request) error { return func(r *http.Request) error { - cl, err := putContentLength(path, overwriteField, file) + cl, err := contentLength(path, overwriteField, file) if err != nil { return fmt.Errorf("failed to calculate content length: %w", err) } From ee9499bc68ef5b9248ecedb3c1a07707ac292381 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 3 Jan 2025 00:01:55 +0530 Subject: [PATCH 27/28] use write testutil --- integration/libs/filer/filer_test.go | 10 +++------- libs/filer/dbfs_client_test.go | 3 +-- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/integration/libs/filer/filer_test.go b/integration/libs/filer/filer_test.go index 9e65af396..8bba493f4 100644 --- a/integration/libs/filer/filer_test.go +++ b/integration/libs/filer/filer_test.go @@ -909,8 +909,7 @@ func TestDbfsFilerForStreamingUploads(t *testing.T) { // Write a file to local disk. tmpDir := t.TempDir() - err := os.WriteFile(filepath.Join(tmpDir, "foo.txt"), []byte("foobar"), 0o644) - require.NoError(t, err) + testutil.WriteFile(t, filepath.Join(tmpDir, "foo.txt"), "foobar") fd, err := os.Open(filepath.Join(tmpDir, "foo.txt")) require.NoError(t, err) @@ -941,11 +940,8 @@ func TestDbfsFilerForPutUploads(t *testing.T) { // Write a file to local disk. tmpDir := t.TempDir() - err := os.WriteFile(filepath.Join(tmpDir, "foo.txt"), []byte("foobar"), 0o644) - require.NoError(t, err) - err = os.WriteFile(filepath.Join(tmpDir, "bar.txt"), []byte("barfoo"), 0o644) - require.NoError(t, err) - + testutil.WriteFile(t, filepath.Join(tmpDir, "foo.txt"), "foobar") + testutil.WriteFile(t, filepath.Join(tmpDir, "bar.txt"), "barfoo") fdFoo, err := os.Open(filepath.Join(tmpDir, "foo.txt")) require.NoError(t, err) defer fdFoo.Close() diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index a63ed856c..506bff2dd 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -41,8 +41,7 @@ func TestDbfsClientForSmallFiles(t *testing.T) { // write file to local disk tmp := t.TempDir() localPath := filepath.Join(tmp, "hello.txt") - err := os.WriteFile(localPath, []byte("hello world"), 0o644) - require.NoError(t, err) + testutil.WriteFile(t, localPath, "hello world") // setup DBFS client with mocks m := mocks.NewMockWorkspaceClient(t) From e9b0afb337ec152a79235449a4baec9322b6c96c Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 3 Jan 2025 00:03:16 +0530 Subject: [PATCH 28/28] - --- libs/filer/dbfs_client_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index 506bff2dd..df962e5a3 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -91,8 +91,7 @@ func TestDbfsClientForLargerFiles(t *testing.T) { // write file to local disk tmp := t.TempDir() localPath := filepath.Join(tmp, "hello.txt") - err := os.WriteFile(localPath, []byte("hello world"), 0o644) - require.NoError(t, err) + testutil.WriteFile(t, localPath, "hello world") // Modify the max file size to 1 byte to simulate // a large file that needs to be uploaded in chunks.