From 047a189c1ee63ab213d79307f718f280e8cb5882 Mon Sep 17 00:00:00 2001 From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com> Date: Tue, 21 Mar 2023 16:25:18 +0100 Subject: [PATCH] Add job run output logging (#260) This PR adds output logging for job runs Tested using unit tests and manually --- bundle/run/job.go | 20 +++--- bundle/run/job_output.go | 72 +++++++++++++++++++ bundle/run/job_output_test.go | 126 +++++++++++++++++++++++++++++++++ bundle/run/pipeline.go | 18 ++--- bundle/run/runner.go | 6 +- bundle/run/task_output.go | 88 +++++++++++++++++++++++ bundle/run/task_output_test.go | 90 +++++++++++++++++++++++ cmd/bundle/run.go | 28 +++++++- 8 files changed, 426 insertions(+), 22 deletions(-) create mode 100644 bundle/run/job_output.go create mode 100644 bundle/run/job_output_test.go create mode 100644 bundle/run/task_output.go create mode 100644 bundle/run/task_output_test.go diff --git a/bundle/run/job.go b/bundle/run/job.go index 976668e4..428734dc 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -140,10 +140,10 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) { } -func (r *jobRunner) Run(ctx context.Context, opts *Options) error { +func (r *jobRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) { jobID, err := strconv.ParseInt(r.job.ID, 10, 64) if err != nil { - return fmt.Errorf("job ID is not an integer: %s", r.job.ID) + return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID) } var prevState *jobs.RunState @@ -176,7 +176,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error { req, err := opts.Job.toPayload(jobID) if err != nil { - return err + return nil, err } // Include resource key in logger. @@ -188,34 +188,34 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error { } if err != nil { - return err + return nil, err } if run.State.LifeCycleState == jobs.RunLifeCycleStateSkipped { log.Infof(ctx, "Run was skipped!") - return fmt.Errorf("run skipped: %s", run.State.StateMessage) + return nil, fmt.Errorf("run skipped: %s", run.State.StateMessage) } switch run.State.ResultState { // The run was canceled at user request. case jobs.RunResultStateCanceled: log.Infof(ctx, "Run was cancelled!") - return fmt.Errorf("run canceled: %s", run.State.StateMessage) + return nil, fmt.Errorf("run canceled: %s", run.State.StateMessage) // The task completed with an error. case jobs.RunResultStateFailed: log.Infof(ctx, "Run has failed!") - return fmt.Errorf("run failed: %s", run.State.StateMessage) + return nil, fmt.Errorf("run failed: %s", run.State.StateMessage) // The task completed successfully. case jobs.RunResultStateSuccess: log.Infof(ctx, "Run has completed successfully!") - return nil + return getJobOutput(ctx, r.bundle.WorkspaceClient(), *runId) // The run was stopped after reaching the timeout. case jobs.RunResultStateTimedout: log.Infof(ctx, "Run has timed out!") - return fmt.Errorf("run timed out: %s", run.State.StateMessage) + return nil, fmt.Errorf("run timed out: %s", run.State.StateMessage) } - return err + return nil, err } diff --git a/bundle/run/job_output.go b/bundle/run/job_output.go new file mode 100644 index 00000000..e359858b --- /dev/null +++ b/bundle/run/job_output.go @@ -0,0 +1,72 @@ +package run + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/jobs" + "golang.org/x/exp/maps" +) + +type JobOutput struct { + // URL of the job run + RunPageUrl string + + // output for tasks with a non empty output + TaskOutputs map[string]RunOutput +} + +// TODO: Print the output respecting the execution order (https://github.com/databricks/bricks/issues/259) +func (out *JobOutput) String() (string, error) { + if len(out.TaskOutputs) == 0 { + return "", nil + } + // When only one task, just return that output without any formatting + if len(out.TaskOutputs) == 1 { + for _, v := range out.TaskOutputs { + return v.String() + } + } + result := strings.Builder{} + result.WriteString(fmt.Sprintf("Run URL: %s\n", out.RunPageUrl)) + + taskKeys := maps.Keys(out.TaskOutputs) + sort.Strings(taskKeys) + for _, k := range taskKeys { + taskString, err := out.TaskOutputs[k].String() + if err != nil { + return "", nil + } + result.WriteString("=======\n") + result.WriteString(fmt.Sprintf("Task %s:\n", k)) + result.WriteString(fmt.Sprintf("%s\n", taskString)) + } + return result.String(), nil +} + +func getJobOutput(ctx context.Context, w *databricks.WorkspaceClient, runId int64) (*JobOutput, error) { + jobRun, err := w.Jobs.GetRun(ctx, jobs.GetRun{ + RunId: runId, + }) + if err != nil { + return nil, err + } + result := &JobOutput{ + TaskOutputs: make(map[string]RunOutput), + } + result.RunPageUrl = jobRun.RunPageUrl + + for _, task := range jobRun.Tasks { + jobRunOutput, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{ + RunId: task.RunId, + }) + if err != nil { + return nil, err + } + result.TaskOutputs[task.TaskKey] = toRunOutput(jobRunOutput) + } + return result, nil +} diff --git a/bundle/run/job_output_test.go b/bundle/run/job_output_test.go new file mode 100644 index 00000000..14d233a2 --- /dev/null +++ b/bundle/run/job_output_test.go @@ -0,0 +1,126 @@ +package run + +import ( + "testing" + + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSingleTaskJobOutputToString(t *testing.T) { + taskNotebook := NotebookOutput{ + Result: "foo", + Truncated: true, + } + myJob := JobOutput{ + RunPageUrl: "my_job_url", + TaskOutputs: map[string]RunOutput{ + "my_notebook_task": &taskNotebook, + }, + } + + actual, err := myJob.String() + require.NoError(t, err) + expected := "foo\n[truncated...]\n" + assert.Equal(t, expected, actual) +} + +func TestMultiTaskJobOutputToString(t *testing.T) { + taskFoo := NotebookOutput{ + Result: "foo", + Truncated: true, + } + taskBar := LogsOutput{ + Logs: "bar", + LogsTruncated: false, + } + myJob := JobOutput{ + RunPageUrl: "my_job_url", + TaskOutputs: map[string]RunOutput{ + "my_foo_task": &taskFoo, + "my_bar_task": &taskBar, + }, + } + + actual, err := myJob.String() + require.NoError(t, err) + + expected := `Run URL: my_job_url +======= +Task my_bar_task: +bar +======= +Task my_foo_task: +foo +[truncated...] + +` + assert.Equal(t, expected, actual) +} + +func TestNotebookOutputToRunOutput(t *testing.T) { + jobOutput := &jobs.RunOutput{ + NotebookOutput: &jobs.NotebookOutput{ + Result: "foo", + Truncated: true, + }, + Logs: "hello :)", + LogsTruncated: true, + } + actual := toRunOutput(jobOutput) + + expected := &NotebookOutput{ + Result: "foo", + Truncated: true, + } + assert.Equal(t, expected, actual) +} + +func TestDbtOutputToRunOutput(t *testing.T) { + jobOutput := &jobs.RunOutput{ + DbtOutput: &jobs.DbtOutput{ + ArtifactsLink: "foo", + }, + Logs: "hello :)", + } + actual := toRunOutput(jobOutput) + + expected := &DbtOutput{ + ArtifactsLink: "foo", + } + assert.Equal(t, expected, actual) +} + +func TestSqlOutputToRunOutput(t *testing.T) { + jobOutput := &jobs.RunOutput{ + SqlOutput: &jobs.SqlOutput{ + QueryOutput: &jobs.SqlQueryOutput{ + OutputLink: "foo", + }, + }, + Logs: "hello :)", + } + actual := toRunOutput(jobOutput) + + expected := &SqlOutput{ + QueryOutput: &jobs.SqlQueryOutput{ + OutputLink: "foo", + }, + } + assert.Equal(t, expected, actual) +} + +func TestLogOutputToRunOutput(t *testing.T) { + jobOutput := &jobs.RunOutput{ + Logs: "hello :)", + LogsTruncated: true, + } + actual := toRunOutput(jobOutput) + + expected := &LogsOutput{ + Logs: "hello :)", + LogsTruncated: true, + } + assert.Equal(t, expected, actual) +} diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 10464ff5..20d80a4d 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -133,7 +133,7 @@ type pipelineRunner struct { pipeline *resources.Pipeline } -func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { +func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (RunOutput, error) { var pipelineID = r.pipeline.ID // Include resource key in logger. @@ -142,17 +142,17 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { _, err := w.Pipelines.GetByPipelineId(ctx, pipelineID) if err != nil { log.Warnf(ctx, "Cannot get pipeline: %s", err) - return err + return nil, err } req, err := opts.Pipeline.toPayload(pipelineID) if err != nil { - return err + return nil, err } res, err := w.Pipelines.StartUpdate(ctx, *req) if err != nil { - return err + return nil, err } updateID := res.UpdateId @@ -167,7 +167,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { for { update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID) if err != nil { - return err + return nil, err } // Log only if the current state is different from the previous state. @@ -179,19 +179,19 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { if state == pipelines.UpdateInfoStateCanceled { log.Infof(ctx, "Update was cancelled!") - return fmt.Errorf("update cancelled") + return nil, fmt.Errorf("update cancelled") } if state == pipelines.UpdateInfoStateFailed { log.Infof(ctx, "Update has failed!") err := r.logErrorEvent(ctx, pipelineID, updateID) if err != nil { - return err + return nil, err } - return fmt.Errorf("update failed") + return nil, fmt.Errorf("update failed") } if state == pipelines.UpdateInfoStateCompleted { log.Infof(ctx, "Update has completed successfully!") - return nil + return nil, nil } time.Sleep(time.Second) diff --git a/bundle/run/runner.go b/bundle/run/runner.go index 2c6bd8c8..6fb31021 100644 --- a/bundle/run/runner.go +++ b/bundle/run/runner.go @@ -14,6 +14,10 @@ func (k key) Key() string { return string(k) } +type RunOutput interface { + String() (string, error) +} + // Runner defines the interface for a runnable resource (or workload). type Runner interface { // Key returns the fully qualified (unique) identifier for this runnable resource. @@ -21,7 +25,7 @@ type Runner interface { Key() string // Run the underlying worklow. - Run(ctx context.Context, opts *Options) error + Run(ctx context.Context, opts *Options) (RunOutput, error) } // Find locates a runner matching the specified argument. diff --git a/bundle/run/task_output.go b/bundle/run/task_output.go new file mode 100644 index 00000000..1e5247bf --- /dev/null +++ b/bundle/run/task_output.go @@ -0,0 +1,88 @@ +package run + +import ( + "encoding/json" + "fmt" + + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type NotebookOutput jobs.NotebookOutput +type DbtOutput jobs.DbtOutput +type SqlOutput jobs.SqlOutput +type LogsOutput struct { + Logs string + LogsTruncated bool +} + +func structToString(val interface{}) (string, error) { + b, err := json.MarshalIndent(val, "", " ") + if err != nil { + return "", err + } + return string(b), nil +} + +func (out *NotebookOutput) String() (string, error) { + if out.Truncated { + return fmt.Sprintf("%s\n[truncated...]\n", out.Result), nil + } + return out.Result, nil +} + +func (out *DbtOutput) String() (string, error) { + outputString, err := structToString(out) + if err != nil { + return "", err + } + + // We add this prefix to make this output non machine readable. + // JSON is used because it's a convenient representation. + // If user needs machine parsable output, they can use the --output json + // flag + return fmt.Sprintf("Dbt Task Output:\n%s", outputString), nil +} + +func (out *SqlOutput) String() (string, error) { + outputString, err := structToString(out) + if err != nil { + return "", err + } + + // We add this prefix to make this output non machine readable. + // JSON is used because it's a convenient representation. + // If user needs machine parsable output, they can use the --output json + // flag + return fmt.Sprintf("SQL Task Output:\n%s", outputString), nil +} + +func (out *LogsOutput) String() (string, error) { + if out.LogsTruncated { + return fmt.Sprintf("%s\n[truncated...]\n", out.Logs), nil + } + return out.Logs, nil +} + +func toRunOutput(output *jobs.RunOutput) RunOutput { + switch { + case output.NotebookOutput != nil: + result := NotebookOutput(*output.NotebookOutput) + return &result + case output.DbtOutput != nil: + result := DbtOutput(*output.DbtOutput) + return &result + + case output.SqlOutput != nil: + result := SqlOutput(*output.SqlOutput) + return &result + // Corresponds to JAR, python script and python wheel tasks + case output.Logs != "": + result := LogsOutput{ + Logs: output.Logs, + LogsTruncated: output.LogsTruncated, + } + return &result + default: + return nil + } +} diff --git a/bundle/run/task_output_test.go b/bundle/run/task_output_test.go new file mode 100644 index 00000000..de3a520f --- /dev/null +++ b/bundle/run/task_output_test.go @@ -0,0 +1,90 @@ +package run + +import ( + "fmt" + "testing" + + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNotebookOutputToString(t *testing.T) { + taskFoo := NotebookOutput{ + Result: "foo", + Truncated: true, + } + taskBar := NotebookOutput{ + Result: "bar", + Truncated: false, + } + + actualFoo, err := taskFoo.String() + require.NoError(t, err) + assert.Equal(t, "foo\n[truncated...]\n", actualFoo) + + actualBar, err := taskBar.String() + require.NoError(t, err) + assert.Equal(t, "bar", actualBar) +} + +func TestLogsOutputToString(t *testing.T) { + taskFoo := LogsOutput{ + Logs: "foo", + LogsTruncated: true, + } + taskBar := LogsOutput{ + Logs: "bar", + LogsTruncated: false, + } + + actualFoo, err := taskFoo.String() + require.NoError(t, err) + assert.Equal(t, "foo\n[truncated...]\n", actualFoo) + + actualBar, err := taskBar.String() + require.NoError(t, err) + assert.Equal(t, "bar", actualBar) +} + +func TestDbtOutputToString(t *testing.T) { + task := DbtOutput{ + ArtifactsHeaders: map[string]string{"a": "b", "c": "d"}, + ArtifactsLink: "my_link", + } + + actual, err := task.String() + expected := `Dbt Task Output: +{ + "artifacts_headers": { + "a": "b", + "c": "d" + }, + "artifacts_link": "my_link" +}` + require.NoError(t, err) + assert.Equal(t, expected, actual) +} + +func TestSqlOutputToString(t *testing.T) { + task := SqlOutput{ + QueryOutput: &jobs.SqlQueryOutput{ + OutputLink: "a", + QueryText: "b", + WarehouseId: "d", + }, + } + + actual, err := task.String() + expected := `SQL Task Output: +{ + "query_output": { + "output_link": "a", + "query_text": "b", + "warehouse_id": "d" + } +}` + require.NoError(t, err) + fmt.Println("[DEBUG] actual: ", actual) + assert.Equal(t, expected, actual) +} diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index c45353cf..51d81e11 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -1,11 +1,15 @@ package bundle import ( + "encoding/json" + "fmt" + "github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle/deploy/terraform" "github.com/databricks/bricks/bundle/phases" "github.com/databricks/bricks/bundle/run" "github.com/databricks/bricks/cmd/root" + "github.com/databricks/bricks/libs/flags" "github.com/spf13/cobra" ) @@ -32,11 +36,28 @@ var runCmd = &cobra.Command{ return err } - err = runner.Run(cmd.Context(), &runOptions) + output, err := runner.Run(cmd.Context(), &runOptions) if err != nil { return err } - + if output != nil { + switch outputType { + case flags.OutputText: + resultString, err := output.String() + if err != nil { + return err + } + cmd.OutOrStdout().Write([]byte(resultString)) + case flags.OutputJSON: + b, err := json.MarshalIndent(output, "", " ") + if err != nil { + return err + } + cmd.OutOrStdout().Write(b) + default: + return fmt.Errorf("unknown output type %s", outputType) + } + } return nil }, @@ -62,7 +83,10 @@ var runCmd = &cobra.Command{ }, } +var outputType flags.Output = flags.OutputText + func init() { runOptions.Define(runCmd.Flags()) rootCmd.AddCommand(runCmd) + runCmd.Flags().Var(&outputType, "output", "type of output format") }