Added end-to-end test for deploying and running Python wheel task (#741)

## Changes
Added an end-to-end test for deploying and running a Python wheel task.

## Tests
The test passed successfully on all environments; it takes about 9-10 minutes
to run.

```
Deleted snapshot file at /var/folders/nt/xjv68qzs45319w4k36dhpylc0000gp/T/TestAccPythonWheelTaskDeployAndRun1845899209/002/.databricks/bundle/default/sync-snapshots/1f7cc766ffe038d6.json
Successfully deleted files!
2023/09/06 17:50:50 INFO Releasing deployment lock mutator=destroy mutator=seq mutator=seq mutator=deferred mutator=lock:release
--- PASS: TestAccPythonWheelTaskDeployAndRun (508.16s)
PASS
coverage: 77.9% of statements in ./...
ok      github.com/databricks/cli/internal/bundle       508.810s        coverage: 77.9% of statements in ./...
```

---------

Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
This commit is contained in:
Andrew Nester 2023-09-07 16:08:16 +02:00 committed by GitHub
parent c0ebfb8101
commit 10e0836749
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 196 additions and 1 deletions

View File

@ -8,6 +8,7 @@ import (
"path/filepath" "path/filepath"
"runtime" "runtime"
"strings" "strings"
"time"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
@ -59,6 +60,7 @@ func (m *initialize) findExecPath(ctx context.Context, b *bundle.Bundle, tf *con
Product: product.Terraform, Product: product.Terraform,
Version: version.Must(version.NewVersion("1.5.5")), Version: version.Must(version.NewVersion("1.5.5")),
InstallDir: binDir, InstallDir: binDir,
Timeout: 1 * time.Minute,
} }
execPath, err = installer.Install(ctx) execPath, err = installer.Install(ctx)
if err != nil { if err != nil {

View File

@ -0,0 +1,17 @@
{
"properties": {
"project_name": {
"type": "string",
"default": "my_test_code",
"description": "Unique name for this project"
},
"spark_version": {
"type": "string",
"description": "Spark version used for job cluster"
},
"node_type_id": {
"type": "string",
"description": "Node type id for job cluster"
}
}
}

View File

@ -0,0 +1,21 @@
bundle:
name: wheel-task
resources:
jobs:
some_other_job:
name: "[${bundle.target}] Test Wheel Job"
tasks:
- task_key: TestTask
new_cluster:
num_workers: 1
spark_version: "{{.spark_version}}"
node_type_id: "{{.node_type_id}}"
python_wheel_task:
package_name: my_test_code
entry_point: run
parameters:
- "one"
- "two"
libraries:
- whl: ./dist/*.whl

View File

@ -0,0 +1,15 @@
# Packaging script for the templated wheel project.
# Every {{.project_name}} occurrence is template placeholder syntax, so this
# file is not valid Python as written.
# NOTE(review): presumably the placeholder is filled in from the
# "project_name" template property before this script is run - confirm.
from setuptools import setup, find_packages
import {{.project_name}}
setup(
name="{{.project_name}}",
# Version and author are read from attributes of the imported package.
version={{.project_name}}.__version__,
author={{.project_name}}.__author__,
url="https://databricks.com",
author_email="john.doe@databricks.com",
description="my example wheel",
# Only the project package itself is included in the distribution.
packages=find_packages(include=["{{.project_name}}"]),
# Declares an entry point named "run" in group "group1" that refers to
# main() in the package's __main__ module.
entry_points={"group1": "run={{.project_name}}.__main__:main"},
install_requires=["setuptools"],
)

View File

@ -0,0 +1,2 @@
# Package metadata constants.
# NOTE(review): the template's setup.py appears to read attributes with these
# names from the project package - confirm this is that package's __init__.py.
__version__ = "0.0.1"
__author__ = "Databricks"

View File

@ -0,0 +1,16 @@
"""
Entry point of the Python wheel.

Defines main() and calls it when this module is executed as a script.
"""
import sys
def main():
# Print a fixed greeting followed by the raw sys.argv list, so whoever
# reads the output can see exactly which arguments were received.
print("Hello from my func")
print("Got arguments:")
print(sys.argv)
# Only run main() when executed directly, not when imported.
if __name__ == "__main__":
main()

View File

@ -0,0 +1,70 @@
package bundle
import (
"context"
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/flags"
"github.com/databricks/cli/libs/template"
)
// initTestTemplate materializes the template found under
// bundles/<templateName> into a fresh per-test temporary directory and
// returns that directory's path.
//
// config is first written to a JSON file via writeConfigFile, and the path
// of that file is handed to template.Materialize together with the template
// and output directories. If writing the config file fails, an empty path
// and the error are returned; otherwise the output directory is returned
// along with whatever error Materialize reported.
func initTestTemplate(t *testing.T, templateName string, config map[string]any) (string, error) {
	srcDir := filepath.Join("bundles", templateName)
	dstDir := t.TempDir()

	inputPath, err := writeConfigFile(t, config)
	if err != nil {
		return "", err
	}

	// The context carries a nil workspace client and a JSON-output cmdio
	// instance that reads from an empty stdin.
	ctx := cmdio.InContext(
		root.SetWorkspaceClient(context.Background(), nil),
		cmdio.NewIO(flags.OutputJSON, strings.NewReader(""), os.Stdout, os.Stderr, "bundles"),
	)

	return dstDir, template.Materialize(ctx, inputPath, srcDir, dstDir)
}
// writeConfigFile serializes config as JSON into a file named config.json
// inside a new per-test temporary directory and returns the file's path.
// The serialized configuration is also written to the test log.
//
// If marshalling or writing fails, the returned path is empty and the error
// is non-nil.
func writeConfigFile(t *testing.T, config map[string]any) (string, error) {
	data, err := json.Marshal(config)
	if err != nil {
		return "", err
	}
	t.Log("Configuration for template: ", string(data))

	// Deliberately not named "filepath": that would shadow the
	// path/filepath package for the remainder of the function.
	configPath := filepath.Join(t.TempDir(), "config.json")
	if err := os.WriteFile(configPath, data, 0644); err != nil {
		return "", err
	}
	return configPath, nil
}
// deployBundle points BUNDLE_ROOT at path for the remainder of the test and
// runs the `bundle deploy --force-lock` command through the cobra test
// runner, returning the error from that run.
func deployBundle(t *testing.T, path string) error {
	t.Setenv("BUNDLE_ROOT", path)

	runner := internal.NewCobraTestRunner(t, "bundle", "deploy", "--force-lock")
	if _, _, err := runner.Run(); err != nil {
		return err
	}
	return nil
}
// runResource runs `bundle run <key>` for the bundle rooted at path and
// returns everything the command wrote to stdout, together with the error
// from the run.
//
// BUNDLE_ROOT is set to path here, as in deployBundle and destroyBundle, so
// the function does not depend on an earlier helper call having left the
// variable pointing at the right bundle.
func runResource(t *testing.T, path string, key string) (string, error) {
	t.Setenv("BUNDLE_ROOT", path)

	// The command is given a context that carries the default cmdio instance.
	ctx := cmdio.NewContext(context.Background(), cmdio.Default())

	c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "run", key)
	stdout, _, err := c.Run()
	return stdout.String(), err
}
// destroyBundle points BUNDLE_ROOT at path for the remainder of the test and
// runs the `bundle destroy --auto-approve` command through the cobra test
// runner, returning the error from that run.
func destroyBundle(t *testing.T, path string) error {
	t.Setenv("BUNDLE_ROOT", path)

	runner := internal.NewCobraTestRunner(t, "bundle", "destroy", "--auto-approve")
	if _, _, err := runner.Run(); err != nil {
		return err
	}
	return nil
}

View File

@ -0,0 +1,41 @@
package bundle
import (
"testing"
"github.com/databricks/cli/internal"
"github.com/stretchr/testify/require"
)
// TestAccPythonWheelTaskDeployAndRun materializes the python_wheel_task
// bundle template, deploys it, runs the job "some_other_job" and checks that
// the job output contains the greeting and the arguments printed by the
// wheel's entry point. The test is skipped when CLOUD_ENV is not set.
func TestAccPythonWheelTaskDeployAndRun(t *testing.T) {
	env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
	t.Log(env)

	// Pick a node type for the job cluster based on the cloud environment.
	// Any value other than "gcp" or "aws" falls back to Standard_DS4_v2
	// (presumably Azure - confirm against the CI environments).
	var nodeTypeID string
	switch env {
	case "gcp":
		nodeTypeID = "n1-standard-4"
	case "aws":
		nodeTypeID = "i3.xlarge"
	default:
		nodeTypeID = "Standard_DS4_v2"
	}

	bundleRoot, err := initTestTemplate(t, "python_wheel_task", map[string]any{
		"node_type_id":  nodeTypeID,
		"spark_version": "13.2.x-snapshot-scala2.12",
	})
	require.NoError(t, err)

	err = deployBundle(t, bundleRoot)
	require.NoError(t, err)

	// Tear the deployment down when the test finishes. A failed destroy is
	// reported instead of being dropped, since it can leave deployed
	// resources behind.
	t.Cleanup(func() {
		if err := destroyBundle(t, bundleRoot); err != nil {
			t.Errorf("destroying bundle at %s: %v", bundleRoot, err)
		}
	})

	out, err := runResource(t, bundleRoot, "some_other_job")
	require.NoError(t, err)
	require.Contains(t, out, "Hello from my func")
	require.Contains(t, out, "Got arguments:")
	require.Contains(t, out, "['python', 'one', 'two']")
}

View File

@ -58,6 +58,8 @@ type cobraTestRunner struct {
stdout bytes.Buffer stdout bytes.Buffer
stderr bytes.Buffer stderr bytes.Buffer
ctx context.Context
// Line-by-line output. // Line-by-line output.
// Background goroutines populate these channels by reading from stdout/stderr pipes. // Background goroutines populate these channels by reading from stdout/stderr pipes.
stdoutLines <-chan string stdoutLines <-chan string
@ -128,7 +130,7 @@ func (t *cobraTestRunner) RunBackground() {
t.registerFlagCleanup(root) t.registerFlagCleanup(root)
errch := make(chan error) errch := make(chan error)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(t.ctx)
// Tee stdout/stderr to buffers. // Tee stdout/stderr to buffers.
stdoutR = io.TeeReader(stdoutR, &t.stdout) stdoutR = io.TeeReader(stdoutR, &t.stdout)
@ -234,6 +236,15 @@ func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duratio
func NewCobraTestRunner(t *testing.T, args ...string) *cobraTestRunner { func NewCobraTestRunner(t *testing.T, args ...string) *cobraTestRunner {
return &cobraTestRunner{ return &cobraTestRunner{
T: t, T: t,
ctx: context.Background(),
args: args,
}
}
func NewCobraTestRunnerWithContext(t *testing.T, ctx context.Context, args ...string) *cobraTestRunner {
return &cobraTestRunner{
T: t,
ctx: ctx,
args: args, args: args,
} }
} }