mirror of https://github.com/databricks/cli.git
Fixes for pipeline progress logging (#330)
## Changes

1. Events are now printed in chronological order
2. Simplify events rendering by removing update/flow name. This makes it more consistent with the web UI too
3. Switch to server side filtering on update_id

## Tests

Manually

Happy run:

```
shreyas.goenka@THW32HFW6T pipeline-progress % bricks bundle run foo
2023-04-12T20:00:22.879Z update_progress INFO "Update e1becc is INITIALIZING."
2023-04-12T20:00:22.906Z update_progress INFO "Update e1becc is SETTING_UP_TABLES."
2023-04-12T20:00:24.496Z update_progress INFO "Update e1becc is RUNNING."
2023-04-12T20:00:24.497Z flow_progress INFO "Flow 'sales_orders_raw' is QUEUED."
2023-04-12T20:00:24.586Z flow_progress INFO "Flow 'sales_orders_raw' is STARTING."
2023-04-12T20:00:24.748Z flow_progress INFO "Flow 'sales_orders_raw' is RUNNING."
2023-04-12T20:00:26.672Z flow_progress INFO "Flow 'sales_orders_raw' has COMPLETED."
2023-04-12T20:00:27.753Z update_progress INFO "Update e1becc is COMPLETED."
```

Sad run:

```
shreyas.goenka@THW32HFW6T pipeline-progress % bricks bundle run foo
2023-04-12T20:02:07.764Z update_progress INFO "Update 04b80e is INITIALIZING."
2023-04-12T20:02:07.870Z update_progress ERROR "Update 04b80e is FAILED."
Error: update failed
```
This commit is contained in:
parent
3894d5796d
commit
df0293510e
|
@ -15,17 +15,12 @@ func (event *ProgressEvent) String() string {
|
||||||
result := strings.Builder{}
|
result := strings.Builder{}
|
||||||
|
|
||||||
result.WriteString(event.Timestamp + " ")
|
result.WriteString(event.Timestamp + " ")
|
||||||
result.WriteString(event.EventType + " ")
|
|
||||||
|
|
||||||
// add name of the subject flow/pipeline
|
// Print event type with some padding to make output more pretty
|
||||||
switch event.EventType {
|
result.WriteString(fmt.Sprintf("%-15s", event.EventType) + " ")
|
||||||
case "flow_progress":
|
|
||||||
result.WriteString(event.Origin.FlowName + " ")
|
|
||||||
case "update_progress":
|
|
||||||
result.WriteString(event.Origin.PipelineName + " ")
|
|
||||||
}
|
|
||||||
result.WriteString(event.Level.String() + " ")
|
result.WriteString(event.Level.String() + " ")
|
||||||
result.WriteString(event.Message)
|
result.WriteString(fmt.Sprintf(`"%s"`, event.Message))
|
||||||
|
|
||||||
return result.String()
|
return result.String()
|
||||||
}
|
}
|
||||||
|
@ -59,9 +54,9 @@ func NewUpdateTracker(pipelineId string, updateId string, w *databricks.Workspac
|
||||||
// NOTE: Incase we want inplace logging, then we will need to implement pagination
|
// NOTE: Incase we want inplace logging, then we will need to implement pagination
|
||||||
func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
|
func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
|
||||||
// create filter to fetch only new events
|
// create filter to fetch only new events
|
||||||
filter := ""
|
filter := fmt.Sprintf(`update_id = '%s'`, l.UpdateId)
|
||||||
if l.LatestEventTimestamp != "" {
|
if l.LatestEventTimestamp != "" {
|
||||||
filter = fmt.Sprintf(`timestamp > '%s'`, l.LatestEventTimestamp)
|
filter = filter + fmt.Sprintf(" AND timestamp > '%s'", l.LatestEventTimestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// we only check the most recent 100 events for progress
|
// we only check the most recent 100 events for progress
|
||||||
|
@ -74,12 +69,11 @@ func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// filter out update_progress and flow_progress events
|
|
||||||
result := make([]ProgressEvent, 0)
|
result := make([]ProgressEvent, 0)
|
||||||
for _, event := range response.Events {
|
// we iterate in reverse to return events in chronological order
|
||||||
if event.Origin.UpdateId != l.UpdateId {
|
for i := len(response.Events) - 1; i >= 0; i-- {
|
||||||
continue
|
event := response.Events[i]
|
||||||
}
|
// filter to only include update_progress and flow_progress events
|
||||||
if event.EventType == "flow_progress" || event.EventType == "update_progress" {
|
if event.EventType == "flow_progress" || event.EventType == "update_progress" {
|
||||||
result = append(result, ProgressEvent(event))
|
result = append(result, ProgressEvent(event))
|
||||||
}
|
}
|
||||||
|
@ -87,7 +81,8 @@ func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
|
||||||
|
|
||||||
// update latest event timestamp for next time
|
// update latest event timestamp for next time
|
||||||
if len(result) > 0 {
|
if len(result) > 0 {
|
||||||
l.LatestEventTimestamp = result[0].Timestamp
|
l.LatestEventTimestamp = result[len(result)-1].Timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ func TestFlowProgressEventToString(t *testing.T) {
|
||||||
},
|
},
|
||||||
Timestamp: "2023-03-27T23:30:36.122Z",
|
Timestamp: "2023-03-27T23:30:36.122Z",
|
||||||
}
|
}
|
||||||
assert.Equal(t, "2023-03-27T23:30:36.122Z flow_progress my_flow INFO my_message", event.String())
|
assert.Equal(t, `2023-03-27T23:30:36.122Z flow_progress INFO "my_message"`, event.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdateProgressEventToString(t *testing.T) {
|
func TestUpdateProgressEventToString(t *testing.T) {
|
||||||
|
@ -32,5 +32,5 @@ func TestUpdateProgressEventToString(t *testing.T) {
|
||||||
},
|
},
|
||||||
Timestamp: "2023-03-27T23:30:36.122Z",
|
Timestamp: "2023-03-27T23:30:36.122Z",
|
||||||
}
|
}
|
||||||
assert.Equal(t, "2023-03-27T23:30:36.122Z update_progress my_pipeline ERROR my_message", event.String())
|
assert.Equal(t, `2023-03-27T23:30:36.122Z update_progress ERROR "my_message"`, event.String())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue