From 8fd3dccca9f40f1f47f4f8f2d48d6e28f6050d23 Mon Sep 17 00:00:00 2001 From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com> Date: Wed, 29 Mar 2023 14:58:09 +0200 Subject: [PATCH] Add progress logs for job runs (#276) --- bundle/run/job.go | 91 +++++++++++++++++++++++---- bundle/run/job_progress_event.go | 35 +++++++++++ bundle/run/job_progress_event_test.go | 25 ++++++++ bundle/run/options.go | 4 +- bundle/run/runner.go | 2 + cmd/root/logger.go | 3 +- cmd/root/progress_logger.go | 48 ++++++++++++++ cmd/root/progress_logger_test.go | 45 +++++++++++++ cmd/root/root.go | 8 ++- go.mod | 1 + go.sum | 2 + libs/flags/progress_format.go | 58 +++++++++++++++++ libs/flags/progress_format_test.go | 59 +++++++++++++++++ libs/progress/context.go | 20 ++++++ libs/progress/event.go | 5 ++ libs/progress/logger.go | 55 ++++++++++++++++ 16 files changed, 444 insertions(+), 17 deletions(-) create mode 100644 bundle/run/job_progress_event.go create mode 100644 bundle/run/job_progress_event_test.go create mode 100644 cmd/root/progress_logger.go create mode 100644 cmd/root/progress_logger_test.go create mode 100644 libs/flags/progress_format.go create mode 100644 libs/flags/progress_format_test.go create mode 100644 libs/progress/context.go create mode 100644 libs/progress/event.go create mode 100644 libs/progress/logger.go diff --git a/bundle/run/job.go b/bundle/run/job.go index 428734dc..94f28fcb 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle/config/resources" "github.com/databricks/bricks/libs/log" + "github.com/databricks/bricks/libs/progress" "github.com/databricks/databricks-sdk-go/retries" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/fatih/color" @@ -137,20 +138,24 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) { yellow(task.TaskKey), task.State.LifeCycleState) } } - } -func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) { - jobID, err := strconv.ParseInt(r.job.ID, 10, 64) - if err != nil { - return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID) +func pullRunIdCallback(runId *int64) func(info *retries.Info[jobs.Run]) { + return func(info *retries.Info[jobs.Run]) { + i := info.Info + if i == nil { + return + } + + if *runId == 0 { + *runId = i.RunId + } } +} +func logDebugCallback(ctx context.Context, runId *int64) func(info *retries.Info[jobs.Run]) { var prevState *jobs.RunState - var runId *int64 - - // This function is called each time the function below polls the run status. - update := func(info *retries.Info[jobs.Run]) { + return func(info *retries.Info[jobs.Run]) { i := info.Info if i == nil { return @@ -169,11 +174,55 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) { log.Infof(ctx, "Run status: %s", info.Info.State.LifeCycleState) prevState = state } - if runId == nil { - runId = &i.RunId + } +} + +func logProgressCallback(ctx context.Context, progressLogger *progress.Logger) func(info *retries.Info[jobs.Run]) { + var prevState *jobs.RunState + return func(info *retries.Info[jobs.Run]) { + i := info.Info + if i == nil { + return } + + state := i.State + if state == nil { + return + } + + if prevState != nil && prevState.LifeCycleState == state.LifeCycleState && + prevState.ResultState == state.ResultState { + return + } else { + prevState = state + } + + event := &JobProgressEvent{ + Timestamp: time.Now(), + JobId: i.JobId, + RunId: i.RunId, + RunName: i.RunName, + State: *i.State, + RunPageURL: i.RunPageUrl, + } + + // log progress events to stderr + progressLogger.Log(event) + + // log progress events in using the default logger + log.Infof(ctx, event.String()) + } +} + +func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) { + jobID, err := strconv.ParseInt(r.job.ID, 10, 64) + if err != nil { + return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID) } + runId := new(int64) + + // construct request payload from cmd line flags args req, err := opts.Job.toPayload(jobID) if err != nil { return nil, err @@ -181,11 +230,27 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) { // Include resource key in logger. ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key())) + w := r.bundle.WorkspaceClient() - run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), update) + + // gets the run id from inside Jobs.RunNowAndWait + pullRunId := pullRunIdCallback(runId) + + // callback to log status updates to the universal log destination. + // Called on every poll request + logDebug := logDebugCallback(ctx, runId) + + // callback to log progress events. Called on every poll request + progressLogger, ok := progress.FromContext(ctx) + if !ok { + return nil, fmt.Errorf("no progress logger found") + } + logProgress := logProgressCallback(ctx, progressLogger) + + run, err := w.Jobs.RunNowAndWait(ctx, *req, + retries.Timeout[jobs.Run](jobRunTimeout), pullRunId, logDebug, logProgress) if err != nil && runId != nil { r.logFailedTasks(ctx, *runId) - } if err != nil { return nil, err diff --git a/bundle/run/job_progress_event.go b/bundle/run/job_progress_event.go new file mode 100644 index 00000000..323ad8fd --- /dev/null +++ b/bundle/run/job_progress_event.go @@ -0,0 +1,35 @@ +package run + +import ( + "strings" + "time" + + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type JobProgressEvent struct { + Timestamp time.Time `json:"timestamp"` + JobId int64 `json:"job_id"` + RunId int64 `json:"run_id"` + RunName string `json:"run_name"` + State jobs.RunState `json:"state"` + RunPageURL string `json:"run_page_url"` +} + +func (event *JobProgressEvent) String() string { + result := strings.Builder{} + result.WriteString(event.Timestamp.Format("2006-01-02 15:04:05")) + result.WriteString(" ") + result.WriteString(event.RunName) + result.WriteString(" ") + result.WriteString(event.State.LifeCycleState.String()) + if event.State.ResultState.String() != "" { + result.WriteString(" ") + result.WriteString(event.State.ResultState.String()) + } + result.WriteString(" ") + result.WriteString(event.State.StateMessage) + result.WriteString(" ") + result.WriteString(event.RunPageURL) + return result.String() +} diff --git a/bundle/run/job_progress_event_test.go b/bundle/run/job_progress_event_test.go new file mode 100644 index 00000000..57b060a4 --- /dev/null +++ b/bundle/run/job_progress_event_test.go @@ -0,0 +1,25 @@ +package run + +import ( + "testing" + "time" + + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" +) + +func TestJobProgressEventString(t *testing.T) { + event := &JobProgressEvent{ + Timestamp: time.Date(0, 0, 0, 0, 0, 0, 0, &time.Location{}), + JobId: 123, + RunId: 456, + RunName: "run_name", + State: jobs.RunState{ + LifeCycleState: jobs.RunLifeCycleStateTerminated, + ResultState: jobs.RunResultStateSuccess, + StateMessage: "state_message", + }, + RunPageURL: "run_url", + } + assert.Equal(t, "-0001-11-30 00:00:00 run_name TERMINATED SUCCESS state_message run_url", event.String()) +} diff --git a/bundle/run/options.go b/bundle/run/options.go index 7a550af8..cc9dd413 100644 --- a/bundle/run/options.go +++ b/bundle/run/options.go @@ -1,6 +1,8 @@ package run -import flag "github.com/spf13/pflag" +import ( + flag "github.com/spf13/pflag" +) type Options struct { Job JobOptions diff --git a/bundle/run/runner.go b/bundle/run/runner.go index 6fb31021..8ac5a25e 100644 --- a/bundle/run/runner.go +++ b/bundle/run/runner.go @@ -8,6 +8,8 @@ import ( "github.com/databricks/bricks/bundle" ) +// TODO: refactor this package into service specific implementations. Its +// getting bloated. (https://github.com/databricks/bricks/issues/282) type key string func (k key) Key() string { diff --git a/cmd/root/logger.go b/cmd/root/logger.go index b20599c5..296075e9 100644 --- a/cmd/root/logger.go +++ b/cmd/root/logger.go @@ -7,7 +7,6 @@ import ( "github.com/databricks/bricks/libs/flags" "github.com/databricks/bricks/libs/log" - "github.com/spf13/cobra" "golang.org/x/exp/slog" ) @@ -17,7 +16,7 @@ const ( envBricksLogFormat = "BRICKS_LOG_FORMAT" ) -func initializeLogger(ctx context.Context, cmd *cobra.Command) (context.Context, error) { +func initializeLogger(ctx context.Context) (context.Context, error) { opts := slog.HandlerOptions{} opts.Level = logLevel.Level() opts.AddSource = true diff --git a/cmd/root/progress_logger.go b/cmd/root/progress_logger.go new file mode 100644 index 00000000..cfa74042 --- /dev/null +++ b/cmd/root/progress_logger.go @@ -0,0 +1,48 @@ +package root + +import ( + "context" + "fmt" + "os" + + "github.com/databricks/bricks/libs/flags" + "github.com/databricks/bricks/libs/progress" + "golang.org/x/term" +) + +const envBricksProgressFormat = "BRICKS_PROGRESS_FORMAT" + +func resolveModeDefault(format flags.ProgressLogFormat) flags.ProgressLogFormat { + if (logLevel.String() == "disabled" || logFile.String() != "stderr") && + term.IsTerminal(int(os.Stderr.Fd())) { + return flags.ModeInplace + } + return flags.ModeAppend +} + +func initializeProgressLogger(ctx context.Context) (context.Context, error) { + if logLevel.String() != "disabled" && logFile.String() == "stderr" && + progressFormat == flags.ModeInplace { + return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr") + } + + format := progressFormat + if format == flags.ModeDefault { + format = resolveModeDefault(format) + } + + progressLogger := progress.NewLogger(format) + return progress.NewContext(ctx, progressLogger), nil +} + +var progressFormat = flags.NewProgressLogFormat() + +func init() { + // Configure defaults from environment, if applicable. + // If the provided value is invalid it is ignored. + if v, ok := os.LookupEnv(envBricksProgressFormat); ok { + progressFormat.Set(v) + } + RootCmd.PersistentFlags().Var(&progressFormat, "progress-format", "format for progress logs (append, inplace, json)") + RootCmd.RegisterFlagCompletionFunc("progress-format", progressFormat.Complete) +} diff --git a/cmd/root/progress_logger_test.go b/cmd/root/progress_logger_test.go new file mode 100644 index 00000000..f962b660 --- /dev/null +++ b/cmd/root/progress_logger_test.go @@ -0,0 +1,45 @@ +package root + +import ( + "context" + "testing" + + "github.com/databricks/bricks/libs/flags" + "github.com/databricks/bricks/libs/progress" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestInitializeErrorOnIncompatibleConfig(t *testing.T) { + logLevel.Set("info") + logFile.Set("stderr") + progressFormat.Set("inplace") + _, err := initializeProgressLogger(context.Background()) + assert.ErrorContains(t, err, "inplace progress logging cannot be used when log-file is stderr") +} + +func TestNoErrorOnDisabledLogLevel(t *testing.T) { + logLevel.Set("disabled") + logFile.Set("stderr") + progressFormat.Set("inplace") + _, err := initializeProgressLogger(context.Background()) + assert.NoError(t, err) +} + +func TestNoErrorOnNonStderrLogFile(t *testing.T) { + logLevel.Set("info") + logFile.Set("stdout") + progressFormat.Set("inplace") + _, err := initializeProgressLogger(context.Background()) + assert.NoError(t, err) +} + +func TestDefaultLoggerModeResolution(t *testing.T) { + progressFormat = flags.NewProgressLogFormat() + require.Equal(t, progressFormat, flags.ModeDefault) + ctx, err := initializeProgressLogger(context.Background()) + require.NoError(t, err) + logger, ok := progress.FromContext(ctx) + assert.True(t, ok) + assert.Equal(t, logger.Mode, flags.ModeAppend) +} diff --git a/cmd/root/root.go b/cmd/root/root.go index 195e4562..15536e0f 100644 --- a/cmd/root/root.go +++ b/cmd/root/root.go @@ -24,7 +24,13 @@ var RootCmd = &cobra.Command{ ctx := cmd.Context() // Configure default logger. - ctx, err := initializeLogger(ctx, cmd) + ctx, err := initializeLogger(ctx) + if err != nil { + return err + } + + // Configure progress logger + ctx, err = initializeProgressLogger(ctx) if err != nil { return err } diff --git a/go.mod b/go.mod index 9091aaa3..3088ebf9 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/hashicorp/terraform-json v0.16.0 golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 golang.org/x/sync v0.1.0 + golang.org/x/term v0.6.0 ) require ( diff --git a/go.sum b/go.sum index cc5d5f5b..b6e2a156 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= +golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/libs/flags/progress_format.go b/libs/flags/progress_format.go new file mode 100644 index 00000000..4dc3ed82 --- /dev/null +++ b/libs/flags/progress_format.go @@ -0,0 +1,58 @@ +package flags + +import ( + "fmt" + "strings" + + "github.com/spf13/cobra" +) + +type ProgressLogFormat string + +var ( + ModeAppend = ProgressLogFormat("append") + ModeInplace = ProgressLogFormat("inplace") + ModeJson = ProgressLogFormat("json") + ModeDefault = ProgressLogFormat("default") +) + +func (p *ProgressLogFormat) String() string { + return string(*p) +} + +func NewProgressLogFormat() ProgressLogFormat { + return ModeDefault +} + +func (p *ProgressLogFormat) Set(s string) error { + lower := strings.ToLower(s) + switch lower { + case ModeAppend.String(): + *p = ProgressLogFormat(ModeAppend.String()) + case ModeInplace.String(): + *p = ProgressLogFormat(ModeInplace.String()) + case ModeJson.String(): + *p = ProgressLogFormat(ModeJson.String()) + default: + valid := []string{ + ModeAppend.String(), + ModeInplace.String(), + ModeJson.String(), + } + return fmt.Errorf("accepted arguments are [%s]", strings.Join(valid, ", ")) + } + return nil +} + +func (p *ProgressLogFormat) Type() string { + return "format" +} + +// Complete is the Cobra compatible completion function for this flag. +func (f *ProgressLogFormat) Complete(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return []string{ + "append", + "inplace", + "json", + }, cobra.ShellCompDirectiveNoFileComp +} diff --git a/libs/flags/progress_format_test.go b/libs/flags/progress_format_test.go new file mode 100644 index 00000000..55d6da65 --- /dev/null +++ b/libs/flags/progress_format_test.go @@ -0,0 +1,59 @@ +package flags + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestProgressFormatNonTtyDefault(t *testing.T) { + format := NewProgressLogFormat() + assert.Equal(t, format, ModeDefault) +} + +func TestProgressFormatSet(t *testing.T) { + p := NewProgressLogFormat() + + // invalid arg + err := p.Set("foo") + assert.ErrorContains(t, err, "accepted arguments are [append, inplace, json]") + + // set json + err = p.Set("json") + assert.NoError(t, err) + assert.Equal(t, "json", p.String()) + + err = p.Set("JSON") + assert.NoError(t, err) + assert.Equal(t, "json", p.String()) + + err = p.Set("Json") + assert.NoError(t, err) + assert.Equal(t, "json", p.String()) + + // set append + err = p.Set("append") + assert.NoError(t, err) + assert.Equal(t, "append", p.String()) + + err = p.Set("Append") + assert.NoError(t, err) + assert.Equal(t, "append", p.String()) + + err = p.Set("APPEND") + assert.NoError(t, err) + assert.Equal(t, "append", p.String()) + + // set inplace + err = p.Set("inplace") + assert.NoError(t, err) + assert.Equal(t, "inplace", p.String()) + + err = p.Set("Inplace") + assert.NoError(t, err) + assert.Equal(t, "inplace", p.String()) + + err = p.Set("INPLACE") + assert.NoError(t, err) + assert.Equal(t, "inplace", p.String()) +} diff --git a/libs/progress/context.go b/libs/progress/context.go new file mode 100644 index 00000000..4fd60194 --- /dev/null +++ b/libs/progress/context.go @@ -0,0 +1,20 @@ +package progress + +import ( + "context" +) + +type progressLogger int + +var progressLoggerKey progressLogger + +// NewContext returns a new Context that carries the specified progress logger. +func NewContext(ctx context.Context, logger *Logger) context.Context { + return context.WithValue(ctx, progressLoggerKey, logger) +} + +// FromContext returns the progress logger value stored in ctx, if any. +func FromContext(ctx context.Context) (*Logger, bool) { + u, ok := ctx.Value(progressLoggerKey).(*Logger) + return u, ok +} diff --git a/libs/progress/event.go b/libs/progress/event.go new file mode 100644 index 00000000..a082c299 --- /dev/null +++ b/libs/progress/event.go @@ -0,0 +1,5 @@ +package progress + +type Event interface { + String() string +} diff --git a/libs/progress/logger.go b/libs/progress/logger.go new file mode 100644 index 00000000..d09f0e3d --- /dev/null +++ b/libs/progress/logger.go @@ -0,0 +1,55 @@ +package progress + +import ( + "encoding/json" + "io" + "os" + + "github.com/databricks/bricks/libs/flags" +) + +type Logger struct { + Mode flags.ProgressLogFormat + Writer io.Writer + + isFirstEvent bool +} + +func NewLogger(mode flags.ProgressLogFormat) *Logger { + return &Logger{ + Mode: mode, + Writer: os.Stderr, + isFirstEvent: true, + } +} + +func (l *Logger) Log(event Event) { + switch l.Mode { + case flags.ModeInplace: + if l.isFirstEvent { + l.Writer.Write([]byte("\n")) + } + l.Writer.Write([]byte("\033[1F")) + l.Writer.Write([]byte(event.String())) + l.Writer.Write([]byte("\n")) + + case flags.ModeJson: + b, err := json.MarshalIndent(event, "", " ") + if err != nil { + // we panic because there we cannot catch this in jobs.RunNowAndWait + panic(err) + } + l.Writer.Write([]byte(b)) + l.Writer.Write([]byte("\n")) + + case flags.ModeAppend: + l.Writer.Write([]byte(event.String())) + l.Writer.Write([]byte("\n")) + + default: + // we panic because errors are not captured in some log sides like + // jobs.RunNowAndWait + panic("unknown progress logger mode: " + l.Mode.String()) + } + l.isFirstEvent = false +}