mirror of https://github.com/databricks/cli.git
wip
This commit is contained in:
parent
571076d5e1
commit
9ccea749e8
|
@ -317,6 +317,25 @@ func (r *jobRunner) Cancel(ctx context.Context) error {
|
|||
return errGroup.Wait()
|
||||
}
|
||||
|
||||
func (r *jobRunner) Restart(ctx context.Context, opts *Options) (output.RunOutput, error) {
|
||||
// We don't need to cancel existing runs if the job is continuous and unpaused.
|
||||
// the /jobs/run-now API will automatically cancel any existing runs before starting a new one.
|
||||
continuous := r.job.JobSettings.Continuous
|
||||
if continuous != nil && continuous.PauseStatus == jobs.PauseStatusUnpaused {
|
||||
r.Run(ctx, opts)
|
||||
}
|
||||
|
||||
s := cmdio.Spinner(ctx)
|
||||
s <- "Cancelling all active job runs"
|
||||
err := r.Cancel(ctx)
|
||||
close(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Run(ctx, opts)
|
||||
}
|
||||
|
||||
func (r *jobRunner) ParseArgs(args []string, opts *Options) error {
|
||||
return r.posArgsHandler().ParseArgs(args, opts)
|
||||
}
|
||||
|
|
|
@ -183,6 +183,18 @@ func (r *pipelineRunner) Cancel(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *pipelineRunner) Restart(ctx context.Context, opts *Options) (output.RunOutput, error) {
|
||||
s := cmdio.Spinner(ctx)
|
||||
s <- "Cancelling the active pipeline update"
|
||||
err := r.Cancel(ctx)
|
||||
close(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Run(ctx, opts)
|
||||
}
|
||||
|
||||
func (r *pipelineRunner) ParseArgs(args []string, opts *Options) error {
|
||||
if len(args) == 0 {
|
||||
return nil
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package run
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -8,8 +9,12 @@ import (
|
|||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/libs/cmdio"
|
||||
"github.com/databricks/cli/libs/flags"
|
||||
sdk_config "github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -47,3 +52,66 @@ func TestPipelineRunnerCancel(t *testing.T) {
|
|||
err := runner.Cancel(context.Background())
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestPipelineRunnerRestart(t *testing.T) {
|
||||
pipeline := &resources.Pipeline{
|
||||
ID: "123",
|
||||
}
|
||||
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"test_pipeline": pipeline,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
runner := pipelineRunner{key: "test", bundle: b, pipeline: pipeline}
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
m.WorkspaceClient.Config = &sdk_config.Config{
|
||||
Host: "https://test.com",
|
||||
}
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
ctx := context.Background()
|
||||
ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", "..."))
|
||||
ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend))
|
||||
|
||||
mockWait := &pipelines.WaitGetPipelineIdle[struct{}]{
|
||||
Poll: func(time.Duration, func(*pipelines.GetPipelineResponse)) (*pipelines.GetPipelineResponse, error) {
|
||||
return nil, nil
|
||||
},
|
||||
}
|
||||
|
||||
pipelineApi := m.GetMockPipelinesAPI()
|
||||
pipelineApi.EXPECT().Stop(mock.Anything, pipelines.StopRequest{
|
||||
PipelineId: "123",
|
||||
}).Return(mockWait, nil)
|
||||
|
||||
// Runner does a
|
||||
pipelineApi.EXPECT().GetByPipelineId(mock.Anything, "123").Return(&pipelines.GetPipelineResponse{}, nil)
|
||||
|
||||
pipelineApi.EXPECT().StartUpdate(mock.Anything, pipelines.StartUpdate{
|
||||
PipelineId: "123",
|
||||
}).Return(&pipelines.StartUpdateResponse{
|
||||
UpdateId: "456",
|
||||
}, nil)
|
||||
|
||||
pipelineApi.EXPECT().ListPipelineEventsAll(mock.Anything, pipelines.ListPipelineEventsRequest{
|
||||
Filter: `update_id = '456'`,
|
||||
MaxResults: 100,
|
||||
PipelineId: "123",
|
||||
}).Return([]pipelines.PipelineEvent{}, nil)
|
||||
|
||||
pipelineApi.EXPECT().GetUpdateByPipelineIdAndUpdateId(mock.Anything, "123", "456").
|
||||
Return(&pipelines.GetUpdateResponse{
|
||||
Update: &pipelines.UpdateInfo{
|
||||
State: pipelines.UpdateInfoStateCompleted,
|
||||
},
|
||||
}, nil)
|
||||
|
||||
_, err := runner.Restart(ctx, &Options{})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -27,6 +27,10 @@ type Runner interface {
|
|||
// Run the underlying worklow.
|
||||
Run(ctx context.Context, opts *Options) (output.RunOutput, error)
|
||||
|
||||
// Restart the underlying workflow by cancelling any existing runs before
|
||||
// starting a new one.
|
||||
Restart(ctx context.Context, opts *Options) (output.RunOutput, error)
|
||||
|
||||
// Cancel the underlying workflow.
|
||||
Cancel(ctx context.Context) error
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/databricks/cli/bundle/deploy/terraform"
|
||||
"github.com/databricks/cli/bundle/phases"
|
||||
"github.com/databricks/cli/bundle/run"
|
||||
"github.com/databricks/cli/bundle/run/output"
|
||||
"github.com/databricks/cli/cmd/bundle/utils"
|
||||
"github.com/databricks/cli/cmd/root"
|
||||
"github.com/databricks/cli/libs/cmdio"
|
||||
|
@ -100,19 +101,17 @@ task or a Python wheel task, the second example applies.
|
|||
}
|
||||
|
||||
runOptions.NoWait = noWait
|
||||
|
||||
var output output.RunOutput
|
||||
if restart {
|
||||
s := cmdio.Spinner(ctx)
|
||||
s <- "Cancelling all runs"
|
||||
err := runner.Cancel(ctx)
|
||||
close(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
output, err := runner.Run(ctx, &runOptions)
|
||||
output, err = runner.Restart(ctx, &runOptions)
|
||||
} else {
|
||||
output, err = runner.Run(ctx, &runOptions)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if output != nil {
|
||||
switch root.OutputType(cmd) {
|
||||
case flags.OutputText:
|
||||
|
|
Loading…
Reference in New Issue