databricks-cli/libs/sync/watchdog.go

145 lines
3.5 KiB
Go
Raw Normal View History

2022-07-07 18:56:59 +00:00
package sync
import (
"context"
"log"
"sync"
"time"
"github.com/databricks/bricks/libs/sync/repofiles"
"golang.org/x/sync/errgroup"
2022-07-07 18:56:59 +00:00
)
// TODO: add .databricks to .gitignore on bricks init
2022-07-07 18:56:59 +00:00
type watchdog struct {
ticker *time.Ticker
wg sync.WaitGroup
failure error // data race? make channel?
sync *Sync
2022-07-07 18:56:59 +00:00
}
// MaxRequestsInFlight is the upper bound on the number of API calls that
// syncCallback runs concurrently while applying a diff.
//
// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits
// for the per-API rate limits.
const MaxRequestsInFlight = 20
// syncCallback returns a function that applies a diff through repoFiles:
// every path in d.delete is deleted and every path in d.put is uploaded,
// with at most MaxRequestsInFlight calls running at the same time.
//
// Deletes and uploads share one worker group, so they are not ordered
// relative to each other. The returned function blocks until every call has
// finished and returns the first non-nil error, or nil if all succeeded.
//
// The calls run under a context derived from ctx that is cancelled as soon
// as one of them fails, so the remaining calls can stop early instead of
// continuing to spend API requests on a sync that has already failed.
func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(localDiff diff) error {
	return func(d diff) error {
		// errgroup collects the first error returned by any goroutine; the
		// derived ctx is cancelled on that first error (and when Wait returns).
		g, ctx := errgroup.WithContext(ctx)

		// Allow at most MaxRequestsInFlight concurrent API calls.
		g.SetLimit(MaxRequestsInFlight)

		for _, remoteName := range d.delete {
			// Per-iteration copy: before Go 1.22 the loop variable is shared
			// across iterations, so the goroutine must not capture it directly.
			remoteName := remoteName
			g.Go(func() error {
				if err := repoFiles.DeleteFile(ctx, remoteName); err != nil {
					return err
				}
				log.Printf("[INFO] Deleted %s", remoteName)
				return nil
			})
		}

		for _, localRelativePath := range d.put {
			// Per-iteration copy, for the same reason as above.
			localRelativePath := localRelativePath
			g.Go(func() error {
				if err := repoFiles.PutFile(ctx, localRelativePath); err != nil {
					return err
				}
				log.Printf("[INFO] Uploaded %s", localRelativePath)
				return nil
			})
		}

		// Wait for all goroutines and report the first non-nil error, if any.
		return g.Wait()
	}
}
func spawnWatchdog(ctx context.Context,
applyDiff func(diff) error,
sync *Sync) error {
2022-07-07 18:56:59 +00:00
w := &watchdog{
ticker: time.NewTicker(sync.PollInterval),
sync: sync,
2022-07-07 18:56:59 +00:00
}
w.wg.Add(1)
go w.main(ctx, applyDiff, sync.RemotePath)
2022-07-07 18:56:59 +00:00
w.wg.Wait()
return w.failure
}
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remotePath string) {
2022-07-07 18:56:59 +00:00
defer w.wg.Done()
snapshot, err := newSnapshot(w.sync.SyncOptions)
if err != nil {
log.Printf("[ERROR] cannot create snapshot: %s", err)
w.failure = err
return
}
if w.sync.PersistSnapshot {
snapshot, err = loadOrNewSnapshot(w.sync.SyncOptions)
if err != nil {
log.Printf("[ERROR] cannot load snapshot: %s", err)
w.failure = err
return
}
}
var onlyOnceInitLog sync.Once
2022-07-07 18:56:59 +00:00
for {
select {
case <-ctx.Done():
return
case <-w.ticker.C:
all, err := w.sync.fileSet.All()
2022-07-07 18:56:59 +00:00
if err != nil {
log.Printf("[ERROR] cannot list files: %s", err)
w.failure = err
return
}
change, err := snapshot.diff(all)
if err != nil {
w.failure = err
return
}
2022-07-07 18:56:59 +00:00
if change.IsEmpty() {
onlyOnceInitLog.Do(func() {
log.Printf("[INFO] Initial Sync Complete")
})
2022-07-07 18:56:59 +00:00
continue
}
log.Printf("[INFO] Action: %v", change)
err = applyDiff(change)
if err != nil {
w.failure = err
return
}
if w.sync.PersistSnapshot {
err = snapshot.Save(ctx)
if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err)
w.failure = err
return
}
2022-07-07 18:56:59 +00:00
}
onlyOnceInitLog.Do(func() {
log.Printf("[INFO] Initial Sync Complete")
})
2022-07-07 18:56:59 +00:00
}
}
}