databricks-cli/cmd/sync/watchdog.go

package sync

import (
	"context"
	"fmt"
	"io"
"log"
"os"
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/databricks/bricks/git"
"github.com/databricks/bricks/project"
"github.com/databricks/databricks-sdk-go/databricks/client"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/databricks/databricks-sdk-go/workspaces"
)
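
// watchdog periodically scans the tracked file set and applies any detected
// changes through a caller-supplied diff callback.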
type watchdog struct {
	files   git.FileSet
	ticker  *time.Ticker
	wg      sync.WaitGroup
	failure error // data race? make channel?
}
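
// putFile uploads the given content to a file at the given workspace path,
// creating parent directories as needed and overwriting any existing file.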
func putFile(ctx context.Context, path string, content io.Reader) error {
	wsc := project.Get(ctx).WorkspacesClient()
	// workspace mkdirs is idempotent
	err := wsc.Workspace.MkdirsByPath(ctx, filepath.Dir(path))
	if err != nil {
		return fmt.Errorf("could not mkdir to put file: %s", err)
	}
	apiClient := client.New(wsc.Config)
	apiPath := fmt.Sprintf(
		"/api/2.0/workspace-files/import-file/%s?overwrite=true",
		strings.TrimLeft(path, "/"))
	return apiClient.Post(ctx, apiPath, content, nil)
}
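
// getRemoteSyncCallback returns a callback that applies a local diff to the
// remote workspace under remoteDir: deleted paths are removed recursively and
// added or changed files are uploaded from root.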
func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *workspaces.WorkspacesClient) func(localDiff diff) error {
	return func(d diff) error {
		for _, filePath := range d.delete {
			err := wsc.Workspace.Delete(ctx,
				workspace.DeleteRequest{
					Path:      path.Join(remoteDir, filePath),
					Recursive: true,
				},
			)
			if err != nil {
				return err
			}
			log.Printf("[INFO] Deleted %s", filePath)
		}
		for _, filePath := range d.put {
			f, err := os.Open(filepath.Join(root, filePath))
			if err != nil {
				return err
			}
			err = putFile(ctx, path.Join(remoteDir, filePath), f)
			if err != nil {
				return fmt.Errorf("failed to upload file: %s", err) // TODO: fmt.Errorf
			}
			err = f.Close()
			if err != nil {
				return err // TODO: fmt.Errorf
			}
			log.Printf("[INFO] Uploaded %s", filePath)
		}
		return nil
	}
}
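
// spawnSyncRoutine starts the polling loop in a goroutine and blocks until it
// exits, returning the error that stopped it (nil on context cancellation).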
func spawnSyncRoutine(ctx context.Context,
	files git.FileSet,
	interval time.Duration,
	applyDiff func(diff) error) error {
	w := &watchdog{
		files:  files,
		ticker: time.NewTicker(interval),
	}
	w.wg.Add(1)
	go w.main(ctx, applyDiff)
	w.wg.Wait()
	return w.failure
}

// Tradeoff: we only do portable, polling-based monitoring here, because
// kqueue-based file watching on macOS requires manually raising the max file
// descriptor ulimit.
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
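// main polls the file set on every ticker interval, applies the resulting
// diff, and records the first error in w.failure before returning.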
func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error) {
	defer w.wg.Done()
	// load from json or sync it every time there's an action
	state := snapshot{}
	root := w.files.Root()
	if *persistSnapshot {
		err := state.loadSnapshot(root)
		if err != nil {
			log.Printf("[ERROR] cannot load snapshot: %s", err)
			w.failure = err
			return
		}
	}
	for {
		select {
		case <-ctx.Done():
			return
		case <-w.ticker.C:
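			// List all tracked files, diff them against the previous
			// snapshot, and apply only the changes.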
			all, err := w.files.All()
			if err != nil {
				log.Printf("[ERROR] cannot list files: %s", err)
				w.failure = err
				return
			}
			change := state.diff(all)
			if change.IsEmpty() {
				continue
			}
			log.Printf("[INFO] Action: %v", change)
			err = applyDiff(change)
			if err != nil {
				w.failure = err
				return
			}
			if *persistSnapshot {
				err = state.storeSnapshot(root)
				if err != nil {
					log.Printf("[ERROR] cannot store snapshot: %s", err)
					w.failure = err
					return
				}
			}
		}
	}
}