Add progress logging for pipeline runs (#283)

Add progress logging for pipeline runs
This commit is contained in:
shreyas-goenka 2023-03-31 17:04:12 +02:00 committed by GitHub
parent 04e77102c9
commit b4a30c641c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 152 additions and 0 deletions

View File

@ -8,7 +8,10 @@ import (
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/bricks/bundle/run/pipeline"
"github.com/databricks/bricks/libs/flags"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/bricks/libs/progress"
"github.com/databricks/databricks-sdk-go/service/pipelines"
flag "github.com/spf13/pflag"
)
@ -157,6 +160,17 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (RunOutput, err
updateID := res.UpdateId
// setup progress logger and tracker to query events
updateTracker := pipeline.NewUpdateTracker(pipelineID, updateID, w)
progressLogger, ok := progress.FromContext(ctx)
if !ok {
return nil, fmt.Errorf("no progress logger found")
}
// Inplace logger mode is not supported for pipelines right now
if progressLogger.Mode == flags.ModeInplace {
progressLogger.Mode = flags.ModeAppend
}
// Log the pipeline update URL as soon as it is available.
updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
log.Infof(ctx, "Update available at %s", updateUrl)
@ -165,6 +179,15 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (RunOutput, err
// Note: there is no "StartUpdateAndWait" wrapper for this API.
var prevState *pipelines.UpdateInfoState
for {
events, err := updateTracker.Events(ctx)
if err != nil {
return nil, err
}
for _, event := range events {
progressLogger.Log(&event)
log.Infof(ctx, event.String())
}
update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID)
if err != nil {
return nil, err

View File

@ -0,0 +1,93 @@
package pipeline
import (
"context"
"fmt"
"strings"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)
// ProgressEvent is a local alias of pipelines.PipelineEvent used to attach
// console-friendly rendering (see String) to events returned by the API.
type ProgressEvent pipelines.PipelineEvent
// String renders the event as a single human-readable line in the form:
//
//	<timestamp> <event type> [<flow or pipeline name>] <level> <message>
//
// The subject name is included only for flow_progress (flow name) and
// update_progress (pipeline name) events.
func (event *ProgressEvent) String() string {
	parts := []string{event.Timestamp, event.EventType}

	// Include the name of the subject flow/pipeline, when applicable.
	switch event.EventType {
	case "flow_progress":
		parts = append(parts, event.Origin.FlowName)
	case "update_progress":
		parts = append(parts, event.Origin.PipelineName)
	}

	parts = append(parts, event.Level.String(), event.Message)
	return strings.Join(parts, " ")
}
// TODO: Add inplace logging to pipelines. https://github.com/databricks/bricks/issues/280

// UpdateTracker polls the pipeline events API for progress events belonging
// to a single update of a single pipeline.
type UpdateTracker struct {
	UpdateId   string
	PipelineId string
	// LatestEventTimestamp is the timestamp of the most recent event returned
	// by a previous call to Events; used to restrict the next query to new
	// events only. Empty until the first event is seen.
	LatestEventTimestamp string
	w                    *databricks.WorkspaceClient
}
// NewUpdateTracker returns a tracker for the given update of the given
// pipeline. The tracker starts with no event history, so the first call to
// Events returns every matching event for the update.
func NewUpdateTracker(pipelineId string, updateId string, w *databricks.WorkspaceClient) *UpdateTracker {
	// LatestEventTimestamp is deliberately left at its zero value ("") so the
	// first query is unfiltered.
	tracker := &UpdateTracker{
		PipelineId: pipelineId,
		UpdateId:   updateId,
		w:          w,
	}
	return tracker
}
// Events returns progress events for this update that are newer than the
// latest event seen by a previous call.
//
// To keep the logic simple we do not use pagination. This means that if there are
// more than 100 new events since the last query then we will miss out on progress events.
//
// This is fine because:
// 1. This should happen fairly rarely if ever
// 2. There is no expectation of the console progress logs being a complete representation
//
// # If a user needs the complete logs, they can always visit the run URL
//
// NOTE: In case we want inplace logging, then we will need to implement pagination
func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
	// Create a filter to fetch only events newer than the last one we saw.
	// An empty filter (first call) fetches the most recent events unfiltered.
	filter := ""
	if l.LatestEventTimestamp != "" {
		filter = fmt.Sprintf(`timestamp > '%s'`, l.LatestEventTimestamp)
	}
	// we only check the most recent 100 events for progress
	response, err := l.w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEvents{
		PipelineId: l.PipelineId,
		MaxResults: 100,
		Filter:     filter,
	})
	if err != nil {
		return nil, err
	}
	// Keep only update_progress and flow_progress events that belong to this
	// specific update (the API returns events for the whole pipeline).
	result := make([]ProgressEvent, 0)
	for _, event := range response.Events {
		if event.Origin.UpdateId != l.UpdateId {
			continue
		}
		if event.EventType == "flow_progress" || event.EventType == "update_progress" {
			result = append(result, ProgressEvent(event))
		}
	}
	// Remember the newest event's timestamp for the next query.
	// NOTE(review): this takes result[0], which assumes ListPipelineEvents
	// returns events newest-first — confirm against the API's ordering.
	if len(result) > 0 {
		l.LatestEventTimestamp = result[0].Timestamp
	}
	return result, nil
}

View File

@ -0,0 +1,36 @@
package pipeline
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)
// Verify that flow_progress events render with the flow name as the subject.
func TestFlowProgressEventToString(t *testing.T) {
	evt := ProgressEvent{
		Timestamp: "2023-03-27T23:30:36.122Z",
		EventType: "flow_progress",
		Level:     pipelines.EventLevelInfo,
		Message:   "my_message",
		Origin: &pipelines.Origin{
			FlowName:     "my_flow",
			PipelineName: "my_pipeline",
		},
	}
	expected := "2023-03-27T23:30:36.122Z flow_progress my_flow INFO my_message"
	assert.Equal(t, expected, evt.String())
}
// Verify that update_progress events render with the pipeline name (not the
// flow name) as the subject.
func TestUpdateProgressEventToString(t *testing.T) {
	evt := ProgressEvent{
		Timestamp: "2023-03-27T23:30:36.122Z",
		EventType: "update_progress",
		Level:     pipelines.EventLevelError,
		Message:   "my_message",
		Origin: &pipelines.Origin{
			FlowName:     "my_flow",
			PipelineName: "my_pipeline",
		},
	}
	expected := "2023-03-27T23:30:36.122Z update_progress my_pipeline ERROR my_message"
	assert.Equal(t, expected, evt.String())
}