From c9340d63177a2ade5f48b55ce5ceef0066ffe5a4 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 16 Mar 2023 17:48:17 +0100 Subject: [PATCH] Drain sync event channel before returning (#253) Not waiting means the last few events may or may not be printed. This is relevant in the mode where sync runs once and then terminates. --- cmd/sync/output.go | 12 ++++++++---- cmd/sync/sync.go | 25 +++++++++++++++++++++---- libs/sync/event.go | 9 +++++++++ libs/sync/sync.go | 8 ++++++++ 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/cmd/sync/output.go b/cmd/sync/output.go index d730301e..0659bf7f 100644 --- a/cmd/sync/output.go +++ b/cmd/sync/output.go @@ -1,10 +1,10 @@ package sync import ( + "bufio" "context" "encoding/json" "io" - "log" "github.com/databricks/bricks/libs/sync" ) @@ -30,8 +30,10 @@ func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { } } -// Read synchronization events and log them at the INFO level. -func logOutput(ctx context.Context, ch <-chan sync.Event) { +// Read synchronization events and write them as text to the specified writer (typically stdout). +func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { + bw := bufio.NewWriter(w) + for { select { case <-ctx.Done(): @@ -43,7 +45,9 @@ func logOutput(ctx context.Context, ch <-chan sync.Event) { // Log only if something actually happened. // Sync events produce an empty string if nothing happened. if str := e.String(); str != "" { - log.Printf("[INFO] %s", e.String()) + bw.WriteString(str) + bw.WriteString("\r\n") + bw.Flush() } } } diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 0c56f5c5..76381fc8 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -1,9 +1,12 @@ package sync import ( + "context" "flag" "fmt" + "io" "path/filepath" + stdsync "sync" "time" "github.com/databricks/bricks/bundle" @@ -95,18 +98,32 @@ var syncCmd = &cobra.Command{ return err } + var outputFunc func(context.Context, <-chan sync.Event, io.Writer) switch output { case flags.OutputText: - go logOutput(ctx, s.Events()) + outputFunc = textOutput case flags.OutputJSON: - go jsonOutput(ctx, s.Events(), cmd.OutOrStdout()) + outputFunc = jsonOutput + } + + var wg stdsync.WaitGroup + if outputFunc != nil { + wg.Add(1) + go func() { + defer wg.Done() + outputFunc(ctx, s.Events(), cmd.OutOrStdout()) + }() } if watch { - return s.RunContinuous(ctx) + err = s.RunContinuous(ctx) + } else { + err = s.RunOnce(ctx) } - return s.RunOnce(ctx) + s.Close() + wg.Wait() + return err }, ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { diff --git a/libs/sync/event.go b/libs/sync/event.go index d2d00021..8e5c0efa 100644 --- a/libs/sync/event.go +++ b/libs/sync/event.go @@ -142,6 +142,7 @@ func newEventComplete(seq int, put []string, delete []string) Event { type EventNotifier interface { Notify(ctx context.Context, event Event) + Close() } // ChannelNotifier implements [EventNotifier] and sends events to its channel. @@ -156,9 +157,17 @@ func (n *ChannelNotifier) Notify(ctx context.Context, e Event) { } } +func (n *ChannelNotifier) Close() { + close(n.ch) +} + // NopNotifier implements [EventNotifier] and does nothing. type NopNotifier struct{} func (n *NopNotifier) Notify(ctx context.Context, e Event) { // Discard } + +func (n *NopNotifier) Close() { + // Nothing to do +} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 452cebad..f32c0cf7 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -96,6 +96,14 @@ func (s *Sync) Events() <-chan Event { return ch } +func (s *Sync) Close() { + if s.notifier == nil { + return + } + s.notifier.Close() + s.notifier = nil +} + func (s *Sync) notifyStart(ctx context.Context, d diff) { // If this is not the initial iteration we can ignore no-ops. if s.seq > 0 && d.IsEmpty() {