package run

import (
	"context"
	"fmt"
	"time"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/cli/bundle/run/output"
	"github.com/databricks/cli/bundle/run/progress"
	"github.com/databricks/cli/libs/cmdio"
	"github.com/databricks/cli/libs/log"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

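// filterEventsByUpdateId returns the subset of events whose origin matches the
// given update ID.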
func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent {
	result := []pipelines.PipelineEvent{}
	for i := 0; i < len(events); i++ {
		if events[i].Origin.UpdateId == updateId {
			result = append(result, events[i])
		}
	}
	return result
}

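// logEvent logs a single pipeline event at error level, including its message
// and the messages of any exceptions attached to the event.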
func (r *pipelineRunner) logEvent(ctx context.Context, event pipelines.PipelineEvent) {
	logString := ""
	if event.Message != "" {
		logString += fmt.Sprintf(" %s\n", event.Message)
	}
	if event.Error != nil && len(event.Error.Exceptions) > 0 {
		logString += "trace for most recent exception: \n"
		for i := 0; i < len(event.Error.Exceptions); i++ {
			logString += fmt.Sprintf("%s\n", event.Error.Exceptions[i].Message)
		}
	}
	if logString != "" {
		log.Errorf(ctx, "[%s] %s", event.EventType, logString)
	}
}

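// logErrorEvent fetches the most recent error-level events for the pipeline,
// filters them down to the given update, and logs them in chronological order.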
func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error {
	w := r.bundle.WorkspaceClient()

	// Note: for a 100 percent correct and complete solution we should use the
	// w.Pipelines.ListPipelineEventsAll method to find all relevant events. However, the
	// probability of the relevant last error event not being present in the most
	// recent 100 error events is very close to 0, and the first 100 error events
	// should give us a good picture of the error.
	//
	// Otherwise, for long-lived pipelines, there can be a lot of unnecessary
	// latency due to the multiple paginated API calls made under the hood by
	// ListPipelineEventsAll.
	res, err := w.Pipelines.Impl().ListPipelineEvents(ctx, pipelines.ListPipelineEventsRequest{
		Filter:     `level='ERROR'`,
		MaxResults: 100,
		PipelineId: pipelineId,
	})
	if err != nil {
		return err
	}
	updateEvents := filterEventsByUpdateId(res.Events, updateId)

	// The events API returns the most recent events first. We iterate in reverse
	// order to print the events chronologically.
	for i := len(updateEvents) - 1; i >= 0; i-- {
		r.logEvent(ctx, updateEvents[i])
	}
	return nil
}

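// pipelineRunner runs the pipeline resource identified by key in the given bundle.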
type pipelineRunner struct {
	key

	bundle   *bundle.Bundle
	pipeline *resources.Pipeline
}

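// Name returns the pipeline name from its spec, or an empty string if the
// pipeline or its spec is not set.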
func (r *pipelineRunner) Name() string {
	if r.pipeline == nil || r.pipeline.PipelineSpec == nil {
		return ""
	}
	return r.pipeline.PipelineSpec.Name
}

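// Run starts a pipeline update and, unless opts.NoWait is set, polls it until
// it completes, fails, or is canceled, logging progress events along the way.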
func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, error) {
	var pipelineID = r.pipeline.ID

	// Include resource key in logger.
	ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))
	w := r.bundle.WorkspaceClient()
	_, err := w.Pipelines.GetByPipelineId(ctx, pipelineID)
	if err != nil {
		log.Warnf(ctx, "Cannot get pipeline: %s", err)
		return nil, err
	}

	req, err := opts.Pipeline.toPayload(r.pipeline, pipelineID)
	if err != nil {
		return nil, err
	}

	res, err := w.Pipelines.StartUpdate(ctx, *req)
	if err != nil {
		return nil, err
	}

	updateID := res.UpdateId

	// Set up the progress logger and tracker to query events.
	updateTracker := progress.NewUpdateTracker(pipelineID, updateID, w)
	progressLogger, ok := cmdio.FromContext(ctx)
	if !ok {
		return nil, fmt.Errorf("no progress logger found")
	}

	// Log the pipeline update URL as soon as it is available.
	progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))

	if opts.NoWait {
		return nil, nil
	}

	// Poll update for completion and post status.
	// Note: there is no "StartUpdateAndWait" wrapper for this API.
	var prevState *pipelines.UpdateInfoState
	for {
		events, err := updateTracker.Events(ctx)
		if err != nil {
			return nil, err
		}
		for _, event := range events {
			progressLogger.Log(&event)
			log.Infof(ctx, event.String())
		}

		update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID)
		if err != nil {
			return nil, err
		}

		// Log only if the current state is different from the previous state.
		state := update.Update.State
		if prevState == nil || *prevState != state {
			log.Infof(ctx, "Update status: %s", state)
			prevState = &state
		}

		if state == pipelines.UpdateInfoStateCanceled {
			log.Infof(ctx, "Update was cancelled!")
			return nil, fmt.Errorf("update cancelled")
		}
		if state == pipelines.UpdateInfoStateFailed {
			log.Infof(ctx, "Update has failed!")
			err := r.logErrorEvent(ctx, pipelineID, updateID)
			if err != nil {
				return nil, err
			}
			return nil, fmt.Errorf("update failed")
		}
		if state == pipelines.UpdateInfoStateCompleted {
			log.Infof(ctx, "Update has completed successfully!")
			return nil, nil
		}

		time.Sleep(time.Second)
	}
}

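// Cancel requests the pipeline to stop, which cancels any active update, and
// waits for the pipeline to become idle.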
func (r *pipelineRunner) Cancel(ctx context.Context) error {
	w := r.bundle.WorkspaceClient()
	wait, err := w.Pipelines.Stop(ctx, pipelines.StopRequest{
		PipelineId: r.pipeline.ID,
	})
	if err != nil {
		return err
	}

	// Wait for the pipeline to reach the idle state.
	_, err = wait.GetWithTimeout(jobRunTimeout)
	return err
}