From 1916bc9d686fc7517a65f24f7b3c797258f6fa51 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Mon, 8 May 2023 16:35:47 +0200 Subject: [PATCH] Fixed printing the tasks in job output in DAG execution order (#377) Fixes #259 ## Changes Sort task output in an execution order based on task end time ## Tests Added `TestTaskJobOutputOrderToString` unit test. --- bundle/run/output/job.go | 32 +++++++++++++++---------- bundle/run/output/job_test.go | 45 +++++++++++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/bundle/run/output/job.go b/bundle/run/output/job.go index b99f58fb..4bea4c7a 100644 --- a/bundle/run/output/job.go +++ b/bundle/run/output/job.go @@ -8,15 +8,20 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/jobs" - "golang.org/x/exp/maps" ) +type TaskOutput struct { + TaskKey string + Output RunOutput + EndTime int64 +} + type JobOutput struct { // output for tasks with a non empty output - TaskOutputs map[string]RunOutput `json:"task_outputs"` + TaskOutputs []TaskOutput `json:"task_outputs"` } -// TODO: Print the output respecting the execution order (https://github.com/databricks/bricks/issues/259) +// Returns tasks output in text form sorted in execution order based on task end time func (out *JobOutput) String() (string, error) { if len(out.TaskOutputs) == 0 { return "", nil @@ -24,24 +29,24 @@ func (out *JobOutput) String() (string, error) { // When only one task, just return that output without any formatting if len(out.TaskOutputs) == 1 { for _, v := range out.TaskOutputs { - return v.String() + return v.Output.String() } } result := strings.Builder{} result.WriteString("Output:\n") - - taskKeys := maps.Keys(out.TaskOutputs) - sort.Strings(taskKeys) - for _, k := range taskKeys { - if out.TaskOutputs[k] == nil { + sort.Slice(out.TaskOutputs, func(i, j int) bool { + return out.TaskOutputs[i].EndTime < out.TaskOutputs[j].EndTime + }) + for _, v := range out.TaskOutputs { + if v.Output == nil { continue } - taskString, err := out.TaskOutputs[k].String() + taskString, err := v.Output.String() if err != nil { return "", nil } result.WriteString("=======\n") - result.WriteString(fmt.Sprintf("Task %s:\n", k)) + result.WriteString(fmt.Sprintf("Task %s:\n", v.TaskKey)) result.WriteString(fmt.Sprintf("%s\n", taskString)) } return result.String(), nil @@ -55,7 +60,7 @@ func GetJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int6 return nil, err } result := &JobOutput{ - TaskOutputs: make(map[string]RunOutput), + TaskOutputs: make([]TaskOutput, len(jobRun.Tasks)), } for _, task := range jobRun.Tasks { jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutputRequest{ @@ -64,7 +69,8 @@ func GetJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int6 if err != nil { return nil, err } - result.TaskOutputs[task.TaskKey] = toRunOutput(jobRunOutput) + task := TaskOutput{TaskKey: task.TaskKey, Output: toRunOutput(jobRunOutput), EndTime: task.EndTime} + result.TaskOutputs = append(result.TaskOutputs, task) } return result, nil } diff --git a/bundle/run/output/job_test.go b/bundle/run/output/job_test.go index 12ab31a7..80c52c3e 100644 --- a/bundle/run/output/job_test.go +++ b/bundle/run/output/job_test.go @@ -14,8 +14,8 @@ func TestSingleTaskJobOutputToString(t *testing.T) { Truncated: true, } myJob := JobOutput{ - TaskOutputs: map[string]RunOutput{ - "my_notebook_task": &taskNotebook, + TaskOutputs: []TaskOutput{ + {TaskKey: "my_notebook_task", Output: &taskNotebook, EndTime: 0}, }, } @@ -35,9 +35,9 @@ func TestMultiTaskJobOutputToString(t *testing.T) { LogsTruncated: false, } myJob := JobOutput{ - TaskOutputs: map[string]RunOutput{ - "my_foo_task": &taskFoo, - "my_bar_task": &taskBar, + TaskOutputs: []TaskOutput{ + {TaskKey: "my_bar_task", Output: &taskBar, EndTime: 0}, + {TaskKey: "my_foo_task", Output: &taskFoo, EndTime: 0}, }, } @@ -57,6 +57,41 @@ foo assert.Equal(t, expected, actual) } +func TestTaskJobOutputOrderToString(t *testing.T) { + taskFoo := NotebookOutput{ + Result: "foo", + } + taskBar := LogsOutput{ + Logs: "bar", + } + taskBaz := LogsOutput{ + Logs: "baz", + } + myJob := JobOutput{ + TaskOutputs: []TaskOutput{ + {TaskKey: "my_baz_task", Output: &taskBaz, EndTime: 1683553233331}, + {TaskKey: "my_bar_task", Output: &taskBar, EndTime: 1683553223508}, + {TaskKey: "my_foo_task", Output: &taskFoo, EndTime: 1683553217598}, + }, + } + + actual, err := myJob.String() + require.NoError(t, err) + + expected := `Output: +======= +Task my_foo_task: +foo +======= +Task my_bar_task: +bar +======= +Task my_baz_task: +baz +` + assert.Equal(t, expected, actual) +} + func TestNotebookOutputToRunOutput(t *testing.T) { jobOutput := &jobs.RunOutput{ NotebookOutput: &jobs.NotebookOutput{