package progress

import (
	"context"
	"fmt"
	"strings"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

// The DLT backend computes events for pipeline runs, which are accessible through
// the 2.0/pipelines/{pipeline_id}/events API.
//
// There are 4 levels for these events: "ERROR", "WARN", "INFO" and "METRICS".
//
// Here's a short introduction to a few important events we display on the console:
//
// 1. `update_progress`: A state transition occurred for the entire pipeline update.
// 2. `flow_progress`: A state transition occurred for a single flow in the pipeline.
type ProgressEvent pipelines.PipelineEvent

func (event *ProgressEvent) String() string {
	result := strings.Builder{}
	result.WriteString(event.Timestamp + " ")

	// Print the event type with some padding to make the output prettier.
	result.WriteString(fmt.Sprintf("%-15s", event.EventType) + " ")

	result.WriteString(event.Level.String() + " ")
	result.WriteString(fmt.Sprintf(`"%s"`, event.Message))

	// Construct the error string if the event level is ERROR.
	if event.Level == pipelines.EventLevelError && event.Error != nil {
		for _, exception := range event.Error.Exceptions {
			result.WriteString(fmt.Sprintf("\n%s", exception.Message))
		}
	}
	return result.String()
}

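// For illustration only, a rendered event line looks roughly as follows; the
// timestamp and message below are made up, not taken from a real pipeline run:
//
//	2023-03-27T10:00:00.000Z update_progress INFO "Update is RUNNING."
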
func (event *ProgressEvent) IsInplaceSupported() bool {
	return false
}

// TODO: Add inplace logging to pipelines. https://github.com/databricks/cli/issues/280
type UpdateTracker struct {
	UpdateId             string
	PipelineId           string
	LatestEventTimestamp string
	w                    *databricks.WorkspaceClient
}

func NewUpdateTracker(pipelineId string, updateId string, w *databricks.WorkspaceClient) *UpdateTracker {
	return &UpdateTracker{
		w:                    w,
		PipelineId:           pipelineId,
		UpdateId:             updateId,
		LatestEventTimestamp: "",
	}
}

// To keep the logic simple we do not use pagination. This means that if there are
// more than 100 new events since the last query, we will miss out on progress events.
//
// This is fine because:
// 1. This should happen fairly rarely, if ever.
// 2. There is no expectation of the console progress logs being a complete representation.
//
// If a user needs the complete logs, they can always visit the run URL.
//
// NOTE: In case we want inplace logging, we will need to implement pagination.
func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
	// Create a filter to fetch only new events.
	filter := fmt.Sprintf(`update_id = '%s'`, l.UpdateId)
	if l.LatestEventTimestamp != "" {
		filter = filter + fmt.Sprintf(" AND timestamp > '%s'", l.LatestEventTimestamp)
	}
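
	// After the first call, the filter looks roughly like the following (the
	// placeholder values are illustrative, not taken from a real run):
	//
	//	update_id = '<update-id>' AND timestamp > '<latest-event-timestamp>'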

	// We only check the most recent 100 events for progress.
	response, err := l.w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEventsRequest{
		PipelineId: l.PipelineId,
		MaxResults: 100,
		Filter:     filter,
	})
	if err != nil {
		return nil, err
	}

	result := make([]ProgressEvent, 0)
	// Iterate in reverse to return events in chronological order.
	for i := len(response.Events) - 1; i >= 0; i-- {
		event := response.Events[i]
		// Only include update_progress and flow_progress events.
		if event.EventType == "flow_progress" || event.EventType == "update_progress" {
			result = append(result, ProgressEvent(event))
		}
	}

	// Remember the latest event timestamp for the next query.
	if len(result) > 0 {
		l.LatestEventTimestamp = result[len(result)-1].Timestamp
	}

	return result, nil
}
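
// renderNewEvents is an illustrative sketch, not part of the original file: it
// shows how a caller might drain new events from an UpdateTracker and render
// them with ProgressEvent.String. The function name and the idea of calling it
// from a polling loop are assumptions for illustration only.
func renderNewEvents(ctx context.Context, tracker *UpdateTracker) error {
	// Fetch only the events newer than the last observed timestamp.
	events, err := tracker.Events(ctx)
	if err != nil {
		return err
	}
	// Render each event on its own line, in chronological order.
	for _, event := range events {
		fmt.Println(event.String())
	}
	return nil
}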