diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go
index 20d80a4d..901e0a09 100644
--- a/bundle/run/pipeline.go
+++ b/bundle/run/pipeline.go
@@ -8,7 +8,10 @@ import (
 
 	"github.com/databricks/bricks/bundle"
 	"github.com/databricks/bricks/bundle/config/resources"
+	"github.com/databricks/bricks/bundle/run/pipeline"
+	"github.com/databricks/bricks/libs/flags"
 	"github.com/databricks/bricks/libs/log"
+	"github.com/databricks/bricks/libs/progress"
 	"github.com/databricks/databricks-sdk-go/service/pipelines"
 	flag "github.com/spf13/pflag"
 )
@@ -157,6 +160,17 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (RunOutput, err
 
 	updateID := res.UpdateId
 
+	// setup progress logger and tracker to query events
+	updateTracker := pipeline.NewUpdateTracker(pipelineID, updateID, w)
+	progressLogger, ok := progress.FromContext(ctx)
+	if !ok {
+		return nil, fmt.Errorf("no progress logger found")
+	}
+	// Inplace logger mode is not supported for pipelines right now
+	if progressLogger.Mode == flags.ModeInplace {
+		progressLogger.Mode = flags.ModeAppend
+	}
+
 	// Log the pipeline update URL as soon as it is available.
 	updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
 	log.Infof(ctx, "Update available at %s", updateUrl)
@@ -165,6 +179,15 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (RunOutput, err
 	// Note: there is no "StartUpdateAndWait" wrapper for this API.
 	var prevState *pipelines.UpdateInfoState
 	for {
+		events, err := updateTracker.Events(ctx)
+		if err != nil {
+			return nil, err
+		}
+		// Index to avoid taking the address of the loop variable; use an
+		// explicit "%s" format so '%' in event messages is not interpreted.
+		for i := range events {
+			progressLogger.Log(&events[i])
+			log.Infof(ctx, "%s", events[i].String())
+		}
+
 		update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID)
 		if err != nil {
 			return nil, err
diff --git a/bundle/run/pipeline/progress.go b/bundle/run/pipeline/progress.go
new file mode 100644
index 00000000..94f9efd0
--- /dev/null
+++ b/bundle/run/pipeline/progress.go
@@ -0,0 +1,93 @@
+package pipeline
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/databricks/databricks-sdk-go"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+)
+
+type ProgressEvent pipelines.PipelineEvent
+
+func (event *ProgressEvent) String() string {
+	result := strings.Builder{}
+
+	result.WriteString(event.Timestamp + " ")
+	result.WriteString(event.EventType + " ")
+
+	// add name of the subject flow/pipeline
+	switch event.EventType {
+	case "flow_progress":
+		result.WriteString(event.Origin.FlowName + " ")
+	case "update_progress":
+		result.WriteString(event.Origin.PipelineName + " ")
+	}
+	result.WriteString(event.Level.String() + " ")
+	result.WriteString(event.Message)
+
+	return result.String()
+}
+
+// TODO: Add inplace logging to pipelines. https://github.com/databricks/bricks/issues/280
+type UpdateTracker struct {
+	UpdateId             string
+	PipelineId           string
+	LatestEventTimestamp string
+	w                    *databricks.WorkspaceClient
+}
+
+func NewUpdateTracker(pipelineId string, updateId string, w *databricks.WorkspaceClient) *UpdateTracker {
+	return &UpdateTracker{
+		w:                    w,
+		PipelineId:           pipelineId,
+		UpdateId:             updateId,
+		LatestEventTimestamp: "",
+	}
+}
+
+// To keep the logic simple we do not use pagination. This means that if there are
+// more than 100 new events since the last query then we will miss out on progress events.
+//
+// This is fine because:
+// 1. This should happen fairly rarely if ever
+// 2. There is no expectation of the console progress logs being a complete representation
+//
+// # If a user needs the complete logs, they can always visit the run URL
+//
+// NOTE: In case we want inplace logging, then we will need to implement pagination
+func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
+	// create filter to fetch only new events
+	filter := ""
+	if l.LatestEventTimestamp != "" {
+		filter = fmt.Sprintf(`timestamp > '%s'`, l.LatestEventTimestamp)
+	}
+
+	// we only check the most recent 100 events for progress
+	response, err := l.w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEvents{
+		PipelineId: l.PipelineId,
+		MaxResults: 100,
+		Filter:     filter,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	// keep only flow_progress and update_progress events belonging to this update
+	result := make([]ProgressEvent, 0)
+	for _, event := range response.Events {
+		if event.Origin.UpdateId != l.UpdateId {
+			continue
+		}
+		if event.EventType == "flow_progress" || event.EventType == "update_progress" {
+			result = append(result, ProgressEvent(event))
+		}
+	}
+
+	// remember the newest timestamp for the next query (assumes the API returns
+	// events newest first — verify against ListPipelineEvents ordering)
+	if len(result) > 0 {
+		l.LatestEventTimestamp = result[0].Timestamp
+	}
+	return result, nil
+}
diff --git a/bundle/run/pipeline/progress_test.go b/bundle/run/pipeline/progress_test.go
new file mode 100644
index 00000000..abc34dd8
--- /dev/null
+++ b/bundle/run/pipeline/progress_test.go
@@ -0,0 +1,36 @@
+package pipeline
+
+import (
+	"testing"
+
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestFlowProgressEventToString(t *testing.T) {
+	event := ProgressEvent{
+		EventType: "flow_progress",
+		Message:   "my_message",
+		Level:     pipelines.EventLevelInfo,
+		Origin: &pipelines.Origin{
+			FlowName:     "my_flow",
+			PipelineName: "my_pipeline",
+		},
+		Timestamp: "2023-03-27T23:30:36.122Z",
+	}
+	assert.Equal(t, "2023-03-27T23:30:36.122Z flow_progress my_flow INFO my_message", event.String())
+}
+
+func TestUpdateProgressEventToString(t *testing.T) {
+	event := ProgressEvent{
+		EventType: "update_progress",
+		Message:   "my_message",
+		Level:     pipelines.EventLevelError,
+		Origin: &pipelines.Origin{
+			FlowName:     "my_flow",
+			PipelineName: "my_pipeline",
+		},
+		Timestamp: "2023-03-27T23:30:36.122Z",
+	}
+	assert.Equal(t, "2023-03-27T23:30:36.122Z update_progress my_pipeline ERROR my_message", event.String())
+}