[DECO-94] Execute uploads in parallel instead of sequentailly (#81)

Tested manually

Upload seems fast enough, delete API calls though have a much longer
turn around times

Additional optimizations that can be done if/when the need arises:
1. First time upload can be done using zip batching of the files


https://user-images.githubusercontent.com/88374338/192783332-9b2b19bc-d6c4-4a66-8dbc-e78287e6af1a.mov
This commit is contained in:
shreyas-goenka 2022-10-05 00:12:57 +02:00 committed by GitHub
parent a1b6fdb2e8
commit 0b754e6de8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 25 deletions

View File

@ -17,6 +17,7 @@ import (
"github.com/databricks/databricks-sdk-go/databricks/client" "github.com/databricks/databricks-sdk-go/databricks/client"
"github.com/databricks/databricks-sdk-go/service/workspace" "github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/databricks/databricks-sdk-go/workspaces" "github.com/databricks/databricks-sdk-go/workspaces"
"golang.org/x/sync/errgroup"
) )
type watchdog struct { type watchdog struct {
@ -26,6 +27,10 @@ type watchdog struct {
failure error // data race? make channel? failure error // data race? make channel?
} }
// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api
// rate limits
const MaxRequestsInFlight = 20
func putFile(ctx context.Context, path string, content io.Reader) error { func putFile(ctx context.Context, path string, content io.Reader) error {
wsc := project.Get(ctx).WorkspacesClient() wsc := project.Get(ctx).WorkspacesClient()
// workspace mkdirs is idempotent // workspace mkdirs is idempotent
@ -42,32 +47,57 @@ func putFile(ctx context.Context, path string, content io.Reader) error {
func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *workspaces.WorkspacesClient) func(localDiff diff) error { func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *workspaces.WorkspacesClient) func(localDiff diff) error {
return func(d diff) error { return func(d diff) error {
for _, filePath := range d.delete {
err := wsc.Workspace.Delete(ctx, // Abstraction over wait groups which allows you to get the errors
workspace.Delete{ // returned in goroutines
Path: path.Join(remoteDir, filePath), var g errgroup.Group
Recursive: true,
}, // Allow MaxRequestLimit maxiumum concurrent api calls
) g.SetLimit(MaxRequestsInFlight)
if err != nil {
return err for _, fileName := range d.delete {
} // Copy of fileName created to make this safe for concurrent use.
log.Printf("[INFO] Deleted %s", filePath) // directly using fileName can cause race conditions since the loop
// might iterate over to the next fileName before the go routine function
// is evaluated
localFileName := fileName
g.Go(func() error {
err := wsc.Workspace.Delete(ctx,
workspace.Delete{
Path: path.Join(remoteDir, localFileName),
Recursive: true,
},
)
if err != nil {
return err
}
log.Printf("[INFO] Deleted %s", localFileName)
return nil
})
} }
for _, filePath := range d.put { for _, fileName := range d.put {
f, err := os.Open(filepath.Join(root, filePath)) localFileName := fileName
if err != nil { g.Go(func() error {
return err f, err := os.Open(filepath.Join(root, localFileName))
} if err != nil {
err = putFile(ctx, path.Join(remoteDir, filePath), f) return err
if err != nil { }
return fmt.Errorf("failed to upload file: %s", err) // TODO: fmt.Errorf err = putFile(ctx, path.Join(remoteDir, localFileName), f)
} if err != nil {
err = f.Close() return fmt.Errorf("failed to upload file: %s", err)
if err != nil { }
return err // TODO: fmt.Errorf err = f.Close()
} if err != nil {
log.Printf("[INFO] Uploaded %s", filePath) return err
}
log.Printf("[INFO] Uploaded %s", localFileName)
return nil
})
}
// wait for goroutines to finish and return first non-nil error return
// if any
if err := g.Wait(); err != nil {
return err
} }
return nil return nil
} }

2
go.mod
View File

@ -17,6 +17,8 @@ require (
gopkg.in/ini.v1 v1.67.0 // Apache 2.0 gopkg.in/ini.v1 v1.67.0 // Apache 2.0
) )
require golang.org/x/sync v0.0.0-20220513210516-0976fa681c29
require ( require (
cloud.google.com/go/compute v1.6.1 // indirect cloud.google.com/go/compute v1.6.1 // indirect
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect

1
go.sum
View File

@ -354,6 +354,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4=
golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=