From 06b50670e1dbcc0c6a5b91feef5d8262f36b7a45 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 15 Jan 2024 08:42:36 +0100 Subject: [PATCH] Support passing job parameters to bundle run (#1115) ## Changes This change adds support for job parameters. If job parameters are specified for a job that doesn't define job parameters it returns an error. Conversely, if task parameters are specified for a job that defines job parameters, it also returns an error. This change moves the options structs and their functions to separate files and backfills test coverage for them. Job parameters can now be specified with `--params foo=bar,bar=qux`. ## Tests Unit tests and manual integration testing. --- bundle/run/job.go | 71 +------- bundle/run/job_options.go | 127 +++++++++++++++ bundle/run/job_options_test.go | 243 ++++++++++++++++++++++++++++ bundle/run/pipeline.go | 62 +------ bundle/run/pipeline_options.go | 68 ++++++++ bundle/run/pipeline_options_test.go | 81 ++++++++++ 6 files changed, 521 insertions(+), 131 deletions(-) create mode 100644 bundle/run/job_options.go create mode 100644 bundle/run/job_options_test.go create mode 100644 bundle/run/pipeline_options.go create mode 100644 bundle/run/pipeline_options_test.go diff --git a/bundle/run/job.go b/bundle/run/job.go index a6343b97..a54279c1 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -15,77 +15,8 @@ import ( "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/fatih/color" - flag "github.com/spf13/pflag" ) -// JobOptions defines options for running a job. -type JobOptions struct { - dbtCommands []string - jarParams []string - notebookParams map[string]string - pipelineParams map[string]string - pythonNamedParams map[string]string - pythonParams []string - sparkSubmitParams []string - sqlParams map[string]string -} - -func (o *JobOptions) Define(fs *flag.FlagSet) { - fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.") - fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.") - fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.") - fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.") - fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.") - fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.") - fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.") - fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.") -} - -func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) { - if len(o.pipelineParams) == 0 { - return nil, nil - } - - var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=`") - v, ok := o.pipelineParams["full_refresh"] - if !ok { - return nil, defaultErr - } - - b, err := strconv.ParseBool(v) - if err != nil { - return nil, defaultErr - } - - pipelineParams := &jobs.PipelineParams{ - FullRefresh: b, - } - - return pipelineParams, nil -} - -func (o *JobOptions) toPayload(jobID int64) (*jobs.RunNow, error) { - pipelineParams, err := o.validatePipelineParams() - if err != nil { - return nil, err - } - - payload := &jobs.RunNow{ - JobId: jobID, - - DbtCommands: o.dbtCommands, - JarParams: o.jarParams, - NotebookParams: o.notebookParams, - PipelineParams: pipelineParams, - PythonNamedParams: o.pythonNamedParams, - PythonParams: o.pythonParams, - SparkSubmitParams: o.sparkSubmitParams, - SqlParams: o.sqlParams, - } - - return payload, nil -} - // Default timeout for waiting for a job run to complete. var jobRunTimeout time.Duration = 24 * time.Hour @@ -228,7 +159,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e } // construct request payload from cmd line flags args - req, err := opts.Job.toPayload(jobID) + req, err := opts.Job.toPayload(r.job, jobID) if err != nil { return nil, err } diff --git a/bundle/run/job_options.go b/bundle/run/job_options.go new file mode 100644 index 00000000..209591d7 --- /dev/null +++ b/bundle/run/job_options.go @@ -0,0 +1,127 @@ +package run + +import ( + "fmt" + "strconv" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + flag "github.com/spf13/pflag" +) + +// JobOptions defines options for running a job. +type JobOptions struct { + // Task parameters are specific to the type of task. + dbtCommands []string + jarParams []string + notebookParams map[string]string + pipelineParams map[string]string + pythonNamedParams map[string]string + pythonParams []string + sparkSubmitParams []string + sqlParams map[string]string + + // Job parameters are a map of key-value pairs that are passed to the job. + // If a job uses job parameters, it cannot use task parameters. + // Also see https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks. + jobParams map[string]string +} + +func (o *JobOptions) Define(fs *flag.FlagSet) { + // Define task parameters flags. + fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.") + fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.") + fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.") + fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.") + fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.") + fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.") + fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.") + fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.") + + // Define job parameters flag. + fs.StringToStringVar(&o.jobParams, "params", nil, "comma separated k=v pairs for job parameters") +} + +func (o *JobOptions) hasTaskParametersConfigured() bool { + return len(o.dbtCommands) > 0 || + len(o.jarParams) > 0 || + len(o.notebookParams) > 0 || + len(o.pipelineParams) > 0 || + len(o.pythonNamedParams) > 0 || + len(o.pythonParams) > 0 || + len(o.sparkSubmitParams) > 0 || + len(o.sqlParams) > 0 +} + +func (o *JobOptions) hasJobParametersConfigured() bool { + return len(o.jobParams) > 0 +} + +// Validate returns if the combination of options is valid. +func (o *JobOptions) Validate(job *resources.Job) error { + if job == nil { + return fmt.Errorf("job not defined") + } + + // Ensure mutual exclusion on job parameters and task parameters. + hasJobParams := len(job.Parameters) > 0 + if hasJobParams && o.hasTaskParametersConfigured() { + return fmt.Errorf("the job to run defines job parameters; specifying task parameters is not allowed") + } + if !hasJobParams && o.hasJobParametersConfigured() { + return fmt.Errorf("the job to run does not define job parameters; specifying job parameters is not allowed") + } + + return nil +} + +func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) { + if len(o.pipelineParams) == 0 { + return nil, nil + } + + var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=`") + v, ok := o.pipelineParams["full_refresh"] + if !ok { + return nil, defaultErr + } + + b, err := strconv.ParseBool(v) + if err != nil { + return nil, defaultErr + } + + pipelineParams := &jobs.PipelineParams{ + FullRefresh: b, + } + + return pipelineParams, nil +} + +func (o *JobOptions) toPayload(job *resources.Job, jobID int64) (*jobs.RunNow, error) { + if err := o.Validate(job); err != nil { + return nil, err + } + + pipelineParams, err := o.validatePipelineParams() + if err != nil { + return nil, err + } + + payload := &jobs.RunNow{ + JobId: jobID, + + DbtCommands: o.dbtCommands, + JarParams: o.jarParams, + NotebookParams: o.notebookParams, + PipelineParams: pipelineParams, + PythonNamedParams: o.pythonNamedParams, + PythonParams: o.pythonParams, + SparkSubmitParams: o.sparkSubmitParams, + SqlParams: o.sqlParams, + + JobParameters: o.jobParams, + } + + return payload, nil +} diff --git a/bundle/run/job_options_test.go b/bundle/run/job_options_test.go new file mode 100644 index 00000000..822771d8 --- /dev/null +++ b/bundle/run/job_options_test.go @@ -0,0 +1,243 @@ +package run + +import ( + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + flag "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupJobOptions(t *testing.T) (*flag.FlagSet, *JobOptions) { + var fs flag.FlagSet + var opts JobOptions + opts.Define(&fs) + return &fs, &opts +} + +func TestJobOptionsDbtCommands(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--dbt-commands=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.dbtCommands) +} + +func TestJobOptionsDbtCommandsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--dbt-commands="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.dbtCommands) +} + +func TestJobOptionsDbtCommandsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--dbt-commands=arg1,arg2`, + `--dbt-commands=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.dbtCommands) +} + +func TestJobOptionsJarParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--jar-params=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.jarParams) +} + +func TestJobOptionsJarParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--jar-params="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.jarParams) +} + +func TestJobOptionsJarParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--jar-params=arg1,arg2`, + `--jar-params=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.jarParams) +} + +func TestJobOptionsNotebookParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--notebook-params=arg1=1,arg2=2,arg3=3`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.notebookParams) +} + +func TestJobOptionsNotebookParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--notebook-params="arg1=1","arg2=2,arg3=3"`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.notebookParams) +} + +func TestJobOptionsNotebookParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--notebook-params=arg1=1,arg2=2`, + `--notebook-params=arg3=3`, + }) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.notebookParams) +} + +func TestJobOptionsPythonNamedParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-named-params=arg1=1,arg2=2,arg3=3`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.pythonNamedParams) +} + +func TestJobOptionsPythonNamedParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-named-params="arg1=1","arg2=2,arg3=3"`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.pythonNamedParams) +} + +func TestJobOptionsPythonNamedParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--python-named-params=arg1=1,arg2=2`, + `--python-named-params=arg3=3`, + }) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.pythonNamedParams) +} + +func TestJobOptionsPythonParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.pythonParams) +} + +func TestJobOptionsPythonParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.pythonParams) +} + +func TestJobOptionsPythonParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--python-params=arg1,arg2`, + `--python-params=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.pythonParams) +} + +func TestJobOptionsSparkSubmitParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--spark-submit-params=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.sparkSubmitParams) +} + +func TestJobOptionsSparkSubmitParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--spark-submit-params="arg1","arg2,arg3"`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.sparkSubmitParams) +} + +func TestJobOptionsSparkSubmitParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--spark-submit-params=arg1,arg2`, + `--spark-submit-params=arg3`, + }) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.sparkSubmitParams) +} + +func TestJobOptionsSqlParams(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--sql-params=arg1=1,arg2=2,arg3=3`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.sqlParams) +} + +func TestJobOptionsSqlParamsWithQuotes(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--sql-params="arg1=1","arg2=2,arg3=3"`}) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.sqlParams) +} + +func TestJobOptionsSqlParamsMultiple(t *testing.T) { + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{ + `--sql-params=arg1=1,arg2=2`, + `--sql-params=arg3=3`, + }) + require.NoError(t, err) + assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.sqlParams) +} + +func TestJobOptionsValidateIfJobHasJobParameters(t *testing.T) { + job := &resources.Job{ + JobSettings: &jobs.JobSettings{ + Parameters: []jobs.JobParameterDefinition{ + { + Name: "param", + Default: "value", + }, + }, + }, + } + + { + // Test error if task parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params=arg1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.ErrorContains(t, err, "the job to run defines job parameters; specifying task parameters is not allowed") + } + + { + // Test no error if job parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--params=arg1=val1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.NoError(t, err) + } +} + +func TestJobOptionsValidateIfJobHasNoJobParameters(t *testing.T) { + job := &resources.Job{ + JobSettings: &jobs.JobSettings{ + Parameters: []jobs.JobParameterDefinition{}, + }, + } + + { + // Test error if job parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--params=arg1=val1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.ErrorContains(t, err, "the job to run does not define job parameters; specifying job parameters is not allowed") + } + + { + // Test no error if task parameters are specified. + fs, opts := setupJobOptions(t) + err := fs.Parse([]string{`--python-params=arg1`}) + require.NoError(t, err) + err = opts.Validate(job) + assert.NoError(t, err) + } +} diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 216712d3..342a771b 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -3,7 +3,6 @@ package run import ( "context" "fmt" - "strings" "time" "github.com/databricks/cli/bundle" @@ -13,7 +12,6 @@ import ( "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/service/pipelines" - flag "github.com/spf13/pflag" ) func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent { @@ -71,64 +69,6 @@ func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, u return nil } -// PipelineOptions defines options for running a pipeline update. -type PipelineOptions struct { - // Perform a full graph update. - RefreshAll bool - - // List of tables to update. - Refresh []string - - // Perform a full graph reset and recompute. - FullRefreshAll bool - - // List of tables to reset and recompute. - FullRefresh []string -} - -func (o *PipelineOptions) Define(fs *flag.FlagSet) { - fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.") - fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.") - fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.") - fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.") -} - -// Validate returns if the combination of options is valid. -func (o *PipelineOptions) Validate() error { - set := []string{} - if o.RefreshAll { - set = append(set, "--refresh-all") - } - if len(o.Refresh) > 0 { - set = append(set, "--refresh") - } - if o.FullRefreshAll { - set = append(set, "--full-refresh-all") - } - if len(o.FullRefresh) > 0 { - set = append(set, "--full-refresh") - } - if len(set) > 1 { - return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(set, ", ")) - } - return nil -} - -func (o *PipelineOptions) toPayload(pipelineID string) (*pipelines.StartUpdate, error) { - if err := o.Validate(); err != nil { - return nil, err - } - payload := &pipelines.StartUpdate{ - PipelineId: pipelineID, - - // Note: `RefreshAll` is implied if the fields below are not set. - RefreshSelection: o.Refresh, - FullRefresh: o.FullRefreshAll, - FullRefreshSelection: o.FullRefresh, - } - return payload, nil -} - type pipelineRunner struct { key @@ -155,7 +95,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp return nil, err } - req, err := opts.Pipeline.toPayload(pipelineID) + req, err := opts.Pipeline.toPayload(r.pipeline, pipelineID) if err != nil { return nil, err } diff --git a/bundle/run/pipeline_options.go b/bundle/run/pipeline_options.go new file mode 100644 index 00000000..4917f9db --- /dev/null +++ b/bundle/run/pipeline_options.go @@ -0,0 +1,68 @@ +package run + +import ( + "fmt" + "strings" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/pipelines" + flag "github.com/spf13/pflag" +) + +// PipelineOptions defines options for running a pipeline update. +type PipelineOptions struct { + // Perform a full graph update. + RefreshAll bool + + // List of tables to update. + Refresh []string + + // Perform a full graph reset and recompute. + FullRefreshAll bool + + // List of tables to reset and recompute. + FullRefresh []string +} + +func (o *PipelineOptions) Define(fs *flag.FlagSet) { + fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.") + fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.") + fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.") + fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.") +} + +// Validate returns if the combination of options is valid. +func (o *PipelineOptions) Validate(pipeline *resources.Pipeline) error { + set := []string{} + if o.RefreshAll { + set = append(set, "--refresh-all") + } + if len(o.Refresh) > 0 { + set = append(set, "--refresh") + } + if o.FullRefreshAll { + set = append(set, "--full-refresh-all") + } + if len(o.FullRefresh) > 0 { + set = append(set, "--full-refresh") + } + if len(set) > 1 { + return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(set, ", ")) + } + return nil +} + +func (o *PipelineOptions) toPayload(pipeline *resources.Pipeline, pipelineID string) (*pipelines.StartUpdate, error) { + if err := o.Validate(pipeline); err != nil { + return nil, err + } + payload := &pipelines.StartUpdate{ + PipelineId: pipelineID, + + // Note: `RefreshAll` is implied if the fields below are not set. + RefreshSelection: o.Refresh, + FullRefresh: o.FullRefreshAll, + FullRefreshSelection: o.FullRefresh, + } + return payload, nil +} diff --git a/bundle/run/pipeline_options_test.go b/bundle/run/pipeline_options_test.go new file mode 100644 index 00000000..3048a4d8 --- /dev/null +++ b/bundle/run/pipeline_options_test.go @@ -0,0 +1,81 @@ +package run + +import ( + "testing" + + flag "github.com/spf13/pflag" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupPipelineOptions(t *testing.T) (*flag.FlagSet, *PipelineOptions) { + var fs flag.FlagSet + var opts PipelineOptions + opts.Define(&fs) + return &fs, &opts +} + +func TestPipelineOptionsRefreshAll(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--refresh-all`}) + require.NoError(t, err) + assert.True(t, opts.RefreshAll) +} + +func TestPipelineOptionsRefresh(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--refresh=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.Refresh) +} + +func TestPipelineOptionsFullRefreshAll(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--full-refresh-all`}) + require.NoError(t, err) + assert.True(t, opts.FullRefreshAll) +} + +func TestPipelineOptionsFullRefresh(t *testing.T) { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{`--full-refresh=arg1,arg2,arg3`}) + require.NoError(t, err) + assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.FullRefresh) +} + +func TestPipelineOptionsValidateSuccessWithSingleOption(t *testing.T) { + args := []string{ + `--refresh-all`, + `--refresh=arg1,arg2,arg3`, + `--full-refresh-all`, + `--full-refresh=arg1,arg2,arg3`, + } + for _, arg := range args { + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{arg}) + require.NoError(t, err) + err = opts.Validate(nil) + assert.NoError(t, err) + } +} + +func TestPipelineOptionsValidateFailureWithMultipleOptions(t *testing.T) { + args := []string{ + `--refresh-all`, + `--refresh=arg1,arg2,arg3`, + `--full-refresh-all`, + `--full-refresh=arg1,arg2,arg3`, + } + for i := range args { + for j := range args { + if i == j { + continue + } + fs, opts := setupPipelineOptions(t) + err := fs.Parse([]string{args[i], args[j]}) + require.NoError(t, err) + err = opts.Validate(nil) + assert.ErrorContains(t, err, "pipeline run arguments are mutually exclusive") + } + } +}