From 0b754e6de89d4a0b0137edf2f8d9fdd00eaa9ecb Mon Sep 17 00:00:00 2001 From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com> Date: Wed, 5 Oct 2022 00:12:57 +0200 Subject: [PATCH] [DECO-94] Execute uploads in parallel instead of sequentially (#81) Tested manually Upload seems fast enough, delete API calls though have much longer turnaround times Additional optimizations that can be done if/when the need arises: 1. First time upload can be done using zip batching of the files https://user-images.githubusercontent.com/88374338/192783332-9b2b19bc-d6c4-4a66-8dbc-e78287e6af1a.mov --- cmd/sync/watchdog.go | 80 ++++++++++++++++++++++++++++++-------------- go.mod | 2 ++ go.sum | 1 + 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/cmd/sync/watchdog.go b/cmd/sync/watchdog.go index 321bb322..c87cd660 100644 --- a/cmd/sync/watchdog.go +++ b/cmd/sync/watchdog.go @@ -17,6 +17,7 @@ import ( "github.com/databricks/databricks-sdk-go/databricks/client" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/databricks/databricks-sdk-go/workspaces" + "golang.org/x/sync/errgroup" ) type watchdog struct { @@ -26,6 +27,10 @@ type watchdog struct { failure error // data race? make channel?
} +// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per-API +// rate limits +const MaxRequestsInFlight = 20 + func putFile(ctx context.Context, path string, content io.Reader) error { wsc := project.Get(ctx).WorkspacesClient() // workspace mkdirs is idempotent @@ -42,32 +47,57 @@ func putFile(ctx context.Context, path string, content io.Reader) error { func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *workspaces.WorkspacesClient) func(localDiff diff) error { return func(d diff) error { - for _, filePath := range d.delete { - err := wsc.Workspace.Delete(ctx, - workspace.Delete{ - Path: path.Join(remoteDir, filePath), - Recursive: true, - }, - ) - if err != nil { - return err - } - log.Printf("[INFO] Deleted %s", filePath) + + // Abstraction over wait groups which allows you to get the errors + // returned in goroutines + var g errgroup.Group + + // Allow at most MaxRequestsInFlight concurrent API calls + g.SetLimit(MaxRequestsInFlight) + + for _, fileName := range d.delete { + // Copy of fileName created to make this safe for concurrent use.
+ // Using fileName directly can cause race conditions since the loop + // might advance to the next fileName before the goroutine's function + // runs + localFileName := fileName + g.Go(func() error { + err := wsc.Workspace.Delete(ctx, + workspace.Delete{ + Path: path.Join(remoteDir, localFileName), + Recursive: true, + }, + ) + if err != nil { + return err + } + log.Printf("[INFO] Deleted %s", localFileName) + return nil + }) } - for _, filePath := range d.put { - f, err := os.Open(filepath.Join(root, filePath)) - if err != nil { - return err - } - err = putFile(ctx, path.Join(remoteDir, filePath), f) - if err != nil { - return fmt.Errorf("failed to upload file: %s", err) // TODO: fmt.Errorf - } - err = f.Close() - if err != nil { - return err // TODO: fmt.Errorf - } - log.Printf("[INFO] Uploaded %s", filePath) + for _, fileName := range d.put { + localFileName := fileName + g.Go(func() error { + f, err := os.Open(filepath.Join(root, localFileName)) + if err != nil { + return err + } + err = putFile(ctx, path.Join(remoteDir, localFileName), f) + if err != nil { + return fmt.Errorf("failed to upload file: %s", err) + } + err = f.Close() + if err != nil { + return err + } + log.Printf("[INFO] Uploaded %s", localFileName) + return nil + }) + } + // wait for goroutines to finish and return the first non-nil error, + // if any + if err := g.Wait(); err != nil { + return err } return nil } } diff --git a/go.mod b/go.mod index 4b834a30..7dea5c30 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,8 @@ require ( gopkg.in/ini.v1 v1.67.0 // Apache 2.0 ) +require golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 + require ( cloud.google.com/go/compute v1.6.1 // indirect github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e // indirect diff --git a/go.sum b/go.sum index eb6b97cf..31e98125 100644 --- a/go.sum +++ b/go.sum @@ -354,6 +354,7 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync
v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=