Show error trace on pipeline run failure

This commit is contained in:
Shreyas Goenka 2023-03-09 01:07:38 +01:00
parent f0c35a2b27
commit 3f6f643407
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
1 changed files with 61 additions and 2 deletions

View File

@ -4,15 +4,73 @@ import (
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
"net/url"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/fatih/color"
flag "github.com/spf13/pflag"
)
type pipelineEventErrorException struct {
ClassName string `json:"class_name"`
Message string `json:"message"`
}
type pipelineEventError struct {
Exceptions []pipelineEventErrorException `json:"exceptions"`
}
type pipelineEvent struct {
Error *pipelineEventError `json:"error"`
Message string `json:"message"`
}
type pipelineEventsResponse struct {
Events []pipelineEvent `json:"events"`
}
// TODO: make ticket to replace this on sdk update
func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string) error {
apiClient, err := client.New(r.bundle.WorkspaceClient().Config)
if err != nil {
return err
}
filter := url.QueryEscape(`level='ERROR'`)
apiPath := fmt.Sprintf("/api/2.0/pipelines/%s/events?filter=%s", pipelineId, filter)
res := pipelineEventsResponse{}
err = apiClient.Do(ctx, http.MethodGet, apiPath, nil, &res)
if err != nil {
return err
}
if len(res.Events) == 0 {
return nil
}
red := color.New(color.FgRed).SprintFunc()
errorPrefix := fmt.Sprintf("%s [%s]", red("[ERROR]"), r.Key())
latestEvent := res.Events[0]
logString := errorPrefix
if latestEvent.Message != "" {
logString += fmt.Sprintf(" %s\n", latestEvent.Message)
}
if latestEvent.Error != nil && len(latestEvent.Error.Exceptions) > 0 {
logString += "trace: \n"
for i := 0; i < len(latestEvent.Error.Exceptions); i++ {
logString += fmt.Sprintf("%s\n", latestEvent.Error.Exceptions[i].Message)
}
}
if logString != errorPrefix {
log.Printf(logString)
}
return nil
}
// PipelineOptions defines options for running a pipeline update.
type PipelineOptions struct {
// Perform a full graph update.
@ -102,8 +160,8 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
updateID := res.UpdateId
// Log the pipeline update URL as soon as it is available.
url := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
log.Printf("%s Update available at %s", prefix, url)
updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
log.Printf("%s Update available at %s", prefix, updateUrl)
// Poll update for completion and post status.
// Note: there is no "StartUpdateAndWait" wrapper for this API.
@ -127,6 +185,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
}
if state == pipelines.UpdateInfoStateFailed {
log.Printf("%s Update has failed!", prefix)
r.logErrorEvent(ctx, pipelineID)
return fmt.Errorf("update failed")
}
if state == pipelines.UpdateInfoStateCompleted {