databricks-cli/libs/sync/event.go

165 lines
3.2 KiB
Go
Raw Permalink Normal View History

Add optional JSON output for sync command (#230) JSON output makes it easy to process synchronization progress information in downstream tools (e.g. the vscode extension). This changes introduces a `sync.Event` interface type for progress events as well as an `sync.EventNotifier` that lets the sync code pass along progress events to calling code. Example output in text mode (default, this uses the existing logger calls): ```text 2023/03/03 14:07:17 [INFO] Remote file sync location: /Repos/pieter.noordhuis@databricks.com/... 2023/03/03 14:07:18 [INFO] Initial Sync Complete 2023/03/03 14:07:22 [INFO] Action: PUT: foo 2023/03/03 14:07:23 [INFO] Uploaded foo 2023/03/03 14:07:23 [INFO] Complete 2023/03/03 14:07:25 [INFO] Action: DELETE: foo 2023/03/03 14:07:25 [INFO] Deleted foo 2023/03/03 14:07:25 [INFO] Complete ``` Example output in JSON mode: ```json {"timestamp":"2023-03-03T14:08:15.459439+01:00","seq":0,"type":"start"} {"timestamp":"2023-03-03T14:08:15.459461+01:00","seq":0,"type":"complete"} {"timestamp":"2023-03-03T14:08:18.459821+01:00","seq":1,"type":"start","put":["foo"]} {"timestamp":"2023-03-03T14:08:18.459867+01:00","seq":1,"type":"progress","action":"put","path":"foo","progress":0} {"timestamp":"2023-03-03T14:08:19.418696+01:00","seq":1,"type":"progress","action":"put","path":"foo","progress":1} {"timestamp":"2023-03-03T14:08:19.421397+01:00","seq":1,"type":"complete","put":["foo"]} {"timestamp":"2023-03-03T14:08:22.459238+01:00","seq":2,"type":"start","delete":["foo"]} {"timestamp":"2023-03-03T14:08:22.459268+01:00","seq":2,"type":"progress","action":"delete","path":"foo","progress":0} {"timestamp":"2023-03-03T14:08:22.686413+01:00","seq":2,"type":"progress","action":"delete","path":"foo","progress":1} {"timestamp":"2023-03-03T14:08:22.688989+01:00","seq":2,"type":"complete","delete":["foo"]} ``` --------- Co-authored-by: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
2023-03-08 09:27:19 +00:00
package sync
import (
"context"
"fmt"
"strings"
"time"
)
type EventType string
const (
EventTypeStart = EventType("start")
EventTypeProgress = EventType("progress")
EventTypeComplete = EventType("complete")
)
type EventAction string
const (
EventActionPut = EventAction("put")
EventActionDelete = EventAction("delete")
)
type Event interface {
fmt.Stringer
}
type EventBase struct {
Timestamp time.Time `json:"timestamp"`
Seq int `json:"seq"`
Type EventType `json:"type"`
}
func newEventBase(seq int, typ EventType) *EventBase {
return &EventBase{
Timestamp: time.Now(),
Seq: seq,
Type: typ,
}
}
type EventChanges struct {
Put []string `json:"put,omitempty"`
Delete []string `json:"delete,omitempty"`
}
func (e *EventChanges) IsEmpty() bool {
return len(e.Put) == 0 && len(e.Delete) == 0
}
func (e *EventChanges) String() string {
var changes []string
if len(e.Put) > 0 {
changes = append(changes, fmt.Sprintf("PUT: %s", strings.Join(e.Put, ", ")))
}
if len(e.Delete) > 0 {
changes = append(changes, fmt.Sprintf("DELETE: %s", strings.Join(e.Delete, ", ")))
}
return strings.Join(changes, ", ")
}
type EventStart struct {
*EventBase
*EventChanges
}
func (e *EventStart) String() string {
if e.IsEmpty() {
return ""
}
return fmt.Sprintf("Action: %s", e.EventChanges.String())
}
func newEventStart(seq int, put []string, delete []string) Event {
return &EventStart{
EventBase: newEventBase(seq, EventTypeStart),
EventChanges: &EventChanges{Put: put, Delete: delete},
}
}
type EventSyncProgress struct {
*EventBase
Action EventAction `json:"action"`
Path string `json:"path"`
// Progress is in range [0, 1] where 0 means the operation started
// and 1 means the operation completed.
Progress float32 `json:"progress"`
}
func (e *EventSyncProgress) String() string {
if e.Progress < 1.0 {
return ""
}
switch e.Action {
case EventActionPut:
return fmt.Sprintf("Uploaded %s", e.Path)
case EventActionDelete:
return fmt.Sprintf("Deleted %s", e.Path)
default:
panic("invalid action")
}
}
func newEventProgress(seq int, action EventAction, path string, progress float32) Event {
return &EventSyncProgress{
EventBase: newEventBase(seq, EventTypeProgress),
Action: action,
Path: path,
Progress: progress,
}
}
type EventSyncComplete struct {
*EventBase
*EventChanges
}
func (e *EventSyncComplete) String() string {
if e.Seq == 0 {
return "Initial Sync Complete"
}
if e.IsEmpty() {
return ""
}
return "Complete"
}
func newEventComplete(seq int, put []string, delete []string) Event {
return &EventSyncComplete{
EventBase: newEventBase(seq, EventTypeComplete),
EventChanges: &EventChanges{Put: put, Delete: delete},
}
}
type EventNotifier interface {
Notify(ctx context.Context, event Event)
}
// ChannelNotifier implements [EventNotifier] and sends events to its channel.
type ChannelNotifier struct {
ch chan<- Event
}
func (n *ChannelNotifier) Notify(ctx context.Context, e Event) {
select {
case <-ctx.Done():
case n.ch <- e:
}
}
// NopNotifier implements [EventNotifier] and does nothing.
type NopNotifier struct{}
func (n *NopNotifier) Notify(ctx context.Context, e Event) {
// Discard
}