2024-02-09 14:33:14 +00:00
|
|
|
package run
|
|
|
|
|
|
|
|
import (
|
2024-10-22 14:59:17 +00:00
|
|
|
"bytes"
|
2024-02-09 14:33:14 +00:00
|
|
|
"context"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/databricks/cli/bundle"
|
|
|
|
"github.com/databricks/cli/bundle/config"
|
|
|
|
"github.com/databricks/cli/bundle/config/resources"
|
2024-10-22 14:59:17 +00:00
|
|
|
"github.com/databricks/cli/libs/cmdio"
|
|
|
|
"github.com/databricks/cli/libs/flags"
|
|
|
|
sdk_config "github.com/databricks/databricks-sdk-go/config"
|
2024-02-09 14:33:14 +00:00
|
|
|
"github.com/databricks/databricks-sdk-go/experimental/mocks"
|
|
|
|
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
2024-10-22 14:59:17 +00:00
|
|
|
"github.com/stretchr/testify/mock"
|
2024-02-09 14:33:14 +00:00
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestPipelineRunnerCancel(t *testing.T) {
|
|
|
|
pipeline := &resources.Pipeline{
|
|
|
|
ID: "123",
|
|
|
|
}
|
|
|
|
|
|
|
|
b := &bundle.Bundle{
|
|
|
|
Config: config.Root{
|
|
|
|
Resources: config.Resources{
|
|
|
|
Pipelines: map[string]*resources.Pipeline{
|
|
|
|
"test_pipeline": pipeline,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
runner := pipelineRunner{key: "test", bundle: b, pipeline: pipeline}
|
|
|
|
|
|
|
|
m := mocks.NewMockWorkspaceClient(t)
|
|
|
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
|
|
|
|
|
|
|
mockWait := &pipelines.WaitGetPipelineIdle[struct{}]{
|
|
|
|
Poll: func(time.Duration, func(*pipelines.GetPipelineResponse)) (*pipelines.GetPipelineResponse, error) {
|
|
|
|
return nil, nil
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
pipelineApi := m.GetMockPipelinesAPI()
|
|
|
|
pipelineApi.EXPECT().Stop(context.Background(), pipelines.StopRequest{
|
|
|
|
PipelineId: "123",
|
|
|
|
}).Return(mockWait, nil)
|
|
|
|
|
|
|
|
err := runner.Cancel(context.Background())
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|
2024-10-22 14:59:17 +00:00
|
|
|
|
|
|
|
func TestPipelineRunnerRestart(t *testing.T) {
|
|
|
|
pipeline := &resources.Pipeline{
|
|
|
|
ID: "123",
|
|
|
|
}
|
|
|
|
|
|
|
|
b := &bundle.Bundle{
|
|
|
|
Config: config.Root{
|
|
|
|
Resources: config.Resources{
|
|
|
|
Pipelines: map[string]*resources.Pipeline{
|
|
|
|
"test_pipeline": pipeline,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
runner := pipelineRunner{key: "test", bundle: b, pipeline: pipeline}
|
|
|
|
|
|
|
|
m := mocks.NewMockWorkspaceClient(t)
|
|
|
|
m.WorkspaceClient.Config = &sdk_config.Config{
|
|
|
|
Host: "https://test.com",
|
|
|
|
}
|
|
|
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
|
|
|
ctx := context.Background()
|
|
|
|
ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", "..."))
|
|
|
|
ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend))
|
|
|
|
|
|
|
|
mockWait := &pipelines.WaitGetPipelineIdle[struct{}]{
|
|
|
|
Poll: func(time.Duration, func(*pipelines.GetPipelineResponse)) (*pipelines.GetPipelineResponse, error) {
|
|
|
|
return nil, nil
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
pipelineApi := m.GetMockPipelinesAPI()
|
|
|
|
pipelineApi.EXPECT().Stop(mock.Anything, pipelines.StopRequest{
|
|
|
|
PipelineId: "123",
|
|
|
|
}).Return(mockWait, nil)
|
|
|
|
|
|
|
|
pipelineApi.EXPECT().GetByPipelineId(mock.Anything, "123").Return(&pipelines.GetPipelineResponse{}, nil)
|
|
|
|
|
|
|
|
// Mock runner starting a new update
|
|
|
|
pipelineApi.EXPECT().StartUpdate(mock.Anything, pipelines.StartUpdate{
|
|
|
|
PipelineId: "123",
|
|
|
|
}).Return(&pipelines.StartUpdateResponse{
|
|
|
|
UpdateId: "456",
|
|
|
|
}, nil)
|
|
|
|
|
|
|
|
// Mock runner polling for events
|
|
|
|
pipelineApi.EXPECT().ListPipelineEventsAll(mock.Anything, pipelines.ListPipelineEventsRequest{
|
|
|
|
Filter: `update_id = '456'`,
|
|
|
|
MaxResults: 100,
|
|
|
|
PipelineId: "123",
|
|
|
|
}).Return([]pipelines.PipelineEvent{}, nil)
|
|
|
|
|
|
|
|
// Mock runner polling for update status
|
|
|
|
pipelineApi.EXPECT().GetUpdateByPipelineIdAndUpdateId(mock.Anything, "123", "456").
|
|
|
|
Return(&pipelines.GetUpdateResponse{
|
|
|
|
Update: &pipelines.UpdateInfo{
|
|
|
|
State: pipelines.UpdateInfoStateCompleted,
|
|
|
|
},
|
|
|
|
}, nil)
|
|
|
|
|
|
|
|
_, err := runner.Restart(ctx, &Options{})
|
|
|
|
require.NoError(t, err)
|
|
|
|
}
|