added filter for updateId

This commit is contained in:
Shreyas Goenka 2023-03-09 01:26:14 +01:00
parent 3f6f643407
commit 0838b0d26a
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
1 changed files with 27 additions and 7 deletions

View File

@ -18,6 +18,8 @@ import (
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
) )
// TODO: Use a sdk implementation of this API once it's incorporated in the openapi
// spec. https://databricks.atlassian.net/browse/DECO-573
type pipelineEventErrorException struct { type pipelineEventErrorException struct {
ClassName string `json:"class_name"` ClassName string `json:"class_name"`
Message string `json:"message"` Message string `json:"message"`
@ -27,23 +29,27 @@ type pipelineEventError struct {
Exceptions []pipelineEventErrorException `json:"exceptions"` Exceptions []pipelineEventErrorException `json:"exceptions"`
} }
type pipelineEventOrigin struct {
UpdateId string `json:"update_id"`
}
type pipelineEvent struct { type pipelineEvent struct {
Error *pipelineEventError `json:"error"` Error *pipelineEventError `json:"error"`
Message string `json:"message"` Message string `json:"message"`
Origin pipelineEventOrigin `json:"origin"`
} }
type pipelineEventsResponse struct { type pipelineEventsResponse struct {
Events []pipelineEvent `json:"events"` Events []pipelineEvent `json:"events"`
} }
// TODO: make ticket to replace this on sdk update func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error {
func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string) error {
apiClient, err := client.New(r.bundle.WorkspaceClient().Config) apiClient, err := client.New(r.bundle.WorkspaceClient().Config)
if err != nil { if err != nil {
return err return err
} }
filter := url.QueryEscape(`level='ERROR'`) filter := url.QueryEscape(`level='ERROR'`)
apiPath := fmt.Sprintf("/api/2.0/pipelines/%s/events?filter=%s", pipelineId, filter) apiPath := fmt.Sprintf("/api/2.0/pipelines/%s/events?filter=%s&max_results=100", pipelineId, filter)
res := pipelineEventsResponse{} res := pipelineEventsResponse{}
err = apiClient.Do(ctx, http.MethodGet, apiPath, nil, &res) err = apiClient.Do(ctx, http.MethodGet, apiPath, nil, &res)
if err != nil { if err != nil {
@ -52,21 +58,35 @@ func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string) e
if len(res.Events) == 0 { if len(res.Events) == 0 {
return nil return nil
} }
var latestEvent *pipelineEvent
// Note: For a 100 percent correct solution we should use the pagination token to find
// a last event which took place for updateId incase it's not present in the first 100 events.
// However the changes of the error event not being present in the last 100 events
// for the pipeline are pretty much 0, and this would not be worth the additional complexity
// and latency cost for that extremely rare edge case
for i := 0; i < len(res.Events); i++ {
if res.Events[i].Origin.UpdateId == updateId {
latestEvent = &res.Events[i]
break
}
}
if latestEvent == nil {
return nil
}
red := color.New(color.FgRed).SprintFunc() red := color.New(color.FgRed).SprintFunc()
errorPrefix := fmt.Sprintf("%s [%s]", red("[ERROR]"), r.Key()) errorPrefix := fmt.Sprintf("%s [%s]", red("[ERROR]"), r.Key())
latestEvent := res.Events[0]
logString := errorPrefix logString := errorPrefix
if latestEvent.Message != "" { if latestEvent.Message != "" {
logString += fmt.Sprintf(" %s\n", latestEvent.Message) logString += fmt.Sprintf(" %s\n", latestEvent.Message)
} }
if latestEvent.Error != nil && len(latestEvent.Error.Exceptions) > 0 { if latestEvent.Error != nil && len(latestEvent.Error.Exceptions) > 0 {
logString += "trace: \n" logString += "trace for most recent exception: \n"
for i := 0; i < len(latestEvent.Error.Exceptions); i++ { for i := 0; i < len(latestEvent.Error.Exceptions); i++ {
logString += fmt.Sprintf("%s\n", latestEvent.Error.Exceptions[i].Message) logString += fmt.Sprintf("%s\n", latestEvent.Error.Exceptions[i].Message)
} }
} }
if logString != errorPrefix { if logString != errorPrefix {
log.Printf(logString) log.Print(logString)
} }
return nil return nil
} }
@ -185,7 +205,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
} }
if state == pipelines.UpdateInfoStateFailed { if state == pipelines.UpdateInfoStateFailed {
log.Printf("%s Update has failed!", prefix) log.Printf("%s Update has failed!", prefix)
r.logErrorEvent(ctx, pipelineID) r.logErrorEvent(ctx, pipelineID, updateID)
return fmt.Errorf("update failed") return fmt.Errorf("update failed")
} }
if state == pipelines.UpdateInfoStateCompleted { if state == pipelines.UpdateInfoStateCompleted {