databricks-cli/bundle/run/pipeline.go

222 lines
6.3 KiB
Go
Raw Permalink Normal View History

2022-12-15 14:12:47 +00:00
package run
import (
"context"
"fmt"
"strings"
2022-12-15 14:12:47 +00:00
"time"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/run/output"
"github.com/databricks/cli/bundle/run/progress"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/log"
2022-12-15 14:12:47 +00:00
"github.com/databricks/databricks-sdk-go/service/pipelines"
flag "github.com/spf13/pflag"
2022-12-15 14:12:47 +00:00
)
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
// filterEventsByUpdateId returns the subset of events whose origin carries
// the given update ID, preserving the order of the input slice. The returned
// slice is always non-nil, even when nothing matches.
func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent {
	matched := make([]pipelines.PipelineEvent, 0)
	for idx := range events {
		// NOTE(review): assumes Origin is populated on every event — TODO confirm.
		if events[idx].Origin.UpdateId != updateId {
			continue
		}
		matched = append(matched, events[idx])
	}
	return matched
}
func (r *pipelineRunner) logEvent(ctx context.Context, event pipelines.PipelineEvent) {
logString := ""
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
if event.Message != "" {
logString += fmt.Sprintf(" %s\n", event.Message)
}
if event.Error != nil && len(event.Error.Exceptions) > 0 {
logString += "trace for most recent exception: \n"
for i := 0; i < len(event.Error.Exceptions); i++ {
logString += fmt.Sprintf("%s\n", event.Error.Exceptions[i].Message)
}
}
if logString != "" {
log.Errorf(ctx, fmt.Sprintf("[%s] %s", event.EventType, logString))
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
}
}
func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error {
w := r.bundle.WorkspaceClient()
// Note: For a 100 percent correct and complete solution we should use the
// w.Pipelines.ListPipelineEventsAll method to find all relevant events. However the
// probablity of the relevant last error event not being present in the most
// recent 100 error events is very close to 0 and the first 100 error events
// should give us a good picture of the error.
//
// Otherwise for long lived pipelines, there can be a lot of unnecessary
// latency due to multiple pagination API calls needed underneath the hood for
// ListPipelineEventsAll
res, err := w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEventsRequest{
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
Filter: `level='ERROR'`,
MaxResults: 100,
PipelineId: pipelineId,
})
if err != nil {
return err
}
updateEvents := filterEventsByUpdateId(res.Events, updateId)
// The events API returns most recent events first. We iterate in a reverse order
// to print the events chronologically
for i := len(updateEvents) - 1; i >= 0; i-- {
r.logEvent(ctx, updateEvents[i])
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
}
return nil
}
// PipelineOptions defines options for running a pipeline update.
//
// The four options are mutually exclusive: Validate reports an error when
// more than one of them is set.
type PipelineOptions struct {
// Perform a full graph update (flag: --refresh-all).
RefreshAll bool
// List of tables to update (flag: --refresh).
Refresh []string
// Perform a full graph reset and recompute (flag: --full-refresh-all).
FullRefreshAll bool
// List of tables to reset and recompute (flag: --full-refresh).
FullRefresh []string
}
// Define registers the pipeline run flags (--refresh-all, --refresh,
// --full-refresh-all and --full-refresh) on the given flag set and binds each
// of them to the corresponding field of o. All flags default to unset
// (false or nil).
func (o *PipelineOptions) Define(fs *flag.FlagSet) {
fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.")
fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.")
fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.")
fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.")
}
// Validate checks that at most one of the pipeline run options is set.
//
// It returns nil when zero or one option is set. Otherwise it returns an
// error that lists the flags of all options that were set, in the order
// --refresh-all, --refresh, --full-refresh-all, --full-refresh.
func (o *PipelineOptions) Validate() error {
	candidates := []struct {
		flagName string
		isSet    bool
	}{
		{flagName: "--refresh-all", isSet: o.RefreshAll},
		{flagName: "--refresh", isSet: len(o.Refresh) > 0},
		{flagName: "--full-refresh-all", isSet: o.FullRefreshAll},
		{flagName: "--full-refresh", isSet: len(o.FullRefresh) > 0},
	}

	given := make([]string, 0, len(candidates))
	for _, candidate := range candidates {
		if candidate.isSet {
			given = append(given, candidate.flagName)
		}
	}

	if len(given) <= 1 {
		return nil
	}
	return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(given, ", "))
}
// toPayload converts the options into a StartUpdate request for the pipeline
// with the given ID. The options are validated first; a validation error is
// returned as-is together with a nil payload.
func (o *PipelineOptions) toPayload(pipelineID string) (*pipelines.StartUpdate, error) {
	err := o.Validate()
	if err != nil {
		return nil, err
	}

	var request pipelines.StartUpdate
	request.PipelineId = pipelineID
	// Note: `RefreshAll` is implied if the fields below are not set.
	request.RefreshSelection = o.Refresh
	request.FullRefresh = o.FullRefreshAll
	request.FullRefreshSelection = o.FullRefresh
	return &request, nil
}
2022-12-15 14:12:47 +00:00
// pipelineRunner starts and monitors updates of a single pipeline resource
// of a bundle.
type pipelineRunner struct {
// key is embedded; its Key() method supplies the resource key that Run
// attaches to the logger.
key
// bundle provides the workspace client used for all API calls.
bundle *bundle.Bundle
// pipeline is the pipeline resource to run; its ID identifies the
// pipeline in API requests.
pipeline *resources.Pipeline
}
func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) {
2022-12-15 14:12:47 +00:00
var pipelineID = r.pipeline.ID
// Include resource key in logger.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))
2022-12-15 14:12:47 +00:00
w := r.bundle.WorkspaceClient()
2022-12-22 08:46:17 +00:00
_, err := w.Pipelines.GetByPipelineId(ctx, pipelineID)
2022-12-15 14:12:47 +00:00
if err != nil {
log.Warnf(ctx, "Cannot get pipeline: %s", err)
return nil, err
2022-12-15 14:12:47 +00:00
}
req, err := opts.Pipeline.toPayload(pipelineID)
if err != nil {
return nil, err
}
res, err := w.Pipelines.StartUpdate(ctx, *req)
2022-12-15 14:12:47 +00:00
if err != nil {
return nil, err
2022-12-15 14:12:47 +00:00
}
updateID := res.UpdateId
// setup progress logger and tracker to query events
updateTracker := progress.NewUpdateTracker(pipelineID, updateID, w)
progressLogger, ok := cmdio.FromContext(ctx)
if !ok {
return nil, fmt.Errorf("no progress logger found")
}
2022-12-15 14:12:47 +00:00
// Log the pipeline update URL as soon as it is available.
progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))
2022-12-15 14:12:47 +00:00
Add development runs (#522) This implements the "development run" functionality that we desire for DABs in the workspace / IDE. ## bundle.yml changes In bundle.yml, there should be a "dev" environment that is marked as `mode: debug`: ``` environments: dev: default: true mode: development # future accepted values might include pull_request, production ``` Setting `mode` to `development` indicates that this environment is used just for running things for development. This results in several changes to deployed assets: * All assets will get '[dev]' in their name and will get a 'dev' tag * All assets will be hidden from the list of assets (future work; e.g. for jobs we would have a special job_type that hides it from the list) * All deployed assets will be ephemeral (future work, we need some form of garbage collection) * Pipelines will be marked as 'development: true' * Jobs can run on development compute through the `--compute` parameter in the CLI * Jobs get their schedule / triggers paused * Jobs get concurrent runs (it's really annoying if your runs get skipped because the last run was still in progress) Other accepted values for `mode` are `default` (which does nothing) and `pull-request` (which is reserved for future use). ## CLI changes To run a single job called "shark_sighting" on existing compute, use the following commands: ``` $ databricks bundle deploy --compute 0617-201942-9yd9g8ix $ databricks bundle run shark_sighting ``` which would deploy and run a job called "[dev] shark_sightings" on the compute provided. Note that `--compute` is not accepted in production environments, so we show an error if `mode: development` is not used. 
The `run --deploy` command offers a convenient shorthand for the common combination of deploying & running: ``` $ export DATABRICKS_COMPUTE=0617-201942-9yd9g8ix $ bundle run --deploy shark_sightings ``` The `--deploy` addition isn't really essential and I welcome feedback 🤔 I played with the idea of a "debug" or "dev" command but that seemed to only make the option space even broader for users. The above could work well with an IDE or workspace that automatically sets the target compute. One more thing I added is`run --no-wait` can now be used to run something without waiting for it to be completed (useful for IDE-like environments that can display progress themselves). ``` $ bundle run --deploy shark_sightings --no-wait ```
2023-07-12 06:51:54 +00:00
if opts.NoWait {
return nil, nil
}
2022-12-15 14:12:47 +00:00
// Poll update for completion and post status.
// Note: there is no "StartUpdateAndWait" wrapper for this API.
var prevState *pipelines.UpdateInfoState
for {
events, err := updateTracker.Events(ctx)
if err != nil {
return nil, err
}
for _, event := range events {
progressLogger.Log(&event)
log.Infof(ctx, event.String())
}
2022-12-15 14:12:47 +00:00
update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID)
if err != nil {
return nil, err
2022-12-15 14:12:47 +00:00
}
// Log only if the current state is different from the previous state.
state := update.Update.State
if prevState == nil || *prevState != state {
log.Infof(ctx, "Update status: %s", state)
2022-12-15 14:12:47 +00:00
prevState = &state
}
if state == pipelines.UpdateInfoStateCanceled {
log.Infof(ctx, "Update was cancelled!")
return nil, fmt.Errorf("update cancelled")
2022-12-15 14:12:47 +00:00
}
if state == pipelines.UpdateInfoStateFailed {
log.Infof(ctx, "Update has failed!")
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
err := r.logErrorEvent(ctx, pipelineID, updateID)
if err != nil {
return nil, err
Log latest error event on pipeline run fail (#239) DAB config used to test this: bundle.yml ``` workspace: host: <deco-azure-prod> bundle: name: deco-538 resources: pipelines: foo: name: "[${bundle.name}] log pipeline errors" libraries: - notebook: path: ./myNb.py development: true ``` myNb.py ``` # Databricks notebook source print(1/0) ``` Before: ``` 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:28:44 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:28:46 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:28:52 [INFO] [pipelines.foo] Update has failed! Error: update failed ``` Now: ``` 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update available at *** 2023/03/09 01:29:31 [INFO] [pipelines.foo] Update status: CREATED 2023/03/09 01:29:33 [INFO] [pipelines.foo] Update status: INITIALIZING 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update status: FAILED 2023/03/09 01:29:40 [INFO] [pipelines.foo] Update has failed! 2023/03/09 01:29:40 [ERROR] [pipelines.foo] Update 27bc77 is FAILED. trace for most recent exception: Failed to execute python command for notebook '/Users/shreyas.goenka@databricks.com/.bundle/deco-538/default/files/myNb' with id RunnableCommandId(9070319781942164851) and error AnsiResult(--------------------------------------------------------------------------- ZeroDivisionError Traceback (most recent call last) <command--1> in <cell line: 1>() ----> 1 print(1/0) ZeroDivisionError: division by zero,Map(),Map(),List(),List(),Map()) Error: update failed ```
2023-03-16 11:23:46 +00:00
}
return nil, fmt.Errorf("update failed")
2022-12-15 14:12:47 +00:00
}
if state == pipelines.UpdateInfoStateCompleted {
log.Infof(ctx, "Update has completed successfully!")
return nil, nil
2022-12-15 14:12:47 +00:00
}
time.Sleep(time.Second)
}
}