diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index e3acb60d3..52d5fe81c 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -18,6 +18,8 @@ import ( flag "github.com/spf13/pflag" ) +// TODO: Use a sdk implementation of this API once it's incorporated in the openapi +// spec. https://databricks.atlassian.net/browse/DECO-573 type pipelineEventErrorException struct { ClassName string `json:"class_name"` Message string `json:"message"` @@ -27,23 +29,27 @@ type pipelineEventError struct { Exceptions []pipelineEventErrorException `json:"exceptions"` } +type pipelineEventOrigin struct { + UpdateId string `json:"update_id"` +} + type pipelineEvent struct { Error *pipelineEventError `json:"error"` Message string `json:"message"` + Origin pipelineEventOrigin `json:"origin"` } type pipelineEventsResponse struct { Events []pipelineEvent `json:"events"` } -// TODO: make ticket to replace this on sdk update -func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string) error { +func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error { apiClient, err := client.New(r.bundle.WorkspaceClient().Config) if err != nil { return err } filter := url.QueryEscape(`level='ERROR'`) - apiPath := fmt.Sprintf("/api/2.0/pipelines/%s/events?filter=%s", pipelineId, filter) + apiPath := fmt.Sprintf("/api/2.0/pipelines/%s/events?filter=%s&max_results=100", pipelineId, filter) res := pipelineEventsResponse{} err = apiClient.Do(ctx, http.MethodGet, apiPath, nil, &res) if err != nil { @@ -52,21 +58,35 @@ func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string) e if len(res.Events) == 0 { return nil } + var latestEvent *pipelineEvent + // Note: For a 100 percent correct solution we should use the pagination token to find + // a last event which took place for updateId incase it's not present in the first 100 events. + // However the changes of the error event not being present in the last 100 events + // for the pipeline are pretty much 0, and this would not be worth the additional complexity + // and latency cost for that extremely rare edge case + for i := 0; i < len(res.Events); i++ { + if res.Events[i].Origin.UpdateId == updateId { + latestEvent = &res.Events[i] + break + } + } + if latestEvent == nil { + return nil + } red := color.New(color.FgRed).SprintFunc() errorPrefix := fmt.Sprintf("%s [%s]", red("[ERROR]"), r.Key()) - latestEvent := res.Events[0] logString := errorPrefix if latestEvent.Message != "" { logString += fmt.Sprintf(" %s\n", latestEvent.Message) } if latestEvent.Error != nil && len(latestEvent.Error.Exceptions) > 0 { - logString += "trace: \n" + logString += "trace for most recent exception: \n" for i := 0; i < len(latestEvent.Error.Exceptions); i++ { logString += fmt.Sprintf("%s\n", latestEvent.Error.Exceptions[i].Message) } } if logString != errorPrefix { - log.Printf(logString) + log.Print(logString) } return nil } @@ -185,7 +205,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { } if state == pipelines.UpdateInfoStateFailed { log.Printf("%s Update has failed!", prefix) - r.logErrorEvent(ctx, pipelineID) + r.logErrorEvent(ctx, pipelineID, updateID) return fmt.Errorf("update failed") } if state == pipelines.UpdateInfoStateCompleted {