Processing and completion of positional args to bundle run (#1120)

## Changes

With this change, both job parameters and task parameters can be
specified as positional arguments to `bundle run`. How the positional
arguments are interpreted depends on the configuration of the job.

### Examples:

For a job that has job parameters configured, a user can specify:

```
databricks bundle run my_job -- --param1=value1 --param2=value2
```

And the run is kicked off with job parameters set to:
```json
{
  "param1": "value1",
  "param2": "value2"
}
```

Similarly, for a job that doesn't use job parameters and only has
`notebook_task` tasks, a user can specify:

```
databricks bundle run my_notebook_job -- --param1=value1 --param2=value2
```

And the run is kicked off with task level `notebook_params` configured
as:
```json
{
  "param1": "value1",
  "param2": "value2"
}
```

For a job that doesn't use job parameters and only has either
`spark_python_task` or `python_wheel_task` tasks, a user can specify:

```
databricks bundle run my_python_file_job -- --flag=value other arguments
```

And the run is kicked off with task level `python_params` configured as:
```json
[
  "--flag=value",
  "other",
  "arguments"
]
```

The same applies to jobs that only have `spark_jar_task` or
`spark_submit_task` tasks; the positional arguments are passed through as
task-level `jar_params` or `spark_submit_params`, respectively.
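
For the key-value cases above, both `--key=value` and `--key value` are accepted. Below is a minimal, self-contained sketch of these parsing rules; the actual implementation is `argsToKeyValueMap` in `bundle/run/args.go` (shown in the diff further down), and `parseKeyValue` here is only an illustration:

```go
package main

import (
	"fmt"
	"strings"
)

// parseKeyValue is a simplified, hypothetical illustration of the key-value
// parsing rules used for job parameters and notebook parameters: it accepts
// both `--key=value` and `--key value`. The real implementation
// (argsToKeyValueMap in bundle/run/args.go) additionally returns any trailing
// arguments it could not interpret.
func parseKeyValue(args []string) map[string]string {
	kv := make(map[string]string)
	key := ""
	for _, arg := range args {
		if key != "" {
			// Previous argument was a bare `--key`; this one is its value.
			kv[key] = arg
			key = ""
			continue
		}
		if strings.HasPrefix(arg, "--") {
			if k, v, ok := strings.Cut(arg[2:], "="); ok {
				kv[k] = v
				continue
			}
			// No "=": treat this argument as the key and the next one as the value.
			key = arg[2:]
		}
	}
	return kv
}

func main() {
	fmt.Println(parseKeyValue([]string{"--param1=value1", "--param2", "value2"}))
	// Output: map[param1:value1 param2:value2]
}
```

For the list-style cases (`python_params`, `jar_params`, `spark_submit_params`) no parsing is needed: the positional arguments are passed through verbatim.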

## Tests

Unit tests. Tested the completions manually.

8 changed files with 730 additions and 8 deletions

bundle/run/args.go (new file, 127 lines)

```go
package run

import (
    "fmt"
    "strings"

    "github.com/spf13/cobra"
)

// argsHandler defines the (unexported) interface for the runners in this
// package to implement to handle context-specific positional arguments.
//
// For jobs, this means:
//   - If a job uses job parameters: parse positional arguments into key-value pairs
//     and pass them as job parameters.
//   - If a job does not use job parameters AND only has Spark Python tasks:
//     pass through the positional arguments as a list of Python parameters.
//   - If a job does not use job parameters AND only has notebook tasks:
//     parse arguments into key-value pairs and pass them as notebook parameters.
//   - ...
//
// In all cases, we may be able to provide context-aware argument completions.
type argsHandler interface {
    // Parse additional positional arguments.
    ParseArgs(args []string, opts *Options) error

    // Complete additional positional arguments.
    CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective)
}

// nopArgsHandler is a no-op implementation of [argsHandler].
// It returns an error if any positional arguments are present and doesn't complete anything.
type nopArgsHandler struct{}

func (nopArgsHandler) ParseArgs(args []string, opts *Options) error {
    if len(args) == 0 {
        return nil
    }
    return fmt.Errorf("received %d unexpected positional arguments", len(args))
}

func (nopArgsHandler) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    return nil, cobra.ShellCompDirectiveNoFileComp
}

// argsToKeyValueMap parses key-value pairs from the specified arguments.
//
// It accepts these formats:
//   - `--key=value`
//   - `--key`, `value`
//
// Remaining arguments are returned as-is.
func argsToKeyValueMap(args []string) (map[string]string, []string) {
    kv := make(map[string]string)
    key := ""
    tail := args

    for i, arg := range args {
        // If key is set; use the next argument as value.
        if key != "" {
            kv[key] = arg
            key = ""
            tail = args[i+1:]
            continue
        }

        if strings.HasPrefix(arg, "--") {
            parts := strings.SplitN(arg[2:], "=", 2)
            if len(parts) == 2 {
                kv[parts[0]] = parts[1]
                tail = args[i+1:]
                continue
            }

            // Use this argument as key, the next as value.
            key = parts[0]
            continue
        }

        // If we cannot interpret it; return here.
        break
    }

    return kv, tail
}

// genericParseKeyValueArgs parses key-value pairs from the specified arguments.
// If there are any positional arguments left, it returns an error.
func genericParseKeyValueArgs(args []string) (map[string]string, error) {
    kv, args := argsToKeyValueMap(args)
    if len(args) > 0 {
        return nil, fmt.Errorf("received %d unexpected positional arguments", len(args))
    }
    return kv, nil
}

// genericCompleteKeyValueArgs completes key-value pairs from the specified arguments.
// Completion options that are already specified are skipped.
func genericCompleteKeyValueArgs(args []string, toComplete string, options []string) ([]string, cobra.ShellCompDirective) {
    // If the string to complete contains an equals sign, then we are
    // completing the value part (which we don't know here).
    if strings.Contains(toComplete, "=") {
        return nil, cobra.ShellCompDirectiveNoFileComp
    }

    // Remove already completed key/value pairs.
    kv, args := argsToKeyValueMap(args)

    // If the list of remaining args is empty, return possible completions.
    if len(args) == 0 {
        var completions []string
        for _, option := range options {
            // Skip options that have already been specified.
            if _, ok := kv[option]; ok {
                continue
            }
            completions = append(completions, fmt.Sprintf("--%s=", option))
        }
        // Note: we include cobra.ShellCompDirectiveNoSpace to suggest including
        // the value part right after the equals sign.
        return completions, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace
    }

    return nil, cobra.ShellCompDirectiveNoFileComp
}
```
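
A quick in-package usage sketch (hypothetical helper name, imports as in the file above; not part of the change) showing how completion skips options that were already supplied and how the no-space directive keeps the cursor right after the equals sign:

```go
// sketchCompleteRemaining: with options {"foo", "bar"} and "--foo=1" already
// typed, only "--bar=" is suggested, and the returned directive includes
// ShellCompDirectiveNoSpace so the value can be typed immediately.
func sketchCompleteRemaining() {
	completions, directive := genericCompleteKeyValueArgs(
		[]string{"--foo=1"}, "", []string{"foo", "bar"},
	)
	fmt.Println(completions)                                    // [--bar=]
	fmt.Println(directive&cobra.ShellCompDirectiveNoSpace != 0) // true
}
```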

bundle/run/args_test.go (new file, 134 lines)

```go
package run

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestNopArgsHandler(t *testing.T) {
    h := nopArgsHandler{}
    opts := &Options{}

    // No error if no positional arguments are passed.
    err := h.ParseArgs([]string{}, opts)
    assert.NoError(t, err)

    // Error if any positional arguments are passed.
    err = h.ParseArgs([]string{"foo"}, opts)
    assert.EqualError(t, err, "received 1 unexpected positional arguments")

    // No completions.
    completions, _ := h.CompleteArgs([]string{}, "")
    assert.Nil(t, completions)
}

func TestArgsToKeyValueMap(t *testing.T) {
    for _, tc := range []struct {
        input    []string
        expected map[string]string
        tail     []string
        err      error
    }{
        {
            input:    []string{},
            expected: map[string]string{},
            tail:     []string{},
        },
        {
            input: []string{"--foo=bar", "--baz", "qux"},
            expected: map[string]string{
                "foo": "bar",
                "baz": "qux",
            },
            tail: []string{},
        },
        {
            input: []string{"--foo=bar", "--baz", "qux", "tail"},
            expected: map[string]string{
                "foo": "bar",
                "baz": "qux",
            },
            tail: []string{"tail"},
        },
        {
            input: []string{"--foo=bar", "--baz", "qux", "tail", "--foo=bar"},
            expected: map[string]string{
                "foo": "bar",
                "baz": "qux",
            },
            tail: []string{"tail", "--foo=bar"},
        },
        {
            input: []string{"--foo=bar", "--baz=qux"},
            expected: map[string]string{
                "foo": "bar",
                "baz": "qux",
            },
            tail: []string{},
        },
        {
            input: []string{"--foo=bar", "--baz=--qux"},
            expected: map[string]string{
                "foo": "bar",
                "baz": "--qux",
            },
            tail: []string{},
        },
        {
            input: []string{"--foo=bar", "--baz="},
            expected: map[string]string{
                "foo": "bar",
                "baz": "",
            },
            tail: []string{},
        },
        {
            input: []string{"--foo=bar", "--baz"},
            expected: map[string]string{
                "foo": "bar",
            },
            tail: []string{"--baz"},
        },
    } {
        actual, tail := argsToKeyValueMap(tc.input)
        assert.Equal(t, tc.expected, actual)
        assert.Equal(t, tc.tail, tail)
    }
}

func TestGenericParseKeyValueArgs(t *testing.T) {
    kv, err := genericParseKeyValueArgs([]string{"--foo=bar", "--baz", "qux"})
    assert.NoError(t, err)
    assert.Equal(t, map[string]string{
        "foo": "bar",
        "baz": "qux",
    }, kv)

    _, err = genericParseKeyValueArgs([]string{"--foo=bar", "--baz", "qux", "tail"})
    assert.EqualError(t, err, "received 1 unexpected positional arguments")
}

func TestGenericCompleteKeyValueArgs(t *testing.T) {
    var completions []string

    // Complete nothing if there are no options.
    completions, _ = genericCompleteKeyValueArgs([]string{}, ``, []string{})
    assert.Empty(t, completions)

    // Complete nothing if we're in the middle of a key-value pair (as single argument with equals sign).
    completions, _ = genericCompleteKeyValueArgs([]string{}, `--foo=`, []string{`foo`, `bar`})
    assert.Empty(t, completions)

    // Complete nothing if we're in the middle of a key-value pair (as two arguments).
    completions, _ = genericCompleteKeyValueArgs([]string{`--foo`}, ``, []string{`foo`, `bar`})
    assert.Empty(t, completions)

    // Complete if we're at the beginning.
    completions, _ = genericCompleteKeyValueArgs([]string{}, ``, []string{`foo`, `bar`})
    assert.Equal(t, []string{`--foo=`, `--bar=`}, completions)

    // Complete if we have already one key-value pair.
    completions, _ = genericCompleteKeyValueArgs([]string{`--foo=bar`}, ``, []string{`foo`, `bar`})
    assert.Equal(t, []string{`--bar=`}, completions)
}
```

bundle/run/job.go

```diff
@@ -15,6 +15,7 @@ import (
 	"github.com/databricks/cli/libs/log"
 	"github.com/databricks/databricks-sdk-go/service/jobs"
 	"github.com/fatih/color"
+	"github.com/spf13/cobra"
 	"golang.org/x/sync/errgroup"
 )

@@ -315,3 +316,11 @@ func (r *jobRunner) Cancel(ctx context.Context) error {
 	return errGroup.Wait()
 }
+
+func (r *jobRunner) ParseArgs(args []string, opts *Options) error {
+	return r.posArgsHandler().ParseArgs(args, opts)
+}
+
+func (r *jobRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
+	return r.posArgsHandler().CompleteArgs(args, toComplete)
+}
```

bundle/run/job_args.go (new file, 184 lines)

```go
package run

import (
    "github.com/databricks/cli/bundle/config/resources"
    "github.com/spf13/cobra"
    "golang.org/x/exp/maps"
)

type jobParameterArgs struct {
    *resources.Job
}

func (a jobParameterArgs) ParseArgs(args []string, opts *Options) error {
    kv, err := genericParseKeyValueArgs(args)
    if err != nil {
        return err
    }

    // Merge the key-value pairs from the args into the options struct.
    if opts.Job.jobParams == nil {
        opts.Job.jobParams = kv
    } else {
        for k, v := range kv {
            opts.Job.jobParams[k] = v
        }
    }
    return nil
}

func (a jobParameterArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    var completions []string
    for _, param := range a.Parameters {
        completions = append(completions, param.Name)
    }
    return genericCompleteKeyValueArgs(args, toComplete, completions)
}

type jobTaskNotebookParamArgs struct {
    *resources.Job
}

func (a jobTaskNotebookParamArgs) ParseArgs(args []string, opts *Options) error {
    kv, err := genericParseKeyValueArgs(args)
    if err != nil {
        return err
    }

    // Merge the key-value pairs from the args into the options struct.
    if opts.Job.notebookParams == nil {
        opts.Job.notebookParams = kv
    } else {
        for k, v := range kv {
            opts.Job.notebookParams[k] = v
        }
    }
    return nil
}

func (a jobTaskNotebookParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    parameters := make(map[string]string)
    for _, t := range a.Tasks {
        if nt := t.NotebookTask; nt != nil {
            maps.Copy(parameters, nt.BaseParameters)
        }
    }
    return genericCompleteKeyValueArgs(args, toComplete, maps.Keys(parameters))
}

type jobTaskJarParamArgs struct {
    *resources.Job
}

func (a jobTaskJarParamArgs) ParseArgs(args []string, opts *Options) error {
    opts.Job.jarParams = append(opts.Job.jarParams, args...)
    return nil
}

func (a jobTaskJarParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    return nil, cobra.ShellCompDirectiveNoFileComp
}

type jobTaskPythonParamArgs struct {
    *resources.Job
}

func (a jobTaskPythonParamArgs) ParseArgs(args []string, opts *Options) error {
    opts.Job.pythonParams = append(opts.Job.pythonParams, args...)
    return nil
}

func (a jobTaskPythonParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    return nil, cobra.ShellCompDirectiveNoFileComp
}

type jobTaskSparkSubmitParamArgs struct {
    *resources.Job
}

func (a jobTaskSparkSubmitParamArgs) ParseArgs(args []string, opts *Options) error {
    opts.Job.sparkSubmitParams = append(opts.Job.sparkSubmitParams, args...)
    return nil
}

func (a jobTaskSparkSubmitParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
    return nil, cobra.ShellCompDirectiveNoFileComp
}

type jobTaskType int

const (
    jobTaskTypeNotebook jobTaskType = iota + 1
    jobTaskTypeSparkJar
    jobTaskTypeSparkPython
    jobTaskTypeSparkSubmit
    jobTaskTypePipeline
    jobTaskTypePythonWheel
    jobTaskTypeSql
    jobTaskTypeDbt
    jobTaskTypeRunJob
)

func (r *jobRunner) posArgsHandler() argsHandler {
    job := r.job
    if job == nil || job.JobSettings == nil {
        return nopArgsHandler{}
    }

    // Handle job parameters, if any are defined.
    if len(job.Parameters) > 0 {
        return &jobParameterArgs{job}
    }

    // Handle task parameters otherwise.
    var seen = make(map[jobTaskType]bool)
    for _, t := range job.Tasks {
        if t.NotebookTask != nil {
            seen[jobTaskTypeNotebook] = true
        }
        if t.SparkJarTask != nil {
            seen[jobTaskTypeSparkJar] = true
        }
        if t.SparkPythonTask != nil {
            seen[jobTaskTypeSparkPython] = true
        }
        if t.SparkSubmitTask != nil {
            seen[jobTaskTypeSparkSubmit] = true
        }
        if t.PipelineTask != nil {
            seen[jobTaskTypePipeline] = true
        }
        if t.PythonWheelTask != nil {
            seen[jobTaskTypePythonWheel] = true
        }
        if t.SqlTask != nil {
            seen[jobTaskTypeSql] = true
        }
        if t.DbtTask != nil {
            seen[jobTaskTypeDbt] = true
        }
        if t.RunJobTask != nil {
            seen[jobTaskTypeRunJob] = true
        }
    }

    // Cannot handle positional arguments if we have more than one task type.
    keys := maps.Keys(seen)
    if len(keys) != 1 {
        return nopArgsHandler{}
    }

    switch keys[0] {
    case jobTaskTypeNotebook:
        return jobTaskNotebookParamArgs{job}
    case jobTaskTypeSparkJar:
        return jobTaskJarParamArgs{job}
    case jobTaskTypeSparkPython, jobTaskTypePythonWheel:
        return jobTaskPythonParamArgs{job}
    case jobTaskTypeSparkSubmit:
        return jobTaskSparkSubmitParamArgs{job}
    default:
        // No positional argument handling for other task types.
        return nopArgsHandler{}
    }
}
```
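
An end-to-end in-package sketch of the dispatch above (hypothetical function, not part of the change; it reuses the types from the files in this diff and only sets the `job` field of `jobRunner`, leaving the other fields at their zero values):

```go
// sketchNotebookDispatch exercises posArgsHandler for a job without job
// parameters that contains only notebook tasks. The handler chosen is
// jobTaskNotebookParamArgs, so positional arguments end up in
// opts.Job.notebookParams.
func sketchNotebookDispatch() {
	r := &jobRunner{
		job: &resources.Job{
			JobSettings: &jobs.JobSettings{
				Tasks: []jobs.Task{
					{NotebookTask: &jobs.NotebookTask{}},
				},
			},
		},
	}

	var opts Options
	if err := r.posArgsHandler().ParseArgs([]string{"--param1=value1", "--param2", "value2"}, &opts); err != nil {
		panic(err)
	}
	fmt.Println(opts.Job.notebookParams) // map[param1:value1 param2:value2]
}
```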

bundle/run/job_args_test.go (new file, 223 lines)

```go
package run

import (
    "testing"

    "github.com/databricks/cli/bundle/config/resources"
    "github.com/databricks/databricks-sdk-go/service/jobs"
    "github.com/stretchr/testify/assert"
)

func TestJobParameterArgs(t *testing.T) {
    a := jobParameterArgs{
        &resources.Job{
            JobSettings: &jobs.JobSettings{
                Parameters: []jobs.JobParameterDefinition{
                    {
                        Name:    "foo",
                        Default: "value",
                    },
                    {
                        Name:    "bar",
                        Default: "value",
                    },
                },
            },
        },
    }

    t.Run("ParseArgsError", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"--p1=v1", "superfluous"}, &opts)
        assert.ErrorContains(t, err, "unexpected positional arguments")
    })

    t.Run("ParseArgs", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"--p1=v1", "--p2=v2"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            map[string]string{
                "p1": "v1",
                "p2": "v2",
            },
            opts.Job.jobParams,
        )
    })

    t.Run("ParseArgsAppend", func(t *testing.T) {
        var opts Options
        opts.Job.jobParams = map[string]string{"p1": "v1"}
        err := a.ParseArgs([]string{"--p2=v2"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            map[string]string{
                "p1": "v1",
                "p2": "v2",
            },
            opts.Job.jobParams,
        )
    })

    t.Run("CompleteArgs", func(t *testing.T) {
        completions, _ := a.CompleteArgs([]string{}, "")
        assert.Equal(t, []string{"--foo=", "--bar="}, completions)
    })
}

func TestJobTaskNotebookParamArgs(t *testing.T) {
    a := jobTaskNotebookParamArgs{
        &resources.Job{
            JobSettings: &jobs.JobSettings{
                Tasks: []jobs.Task{
                    {
                        NotebookTask: &jobs.NotebookTask{
                            BaseParameters: map[string]string{
                                "foo": "value",
                                "bar": "value",
                            },
                        },
                    },
                },
            },
        },
    }

    t.Run("ParseArgsError", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"--p1=v1", "superfluous"}, &opts)
        assert.ErrorContains(t, err, "unexpected positional arguments")
    })

    t.Run("ParseArgs", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"--p1=v1", "--p2=v2"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            map[string]string{
                "p1": "v1",
                "p2": "v2",
            },
            opts.Job.notebookParams,
        )
    })

    t.Run("ParseArgsAppend", func(t *testing.T) {
        var opts Options
        opts.Job.notebookParams = map[string]string{"p1": "v1"}
        err := a.ParseArgs([]string{"--p2=v2"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            map[string]string{
                "p1": "v1",
                "p2": "v2",
            },
            opts.Job.notebookParams,
        )
    })

    t.Run("CompleteArgs", func(t *testing.T) {
        completions, _ := a.CompleteArgs([]string{}, "")
        assert.ElementsMatch(t, []string{"--foo=", "--bar="}, completions)
    })
}

func TestJobTaskJarParamArgs(t *testing.T) {
    a := jobTaskJarParamArgs{}

    t.Run("ParseArgs", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"foo", "bar"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            []string{"foo", "bar"},
            opts.Job.jarParams,
        )
    })

    t.Run("ParseArgsAppend", func(t *testing.T) {
        var opts Options
        opts.Job.jarParams = []string{"foo"}
        err := a.ParseArgs([]string{"bar"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            []string{"foo", "bar"},
            opts.Job.jarParams,
        )
    })

    t.Run("CompleteArgs", func(t *testing.T) {
        completions, _ := a.CompleteArgs([]string{}, "")
        assert.Empty(t, completions)
    })
}

func TestJobTaskPythonParamArgs(t *testing.T) {
    a := jobTaskPythonParamArgs{}

    t.Run("ParseArgs", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"foo", "bar"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            []string{"foo", "bar"},
            opts.Job.pythonParams,
        )
    })

    t.Run("ParseArgsAppend", func(t *testing.T) {
        var opts Options
        opts.Job.pythonParams = []string{"foo"}
        err := a.ParseArgs([]string{"bar"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            []string{"foo", "bar"},
            opts.Job.pythonParams,
        )
    })

    t.Run("CompleteArgs", func(t *testing.T) {
        completions, _ := a.CompleteArgs([]string{}, "")
        assert.Empty(t, completions)
    })
}

func TestJobTaskSparkSubmitParamArgs(t *testing.T) {
    a := jobTaskSparkSubmitParamArgs{}

    t.Run("ParseArgs", func(t *testing.T) {
        var opts Options
        err := a.ParseArgs([]string{"foo", "bar"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            []string{"foo", "bar"},
            opts.Job.sparkSubmitParams,
        )
    })

    t.Run("ParseArgsAppend", func(t *testing.T) {
        var opts Options
        opts.Job.sparkSubmitParams = []string{"foo"}
        err := a.ParseArgs([]string{"bar"}, &opts)
        assert.NoError(t, err)
        assert.Equal(
            t,
            []string{"foo", "bar"},
            opts.Job.sparkSubmitParams,
        )
    })

    t.Run("CompleteArgs", func(t *testing.T) {
        completions, _ := a.CompleteArgs([]string{}, "")
        assert.Empty(t, completions)
    })
}
```

bundle/run/pipeline.go

```diff
@@ -12,6 +12,7 @@ import (
 	"github.com/databricks/cli/libs/cmdio"
 	"github.com/databricks/cli/libs/log"
 	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/spf13/cobra"
 )

 func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent {
@@ -181,3 +182,15 @@ func (r *pipelineRunner) Cancel(ctx context.Context) error {
 	_, err = wait.GetWithTimeout(jobRunTimeout)
 	return err
 }
+
+func (r *pipelineRunner) ParseArgs(args []string, opts *Options) error {
+	if len(args) == 0 {
+		return nil
+	}
+	return fmt.Errorf("received %d unexpected positional arguments", len(args))
+}
+
+func (r *pipelineRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
+	return nil, cobra.ShellCompDirectiveNoFileComp
+}
```

bundle/run/runner.go

```diff
@@ -29,6 +29,9 @@ type Runner interface {
 	// Cancel the underlying workflow.
 	Cancel(ctx context.Context) error

+	// Runners support parsing and completion of additional positional arguments.
+	argsHandler
 }

 // Find locates a runner matching the specified argument.
```

cmd/bundle/run.go

```diff
@@ -18,8 +18,26 @@ import (
 func newRunCommand() *cobra.Command {
 	cmd := &cobra.Command{
 		Use:   "run [flags] KEY",
-		Short: "Run a resource (e.g. a job or a pipeline)",
-		Args:  root.MaximumNArgs(1),
+		Short: "Run a job or pipeline update",
+		Long: `Run the job or pipeline identified by KEY.
+
+The KEY is the unique identifier of the resource to run. In addition to
+customizing the run using any of the available flags, you can also specify
+keyword or positional arguments as shown in these examples:
+
+   databricks bundle run my_job -- --key1 value1 --key2 value2
+
+Or:
+
+   databricks bundle run my_job -- value1 value2 value3
+
+If the specified job uses job parameters or the job has a notebook task with
+parameters, the first example applies and flag names are mapped to the
+parameter names.
+
+If the specified job does not use job parameters and the job has a Python file
+task or a Python wheel task, the second example applies.
+`,
 	}

 	var runOptions run.Options
@@ -62,7 +80,7 @@
 			args = append(args, id)
 		}

-		if len(args) != 1 {
+		if len(args) < 1 {
 			return fmt.Errorf("expected a KEY of the resource to run")
 		}
@@ -71,6 +89,12 @@
 			return err
 		}

+		// Parse additional positional arguments.
+		err = runner.ParseArgs(args[1:], &runOptions)
+		if err != nil {
+			return err
+		}
+
 		runOptions.NoWait = noWait
 		if restart {
 			s := cmdio.Spinner(ctx)
@@ -107,10 +131,6 @@
 	}

 	cmd.ValidArgsFunction = func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
-		if len(args) > 0 {
-			return nil, cobra.ShellCompDirectiveNoFileComp
-		}
-
 		b, diags := root.MustConfigureBundle(cmd)
 		if err := diags.Error(); err != nil {
 			cobra.CompErrorln(err.Error())
@@ -123,7 +143,16 @@
 			return nil, cobra.ShellCompDirectiveNoFileComp
 		}

-		return run.ResourceCompletions(b), cobra.ShellCompDirectiveNoFileComp
+		if len(args) == 0 {
+			return run.ResourceCompletions(b), cobra.ShellCompDirectiveNoFileComp
+		} else {
+			// If we know the resource to run, we can complete additional positional arguments.
+			runner, err := run.Find(b, args[0])
+			if err != nil {
+				return nil, cobra.ShellCompDirectiveError
+			}
+			return runner.CompleteArgs(args[1:], toComplete)
+		}
 	}

 	return cmd
```