diff --git a/bundle/run/job.go b/bundle/run/job.go index 56004c9c..2ba82fc4 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -192,6 +192,10 @@ func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func return } + if prevState == nil { + progressLogger.Log(progress.NewJobRunUrlEvent(i.RunPageUrl)) + } + if prevState != nil && prevState.LifeCycleState == state.LifeCycleState && prevState.ResultState == state.ResultState { return @@ -200,12 +204,11 @@ func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func } event := &progress.JobProgressEvent{ - Timestamp: time.Now(), - JobId: i.JobId, - RunId: i.RunId, - RunName: i.RunName, - State: *i.State, - RunPageURL: i.RunPageUrl, + Timestamp: time.Now(), + JobId: i.JobId, + RunId: i.RunId, + RunName: i.RunName, + State: *i.State, } // log progress events to stderr diff --git a/bundle/run/output/job.go b/bundle/run/output/job.go index 15d0d91f..bb4cd60b 100644 --- a/bundle/run/output/job.go +++ b/bundle/run/output/job.go @@ -12,9 +12,6 @@ import ( ) type JobOutput struct { - // URL of the job run - RunPageUrl string `json:"run_page_url"` - // output for tasks with a non empty output TaskOutputs map[string]RunOutput `json:"task_outputs"` } @@ -31,7 +28,7 @@ func (out *JobOutput) String() (string, error) { } } result := strings.Builder{} - result.WriteString(fmt.Sprintf("Run URL: %s\n", out.RunPageUrl)) + result.WriteString("Output:\n") taskKeys := maps.Keys(out.TaskOutputs) sort.Strings(taskKeys) @@ -60,8 +57,6 @@ func GetJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int6 result := &JobOutput{ TaskOutputs: make(map[string]RunOutput), } - result.RunPageUrl = jobRun.RunPageUrl - for _, task := range jobRun.Tasks { jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{ RunId: task.RunId, diff --git a/bundle/run/output/job_test.go b/bundle/run/output/job_test.go index 7852e849..12ab31a7 100644 --- a/bundle/run/output/job_test.go +++ b/bundle/run/output/job_test.go @@ -14,7 +14,6 @@ func TestSingleTaskJobOutputToString(t *testing.T) { Truncated: true, } myJob := JobOutput{ - RunPageUrl: "my_job_url", TaskOutputs: map[string]RunOutput{ "my_notebook_task": &taskNotebook, }, @@ -36,7 +35,6 @@ func TestMultiTaskJobOutputToString(t *testing.T) { LogsTruncated: false, } myJob := JobOutput{ - RunPageUrl: "my_job_url", TaskOutputs: map[string]RunOutput{ "my_foo_task": &taskFoo, "my_bar_task": &taskBar, @@ -46,7 +44,7 @@ func TestMultiTaskJobOutputToString(t *testing.T) { actual, err := myJob.String() require.NoError(t, err) - expected := `Run URL: my_job_url + expected := `Output: ======= Task my_bar_task: bar diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 1f3184bf..fa087a9f 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -168,7 +168,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp } // Log the pipeline update URL as soon as it is available. - progressLogger.Log(progress.NewUpdateUrlEvent(w.Config.Host, updateID, pipelineID)) + progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID)) // Poll update for completion and post status. // Note: there is no "StartUpdateAndWait" wrapper for this API. diff --git a/bundle/run/progress/job.go b/bundle/run/progress/job.go index 58aaaf88..c0e2fe0e 100644 --- a/bundle/run/progress/job.go +++ b/bundle/run/progress/job.go @@ -1,6 +1,7 @@ package progress import ( + "fmt" "strings" "time" @@ -8,29 +9,22 @@ import ( ) type JobProgressEvent struct { - Timestamp time.Time `json:"timestamp"` - JobId int64 `json:"job_id"` - RunId int64 `json:"run_id"` - RunName string `json:"run_name"` - State jobs.RunState `json:"state"` - RunPageURL string `json:"run_page_url"` + Timestamp time.Time `json:"timestamp"` + JobId int64 `json:"job_id"` + RunId int64 `json:"run_id"` + RunName string `json:"run_name"` + State jobs.RunState `json:"state"` } func (event *JobProgressEvent) String() string { result := strings.Builder{} - result.WriteString(event.Timestamp.Format("2006-01-02 15:04:05")) - result.WriteString(" ") - result.WriteString(event.RunName) - result.WriteString(" ") - result.WriteString(event.State.LifeCycleState.String()) + result.WriteString(event.Timestamp.Format("2006-01-02 15:04:05") + " ") + result.WriteString(fmt.Sprintf(`"%s"`, event.RunName) + " ") + result.WriteString(event.State.LifeCycleState.String() + " ") if event.State.ResultState.String() != "" { - result.WriteString(" ") - result.WriteString(event.State.ResultState.String()) + result.WriteString(event.State.ResultState.String() + " ") } - result.WriteString(" ") result.WriteString(event.State.StateMessage) - result.WriteString(" ") - result.WriteString(event.RunPageURL) return result.String() } diff --git a/bundle/run/progress/job_events.go b/bundle/run/progress/job_events.go new file mode 100644 index 00000000..97463538 --- /dev/null +++ b/bundle/run/progress/job_events.go @@ -0,0 +1,23 @@ +package progress + +import "fmt" + +type JobRunUrlEvent struct { + Type string `json:"type"` + Url string `json:"url"` +} + +func NewJobRunUrlEvent(url string) *JobRunUrlEvent { + return &JobRunUrlEvent{ + Type: "job_run_url", + Url: url, + } +} + +func (event *JobRunUrlEvent) String() string { + return fmt.Sprintf("Run URL: %s\n", event.Url) +} + +func (event *JobRunUrlEvent) IsInplaceSupported() bool { + return false +} diff --git a/bundle/run/progress/job_test.go b/bundle/run/progress/job_test.go index 23fbd60b..31196520 100644 --- a/bundle/run/progress/job_test.go +++ b/bundle/run/progress/job_test.go @@ -19,7 +19,6 @@ func TestJobProgressEventString(t *testing.T) { ResultState: jobs.RunResultStateSuccess, StateMessage: "state_message", }, - RunPageURL: "run_url", } - assert.Equal(t, "-0001-11-30 00:00:00 run_name TERMINATED SUCCESS state_message run_url", event.String()) + assert.Equal(t, "-0001-11-30 00:00:00 \"run_name\" TERMINATED SUCCESS state_message", event.String()) } diff --git a/bundle/run/progress/pipeline_events.go b/bundle/run/progress/pipeline_events.go index a34d042f..c34cd80b 100644 --- a/bundle/run/progress/pipeline_events.go +++ b/bundle/run/progress/pipeline_events.go @@ -2,26 +2,26 @@ package progress import "fmt" -type UpdateUrlEvent struct { +type PipelineUpdateUrlEvent struct { Type string `json:"type"` UpdateId string `json:"update_id"` PipelineId string `json:"pipeline_id"` Url string `json:"url"` } -func NewUpdateUrlEvent(host, updateId, pipelineId string) *UpdateUrlEvent { - return &UpdateUrlEvent{ - Type: "update_url", +func NewPipelineUpdateUrlEvent(host, updateId, pipelineId string) *PipelineUpdateUrlEvent { + return &PipelineUpdateUrlEvent{ + Type: "pipeline_update_url", UpdateId: updateId, PipelineId: pipelineId, Url: fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", host, pipelineId, updateId), } } -func (event *UpdateUrlEvent) String() string { - return fmt.Sprintf("The update can be found at %s\n", event.Url) +func (event *PipelineUpdateUrlEvent) String() string { + return fmt.Sprintf("The pipeline update can be found at %s\n", event.Url) } -func (event *UpdateUrlEvent) IsInplaceSupported() bool { +func (event *PipelineUpdateUrlEvent) IsInplaceSupported() bool { return false }