diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index d222d292..e3acb60d 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -4,15 +4,73 @@ import ( "context" "fmt" "log" + "net/http" "strings" "time" + "net/url" + "github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/fatih/color" flag "github.com/spf13/pflag" ) +type pipelineEventErrorException struct { + ClassName string `json:"class_name"` + Message string `json:"message"` +} + +type pipelineEventError struct { + Exceptions []pipelineEventErrorException `json:"exceptions"` +} + +type pipelineEvent struct { + Error *pipelineEventError `json:"error"` + Message string `json:"message"` +} + +type pipelineEventsResponse struct { + Events []pipelineEvent `json:"events"` +} + +// TODO: make ticket to replace this on sdk update +func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string) error { + apiClient, err := client.New(r.bundle.WorkspaceClient().Config) + if err != nil { + return err + } + filter := url.QueryEscape(`level='ERROR'`) + apiPath := fmt.Sprintf("/api/2.0/pipelines/%s/events?filter=%s", pipelineId, filter) + res := pipelineEventsResponse{} + err = apiClient.Do(ctx, http.MethodGet, apiPath, nil, &res) + if err != nil { + return err + } + if len(res.Events) == 0 { + return nil + } + red := color.New(color.FgRed).SprintFunc() + errorPrefix := fmt.Sprintf("%s [%s]", red("[ERROR]"), r.Key()) + latestEvent := res.Events[0] + logString := errorPrefix + if latestEvent.Message != "" { + logString += fmt.Sprintf(" %s\n", latestEvent.Message) + } + if latestEvent.Error != nil && len(latestEvent.Error.Exceptions) > 0 { + logString += "trace: \n" + for i := 0; i < len(latestEvent.Error.Exceptions); i++ { + logString += fmt.Sprintf("%s\n", latestEvent.Error.Exceptions[i].Message) + } + } + if logString != errorPrefix { + log.Printf(logString) + } + return nil +} + // PipelineOptions defines options for running a pipeline update. type PipelineOptions struct { // Perform a full graph update. @@ -102,8 +160,8 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { updateID := res.UpdateId // Log the pipeline update URL as soon as it is available. - url := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID) - log.Printf("%s Update available at %s", prefix, url) + updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID) + log.Printf("%s Update available at %s", prefix, updateUrl) // Poll update for completion and post status. // Note: there is no "StartUpdateAndWait" wrapper for this API. @@ -127,6 +185,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { } if state == pipelines.UpdateInfoStateFailed { log.Printf("%s Update has failed!", prefix) + r.logErrorEvent(ctx, pipelineID) return fmt.Errorf("update failed") } if state == pipelines.UpdateInfoStateCompleted {