mirror of https://github.com/databricks/cli.git
Log latest error event on pipeline run fail (#239)
DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
This commit is contained in:
parent
dccb0aafce
commit
207777849b
|
@ -10,9 +10,70 @@ import (
|
|||
"github.com/databricks/bricks/bundle"
|
||||
"github.com/databricks/bricks/bundle/config/resources"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
"github.com/fatih/color"
|
||||
flag "github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent {
|
||||
result := []pipelines.PipelineEvent{}
|
||||
for i := 0; i < len(events); i++ {
|
||||
if events[i].Origin.UpdateId == updateId {
|
||||
result = append(result, events[i])
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *pipelineRunner) logEvent(event pipelines.PipelineEvent) {
|
||||
red := color.New(color.FgRed).SprintFunc()
|
||||
errorPrefix := red("[ERROR]")
|
||||
pipelineKeyPrefix := red(fmt.Sprintf("[%s]", r.Key()))
|
||||
eventTypePrefix := red(fmt.Sprintf("[%s]", event.EventType))
|
||||
logString := errorPrefix + pipelineKeyPrefix + eventTypePrefix
|
||||
if event.Message != "" {
|
||||
logString += fmt.Sprintf(" %s\n", event.Message)
|
||||
}
|
||||
if event.Error != nil && len(event.Error.Exceptions) > 0 {
|
||||
logString += "trace for most recent exception: \n"
|
||||
for i := 0; i < len(event.Error.Exceptions); i++ {
|
||||
logString += fmt.Sprintf("%s\n", event.Error.Exceptions[i].Message)
|
||||
}
|
||||
}
|
||||
if logString != errorPrefix {
|
||||
log.Print(logString)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error {
|
||||
|
||||
w := r.bundle.WorkspaceClient()
|
||||
|
||||
// Note: For a 100 percent correct and complete solution we should use the
|
||||
// w.Pipelines.ListPipelineEventsAll method to find all relevant events. However the
|
||||
// probablity of the relevant last error event not being present in the most
|
||||
// recent 100 error events is very close to 0 and the first 100 error events
|
||||
// should give us a good picture of the error.
|
||||
//
|
||||
// Otherwise for long lived pipelines, there can be a lot of unnecessary
|
||||
// latency due to multiple pagination API calls needed underneath the hood for
|
||||
// ListPipelineEventsAll
|
||||
res, err := w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEvents{
|
||||
Filter: `level='ERROR'`,
|
||||
MaxResults: 100,
|
||||
PipelineId: pipelineId,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
updateEvents := filterEventsByUpdateId(res.Events, updateId)
|
||||
// The events API returns most recent events first. We iterate in a reverse order
|
||||
// to print the events chronologically
|
||||
for i := len(updateEvents) - 1; i >= 0; i-- {
|
||||
r.logEvent(updateEvents[i])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PipelineOptions defines options for running a pipeline update.
|
||||
type PipelineOptions struct {
|
||||
// Perform a full graph update.
|
||||
|
@ -102,8 +163,8 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
|
|||
updateID := res.UpdateId
|
||||
|
||||
// Log the pipeline update URL as soon as it is available.
|
||||
url := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
|
||||
log.Printf("%s Update available at %s", prefix, url)
|
||||
updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
|
||||
log.Printf("%s Update available at %s", prefix, updateUrl)
|
||||
|
||||
// Poll update for completion and post status.
|
||||
// Note: there is no "StartUpdateAndWait" wrapper for this API.
|
||||
|
@ -127,6 +188,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
|
|||
}
|
||||
if state == pipelines.UpdateInfoStateFailed {
|
||||
log.Printf("%s Update has failed!", prefix)
|
||||
err := r.logErrorEvent(ctx, pipelineID, updateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("update failed")
|
||||
}
|
||||
if state == pipelines.UpdateInfoStateCompleted {
|
||||
|
|
Loading…
Reference in New Issue