From 207777849bfd9e96c52efc9a41e70965af9ae282 Mon Sep 17 00:00:00 2001
From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
Date: Thu, 16 Mar 2023 12:23:46 +0100
Subject: [PATCH] Log latest error event on pipeline run fail (#239)

DAB config used to test this:

bundle.yml
```
workspace:
  host:

bundle:
  name: deco-538

resources:
  pipelines:
    foo:
      name: "[${bundle.name}] log pipeline errors"
      libraries:
        - notebook:
            path: ./myNb.py
      development: true
```

myNb.py
```
# Databricks notebook source
print(1/0)
```

Before:
```
2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at ***
2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED
2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING
2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED
2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed!
Error: update failed
```

Now:
```
2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at ***
2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED
2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING
2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED
2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed!
2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED.
trace for most recent exception:
Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(---------------------------------------------------------------------------
ZeroDivisionError Traceback (most recent call last)
in ()
----> 1 print(1/0)
ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map())
Error: update failed
```
---
 bundle/run/pipeline.go | 69 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 67 insertions(+), 2 deletions(-)

diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go
index d222d292..939c13ca 100644
--- a/bundle/run/pipeline.go
+++ b/bundle/run/pipeline.go
@@ -10,9 +10,70 @@ import (
 	"github.com/databricks/bricks/bundle"
 	"github.com/databricks/bricks/bundle/config/resources"
 	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/fatih/color"
 	flag "github.com/spf13/pflag"
 )
 
+func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent {
+	result := []pipelines.PipelineEvent{}
+	for i := 0; i < len(events); i++ {
+		if events[i].Origin.UpdateId == updateId {
+			result = append(result, events[i])
+		}
+	}
+	return result
+}
+
+func (r *pipelineRunner) logEvent(event pipelines.PipelineEvent) {
+	red := color.New(color.FgRed).SprintFunc()
+	errorPrefix := red("[ERROR]")
+	pipelineKeyPrefix := red(fmt.Sprintf("[%s]", r.Key()))
+	eventTypePrefix := red(fmt.Sprintf("[%s]", event.EventType))
+	logString := errorPrefix + pipelineKeyPrefix + eventTypePrefix
+	if event.Message != "" {
+		logString += fmt.Sprintf(" %s\n", event.Message)
+	}
+	if event.Error != nil && len(event.Error.Exceptions) > 0 {
+		logString += "trace for most recent exception: \n"
+		for i := 0; i < len(event.Error.Exceptions); i++ {
+			logString += fmt.Sprintf("%s\n", event.Error.Exceptions[i].Message)
+		}
+	}
+	if logString != errorPrefix {
+		log.Print(logString)
+	}
+}
+
+func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error {
+
+	w := r.bundle.WorkspaceClient()
+
+	// Note: For a 100 percent correct and complete solution we should use the
+	// w.Pipelines.ListPipelineEventsAll method to find all relevant events. However, the
+	// probability of the relevant last error event not being present in the most
+	// recent 100 error events is very close to 0, and the first 100 error events
+	// should give us a good picture of the error.
+	//
+	// Otherwise, for long-lived pipelines, there can be a lot of unnecessary
+	// latency due to the multiple paginated API calls needed under the hood for
+	// ListPipelineEventsAll.
+	res, err := w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEvents{
+		Filter:     `level='ERROR'`,
+		MaxResults: 100,
+		PipelineId: pipelineId,
+	})
+	if err != nil {
+		return err
+	}
+	updateEvents := filterEventsByUpdateId(res.Events, updateId)
+	// The events API returns the most recent events first. We iterate in reverse order
+	// to print the events chronologically.
+	for i := len(updateEvents) - 1; i >= 0; i-- {
+		r.logEvent(updateEvents[i])
+	}
+	return nil
+}
+
 // PipelineOptions defines options for running a pipeline update.
 type PipelineOptions struct {
 	// Perform a full graph update.
@@ -102,8 +163,8 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
 	updateID := res.UpdateId
 
 	// Log the pipeline update URL as soon as it is available.
-	url := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
-	log.Printf("%s Update available at %s", prefix, url)
+	updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
+	log.Printf("%s Update available at %s", prefix, updateUrl)
 
 	// Poll update for completion and post status.
 	// Note: there is no "StartUpdateAndWait" wrapper for this API.
@@ -127,6 +188,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
 	}
 	if state == pipelines.UpdateInfoStateFailed {
 		log.Printf("%s Update has failed!", prefix)
+		err := r.logErrorEvent(ctx, pipelineID, updateID)
+		if err != nil {
+			return err
+		}
 		return fmt.Errorf("update failed")
 	}
 	if state == pipelines.UpdateInfoStateCompleted {
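
Reviewer note (not part of the patch): to poke at the same error-event lookup outside the CLI, a minimal standalone sketch is below. It mirrors the approach the patch takes: fetch the 100 most recent ERROR-level events, keep only those belonging to the failed update, and print them oldest-first. The pipeline and update IDs are placeholders, and it assumes the databricks-sdk-go version vendored at the time of this patch, where Pipelines still exposes Impl() and the ListPipelineEvents request struct shown above.

```
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

func main() {
	ctx := context.Background()
	// Assumes workspace credentials are picked up from the environment.
	w := databricks.Must(databricks.NewWorkspaceClient())

	// Placeholder IDs: substitute the pipeline and the failed update being debugged.
	pipelineId := "<pipeline-id>"
	updateId := "<update-id>"

	// Same call the patch makes: ERROR-level events only, capped at the 100 most recent.
	res, err := w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEvents{
		Filter:     `level='ERROR'`,
		MaxResults: 100,
		PipelineId: pipelineId,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Keep only the events that belong to the failed update.
	updateEvents := []pipelines.PipelineEvent{}
	for _, event := range res.Events {
		if event.Origin.UpdateId == updateId {
			updateEvents = append(updateEvents, event)
		}
	}

	// The API returns the most recent events first; iterate in reverse to print chronologically.
	for i := len(updateEvents) - 1; i >= 0; i-- {
		event := updateEvents[i]
		fmt.Printf("[%s] %s\n", event.EventType, event.Message)
		if event.Error != nil {
			for _, exc := range event.Error.Exceptions {
				fmt.Println(exc.Message)
			}
		}
	}
}
```

Run against a pipeline whose update has failed, this should print roughly the same exception trace that now appears in the "Now:" log output above.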