diff --git a/bundle/config/resources/apps.go b/bundle/config/resources/apps.go
index aad93081..e6d3f00e 100644
--- a/bundle/config/resources/apps.go
+++ b/bundle/config/resources/apps.go
@@ -51,7 +51,7 @@ func (a *App) Exists(ctx context.Context, w *databricks.WorkspaceClient, name st
 }
 
 func (a *App) TerraformResourceName() string {
-	return "databricks_cluster"
+	return "databricks_app"
 }
 
 func (a *App) InitializeURL(baseURL url.URL) {
diff --git a/bundle/run/app.go b/bundle/run/app.go
index 84ed4308..ec46de9a 100644
--- a/bundle/run/app.go
+++ b/bundle/run/app.go
@@ -16,6 +16,7 @@ import (
 	"github.com/databricks/cli/libs/filer"
 	"github.com/databricks/databricks-sdk-go/service/apps"
 	"github.com/spf13/cobra"
+	"golang.org/x/sync/errgroup"
 	"gopkg.in/yaml.v3"
 )
 
@@ -90,8 +91,7 @@ func (a *appRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
 		return nil, err
 	}
 
-	// TODO: We should return the app URL here.
-	cmdio.LogString(ctx, "You can access the app at ")
+	cmdio.LogString(ctx, fmt.Sprintf("You can access the app at %s", createdApp.Url))
 	return nil, nil
 }
 
@@ -106,7 +106,7 @@ func (a *appRunner) start(ctx context.Context) error {
 		return err
 	}
 
-	startedApp, err := wait.OnProgress(func(p *apps.App) {
+	_, err = wait.OnProgress(func(p *apps.App) {
 		if p.AppStatus == nil {
 			return
 		}
@@ -117,28 +117,6 @@ func (a *appRunner) start(ctx context.Context) error {
 		return err
 	}
 
-	// If the app has a pending deployment, wait for it to complete.
-	if startedApp.PendingDeployment != nil {
-		_, err := w.Apps.WaitGetDeploymentAppSucceeded(ctx,
-			startedApp.Name, startedApp.PendingDeployment.DeploymentId,
-			20*time.Minute, nil)
-
-		if err != nil {
-			return err
-		}
-	}
-
-	// If the app has an active deployment, wait for it to complete as well
-	if startedApp.ActiveDeployment != nil {
-		_, err := w.Apps.WaitGetDeploymentAppSucceeded(ctx,
-			startedApp.Name, startedApp.ActiveDeployment.DeploymentId,
-			20*time.Minute, nil)
-
-		if err != nil {
-			return err
-		}
-	}
-
 	logProgress(ctx, "App is started!")
 	return nil
 }
@@ -182,8 +160,27 @@ func (a *appRunner) deploy(ctx context.Context) error {
 		},
 	})
 
+	// If deploy returns an error, then there's an active deployment in progress, wait for it to complete.
 	if err != nil {
-		return err
+		logProgress(ctx, err.Error())
+		err := a.waitForInProgressDeployments(ctx)
+		if err != nil {
+			return err
+		}
+
+		// Retry the deployment.
+		wait, err = w.Apps.Deploy(ctx, apps.CreateAppDeploymentRequest{
+			AppName: app.Name,
+			AppDeployment: &apps.AppDeployment{
+				Mode:           apps.AppDeploymentModeSnapshot,
+				SourceCodePath: app.SourceCodePath,
+			},
+		})
+
+		// If it still fails, return the error.
+		if err != nil {
+			return err
+		}
 	}
 
 	_, err = wait.OnProgress(func(ad *apps.AppDeployment) {
@@ -200,6 +197,43 @@ func (a *appRunner) deploy(ctx context.Context) error {
 	return nil
 }
 
+// waitForInProgressDeployments lists all deployments of the app and waits,
+// concurrently, for every deployment that is currently in progress. Each wait
+// is bounded by a 20 minute timeout; the error from errGrp.Wait is returned.
+func (a *appRunner) waitForInProgressDeployments(ctx context.Context) error {
+	app := a.app
+	b := a.bundle
+	w := b.WorkspaceClient()
+
+	allDeployments, err := w.Apps.ListDeploymentsAll(ctx, apps.ListAppDeploymentsRequest{AppName: app.Name})
+	if err != nil {
+		return err
+	}
+
+	errGrp, ctx := errgroup.WithContext(ctx)
+	for _, deployment := range allDeployments {
+		// NOTE(review): Status is assumed to be an optional pointer in the SDK;
+		// skip entries without one rather than dereferencing nil.
+		if deployment.Status == nil || deployment.Status.State != apps.AppDeploymentStateInProgress {
+			continue
+		}
+
+		id := deployment.DeploymentId
+		errGrp.Go(func() error {
+			logProgress(ctx, fmt.Sprintf("Waiting for deployment %s to complete", id))
+			d, err := w.Apps.WaitGetDeploymentAppSucceeded(ctx, app.Name, id, 20*time.Minute, nil)
+			if err != nil {
+				return err
+			}
+
+			logProgress(ctx, fmt.Sprintf("Deployment %s reached state %s", id, d.Status.State))
+			return nil
+		})
+	}
+
+	return errGrp.Wait()
+}
+
 func (a *appRunner) Cancel(ctx context.Context) error {
 	// We should cancel the app by stopping it.
app := a.app diff --git a/internal/bundle/apps_test.go b/internal/bundle/apps_test.go new file mode 100644 index 00000000..b25b3916 --- /dev/null +++ b/internal/bundle/apps_test.go @@ -0,0 +1,80 @@ +package bundle + +import ( + "fmt" + "testing" + + "github.com/databricks/cli/internal" + "github.com/databricks/cli/internal/acc" + "github.com/databricks/cli/libs/env" + "github.com/databricks/databricks-sdk-go/service/apps" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestAccDeployBundleWithApp(t *testing.T) { + ctx, wt := acc.WorkspaceTest(t) + uniqueId := uuid.New().String() + appId := fmt.Sprintf("app-%s", uuid.New().String()[0:8]) + nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV")) + instancePoolId := env.Get(ctx, "TEST_INSTANCE_POOL_ID") + + root, err := initTestTemplate(t, ctx, "apps", map[string]any{ + "unique_id": uniqueId, + "app_id": appId, + "node_type_id": nodeTypeId, + "spark_version": defaultSparkVersion, + "instance_pool_id": instancePoolId, + }) + require.NoError(t, err) + + t.Cleanup(func() { + err = destroyBundle(t, ctx, root) + require.NoError(t, err) + + app, err := wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: "test-app"}) + if err != nil { + require.ErrorContains(t, err, "does not exist") + } else { + require.Contains(t, []apps.ApplicationState{apps.ApplicationStateUnavailable}, app.AppStatus.State) + } + }) + + err = deployBundle(t, ctx, root) + require.NoError(t, err) + + // App should exists after bundle deployment + app, err := wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: appId}) + require.NoError(t, err) + require.NotNil(t, app) + + // Try to run the app + _, out, err := runResourceWithStderr(t, ctx, root, "test_app") + require.NoError(t, err) + require.Contains(t, out, app.Url) + + // App should be in the running state + app, err = wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: appId}) + require.NoError(t, err) + require.NotNil(t, app) + require.Equal(t, apps.ApplicationStateRunning, 
app.AppStatus.State) + + // Stop the app + wait, err := wt.W.Apps.Stop(ctx, apps.StopAppRequest{Name: appId}) + require.NoError(t, err) + app, err = wait.Get() + require.NoError(t, err) + require.NotNil(t, app) + require.Equal(t, apps.ApplicationStateUnavailable, app.AppStatus.State) + + // Try to run the app again + _, out, err = runResourceWithStderr(t, ctx, root, "test_app") + require.NoError(t, err) + require.Contains(t, out, app.Url) + + // App should be in the running state + app, err = wt.W.Apps.Get(ctx, apps.GetAppRequest{Name: appId}) + require.NoError(t, err) + require.NotNil(t, app) + require.Equal(t, apps.ApplicationStateRunning, app.AppStatus.State) +} diff --git a/internal/bundle/bundles/apps/databricks_template_schema.json b/internal/bundle/bundles/apps/databricks_template_schema.json new file mode 100644 index 00000000..c9faeabf --- /dev/null +++ b/internal/bundle/bundles/apps/databricks_template_schema.json @@ -0,0 +1,24 @@ +{ + "properties": { + "unique_id": { + "type": "string", + "description": "Unique ID for job name" + }, + "app_id": { + "type": "string", + "description": "Unique ID for app name" + }, + "spark_version": { + "type": "string", + "description": "Spark version used for job cluster" + }, + "node_type_id": { + "type": "string", + "description": "Node type id for job cluster" + }, + "instance_pool_id": { + "type": "string", + "description": "Instance pool id for job cluster" + } + } +} diff --git a/internal/bundle/bundles/apps/template/app/app.py b/internal/bundle/bundles/apps/template/app/app.py new file mode 100644 index 00000000..a60c786f --- /dev/null +++ b/internal/bundle/bundles/apps/template/app/app.py @@ -0,0 +1,15 @@ +import os + +from databricks.sdk import WorkspaceClient +from flask import Flask + +app = Flask(__name__) + + +@app.route("/") +def home(): + job_id = os.getenv("JOB_ID") + + w = WorkspaceClient() + job = w.jobs.get(job_id) + return job.settings.name diff --git 
a/internal/bundle/bundles/apps/template/databricks.yml.tmpl b/internal/bundle/bundles/apps/template/databricks.yml.tmpl new file mode 100644 index 00000000..9ab21bf6 --- /dev/null +++ b/internal/bundle/bundles/apps/template/databricks.yml.tmpl @@ -0,0 +1,42 @@ +bundle: + name: basic + +workspace: + root_path: "~/.bundle/{{.unique_id}}" + +resources: + apps: + test_app: + name: "{{.app_id}}" + description: "App which manages job created by this bundle" + source_code_path: ./app + config: + command: + - flask + - --app + - app + - run + env: + - name: JOB_ID + valueFrom: "app-job" + + resources: + - name: "app-job" + description: "A job for app to be able to work with" + job: + id: ${resources.jobs.foo.id} + permission: "CAN_MANAGE_RUN" + + jobs: + foo: + name: test-job-with-cluster-{{.unique_id}} + tasks: + - task_key: my_notebook_task + new_cluster: + num_workers: 1 + spark_version: "{{.spark_version}}" + node_type_id: "{{.node_type_id}}" + data_security_mode: USER_ISOLATION + instance_pool_id: "{{.instance_pool_id}}" + spark_python_task: + python_file: ./hello_world.py diff --git a/internal/bundle/bundles/apps/template/hello_world.py b/internal/bundle/bundles/apps/template/hello_world.py new file mode 100644 index 00000000..f301245e --- /dev/null +++ b/internal/bundle/bundles/apps/template/hello_world.py @@ -0,0 +1 @@ +print("Hello World!") diff --git a/internal/bundle/helpers.go b/internal/bundle/helpers.go index dd9c841c..f187bf9f 100644 --- a/internal/bundle/helpers.go +++ b/internal/bundle/helpers.go @@ -116,6 +116,15 @@ func runResource(t *testing.T, ctx context.Context, path string, key string) (st return stdout.String(), err } +func runResourceWithStderr(t *testing.T, ctx context.Context, path string, key string) (string, string, error) { + ctx = env.Set(ctx, "BUNDLE_ROOT", path) + ctx = cmdio.NewContext(ctx, cmdio.Default()) + + c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "run", key) + stdout, stderr, err := c.Run() + return 
stdout.String(), stderr.String(), err +} + func runResourceWithParams(t *testing.T, ctx context.Context, path string, key string, params ...string) (string, error) { ctx = env.Set(ctx, "BUNDLE_ROOT", path) ctx = cmdio.NewContext(ctx, cmdio.Default())