Add job run output logging (#260)

This PR adds output logging for job runs

Tested using unit tests and manually
This commit is contained in:
shreyas-goenka 2023-03-21 16:25:18 +01:00 committed by GitHub
parent e7a7e5b95a
commit 047a189c1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 426 additions and 22 deletions

View File

@ -140,10 +140,10 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
} }
func (r *jobRunner) Run(ctx context.Context, opts *Options) error { func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
jobID, err := strconv.ParseInt(r.job.ID, 10, 64) jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
if err != nil { if err != nil {
return fmt.Errorf("job ID is not an integer: %s", r.job.ID) return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
} }
var prevState *jobs.RunState var prevState *jobs.RunState
@ -176,7 +176,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error {
req, err := opts.Job.toPayload(jobID) req, err := opts.Job.toPayload(jobID)
if err != nil { if err != nil {
return err return nil, err
} }
// Include resource key in logger. // Include resource key in logger.
@ -188,34 +188,34 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error {
} }
if err != nil { if err != nil {
return err return nil, err
} }
if run.State.LifeCycleState == jobs.RunLifeCycleStateSkipped { if run.State.LifeCycleState == jobs.RunLifeCycleStateSkipped {
log.Infof(ctx, "Run was skipped!") log.Infof(ctx, "Run was skipped!")
return fmt.Errorf("run skipped: %s", run.State.StateMessage) return nil, fmt.Errorf("run skipped: %s", run.State.StateMessage)
} }
switch run.State.ResultState { switch run.State.ResultState {
// The run was canceled at user request. // The run was canceled at user request.
case jobs.RunResultStateCanceled: case jobs.RunResultStateCanceled:
log.Infof(ctx, "Run was cancelled!") log.Infof(ctx, "Run was cancelled!")
return fmt.Errorf("run canceled: %s", run.State.StateMessage) return nil, fmt.Errorf("run canceled: %s", run.State.StateMessage)
// The task completed with an error. // The task completed with an error.
case jobs.RunResultStateFailed: case jobs.RunResultStateFailed:
log.Infof(ctx, "Run has failed!") log.Infof(ctx, "Run has failed!")
return fmt.Errorf("run failed: %s", run.State.StateMessage) return nil, fmt.Errorf("run failed: %s", run.State.StateMessage)
// The task completed successfully. // The task completed successfully.
case jobs.RunResultStateSuccess: case jobs.RunResultStateSuccess:
log.Infof(ctx, "Run has completed successfully!") log.Infof(ctx, "Run has completed successfully!")
return nil return getJobOutput(ctx, r.bundle.WorkspaceClient(), *runId)
// The run was stopped after reaching the timeout. // The run was stopped after reaching the timeout.
case jobs.RunResultStateTimedout: case jobs.RunResultStateTimedout:
log.Infof(ctx, "Run has timed out!") log.Infof(ctx, "Run has timed out!")
return fmt.Errorf("run timed out: %s", run.State.StateMessage) return nil, fmt.Errorf("run timed out: %s", run.State.StateMessage)
} }
return err return nil, err
} }

72
bundle/run/job_output.go Normal file
View File

@ -0,0 +1,72 @@
package run
import (
"context"
"fmt"
"sort"
"strings"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs"
"golang.org/x/exp/maps"
)
type JobOutput struct {
// URL of the job run
RunPageUrl string
// output for tasks with a non empty output
TaskOutputs map[string]RunOutput
}
// TODO: Print the output respecting the execution order (https://github.com/databricks/bricks/issues/259)
func (out *JobOutput) String() (string, error) {
if len(out.TaskOutputs) == 0 {
return "", nil
}
// When only one task, just return that output without any formatting
if len(out.TaskOutputs) == 1 {
for _, v := range out.TaskOutputs {
return v.String()
}
}
result := strings.Builder{}
result.WriteString(fmt.Sprintf("Run URL: %s\n", out.RunPageUrl))
taskKeys := maps.Keys(out.TaskOutputs)
sort.Strings(taskKeys)
for _, k := range taskKeys {
taskString, err := out.TaskOutputs[k].String()
if err != nil {
return "", nil
}
result.WriteString("=======\n")
result.WriteString(fmt.Sprintf("Task %s:\n", k))
result.WriteString(fmt.Sprintf("%s\n", taskString))
}
return result.String(), nil
}
func getJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int64) (*JobOutput, error) {
jobRun, err := w.Jobs.GetRun(ctx, jobs.GetRun{
RunId: runId,
})
if err != nil {
return nil, err
}
result := &JobOutput{
TaskOutputs: make(map[string]RunOutput),
}
result.RunPageUrl = jobRun.RunPageUrl
for _, task := range jobRun.Tasks {
jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{
RunId: task.RunId,
})
if err != nil {
return nil, err
}
result.TaskOutputs[task.TaskKey] = toRunOutput(jobRunOutput)
}
return result, nil
}

View File

@ -0,0 +1,126 @@
package run
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestSingleTaskJobOutputToString(t *testing.T) {
taskNotebook := NotebookOutput{
Result: "foo",
Truncated: true,
}
myJob := JobOutput{
RunPageUrl: "my_job_url",
TaskOutputs: map[string]RunOutput{
"my_notebook_task": &taskNotebook,
},
}
actual, err := myJob.String()
require.NoError(t, err)
expected := "foo\n[truncated...]\n"
assert.Equal(t, expected, actual)
}
func TestMultiTaskJobOutputToString(t *testing.T) {
taskFoo := NotebookOutput{
Result: "foo",
Truncated: true,
}
taskBar := LogsOutput{
Logs: "bar",
LogsTruncated: false,
}
myJob := JobOutput{
RunPageUrl: "my_job_url",
TaskOutputs: map[string]RunOutput{
"my_foo_task": &taskFoo,
"my_bar_task": &taskBar,
},
}
actual, err := myJob.String()
require.NoError(t, err)
expected := `Run URL: my_job_url
=======
Task my_bar_task:
bar
=======
Task my_foo_task:
foo
[truncated...]
`
assert.Equal(t, expected, actual)
}
func TestNotebookOutputToRunOutput(t *testing.T) {
jobOutput := &jobs.RunOutput{
NotebookOutput: &jobs.NotebookOutput{
Result: "foo",
Truncated: true,
},
Logs: "hello :)",
LogsTruncated: true,
}
actual := toRunOutput(jobOutput)
expected := &NotebookOutput{
Result: "foo",
Truncated: true,
}
assert.Equal(t, expected, actual)
}
func TestDbtOutputToRunOutput(t *testing.T) {
jobOutput := &jobs.RunOutput{
DbtOutput: &jobs.DbtOutput{
ArtifactsLink: "foo",
},
Logs: "hello :)",
}
actual := toRunOutput(jobOutput)
expected := &DbtOutput{
ArtifactsLink: "foo",
}
assert.Equal(t, expected, actual)
}
func TestSqlOutputToRunOutput(t *testing.T) {
jobOutput := &jobs.RunOutput{
SqlOutput: &jobs.SqlOutput{
QueryOutput: &jobs.SqlQueryOutput{
OutputLink: "foo",
},
},
Logs: "hello :)",
}
actual := toRunOutput(jobOutput)
expected := &SqlOutput{
QueryOutput: &jobs.SqlQueryOutput{
OutputLink: "foo",
},
}
assert.Equal(t, expected, actual)
}
func TestLogOutputToRunOutput(t *testing.T) {
jobOutput := &jobs.RunOutput{
Logs: "hello :)",
LogsTruncated: true,
}
actual := toRunOutput(jobOutput)
expected := &LogsOutput{
Logs: "hello :)",
LogsTruncated: true,
}
assert.Equal(t, expected, actual)
}

View File

@ -133,7 +133,7 @@ type pipelineRunner struct {
pipeline *resources.Pipeline pipeline *resources.Pipeline
} }
func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) {
var pipelineID = r.pipeline.ID var pipelineID = r.pipeline.ID
// Include resource key in logger. // Include resource key in logger.
@ -142,17 +142,17 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
_, err := w.Pipelines.GetByPipelineId(ctx, pipelineID) _, err := w.Pipelines.GetByPipelineId(ctx, pipelineID)
if err != nil { if err != nil {
log.Warnf(ctx, "Cannot get pipeline: %s", err) log.Warnf(ctx, "Cannot get pipeline: %s", err)
return err return nil, err
} }
req, err := opts.Pipeline.toPayload(pipelineID) req, err := opts.Pipeline.toPayload(pipelineID)
if err != nil { if err != nil {
return err return nil, err
} }
res, err := w.Pipelines.StartUpdate(ctx, *req) res, err := w.Pipelines.StartUpdate(ctx, *req)
if err != nil { if err != nil {
return err return nil, err
} }
updateID := res.UpdateId updateID := res.UpdateId
@ -167,7 +167,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
for { for {
update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID) update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID)
if err != nil { if err != nil {
return err return nil, err
} }
// Log only if the current state is different from the previous state. // Log only if the current state is different from the previous state.
@ -179,19 +179,19 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
if state == pipelines.UpdateInfoStateCanceled { if state == pipelines.UpdateInfoStateCanceled {
log.Infof(ctx, "Update was cancelled!") log.Infof(ctx, "Update was cancelled!")
return fmt.Errorf("update cancelled") return nil, fmt.Errorf("update cancelled")
} }
if state == pipelines.UpdateInfoStateFailed { if state == pipelines.UpdateInfoStateFailed {
log.Infof(ctx, "Update has failed!") log.Infof(ctx, "Update has failed!")
err := r.logErrorEvent(ctx, pipelineID, updateID) err := r.logErrorEvent(ctx, pipelineID, updateID)
if err != nil { if err != nil {
return err return nil, err
} }
return fmt.Errorf("update failed") return nil, fmt.Errorf("update failed")
} }
if state == pipelines.UpdateInfoStateCompleted { if state == pipelines.UpdateInfoStateCompleted {
log.Infof(ctx, "Update has completed successfully!") log.Infof(ctx, "Update has completed successfully!")
return nil return nil, nil
} }
time.Sleep(time.Second) time.Sleep(time.Second)

View File

@ -14,6 +14,10 @@ func (k key) Key() string {
return string(k) return string(k)
} }
type RunOutput interface {
String() (string, error)
}
// Runner defines the interface for a runnable resource (or workload). // Runner defines the interface for a runnable resource (or workload).
type Runner interface { type Runner interface {
// Key returns the fully qualified (unique) identifier for this runnable resource. // Key returns the fully qualified (unique) identifier for this runnable resource.
@ -21,7 +25,7 @@ type Runner interface {
Key() string Key() string
// Run the underlying worklow. // Run the underlying worklow.
Run(ctx context.Context, opts *Options) error Run(ctx context.Context, opts *Options) (RunOutput, error)
} }
// Find locates a runner matching the specified argument. // Find locates a runner matching the specified argument.

88
bundle/run/task_output.go Normal file
View File

@ -0,0 +1,88 @@
package run
import (
"encoding/json"
"fmt"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type NotebookOutput jobs.NotebookOutput
type DbtOutput jobs.DbtOutput
type SqlOutput jobs.SqlOutput
type LogsOutput struct {
Logs string
LogsTruncated bool
}
func structToString(val interface{}) (string, error) {
b, err := json.MarshalIndent(val, "", " ")
if err != nil {
return "", err
}
return string(b), nil
}
func (out *NotebookOutput) String() (string, error) {
if out.Truncated {
return fmt.Sprintf("%s\n[truncated...]\n", out.Result), nil
}
return out.Result, nil
}
func (out *DbtOutput) String() (string, error) {
outputString, err := structToString(out)
if err != nil {
return "", err
}
// We add this prefix to make this output non machine readable.
// JSON is used because it's a convenient representation.
// If user needs machine parsable output, they can use the --output json
// flag
return fmt.Sprintf("Dbt Task Output:\n%s", outputString), nil
}
func (out *SqlOutput) String() (string, error) {
outputString, err := structToString(out)
if err != nil {
return "", err
}
// We add this prefix to make this output non machine readable.
// JSON is used because it's a convenient representation.
// If user needs machine parsable output, they can use the --output json
// flag
return fmt.Sprintf("SQL Task Output:\n%s", outputString), nil
}
func (out *LogsOutput) String() (string, error) {
if out.LogsTruncated {
return fmt.Sprintf("%s\n[truncated...]\n", out.Logs), nil
}
return out.Logs, nil
}
func toRunOutput(output *jobs.RunOutput) RunOutput {
switch {
case output.NotebookOutput != nil:
result := NotebookOutput(*output.NotebookOutput)
return &result
case output.DbtOutput != nil:
result := DbtOutput(*output.DbtOutput)
return &result
case output.SqlOutput != nil:
result := SqlOutput(*output.SqlOutput)
return &result
// Corresponds to JAR, python script and python wheel tasks
case output.Logs != "":
result := LogsOutput{
Logs: output.Logs,
LogsTruncated: output.LogsTruncated,
}
return &result
default:
return nil
}
}

View File

@ -0,0 +1,90 @@
package run
import (
"fmt"
"testing"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNotebookOutputToString(t *testing.T) {
taskFoo := NotebookOutput{
Result: "foo",
Truncated: true,
}
taskBar := NotebookOutput{
Result: "bar",
Truncated: false,
}
actualFoo, err := taskFoo.String()
require.NoError(t, err)
assert.Equal(t, "foo\n[truncated...]\n", actualFoo)
actualBar, err := taskBar.String()
require.NoError(t, err)
assert.Equal(t, "bar", actualBar)
}
func TestLogsOutputToString(t *testing.T) {
taskFoo := LogsOutput{
Logs: "foo",
LogsTruncated: true,
}
taskBar := LogsOutput{
Logs: "bar",
LogsTruncated: false,
}
actualFoo, err := taskFoo.String()
require.NoError(t, err)
assert.Equal(t, "foo\n[truncated...]\n", actualFoo)
actualBar, err := taskBar.String()
require.NoError(t, err)
assert.Equal(t, "bar", actualBar)
}
func TestDbtOutputToString(t *testing.T) {
task := DbtOutput{
ArtifactsHeaders: map[string]string{"a": "b", "c": "d"},
ArtifactsLink: "my_link",
}
actual, err := task.String()
expected := `Dbt Task Output:
{
"artifacts_headers": {
"a": "b",
"c": "d"
},
"artifacts_link": "my_link"
}`
require.NoError(t, err)
assert.Equal(t, expected, actual)
}
func TestSqlOutputToString(t *testing.T) {
task := SqlOutput{
QueryOutput: &jobs.SqlQueryOutput{
OutputLink: "a",
QueryText: "b",
WarehouseId: "d",
},
}
actual, err := task.String()
expected := `SQL Task Output:
{
"query_output": {
"output_link": "a",
"query_text": "b",
"warehouse_id": "d"
}
}`
require.NoError(t, err)
fmt.Println("[DEBUG] actual: ", actual)
assert.Equal(t, expected, actual)
}

View File

@ -1,11 +1,15 @@
package bundle package bundle
import ( import (
"encoding/json"
"fmt"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/deploy/terraform" "github.com/databricks/bricks/bundle/deploy/terraform"
"github.com/databricks/bricks/bundle/phases" "github.com/databricks/bricks/bundle/phases"
"github.com/databricks/bricks/bundle/run" "github.com/databricks/bricks/bundle/run"
"github.com/databricks/bricks/cmd/root" "github.com/databricks/bricks/cmd/root"
"github.com/databricks/bricks/libs/flags"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -32,11 +36,28 @@ var runCmd = &cobra.Command{
return err return err
} }
err = runner.Run(cmd.Context(), &runOptions) output, err := runner.Run(cmd.Context(), &runOptions)
if err != nil { if err != nil {
return err return err
} }
if output != nil {
switch outputType {
case flags.OutputText:
resultString, err := output.String()
if err != nil {
return err
}
cmd.OutOrStdout().Write([]byte(resultString))
case flags.OutputJSON:
b, err := json.MarshalIndent(output, "", " ")
if err != nil {
return err
}
cmd.OutOrStdout().Write(b)
default:
return fmt.Errorf("unknown output type %s", outputType)
}
}
return nil return nil
}, },
@ -62,7 +83,10 @@ var runCmd = &cobra.Command{
}, },
} }
var outputType flags.Output = flags.OutputText
func init() { func init() {
runOptions.Define(runCmd.Flags()) runOptions.Define(runCmd.Flags())
rootCmd.AddCommand(runCmd) rootCmd.AddCommand(runCmd)
runCmd.Flags().Var(&outputType, "output", "type of output format")
} }