From d5b4a595afac571e970c95a4e0a35e3f562ddeac Mon Sep 17 00:00:00 2001 From: Anders Rex Date: Mon, 15 Jul 2024 17:32:56 +0300 Subject: [PATCH] Prototype bundle run notebooks results --- bundle/run/job.go | 129 ++++++++++++++++++++++++++++++++++++ bundle/run/options.go | 7 +- cmd/bundle/run.go | 3 + cmd/root/progress_logger.go | 11 ++- 4 files changed, 141 insertions(+), 9 deletions(-) diff --git a/bundle/run/job.go b/bundle/run/job.go index 8003c7d2..f0bf3a3f 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -2,9 +2,13 @@ package run import ( "context" + "encoding/base64" "encoding/json" "fmt" + "os" + "os/exec" "strconv" + "strings" "time" "github.com/databricks/cli/bundle" @@ -13,7 +17,9 @@ import ( "github.com/databricks/cli/bundle/run/progress" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/fatih/color" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" @@ -121,6 +127,7 @@ func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func } if prevState == nil { + openRunUrl(i.RunPageUrl) progressLogger.Log(progress.NewJobRunUrlEvent(i.RunPageUrl)) } @@ -147,6 +154,54 @@ func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func } } +func handleNotebookResultsCallback(ctx context.Context, progressLogger *cmdio.Logger, workspaceClient *databricks.WorkspaceClient) func(info *jobs.Run) { + loggedLineCount := 0 + + return func(i *jobs.Run) { + details, err := workspaceClient.Jobs.GetRun(ctx, jobs.GetRunRequest{ + RunId: i.RunId, + }) + + if err != nil { + return + } + + // Loop over details.Tasks and export each task + for _, task := range details.Tasks { + var exportReq workspace.ExportRequest + exportReq.Path = fmt.Sprintf("/Workspace/__databricks_jobs_tmp/job-%d-run-%d/notebook", details.JobId, task.RunId) + exportReq.Format = "JUPYTER" + response, err := workspaceClient.Workspace.Export(ctx, exportReq) + + if err != nil { + return + } + + notebookContent, err := base64.StdEncoding.DecodeString(response.Content) + + if err != nil { + // Ignore for now + } else { + writeNotebookContentToFile(string(notebookContent)) + renderedNotebook := renderNotebook() + filteredNotebook := filterNootebookLines(renderedNotebook) + filteredLines := strings.Split(filteredNotebook, "\n") + + // Only print lines that haven't been logged before + if loggedLineCount < len(filteredLines) && filteredNotebook != "" { + newLines := filteredLines[loggedLineCount:] + + // log progress events in using the default logger + progressLogger.Writer.Write([]byte(strings.Join(newLines, "\n"))) + progressLogger.Writer.Write([]byte("\n")) + + loggedLineCount = len(filteredLines) + } + } + } + } +} + func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) { jobID, err := strconv.ParseInt(r.job.ID, 10, 64) if err != nil { @@ -184,6 +239,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e return nil, fmt.Errorf("no progress logger found") } logProgress := logProgressCallback(ctx, progressLogger) + handleNotebookResults := handleNotebookResultsCallback(ctx, progressLogger, w) waiter, err := w.Jobs.RunNow(ctx, *req) if err != nil { @@ -202,6 +258,9 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e pullRunId(r) logDebug(r) logProgress(r) + if opts.LogResults { + handleNotebookResults(r) + } }).GetWithTimeout(jobRunTimeout) if err != nil && runId != nil { r.logFailedTasks(ctx, *runId) @@ -324,3 +383,73 @@ func (r *jobRunner) ParseArgs(args []string, opts *Options) error { func (r *jobRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return r.posArgsHandler().CompleteArgs(args, toComplete) } + +func writeNotebookContentToFile(content string) { + // Write the content of the notebook to a file + // so that nbpreview can read it + file, err := os.Create("/tmp/dab_notebook_output.txt") + if err != nil { + fmt.Println(err) + } + defer file.Close() + + _, err = file.WriteString(content) + if err != nil { + fmt.Println(err) + } +} + +// Function to render the notebook using nbpreview +func renderNotebook() string { + cmd := exec.Command("nbpreview", "--decorated", "/tmp/dab_notebook_output.txt") + + output, err := cmd.Output() + if err != nil { + fmt.Println("Error:", err) + } + + outputStr := string(output) + return outputStr +} + +// Hack to open the run URL in a browser tab in the background using AppleScript. +// We have to do this because the notebook on the run URL page doesn't exist until +// the page is loaded. This should be done in the webapp. +func openRunUrl(url string) { + osascript := fmt.Sprintf("tell application \"Google Chrome\" to tell window 1 to make new tab with properties {URL:\"%s\"}", url) + cmd := exec.Command("osascript", "-e", osascript) + + _, err := cmd.Output() + if err != nil { + fmt.Println("Error:", err) + } +} + +// Function to filter out code cells from the nbpreview output +func filterNootebookLines(input string) string { + lines := strings.Split(input, "\n") + + // Prefixes to filter out + prefixes := []string{" ╭", " │", " ╰", "[0]:", " 🌐", " file:///var"} + + startsWithPrefix := func(line string) bool { + for _, prefix := range prefixes { + if strings.HasPrefix(line, prefix) { + return true + } + } + return false + } + + // Filter out lines that start with the prefixes or are empty + var filteredLines []string + for _, line := range lines { + if !startsWithPrefix(line) && strings.TrimSpace(line) != "" { + trimmedLine := strings.TrimSpace(line) + filteredLines = append(filteredLines, trimmedLine) + } + } + + // Join the filtered lines back into a single string + return strings.Join(filteredLines, "\n") +} diff --git a/bundle/run/options.go b/bundle/run/options.go index 4e50788a..ab4d29ce 100644 --- a/bundle/run/options.go +++ b/bundle/run/options.go @@ -6,9 +6,10 @@ import ( ) type Options struct { - Job JobOptions - Pipeline PipelineOptions - NoWait bool + Job JobOptions + Pipeline PipelineOptions + NoWait bool + LogResults bool } func (o *Options) Define(cmd *cobra.Command) { diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index 63458f85..2ee34b5c 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -45,8 +45,10 @@ task or a Python wheel task, the second example applies. var noWait bool var restart bool + var logResults bool cmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.") cmd.Flags().BoolVar(&restart, "restart", false, "Restart the run if it is already running.") + cmd.Flags().BoolVar(&logResults, "log-results", false, "Log notebook cell results.") cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() @@ -96,6 +98,7 @@ task or a Python wheel task, the second example applies. } runOptions.NoWait = noWait + runOptions.LogResults = logResults if restart { s := cmdio.Spinner(ctx) s <- "Cancelling all runs" diff --git a/cmd/root/progress_logger.go b/cmd/root/progress_logger.go index c05ecb04..b288d45f 100644 --- a/cmd/root/progress_logger.go +++ b/cmd/root/progress_logger.go @@ -3,13 +3,11 @@ package root import ( "context" "fmt" - "os" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/env" "github.com/databricks/cli/libs/flags" "github.com/spf13/cobra" - "golang.org/x/term" ) const envProgressFormat = "DATABRICKS_CLI_PROGRESS_FORMAT" @@ -21,10 +19,11 @@ type progressLoggerFlag struct { } func (f *progressLoggerFlag) resolveModeDefault(format flags.ProgressLogFormat) flags.ProgressLogFormat { - if (f.log.level.String() == "disabled" || f.log.file.String() != "stderr") && - term.IsTerminal(int(os.Stderr.Fd())) { - return flags.ModeInplace - } + // Disable ModeInplace for now to not mess with notebook formatting + // if (f.log.level.String() == "disabled" || f.log.file.String() != "stderr") && + // term.IsTerminal(int(os.Stderr.Fd())) { + // return flags.ModeInplace + // } return flags.ModeAppend }