Add progress logs for job runs (#276)

This commit is contained in:
shreyas-goenka 2023-03-29 14:58:09 +02:00 committed by GitHub
parent 9733cefae5
commit 8fd3dccca9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 444 additions and 17 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/bricks/libs/progress"
"github.com/databricks/databricks-sdk-go/retries"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/fatih/color"
@ -137,20 +138,24 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
yellow(task.TaskKey), task.State.LifeCycleState)
}
}
}
func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
if err != nil {
return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
// pullRunIdCallback returns a poll callback that copies the run ID of the
// polled run into *runId. The value is only written while *runId is still
// zero; poll results that carry no run information are ignored.
func pullRunIdCallback(runId *int64) func(info *retries.Info[jobs.Run]) {
	return func(info *retries.Info[jobs.Run]) {
		run := info.Info
		if run == nil || *runId != 0 {
			return
		}
		*runId = run.RunId
	}
}
func logDebugCallback(ctx context.Context, runId *int64) func(info *retries.Info[jobs.Run]) {
var prevState *jobs.RunState
var runId *int64
// This function is called each time the function below polls the run status.
update := func(info *retries.Info[jobs.Run]) {
return func(info *retries.Info[jobs.Run]) {
i := info.Info
if i == nil {
return
@ -169,11 +174,55 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
log.Infof(ctx, "Run status: %s", info.Info.State.LifeCycleState)
prevState = state
}
if runId == nil {
runId = &i.RunId
}
}
// logProgressCallback returns a poll callback that reports job run progress.
//
// The returned function remembers the state seen on the previous invocation
// and emits a JobProgressEvent only when the life cycle state or the result
// state differs from it. Each event is sent to progressLogger and is also
// written at info level through the logger carried by ctx. Poll results
// without run information or without a state are ignored.
func logProgressCallback(ctx context.Context, progressLogger *progress.Logger) func(info *retries.Info[jobs.Run]) {
	var prevState *jobs.RunState
	return func(info *retries.Info[jobs.Run]) {
		i := info.Info
		if i == nil {
			return
		}
		state := i.State
		if state == nil {
			return
		}
		// Nothing to report when neither the life cycle state nor the result
		// state changed since the previous poll.
		if prevState != nil && prevState.LifeCycleState == state.LifeCycleState &&
			prevState.ResultState == state.ResultState {
			return
		}
		prevState = state

		event := &JobProgressEvent{
			Timestamp:  time.Now(),
			JobId:      i.JobId,
			RunId:      i.RunId,
			RunName:    i.RunName,
			State:      *state,
			RunPageURL: i.RunPageUrl,
		}

		// Emit the event through the progress logger.
		progressLogger.Log(event)

		// Also record the event using the default logger. The rendered event is
		// passed as an argument rather than as the format string, so that any
		// '%' in the run name, state message or URL is logged verbatim.
		log.Infof(ctx, "%s", event.String())
	}
}
func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
if err != nil {
return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
}
runId := new(int64)
// construct request payload from cmd line flags args
req, err := opts.Job.toPayload(jobID)
if err != nil {
return nil, err
@ -181,11 +230,27 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
// Include resource key in logger.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))
w := r.bundle.WorkspaceClient()
run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), update)
// gets the run id from inside Jobs.RunNowAndWait
pullRunId := pullRunIdCallback(runId)
// callback to log status updates to the universal log destination.
// Called on every poll request
logDebug := logDebugCallback(ctx, runId)
// callback to log progress events. Called on every poll request
progressLogger, ok := progress.FromContext(ctx)
if !ok {
return nil, fmt.Errorf("no progress logger found")
}
logProgress := logProgressCallback(ctx, progressLogger)
run, err := w.Jobs.RunNowAndWait(ctx, *req,
retries.Timeout[jobs.Run](jobRunTimeout), pullRunId, logDebug, logProgress)
if err != nil && runId != nil {
r.logFailedTasks(ctx, *runId)
}
if err != nil {
return nil, err

View File

@ -0,0 +1,35 @@
package run
import (
"strings"
"time"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
// JobProgressEvent describes a job run at a single point in time. The struct
// tags define the field names used when the event is encoded as JSON.
type JobProgressEvent struct {
// Timestamp is the time associated with the event.
Timestamp time.Time `json:"timestamp"`
// JobId is the ID of the job the run belongs to.
JobId int64 `json:"job_id"`
// RunId is the ID of the run.
RunId int64 `json:"run_id"`
// RunName is the name of the run.
RunName string `json:"run_name"`
// State is a copy of the run state (life cycle state, result state and
// state message) at the time of the event.
State jobs.RunState `json:"state"`
// RunPageURL is the URL of the run's page.
RunPageURL string `json:"run_page_url"`
}
// String renders the event as one line of space separated fields, in this
// order: timestamp ("2006-01-02 15:04:05" layout), run name, life cycle state,
// result state, state message and run page URL. The result state is left out
// when its string form is empty; every other field is always written, even
// when it is empty.
func (event *JobProgressEvent) String() string {
	fields := make([]string, 0, 6)
	fields = append(fields,
		event.Timestamp.Format("2006-01-02 15:04:05"),
		event.RunName,
		event.State.LifeCycleState.String(),
	)
	if resultState := event.State.ResultState.String(); resultState != "" {
		fields = append(fields, resultState)
	}
	fields = append(fields, event.State.StateMessage, event.RunPageURL)
	return strings.Join(fields, " ")
}

View File

@ -0,0 +1,25 @@
package run
import (
"testing"
"time"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)
func TestJobProgressEventString(t *testing.T) {
event := &JobProgressEvent{
Timestamp: time.Date(0, 0, 0, 0, 0, 0, 0, &time.Location{}),
JobId: 123,
RunId: 456,
RunName: "run_name",
State: jobs.RunState{
LifeCycleState: jobs.RunLifeCycleStateTerminated,
ResultState: jobs.RunResultStateSuccess,
StateMessage: "state_message",
},
RunPageURL: "run_url",
}
assert.Equal(t, "-0001-11-30 00:00:00 run_name TERMINATED SUCCESS state_message run_url", event.String())
}

View File

@ -1,6 +1,8 @@
package run
import flag "github.com/spf13/pflag"
import (
flag "github.com/spf13/pflag"
)
type Options struct {
Job JobOptions

View File

@ -8,6 +8,8 @@ import (
"github.com/databricks/bricks/bundle"
)
// TODO: refactor this package into service-specific implementations. It's
// getting bloated. (https://github.com/databricks/bricks/issues/282)
type key string
func (k key) Key() string {

View File

@ -7,7 +7,6 @@ import (
"github.com/databricks/bricks/libs/flags"
"github.com/databricks/bricks/libs/log"
"github.com/spf13/cobra"
"golang.org/x/exp/slog"
)
@ -17,7 +16,7 @@ const (
envBricksLogFormat = "BRICKS_LOG_FORMAT"
)
func initializeLogger(ctx context.Context, cmd *cobra.Command) (context.Context, error) {
func initializeLogger(ctx context.Context) (context.Context, error) {
opts := slog.HandlerOptions{}
opts.Level = logLevel.Level()
opts.AddSource = true

View File

@ -0,0 +1,48 @@
package root
import (
"context"
"fmt"
"os"
"github.com/databricks/bricks/libs/flags"
"github.com/databricks/bricks/libs/progress"
"golang.org/x/term"
)
const envBricksProgressFormat = "BRICKS_PROGRESS_FORMAT"
// resolveModeDefault chooses the concrete progress format to use when none was
// selected explicitly. It returns flags.ModeInplace only when regular logs do
// not go to stderr (logging is disabled or the log file is not "stderr") and
// stderr is a terminal; in every other case it returns flags.ModeAppend.
//
// The format argument is not consulted.
func resolveModeDefault(format flags.ProgressLogFormat) flags.ProgressLogFormat {
	// Regular logs are written to stderr: do not rewrite lines in place.
	if logLevel.String() != "disabled" && logFile.String() == "stderr" {
		return flags.ModeAppend
	}
	// In place updates are only used when stderr is a terminal.
	if !term.IsTerminal(int(os.Stderr.Fd())) {
		return flags.ModeAppend
	}
	return flags.ModeInplace
}
// initializeProgressLogger builds the progress logger from the package level
// flag values and returns a context derived from ctx that carries it.
//
// It returns an error when the inplace format was requested while regular
// logs are enabled and written to stderr. When the format is still
// flags.ModeDefault it is resolved through resolveModeDefault first.
func initializeProgressLogger(ctx context.Context) (context.Context, error) {
	logsOnStderr := logLevel.String() != "disabled" && logFile.String() == "stderr"
	if logsOnStderr && progressFormat == flags.ModeInplace {
		return nil, fmt.Errorf("inplace progress logging cannot be used when log-file is stderr")
	}

	mode := progressFormat
	if mode == flags.ModeDefault {
		mode = resolveModeDefault(mode)
	}

	return progress.NewContext(ctx, progress.NewLogger(mode)), nil
}
// progressFormat holds the value of the --progress-format flag.
var progressFormat = flags.NewProgressLogFormat()

// init seeds progressFormat from the environment and registers the
// --progress-format persistent flag, together with its shell completion, on
// RootCmd.
func init() {
	// Take the default from the environment when the variable is set. An
	// invalid value is deliberately ignored, leaving the flag unchanged.
	if fromEnv, found := os.LookupEnv(envBricksProgressFormat); found {
		_ = progressFormat.Set(fromEnv)
	}

	persistent := RootCmd.PersistentFlags()
	persistent.Var(&progressFormat, "progress-format", "format for progress logs (append, inplace, json)")
	_ = RootCmd.RegisterFlagCompletionFunc("progress-format", progressFormat.Complete)
}

View File

@ -0,0 +1,45 @@
package root
import (
"context"
"testing"
"github.com/databricks/bricks/libs/flags"
"github.com/databricks/bricks/libs/progress"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestInitializeErrorOnIncompatibleConfig checks that requesting the inplace
// format is rejected while regular logs are enabled and written to stderr.
func TestInitializeErrorOnIncompatibleConfig(t *testing.T) {
	logLevel.Set("info")
	logFile.Set("stderr")
	progressFormat.Set("inplace")

	ctx := context.Background()
	_, err := initializeProgressLogger(ctx)

	const wantMessage = "inplace progress logging cannot be used when log-file is stderr"
	assert.ErrorContains(t, err, wantMessage)
}
// TestNoErrorOnDisabledLogLevel checks that the inplace format is accepted
// when the log file is stderr but logging is disabled.
func TestNoErrorOnDisabledLogLevel(t *testing.T) {
	logLevel.Set("disabled")
	logFile.Set("stderr")
	progressFormat.Set("inplace")

	ctx := context.Background()
	_, err := initializeProgressLogger(ctx)

	assert.NoError(t, err)
}
// TestNoErrorOnNonStderrLogFile checks that the inplace format is accepted
// when logging is enabled but the log file is not stderr.
func TestNoErrorOnNonStderrLogFile(t *testing.T) {
	logLevel.Set("info")
	logFile.Set("stdout")
	progressFormat.Set("inplace")

	ctx := context.Background()
	_, err := initializeProgressLogger(ctx)

	assert.NoError(t, err)
}
func TestDefaultLoggerModeResolution(t *testing.T) {
progressFormat = flags.NewProgressLogFormat()
require.Equal(t, progressFormat, flags.ModeDefault)
ctx, err := initializeProgressLogger(context.Background())
require.NoError(t, err)
logger, ok := progress.FromContext(ctx)
assert.True(t, ok)
assert.Equal(t, logger.Mode, flags.ModeAppend)
}

View File

@ -24,7 +24,13 @@ var RootCmd = &cobra.Command{
ctx := cmd.Context()
// Configure default logger.
ctx, err := initializeLogger(ctx, cmd)
ctx, err := initializeLogger(ctx)
if err != nil {
return err
}
// Configure progress logger
ctx, err = initializeProgressLogger(ctx)
if err != nil {
return err
}

1
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/hashicorp/terraform-json v0.16.0
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0
golang.org/x/sync v0.1.0
golang.org/x/term v0.6.0
)
require (

2
go.sum
View File

@ -259,6 +259,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ=
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=

View File

@ -0,0 +1,58 @@
package flags
import (
"fmt"
"strings"
"github.com/spf13/cobra"
)
// ProgressLogFormat is a flag value that names the format used for progress
// logs.
type ProgressLogFormat string

// Known progress log formats. ModeDefault is the value returned by
// NewProgressLogFormat; it cannot be selected through Set.
var (
	ModeAppend  ProgressLogFormat = "append"
	ModeInplace ProgressLogFormat = "inplace"
	ModeJson    ProgressLogFormat = "json"
	ModeDefault ProgressLogFormat = "default"
)

// String returns the format as a plain string.
func (p *ProgressLogFormat) String() string {
	return string(*p)
}

// NewProgressLogFormat returns the initial value of the flag, ModeDefault.
func NewProgressLogFormat() ProgressLogFormat {
	return ModeDefault
}

// Set assigns the format named by s, compared case-insensitively, to p.
// Only the append, inplace and json formats are accepted; for any other
// input p is left untouched and an error listing the accepted values is
// returned.
func (p *ProgressLogFormat) Set(s string) error {
	requested := ProgressLogFormat(strings.ToLower(s))
	switch requested {
	case ModeAppend, ModeInplace, ModeJson:
		*p = requested
		return nil
	}

	accepted := strings.Join([]string{
		string(ModeAppend),
		string(ModeInplace),
		string(ModeJson),
	}, ", ")
	return fmt.Errorf("accepted arguments are [%s]", accepted)
}

// Type returns the name of the value type of this flag.
func (p *ProgressLogFormat) Type() string {
	return "format"
}
// Complete is the Cobra compatible completion function for this flag. It
// always offers the same three format names, independent of its arguments,
// and disables file name completion.
func (p *ProgressLogFormat) Complete(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
	suggestions := []string{"append", "inplace", "json"}
	return suggestions, cobra.ShellCompDirectiveNoFileComp
}

View File

@ -0,0 +1,59 @@
package flags
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TestProgressFormatNonTtyDefault checks that a freshly created flag value is
// ModeDefault.
func TestProgressFormatNonTtyDefault(t *testing.T) {
	format := NewProgressLogFormat()
	// assert.Equal takes the expected value first; keeping that order makes
	// the failure output label the two values correctly.
	assert.Equal(t, ModeDefault, format)
}
// TestProgressFormatSet checks that Set rejects unknown values and accepts
// every supported format name regardless of letter case.
func TestProgressFormatSet(t *testing.T) {
	p := NewProgressLogFormat()

	// invalid arg
	err := p.Set("foo")
	assert.ErrorContains(t, err, "accepted arguments are [append, inplace, json]")

	// Valid inputs, applied one after another to the same value.
	cases := []struct {
		input string
		want  string
	}{
		{"json", "json"},
		{"JSON", "json"},
		{"Json", "json"},
		{"append", "append"},
		{"Append", "append"},
		{"APPEND", "append"},
		{"inplace", "inplace"},
		{"Inplace", "inplace"},
		{"INPLACE", "inplace"},
	}
	for _, tc := range cases {
		err = p.Set(tc.input)
		assert.NoError(t, err)
		assert.Equal(t, tc.want, p.String())
	}
}

20
libs/progress/context.go Normal file
View File

@ -0,0 +1,20 @@
package progress
import (
"context"
)
// progressLogger is the type of the context key under which the progress
// logger is stored. Being an unexported type, it cannot be equal to a context
// key defined by another package.
type progressLogger int
// progressLoggerKey is the context key used by NewContext and FromContext.
var progressLoggerKey progressLogger
// NewContext derives a context from ctx that carries the given progress
// logger. The logger can be retrieved again with FromContext.
func NewContext(ctx context.Context, logger *Logger) context.Context {
	withLogger := context.WithValue(ctx, progressLoggerKey, logger)
	return withLogger
}
// FromContext looks up the progress logger stored in ctx. The boolean result
// is false when ctx holds no *Logger under the progress logger key.
func FromContext(ctx context.Context) (*Logger, bool) {
	stored := ctx.Value(progressLoggerKey)
	logger, found := stored.(*Logger)
	return logger, found
}

5
libs/progress/event.go Normal file
View File

@ -0,0 +1,5 @@
package progress
// Event is a progress event that can be written by a Logger.
type Event interface {
// String returns the text form of the event.
String() string
}

55
libs/progress/logger.go Normal file
View File

@ -0,0 +1,55 @@
package progress
import (
"encoding/json"
"io"
"os"
"github.com/databricks/bricks/libs/flags"
)
// Logger writes progress events to Writer, rendered according to Mode.
type Logger struct {
// Mode selects how Log renders each event.
Mode flags.ProgressLogFormat
// Writer is the destination that Log writes to.
Writer io.Writer
// isFirstEvent is true until Log has handled its first event.
isFirstEvent bool
}
// NewLogger creates a Logger that renders events in the given mode and writes
// them to os.Stderr. The returned logger has not seen any event yet.
func NewLogger(mode flags.ProgressLogFormat) *Logger {
	logger := new(Logger)
	logger.Mode = mode
	logger.Writer = os.Stderr
	logger.isFirstEvent = true
	return logger
}
// Log renders event according to l.Mode and writes it to l.Writer:
//
//   - inplace: the text form of the event overwrites the line written by the
//     previous call, so only the latest event stays visible.
//   - json: the event is written as indented JSON followed by a newline.
//   - append: the text form of the event is written followed by a newline.
//
// Errors returned by the writer are ignored. Log panics when the event cannot
// be encoded as JSON or when the mode is not one of the three above; it has no
// error result through which such failures could be reported.
func (l *Logger) Log(event Event) {
	switch l.Mode {
	case flags.ModeInplace:
		if l.isFirstEvent {
			// Emit an empty line first so that the cursor movement below has a
			// line of our own to return to.
			l.Writer.Write([]byte("\n"))
		}
		// "\033[1F" moves the cursor to the start of the previous line and
		// "\033[K" erases that line, so that a shorter event does not leave
		// the tail of a longer previous one on screen.
		l.Writer.Write([]byte("\033[1F\033[K"))
		l.Writer.Write([]byte(event.String()))
		l.Writer.Write([]byte("\n"))

	case flags.ModeJson:
		b, err := json.MarshalIndent(event, "", "  ")
		if err != nil {
			// We panic because an error cannot be reported from here to call
			// sites such as the poll callbacks of jobs.RunNowAndWait.
			panic(err)
		}
		l.Writer.Write(b)
		l.Writer.Write([]byte("\n"))

	case flags.ModeAppend:
		l.Writer.Write([]byte(event.String()))
		l.Writer.Write([]byte("\n"))

	default:
		// We panic because an error cannot be reported from here to call
		// sites such as the poll callbacks of jobs.RunNowAndWait.
		panic("unknown progress logger mode: " + l.Mode.String())
	}
	l.isFirstEvent = false
}