Log job run url using progress logger (#336)

## Changes
Logs the job url using the progress logger

## Tests
Manually
This commit is contained in:
shreyas-goenka 2023-04-18 14:40:45 +02:00 committed by GitHub
parent 85889dffb1
commit 1a7b3eef18
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 53 additions and 41 deletions

View File

@ -192,6 +192,10 @@ func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func
return return
} }
if prevState == nil {
progressLogger.Log(progress.NewJobRunUrlEvent(i.RunPageUrl))
}
if prevState != nil && prevState.LifeCycleState == state.LifeCycleState && if prevState != nil && prevState.LifeCycleState == state.LifeCycleState &&
prevState.ResultState == state.ResultState { prevState.ResultState == state.ResultState {
return return
@ -200,12 +204,11 @@ func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func
} }
event := &progress.JobProgressEvent{ event := &progress.JobProgressEvent{
Timestamp: time.Now(), Timestamp: time.Now(),
JobId: i.JobId, JobId: i.JobId,
RunId: i.RunId, RunId: i.RunId,
RunName: i.RunName, RunName: i.RunName,
State: *i.State, State: *i.State,
RunPageURL: i.RunPageUrl,
} }
// log progress events to stderr // log progress events to stderr

View File

@ -12,9 +12,6 @@ import (
) )
type JobOutput struct { type JobOutput struct {
// URL of the job run
RunPageUrl string `json:"run_page_url"`
// output for tasks with a non empty output // output for tasks with a non empty output
TaskOutputs map[string]RunOutput `json:"task_outputs"` TaskOutputs map[string]RunOutput `json:"task_outputs"`
} }
@ -31,7 +28,7 @@ func (out *JobOutput) String() (string, error) {
} }
} }
result := strings.Builder{} result := strings.Builder{}
result.WriteString(fmt.Sprintf("Run URL: %s\n", out.RunPageUrl)) result.WriteString("Output:\n")
taskKeys := maps.Keys(out.TaskOutputs) taskKeys := maps.Keys(out.TaskOutputs)
sort.Strings(taskKeys) sort.Strings(taskKeys)
@ -60,8 +57,6 @@ func GetJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int6
result := &JobOutput{ result := &JobOutput{
TaskOutputs: make(map[string]RunOutput), TaskOutputs: make(map[string]RunOutput),
} }
result.RunPageUrl = jobRun.RunPageUrl
for _, task := range jobRun.Tasks { for _, task := range jobRun.Tasks {
jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{ jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{
RunId: task.RunId, RunId: task.RunId,

View File

@ -14,7 +14,6 @@ func TestSingleTaskJobOutputToString(t *testing.T) {
Truncated: true, Truncated: true,
} }
myJob := JobOutput{ myJob := JobOutput{
RunPageUrl: "my_job_url",
TaskOutputs: map[string]RunOutput{ TaskOutputs: map[string]RunOutput{
"my_notebook_task": &taskNotebook, "my_notebook_task": &taskNotebook,
}, },
@ -36,7 +35,6 @@ func TestMultiTaskJobOutputToString(t *testing.T) {
LogsTruncated: false, LogsTruncated: false,
} }
myJob := JobOutput{ myJob := JobOutput{
RunPageUrl: "my_job_url",
TaskOutputs: map[string]RunOutput{ TaskOutputs: map[string]RunOutput{
"my_foo_task": &taskFoo, "my_foo_task": &taskFoo,
"my_bar_task": &taskBar, "my_bar_task": &taskBar,
@ -46,7 +44,7 @@ func TestMultiTaskJobOutputToString(t *testing.T) {
actual, err := myJob.String() actual, err := myJob.String()
require.NoError(t, err) require.NoError(t, err)
expected := `Run URL: my_job_url expected := `Output:
======= =======
Task my_bar_task: Task my_bar_task:
bar bar

View File

@ -168,7 +168,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp
} }
// Log the pipeline update URL as soon as it is available. // Log the pipeline update URL as soon as it is available.
progressLogger.Log(progress.NewUpdateUrlEvent(w.Config.Host, updateID, pipelineID)) progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))
// Poll update for completion and post status. // Poll update for completion and post status.
// Note: there is no "StartUpdateAndWait" wrapper for this API. // Note: there is no "StartUpdateAndWait" wrapper for this API.

View File

@ -1,6 +1,7 @@
package progress package progress
import ( import (
"fmt"
"strings" "strings"
"time" "time"
@ -8,29 +9,22 @@ import (
) )
type JobProgressEvent struct { type JobProgressEvent struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
JobId int64 `json:"job_id"` JobId int64 `json:"job_id"`
RunId int64 `json:"run_id"` RunId int64 `json:"run_id"`
RunName string `json:"run_name"` RunName string `json:"run_name"`
State jobs.RunState `json:"state"` State jobs.RunState `json:"state"`
RunPageURL string `json:"run_page_url"`
} }
func (event *JobProgressEvent) String() string { func (event *JobProgressEvent) String() string {
result := strings.Builder{} result := strings.Builder{}
result.WriteString(event.Timestamp.Format("2006-01-02 15:04:05")) result.WriteString(event.Timestamp.Format("2006-01-02 15:04:05") + " ")
result.WriteString(" ") result.WriteString(fmt.Sprintf(`"%s"`, event.RunName) + " ")
result.WriteString(event.RunName) result.WriteString(event.State.LifeCycleState.String() + " ")
result.WriteString(" ")
result.WriteString(event.State.LifeCycleState.String())
if event.State.ResultState.String() != "" { if event.State.ResultState.String() != "" {
result.WriteString(" ") result.WriteString(event.State.ResultState.String() + " ")
result.WriteString(event.State.ResultState.String())
} }
result.WriteString(" ")
result.WriteString(event.State.StateMessage) result.WriteString(event.State.StateMessage)
result.WriteString(" ")
result.WriteString(event.RunPageURL)
return result.String() return result.String()
} }

View File

@ -0,0 +1,23 @@
package progress
import "fmt"
type JobRunUrlEvent struct {
Type string `json:"type"`
Url string `json:"url"`
}
func NewJobRunUrlEvent(url string) *JobRunUrlEvent {
return &JobRunUrlEvent{
Type: "job_run_url",
Url: url,
}
}
func (event *JobRunUrlEvent) String() string {
return fmt.Sprintf("Run URL: %s\n", event.Url)
}
func (event *JobRunUrlEvent) IsInplaceSupported() bool {
return false
}

View File

@ -19,7 +19,6 @@ func TestJobProgressEventString(t *testing.T) {
ResultState: jobs.RunResultStateSuccess, ResultState: jobs.RunResultStateSuccess,
StateMessage: "state_message", StateMessage: "state_message",
}, },
RunPageURL: "run_url",
} }
assert.Equal(t, "-0001-11-30 00:00:00 run_name TERMINATED SUCCESS state_message run_url", event.String()) assert.Equal(t, "-0001-11-30 00:00:00 \"run_name\" TERMINATED SUCCESS state_message", event.String())
} }

View File

@ -2,26 +2,26 @@ package progress
import "fmt" import "fmt"
type UpdateUrlEvent struct { type PipelineUpdateUrlEvent struct {
Type string `json:"type"` Type string `json:"type"`
UpdateId string `json:"update_id"` UpdateId string `json:"update_id"`
PipelineId string `json:"pipeline_id"` PipelineId string `json:"pipeline_id"`
Url string `json:"url"` Url string `json:"url"`
} }
func NewUpdateUrlEvent(host, updateId, pipelineId string) *UpdateUrlEvent { func NewPipelineUpdateUrlEvent(host, updateId, pipelineId string) *PipelineUpdateUrlEvent {
return &UpdateUrlEvent{ return &PipelineUpdateUrlEvent{
Type: "update_url", Type: "pipeline_update_url",
UpdateId: updateId, UpdateId: updateId,
PipelineId: pipelineId, PipelineId: pipelineId,
Url: fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", host, pipelineId, updateId), Url: fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", host, pipelineId, updateId),
} }
} }
func (event *UpdateUrlEvent) String() string { func (event *PipelineUpdateUrlEvent) String() string {
return fmt.Sprintf("The update can be found at %s\n", event.Url) return fmt.Sprintf("The pipeline update can be found at %s\n", event.Url)
} }
func (event *UpdateUrlEvent) IsInplaceSupported() bool { func (event *PipelineUpdateUrlEvent) IsInplaceSupported() bool {
return false return false
} }