package sync import ( "context" "fmt" "time" "github.com/databricks/bricks/libs/git" "github.com/databricks/bricks/libs/log" "github.com/databricks/bricks/libs/sync/repofiles" "github.com/databricks/databricks-sdk-go" ) type SyncOptions struct { LocalPath string RemotePath string Full bool SnapshotBasePath string PollInterval time.Duration WorkspaceClient *databricks.WorkspaceClient Host string } type Sync struct { *SyncOptions fileSet *git.FileSet snapshot *Snapshot repoFiles *repofiles.RepoFiles // Synchronization progress events are sent to this event notifier. notifier EventNotifier seq int } // New initializes and returns a new [Sync] instance. func New(ctx context.Context, opts SyncOptions) (*Sync, error) { fileSet, err := git.NewFileSet(opts.LocalPath) if err != nil { return nil, err } err = fileSet.EnsureValidGitIgnoreExists() if err != nil { return nil, err } // Verify that the remote path we're about to synchronize to is valid and allowed. err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath) if err != nil { return nil, err } // TODO: The host may be late-initialized in certain Azure setups where we // specify the workspace by its resource ID. tracked in: https://databricks.atlassian.net/browse/DECO-194 opts.Host = opts.WorkspaceClient.Config.Host if opts.Host == "" { return nil, fmt.Errorf("failed to resolve host for snapshot") } // For full sync, we start with an empty snapshot. // For incremental sync, we try to load an existing snapshot to start from. var snapshot *Snapshot if opts.Full { snapshot, err = newSnapshot(ctx, &opts) if err != nil { return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err) } } else { snapshot, err = loadOrNewSnapshot(ctx, &opts) if err != nil { return nil, fmt.Errorf("unable to load sync snapshot: %w", err) } } repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) return &Sync{ SyncOptions: &opts, fileSet: fileSet, snapshot: snapshot, repoFiles: repoFiles, notifier: &NopNotifier{}, seq: 0, }, nil } func (s *Sync) Events() <-chan Event { ch := make(chan Event, MaxRequestsInFlight) s.notifier = &ChannelNotifier{ch} return ch } func (s *Sync) Close() { if s.notifier == nil { return } s.notifier.Close() s.notifier = nil } func (s *Sync) notifyStart(ctx context.Context, d diff) { // If this is not the initial iteration we can ignore no-ops. if s.seq > 0 && d.IsEmpty() { return } s.notifier.Notify(ctx, newEventStart(s.seq, d.put, d.delete)) } func (s *Sync) notifyProgress(ctx context.Context, action EventAction, path string, progress float32) { s.notifier.Notify(ctx, newEventProgress(s.seq, action, path, progress)) } func (s *Sync) notifyComplete(ctx context.Context, d diff) { // If this is not the initial iteration we can ignore no-ops. if s.seq > 0 && d.IsEmpty() { return } s.notifier.Notify(ctx, newEventComplete(s.seq, d.put, d.delete)) s.seq++ } func (s *Sync) RunOnce(ctx context.Context) error { // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 all, err := s.fileSet.All() if err != nil { log.Errorf(ctx, "cannot list files: %s", err) return err } change, err := s.snapshot.diff(all) if err != nil { return err } s.notifyStart(ctx, change) if change.IsEmpty() { s.notifyComplete(ctx, change) return nil } err = s.applyDiff(ctx, change) if err != nil { return err } err = s.snapshot.Save(ctx) if err != nil { log.Errorf(ctx, "cannot store snapshot: %s", err) return err } s.notifyComplete(ctx, change) return nil } func (s *Sync) DestroySnapshot(ctx context.Context) error { return s.snapshot.Destroy(ctx) } func (s *Sync) SnapshotPath() string { return s.snapshot.SnapshotPath } func (s *Sync) RunContinuous(ctx context.Context) error { ticker := time.NewTicker(s.PollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: err := s.RunOnce(ctx) if err != nil { return err } } } }