Let sync return early if an error occurs (#235)

The previous approach would proceed to execute all requests prior to
returning the first error. This is solved with `errgroup.WithContext`
that cancels the context if a routine returns an error.
This commit is contained in:
Pieter Noordhuis 2023-03-09 13:29:05 +01:00 committed by GitHub
parent 7ade32c734
commit fe738ede6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 46 deletions

View File

@ -118,8 +118,6 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) {
}
func (s *Sync) RunOnce(ctx context.Context) error {
applyDiff := syncCallback(ctx, s)
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
all, err := s.fileSet.All()
@ -139,7 +137,7 @@ func (s *Sync) RunOnce(ctx context.Context) error {
return nil
}
err = applyDiff(change)
err = s.applyDiff(ctx, change)
if err != nil {
return err
}

View File

@ -6,53 +6,63 @@ import (
"golang.org/x/sync/errgroup"
)
// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api
// rate limits
// Maximum number of concurrent requests during sync.
const MaxRequestsInFlight = 20
func syncCallback(ctx context.Context, s *Sync) func(localDiff diff) error {
return func(d diff) error {
// Abstraction over wait groups which allows you to get the errors
// returned in goroutines
var g errgroup.Group
// Perform a DELETE of the specified remote path.
func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteName string) {
// Return early if the context has already been cancelled.
select {
case <-ctx.Done():
return
default:
// Proceed.
}
// Allow at most MaxRequestsInFlight concurrent API calls
g.SetLimit(MaxRequestsInFlight)
for _, remoteName := range d.delete {
// Copy of remoteName created to make this safe for concurrent use.
// directly using remoteName can cause race conditions since the loop
// might iterate over to the next remoteName before the go routine function
// is evaluated
remoteNameCopy := remoteName
g.Go(func() error {
s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 0.0)
err := s.repoFiles.DeleteFile(ctx, remoteNameCopy)
if err != nil {
return err
}
s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 1.0)
return nil
})
}
for _, localRelativePath := range d.put {
// Copy of localName created to make this safe for concurrent use.
localRelativePathCopy := localRelativePath
g.Go(func() error {
s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 0.0)
err := s.repoFiles.PutFile(ctx, localRelativePathCopy)
if err != nil {
return err
}
s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 1.0)
return nil
})
}
// Wait for the goroutines to finish and return the first non-nil error,
// if any.
if err := g.Wait(); err != nil {
group.Go(func() error {
s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)
err := s.repoFiles.DeleteFile(ctx, remoteName)
if err != nil {
return err
}
s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
return nil
}
})
}
// applyPut schedules a PUT of the specified local path on the given group.
// Nothing is scheduled if ctx has already been cancelled at call time.
func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName string) {
	// A non-nil ctx.Err() means the context is already done, so do not
	// queue any further work.
	if ctx.Err() != nil {
		return
	}

	upload := func() error {
		// Emit progress 0.0 before the request is issued.
		s.notifyProgress(ctx, EventActionPut, localName, 0.0)
		if err := s.repoFiles.PutFile(ctx, localName); err != nil {
			return err
		}
		// Emit progress 1.0 only after PutFile returned without error.
		s.notifyProgress(ctx, EventActionPut, localName, 1.0)
		return nil
	}
	group.Go(upload)
}
// applyDiff schedules every delete and every put listed in d on a single
// errgroup capped at MaxRequestsInFlight concurrent goroutines, and returns
// the result of waiting on that group.
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
	// The derived context is the one handed to each scheduled operation;
	// errgroup.WithContext cancels it when a goroutine returns an error.
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(MaxRequestsInFlight)

	for _, remote := range d.delete {
		s.applyDelete(gctx, g, remote)
	}

	for _, local := range d.put {
		s.applyPut(gctx, g, local)
	}

	// Wait blocks until all scheduled goroutines have finished and yields the
	// first non-nil error, if any.
	return g.Wait()
}