databricks-cli/libs/sync/watchdog.go

133 lines
3.2 KiB
Go

package sync
import (
"context"
"errors"
"io/fs"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"golang.org/x/sync/errgroup"
)
// Maximum number of concurrent requests during sync.
const MaxRequestsInFlight = 20
// Delete the specified path.
func (s *Sync) applyDelete(ctx context.Context, remoteName string) error {
s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)
err := s.filer.Delete(ctx, remoteName)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return err
}
s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
return nil
}
// Remove the directory at the specified path.
func (s *Sync) applyRmdir(ctx context.Context, remoteName string) error {
s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)
err := s.filer.Delete(ctx, remoteName)
if err != nil {
// Directory deletion is opportunistic, so we ignore errors.
log.Debugf(ctx, "error removing directory %s: %s", remoteName, err)
}
s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
return nil
}
// Create a directory at the specified path.
func (s *Sync) applyMkdir(ctx context.Context, localName string) error {
s.notifyProgress(ctx, EventActionPut, localName, 0.0)
err := s.filer.Mkdir(ctx, localName)
if err != nil {
return err
}
s.notifyProgress(ctx, EventActionPut, localName, 1.0)
return nil
}
// Perform a PUT of the specified local path.
func (s *Sync) applyPut(ctx context.Context, localName string) error {
s.notifyProgress(ctx, EventActionPut, localName, 0.0)
localFile, err := s.LocalRoot.Open(localName)
if err != nil {
return err
}
defer localFile.Close()
opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists}
err = s.filer.Write(ctx, localName, localFile, opts...)
if err != nil {
return err
}
s.notifyProgress(ctx, EventActionPut, localName, 1.0)
return nil
}
func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.Context, string) error, path string) {
// Return early if the context has already been cancelled.
select {
case <-ctx.Done():
return
default:
// Proceed.
}
group.Go(func() error {
return fn(ctx, path)
})
}
func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error {
group, ctx := errgroup.WithContext(ctx)
group.SetLimit(MaxRequestsInFlight)
for _, path := range paths {
groupRunSingle(ctx, group, fn, path)
}
// Wait for goroutines to finish and return first non-nil error return if any.
return group.Wait()
}
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
var err error
// Delete files in parallel.
err = groupRunParallel(ctx, d.delete, s.applyDelete)
if err != nil {
return err
}
// Delete directories ordered by depth from leaf to root.
for _, group := range d.groupedRmdir() {
err = groupRunParallel(ctx, group, s.applyRmdir)
if err != nil {
return err
}
}
// Create directories (leafs only because intermediates are created automatically).
for _, group := range d.groupedMkdir() {
err = groupRunParallel(ctx, group, s.applyMkdir)
if err != nil {
return err
}
}
// Put files in parallel.
err = groupRunParallel(ctx, d.put, s.applyPut)
return err
}