From 890b48f70df7cfe38edd950feebb8551ade6a69c Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 2 Jan 2025 16:55:56 +0530 Subject: [PATCH] Reapply "add streaming uploads" This reverts commit 8ec1e0746d4dad9bb8ff56f2e0e8ef262bcd8d3b. --- libs/filer/dbfs_client.go | 65 ++++++++++++++++++++-------------- libs/filer/dbfs_client_test.go | 4 ++- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 251cde26a..268b23d5a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,7 +1,6 @@ package filer import ( - "bytes" "context" "errors" "fmt" @@ -106,36 +105,50 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f overwriteField = "True" } - buf := &bytes.Buffer{} - writer := multipart.NewWriter(buf) - err := writer.WriteField("path", path) - if err != nil { - return err - } - err = writer.WriteField("overwrite", overwriteField) - if err != nil { - return err - } - contents, err := writer.CreateFormFile("contents", "") - if err != nil { - return err - } + pr, pw := io.Pipe() + writer := multipart.NewWriter(pw) + go func() { + defer pw.Close() - _, err = io.Copy(contents, file) - if err != nil { - return err - } - - err = writer.Close() - if err != nil { - return err - } + err := writer.WriteField("path", path) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) + return + } + err = writer.WriteField("overwrite", overwriteField) + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) + return + } + contents, err := writer.CreateFormFile("contents", "") + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) + return + } + for { + // Copy the file in 10 MB chunks. This reduces the memory usage of the + // program when copying large files. + _, err := io.CopyN(contents, file, 10*1024*1024) + if err == io.EOF { + break + } + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) + return + } + } + err = writer.Close() + if err != nil { + pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) + return + } + }() // Request bodies of Content-Type multipart/form-data are not supported by // the Go SDK directly for DBFS. So we use the Do method directly. - err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ + err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ "Content-Type": writer.FormDataContentType(), - }, buf.Bytes(), nil) + }, pr, nil) var aerr *apierr.APIError if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { return FileAlreadyExistsError{path} diff --git a/libs/filer/dbfs_client_test.go b/libs/filer/dbfs_client_test.go index 7bf840958..a63ed856c 100644 --- a/libs/filer/dbfs_client_test.go +++ b/libs/filer/dbfs_client_test.go @@ -31,7 +31,9 @@ func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string, require.Equal(m.t, "POST", method) require.Equal(m.t, "/api/2.0/dbfs/put", path) require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") - require.Contains(m.t, string(request.([]byte)), "hello world") + contents, err := io.ReadAll(request.(io.Reader)) + require.NoError(m.t, err) + require.Contains(m.t, string(contents), "hello world") return nil }