From 8ec1e0746d4dad9bb8ff56f2e0e8ef262bcd8d3b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 12:11:38 +0530 Subject: [PATCH] Revert "add streaming uploads" This reverts commit 95f41b1f30e02c80d286876624275e55f8b134a3. --- libs/filer/dbfs_client.go | 65 ++++++++++++++-------------------- libs/filer/dbfs_client_test.go | 4 +-- 2 files changed, 27 insertions(+), 42 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 268b23d5a..251cde26a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,6 +1,7 @@ package filer import ( + "bytes" "context" "errors" "fmt" @@ -105,50 +106,36 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f overwriteField = "True" } - pr, pw := io.Pipe() - writer := multipart.NewWriter(pw) - go func() { - defer pw.Close() + buf := &bytes.Buffer{} + writer := multipart.NewWriter(buf) + err := writer.WriteField("path", path) + if err != nil { + return err + } + err = writer.WriteField("overwrite", overwriteField) + if err != nil { + return err + } + contents, err := writer.CreateFormFile("contents", "") + if err != nil { + return err + } - err := writer.WriteField("path", path) - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) - return - } - err = writer.WriteField("overwrite", overwriteField) - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) - return - } - contents, err := writer.CreateFormFile("contents", "") - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) - return - } - for { - // Copy the file in 10 MB chunks. This reduces the memory usage of the - // program when copying large files. - _, err := io.CopyN(contents, file, 10*1024*1024) - if err == io.EOF { - break - } - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) - return - } - } - err = writer.Close() - if err != nil { - pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) - return - } - }() + _, err = io.Copy(contents, file) + if err != nil { + return err + } + + err = writer.Close() + if err != nil { + return err + } // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), - }, pr, nil) + }, buf.Bytes(), nil) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { return FileAlreadyExistsError{path} diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index a63ed856c..7bf840958 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -31,9 +31,7 @@ func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string, require.Equal(m.t, "POST", method) require.Equal(m.t, "/api/2.0/dbfs/put", path) require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") - contents, err := io.ReadAll(request.(io.Reader)) - require.NoError(m.t, err) - require.Contains(m.t, string(contents), "hello world") + require.Contains(m.t, string(request.([]byte)), "hello world") return nil }