Revert "add streaming uploads"

This reverts commit 95f41b1f30.
This commit is contained in:
Shreyas Goenka 2025-01-02 12:11:38 +05:30
parent 95f41b1f30
commit 8ec1e0746d
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
2 changed files with 27 additions and 42 deletions

View File

@ -1,6 +1,7 @@
package filer package filer
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -105,50 +106,36 @@ func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, f
overwriteField = "True" overwriteField = "True"
} }
pr, pw := io.Pipe() buf := &bytes.Buffer{}
writer := multipart.NewWriter(pw) writer := multipart.NewWriter(buf)
go func() {
defer pw.Close()
err := writer.WriteField("path", path) err := writer.WriteField("path", path)
if err != nil { if err != nil {
pw.CloseWithError(fmt.Errorf("failed to write field path field in multipath form: %w", err)) return err
return
} }
err = writer.WriteField("overwrite", overwriteField) err = writer.WriteField("overwrite", overwriteField)
if err != nil { if err != nil {
pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipath form: %w", err)) return err
return
} }
contents, err := writer.CreateFormFile("contents", "") contents, err := writer.CreateFormFile("contents", "")
if err != nil { if err != nil {
pw.CloseWithError(fmt.Errorf("failed to write contents field in multipath form: %w", err)) return err
return
}
for {
// Copy the file in 10 MB chunks. This reduces the memory usage of the
// program when copying large files.
_, err := io.CopyN(contents, file, 10*1024*1024)
if err == io.EOF {
break
} }
_, err = io.Copy(contents, file)
if err != nil { if err != nil {
pw.CloseWithError(fmt.Errorf("failed to copy file in multipath form: %w", err)) return err
return
}
} }
err = writer.Close() err = writer.Close()
if err != nil { if err != nil {
pw.CloseWithError(fmt.Errorf("failed to close multipath form writer: %w", err)) return err
return
} }
}()
// Request bodies of Content-Type multipart/form-data are not supported by // Request bodies of Content-Type multipart/form-data are not supported by
// the Go SDK directly for DBFS. So we use the Do method directly. // the Go SDK directly for DBFS. So we use the Do method directly.
err := w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{ err = w.apiClient.Do(ctx, http.MethodPost, "/api/2.0/dbfs/put", map[string]string{
"Content-Type": writer.FormDataContentType(), "Content-Type": writer.FormDataContentType(),
}, pr, nil) }, buf.Bytes(), nil)
var aerr *apierr.APIError var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
return FileAlreadyExistsError{path} return FileAlreadyExistsError{path}

View File

@ -31,9 +31,7 @@ func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string,
require.Equal(m.t, "POST", method) require.Equal(m.t, "POST", method)
require.Equal(m.t, "/api/2.0/dbfs/put", path) require.Equal(m.t, "/api/2.0/dbfs/put", path)
require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=") require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=")
contents, err := io.ReadAll(request.(io.Reader)) require.Contains(m.t, string(request.([]byte)), "hello world")
require.NoError(m.t, err)
require.Contains(m.t, string(contents), "hello world")
return nil return nil
} }