From fe738ede6a80ea89c855f575fdeb4679494a24a0 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 9 Mar 2023 13:29:05 +0100 Subject: [PATCH] Let sync return early if an error occurs (#235) The previous approach would proceed to execute all requests prior to returning the first error. This is solved with `errgroup.WithContext` that cancels the context if a routine returns an error. --- libs/sync/sync.go | 4 +- libs/sync/watchdog.go | 96 ++++++++++++++++++++++++------------------- 2 files changed, 54 insertions(+), 46 deletions(-) diff --git a/libs/sync/sync.go b/libs/sync/sync.go index b288685b..452cebad 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -118,8 +118,6 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) { } func (s *Sync) RunOnce(ctx context.Context) error { - applyDiff := syncCallback(ctx, s) - // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 all, err := s.fileSet.All() @@ -139,7 +137,7 @@ func (s *Sync) RunOnce(ctx context.Context) error { return nil } - err = applyDiff(change) + err = s.applyDiff(ctx, change) if err != nil { return err } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 257b5442..3e7acccc 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -6,53 +6,63 @@ import ( "golang.org/x/sync/errgroup" ) -// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api -// rate limits +// Maximum number of concurrent requests during sync. const MaxRequestsInFlight = 20 -func syncCallback(ctx context.Context, s *Sync) func(localDiff diff) error { - return func(d diff) error { - // Abstraction over wait groups which allows you to get the errors - // returned in goroutines - var g errgroup.Group +// Perform a DELETE of the specified remote path. +func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteName string) { + // Return early if the context has already been cancelled. + select { + case <-ctx.Done(): + return + default: + // Proceed. + } - // Allow MaxRequestLimit maxiumum concurrent api calls - g.SetLimit(MaxRequestsInFlight) - - for _, remoteName := range d.delete { - // Copy of remoteName created to make this safe for concurrent use. - // directly using remoteName can cause race conditions since the loop - // might iterate over to the next remoteName before the go routine function - // is evaluated - remoteNameCopy := remoteName - g.Go(func() error { - s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 0.0) - err := s.repoFiles.DeleteFile(ctx, remoteNameCopy) - if err != nil { - return err - } - s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 1.0) - return nil - }) - } - for _, localRelativePath := range d.put { - // Copy of localName created to make this safe for concurrent use. - localRelativePathCopy := localRelativePath - g.Go(func() error { - s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 0.0) - err := s.repoFiles.PutFile(ctx, localRelativePathCopy) - if err != nil { - return err - } - s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 1.0) - return nil - }) - } - // wait for goroutines to finish and return first non-nil error return - // if any - if err := g.Wait(); err != nil { + group.Go(func() error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + err := s.repoFiles.DeleteFile(ctx, remoteName) + if err != nil { return err } + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) return nil - } + }) +} + +// Perform a PUT of the specified local path. +func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName string) { + // Return early if the context has already been cancelled. + select { + case <-ctx.Done(): + return + default: + // Proceed. + } + + group.Go(func() error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + err := s.repoFiles.PutFile(ctx, localName) + if err != nil { + return err + } + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil + }) +} + +func (s *Sync) applyDiff(ctx context.Context, d diff) error { + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(MaxRequestsInFlight) + + for _, remoteName := range d.delete { + s.applyDelete(ctx, group, remoteName) + } + + for _, localName := range d.put { + s.applyPut(ctx, group, localName) + } + + // Wait for goroutines to finish and return first non-nil error return if any. + return group.Wait() }