mirror of https://github.com/databricks/cli.git
Added integration test
This commit is contained in:
parent
d72b03eea6
commit
2ac239ab01
|
@ -51,7 +51,7 @@ func (a *App) Exists(ctx context.Context, w *databricks.WorkspaceClient, name st
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) TerraformResourceName() string {
|
func (a *App) TerraformResourceName() string {
|
||||||
return "databricks_cluster"
|
return "databricks_app"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *App) InitializeURL(baseURL url.URL) {
|
func (a *App) InitializeURL(baseURL url.URL) {
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
"github.com/databricks/cli/libs/filer"
|
"github.com/databricks/cli/libs/filer"
|
||||||
"github.com/databricks/databricks-sdk-go/service/apps"
|
"github.com/databricks/databricks-sdk-go/service/apps"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
|
@ -90,8 +91,7 @@ func (a *appRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: We should return the app URL here.
|
cmdio.LogString(ctx, fmt.Sprintf("You can access the app at %s", createdApp.Url))
|
||||||
cmdio.LogString(ctx, "You can access the app at <app-url>")
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +106,7 @@ func (a *appRunner) start(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
startedApp, err := wait.OnProgress(func(p *apps.App) {
|
_, err = wait.OnProgress(func(p *apps.App) {
|
||||||
if p.AppStatus == nil {
|
if p.AppStatus == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -117,28 +117,6 @@ func (a *appRunner) start(ctx context.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the app has a pending deployment, wait for it to complete.
|
|
||||||
if startedApp.PendingDeployment != nil {
|
|
||||||
_, err := w.Apps.WaitGetDeploymentAppSucceeded(ctx,
|
|
||||||
startedApp.Name, startedApp.PendingDeployment.DeploymentId,
|
|
||||||
20*time.Minute, nil)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the app has an active deployment, wait for it to complete as well
|
|
||||||
if startedApp.ActiveDeployment != nil {
|
|
||||||
_, err := w.Apps.WaitGetDeploymentAppSucceeded(ctx,
|
|
||||||
startedApp.Name, startedApp.ActiveDeployment.DeploymentId,
|
|
||||||
20*time.Minute, nil)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logProgress(ctx, "App is started!")
|
logProgress(ctx, "App is started!")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -182,8 +160,27 @@ func (a *appRunner) deploy(ctx context.Context) error {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// If deploy returns an error, then there's an active deployment in progress, wait for it to complete.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
logProgress(ctx, err.Error())
|
||||||
|
err := a.waitForInProgressDeployments(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retry the deployment.
|
||||||
|
wait, err = w.Apps.Deploy(ctx, apps.CreateAppDeploymentRequest{
|
||||||
|
AppName: app.Name,
|
||||||
|
AppDeployment: &apps.AppDeployment{
|
||||||
|
Mode: apps.AppDeploymentModeSnapshot,
|
||||||
|
SourceCodePath: app.SourceCodePath,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// If it still fails, return the error.
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = wait.OnProgress(func(ad *apps.AppDeployment) {
|
_, err = wait.OnProgress(func(ad *apps.AppDeployment) {
|
||||||
|
@ -200,6 +197,37 @@ func (a *appRunner) deploy(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *appRunner) waitForInProgressDeployments(ctx context.Context) error {
|
||||||
|
app := a.app
|
||||||
|
b := a.bundle
|
||||||
|
w := b.WorkspaceClient()
|
||||||
|
|
||||||
|
allDeployments, err := w.Apps.ListDeploymentsAll(ctx, apps.ListAppDeploymentsRequest{AppName: app.Name})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
errGrp, ctx := errgroup.WithContext(ctx)
|
||||||
|
for _, deployment := range allDeployments {
|
||||||
|
if deployment.Status.State == apps.AppDeploymentStateInProgress {
|
||||||
|
id := deployment.DeploymentId
|
||||||
|
|
||||||
|
errGrp.Go(func() error {
|
||||||
|
logProgress(ctx, fmt.Sprintf("Waiting for deployment %s to complete", id))
|
||||||
|
d, err := w.Apps.WaitGetDeploymentAppSucceeded(ctx, app.Name, id, 20*time.Minute, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logProgress(ctx, fmt.Sprintf("Deployment %s reached state %s", id, d.Status.State))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return errGrp.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
func (a *appRunner) Cancel(ctx context.Context) error {
|
func (a *appRunner) Cancel(ctx context.Context) error {
|
||||||
// We should cancel the app by stopping it.
|
// We should cancel the app by stopping it.
|
||||||
app := a.app
|
app := a.app
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
package bundle
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/internal"
|
||||||
|
"github.com/databricks/cli/internal/acc"
|
||||||
|
"github.com/databricks/cli/libs/env"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/apps"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAccDeployBundleWithApp(t *testing.T) {
|
||||||
|
ctx, wt := acc.WorkspaceTest(t)
|
||||||
|
uniqueId := uuid.New().String()
|
||||||
|
appId := fmt.Sprintf("app-%s", uuid.New().String()[0:8])
|
||||||
|
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
|
||||||
|
instancePoolId := env.Get(ctx, "TEST_INSTANCE_POOL_ID")
|
||||||
|
|
||||||
|
root, err := initTestTemplate(t, ctx, "apps", map[string]any{
|
||||||
|
"unique_id": uniqueId,
|
||||||
|
"app_id": appId,
|
||||||
|
"node_type_id": nodeTypeId,
|
||||||
|
"spark_version": defaultSparkVersion,
|
||||||
|
"instance_pool_id": instancePoolId,
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
err = destroyBundle(t, ctx, root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
app, err := wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: "test-app"})
|
||||||
|
if err != nil {
|
||||||
|
require.ErrorContains(t, err, "does not exist")
|
||||||
|
} else {
|
||||||
|
require.Contains(t, []apps.ApplicationState{apps.ApplicationStateUnavailable}, app.AppStatus.State)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
err = deployBundle(t, ctx, root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// App should exists after bundle deployment
|
||||||
|
app, err := wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: appId})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, app)
|
||||||
|
|
||||||
|
// Try to run the app
|
||||||
|
_, out, err := runResourceWithStderr(t, ctx, root, "test_app")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, out, app.Url)
|
||||||
|
|
||||||
|
// App should be in the running state
|
||||||
|
app, err = wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: appId})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, app)
|
||||||
|
require.Equal(t, apps.ApplicationStateRunning, app.AppStatus.State)
|
||||||
|
|
||||||
|
// Stop the app
|
||||||
|
wait, err := wt.W.Apps.Stop(ctx, apps.StopAppRequest{Name: appId})
|
||||||
|
require.NoError(t, err)
|
||||||
|
app, err = wait.Get()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, app)
|
||||||
|
require.Equal(t, apps.ApplicationStateUnavailable, app.AppStatus.State)
|
||||||
|
|
||||||
|
// Try to run the app again
|
||||||
|
_, out, err = runResourceWithStderr(t, ctx, root, "test_app")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, out, app.Url)
|
||||||
|
|
||||||
|
// App should be in the running state
|
||||||
|
app, err = wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: appId})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, app)
|
||||||
|
require.Equal(t, apps.ApplicationStateRunning, app.AppStatus.State)
|
||||||
|
}
|
|
@ -0,0 +1,24 @@
|
||||||
|
{
|
||||||
|
"properties": {
|
||||||
|
"unique_id": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Unique ID for job name"
|
||||||
|
},
|
||||||
|
"app_id": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Unique ID for app name"
|
||||||
|
},
|
||||||
|
"spark_version": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Spark version used for job cluster"
|
||||||
|
},
|
||||||
|
"node_type_id": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Node type id for job cluster"
|
||||||
|
},
|
||||||
|
"instance_pool_id": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Instance pool id for job cluster"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,15 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
from databricks.sdk import WorkspaceClient
|
||||||
|
from flask import Flask
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/")
|
||||||
|
def home():
|
||||||
|
job_id = os.getenv("JOB_ID")
|
||||||
|
|
||||||
|
w = WorkspaceClient()
|
||||||
|
job = w.jobs.get(job_id)
|
||||||
|
return job.settings.name
|
|
@ -0,0 +1,42 @@
|
||||||
|
bundle:
|
||||||
|
name: basic
|
||||||
|
|
||||||
|
workspace:
|
||||||
|
root_path: "~/.bundle/{{.unique_id}}"
|
||||||
|
|
||||||
|
resources:
|
||||||
|
apps:
|
||||||
|
test_app:
|
||||||
|
name: "{{.app_id}}"
|
||||||
|
description: "App which manages job created by this bundle"
|
||||||
|
source_code_path: ./app
|
||||||
|
config:
|
||||||
|
command:
|
||||||
|
- flask
|
||||||
|
- --app
|
||||||
|
- app
|
||||||
|
- run
|
||||||
|
env:
|
||||||
|
- name: JOB_ID
|
||||||
|
valueFrom: "app-job"
|
||||||
|
|
||||||
|
resources:
|
||||||
|
- name: "app-job"
|
||||||
|
description: "A job for app to be able to work with"
|
||||||
|
job:
|
||||||
|
id: ${resources.jobs.foo.id}
|
||||||
|
permission: "CAN_MANAGE_RUN"
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
foo:
|
||||||
|
name: test-job-with-cluster-{{.unique_id}}
|
||||||
|
tasks:
|
||||||
|
- task_key: my_notebook_task
|
||||||
|
new_cluster:
|
||||||
|
num_workers: 1
|
||||||
|
spark_version: "{{.spark_version}}"
|
||||||
|
node_type_id: "{{.node_type_id}}"
|
||||||
|
data_security_mode: USER_ISOLATION
|
||||||
|
instance_pool_id: "{{.instance_pool_id}}"
|
||||||
|
spark_python_task:
|
||||||
|
python_file: ./hello_world.py
|
|
@ -0,0 +1 @@
|
||||||
|
print("Hello World!")
|
|
@ -116,6 +116,15 @@ func runResource(t *testing.T, ctx context.Context, path string, key string) (st
|
||||||
return stdout.String(), err
|
return stdout.String(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runResourceWithStderr(t *testing.T, ctx context.Context, path string, key string) (string, string, error) {
|
||||||
|
ctx = env.Set(ctx, "BUNDLE_ROOT", path)
|
||||||
|
ctx = cmdio.NewContext(ctx, cmdio.Default())
|
||||||
|
|
||||||
|
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "run", key)
|
||||||
|
stdout, stderr, err := c.Run()
|
||||||
|
return stdout.String(), stderr.String(), err
|
||||||
|
}
|
||||||
|
|
||||||
func runResourceWithParams(t *testing.T, ctx context.Context, path string, key string, params ...string) (string, error) {
|
func runResourceWithParams(t *testing.T, ctx context.Context, path string, key string, params ...string) (string, error) {
|
||||||
ctx = env.Set(ctx, "BUNDLE_ROOT", path)
|
ctx = env.Set(ctx, "BUNDLE_ROOT", path)
|
||||||
ctx = cmdio.NewContext(ctx, cmdio.Default())
|
ctx = cmdio.NewContext(ctx, cmdio.Default())
|
||||||
|
|
Loading…
Reference in New Issue