mirror of https://github.com/databricks/cli.git
Support passing job parameters to bundle run (#1115)
## Changes This change adds support for job parameters. If job parameters are specified for a job that doesn't define job parameters it returns an error. Conversely, if task parameters are specified for a job that defines job parameters, it also returns an error. This change moves the options structs and their functions to separate files and backfills test coverage for them. Job parameters can now be specified with `--params foo=bar,bar=qux`. ## Tests Unit tests and manual integration testing.
This commit is contained in:
parent
2c0d06715c
commit
06b50670e1
|
@ -15,77 +15,8 @@ import (
|
|||
"github.com/databricks/cli/libs/log"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/fatih/color"
|
||||
flag "github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// JobOptions defines options for running a job.
|
||||
type JobOptions struct {
|
||||
dbtCommands []string
|
||||
jarParams []string
|
||||
notebookParams map[string]string
|
||||
pipelineParams map[string]string
|
||||
pythonNamedParams map[string]string
|
||||
pythonParams []string
|
||||
sparkSubmitParams []string
|
||||
sqlParams map[string]string
|
||||
}
|
||||
|
||||
func (o *JobOptions) Define(fs *flag.FlagSet) {
|
||||
fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.")
|
||||
fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.")
|
||||
fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.")
|
||||
fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.")
|
||||
fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.")
|
||||
fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.")
|
||||
fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.")
|
||||
fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.")
|
||||
}
|
||||
|
||||
func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) {
|
||||
if len(o.pipelineParams) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=<bool>`")
|
||||
v, ok := o.pipelineParams["full_refresh"]
|
||||
if !ok {
|
||||
return nil, defaultErr
|
||||
}
|
||||
|
||||
b, err := strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
return nil, defaultErr
|
||||
}
|
||||
|
||||
pipelineParams := &jobs.PipelineParams{
|
||||
FullRefresh: b,
|
||||
}
|
||||
|
||||
return pipelineParams, nil
|
||||
}
|
||||
|
||||
func (o *JobOptions) toPayload(jobID int64) (*jobs.RunNow, error) {
|
||||
pipelineParams, err := o.validatePipelineParams()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := &jobs.RunNow{
|
||||
JobId: jobID,
|
||||
|
||||
DbtCommands: o.dbtCommands,
|
||||
JarParams: o.jarParams,
|
||||
NotebookParams: o.notebookParams,
|
||||
PipelineParams: pipelineParams,
|
||||
PythonNamedParams: o.pythonNamedParams,
|
||||
PythonParams: o.pythonParams,
|
||||
SparkSubmitParams: o.sparkSubmitParams,
|
||||
SqlParams: o.sqlParams,
|
||||
}
|
||||
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
// Default timeout for waiting for a job run to complete.
|
||||
var jobRunTimeout time.Duration = 24 * time.Hour
|
||||
|
||||
|
@ -228,7 +159,7 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
|
|||
}
|
||||
|
||||
// construct request payload from cmd line flags args
|
||||
req, err := opts.Job.toPayload(jobID)
|
||||
req, err := opts.Job.toPayload(r.job, jobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,127 @@
|
|||
package run
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
flag "github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// JobOptions defines options for running a job.
|
||||
type JobOptions struct {
|
||||
// Task parameters are specific to the type of task.
|
||||
dbtCommands []string
|
||||
jarParams []string
|
||||
notebookParams map[string]string
|
||||
pipelineParams map[string]string
|
||||
pythonNamedParams map[string]string
|
||||
pythonParams []string
|
||||
sparkSubmitParams []string
|
||||
sqlParams map[string]string
|
||||
|
||||
// Job parameters are a map of key-value pairs that are passed to the job.
|
||||
// If a job uses job parameters, it cannot use task parameters.
|
||||
// Also see https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks.
|
||||
jobParams map[string]string
|
||||
}
|
||||
|
||||
func (o *JobOptions) Define(fs *flag.FlagSet) {
|
||||
// Define task parameters flags.
|
||||
fs.StringSliceVar(&o.dbtCommands, "dbt-commands", nil, "A list of commands to execute for jobs with DBT tasks.")
|
||||
fs.StringSliceVar(&o.jarParams, "jar-params", nil, "A list of parameters for jobs with Spark JAR tasks.")
|
||||
fs.StringToStringVar(&o.notebookParams, "notebook-params", nil, "A map from keys to values for jobs with notebook tasks.")
|
||||
fs.StringToStringVar(&o.pipelineParams, "pipeline-params", nil, "A map from keys to values for jobs with pipeline tasks.")
|
||||
fs.StringToStringVar(&o.pythonNamedParams, "python-named-params", nil, "A map from keys to values for jobs with Python wheel tasks.")
|
||||
fs.StringSliceVar(&o.pythonParams, "python-params", nil, "A list of parameters for jobs with Python tasks.")
|
||||
fs.StringSliceVar(&o.sparkSubmitParams, "spark-submit-params", nil, "A list of parameters for jobs with Spark submit tasks.")
|
||||
fs.StringToStringVar(&o.sqlParams, "sql-params", nil, "A map from keys to values for jobs with SQL tasks.")
|
||||
|
||||
// Define job parameters flag.
|
||||
fs.StringToStringVar(&o.jobParams, "params", nil, "comma separated k=v pairs for job parameters")
|
||||
}
|
||||
|
||||
func (o *JobOptions) hasTaskParametersConfigured() bool {
|
||||
return len(o.dbtCommands) > 0 ||
|
||||
len(o.jarParams) > 0 ||
|
||||
len(o.notebookParams) > 0 ||
|
||||
len(o.pipelineParams) > 0 ||
|
||||
len(o.pythonNamedParams) > 0 ||
|
||||
len(o.pythonParams) > 0 ||
|
||||
len(o.sparkSubmitParams) > 0 ||
|
||||
len(o.sqlParams) > 0
|
||||
}
|
||||
|
||||
func (o *JobOptions) hasJobParametersConfigured() bool {
|
||||
return len(o.jobParams) > 0
|
||||
}
|
||||
|
||||
// Validate returns if the combination of options is valid.
|
||||
func (o *JobOptions) Validate(job *resources.Job) error {
|
||||
if job == nil {
|
||||
return fmt.Errorf("job not defined")
|
||||
}
|
||||
|
||||
// Ensure mutual exclusion on job parameters and task parameters.
|
||||
hasJobParams := len(job.Parameters) > 0
|
||||
if hasJobParams && o.hasTaskParametersConfigured() {
|
||||
return fmt.Errorf("the job to run defines job parameters; specifying task parameters is not allowed")
|
||||
}
|
||||
if !hasJobParams && o.hasJobParametersConfigured() {
|
||||
return fmt.Errorf("the job to run does not define job parameters; specifying job parameters is not allowed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *JobOptions) validatePipelineParams() (*jobs.PipelineParams, error) {
|
||||
if len(o.pipelineParams) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var defaultErr = fmt.Errorf("job run argument --pipeline-params only supports `full_refresh=<bool>`")
|
||||
v, ok := o.pipelineParams["full_refresh"]
|
||||
if !ok {
|
||||
return nil, defaultErr
|
||||
}
|
||||
|
||||
b, err := strconv.ParseBool(v)
|
||||
if err != nil {
|
||||
return nil, defaultErr
|
||||
}
|
||||
|
||||
pipelineParams := &jobs.PipelineParams{
|
||||
FullRefresh: b,
|
||||
}
|
||||
|
||||
return pipelineParams, nil
|
||||
}
|
||||
|
||||
func (o *JobOptions) toPayload(job *resources.Job, jobID int64) (*jobs.RunNow, error) {
|
||||
if err := o.Validate(job); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pipelineParams, err := o.validatePipelineParams()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := &jobs.RunNow{
|
||||
JobId: jobID,
|
||||
|
||||
DbtCommands: o.dbtCommands,
|
||||
JarParams: o.jarParams,
|
||||
NotebookParams: o.notebookParams,
|
||||
PipelineParams: pipelineParams,
|
||||
PythonNamedParams: o.pythonNamedParams,
|
||||
PythonParams: o.pythonParams,
|
||||
SparkSubmitParams: o.sparkSubmitParams,
|
||||
SqlParams: o.sqlParams,
|
||||
|
||||
JobParameters: o.jobParams,
|
||||
}
|
||||
|
||||
return payload, nil
|
||||
}
|
|
@ -0,0 +1,243 @@
|
|||
package run
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
flag "github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupJobOptions(t *testing.T) (*flag.FlagSet, *JobOptions) {
|
||||
var fs flag.FlagSet
|
||||
var opts JobOptions
|
||||
opts.Define(&fs)
|
||||
return &fs, &opts
|
||||
}
|
||||
|
||||
func TestJobOptionsDbtCommands(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--dbt-commands=arg1,arg2,arg3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.dbtCommands)
|
||||
}
|
||||
|
||||
func TestJobOptionsDbtCommandsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--dbt-commands="arg1","arg2,arg3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.dbtCommands)
|
||||
}
|
||||
|
||||
func TestJobOptionsDbtCommandsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--dbt-commands=arg1,arg2`,
|
||||
`--dbt-commands=arg3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.dbtCommands)
|
||||
}
|
||||
|
||||
func TestJobOptionsJarParams(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--jar-params=arg1,arg2,arg3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.jarParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsJarParamsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--jar-params="arg1","arg2,arg3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.jarParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsJarParamsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--jar-params=arg1,arg2`,
|
||||
`--jar-params=arg3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.jarParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsNotebookParams(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--notebook-params=arg1=1,arg2=2,arg3=3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.notebookParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsNotebookParamsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--notebook-params="arg1=1","arg2=2,arg3=3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.notebookParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsNotebookParamsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--notebook-params=arg1=1,arg2=2`,
|
||||
`--notebook-params=arg3=3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.notebookParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsPythonNamedParams(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--python-named-params=arg1=1,arg2=2,arg3=3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.pythonNamedParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsPythonNamedParamsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--python-named-params="arg1=1","arg2=2,arg3=3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.pythonNamedParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsPythonNamedParamsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--python-named-params=arg1=1,arg2=2`,
|
||||
`--python-named-params=arg3=3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.pythonNamedParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsPythonParams(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--python-params=arg1,arg2,arg3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.pythonParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsPythonParamsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--python-params="arg1","arg2,arg3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.pythonParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsPythonParamsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--python-params=arg1,arg2`,
|
||||
`--python-params=arg3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.pythonParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsSparkSubmitParams(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--spark-submit-params=arg1,arg2,arg3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.sparkSubmitParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsSparkSubmitParamsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--spark-submit-params="arg1","arg2,arg3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2,arg3"}, opts.sparkSubmitParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsSparkSubmitParamsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--spark-submit-params=arg1,arg2`,
|
||||
`--spark-submit-params=arg3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.sparkSubmitParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsSqlParams(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--sql-params=arg1=1,arg2=2,arg3=3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.sqlParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsSqlParamsWithQuotes(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--sql-params="arg1=1","arg2=2,arg3=3"`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2,arg3=3"}, opts.sqlParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsSqlParamsMultiple(t *testing.T) {
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{
|
||||
`--sql-params=arg1=1,arg2=2`,
|
||||
`--sql-params=arg3=3`,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, map[string]string{"arg1": "1", "arg2": "2", "arg3": "3"}, opts.sqlParams)
|
||||
}
|
||||
|
||||
func TestJobOptionsValidateIfJobHasJobParameters(t *testing.T) {
|
||||
job := &resources.Job{
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Parameters: []jobs.JobParameterDefinition{
|
||||
{
|
||||
Name: "param",
|
||||
Default: "value",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
{
|
||||
// Test error if task parameters are specified.
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--python-params=arg1`})
|
||||
require.NoError(t, err)
|
||||
err = opts.Validate(job)
|
||||
assert.ErrorContains(t, err, "the job to run defines job parameters; specifying task parameters is not allowed")
|
||||
}
|
||||
|
||||
{
|
||||
// Test no error if job parameters are specified.
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--params=arg1=val1`})
|
||||
require.NoError(t, err)
|
||||
err = opts.Validate(job)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobOptionsValidateIfJobHasNoJobParameters(t *testing.T) {
|
||||
job := &resources.Job{
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Parameters: []jobs.JobParameterDefinition{},
|
||||
},
|
||||
}
|
||||
|
||||
{
|
||||
// Test error if job parameters are specified.
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--params=arg1=val1`})
|
||||
require.NoError(t, err)
|
||||
err = opts.Validate(job)
|
||||
assert.ErrorContains(t, err, "the job to run does not define job parameters; specifying job parameters is not allowed")
|
||||
}
|
||||
|
||||
{
|
||||
// Test no error if task parameters are specified.
|
||||
fs, opts := setupJobOptions(t)
|
||||
err := fs.Parse([]string{`--python-params=arg1`})
|
||||
require.NoError(t, err)
|
||||
err = opts.Validate(job)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
|
@ -3,7 +3,6 @@ package run
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
|
@ -13,7 +12,6 @@ import (
|
|||
"github.com/databricks/cli/libs/cmdio"
|
||||
"github.com/databricks/cli/libs/log"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
flag "github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) []pipelines.PipelineEvent {
|
||||
|
@ -71,64 +69,6 @@ func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, u
|
|||
return nil
|
||||
}
|
||||
|
||||
// PipelineOptions defines options for running a pipeline update.
|
||||
type PipelineOptions struct {
|
||||
// Perform a full graph update.
|
||||
RefreshAll bool
|
||||
|
||||
// List of tables to update.
|
||||
Refresh []string
|
||||
|
||||
// Perform a full graph reset and recompute.
|
||||
FullRefreshAll bool
|
||||
|
||||
// List of tables to reset and recompute.
|
||||
FullRefresh []string
|
||||
}
|
||||
|
||||
func (o *PipelineOptions) Define(fs *flag.FlagSet) {
|
||||
fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.")
|
||||
fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.")
|
||||
fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.")
|
||||
fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.")
|
||||
}
|
||||
|
||||
// Validate returns if the combination of options is valid.
|
||||
func (o *PipelineOptions) Validate() error {
|
||||
set := []string{}
|
||||
if o.RefreshAll {
|
||||
set = append(set, "--refresh-all")
|
||||
}
|
||||
if len(o.Refresh) > 0 {
|
||||
set = append(set, "--refresh")
|
||||
}
|
||||
if o.FullRefreshAll {
|
||||
set = append(set, "--full-refresh-all")
|
||||
}
|
||||
if len(o.FullRefresh) > 0 {
|
||||
set = append(set, "--full-refresh")
|
||||
}
|
||||
if len(set) > 1 {
|
||||
return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(set, ", "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *PipelineOptions) toPayload(pipelineID string) (*pipelines.StartUpdate, error) {
|
||||
if err := o.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
payload := &pipelines.StartUpdate{
|
||||
PipelineId: pipelineID,
|
||||
|
||||
// Note: `RefreshAll` is implied if the fields below are not set.
|
||||
RefreshSelection: o.Refresh,
|
||||
FullRefresh: o.FullRefreshAll,
|
||||
FullRefreshSelection: o.FullRefresh,
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
||||
type pipelineRunner struct {
|
||||
key
|
||||
|
||||
|
@ -155,7 +95,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp
|
|||
return nil, err
|
||||
}
|
||||
|
||||
req, err := opts.Pipeline.toPayload(pipelineID)
|
||||
req, err := opts.Pipeline.toPayload(r.pipeline, pipelineID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
package run
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
flag "github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
// PipelineOptions defines options for running a pipeline update.
|
||||
type PipelineOptions struct {
|
||||
// Perform a full graph update.
|
||||
RefreshAll bool
|
||||
|
||||
// List of tables to update.
|
||||
Refresh []string
|
||||
|
||||
// Perform a full graph reset and recompute.
|
||||
FullRefreshAll bool
|
||||
|
||||
// List of tables to reset and recompute.
|
||||
FullRefresh []string
|
||||
}
|
||||
|
||||
func (o *PipelineOptions) Define(fs *flag.FlagSet) {
|
||||
fs.BoolVar(&o.RefreshAll, "refresh-all", false, "Perform a full graph update.")
|
||||
fs.StringSliceVar(&o.Refresh, "refresh", nil, "List of tables to update.")
|
||||
fs.BoolVar(&o.FullRefreshAll, "full-refresh-all", false, "Perform a full graph reset and recompute.")
|
||||
fs.StringSliceVar(&o.FullRefresh, "full-refresh", nil, "List of tables to reset and recompute.")
|
||||
}
|
||||
|
||||
// Validate returns if the combination of options is valid.
|
||||
func (o *PipelineOptions) Validate(pipeline *resources.Pipeline) error {
|
||||
set := []string{}
|
||||
if o.RefreshAll {
|
||||
set = append(set, "--refresh-all")
|
||||
}
|
||||
if len(o.Refresh) > 0 {
|
||||
set = append(set, "--refresh")
|
||||
}
|
||||
if o.FullRefreshAll {
|
||||
set = append(set, "--full-refresh-all")
|
||||
}
|
||||
if len(o.FullRefresh) > 0 {
|
||||
set = append(set, "--full-refresh")
|
||||
}
|
||||
if len(set) > 1 {
|
||||
return fmt.Errorf("pipeline run arguments are mutually exclusive (got %s)", strings.Join(set, ", "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *PipelineOptions) toPayload(pipeline *resources.Pipeline, pipelineID string) (*pipelines.StartUpdate, error) {
|
||||
if err := o.Validate(pipeline); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
payload := &pipelines.StartUpdate{
|
||||
PipelineId: pipelineID,
|
||||
|
||||
// Note: `RefreshAll` is implied if the fields below are not set.
|
||||
RefreshSelection: o.Refresh,
|
||||
FullRefresh: o.FullRefreshAll,
|
||||
FullRefreshSelection: o.FullRefresh,
|
||||
}
|
||||
return payload, nil
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
package run
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
flag "github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func setupPipelineOptions(t *testing.T) (*flag.FlagSet, *PipelineOptions) {
|
||||
var fs flag.FlagSet
|
||||
var opts PipelineOptions
|
||||
opts.Define(&fs)
|
||||
return &fs, &opts
|
||||
}
|
||||
|
||||
func TestPipelineOptionsRefreshAll(t *testing.T) {
|
||||
fs, opts := setupPipelineOptions(t)
|
||||
err := fs.Parse([]string{`--refresh-all`})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, opts.RefreshAll)
|
||||
}
|
||||
|
||||
func TestPipelineOptionsRefresh(t *testing.T) {
|
||||
fs, opts := setupPipelineOptions(t)
|
||||
err := fs.Parse([]string{`--refresh=arg1,arg2,arg3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.Refresh)
|
||||
}
|
||||
|
||||
func TestPipelineOptionsFullRefreshAll(t *testing.T) {
|
||||
fs, opts := setupPipelineOptions(t)
|
||||
err := fs.Parse([]string{`--full-refresh-all`})
|
||||
require.NoError(t, err)
|
||||
assert.True(t, opts.FullRefreshAll)
|
||||
}
|
||||
|
||||
func TestPipelineOptionsFullRefresh(t *testing.T) {
|
||||
fs, opts := setupPipelineOptions(t)
|
||||
err := fs.Parse([]string{`--full-refresh=arg1,arg2,arg3`})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []string{"arg1", "arg2", "arg3"}, opts.FullRefresh)
|
||||
}
|
||||
|
||||
func TestPipelineOptionsValidateSuccessWithSingleOption(t *testing.T) {
|
||||
args := []string{
|
||||
`--refresh-all`,
|
||||
`--refresh=arg1,arg2,arg3`,
|
||||
`--full-refresh-all`,
|
||||
`--full-refresh=arg1,arg2,arg3`,
|
||||
}
|
||||
for _, arg := range args {
|
||||
fs, opts := setupPipelineOptions(t)
|
||||
err := fs.Parse([]string{arg})
|
||||
require.NoError(t, err)
|
||||
err = opts.Validate(nil)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineOptionsValidateFailureWithMultipleOptions(t *testing.T) {
|
||||
args := []string{
|
||||
`--refresh-all`,
|
||||
`--refresh=arg1,arg2,arg3`,
|
||||
`--full-refresh-all`,
|
||||
`--full-refresh=arg1,arg2,arg3`,
|
||||
}
|
||||
for i := range args {
|
||||
for j := range args {
|
||||
if i == j {
|
||||
continue
|
||||
}
|
||||
fs, opts := setupPipelineOptions(t)
|
||||
err := fs.Parse([]string{args[i], args[j]})
|
||||
require.NoError(t, err)
|
||||
err = opts.Validate(nil)
|
||||
assert.ErrorContains(t, err, "pipeline run arguments are mutually exclusive")
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue