package sync

import (
	"context"
	"errors"
	"io/fs"

	"github.com/databricks/cli/libs/filer"
	"github.com/databricks/cli/libs/log"
	"golang.org/x/sync/errgroup"
)

// MaxRequestsInFlight is the maximum number of concurrent requests during sync.
// It is used as the concurrency limit of every parallel batch in this file.
const MaxRequestsInFlight = 20

// applyDelete deletes the remote file at remoteName.
//
// Progress is reported before (0.0) and after (1.0) the delete. A path that no
// longer exists (fs.ErrNotExist) counts as success; any other error is
// returned and the completion event is not emitted.
func (s *Sync) applyDelete(ctx context.Context, remoteName string) error {
	s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)

	err := s.filer.Delete(ctx, remoteName)
	if err != nil && !errors.Is(err, fs.ErrNotExist) {
		return err
	}

	s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
	return nil
}

// applyRmdir removes the remote directory at remoteName.
//
// Directory removal is opportunistic: a failure is logged at debug level and
// otherwise ignored, so this function always returns nil.
func (s *Sync) applyRmdir(ctx context.Context, remoteName string) error {
	s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)

	err := s.filer.Delete(ctx, remoteName)
	if err != nil {
		// Directory deletion is opportunistic, so we ignore errors.
		log.Debugf(ctx, "error removing directory %s: %s", remoteName, err)
	}

	s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
	return nil
}

// applyMkdir creates a remote directory at localName.
//
// Progress is reported as a put event. On failure the error is returned and
// the completion event is not emitted.
func (s *Sync) applyMkdir(ctx context.Context, localName string) error {
	s.notifyProgress(ctx, EventActionPut, localName, 0.0)

	if err := s.filer.Mkdir(ctx, localName); err != nil {
		return err
	}

	s.notifyProgress(ctx, EventActionPut, localName, 1.0)
	return nil
}

// applyPut uploads the local file at localName to the same path on the remote.
//
// The file is opened through s.LocalPath and written with parent directories
// created as needed and any existing remote file overwritten. On failure the
// error is returned and the completion event is not emitted.
func (s *Sync) applyPut(ctx context.Context, localName string) error {
	s.notifyProgress(ctx, EventActionPut, localName, 0.0)

	localFile, err := s.LocalPath.Open(localName)
	if err != nil {
		return err
	}
	defer localFile.Close()

	opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists}
	if err := s.filer.Write(ctx, localName, localFile, opts...); err != nil {
		return err
	}

	s.notifyProgress(ctx, EventActionPut, localName, 1.0)
	return nil
}

// groupRunSingle schedules fn(ctx, path) on group unless ctx is already done.
//
// A path skipped because of cancellation is not reported here; the caller is
// responsible for surfacing the cancellation.
func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.Context, string) error, path string) {
	// Return early if the context has already been cancelled.
	select {
	case <-ctx.Done():
		return
	default:
		// Proceed.
	}

	group.Go(func() error {
		return fn(ctx, path)
	})
}

// groupRunParallel calls fn for every path with at most MaxRequestsInFlight
// calls running concurrently.
//
// It returns the first non-nil error returned by fn. Once a call fails, or ctx
// is cancelled, the remaining paths are skipped. If paths were skipped because
// ctx itself was cancelled and no call returned an error, ctx.Err() is
// returned so that a cancelled batch is never reported as a success.
func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error {
	// Keep the caller's context distinct from the group's derived context: the
	// derived one is always cancelled once Wait returns, so only the parent
	// can tell us whether the caller asked to stop.
	group, groupCtx := errgroup.WithContext(ctx)
	group.SetLimit(MaxRequestsInFlight)

	for _, path := range paths {
		groupRunSingle(groupCtx, group, fn, path)
	}

	// Wait for goroutines to finish and return the first non-nil error if any.
	if err := group.Wait(); err != nil {
		return err
	}

	// No call failed, but a cancelled parent context means paths may have been
	// skipped without running. Report that instead of returning nil. A
	// cancellation that races with the completion of the last path is reported
	// as well.
	return ctx.Err()
}

// applyDiff applies d to the remote in four sequential phases: delete files,
// remove directories, create directories, upload files. Work inside a phase
// runs in parallel. The first error aborts the remaining phases and is
// returned.
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
	// Delete files in parallel.
	if err := groupRunParallel(ctx, d.delete, s.applyDelete); err != nil {
		return err
	}

	// Delete directories ordered by depth from leaf to root.
	for _, group := range d.groupedRmdir() {
		if err := groupRunParallel(ctx, group, s.applyRmdir); err != nil {
			return err
		}
	}

	// Create directories (leafs only because intermediates are created automatically).
	for _, group := range d.groupedMkdir() {
		if err := groupRunParallel(ctx, group, s.applyMkdir); err != nil {
			return err
		}
	}

	// Put files in parallel.
	return groupRunParallel(ctx, d.put, s.applyPut)
}