diff --git a/bundle/run/args.go b/bundle/run/args.go new file mode 100644 index 00000000..2885cda0 --- /dev/null +++ b/bundle/run/args.go @@ -0,0 +1,127 @@ +package run + +import ( + "fmt" + "strings" + + "github.com/spf13/cobra" +) + +// argsHandler defines the (unexported) interface for the runners in this +// package to implement to handle context-specific positional arguments. +// +// For jobs, this means: +// - If a job uses job parameters: parse positional arguments into key-value pairs +// and pass them as job parameters. +// - If a job does not use job parameters AND only has Spark Python tasks: +// pass through the positional arguments as a list of Python parameters. +// - If a job does not use job parameters AND only has notebook tasks: +// parse arguments into key-value pairs and pass them as notebook parameters. +// - ... +// +// In all cases, we may be able to provide context-aware argument completions. +type argsHandler interface { + // Parse additional positional arguments. + ParseArgs(args []string, opts *Options) error + + // Complete additional positional arguments. + CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) +} + +// nopArgsHandler is a no-op implementation of [argsHandler]. +// It returns an error if any positional arguments are present and doesn't complete anything. +type nopArgsHandler struct{} + +func (nopArgsHandler) ParseArgs(args []string, opts *Options) error { + if len(args) == 0 { + return nil + } + + return fmt.Errorf("received %d unexpected positional arguments", len(args)) +} + +func (nopArgsHandler) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return nil, cobra.ShellCompDirectiveNoFileComp +} + +// argsToKeyValueMap parses key-value pairs from the specified arguments. +// +// It accepts these formats: +// - `--key=value` +// - `--key`, `value` +// +// Remaining arguments are returned as-is. +func argsToKeyValueMap(args []string) (map[string]string, []string) { + kv := make(map[string]string) + key := "" + tail := args + + for i, arg := range args { + // If key is set; use the next argument as value. + if key != "" { + kv[key] = arg + key = "" + tail = args[i+1:] + continue + } + + if strings.HasPrefix(arg, "--") { + parts := strings.SplitN(arg[2:], "=", 2) + if len(parts) == 2 { + kv[parts[0]] = parts[1] + tail = args[i+1:] + continue + } + + // Use this argument as key, the next as value. + key = parts[0] + continue + } + + // If we cannot interpret it; return here. + break + } + + return kv, tail +} + +// genericParseKeyValueArgs parses key-value pairs from the specified arguments. +// If there are any positional arguments left, it returns an error. +func genericParseKeyValueArgs(args []string) (map[string]string, error) { + kv, args := argsToKeyValueMap(args) + if len(args) > 0 { + return nil, fmt.Errorf("received %d unexpected positional arguments", len(args)) + } + + return kv, nil +} + +// genericCompleteKeyValueArgs completes key-value pairs from the specified arguments. +// Completion options that are already specified are skipped. +func genericCompleteKeyValueArgs(args []string, toComplete string, options []string) ([]string, cobra.ShellCompDirective) { + // If the string to complete contains an equals sign, then we are + // completing the value part (which we don't know here). + if strings.Contains(toComplete, "=") { + return nil, cobra.ShellCompDirectiveNoFileComp + } + + // Remove already completed key/value pairs. + kv, args := argsToKeyValueMap(args) + + // If the list of remaining args is empty, return possible completions. + if len(args) == 0 { + var completions []string + for _, option := range options { + // Skip options that have already been specified. + if _, ok := kv[option]; ok { + continue + } + completions = append(completions, fmt.Sprintf("--%s=", option)) + } + // Note: we include cobra.ShellCompDirectiveNoSpace to suggest including + // the value part right after the equals sign. + return completions, cobra.ShellCompDirectiveNoFileComp | cobra.ShellCompDirectiveNoSpace + } + + return nil, cobra.ShellCompDirectiveNoFileComp +} diff --git a/bundle/run/args_test.go b/bundle/run/args_test.go new file mode 100644 index 00000000..aff14b48 --- /dev/null +++ b/bundle/run/args_test.go @@ -0,0 +1,134 @@ +package run + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestNopArgsHandler(t *testing.T) { + h := nopArgsHandler{} + opts := &Options{} + + // No error if no positional arguments are passed. + err := h.ParseArgs([]string{}, opts) + assert.NoError(t, err) + + // Error if any positional arguments are passed. + err = h.ParseArgs([]string{"foo"}, opts) + assert.EqualError(t, err, "received 1 unexpected positional arguments") + + // No completions. + completions, _ := h.CompleteArgs([]string{}, "") + assert.Nil(t, completions) +} + +func TestArgsToKeyValueMap(t *testing.T) { + for _, tc := range []struct { + input []string + expected map[string]string + tail []string + err error + }{ + { + input: []string{}, + expected: map[string]string{}, + tail: []string{}, + }, + { + input: []string{"--foo=bar", "--baz", "qux"}, + expected: map[string]string{ + "foo": "bar", + "baz": "qux", + }, + tail: []string{}, + }, + { + input: []string{"--foo=bar", "--baz", "qux", "tail"}, + expected: map[string]string{ + "foo": "bar", + "baz": "qux", + }, + tail: []string{"tail"}, + }, + { + input: []string{"--foo=bar", "--baz", "qux", "tail", "--foo=bar"}, + expected: map[string]string{ + "foo": "bar", + "baz": "qux", + }, + tail: []string{"tail", "--foo=bar"}, + }, + { + input: []string{"--foo=bar", "--baz=qux"}, + expected: map[string]string{ + "foo": "bar", + "baz": "qux", + }, + tail: []string{}, + }, + { + input: []string{"--foo=bar", "--baz=--qux"}, + expected: map[string]string{ + "foo": "bar", + "baz": "--qux", + }, + tail: []string{}, + }, + { + input: []string{"--foo=bar", "--baz="}, + expected: map[string]string{ + "foo": "bar", + "baz": "", + }, + tail: []string{}, + }, + { + input: []string{"--foo=bar", "--baz"}, + expected: map[string]string{ + "foo": "bar", + }, + tail: []string{"--baz"}, + }, + } { + actual, tail := argsToKeyValueMap(tc.input) + assert.Equal(t, tc.expected, actual) + assert.Equal(t, tc.tail, tail) + } +} + +func TestGenericParseKeyValueArgs(t *testing.T) { + kv, err := genericParseKeyValueArgs([]string{"--foo=bar", "--baz", "qux"}) + assert.NoError(t, err) + assert.Equal(t, map[string]string{ + "foo": "bar", + "baz": "qux", + }, kv) + + _, err = genericParseKeyValueArgs([]string{"--foo=bar", "--baz", "qux", "tail"}) + assert.EqualError(t, err, "received 1 unexpected positional arguments") +} + +func TestGenericCompleteKeyValueArgs(t *testing.T) { + var completions []string + + // Complete nothing if there are no options. + completions, _ = genericCompleteKeyValueArgs([]string{}, ``, []string{}) + assert.Empty(t, completions) + + // Complete nothing if we're in the middle of a key-value pair (as single argument with equals sign). + completions, _ = genericCompleteKeyValueArgs([]string{}, `--foo=`, []string{`foo`, `bar`}) + assert.Empty(t, completions) + + // Complete nothing if we're in the middle of a key-value pair (as two arguments). + completions, _ = genericCompleteKeyValueArgs([]string{`--foo`}, ``, []string{`foo`, `bar`}) + assert.Empty(t, completions) + + // Complete if we're at the beginning. + completions, _ = genericCompleteKeyValueArgs([]string{}, ``, []string{`foo`, `bar`}) + assert.Equal(t, []string{`--foo=`, `--bar=`}, completions) + + // Complete if we have already one key-value pair. + completions, _ = genericCompleteKeyValueArgs([]string{`--foo=bar`}, ``, []string{`foo`, `bar`}) + assert.Equal(t, []string{`--bar=`}, completions) +} diff --git a/bundle/run/job.go b/bundle/run/job.go index 043ea846..8003c7d2 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/fatih/color" + "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) @@ -315,3 +316,11 @@ func (r *jobRunner) Cancel(ctx context.Context) error { return errGroup.Wait() } + +func (r *jobRunner) ParseArgs(args []string, opts *Options) error { + return r.posArgsHandler().ParseArgs(args, opts) +} + +func (r *jobRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return r.posArgsHandler().CompleteArgs(args, toComplete) +} diff --git a/bundle/run/job_args.go b/bundle/run/job_args.go new file mode 100644 index 00000000..85cf96ef --- /dev/null +++ b/bundle/run/job_args.go @@ -0,0 +1,184 @@ +package run + +import ( + "github.com/databricks/cli/bundle/config/resources" + "github.com/spf13/cobra" + "golang.org/x/exp/maps" +) + +type jobParameterArgs struct { + *resources.Job +} + +func (a jobParameterArgs) ParseArgs(args []string, opts *Options) error { + kv, err := genericParseKeyValueArgs(args) + if err != nil { + return err + } + + // Merge the key-value pairs from the args into the options struct. + if opts.Job.jobParams == nil { + opts.Job.jobParams = kv + } else { + for k, v := range kv { + opts.Job.jobParams[k] = v + } + } + return nil +} + +func (a jobParameterArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + var completions []string + for _, param := range a.Parameters { + completions = append(completions, param.Name) + } + return genericCompleteKeyValueArgs(args, toComplete, completions) +} + +type jobTaskNotebookParamArgs struct { + *resources.Job +} + +func (a jobTaskNotebookParamArgs) ParseArgs(args []string, opts *Options) error { + kv, err := genericParseKeyValueArgs(args) + if err != nil { + return err + } + + // Merge the key-value pairs from the args into the options struct. + if opts.Job.notebookParams == nil { + opts.Job.notebookParams = kv + } else { + for k, v := range kv { + opts.Job.notebookParams[k] = v + } + } + return nil +} + +func (a jobTaskNotebookParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + parameters := make(map[string]string) + for _, t := range a.Tasks { + if nt := t.NotebookTask; nt != nil { + maps.Copy(parameters, nt.BaseParameters) + } + } + return genericCompleteKeyValueArgs(args, toComplete, maps.Keys(parameters)) +} + +type jobTaskJarParamArgs struct { + *resources.Job +} + +func (a jobTaskJarParamArgs) ParseArgs(args []string, opts *Options) error { + opts.Job.jarParams = append(opts.Job.jarParams, args...) + return nil +} + +func (a jobTaskJarParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return nil, cobra.ShellCompDirectiveNoFileComp +} + +type jobTaskPythonParamArgs struct { + *resources.Job +} + +func (a jobTaskPythonParamArgs) ParseArgs(args []string, opts *Options) error { + opts.Job.pythonParams = append(opts.Job.pythonParams, args...) + return nil +} + +func (a jobTaskPythonParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return nil, cobra.ShellCompDirectiveNoFileComp +} + +type jobTaskSparkSubmitParamArgs struct { + *resources.Job +} + +func (a jobTaskSparkSubmitParamArgs) ParseArgs(args []string, opts *Options) error { + opts.Job.sparkSubmitParams = append(opts.Job.sparkSubmitParams, args...) + return nil +} + +func (a jobTaskSparkSubmitParamArgs) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return nil, cobra.ShellCompDirectiveNoFileComp +} + +type jobTaskType int + +const ( + jobTaskTypeNotebook jobTaskType = iota + 1 + jobTaskTypeSparkJar + jobTaskTypeSparkPython + jobTaskTypeSparkSubmit + jobTaskTypePipeline + jobTaskTypePythonWheel + jobTaskTypeSql + jobTaskTypeDbt + jobTaskTypeRunJob +) + +func (r *jobRunner) posArgsHandler() argsHandler { + job := r.job + if job == nil || job.JobSettings == nil { + return nopArgsHandler{} + } + + // Handle job parameters, if any are defined. + if len(job.Parameters) > 0 { + return &jobParameterArgs{job} + } + + // Handle task parameters otherwise. + var seen = make(map[jobTaskType]bool) + for _, t := range job.Tasks { + if t.NotebookTask != nil { + seen[jobTaskTypeNotebook] = true + } + if t.SparkJarTask != nil { + seen[jobTaskTypeSparkJar] = true + } + if t.SparkPythonTask != nil { + seen[jobTaskTypeSparkPython] = true + } + if t.SparkSubmitTask != nil { + seen[jobTaskTypeSparkSubmit] = true + } + if t.PipelineTask != nil { + seen[jobTaskTypePipeline] = true + } + if t.PythonWheelTask != nil { + seen[jobTaskTypePythonWheel] = true + } + if t.SqlTask != nil { + seen[jobTaskTypeSql] = true + } + if t.DbtTask != nil { + seen[jobTaskTypeDbt] = true + } + if t.RunJobTask != nil { + seen[jobTaskTypeRunJob] = true + } + } + + // Cannot handle positional arguments if we have more than one task type. + keys := maps.Keys(seen) + if len(keys) != 1 { + return nopArgsHandler{} + } + + switch keys[0] { + case jobTaskTypeNotebook: + return jobTaskNotebookParamArgs{job} + case jobTaskTypeSparkJar: + return jobTaskJarParamArgs{job} + case jobTaskTypeSparkPython, jobTaskTypePythonWheel: + return jobTaskPythonParamArgs{job} + case jobTaskTypeSparkSubmit: + return jobTaskSparkSubmitParamArgs{job} + default: + // No positional argument handling for other task types. + return nopArgsHandler{} + } +} diff --git a/bundle/run/job_args_test.go b/bundle/run/job_args_test.go new file mode 100644 index 00000000..70999490 --- /dev/null +++ b/bundle/run/job_args_test.go @@ -0,0 +1,223 @@ +package run + +import ( + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" +) + +func TestJobParameterArgs(t *testing.T) { + a := jobParameterArgs{ + &resources.Job{ + JobSettings: &jobs.JobSettings{ + Parameters: []jobs.JobParameterDefinition{ + { + Name: "foo", + Default: "value", + }, + { + Name: "bar", + Default: "value", + }, + }, + }, + }, + } + + t.Run("ParseArgsError", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"--p1=v1", "superfluous"}, &opts) + assert.ErrorContains(t, err, "unexpected positional arguments") + }) + + t.Run("ParseArgs", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"--p1=v1", "--p2=v2"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + map[string]string{ + "p1": "v1", + "p2": "v2", + }, + opts.Job.jobParams, + ) + }) + + t.Run("ParseArgsAppend", func(t *testing.T) { + var opts Options + opts.Job.jobParams = map[string]string{"p1": "v1"} + err := a.ParseArgs([]string{"--p2=v2"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + map[string]string{ + "p1": "v1", + "p2": "v2", + }, + opts.Job.jobParams, + ) + }) + + t.Run("CompleteArgs", func(t *testing.T) { + completions, _ := a.CompleteArgs([]string{}, "") + assert.Equal(t, []string{"--foo=", "--bar="}, completions) + }) +} + +func TestJobTaskNotebookParamArgs(t *testing.T) { + a := jobTaskNotebookParamArgs{ + &resources.Job{ + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NotebookTask: &jobs.NotebookTask{ + BaseParameters: map[string]string{ + "foo": "value", + "bar": "value", + }, + }, + }, + }, + }, + }, + } + + t.Run("ParseArgsError", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"--p1=v1", "superfluous"}, &opts) + assert.ErrorContains(t, err, "unexpected positional arguments") + }) + + t.Run("ParseArgs", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"--p1=v1", "--p2=v2"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + map[string]string{ + "p1": "v1", + "p2": "v2", + }, + opts.Job.notebookParams, + ) + }) + + t.Run("ParseArgsAppend", func(t *testing.T) { + var opts Options + opts.Job.notebookParams = map[string]string{"p1": "v1"} + err := a.ParseArgs([]string{"--p2=v2"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + map[string]string{ + "p1": "v1", + "p2": "v2", + }, + opts.Job.notebookParams, + ) + }) + + t.Run("CompleteArgs", func(t *testing.T) { + completions, _ := a.CompleteArgs([]string{}, "") + assert.ElementsMatch(t, []string{"--foo=", "--bar="}, completions) + }) +} + +func TestJobTaskJarParamArgs(t *testing.T) { + a := jobTaskJarParamArgs{} + + t.Run("ParseArgs", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"foo", "bar"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + []string{"foo", "bar"}, + opts.Job.jarParams, + ) + }) + + t.Run("ParseArgsAppend", func(t *testing.T) { + var opts Options + opts.Job.jarParams = []string{"foo"} + err := a.ParseArgs([]string{"bar"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + []string{"foo", "bar"}, + opts.Job.jarParams, + ) + }) + + t.Run("CompleteArgs", func(t *testing.T) { + completions, _ := a.CompleteArgs([]string{}, "") + assert.Empty(t, completions) + }) +} + +func TestJobTaskPythonParamArgs(t *testing.T) { + a := jobTaskPythonParamArgs{} + + t.Run("ParseArgs", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"foo", "bar"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + []string{"foo", "bar"}, + opts.Job.pythonParams, + ) + }) + + t.Run("ParseArgsAppend", func(t *testing.T) { + var opts Options + opts.Job.pythonParams = []string{"foo"} + err := a.ParseArgs([]string{"bar"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + []string{"foo", "bar"}, + opts.Job.pythonParams, + ) + }) + + t.Run("CompleteArgs", func(t *testing.T) { + completions, _ := a.CompleteArgs([]string{}, "") + assert.Empty(t, completions) + }) +} + +func TestJobTaskSparkSubmitParamArgs(t *testing.T) { + a := jobTaskSparkSubmitParamArgs{} + + t.Run("ParseArgs", func(t *testing.T) { + var opts Options + err := a.ParseArgs([]string{"foo", "bar"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + []string{"foo", "bar"}, + opts.Job.sparkSubmitParams, + ) + }) + + t.Run("ParseArgsAppend", func(t *testing.T) { + var opts Options + opts.Job.sparkSubmitParams = []string{"foo"} + err := a.ParseArgs([]string{"bar"}, &opts) + assert.NoError(t, err) + assert.Equal( + t, + []string{"foo", "bar"}, + opts.Job.sparkSubmitParams, + ) + }) + + t.Run("CompleteArgs", func(t *testing.T) { + completions, _ := a.CompleteArgs([]string{}, "") + assert.Empty(t, completions) + }) +} diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index e1f5bfe5..4e29b9f3 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -12,6 +12,7 @@ import ( "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/spf13/cobra" ) func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent { @@ -181,3 +182,15 @@ func (r *pipelineRunner) Cancel(ctx context.Context) error { _, err = wait.GetWithTimeout(jobRunTimeout) return err } + +func (r *pipelineRunner) ParseArgs(args []string, opts *Options) error { + if len(args) == 0 { + return nil + } + + return fmt.Errorf("received %d unexpected positional arguments", len(args)) +} + +func (r *pipelineRunner) CompleteArgs(args []string, toComplete string) ([]string, cobra.ShellCompDirective) { + return nil, cobra.ShellCompDirectiveNoFileComp +} diff --git a/bundle/run/runner.go b/bundle/run/runner.go index de2a1ae7..0f202ce7 100644 --- a/bundle/run/runner.go +++ b/bundle/run/runner.go @@ -29,6 +29,9 @@ type Runner interface { // Cancel the underlying workflow. Cancel(ctx context.Context) error + + // Runners support parsing and completion of additional positional arguments. + argsHandler } // Find locates a runner matching the specified argument. diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index e6a8e1ba..63458f85 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -18,8 +18,26 @@ import ( func newRunCommand() *cobra.Command { cmd := &cobra.Command{ Use: "run [flags] KEY", - Short: "Run a resource (e.g. a job or a pipeline)", - Args: root.MaximumNArgs(1), + Short: "Run a job or pipeline update", + Long: `Run the job or pipeline identified by KEY. + +The KEY is the unique identifier of the resource to run. In addition to +customizing the run using any of the available flags, you can also specify +keyword or positional arguments as shown in these examples: + + databricks bundle run my_job -- --key1 value1 --key2 value2 + +Or: + + databricks bundle run my_job -- value1 value2 value3 + +If the specified job uses job parameters or the job has a notebook task with +parameters, the first example applies and flag names are mapped to the +parameter names. + +If the specified job does not use job parameters and the job has a Python file +task or a Python wheel task, the second example applies. +`, } var runOptions run.Options @@ -62,7 +80,7 @@ func newRunCommand() *cobra.Command { args = append(args, id) } - if len(args) != 1 { + if len(args) < 1 { return fmt.Errorf("expected a KEY of the resource to run") } @@ -71,6 +89,12 @@ func newRunCommand() *cobra.Command { return err } + // Parse additional positional arguments. + err = runner.ParseArgs(args[1:], &runOptions) + if err != nil { + return err + } + runOptions.NoWait = noWait if restart { s := cmdio.Spinner(ctx) @@ -107,10 +131,6 @@ func newRunCommand() *cobra.Command { } cmd.ValidArgsFunction = func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { - if len(args) > 0 { - return nil, cobra.ShellCompDirectiveNoFileComp - } - b, diags := root.MustConfigureBundle(cmd) if err := diags.Error(); err != nil { cobra.CompErrorln(err.Error()) @@ -123,7 +143,16 @@ func newRunCommand() *cobra.Command { return nil, cobra.ShellCompDirectiveNoFileComp } - return run.ResourceCompletions(b), cobra.ShellCompDirectiveNoFileComp + if len(args) == 0 { + return run.ResourceCompletions(b), cobra.ShellCompDirectiveNoFileComp + } else { + // If we know the resource to run, we can complete additional positional arguments. + runner, err := run.Find(b, args[0]) + if err != nil { + return nil, cobra.ShellCompDirectiveError + } + return runner.CompleteArgs(args[1:], toComplete) + } } return cmd