mirror of https://github.com/databricks/cli.git
265 lines
6.2 KiB
Go
265 lines
6.2 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
stdsync "sync"
|
|
"time"
|
|
|
|
"github.com/databricks/cli/libs/filer"
|
|
"github.com/databricks/cli/libs/fileset"
|
|
"github.com/databricks/cli/libs/git"
|
|
"github.com/databricks/cli/libs/log"
|
|
"github.com/databricks/cli/libs/set"
|
|
"github.com/databricks/cli/libs/vfs"
|
|
"github.com/databricks/databricks-sdk-go"
|
|
"github.com/databricks/databricks-sdk-go/service/iam"
|
|
)
|
|
|
|
type OutputHandler func(context.Context, <-chan Event)
|
|
|
|
type SyncOptions struct {
|
|
WorktreeRoot vfs.Path
|
|
LocalRoot vfs.Path
|
|
Paths []string
|
|
Include []string
|
|
Exclude []string
|
|
|
|
RemotePath string
|
|
|
|
Full bool
|
|
|
|
SnapshotBasePath string
|
|
|
|
PollInterval time.Duration
|
|
|
|
WorkspaceClient *databricks.WorkspaceClient
|
|
|
|
CurrentUser *iam.User
|
|
|
|
Host string
|
|
|
|
OutputHandler OutputHandler
|
|
}
|
|
|
|
type Sync struct {
|
|
*SyncOptions
|
|
|
|
fileSet *git.FileSet
|
|
includeFileSet *fileset.FileSet
|
|
excludeFileSet *fileset.FileSet
|
|
|
|
snapshot *Snapshot
|
|
filer filer.Filer
|
|
|
|
// Synchronization progress events are sent to this event notifier.
|
|
notifier EventNotifier
|
|
seq int
|
|
|
|
// WaitGroup is automatically created when an output handler is provided in the SyncOptions.
|
|
// Close call is required to ensure the output handler goroutine handles all events in time.
|
|
outputWaitGroup *stdsync.WaitGroup
|
|
}
|
|
|
|
// New initializes and returns a new [Sync] instance.
|
|
func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
|
|
fileSet, err := git.NewFileSet(opts.WorktreeRoot, opts.LocalRoot, opts.Paths)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = fileSet.EnsureValidGitIgnoreExists()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
includeFileSet, err := fileset.NewGlobSet(opts.LocalRoot, opts.Include)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
excludeFileSet, err := fileset.NewGlobSet(opts.LocalRoot, opts.Exclude)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Verify that the remote path we're about to synchronize to is valid and allowed.
|
|
err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath, opts.CurrentUser)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// TODO: The host may be late-initialized in certain Azure setups where we
|
|
// specify the workspace by its resource ID. tracked in: https://databricks.atlassian.net/browse/DECO-194
|
|
opts.Host = opts.WorkspaceClient.Config.Host
|
|
if opts.Host == "" {
|
|
return nil, errors.New("failed to resolve host for snapshot")
|
|
}
|
|
|
|
// For full sync, we start with an empty snapshot.
|
|
// For incremental sync, we try to load an existing snapshot to start from.
|
|
var snapshot *Snapshot
|
|
if opts.Full {
|
|
snapshot, err = newSnapshot(ctx, &opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err)
|
|
}
|
|
} else {
|
|
snapshot, err = loadOrNewSnapshot(ctx, &opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to load sync snapshot: %w", err)
|
|
}
|
|
}
|
|
|
|
filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var notifier EventNotifier
|
|
outputWaitGroup := &stdsync.WaitGroup{}
|
|
if opts.OutputHandler != nil {
|
|
ch := make(chan Event, MaxRequestsInFlight)
|
|
notifier = &ChannelNotifier{ch}
|
|
outputWaitGroup.Add(1)
|
|
go func() {
|
|
defer outputWaitGroup.Done()
|
|
opts.OutputHandler(ctx, ch)
|
|
}()
|
|
} else {
|
|
notifier = &NopNotifier{}
|
|
}
|
|
|
|
return &Sync{
|
|
SyncOptions: &opts,
|
|
|
|
fileSet: fileSet,
|
|
includeFileSet: includeFileSet,
|
|
excludeFileSet: excludeFileSet,
|
|
snapshot: snapshot,
|
|
filer: filer,
|
|
notifier: notifier,
|
|
outputWaitGroup: outputWaitGroup,
|
|
seq: 0,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Sync) Close() {
|
|
if s.notifier == nil {
|
|
return
|
|
}
|
|
s.notifier.Close()
|
|
s.notifier = nil
|
|
s.outputWaitGroup.Wait()
|
|
}
|
|
|
|
func (s *Sync) notifyStart(ctx context.Context, d diff) {
|
|
// If this is not the initial iteration we can ignore no-ops.
|
|
if s.seq > 0 && d.IsEmpty() {
|
|
return
|
|
}
|
|
s.notifier.Notify(ctx, newEventStart(s.seq, d.put, d.delete))
|
|
}
|
|
|
|
func (s *Sync) notifyProgress(ctx context.Context, action EventAction, path string, progress float32) {
|
|
s.notifier.Notify(ctx, newEventProgress(s.seq, action, path, progress))
|
|
}
|
|
|
|
func (s *Sync) notifyComplete(ctx context.Context, d diff) {
|
|
// If this is not the initial iteration we can ignore no-ops.
|
|
if s.seq > 0 && d.IsEmpty() {
|
|
return
|
|
}
|
|
s.notifier.Notify(ctx, newEventComplete(s.seq, d.put, d.delete))
|
|
s.seq++
|
|
}
|
|
|
|
// Upload all files in the file tree rooted at the local path configured in the
|
|
// SyncOptions to the remote path configured in the SyncOptions.
|
|
//
|
|
// Returns the list of files tracked (and synchronized) by the syncer during the run,
|
|
// and an error if any occurred.
|
|
func (s *Sync) RunOnce(ctx context.Context) ([]fileset.File, error) {
|
|
files, err := s.GetFileList(ctx)
|
|
if err != nil {
|
|
return files, err
|
|
}
|
|
|
|
change, err := s.snapshot.diff(ctx, files)
|
|
if err != nil {
|
|
return files, err
|
|
}
|
|
|
|
s.notifyStart(ctx, change)
|
|
if change.IsEmpty() {
|
|
s.notifyComplete(ctx, change)
|
|
return files, nil
|
|
}
|
|
|
|
err = s.applyDiff(ctx, change)
|
|
if err != nil {
|
|
return files, err
|
|
}
|
|
|
|
err = s.snapshot.Save(ctx)
|
|
if err != nil {
|
|
log.Errorf(ctx, "cannot store snapshot: %s", err)
|
|
return files, err
|
|
}
|
|
|
|
s.notifyComplete(ctx, change)
|
|
return files, nil
|
|
}
|
|
|
|
func (s *Sync) GetFileList(ctx context.Context) ([]fileset.File, error) {
|
|
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
|
|
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
|
|
all := set.NewSetF(func(f fileset.File) string {
|
|
return f.Relative
|
|
})
|
|
gitFiles, err := s.fileSet.Files()
|
|
if err != nil {
|
|
log.Errorf(ctx, "cannot list files: %s", err)
|
|
return nil, err
|
|
}
|
|
all.Add(gitFiles...)
|
|
|
|
include, err := s.includeFileSet.Files()
|
|
if err != nil {
|
|
log.Errorf(ctx, "cannot list include files: %s", err)
|
|
return nil, err
|
|
}
|
|
|
|
all.Add(include...)
|
|
|
|
exclude, err := s.excludeFileSet.Files()
|
|
if err != nil {
|
|
log.Errorf(ctx, "cannot list exclude files: %s", err)
|
|
return nil, err
|
|
}
|
|
|
|
for _, f := range exclude {
|
|
all.Remove(f)
|
|
}
|
|
|
|
return all.Iter(), nil
|
|
}
|
|
|
|
func (s *Sync) RunContinuous(ctx context.Context) error {
|
|
ticker := time.NewTicker(s.PollInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ticker.C:
|
|
_, err := s.RunOnce(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
}
|