From e872b587cc5bc6157488ab5351645889761ca031 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 8 Mar 2023 10:27:19 +0100 Subject: [PATCH] Add optional JSON output for sync command (#230) JSON output makes it easy to process synchronization progress information in downstream tools (e.g. the vscode extension). This change introduces a `sync.Event` interface type for progress events as well as a `sync.EventNotifier` that lets the sync code pass along progress events to calling code. Example output in text mode (default, this uses the existing logger calls): ```text 2023/03/03 14:07:17 [INFO] Remote file sync location: /Repos/pieter.noordhuis@databricks.com/... 2023/03/03 14:07:18 [INFO] Initial Sync Complete 2023/03/03 14:07:22 [INFO] Action: PUT: foo 2023/03/03 14:07:23 [INFO] Uploaded foo 2023/03/03 14:07:23 [INFO] Complete 2023/03/03 14:07:25 [INFO] Action: DELETE: foo 2023/03/03 14:07:25 [INFO] Deleted foo 2023/03/03 14:07:25 [INFO] Complete ``` Example output in JSON mode: ```json {"timestamp":"2023-03-03T14:08:15.459439+01:00","seq":0,"type":"start"} {"timestamp":"2023-03-03T14:08:15.459461+01:00","seq":0,"type":"complete"} {"timestamp":"2023-03-03T14:08:18.459821+01:00","seq":1,"type":"start","put":["foo"]} {"timestamp":"2023-03-03T14:08:18.459867+01:00","seq":1,"type":"progress","action":"put","path":"foo","progress":0} {"timestamp":"2023-03-03T14:08:19.418696+01:00","seq":1,"type":"progress","action":"put","path":"foo","progress":1} {"timestamp":"2023-03-03T14:08:19.421397+01:00","seq":1,"type":"complete","put":["foo"]} {"timestamp":"2023-03-03T14:08:22.459238+01:00","seq":2,"type":"start","delete":["foo"]} {"timestamp":"2023-03-03T14:08:22.459268+01:00","seq":2,"type":"progress","action":"delete","path":"foo","progress":0} {"timestamp":"2023-03-03T14:08:22.686413+01:00","seq":2,"type":"progress","action":"delete","path":"foo","progress":1} {"timestamp":"2023-03-03T14:08:22.688989+01:00","seq":2,"type":"complete","delete":["foo"]} ``` --------- 
Co-authored-by: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com> --- cmd/sync/output.go | 50 ++++++++++++ cmd/sync/sync.go | 11 ++- docs/sync.md | 40 ++++++++++ libs/flags/output.go | 39 +++++++++ libs/flags/output_test.go | 37 +++++++++ libs/sync/diff.go | 19 ----- libs/sync/event.go | 164 ++++++++++++++++++++++++++++++++++++++ libs/sync/event_test.go | 130 ++++++++++++++++++++++++++++++ libs/sync/sync.go | 48 ++++++++--- libs/sync/watchdog.go | 14 ++-- 10 files changed, 514 insertions(+), 38 deletions(-) create mode 100644 cmd/sync/output.go create mode 100644 docs/sync.md create mode 100644 libs/flags/output.go create mode 100644 libs/flags/output_test.go create mode 100644 libs/sync/event.go create mode 100644 libs/sync/event_test.go diff --git a/cmd/sync/output.go b/cmd/sync/output.go new file mode 100644 index 00000000..d730301e --- /dev/null +++ b/cmd/sync/output.go @@ -0,0 +1,50 @@ +package sync + +import ( + "context" + "encoding/json" + "io" + "log" + + "github.com/databricks/bricks/libs/sync" +) + +// Read synchronization events and write them as JSON to the specified writer (typically stdout). +func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { + enc := json.NewEncoder(w) + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + return + } + err := enc.Encode(e) + // These are plain structs so this must always work. + // Panic on error so that a violation of this assumption does not go undetected. + if err != nil { + panic(err) + } + } + } +} + +// Read synchronization events and log them at the INFO level. +func logOutput(ctx context.Context, ch <-chan sync.Event) { + for { + select { + case <-ctx.Done(): + return + case e, ok := <-ch: + if !ok { + return + } + // Log only if something actually happened. + // Sync events produce an empty string if nothing happened. 
+ if str := e.String(); str != "" { + log.Printf("[INFO] %s", str) + } + } + } +} diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index a5ff20e0..0c56f5c5 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -3,12 +3,12 @@ package sync import ( "flag" "fmt" - "log" "path/filepath" "time" "github.com/databricks/bricks/bundle" "github.com/databricks/bricks/cmd/root" + "github.com/databricks/bricks/libs/flags" "github.com/databricks/bricks/libs/sync" "github.com/databricks/databricks-sdk-go" "github.com/spf13/cobra" @@ -95,7 +95,12 @@ var syncCmd = &cobra.Command{ return err } - log.Printf("[INFO] Remote file sync location: %v", opts.RemotePath) + switch output { + case flags.OutputText: + go logOutput(ctx, s.Events()) + case flags.OutputJSON: + go jsonOutput(ctx, s.Events(), cmd.OutOrStdout()) + } if watch { return s.RunContinuous(ctx) @@ -136,10 +141,12 @@ var syncCmd = &cobra.Command{ var interval time.Duration var full bool var watch bool +var output flags.Output = flags.OutputText func init() { root.RootCmd.AddCommand(syncCmd) syncCmd.Flags().DurationVar(&interval, "interval", 1*time.Second, "file system polling interval (for --watch)") syncCmd.Flags().BoolVar(&full, "full", false, "perform full synchronization (default is incremental)") syncCmd.Flags().BoolVar(&watch, "watch", false, "watch local file system for changes") + syncCmd.Flags().Var(&output, "output", "type of output format") } diff --git a/docs/sync.md b/docs/sync.md new file mode 100644 index 00000000..d1ecb5b4 --- /dev/null +++ b/docs/sync.md @@ -0,0 +1,40 @@ +# sync + +The sync command synchronizes a local directory tree to a Databricks workspace path. +The destination can be a repository (under `/Repos/`) or a workspace path (under `/Users/`). + +By default it performs incremental synchronization where only changes since the last synchronization are applied. + +Synchronization is **unidirectional**; changes to remote files are overwritten on a new invocation of the command. 
+ +Beware: +* Sync will not remove pre-existing remote files that do not exist in the local directory tree. +* Sync will overwrite pre-existing remote files if they exist in the local directory tree. + +## Incremental synchronization + +The sync command stores a synchronization snapshot file in the local directory tree under a `.databricks` directory. +This snapshot file contains state to compute which changes to the local directory tree have happened since the last synchronization. + +To opt out of incremental synchronization and force a full synchronization, you can specify the `--full` argument. +This makes the command ignore any pre-existing snapshot and create a new one upon completion. + +## Output + +The sync command produces either text or JSON output. +Text output is intended to be human readable and prints the file names that the command operates on. +JSON output is intended to be machine readable. + +### JSON output + +If selected, this produces line-delimited JSON objects with a `type` field as discriminator. + +Every time the command... +* checks the file system for changes, you'll see a `start` event. +* starts or completes a create/update/delete of a file, you'll see a `progress` event. +* completes a set of create/update/delete file operations, you'll see a `complete` event. + +Every JSON object has a sequence number in the `seq` field that associates it with a synchronization run. + +Progress events have a `progress` floating point number field between 0 and 1 indicating how far the operation has progressed. +A value of 0 means the operation has started and 1 means the operation has completed. diff --git a/libs/flags/output.go b/libs/flags/output.go new file mode 100644 index 00000000..1223228f --- /dev/null +++ b/libs/flags/output.go @@ -0,0 +1,39 @@ +package flags + +import ( + "fmt" + "strings" +) + +// Output controls how the CLI should produce its output. 
+type Output string + +var ( + OutputText = Output("text") + OutputJSON = Output("json") +) + +func (f *Output) String() string { + return string(*f) +} + +func (f *Output) Set(s string) error { + lower := strings.ToLower(s) + switch lower { + case OutputText.String(): + *f = Output(OutputText.String()) + case OutputJSON.String(): + *f = Output(OutputJSON.String()) + default: + valid := []string{ + OutputText.String(), + OutputJSON.String(), + } + return fmt.Errorf("accepted arguments are %s", strings.Join(valid, " and ")) + } + return nil +} + +func (f *Output) Type() string { + return "type" +} diff --git a/libs/flags/output_test.go b/libs/flags/output_test.go new file mode 100644 index 00000000..dea7e9de --- /dev/null +++ b/libs/flags/output_test.go @@ -0,0 +1,37 @@ +package flags + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOutputFlag(t *testing.T) { + var f Output + var err error + + // Invalid + err = f.Set("foo") + assert.Error(t, err) + assert.ErrorContains(t, err, "accepted arguments are text and json") + + // Lowercase + err = f.Set("text") + assert.NoError(t, err) + assert.Equal(t, "text", f.String()) + + // Uppercase + err = f.Set("TEXT") + assert.NoError(t, err) + assert.Equal(t, "text", f.String()) + + // Lowercase + err = f.Set("json") + assert.NoError(t, err) + assert.Equal(t, "json", f.String()) + + // Uppercase + err = f.Set("JSON") + assert.NoError(t, err) + assert.Equal(t, "json", f.String()) +} diff --git a/libs/sync/diff.go b/libs/sync/diff.go index eb5b9cba..72c1d5aa 100644 --- a/libs/sync/diff.go +++ b/libs/sync/diff.go @@ -1,10 +1,5 @@ package sync -import ( - "fmt" - "strings" -) - type diff struct { put []string delete []string @@ -13,17 +8,3 @@ type diff struct { func (d diff) IsEmpty() bool { return len(d.put) == 0 && len(d.delete) == 0 } - -func (d diff) String() string { - if d.IsEmpty() { - return "no changes" - } - var changes []string - if len(d.put) > 0 { - changes = append(changes, 
fmt.Sprintf("PUT: %s", strings.Join(d.put, ", "))) - } - if len(d.delete) > 0 { - changes = append(changes, fmt.Sprintf("DELETE: %s", strings.Join(d.delete, ", "))) - } - return strings.Join(changes, ", ") -} diff --git a/libs/sync/event.go b/libs/sync/event.go new file mode 100644 index 00000000..d2d00021 --- /dev/null +++ b/libs/sync/event.go @@ -0,0 +1,164 @@ +package sync + +import ( + "context" + "fmt" + "strings" + "time" +) + +type EventType string + +const ( + EventTypeStart = EventType("start") + EventTypeProgress = EventType("progress") + EventTypeComplete = EventType("complete") +) + +type EventAction string + +const ( + EventActionPut = EventAction("put") + EventActionDelete = EventAction("delete") +) + +type Event interface { + fmt.Stringer +} + +type EventBase struct { + Timestamp time.Time `json:"timestamp"` + Seq int `json:"seq"` + Type EventType `json:"type"` +} + +func newEventBase(seq int, typ EventType) *EventBase { + return &EventBase{ + Timestamp: time.Now(), + Seq: seq, + Type: typ, + } +} + +type EventChanges struct { + Put []string `json:"put,omitempty"` + Delete []string `json:"delete,omitempty"` +} + +func (e *EventChanges) IsEmpty() bool { + return len(e.Put) == 0 && len(e.Delete) == 0 +} + +func (e *EventChanges) String() string { + var changes []string + if len(e.Put) > 0 { + changes = append(changes, fmt.Sprintf("PUT: %s", strings.Join(e.Put, ", "))) + } + if len(e.Delete) > 0 { + changes = append(changes, fmt.Sprintf("DELETE: %s", strings.Join(e.Delete, ", "))) + } + return strings.Join(changes, ", ") +} + +type EventStart struct { + *EventBase + *EventChanges +} + +func (e *EventStart) String() string { + if e.IsEmpty() { + return "" + } + + return fmt.Sprintf("Action: %s", e.EventChanges.String()) +} + +func newEventStart(seq int, put []string, delete []string) Event { + return &EventStart{ + EventBase: newEventBase(seq, EventTypeStart), + EventChanges: &EventChanges{Put: put, Delete: delete}, + } +} + +type EventSyncProgress 
struct { + *EventBase + + Action EventAction `json:"action"` + Path string `json:"path"` + + // Progress is in range [0, 1] where 0 means the operation started + // and 1 means the operation completed. + Progress float32 `json:"progress"` +} + +func (e *EventSyncProgress) String() string { + if e.Progress < 1.0 { + return "" + } + + switch e.Action { + case EventActionPut: + return fmt.Sprintf("Uploaded %s", e.Path) + case EventActionDelete: + return fmt.Sprintf("Deleted %s", e.Path) + default: + panic("invalid action") + } +} + +func newEventProgress(seq int, action EventAction, path string, progress float32) Event { + return &EventSyncProgress{ + EventBase: newEventBase(seq, EventTypeProgress), + + Action: action, + Path: path, + Progress: progress, + } +} + +type EventSyncComplete struct { + *EventBase + *EventChanges +} + +func (e *EventSyncComplete) String() string { + if e.Seq == 0 { + return "Initial Sync Complete" + } + + if e.IsEmpty() { + return "" + } + + return "Complete" +} + +func newEventComplete(seq int, put []string, delete []string) Event { + return &EventSyncComplete{ + EventBase: newEventBase(seq, EventTypeComplete), + EventChanges: &EventChanges{Put: put, Delete: delete}, + } +} + +type EventNotifier interface { + Notify(ctx context.Context, event Event) +} + +// ChannelNotifier implements [EventNotifier] and sends events to its channel. +type ChannelNotifier struct { + ch chan<- Event +} + +func (n *ChannelNotifier) Notify(ctx context.Context, e Event) { + select { + case <-ctx.Done(): + case n.ch <- e: + } +} + +// NopNotifier implements [EventNotifier] and does nothing. 
+type NopNotifier struct{} + +func (n *NopNotifier) Notify(ctx context.Context, e Event) { + // Discard +} diff --git a/libs/sync/event_test.go b/libs/sync/event_test.go new file mode 100644 index 00000000..3fcb0709 --- /dev/null +++ b/libs/sync/event_test.go @@ -0,0 +1,130 @@ +package sync + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func jsonEqual(t *testing.T, expected string, e Event) { + var expected_, e_ map[string]any + + buf, err := json.Marshal(e) + require.NoError(t, err) + + err = json.Unmarshal([]byte(expected), &expected_) + require.NoError(t, err) + delete(expected_, "timestamp") + + err = json.Unmarshal(buf, &e_) + require.NoError(t, err) + delete(e_, "timestamp") + + assert.Equal(t, expected_, e_) +} + +func TestEventStart(t *testing.T) { + var e Event + + e = newEventStart(0, []string{"put"}, []string{"delete"}) + assert.Equal(t, "Action: PUT: put, DELETE: delete", e.String()) + + e = newEventStart(1, []string{"put"}, []string{}) + assert.Equal(t, "Action: PUT: put", e.String()) + + e = newEventStart(2, []string{}, []string{"delete"}) + assert.Equal(t, "Action: DELETE: delete", e.String()) + + e = newEventStart(3, []string{}, []string{}) + assert.Equal(t, "", e.String()) +} + +func TestEventStartJSON(t *testing.T) { + var e Event + + e = newEventStart(0, []string{"put"}, []string{"delete"}) + jsonEqual(t, `{"seq": 0, "type": "start", "put": ["put"], "delete": ["delete"]}`, e) + + e = newEventStart(1, []string{"put"}, []string{}) + jsonEqual(t, `{"seq": 1, "type": "start", "put": ["put"]}`, e) + + e = newEventStart(2, []string{}, []string{"delete"}) + jsonEqual(t, `{"seq": 2, "type": "start", "delete": ["delete"]}`, e) + + e = newEventStart(3, []string{}, []string{}) + jsonEqual(t, `{"seq": 3, "type": "start"}`, e) +} + +func TestEventProgress(t *testing.T) { + var e Event + + // Empty string if no progress has been made. 
+ e = newEventProgress(0, EventActionPut, "path", 0.0) + assert.Equal(t, "", e.String()) + + e = newEventProgress(1, EventActionPut, "path", 1.0) + assert.Equal(t, "Uploaded path", e.String()) + + // Empty string if no progress has been made. + e = newEventProgress(2, EventActionDelete, "path", 0.0) + assert.Equal(t, "", e.String()) + + e = newEventProgress(3, EventActionDelete, "path", 1.0) + assert.Equal(t, "Deleted path", e.String()) +} + +func TestEventProgressJSON(t *testing.T) { + var e Event + + e = newEventProgress(0, EventActionPut, "path", 0.0) + jsonEqual(t, `{"seq": 0, "type": "progress", "action": "put", "path": "path", "progress": 0.0}`, e) + + e = newEventProgress(0, EventActionPut, "path", 0.5) + jsonEqual(t, `{"seq": 0, "type": "progress", "action": "put", "path": "path", "progress": 0.5}`, e) + + e = newEventProgress(1, EventActionPut, "path", 1.0) + jsonEqual(t, `{"seq": 1, "type": "progress", "action": "put", "path": "path", "progress": 1.0}`, e) + + e = newEventProgress(2, EventActionDelete, "path", 0.0) + jsonEqual(t, `{"seq": 2, "type": "progress", "action": "delete", "path": "path", "progress": 0.0}`, e) + + e = newEventProgress(2, EventActionDelete, "path", 0.5) + jsonEqual(t, `{"seq": 2, "type": "progress", "action": "delete", "path": "path", "progress": 0.5}`, e) + + e = newEventProgress(3, EventActionDelete, "path", 1.0) + jsonEqual(t, `{"seq": 3, "type": "progress", "action": "delete", "path": "path", "progress": 1.0}`, e) +} + +func TestEventComplete(t *testing.T) { + var e Event + + e = newEventComplete(0, []string{"put"}, []string{"delete"}) + assert.Equal(t, "Initial Sync Complete", e.String()) + + e = newEventComplete(1, []string{"put"}, []string{}) + assert.Equal(t, "Complete", e.String()) + + e = newEventComplete(2, []string{}, []string{"delete"}) + assert.Equal(t, "Complete", e.String()) + + e = newEventComplete(3, []string{}, []string{}) + assert.Equal(t, "", e.String()) +} + +func TestEventCompleteJSON(t *testing.T) { + var e 
Event + + e = newEventComplete(0, []string{"put"}, []string{"delete"}) + jsonEqual(t, `{"seq": 0, "type": "complete", "put": ["put"], "delete": ["delete"]}`, e) + + e = newEventComplete(1, []string{"put"}, []string{}) + jsonEqual(t, `{"seq": 1, "type": "complete", "put": ["put"]}`, e) + + e = newEventComplete(2, []string{}, []string{"delete"}) + jsonEqual(t, `{"seq": 2, "type": "complete", "delete": ["delete"]}`, e) + + e = newEventComplete(3, []string{}, []string{}) + jsonEqual(t, `{"seq": 3, "type": "complete"}`, e) +} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index a7aef87c..b288685b 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log" - "sync" "time" "github.com/databricks/bricks/libs/git" @@ -33,6 +32,10 @@ type Sync struct { fileSet *git.FileSet snapshot *Snapshot repoFiles *repofiles.RepoFiles + + // Synchronization progress events are sent to this event notifier. + notifier EventNotifier + seq int } // New initializes and returns a new [Sync] instance. @@ -82,12 +85,40 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { fileSet: fileSet, snapshot: snapshot, repoFiles: repoFiles, + notifier: &NopNotifier{}, + seq: 0, }, nil } +func (s *Sync) Events() <-chan Event { + ch := make(chan Event, MaxRequestsInFlight) + s.notifier = &ChannelNotifier{ch} + return ch +} + +func (s *Sync) notifyStart(ctx context.Context, d diff) { + // If this is not the initial iteration we can ignore no-ops. + if s.seq > 0 && d.IsEmpty() { + return + } + s.notifier.Notify(ctx, newEventStart(s.seq, d.put, d.delete)) +} + +func (s *Sync) notifyProgress(ctx context.Context, action EventAction, path string, progress float32) { + s.notifier.Notify(ctx, newEventProgress(s.seq, action, path, progress)) +} + +func (s *Sync) notifyComplete(ctx context.Context, d diff) { + // If this is not the initial iteration we can ignore no-ops. 
+ if s.seq > 0 && d.IsEmpty() { + return + } + s.notifier.Notify(ctx, newEventComplete(s.seq, d.put, d.delete)) + s.seq++ +} + func (s *Sync) RunOnce(ctx context.Context) error { - repoFiles := repofiles.Create(s.RemotePath, s.LocalPath, s.WorkspaceClient) - applyDiff := syncCallback(ctx, repoFiles) + applyDiff := syncCallback(ctx, s) // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 @@ -101,11 +132,13 @@ func (s *Sync) RunOnce(ctx context.Context) error { if err != nil { return err } + + s.notifyStart(ctx, change) if change.IsEmpty() { + s.notifyComplete(ctx, change) return nil } - log.Printf("[INFO] Action: %v", change) err = applyDiff(change) if err != nil { return err @@ -117,12 +150,11 @@ func (s *Sync) RunOnce(ctx context.Context) error { return err } + s.notifyComplete(ctx, change) return nil } func (s *Sync) RunContinuous(ctx context.Context) error { - var once sync.Once - ticker := time.NewTicker(s.PollInterval) defer ticker.Stop() @@ -135,10 +167,6 @@ func (s *Sync) RunContinuous(ctx context.Context) error { if err != nil { return err } - - once.Do(func() { - log.Printf("[INFO] Initial Sync Complete") - }) } } } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 4ac86456..257b5442 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -2,9 +2,7 @@ package sync import ( "context" - "log" - "github.com/databricks/bricks/libs/sync/repofiles" "golang.org/x/sync/errgroup" ) @@ -12,7 +10,7 @@ import ( // rate limits const MaxRequestsInFlight = 20 -func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(localDiff diff) error { +func syncCallback(ctx context.Context, s *Sync) func(localDiff diff) error { return func(d diff) error { // Abstraction over wait groups which allows you to get the errors // returned in goroutines @@ -28,11 +26,12 @@ func syncCallback(ctx 
context.Context, repoFiles *repofiles.RepoFiles) func(loca // is evaluated remoteNameCopy := remoteName g.Go(func() error { - err := repoFiles.DeleteFile(ctx, remoteNameCopy) + s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 0.0) + err := s.repoFiles.DeleteFile(ctx, remoteNameCopy) if err != nil { return err } - log.Printf("[INFO] Deleted %s", remoteNameCopy) + s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 1.0) return nil }) } @@ -40,11 +39,12 @@ func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(loca // Copy of localName created to make this safe for concurrent use. localRelativePathCopy := localRelativePath g.Go(func() error { - err := repoFiles.PutFile(ctx, localRelativePathCopy) + s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 0.0) + err := s.repoFiles.PutFile(ctx, localRelativePathCopy) if err != nil { return err } - log.Printf("[INFO] Uploaded %s", localRelativePathCopy) + s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 1.0) return nil }) }