mirror of https://github.com/databricks/cli.git
Pass parameters to task when run with `--python-params` and `python_wheel_wrapper` is true (#1037)
## Changes This makes the behaviour consistent whether or not `python_wheel_wrapper` is enabled when a job is run with the `--python-params` flag. In `python_wheel_wrapper` mode it converts the dynamic `python_params` into a specially named `notebook_param`; the wrapper then reads them with `dbutils` and passes them to `sys.argv`. Fixes #1000 ## Tests Added an integration test. Integration tests pass.
This commit is contained in:
parent
76840176e3
commit
83d50001fc
|
@ -31,8 +31,21 @@ except ImportError: # for Python<3.8
|
||||||
from contextlib import redirect_stdout
|
from contextlib import redirect_stdout
|
||||||
import io
|
import io
|
||||||
import sys
|
import sys
|
||||||
|
import json
|
||||||
|
|
||||||
|
params = []
|
||||||
|
try:
|
||||||
|
python_params = dbutils.widgets.get("__python_params")
|
||||||
|
if python_params:
|
||||||
|
params = json.loads(python_params)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
sys.argv = [{{.Params}}]
|
sys.argv = [{{.Params}}]
|
||||||
|
|
||||||
|
if params:
|
||||||
|
sys.argv = [sys.argv[0]] + params
|
||||||
|
|
||||||
entry = [ep for ep in metadata.distribution("{{.Task.PackageName}}").entry_points if ep.name == "{{.Task.EntryPoint}}"]
|
entry = [ep for ep in metadata.distribution("{{.Task.PackageName}}").entry_points if ep.name == "{{.Task.EntryPoint}}"]
|
||||||
|
|
||||||
f = io.StringIO()
|
f = io.StringIO()
|
||||||
|
|
|
@ -2,6 +2,7 @@ package run
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
@ -221,6 +222,11 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
|
||||||
|
|
||||||
runId := new(int64)
|
runId := new(int64)
|
||||||
|
|
||||||
|
err = r.convertPythonParams(opts)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
// construct request payload from cmd line flags args
|
// construct request payload from cmd line flags args
|
||||||
req, err := opts.Job.toPayload(jobID)
|
req, err := opts.Job.toPayload(jobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -299,3 +305,42 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
|
||||||
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *jobRunner) convertPythonParams(opts *Options) error {
|
||||||
|
if r.bundle.Config.Experimental != nil && !r.bundle.Config.Experimental.PythonWheelWrapper {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
needConvert := false
|
||||||
|
for _, task := range r.job.Tasks {
|
||||||
|
if task.PythonWheelTask != nil {
|
||||||
|
needConvert = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !needConvert {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(opts.Job.pythonParams) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.Job.notebookParams == nil {
|
||||||
|
opts.Job.notebookParams = make(map[string]string)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(opts.Job.pythonParams) > 0 {
|
||||||
|
if _, ok := opts.Job.notebookParams["__python_params"]; ok {
|
||||||
|
return fmt.Errorf("can't use __python_params as notebook param, the name is reserved for internal use")
|
||||||
|
}
|
||||||
|
p, err := json.Marshal(opts.Job.pythonParams)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
opts.Job.notebookParams["__python_params"] = string(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/bundle/config"
|
||||||
|
"github.com/databricks/cli/bundle/config/resources"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConvertPythonParams(t *testing.T) {
|
||||||
|
job := &resources.Job{
|
||||||
|
JobSettings: &jobs.JobSettings{
|
||||||
|
Tasks: []jobs.Task{
|
||||||
|
{PythonWheelTask: &jobs.PythonWheelTask{
|
||||||
|
PackageName: "my_test_code",
|
||||||
|
EntryPoint: "run",
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
b := &bundle.Bundle{
|
||||||
|
Config: config.Root{
|
||||||
|
Resources: config.Resources{
|
||||||
|
Jobs: map[string]*resources.Job{
|
||||||
|
"test_job": job,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
runner := jobRunner{key: "test", bundle: b, job: job}
|
||||||
|
|
||||||
|
opts := &Options{
|
||||||
|
Job: JobOptions{},
|
||||||
|
}
|
||||||
|
runner.convertPythonParams(opts)
|
||||||
|
require.NotContains(t, opts.Job.notebookParams, "__python_params")
|
||||||
|
|
||||||
|
opts = &Options{
|
||||||
|
Job: JobOptions{
|
||||||
|
pythonParams: []string{"param1", "param2", "param3"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
runner.convertPythonParams(opts)
|
||||||
|
require.Contains(t, opts.Job.notebookParams, "__python_params")
|
||||||
|
require.Equal(t, opts.Job.notebookParams["__python_params"], `["param1","param2","param3"]`)
|
||||||
|
}
|
|
@ -62,6 +62,18 @@ func runResource(t *testing.T, path string, key string) (string, error) {
|
||||||
return stdout.String(), err
|
return stdout.String(), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func runResourceWithParams(t *testing.T, path string, key string, params ...string) (string, error) {
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx = cmdio.NewContext(ctx, cmdio.Default())
|
||||||
|
|
||||||
|
args := make([]string, 0)
|
||||||
|
args = append(args, "bundle", "run", key)
|
||||||
|
args = append(args, params...)
|
||||||
|
c := internal.NewCobraTestRunnerWithContext(t, ctx, args...)
|
||||||
|
stdout, _, err := c.Run()
|
||||||
|
return stdout.String(), err
|
||||||
|
}
|
||||||
|
|
||||||
func destroyBundle(t *testing.T, path string) error {
|
func destroyBundle(t *testing.T, path string) error {
|
||||||
t.Setenv("BUNDLE_ROOT", path)
|
t.Setenv("BUNDLE_ROOT", path)
|
||||||
c := internal.NewCobraTestRunner(t, "bundle", "destroy", "--auto-approve")
|
c := internal.NewCobraTestRunner(t, "bundle", "destroy", "--auto-approve")
|
||||||
|
|
|
@ -41,6 +41,12 @@ func runPythonWheelTest(t *testing.T, sparkVersion string, pythonWheelWrapper bo
|
||||||
require.Contains(t, out, "Hello from my func")
|
require.Contains(t, out, "Hello from my func")
|
||||||
require.Contains(t, out, "Got arguments:")
|
require.Contains(t, out, "Got arguments:")
|
||||||
require.Contains(t, out, "['my_test_code', 'one', 'two']")
|
require.Contains(t, out, "['my_test_code', 'one', 'two']")
|
||||||
|
|
||||||
|
out, err = runResourceWithParams(t, bundleRoot, "some_other_job", "--python-params=param1,param2")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, out, "Hello from my func")
|
||||||
|
require.Contains(t, out, "Got arguments:")
|
||||||
|
require.Contains(t, out, "['my_test_code', 'param1', 'param2']")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAccPythonWheelTaskDeployAndRunWithoutWrapper(t *testing.T) {
|
func TestAccPythonWheelTaskDeployAndRunWithoutWrapper(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue