Drain sync event channel before returning (#253)

Not waiting means the last few events may or may not be printed.
This is relevant in the mode where sync runs once and then terminates.
This commit is contained in:
Pieter Noordhuis 2023-03-16 17:48:17 +01:00 committed by GitHub
parent 32a29c6af4
commit c9340d6317
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 8 deletions

View File

@ -1,10 +1,10 @@
package sync package sync
import ( import (
"bufio"
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"log"
"github.com/databricks/bricks/libs/sync" "github.com/databricks/bricks/libs/sync"
) )
@ -30,8 +30,10 @@ func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
} }
} }
// Read synchronization events and log them at the INFO level. // Read synchronization events and write them as text to the specified writer (typically stdout).
func logOutput(ctx context.Context, ch <-chan sync.Event) { func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
bw := bufio.NewWriter(w)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -43,7 +45,9 @@ func logOutput(ctx context.Context, ch <-chan sync.Event) {
// Log only if something actually happened. // Log only if something actually happened.
// Sync events produce an empty string if nothing happened. // Sync events produce an empty string if nothing happened.
if str := e.String(); str != "" { if str := e.String(); str != "" {
log.Printf("[INFO] %s", e.String()) bw.WriteString(str)
bw.WriteString("\r\n")
bw.Flush()
} }
} }
} }

View File

@ -1,9 +1,12 @@
package sync package sync
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"io"
"path/filepath" "path/filepath"
stdsync "sync"
"time" "time"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
@ -95,18 +98,32 @@ var syncCmd = &cobra.Command{
return err return err
} }
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch output { switch output {
case flags.OutputText: case flags.OutputText:
go logOutput(ctx, s.Events()) outputFunc = textOutput
case flags.OutputJSON: case flags.OutputJSON:
go jsonOutput(ctx, s.Events(), cmd.OutOrStdout()) outputFunc = jsonOutput
}
var wg stdsync.WaitGroup
if outputFunc != nil {
wg.Add(1)
go func() {
defer wg.Done()
outputFunc(ctx, s.Events(), cmd.OutOrStdout())
}()
} }
if watch { if watch {
return s.RunContinuous(ctx) err = s.RunContinuous(ctx)
} else {
err = s.RunOnce(ctx)
} }
return s.RunOnce(ctx) s.Close()
wg.Wait()
return err
}, },
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {

View File

@ -142,6 +142,7 @@ func newEventComplete(seq int, put []string, delete []string) Event {
type EventNotifier interface { type EventNotifier interface {
Notify(ctx context.Context, event Event) Notify(ctx context.Context, event Event)
Close()
} }
// ChannelNotifier implements [EventNotifier] and sends events to its channel. // ChannelNotifier implements [EventNotifier] and sends events to its channel.
@ -156,9 +157,17 @@ func (n *ChannelNotifier) Notify(ctx context.Context, e Event) {
} }
} }
func (n *ChannelNotifier) Close() {
close(n.ch)
}
// NopNotifier implements [EventNotifier] and does nothing. // NopNotifier implements [EventNotifier] and does nothing.
type NopNotifier struct{} type NopNotifier struct{}
func (n *NopNotifier) Notify(ctx context.Context, e Event) { func (n *NopNotifier) Notify(ctx context.Context, e Event) {
// Discard // Discard
} }
func (n *NopNotifier) Close() {
// Nothing to do
}

View File

@ -96,6 +96,14 @@ func (s *Sync) Events() <-chan Event {
return ch return ch
} }
func (s *Sync) Close() {
if s.notifier == nil {
return
}
s.notifier.Close()
s.notifier = nil
}
func (s *Sync) notifyStart(ctx context.Context, d diff) { func (s *Sync) notifyStart(ctx context.Context, d diff) {
// If this is not the initial iteration we can ignore no-ops. // If this is not the initial iteration we can ignore no-ops.
if s.seq > 0 && d.IsEmpty() { if s.seq > 0 && d.IsEmpty() {