databricks-cli/bundle/run/job.go

349 lines
9.1 KiB
Go
Raw Normal View History

2022-12-15 14:12:47 +00:00
package run
import (
"context"
"encoding/json"
2022-12-15 14:12:47 +00:00
"fmt"
"strconv"
"time"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/run/output"
"github.com/databricks/cli/bundle/run/progress"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/log"
2022-12-15 14:12:47 +00:00
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/fatih/color"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
2022-12-15 14:12:47 +00:00
)
// jobRunTimeout bounds how long we wait for a job run to reach a terminal
// state (both when running a job and when waiting for a cancellation).
var jobRunTimeout = 24 * time.Hour
2022-12-15 14:12:47 +00:00
// jobRunner runs, cancels and restarts a single job resource of a bundle.
type jobRunner struct {
// Embedded key (declared elsewhere in this package). The runner calls Key()
// to tag log output with the resource key — presumably provided by this
// embedded type; confirm against its declaration.
key
// bundle supplies the workspace client and the bundle configuration.
bundle *bundle.Bundle
// job is the job resource this runner operates on; its ID and settings are
// read when starting, cancelling and restarting runs.
job *resources.Job
}
// Name returns the name configured in the job's settings. It returns the
// empty string when the job or its settings are not set.
func (r *jobRunner) Name() string {
	if r.job != nil && r.job.JobSettings != nil {
		return r.job.JobSettings.Name
	}
	return ""
}
// isFailed reports whether a task run ended unsuccessfully: either it hit an
// internal error, or it terminated with a failed result state.
func isFailed(task jobs.RunTask) bool {
	switch task.State.LifeCycleState {
	case jobs.RunLifeCycleStateInternalError:
		return true
	case jobs.RunLifeCycleStateTerminated:
		return task.State.ResultState == jobs.RunResultStateFailed
	default:
		return false
	}
}
// isSuccess reports whether a task run terminated with a successful result
// state.
func isSuccess(task jobs.RunTask) bool {
	if task.State.LifeCycleState != jobs.RunLifeCycleStateTerminated {
		return false
	}
	return task.State.ResultState == jobs.RunResultStateSuccess
}
// logFailedTasks fetches the run identified by runId and logs the outcome of
// each of its tasks. For failed tasks it also fetches the task output and
// reports the error message and trace, both through the progress logger (when
// one is attached to ctx) and through the default logger.
//
// This is best-effort diagnostics: failures to fetch the run or a task's
// output are logged and otherwise ignored. Nothing is logged when the run as
// a whole succeeded.
func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
	w := r.bundle.WorkspaceClient()
	red := color.New(color.FgRed).SprintFunc()
	green := color.New(color.FgGreen).SprintFunc()
	yellow := color.New(color.FgYellow).SprintFunc()

	run, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{
		RunId: runId,
	})
	if err != nil {
		log.Errorf(ctx, "failed to log job run. Error: %s", err)
		return
	}

	// The run state is optional in the API response (the progress callbacks
	// in this file guard against it being nil as well), so don't dereference
	// it blindly.
	if run.State != nil && run.State.ResultState == jobs.RunResultStateSuccess {
		return
	}

	for _, task := range run.Tasks {
		// A task without a reported state can be neither classified nor
		// described; skip it rather than panic inside a diagnostics helper.
		if task.State == nil {
			log.Infof(ctx, "task %s has no state reported", yellow(task.TaskKey))
			continue
		}

		switch {
		case isSuccess(task):
			log.Infof(ctx, "task %s completed successfully", green(task.TaskKey))

		case isFailed(task):
			taskInfo, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutputRequest{
				RunId: task.RunId,
			})
			if err != nil {
				log.Errorf(ctx, "task %s failed. Unable to fetch error trace: %s", red(task.TaskKey), err)
				continue
			}
			if progressLogger, ok := cmdio.FromContext(ctx); ok {
				progressLogger.Log(progress.NewTaskErrorEvent(task.TaskKey, taskInfo.Error, taskInfo.ErrorTrace))
			}
			log.Errorf(ctx, "Task %s failed!\nError:\n%s\nTrace:\n%s",
				red(task.TaskKey), taskInfo.Error, taskInfo.ErrorTrace)

		default:
			log.Infof(ctx, "task %s is in state %s",
				yellow(task.TaskKey), task.State.LifeCycleState)
		}
	}
}
func pullRunIdCallback(runId *int64) func(info *jobs.Run) {
return func(i *jobs.Run) {
2023-03-29 12:58:09 +00:00
if *runId == 0 {
*runId = i.RunId
}
2022-12-15 14:12:47 +00:00
}
2023-03-29 12:58:09 +00:00
}
2022-12-15 14:12:47 +00:00
func logDebugCallback(ctx context.Context, runId *int64) func(info *jobs.Run) {
2022-12-15 14:12:47 +00:00
var prevState *jobs.RunState
return func(i *jobs.Run) {
state := i.State
2022-12-15 14:12:47 +00:00
if state == nil {
return
}
2022-12-15 14:12:47 +00:00
// Log the job run URL as soon as it is available.
if prevState == nil {
log.Infof(ctx, "Run available at %s", i.RunPageUrl)
2022-12-15 14:12:47 +00:00
}
if prevState == nil || prevState.LifeCycleState != state.LifeCycleState {
log.Infof(ctx, "Run status: %s", i.State.LifeCycleState)
2022-12-15 14:12:47 +00:00
prevState = state
}
2023-03-29 12:58:09 +00:00
}
}
func logProgressCallback(ctx context.Context, progressLogger *cmdio.Logger) func(info *jobs.Run) {
2023-03-29 12:58:09 +00:00
var prevState *jobs.RunState
return func(i *jobs.Run) {
2023-03-29 12:58:09 +00:00
state := i.State
if state == nil {
return
}
if prevState == nil {
progressLogger.Log(progress.NewJobRunUrlEvent(i.RunPageUrl))
}
2023-03-29 12:58:09 +00:00
if prevState != nil && prevState.LifeCycleState == state.LifeCycleState &&
prevState.ResultState == state.ResultState {
return
} else {
prevState = state
}
event := &progress.JobProgressEvent{
Timestamp: time.Now(),
JobId: i.JobId,
RunId: i.RunId,
RunName: i.RunName,
State: *i.State,
}
2023-03-29 12:58:09 +00:00
// log progress events to stderr
progressLogger.Log(event)
// log progress events in using the default logger
log.Info(ctx, event.String())
2022-12-15 14:12:47 +00:00
}
2023-03-29 12:58:09 +00:00
}
2022-12-15 14:12:47 +00:00
func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) {
2023-03-29 12:58:09 +00:00
jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
if err != nil {
return nil, fmt.Errorf("job ID is not an integer: %s", r.job.ID)
}
runId := new(int64)
err = r.convertPythonParams(opts)
if err != nil {
return nil, err
}
2023-03-29 12:58:09 +00:00
// construct request payload from cmd line flags args
req, err := opts.Job.toPayload(r.job, jobID)
if err != nil {
return nil, err
}
// Include resource key in logger.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))
2023-03-29 12:58:09 +00:00
2022-12-15 14:12:47 +00:00
w := r.bundle.WorkspaceClient()
2023-03-29 12:58:09 +00:00
// gets the run id from inside Jobs.RunNowAndWait
pullRunId := pullRunIdCallback(runId)
// callback to log status updates to the universal log destination.
// Called on every poll request
logDebug := logDebugCallback(ctx, runId)
// callback to log progress events. Called on every poll request
progressLogger, ok := cmdio.FromContext(ctx)
2023-03-29 12:58:09 +00:00
if !ok {
return nil, fmt.Errorf("no progress logger found")
}
logProgress := logProgressCallback(ctx, progressLogger)
waiter, err := w.Jobs.RunNow(ctx, *req)
if err != nil {
return nil, fmt.Errorf("cannot start job")
}
Add development runs (#522) This implements the "development run" functionality that we desire for DABs in the workspace / IDE. ## bundle.yml changes In bundle.yml, there should be a "dev" environment that is marked as `mode: debug`: ``` environments: dev: default: true mode: development # future accepted values might include pull_request, production ``` Setting `mode` to `development` indicates that this environment is used just for running things for development. This results in several changes to deployed assets: * All assets will get '[dev]' in their name and will get a 'dev' tag * All assets will be hidden from the list of assets (future work; e.g. for jobs we would have a special job_type that hides it from the list) * All deployed assets will be ephemeral (future work, we need some form of garbage collection) * Pipelines will be marked as 'development: true' * Jobs can run on development compute through the `--compute` parameter in the CLI * Jobs get their schedule / triggers paused * Jobs get concurrent runs (it's really annoying if your runs get skipped because the last run was still in progress) Other accepted values for `mode` are `default` (which does nothing) and `pull-request` (which is reserved for future use). ## CLI changes To run a single job called "shark_sighting" on existing compute, use the following commands: ``` $ databricks bundle deploy --compute 0617-201942-9yd9g8ix $ databricks bundle run shark_sighting ``` which would deploy and run a job called "[dev] shark_sightings" on the compute provided. Note that `--compute` is not accepted in production environments, so we show an error if `mode: development` is not used. 
The `run --deploy` command offers a convenient shorthand for the common combination of deploying & running: ``` $ export DATABRICKS_COMPUTE=0617-201942-9yd9g8ix $ bundle run --deploy shark_sightings ``` The `--deploy` addition isn't really essential and I welcome feedback 🤔 I played with the idea of a "debug" or "dev" command but that seemed to only make the option space even broader for users. The above could work well with an IDE or workspace that automatically sets the target compute. One more thing I added is`run --no-wait` can now be used to run something without waiting for it to be completed (useful for IDE-like environments that can display progress themselves). ``` $ bundle run --deploy shark_sightings --no-wait ```
2023-07-12 06:51:54 +00:00
if opts.NoWait {
details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{
RunId: waiter.RunId,
})
progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl))
return nil, err
}
run, err := waiter.OnProgress(func(r *jobs.Run) {
pullRunId(r)
logDebug(r)
logProgress(r)
}).GetWithTimeout(jobRunTimeout)
if err != nil {
r.logFailedTasks(ctx, *runId)
}
2022-12-15 14:12:47 +00:00
if err != nil {
return nil, err
2022-12-15 14:12:47 +00:00
}
if run.State.LifeCycleState == jobs.RunLifeCycleStateSkipped {
log.Infof(ctx, "Run was skipped!")
return nil, fmt.Errorf("run skipped: %s", run.State.StateMessage)
}
2022-12-15 14:12:47 +00:00
switch run.State.ResultState {
// The run was canceled at user request.
case jobs.RunResultStateCanceled:
log.Infof(ctx, "Run was cancelled!")
return nil, fmt.Errorf("run canceled: %s", run.State.StateMessage)
2022-12-15 14:12:47 +00:00
// The task completed with an error.
case jobs.RunResultStateFailed:
log.Infof(ctx, "Run has failed!")
return nil, fmt.Errorf("run failed: %s", run.State.StateMessage)
2022-12-15 14:12:47 +00:00
// The task completed successfully.
case jobs.RunResultStateSuccess:
log.Infof(ctx, "Run has completed successfully!")
return output.GetJobOutput(ctx, r.bundle.WorkspaceClient(), *runId)
2022-12-15 14:12:47 +00:00
// The run was stopped after reaching the timeout.
case jobs.RunResultStateTimedout:
log.Infof(ctx, "Run has timed out!")
return nil, fmt.Errorf("run timed out: %s", run.State.StateMessage)
2022-12-15 14:12:47 +00:00
}
return nil, err
2022-12-15 14:12:47 +00:00
}
// convertPythonParams passes the Python parameters from opts to the job as a
// JSON-encoded notebook parameter named "__python_params".
//
// It is a no-op when the bundle's experimental settings are present with
// PythonWheelWrapper disabled, when the job has no Python wheel task, or when
// no Python parameters were given. Note that absent (nil) experimental
// settings do NOT short-circuit the conversion.
//
// It returns an error when the caller already supplied a notebook parameter
// with the reserved name, or when the parameters cannot be JSON-encoded.
func (r *jobRunner) convertPythonParams(opts *Options) error {
	experimental := r.bundle.Config.Experimental
	if experimental != nil && !experimental.PythonWheelWrapper {
		return nil
	}

	hasWheelTask := false
	for i := range r.job.Tasks {
		if r.job.Tasks[i].PythonWheelTask != nil {
			hasWheelTask = true
			break
		}
	}
	if !hasWheelTask || len(opts.Job.pythonParams) == 0 {
		return nil
	}

	if opts.Job.notebookParams == nil {
		opts.Job.notebookParams = make(map[string]string)
	}

	// Name of the notebook parameter that carries the encoded Python params.
	const reservedKey = "__python_params"

	if _, taken := opts.Job.notebookParams[reservedKey]; taken {
		return fmt.Errorf("can't use __python_params as notebook param, the name is reserved for internal use")
	}

	encoded, err := json.Marshal(opts.Job.pythonParams)
	if err != nil {
		return err
	}
	opts.Job.notebookParams[reservedKey] = string(encoded)
	return nil
}
// Cancel cancels every active run of the job and waits (for at most
// jobRunTimeout per run) until each cancellation has completed. The runs are
// cancelled concurrently; the first error encountered is returned. It returns
// nil when there are no active runs, and an error when the job ID is not an
// integer or the active runs cannot be listed.
func (r *jobRunner) Cancel(ctx context.Context) error {
	w := r.bundle.WorkspaceClient()

	jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
	if err != nil {
		return fmt.Errorf("job ID is not an integer: %s", r.job.ID)
	}

	activeRuns, err := w.Jobs.ListRunsAll(ctx, jobs.ListRunsRequest{
		ActiveOnly: true,
		JobId:      jobID,
	})
	if err != nil {
		return err
	}
	if len(activeRuns) == 0 {
		return nil
	}

	group, groupCtx := errgroup.WithContext(ctx)

	// cancelAndWait builds the unit of work for a single run.
	cancelAndWait := func(id int64) func() error {
		return func() error {
			waiter, err := w.Jobs.CancelRun(groupCtx, jobs.CancelRun{
				RunId: id,
			})
			if err != nil {
				return err
			}
			// Waits for the Terminated or Skipped state
			_, err = waiter.GetWithTimeout(jobRunTimeout)
			return err
		}
	}

	for i := range activeRuns {
		group.Go(cancelAndWait(activeRuns[i].RunId))
	}
	return group.Wait()
}
// Restart cancels the job's active runs (when necessary) and then starts a
// new run via Run, returning whatever Run returns. An error from cancelling
// the active runs is returned without starting a new run.
func (r *jobRunner) Restart(ctx context.Context, opts *Options) (output.RunOutput, error) {
	// We don't need to cancel existing runs if the job is continuous and unpaused.
	// the /jobs/run-now API will automatically cancel any existing runs before starting a new one.
	//
	// /jobs/run-now will not cancel existing runs if the job is continuous and paused.
	// New job runs will be queued instead and will wait for existing runs to finish.
	// In this case, we need to cancel the existing runs before starting a new one.
	//
	// The job settings may be unset (Name guards against that too), so check
	// them before reading the continuous configuration.
	if settings := r.job.JobSettings; settings != nil {
		continuous := settings.Continuous
		if continuous != nil && continuous.PauseStatus == jobs.PauseStatusUnpaused {
			return r.Run(ctx, opts)
		}
	}

	s := cmdio.Spinner(ctx)
	s <- "Cancelling all active job runs"
	err := r.Cancel(ctx)
	close(s)
	if err != nil {
		return nil, err
	}

	return r.Run(ctx, opts)
}
// ParseArgs parses the positional arguments into opts by delegating to the
// handler returned by r.posArgsHandler().
func (r *jobRunner) ParseArgs(args []string, opts *Options) error {
	handler := r.posArgsHandler()
	return handler.ParseArgs(args, opts)
}
// CompleteArgs returns shell completion candidates for the positional
// arguments by delegating to the handler returned by r.posArgsHandler().
func (r *jobRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
	handler := r.posArgsHandler()
	return handler.CompleteArgs(args, toComplete)
}