Add optional JSON output for sync command (#230)

JSON output makes it easy to process synchronization progress
information in downstream tools (e.g. the vscode extension).
This changes introduces a `sync.Event` interface type for progress events as
well as an `sync.EventNotifier` that lets the sync code pass along
progress events to calling code.

Example output in text mode (default, this uses the existing logger calls):
```text
2023/03/03 14:07:17 [INFO] Remote file sync location: /Repos/pieter.noordhuis@databricks.com/...
2023/03/03 14:07:18 [INFO] Initial Sync Complete
2023/03/03 14:07:22 [INFO] Action: PUT: foo
2023/03/03 14:07:23 [INFO] Uploaded foo
2023/03/03 14:07:23 [INFO] Complete
2023/03/03 14:07:25 [INFO] Action: DELETE: foo
2023/03/03 14:07:25 [INFO] Deleted foo
2023/03/03 14:07:25 [INFO] Complete
```

Example output in JSON mode:
```json
{"timestamp":"2023-03-03T14:08:15.459439+01:00","seq":0,"type":"start"}
{"timestamp":"2023-03-03T14:08:15.459461+01:00","seq":0,"type":"complete"}
{"timestamp":"2023-03-03T14:08:18.459821+01:00","seq":1,"type":"start","put":["foo"]}
{"timestamp":"2023-03-03T14:08:18.459867+01:00","seq":1,"type":"progress","action":"put","path":"foo","progress":0}
{"timestamp":"2023-03-03T14:08:19.418696+01:00","seq":1,"type":"progress","action":"put","path":"foo","progress":1}
{"timestamp":"2023-03-03T14:08:19.421397+01:00","seq":1,"type":"complete","put":["foo"]}
{"timestamp":"2023-03-03T14:08:22.459238+01:00","seq":2,"type":"start","delete":["foo"]}
{"timestamp":"2023-03-03T14:08:22.459268+01:00","seq":2,"type":"progress","action":"delete","path":"foo","progress":0}
{"timestamp":"2023-03-03T14:08:22.686413+01:00","seq":2,"type":"progress","action":"delete","path":"foo","progress":1}
{"timestamp":"2023-03-03T14:08:22.688989+01:00","seq":2,"type":"complete","delete":["foo"]}
```

---------

Co-authored-by: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
This commit is contained in:
Pieter Noordhuis 2023-03-08 10:27:19 +01:00 committed by GitHub
parent 5166055efb
commit e872b587cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 514 additions and 38 deletions

50
cmd/sync/output.go Normal file
View File

@ -0,0 +1,50 @@
package sync
import (
"context"
"encoding/json"
"io"
"log"
"github.com/databricks/bricks/libs/sync"
)
// Read synchronization events and write them as JSON to the specified writer (typically stdout).
func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
enc := json.NewEncoder(w)
for {
select {
case <-ctx.Done():
return
case e, ok := <-ch:
if !ok {
return
}
err := enc.Encode(e)
// These are plain structs so this must always work.
// Panic on error so that a violation of this assumption does not go undetected.
if err != nil {
panic(err)
}
}
}
}
// Read synchronization events and log them at the INFO level.
func logOutput(ctx context.Context, ch <-chan sync.Event) {
for {
select {
case <-ctx.Done():
return
case e, ok := <-ch:
if !ok {
return
}
// Log only if something actually happened.
// Sync events produce an empty string if nothing happened.
if str := e.String(); str != "" {
log.Printf("[INFO] %s", e.String())
}
}
}
}

View File

@ -3,12 +3,12 @@ package sync
import (
"flag"
"fmt"
"log"
"path/filepath"
"time"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/cmd/root"
"github.com/databricks/bricks/libs/flags"
"github.com/databricks/bricks/libs/sync"
"github.com/databricks/databricks-sdk-go"
"github.com/spf13/cobra"
@ -95,7 +95,12 @@ var syncCmd = &cobra.Command{
return err
}
log.Printf("[INFO] Remote file sync location: %v", opts.RemotePath)
switch output {
case flags.OutputText:
go logOutput(ctx, s.Events())
case flags.OutputJSON:
go jsonOutput(ctx, s.Events(), cmd.OutOrStdout())
}
if watch {
return s.RunContinuous(ctx)
@ -136,10 +141,12 @@ var syncCmd = &cobra.Command{
var interval time.Duration
var full bool
var watch bool
var output flags.Output = flags.OutputText
func init() {
root.RootCmd.AddCommand(syncCmd)
syncCmd.Flags().DurationVar(&interval, "interval", 1*time.Second, "file system polling interval (for --watch)")
syncCmd.Flags().BoolVar(&full, "full", false, "perform full synchronization (default is incremental)")
syncCmd.Flags().BoolVar(&watch, "watch", false, "watch local file system for changes")
syncCmd.Flags().Var(&output, "output", "type of output format")
}

40
docs/sync.md Normal file
View File

@ -0,0 +1,40 @@
# sync
The sync command synchronizes a local directory tree to a Databricks workspace path.
The destination can be a repository (under `/Repos/<user>`) or a workspace path (under `/Users/<user>`).
By default it performs incremental synchronization where only changes since the last synchronization are applied.
Synchronization is **unidirectional**; changes to remote files are overwritten on a new invocation of the command.
Beware:
* Sync will not remove pre-existing remote files that do not exist in the local directory tree.
* Sync will overwrite pre-existing remote files if they exist in the local directory tree.
## Incremental synchronization
The sync command stores a synchronization snapshot file in the local directory tree under a `.databricks` directory.
This snapshot file contains state to compute which changes to the local directory tree have happened since the last synchronization.
To opt out of incremental synchronization and force a full synchronization, you can specify the `--full` argument.
This makes the command ignore any pre-existing snapshot and create a new one upon completion.
## Output
The sync command produces either text or JSON output.
Text output is intended to be human readable and prints the file names that the command operates on.
JSON output is intended to be machine readable.
### JSON output
If selected, this produces line-delimited JSON objects with a `type` field as discriminator.
Every time the command...
* checks the file system for changes, you'll see a `start` event.
* starts or completes a create/update/delete of a file, you'll see a `progress` event.
* completes a set of create/update/delete file operations, you'll see a `complete` event.
Every JSON object has a sequence number in the `seq` field that associates it with a synchronization run.
Progress events have a `progress` floating point number field between 0 and 1 indicating how far the operation has progressed.
A value of 0 means the operation has started and 1 means the operation has completed.

39
libs/flags/output.go Normal file
View File

@ -0,0 +1,39 @@
package flags
import (
"fmt"
"strings"
)
// Output controls how the CLI should produce its output.
type Output string
var (
OutputText = Output("text")
OutputJSON = Output("json")
)
func (f *Output) String() string {
return string(*f)
}
func (f *Output) Set(s string) error {
lower := strings.ToLower(s)
switch lower {
case OutputText.String():
*f = Output(OutputText.String())
case OutputJSON.String():
*f = Output(OutputJSON.String())
default:
valid := []string{
OutputText.String(),
OutputJSON.String(),
}
return fmt.Errorf("accepted arguments are %s", strings.Join(valid, " and "))
}
return nil
}
func (f *Output) Type() string {
return "type"
}

37
libs/flags/output_test.go Normal file
View File

@ -0,0 +1,37 @@
package flags
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestOutputFlag(t *testing.T) {
var f Output
var err error
// Invalid
err = f.Set("foo")
assert.Error(t, err)
assert.ErrorContains(t, err, "accepted arguments are text and json")
// Lowercase
err = f.Set("text")
assert.NoError(t, err)
assert.Equal(t, "text", f.String())
// Uppercase
err = f.Set("TEXT")
assert.NoError(t, err)
assert.Equal(t, "text", f.String())
// Lowercase
err = f.Set("json")
assert.NoError(t, err)
assert.Equal(t, "json", f.String())
// Uppercase
err = f.Set("JSON")
assert.NoError(t, err)
assert.Equal(t, "json", f.String())
}

View File

@ -1,10 +1,5 @@
package sync
import (
"fmt"
"strings"
)
type diff struct {
put []string
delete []string
@ -13,17 +8,3 @@ type diff struct {
func (d diff) IsEmpty() bool {
return len(d.put) == 0 && len(d.delete) == 0
}
func (d diff) String() string {
if d.IsEmpty() {
return "no changes"
}
var changes []string
if len(d.put) > 0 {
changes = append(changes, fmt.Sprintf("PUT: %s", strings.Join(d.put, ", ")))
}
if len(d.delete) > 0 {
changes = append(changes, fmt.Sprintf("DELETE: %s", strings.Join(d.delete, ", ")))
}
return strings.Join(changes, ", ")
}

164
libs/sync/event.go Normal file
View File

@ -0,0 +1,164 @@
package sync
import (
"context"
"fmt"
"strings"
"time"
)
type EventType string
const (
EventTypeStart = EventType("start")
EventTypeProgress = EventType("progress")
EventTypeComplete = EventType("complete")
)
type EventAction string
const (
EventActionPut = EventAction("put")
EventActionDelete = EventAction("delete")
)
type Event interface {
fmt.Stringer
}
type EventBase struct {
Timestamp time.Time `json:"timestamp"`
Seq int `json:"seq"`
Type EventType `json:"type"`
}
func newEventBase(seq int, typ EventType) *EventBase {
return &EventBase{
Timestamp: time.Now(),
Seq: seq,
Type: typ,
}
}
type EventChanges struct {
Put []string `json:"put,omitempty"`
Delete []string `json:"delete,omitempty"`
}
func (e *EventChanges) IsEmpty() bool {
return len(e.Put) == 0 && len(e.Delete) == 0
}
func (e *EventChanges) String() string {
var changes []string
if len(e.Put) > 0 {
changes = append(changes, fmt.Sprintf("PUT: %s", strings.Join(e.Put, ", ")))
}
if len(e.Delete) > 0 {
changes = append(changes, fmt.Sprintf("DELETE: %s", strings.Join(e.Delete, ", ")))
}
return strings.Join(changes, ", ")
}
type EventStart struct {
*EventBase
*EventChanges
}
func (e *EventStart) String() string {
if e.IsEmpty() {
return ""
}
return fmt.Sprintf("Action: %s", e.EventChanges.String())
}
func newEventStart(seq int, put []string, delete []string) Event {
return &EventStart{
EventBase: newEventBase(seq, EventTypeStart),
EventChanges: &EventChanges{Put: put, Delete: delete},
}
}
type EventSyncProgress struct {
*EventBase
Action EventAction `json:"action"`
Path string `json:"path"`
// Progress is in range [0, 1] where 0 means the operation started
// and 1 means the operation completed.
Progress float32 `json:"progress"`
}
func (e *EventSyncProgress) String() string {
if e.Progress < 1.0 {
return ""
}
switch e.Action {
case EventActionPut:
return fmt.Sprintf("Uploaded %s", e.Path)
case EventActionDelete:
return fmt.Sprintf("Deleted %s", e.Path)
default:
panic("invalid action")
}
}
func newEventProgress(seq int, action EventAction, path string, progress float32) Event {
return &EventSyncProgress{
EventBase: newEventBase(seq, EventTypeProgress),
Action: action,
Path: path,
Progress: progress,
}
}
type EventSyncComplete struct {
*EventBase
*EventChanges
}
func (e *EventSyncComplete) String() string {
if e.Seq == 0 {
return "Initial Sync Complete"
}
if e.IsEmpty() {
return ""
}
return "Complete"
}
func newEventComplete(seq int, put []string, delete []string) Event {
return &EventSyncComplete{
EventBase: newEventBase(seq, EventTypeComplete),
EventChanges: &EventChanges{Put: put, Delete: delete},
}
}
type EventNotifier interface {
Notify(ctx context.Context, event Event)
}
// ChannelNotifier implements [EventNotifier] and sends events to its channel.
type ChannelNotifier struct {
ch chan<- Event
}
func (n *ChannelNotifier) Notify(ctx context.Context, e Event) {
select {
case <-ctx.Done():
case n.ch <- e:
}
}
// NopNotifier implements [EventNotifier] and does nothing.
type NopNotifier struct{}
func (n *NopNotifier) Notify(ctx context.Context, e Event) {
// Discard
}

130
libs/sync/event_test.go Normal file
View File

@ -0,0 +1,130 @@
package sync
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func jsonEqual(t *testing.T, expected string, e Event) {
var expected_, e_ map[string]any
buf, err := json.Marshal(e)
require.NoError(t, err)
err = json.Unmarshal([]byte(expected), &expected_)
require.NoError(t, err)
delete(expected_, "timestamp")
err = json.Unmarshal(buf, &e_)
require.NoError(t, err)
delete(e_, "timestamp")
assert.Equal(t, expected_, e_)
}
func TestEventStart(t *testing.T) {
var e Event
e = newEventStart(0, []string{"put"}, []string{"delete"})
assert.Equal(t, "Action: PUT: put, DELETE: delete", e.String())
e = newEventStart(1, []string{"put"}, []string{})
assert.Equal(t, "Action: PUT: put", e.String())
e = newEventStart(2, []string{}, []string{"delete"})
assert.Equal(t, "Action: DELETE: delete", e.String())
e = newEventStart(3, []string{}, []string{})
assert.Equal(t, "", e.String())
}
func TestEventStartJSON(t *testing.T) {
var e Event
e = newEventStart(0, []string{"put"}, []string{"delete"})
jsonEqual(t, `{"seq": 0, "type": "start", "put": ["put"], "delete": ["delete"]}`, e)
e = newEventStart(1, []string{"put"}, []string{})
jsonEqual(t, `{"seq": 1, "type": "start", "put": ["put"]}`, e)
e = newEventStart(2, []string{}, []string{"delete"})
jsonEqual(t, `{"seq": 2, "type": "start", "delete": ["delete"]}`, e)
e = newEventStart(3, []string{}, []string{})
jsonEqual(t, `{"seq": 3, "type": "start"}`, e)
}
func TestEventProgress(t *testing.T) {
var e Event
// Empty string if no progress has been made.
e = newEventProgress(0, EventActionPut, "path", 0.0)
assert.Equal(t, "", e.String())
e = newEventProgress(1, EventActionPut, "path", 1.0)
assert.Equal(t, "Uploaded path", e.String())
// Empty string if no progress has been made.
e = newEventProgress(2, EventActionDelete, "path", 0.0)
assert.Equal(t, "", e.String())
e = newEventProgress(3, EventActionDelete, "path", 1.0)
assert.Equal(t, "Deleted path", e.String())
}
func TestEventProgressJSON(t *testing.T) {
var e Event
e = newEventProgress(0, EventActionPut, "path", 0.0)
jsonEqual(t, `{"seq": 0, "type": "progress", "action": "put", "path": "path", "progress": 0.0}`, e)
e = newEventProgress(0, EventActionPut, "path", 0.5)
jsonEqual(t, `{"seq": 0, "type": "progress", "action": "put", "path": "path", "progress": 0.5}`, e)
e = newEventProgress(1, EventActionPut, "path", 1.0)
jsonEqual(t, `{"seq": 1, "type": "progress", "action": "put", "path": "path", "progress": 1.0}`, e)
e = newEventProgress(2, EventActionDelete, "path", 0.0)
jsonEqual(t, `{"seq": 2, "type": "progress", "action": "delete", "path": "path", "progress": 0.0}`, e)
e = newEventProgress(2, EventActionDelete, "path", 0.5)
jsonEqual(t, `{"seq": 2, "type": "progress", "action": "delete", "path": "path", "progress": 0.5}`, e)
e = newEventProgress(3, EventActionDelete, "path", 1.0)
jsonEqual(t, `{"seq": 3, "type": "progress", "action": "delete", "path": "path", "progress": 1.0}`, e)
}
func TestEventComplete(t *testing.T) {
var e Event
e = newEventComplete(0, []string{"put"}, []string{"delete"})
assert.Equal(t, "Initial Sync Complete", e.String())
e = newEventComplete(1, []string{"put"}, []string{})
assert.Equal(t, "Complete", e.String())
e = newEventComplete(2, []string{}, []string{"delete"})
assert.Equal(t, "Complete", e.String())
e = newEventComplete(3, []string{}, []string{})
assert.Equal(t, "", e.String())
}
func TestEventCompleteJSON(t *testing.T) {
var e Event
e = newEventComplete(0, []string{"put"}, []string{"delete"})
jsonEqual(t, `{"seq": 0, "type": "complete", "put": ["put"], "delete": ["delete"]}`, e)
e = newEventComplete(1, []string{"put"}, []string{})
jsonEqual(t, `{"seq": 1, "type": "complete", "put": ["put"]}`, e)
e = newEventComplete(2, []string{}, []string{"delete"})
jsonEqual(t, `{"seq": 2, "type": "complete", "delete": ["delete"]}`, e)
e = newEventComplete(3, []string{}, []string{})
jsonEqual(t, `{"seq": 3, "type": "complete"}`, e)
}

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/databricks/bricks/libs/git"
@ -33,6 +32,10 @@ type Sync struct {
fileSet *git.FileSet
snapshot *Snapshot
repoFiles *repofiles.RepoFiles
// Synchronization progress events are sent to this event notifier.
notifier EventNotifier
seq int
}
// New initializes and returns a new [Sync] instance.
@ -82,12 +85,40 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
fileSet: fileSet,
snapshot: snapshot,
repoFiles: repoFiles,
notifier: &NopNotifier{},
seq: 0,
}, nil
}
func (s *Sync) Events() <-chan Event {
ch := make(chan Event, MaxRequestsInFlight)
s.notifier = &ChannelNotifier{ch}
return ch
}
func (s *Sync) notifyStart(ctx context.Context, d diff) {
// If this is not the initial iteration we can ignore no-ops.
if s.seq > 0 && d.IsEmpty() {
return
}
s.notifier.Notify(ctx, newEventStart(s.seq, d.put, d.delete))
}
func (s *Sync) notifyProgress(ctx context.Context, action EventAction, path string, progress float32) {
s.notifier.Notify(ctx, newEventProgress(s.seq, action, path, progress))
}
func (s *Sync) notifyComplete(ctx context.Context, d diff) {
// If this is not the initial iteration we can ignore no-ops.
if s.seq > 0 && d.IsEmpty() {
return
}
s.notifier.Notify(ctx, newEventComplete(s.seq, d.put, d.delete))
s.seq++
}
func (s *Sync) RunOnce(ctx context.Context) error {
repoFiles := repofiles.Create(s.RemotePath, s.LocalPath, s.WorkspaceClient)
applyDiff := syncCallback(ctx, repoFiles)
applyDiff := syncCallback(ctx, s)
// tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
@ -101,11 +132,13 @@ func (s *Sync) RunOnce(ctx context.Context) error {
if err != nil {
return err
}
s.notifyStart(ctx, change)
if change.IsEmpty() {
s.notifyComplete(ctx, change)
return nil
}
log.Printf("[INFO] Action: %v", change)
err = applyDiff(change)
if err != nil {
return err
@ -117,12 +150,11 @@ func (s *Sync) RunOnce(ctx context.Context) error {
return err
}
s.notifyComplete(ctx, change)
return nil
}
func (s *Sync) RunContinuous(ctx context.Context) error {
var once sync.Once
ticker := time.NewTicker(s.PollInterval)
defer ticker.Stop()
@ -135,10 +167,6 @@ func (s *Sync) RunContinuous(ctx context.Context) error {
if err != nil {
return err
}
once.Do(func() {
log.Printf("[INFO] Initial Sync Complete")
})
}
}
}

View File

@ -2,9 +2,7 @@ package sync
import (
"context"
"log"
"github.com/databricks/bricks/libs/sync/repofiles"
"golang.org/x/sync/errgroup"
)
@ -12,7 +10,7 @@ import (
// rate limits
const MaxRequestsInFlight = 20
func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(localDiff diff) error {
func syncCallback(ctx context.Context, s *Sync) func(localDiff diff) error {
return func(d diff) error {
// Abstraction over wait groups which allows you to get the errors
// returned in goroutines
@ -28,11 +26,12 @@ func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(loca
// is evaluated
remoteNameCopy := remoteName
g.Go(func() error {
err := repoFiles.DeleteFile(ctx, remoteNameCopy)
s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 0.0)
err := s.repoFiles.DeleteFile(ctx, remoteNameCopy)
if err != nil {
return err
}
log.Printf("[INFO] Deleted %s", remoteNameCopy)
s.notifyProgress(ctx, EventActionDelete, remoteNameCopy, 1.0)
return nil
})
}
@ -40,11 +39,12 @@ func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(loca
// Copy of localName created to make this safe for concurrent use.
localRelativePathCopy := localRelativePath
g.Go(func() error {
err := repoFiles.PutFile(ctx, localRelativePathCopy)
s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 0.0)
err := s.repoFiles.PutFile(ctx, localRelativePathCopy)
if err != nil {
return err
}
log.Printf("[INFO] Uploaded %s", localRelativePathCopy)
s.notifyProgress(ctx, EventActionPut, localRelativePathCopy, 1.0)
return nil
})
}