mirror of https://github.com/databricks/cli.git
133 lines
3.2 KiB
Go
133 lines
3.2 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io/fs"
|
|
|
|
"github.com/databricks/cli/libs/filer"
|
|
"github.com/databricks/cli/libs/log"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// Maximum number of concurrent requests during sync.
|
|
const MaxRequestsInFlight = 20
|
|
|
|
// Delete the specified path.
|
|
func (s *Sync) applyDelete(ctx context.Context, remoteName string) error {
|
|
s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)
|
|
|
|
err := s.filer.Delete(ctx, remoteName)
|
|
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
|
return err
|
|
}
|
|
|
|
s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
|
|
return nil
|
|
}
|
|
|
|
// Remove the directory at the specified path.
|
|
func (s *Sync) applyRmdir(ctx context.Context, remoteName string) error {
|
|
s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0)
|
|
|
|
err := s.filer.Delete(ctx, remoteName)
|
|
if err != nil {
|
|
// Directory deletion is opportunistic, so we ignore errors.
|
|
log.Debugf(ctx, "error removing directory %s: %s", remoteName, err)
|
|
}
|
|
|
|
s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0)
|
|
return nil
|
|
}
|
|
|
|
// Create a directory at the specified path.
|
|
func (s *Sync) applyMkdir(ctx context.Context, localName string) error {
|
|
s.notifyProgress(ctx, EventActionPut, localName, 0.0)
|
|
|
|
err := s.filer.Mkdir(ctx, localName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.notifyProgress(ctx, EventActionPut, localName, 1.0)
|
|
return nil
|
|
}
|
|
|
|
// Perform a PUT of the specified local path.
|
|
func (s *Sync) applyPut(ctx context.Context, localName string) error {
|
|
s.notifyProgress(ctx, EventActionPut, localName, 0.0)
|
|
|
|
localFile, err := s.LocalPath.Open(localName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer localFile.Close()
|
|
|
|
opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists}
|
|
err = s.filer.Write(ctx, localName, localFile, opts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.notifyProgress(ctx, EventActionPut, localName, 1.0)
|
|
return nil
|
|
}
|
|
|
|
func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.Context, string) error, path string) {
|
|
// Return early if the context has already been cancelled.
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
// Proceed.
|
|
}
|
|
|
|
group.Go(func() error {
|
|
return fn(ctx, path)
|
|
})
|
|
}
|
|
|
|
func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error {
|
|
group, ctx := errgroup.WithContext(ctx)
|
|
group.SetLimit(MaxRequestsInFlight)
|
|
|
|
for _, path := range paths {
|
|
groupRunSingle(ctx, group, fn, path)
|
|
}
|
|
|
|
// Wait for goroutines to finish and return first non-nil error return if any.
|
|
return group.Wait()
|
|
}
|
|
|
|
func (s *Sync) applyDiff(ctx context.Context, d diff) error {
|
|
var err error
|
|
|
|
// Delete files in parallel.
|
|
err = groupRunParallel(ctx, d.delete, s.applyDelete)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Delete directories ordered by depth from leaf to root.
|
|
for _, group := range d.groupedRmdir() {
|
|
err = groupRunParallel(ctx, group, s.applyRmdir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Create directories (leafs only because intermediates are created automatically).
|
|
for _, group := range d.groupedMkdir() {
|
|
err = groupRunParallel(ctx, group, s.applyMkdir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Put files in parallel.
|
|
err = groupRunParallel(ctx, d.put, s.applyPut)
|
|
|
|
return err
|
|
}
|