diff --git a/bundle/run/job.go b/bundle/run/job.go index a6343b97..a54279c1 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -15,77 +15,8 @@ import ( "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/fatih/color" - flag "github.com/spf13/pflag" ) -// JobOptions defines options for running a job. -type JobOptions struct { - dbtCommands []string - jarParams []string - notebookParams map[string]string - pipelineParams map[string]string - pythonNamedParams map[string]string - pythonParams []string - sparkSubmitParams []string - sqlParams map[string]string -} - -func (o *JobOptions) Define(fs *flag.FlagSet) { - fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.") - fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.") - fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.") - fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.") - fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.") - fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.") - fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.") - fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.") -} - -func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) { - if len(o.pipelineParams) == 0 { - return nil, nil - } - - var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=`") - v, ok := o.pipelineParams["full_refresh"] - if !ok { - return nil, defaultErr - } - - b, err := strconv.ParseBool(v) - if err != nil { - return nil, defaultErr - } - - pipelineParams := &jobs.PipelineParams{ - FullRefresh: b, - } - - return pipelineParams, nil -} - -func (o *JobOptions) toPayload(jobID int64) (*jobs.RunNow, error) { - pipelineParams, err := o.validatePipelineParams() - if err != nil { - return nil, err - } - - payload := &jobs.RunNow{ - JobId: jobID, - - DbtCommands: o.dbtCommands, - JarParams: o.jarParams, - NotebookParams: o.notebookParams, - PipelineParams: pipelineParams, - PythonNamedParams: o.pythonNamedParams, - PythonParams: o.pythonParams, - SparkSubmitParams: o.sparkSubmitParams, - SqlParams: o.sqlParams, - } - - return payload, nil -} - // Default timeout for waiting for a job run to complete. var jobRunTimeout time.Duration = 24 * time.Hour @@ -228,7 +159,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e } // construct request payload from cmd line flags args - req, err := opts.Job.toPayload(jobID) + req, err := opts.Job.toPayload(r.job, jobID) if err != nil { return nil, err } diff --git a/bundle/run/job_options.go b/bundle/run/job_options.go new file mode 100644 index 00000000..209591d7 --- /dev/null +++ b/bundle/run/job_options.go @@ -0,0 +1,127 @@ +package run + +import ( + "fmt" + "strconv" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + flag "github.com/spf13/pflag" +) + +// JobOptions defines options for running a job. +type JobOptions struct { + // Task parameters are specific to the type of task. + dbtCommands []string + jarParams []string + notebookParams map[string]string + pipelineParams map[string]string + pythonNamedParams map[string]string + pythonParams []string + sparkSubmitParams []string + sqlParams map[string]string + + // Job parameters are a map of key-value pairs that are passed to the job. + // If a job uses job parameters, it cannot use task parameters. + // Also see https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks. + jobParams map[string]string +} + +func (o *JobOptions) Define(fs *flag.FlagSet) { + // Define task parameters flags. + fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.") + fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.") + fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.") + fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.") + fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.") + fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.") + fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.") + fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.") + + // Define job parameters flag. + fs.StringToStringVar(&o.jobParams, "params", nil, "comma separated k=v pairs for job parameters") +} + +func (o *JobOptions) hasTaskParametersConfigured() bool { + return len(o.dbtCommands) > 0 || + len(o.jarParams) > 0 || + len(o.notebookParams) > 0 || + len(o.pipelineParams) > 0 || + len(o.pythonNamedParams) > 0 || + len(o.pythonParams) > 0 || + len(o.sparkSubmitParams) > 0 || + len(o.sqlParams) > 0 +} + +func (o *JobOptions) hasJobParametersConfigured() bool { + return len(o.jobParams) > 0 +} + +// Validate returns if the combination of options is valid. +func (o *JobOptions) Validate(job *resources.Job) error { + if job == nil { + return fmt.Errorf("job not defined") + } + + // Ensure mutual exclusion on job parameters and task parameters. + hasJobParams := len(job.Parameters) > 0 + if hasJobParams && o.hasTaskParametersConfigured() { + return fmt.Errorf("the job to run defines job parameters; specifying task parameters is not allowed") + } + if !hasJobParams && o.hasJobParametersConfigured() { + return fmt.Errorf("the job to run does not define job parameters; specifying job parameters is not allowed") + } + + return nil +} + +func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) { + if len(o.pipelineParams) == 0 { + return nil, nil + } + + var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=`") + v, ok := o.pipelineParams["full_refresh"] + if !ok { + return nil, defaultErr + } + + b, err := strconv.ParseBool(v) + if err != nil { + return nil, defaultErr + } + + pipelineParams := &jobs.PipelineParams{ + FullRefresh: b, + } + + return pipelineParams, nil +} + +func (o *JobOptions) toPayload(job *resources.Job, jobID int64) (*jobs.RunNow, error) { + if err := o.Validate(job); err != nil { + return nil, err + } + + pipelineParams, err := o.validatePipelineParams() + if err != nil { + return nil, err + } + + payload := &jobs.RunNow{ + JobId: jobID, + + DbtCommands: o.dbtCommands, + JarParams: o.jarParams, + NotebookParams: o.notebookParams, + PipelineParams: pipelineParams, + PythonNamedParams: o.pythonNamedParams, + PythonParams: o.pythonParams, + SparkSubmitParams: o.sparkSubmitParams, + SqlParams: o.sqlParams, + + JobParameters: o.jobParams, + } + + return payload, nil +} diff --git a/bundle/run/job_options_test.go b/bundle/run/job_options_test.go new file mode 100644 index 00000000..822771d8 --- /dev/null +++ b/bundle/run/job_options_test.go @@ -0,0 +1,243 @@ +package run + +import ( + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + flag "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupJobOptions(t *testing.T) (*flag.FlagSet, *JobOptions) { + var fs flag.FlagSet + var opts JobOptions + opts.Define(&fs) + return &fs, &opts +} + +func TestJobOptionsDbtCommands(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--dbt-commands=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.dbtCommands) +} + +func TestJobOptionsDbtCommandsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--dbt-commands="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.dbtCommands) +} + +func TestJobOptionsDbtCommandsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--dbt-commands=arg1,arg2`, + `--dbt-commands=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.dbtCommands) +} + +func TestJobOptionsJarParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--jar-params=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.jarParams) +} + +func TestJobOptionsJarParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--jar-params="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.jarParams) +} + +func TestJobOptionsJarParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--jar-params=arg1,arg2`, + `--jar-params=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.jarParams) +} + +func TestJobOptionsNotebookParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--notebook-params=arg1=1,arg2=2,arg3=3`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.notebookParams) +} + +func TestJobOptionsNotebookParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--notebook-params="arg1=1","arg2=2,arg3=3"`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.notebookParams) +} + +func TestJobOptionsNotebookParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--notebook-params=arg1=1,arg2=2`, + `--notebook-params=arg3=3`, + }) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.notebookParams) +} + +func TestJobOptionsPythonNamedParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-named-params=arg1=1,arg2=2,arg3=3`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.pythonNamedParams) +} + +func TestJobOptionsPythonNamedParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-named-params="arg1=1","arg2=2,arg3=3"`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.pythonNamedParams) +} + +func TestJobOptionsPythonNamedParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--python-named-params=arg1=1,arg2=2`, + `--python-named-params=arg3=3`, + }) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.pythonNamedParams) +} + +func TestJobOptionsPythonParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.pythonParams) +} + +func TestJobOptionsPythonParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.pythonParams) +} + +func TestJobOptionsPythonParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--python-params=arg1,arg2`, + `--python-params=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.pythonParams) +} + +func TestJobOptionsSparkSubmitParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--spark-submit-params=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.sparkSubmitParams) +} + +func TestJobOptionsSparkSubmitParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--spark-submit-params="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.sparkSubmitParams) +} + +func TestJobOptionsSparkSubmitParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--spark-submit-params=arg1,arg2`, + `--spark-submit-params=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.sparkSubmitParams) +} + +func TestJobOptionsSqlParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--sql-params=arg1=1,arg2=2,arg3=3`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.sqlParams) +} + +func TestJobOptionsSqlParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--sql-params="arg1=1","arg2=2,arg3=3"`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.sqlParams) +} + +func TestJobOptionsSqlParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--sql-params=arg1=1,arg2=2`, + `--sql-params=arg3=3`, + }) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.sqlParams) +} + +func TestJobOptionsValidateIfJobHasJobParameters(t *testing.T) { + job := &resources.Job{ + JobSettings: &jobs.JobSettings{ + Parameters: []jobs.JobParameterDefinition{ + { + Name: "param", + Default: "value", + }, + }, + }, + } + + { + // Test error if task parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params=arg1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.ErrorContains(t, err, "the job to run defines job parameters; specifying task parameters is not allowed") + } + + { + // Test no error if job parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--params=arg1=val1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.NoError(t, err) + } +} + +func TestJobOptionsValidateIfJobHasNoJobParameters(t *testing.T) { + job := &resources.Job{ + JobSettings: &jobs.JobSettings{ + Parameters: []jobs.JobParameterDefinition{}, + }, + } + + { + // Test error if job parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--params=arg1=val1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.ErrorContains(t, err, "the job to run does not define job parameters; specifying job parameters is not allowed") + } + + { + // Test no error if task parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params=arg1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.NoError(t, err) + } +} diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 216712d3..342a771b 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -3,7 +3,6 @@ package run import ( "context" "fmt" - "strings" "time" "github.com/databricks/cli/bundle" @@ -13,7 +12,6 @@ import ( "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/pipelines" - flag "github.com/spf13/pflag" ) func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent { @@ -71,64 +69,6 @@ func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, u return nil } -// PipelineOptions defines options for running a pipeline update. -type PipelineOptions struct { - // Perform a full graph update. - RefreshAll bool - - // List of tables to update. - Refresh []string - - // Perform a full graph reset and recompute. - FullRefreshAll bool - - // List of tables to reset and recompute. - FullRefresh []string -} - -func (o *PipelineOptions) Define(fs *flag.FlagSet) { - fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.") - fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.") - fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.") - fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.") -} - -// Validate returns if the combination of options is valid. -func (o *PipelineOptions) Validate() error { - set := []string{} - if o.RefreshAll { - set = append(set, "--refresh-all") - } - if len(o.Refresh) > 0 { - set = append(set, "--refresh") - } - if o.FullRefreshAll { - set = append(set, "--full-refresh-all") - } - if len(o.FullRefresh) > 0 { - set = append(set, "--full-refresh") - } - if len(set) > 1 { - return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(set, ", ")) - } - return nil -} - -func (o *PipelineOptions) toPayload(pipelineID string) (*pipelines.StartUpdate, error) { - if err := o.Validate(); err != nil { - return nil, err - } - payload := &pipelines.StartUpdate{ - PipelineId: pipelineID, - - // Note: `RefreshAll` is implied if the fields below are not set. - RefreshSelection: o.Refresh, - FullRefresh: o.FullRefreshAll, - FullRefreshSelection: o.FullRefresh, - } - return payload, nil -} - type pipelineRunner struct { key @@ -155,7 +95,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp return nil, err } - req, err := opts.Pipeline.toPayload(pipelineID) + req, err := opts.Pipeline.toPayload(r.pipeline, pipelineID) if err != nil { return nil, err } diff --git a/bundle/run/pipeline_options.go b/bundle/run/pipeline_options.go new file mode 100644 index 00000000..4917f9db --- /dev/null +++ b/bundle/run/pipeline_options.go @@ -0,0 +1,68 @@ +package run + +import ( + "fmt" + "strings" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/pipelines" + flag "github.com/spf13/pflag" +) + +// PipelineOptions defines options for running a pipeline update. +type PipelineOptions struct { + // Perform a full graph update. + RefreshAll bool + + // List of tables to update. + Refresh []string + + // Perform a full graph reset and recompute. + FullRefreshAll bool + + // List of tables to reset and recompute. + FullRefresh []string +} + +func (o *PipelineOptions) Define(fs *flag.FlagSet) { + fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.") + fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.") + fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.") + fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.") +} + +// Validate returns if the combination of options is valid. +func (o *PipelineOptions) Validate(pipeline *resources.Pipeline) error { + set := []string{} + if o.RefreshAll { + set = append(set, "--refresh-all") + } + if len(o.Refresh) > 0 { + set = append(set, "--refresh") + } + if o.FullRefreshAll { + set = append(set, "--full-refresh-all") + } + if len(o.FullRefresh) > 0 { + set = append(set, "--full-refresh") + } + if len(set) > 1 { + return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(set, ", ")) + } + return nil +} + +func (o *PipelineOptions) toPayload(pipeline *resources.Pipeline, pipelineID string) (*pipelines.StartUpdate, error) { + if err := o.Validate(pipeline); err != nil { + return nil, err + } + payload := &pipelines.StartUpdate{ + PipelineId: pipelineID, + + // Note: `RefreshAll` is implied if the fields below are not set. + RefreshSelection: o.Refresh, + FullRefresh: o.FullRefreshAll, + FullRefreshSelection: o.FullRefresh, + } + return payload, nil +} diff --git a/bundle/run/pipeline_options_test.go b/bundle/run/pipeline_options_test.go new file mode 100644 index 00000000..3048a4d8 --- /dev/null +++ b/bundle/run/pipeline_options_test.go @@ -0,0 +1,81 @@ +package run + +import ( + "testing" + + flag "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupPipelineOptions(t *testing.T) (*flag.FlagSet, *PipelineOptions) { + var fs flag.FlagSet + var opts PipelineOptions + opts.Define(&fs) + return &fs, &opts +} + +func TestPipelineOptionsRefreshAll(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--refresh-all`}) + require.NoError(t, err) + assert.True(t, opts.RefreshAll) +} + +func TestPipelineOptionsRefresh(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--refresh=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.Refresh) +} + +func TestPipelineOptionsFullRefreshAll(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--full-refresh-all`}) + require.NoError(t, err) + assert.True(t, opts.FullRefreshAll) +} + +func TestPipelineOptionsFullRefresh(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--full-refresh=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.FullRefresh) +} + +func TestPipelineOptionsValidateSuccessWithSingleOption(t *testing.T) { + args := []string{ + `--refresh-all`, + `--refresh=arg1,arg2,arg3`, + `--full-refresh-all`, + `--full-refresh=arg1,arg2,arg3`, + } + for _, arg := range args { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{arg}) + require.NoError(t, err) + err = opts.Validate(nil) + assert.NoError(t, err) + } +} + +func TestPipelineOptionsValidateFailureWithMultipleOptions(t *testing.T) { + args := []string{ + `--refresh-all`, + `--refresh=arg1,arg2,arg3`, + `--full-refresh-all`, + `--full-refresh=arg1,arg2,arg3`, + } + for i := range args { + for j := range args { + if i == j { + continue + } + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{args[i], args[j]}) + require.NoError(t, err) + err = opts.Validate(nil) + assert.ErrorContains(t, err, "pipeline run arguments are mutually exclusive") + } + } +}