Fixed printing the tasks in job output in DAG execution order (#377)

Fixes #259

## Changes
Sort task output in an execution order based on task end time

## Tests
Added `TestTaskJobOutputOrderToString` unit test.
This commit is contained in:
Andrew Nester 2023-05-08 16:35:47 +02:00 committed by GitHub
parent 37af3d5c4f
commit 1916bc9d68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 18 deletions

View File

@ -8,15 +8,20 @@ import (
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"golang.org/x/exp/maps"
)
type TaskOutput struct {
TaskKey string
Output RunOutput
EndTime int64
}
type JobOutput struct {
// output for tasks with a non empty output
TaskOutputs map[string]RunOutput `json:"task_outputs"`
TaskOutputs []TaskOutput `json:"task_outputs"`
}
// TODO: Print the output respecting the execution order (https://github.com/databricks/bricks/issues/259)
// Returns tasks output in text form sorted in execution order based on task end time
func (out *JobOutput) String() (string, error) {
if len(out.TaskOutputs) == 0 {
return "", nil
@ -24,24 +29,24 @@ func (out *JobOutput) String() (string, error) {
// When only one task, just return that output without any formatting
if len(out.TaskOutputs) == 1 {
for _, v := range out.TaskOutputs {
return v.String()
return v.Output.String()
}
}
result := strings.Builder{}
result.WriteString("Output:\n")
taskKeys := maps.Keys(out.TaskOutputs)
sort.Strings(taskKeys)
for _, k := range taskKeys {
if out.TaskOutputs[k] == nil {
sort.Slice(out.TaskOutputs, func(i, j int) bool {
return out.TaskOutputs[i].EndTime < out.TaskOutputs[j].EndTime
})
for _, v := range out.TaskOutputs {
if v.Output == nil {
continue
}
taskString, err := out.TaskOutputs[k].String()
taskString, err := v.Output.String()
if err != nil {
return "", nil
}
result.WriteString("=======\n")
result.WriteString(fmt.Sprintf("Task %s:\n", k))
result.WriteString(fmt.Sprintf("Task %s:\n", v.TaskKey))
result.WriteString(fmt.Sprintf("%s\n", taskString))
}
return result.String(), nil
@ -55,7 +60,7 @@ func GetJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int6
return nil, err
}
result := &JobOutput{
TaskOutputs: make(map[string]RunOutput),
TaskOutputs: make([]TaskOutput, len(jobRun.Tasks)),
}
for _, task := range jobRun.Tasks {
jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutputRequest{
@ -64,7 +69,8 @@ func GetJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int6
if err != nil {
return nil, err
}
result.TaskOutputs[task.TaskKey] = toRunOutput(jobRunOutput)
task := TaskOutput{TaskKey: task.TaskKey, Output: toRunOutput(jobRunOutput), EndTime: task.EndTime}
result.TaskOutputs = append(result.TaskOutputs, task)
}
return result, nil
}

View File

@ -14,8 +14,8 @@ func TestSingleTaskJobOutputToString(t *testing.T) {
Truncated: true,
}
myJob := JobOutput{
TaskOutputs: map[string]RunOutput{
"my_notebook_task": &taskNotebook,
TaskOutputs: []TaskOutput{
{TaskKey: "my_notebook_task", Output: &taskNotebook, EndTime: 0},
},
}
@ -35,9 +35,9 @@ func TestMultiTaskJobOutputToString(t *testing.T) {
LogsTruncated: false,
}
myJob := JobOutput{
TaskOutputs: map[string]RunOutput{
"my_foo_task": &taskFoo,
"my_bar_task": &taskBar,
TaskOutputs: []TaskOutput{
{TaskKey: "my_bar_task", Output: &taskBar, EndTime: 0},
{TaskKey: "my_foo_task", Output: &taskFoo, EndTime: 0},
},
}
@ -57,6 +57,41 @@ foo
assert.Equal(t, expected, actual)
}
func TestTaskJobOutputOrderToString(t *testing.T) {
taskFoo := NotebookOutput{
Result: "foo",
}
taskBar := LogsOutput{
Logs: "bar",
}
taskBaz := LogsOutput{
Logs: "baz",
}
myJob := JobOutput{
TaskOutputs: []TaskOutput{
{TaskKey: "my_baz_task", Output: &taskBaz, EndTime: 1683553233331},
{TaskKey: "my_bar_task", Output: &taskBar, EndTime: 1683553223508},
{TaskKey: "my_foo_task", Output: &taskFoo, EndTime: 1683553217598},
},
}
actual, err := myJob.String()
require.NoError(t, err)
expected := `Output:
=======
Task my_foo_task:
foo
=======
Task my_bar_task:
bar
=======
Task my_baz_task:
baz
`
assert.Equal(t, expected, actual)
}
func TestNotebookOutputToRunOutput(t *testing.T) {
jobOutput := &jobs.RunOutput{
NotebookOutput: &jobs.NotebookOutput{