Make a notebook wrapper for Python wheel tasks optional (#797)

## Changes
Instead of always using the notebook wrapper for Python wheel tasks, let's
make it opt-in.

Now, by default, Python wheel tasks will be deployed as is to the Databricks
platform.
If a notebook wrapper is required (DBR < 13.1 or other configuration
differences), users can provide the following experimental setting:

```
experimental:
  python_wheel_wrapper: true
```

Fixes #783,
https://github.com/databricks/databricks-asset-bundles-dais2023/issues/8

## Tests
Added unit tests.

Integration tests passed for both cases

```
    helpers.go:163: [databricks stdout]: Hello from my func
    helpers.go:163: [databricks stdout]: Got arguments:
    helpers.go:163: [databricks stdout]: ['my_test_code', 'one', 'two']
    ...
Bundle remote directory is ***/.bundle/ac05d5e8-ed4b-4e34-b3f2-afa73f62b021
Deleted snapshot file at /var/folders/nt/xjv68qzs45319w4k36dhpylc0000gp/T/TestAccPythonWheelTaskDeployAndRunWithWrapper3733431114/001/.databricks/bundle/default/sync-snapshots/cac1e02f3941a97b.json
Successfully deleted files!
--- PASS: TestAccPythonWheelTaskDeployAndRunWithWrapper (214.18s)
PASS
coverage: 93.5% of statements in ./...
ok      github.com/databricks/cli/internal/bundle       214.495s        coverage: 93.5% of statements in ./...

```

```
    helpers.go:163: [databricks stdout]: Hello from my func
    helpers.go:163: [databricks stdout]: Got arguments:
    helpers.go:163: [databricks stdout]: ['my_test_code', 'one', 'two']
    ...
Bundle remote directory is ***/.bundle/0ef67aaf-5960-4049-bf1d-dc9e29157421
Deleted snapshot file at /var/folders/nt/xjv68qzs45319w4k36dhpylc0000gp/T/TestAccPythonWheelTaskDeployAndRunWithoutWrapper2340216760/001/.databricks/bundle/default/sync-snapshots/edf0b322cee93b13.json
Successfully deleted files!
--- PASS: TestAccPythonWheelTaskDeployAndRunWithoutWrapper (192.36s)
PASS
coverage: 93.5% of statements in ./...
ok      github.com/databricks/cli/internal/bundle       195.130s        coverage: 93.5% of statements in ./...

```
This commit is contained in:
Andrew Nester 2023-09-26 16:32:20 +02:00 committed by GitHub
parent e1b5912f59
commit 0daa0022af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 226 additions and 23 deletions

View File

@ -2,6 +2,14 @@ package config
type Experimental struct { type Experimental struct {
Scripts map[ScriptHook]Command `json:"scripts,omitempty"` Scripts map[ScriptHook]Command `json:"scripts,omitempty"`
// By default, Python wheel tasks are deployed as is to the Databricks platform.
// If a notebook wrapper is required (for example, on DBR < 13.1 or because of other configuration differences), users can provide the following experimental setting:
// experimental:
// python_wheel_wrapper: true
// In this case the configured wheel task will be deployed as a notebook task which installs the defined wheel at runtime and executes it.
// For more details see https://github.com/databricks/cli/pull/797 and https://github.com/databricks/cli/pull/635
PythonWheelWrapper bool `json:"python_wheel_wrapper,omitempty"`
} }
type Command string type Command string

View File

@ -0,0 +1,35 @@
package mutator
import (
"context"
"github.com/databricks/cli/bundle"
)
// ifMutator is a mutator that delegates to one of two other mutators,
// chosen by evaluating a condition against the bundle.
type ifMutator struct {
// condition is called with the bundle in Apply to pick the branch.
condition func(*bundle.Bundle) bool
// onTrueMutator is applied when condition returns true.
onTrueMutator bundle.Mutator
// onFalseMutator is applied when condition returns false.
onFalseMutator bundle.Mutator
}
// If builds a conditional mutator. When the returned mutator is applied,
// condition is evaluated against the bundle: onTrueMutator is applied if it
// returns true, onFalseMutator otherwise.
func If(
	condition func(*bundle.Bundle) bool,
	onTrueMutator bundle.Mutator,
	onFalseMutator bundle.Mutator,
) bundle.Mutator {
	return &ifMutator{
		condition:      condition,
		onTrueMutator:  onTrueMutator,
		onFalseMutator: onFalseMutator,
	}
}
// Apply evaluates the condition against b and applies the matching branch
// mutator through bundle.Apply, returning whatever error that call yields.
func (m *ifMutator) Apply(ctx context.Context, b *bundle.Bundle) error {
	if m.condition(b) {
		return bundle.Apply(ctx, b, m.onTrueMutator)
	}
	// Condition did not hold: fall through to the alternative mutator.
	return bundle.Apply(ctx, b, m.onFalseMutator)
}
// Name returns the fixed identifier of the conditional mutator.
func (m *ifMutator) Name() string {
	const name = "If"
	return name
}

View File

@ -0,0 +1,21 @@
package mutator
import (
"context"
"github.com/databricks/cli/bundle"
)
// noop is a mutator with no fields; its Apply method returns nil without touching the bundle.
type noop struct{}
// Apply does nothing: both arguments are ignored and nil is always returned.
func (*noop) Apply(_ context.Context, _ *bundle.Bundle) error {
	return nil
}
// Name returns the fixed identifier of the no-op mutator.
func (*noop) Name() string {
	const name = "NoOp"
	return name
}
// NoOp returns a mutator whose Apply leaves the bundle untouched and always
// succeeds.
func NoOp() bundle.Mutator {
	return new(noop)
}

View File

@ -0,0 +1,114 @@
package python
import (
"context"
"path"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
func TestNoTransformByDefault(t *testing.T) {
tmpDir := t.TempDir()
b := &bundle.Bundle{
Config: config.Root{
Path: tmpDir,
Bundle: config.Bundle{
Target: "development",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
TaskKey: "key1",
PythonWheelTask: &jobs.PythonWheelTask{
PackageName: "test_package",
EntryPoint: "main",
},
Libraries: []compute.Library{
{Whl: "/Workspace/Users/test@test.com/bundle/dist/test.whl"},
},
},
},
},
},
},
},
},
}
trampoline := TransformWheelTask()
err := bundle.Apply(context.Background(), b, trampoline)
require.NoError(t, err)
task := b.Config.Resources.Jobs["job1"].Tasks[0]
require.NotNil(t, task.PythonWheelTask)
require.Equal(t, "test_package", task.PythonWheelTask.PackageName)
require.Equal(t, "main", task.PythonWheelTask.EntryPoint)
require.Equal(t, "/Workspace/Users/test@test.com/bundle/dist/test.whl", task.Libraries[0].Whl)
require.Nil(t, task.NotebookTask)
}
func TestTransformWithExperimentalSettingSetToTrue(t *testing.T) {
tmpDir := t.TempDir()
b := &bundle.Bundle{
Config: config.Root{
Path: tmpDir,
Bundle: config.Bundle{
Target: "development",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
TaskKey: "key1",
PythonWheelTask: &jobs.PythonWheelTask{
PackageName: "test_package",
EntryPoint: "main",
},
Libraries: []compute.Library{
{Whl: "/Workspace/Users/test@test.com/bundle/dist/test.whl"},
},
},
},
},
},
},
},
Experimental: &config.Experimental{
PythonWheelWrapper: true,
},
},
}
trampoline := TransformWheelTask()
err := bundle.Apply(context.Background(), b, trampoline)
require.NoError(t, err)
task := b.Config.Resources.Jobs["job1"].Tasks[0]
require.Nil(t, task.PythonWheelTask)
require.NotNil(t, task.NotebookTask)
dir, err := b.InternalDir(context.Background())
require.NoError(t, err)
internalDirRel, err := filepath.Rel(b.Config.Path, dir)
require.NoError(t, err)
require.Equal(t, path.Join(filepath.ToSlash(internalDirRel), "notebook_job1_key1"), task.NotebookTask.NotebookPath)
require.Empty(t, task.Libraries)
}

View File

@ -49,10 +49,16 @@ dbutils.notebook.exit(s)
// which installs uploaded wheels using %pip and then calling corresponding // which installs uploaded wheels using %pip and then calling corresponding
// entry point. // entry point.
func TransformWheelTask() bundle.Mutator { func TransformWheelTask() bundle.Mutator {
return mutator.NewTrampoline( return mutator.If(
"python_wheel", func(b *bundle.Bundle) bool {
&pythonTrampoline{}, return b.Config.Experimental != nil && b.Config.Experimental.PythonWheelWrapper
NOTEBOOK_TEMPLATE, },
mutator.NewTrampoline(
"python_wheel",
&pythonTrampoline{},
NOTEBOOK_TEMPLATE,
),
mutator.NoOp(),
) )
} }
@ -113,7 +119,7 @@ func (t *pythonTrampoline) generateParameters(task *jobs.PythonWheelTask) (strin
if task.Parameters != nil && task.NamedParameters != nil { if task.Parameters != nil && task.NamedParameters != nil {
return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters") return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters")
} }
params := append([]string{"python"}, task.Parameters...) params := append([]string{task.PackageName}, task.Parameters...)
for k, v := range task.NamedParameters { for k, v := range task.NamedParameters {
params = append(params, fmt.Sprintf("%s=%s", k, v)) params = append(params, fmt.Sprintf("%s=%s", k, v))
} }

View File

@ -25,26 +25,26 @@ type testCaseNamed struct {
} }
var paramsTestCases []testCase = []testCase{ var paramsTestCases []testCase = []testCase{
{[]string{}, `"python"`}, {[]string{}, `"my_test_code"`},
{[]string{"a"}, `"python", "a"`}, {[]string{"a"}, `"my_test_code", "a"`},
{[]string{"a", "b"}, `"python", "a", "b"`}, {[]string{"a", "b"}, `"my_test_code", "a", "b"`},
{[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`}, {[]string{"123!@#$%^&*()-="}, `"my_test_code", "123!@#$%^&*()-="`},
{[]string{`{"a": 1}`}, `"python", "{\"a\": 1}"`}, {[]string{`{"a": 1}`}, `"my_test_code", "{\"a\": 1}"`},
} }
var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{ var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{
{map[string]string{}, `"python"`}, {map[string]string{}, `"my_test_code"`},
{map[string]string{"a": "1"}, `"python", "a=1"`}, {map[string]string{"a": "1"}, `"my_test_code", "a=1"`},
{map[string]string{"a": "'1'"}, `"python", "a='1'"`}, {map[string]string{"a": "'1'"}, `"my_test_code", "a='1'"`},
{map[string]string{"a": `"1"`}, `"python", "a=\"1\""`}, {map[string]string{"a": `"1"`}, `"my_test_code", "a=\"1\""`},
{map[string]string{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`}, {map[string]string{"a": "1", "b": "2"}, `"my_test_code", "a=1", "b=2"`},
{map[string]string{"data": `{"a": 1}`}, `"python", "data={\"a\": 1}"`}, {map[string]string{"data": `{"a": 1}`}, `"my_test_code", "data={\"a\": 1}"`},
} }
func TestGenerateParameters(t *testing.T) { func TestGenerateParameters(t *testing.T) {
trampoline := pythonTrampoline{} trampoline := pythonTrampoline{}
for _, c := range paramsTestCases { for _, c := range paramsTestCases {
task := &jobs.PythonWheelTask{Parameters: c.Actual} task := &jobs.PythonWheelTask{PackageName: "my_test_code", Parameters: c.Actual}
result, err := trampoline.generateParameters(task) result, err := trampoline.generateParameters(task)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, c.Expected, result) require.Equal(t, c.Expected, result)
@ -54,7 +54,7 @@ func TestGenerateParameters(t *testing.T) {
func TestGenerateNamedParameters(t *testing.T) { func TestGenerateNamedParameters(t *testing.T) {
trampoline := pythonTrampoline{} trampoline := pythonTrampoline{}
for _, c := range paramsTestCasesNamed { for _, c := range paramsTestCasesNamed {
task := &jobs.PythonWheelTask{NamedParameters: c.Actual} task := &jobs.PythonWheelTask{PackageName: "my_test_code", NamedParameters: c.Actual}
result, err := trampoline.generateParameters(task) result, err := trampoline.generateParameters(task)
require.NoError(t, err) require.NoError(t, err)

View File

@ -16,6 +16,10 @@
"unique_id": { "unique_id": {
"type": "string", "type": "string",
"description": "Unique ID for job name" "description": "Unique ID for job name"
},
"python_wheel_wrapper": {
"type": "boolean",
"description": "Whether or not to enable python wheel wrapper"
} }
} }
} }

View File

@ -4,6 +4,11 @@ bundle:
workspace: workspace:
root_path: "~/.bundle/{{.unique_id}}" root_path: "~/.bundle/{{.unique_id}}"
{{if .python_wheel_wrapper}}
experimental:
python_wheel_wrapper: true
{{end}}
resources: resources:
jobs: jobs:
some_other_job: some_other_job:
@ -14,6 +19,7 @@ resources:
num_workers: 1 num_workers: 1
spark_version: "{{.spark_version}}" spark_version: "{{.spark_version}}"
node_type_id: "{{.node_type_id}}" node_type_id: "{{.node_type_id}}"
data_security_mode: USER_ISOLATION
python_wheel_task: python_wheel_task:
package_name: my_test_code package_name: my_test_code
entry_point: run entry_point: run

View File

@ -8,7 +8,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAccPythonWheelTaskDeployAndRun(t *testing.T) { func runPythonWheelTest(t *testing.T, pythonWheelWrapper bool) {
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV") env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
t.Log(env) t.Log(env)
@ -22,9 +22,10 @@ func TestAccPythonWheelTaskDeployAndRun(t *testing.T) {
} }
bundleRoot, err := initTestTemplate(t, "python_wheel_task", map[string]any{ bundleRoot, err := initTestTemplate(t, "python_wheel_task", map[string]any{
"node_type_id": nodeTypeId, "node_type_id": nodeTypeId,
"unique_id": uuid.New().String(), "unique_id": uuid.New().String(),
"spark_version": "13.2.x-snapshot-scala2.12", "spark_version": "13.2.x-snapshot-scala2.12",
"python_wheel_wrapper": pythonWheelWrapper,
}) })
require.NoError(t, err) require.NoError(t, err)
@ -39,5 +40,13 @@ func TestAccPythonWheelTaskDeployAndRun(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Contains(t, out, "Hello from my func") require.Contains(t, out, "Hello from my func")
require.Contains(t, out, "Got arguments:") require.Contains(t, out, "Got arguments:")
require.Contains(t, out, "['python', 'one', 'two']") require.Contains(t, out, "['my_test_code', 'one', 'two']")
}
func TestAccPythonWheelTaskDeployAndRunWithoutWrapper(t *testing.T) {
runPythonWheelTest(t, false)
}
func TestAccPythonWheelTaskDeployAndRunWithWrapper(t *testing.T) {
runPythonWheelTest(t, true)
} }