diff --git a/bundle/config/bundle.go b/bundle/config/bundle.go index ba173f10..cf386477 100644 --- a/bundle/config/bundle.go +++ b/bundle/config/bundle.go @@ -28,4 +28,13 @@ type Bundle struct { // Contains Git information like current commit, current branch and // origin url. Automatically loaded by reading .git directory if not specified Git Git `json:"git,omitempty"` + + // Determines the mode of the environment. + // For example, 'mode: development' can be used for deployments for + // development purposes. + // Annotated readonly as this should be set at the environment level. + Mode Mode `json:"mode,omitempty" bundle:"readonly"` + + // Overrides the compute used for jobs and other supported assets. + ComputeID string `json:"compute_id,omitempty"` } diff --git a/bundle/config/environment.go b/bundle/config/environment.go index 02c6e08c..06a8d890 100644 --- a/bundle/config/environment.go +++ b/bundle/config/environment.go @@ -1,5 +1,7 @@ package config +type Mode string + // Environment defines overrides for a single environment. // This structure is recursively merged into the root configuration. type Environment struct { @@ -7,6 +9,14 @@ type Environment struct { // by the user (through environment variable or command line argument). Default bool `json:"default,omitempty"` + // Determines the mode of the environment. + // For example, 'mode: development' can be used for deployments for + // development purposes. + Mode Mode `json:"mode,omitempty"` + + // Overrides the compute used for jobs and other supported assets. + ComputeID string `json:"compute_id,omitempty"` + Bundle *Bundle `json:"bundle,omitempty"` Workspace *Workspace `json:"workspace,omitempty"` @@ -20,3 +30,9 @@ type Environment struct { // in the scope of an environment Variables map[string]string `json:"variables,omitempty"` } + +const ( + // Right now, we just have a default / "" mode and a "development" mode. + // Additional modes are expected to come for pull-requests and production. 
+ Development Mode = "development" +) diff --git a/bundle/config/mutator/override_compute.go b/bundle/config/mutator/override_compute.go new file mode 100644 index 00000000..ba3fd994 --- /dev/null +++ b/bundle/config/mutator/override_compute.go @@ -0,0 +1,56 @@ +package mutator + +import ( + "context" + "fmt" + "os" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" +) + +type overrideCompute struct{} + +func OverrideCompute() bundle.Mutator { + return &overrideCompute{} +} + +func (m *overrideCompute) Name() string { + return "OverrideCompute" +} + +func overrideJobCompute(j *resources.Job, compute string) { + for i := range j.Tasks { + task := &j.Tasks[i] + if task.NewCluster != nil { + task.NewCluster = nil + task.ExistingClusterId = compute + } else if task.ExistingClusterId != "" { + task.ExistingClusterId = compute + } + } +} + +func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error { + if b.Config.Bundle.Mode != config.Development { + if b.Config.Bundle.ComputeID != "" { + return fmt.Errorf("cannot override compute for an environment that does not use 'mode: development'") + } + return nil + } + if v := os.Getenv("DATABRICKS_CLUSTER_ID"); v != "" { + b.Config.Bundle.ComputeID = v + } + + if b.Config.Bundle.ComputeID == "" { + return nil + } + + r := b.Config.Resources + for i := range r.Jobs { + overrideJobCompute(r.Jobs[i], b.Config.Bundle.ComputeID) + } + + return nil +} diff --git a/bundle/config/mutator/override_compute_test.go b/bundle/config/mutator/override_compute_test.go new file mode 100644 index 00000000..9eb99edb --- /dev/null +++ b/bundle/config/mutator/override_compute_test.go @@ -0,0 +1,133 @@ +package mutator_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + 
"github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOverrideDevelopment(t *testing.T) { + t.Setenv("DATABRICKS_CLUSTER_ID", "") + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Development, + ComputeID: "newClusterID", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster) + assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId) + assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId) +} + +func TestOverrideDevelopmentEnv(t *testing.T) { + t.Setenv("DATABRICKS_CLUSTER_ID", "newClusterId") + bundle := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "cluster2", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId) +} + +func TestOverrideProduction(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + ComputeID: "newClusterID", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": 
{JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.Error(t, err) +} + +func TestOverrideProductionEnv(t *testing.T) { + t.Setenv("DATABRICKS_CLUSTER_ID", "newClusterId") + bundle := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) +} diff --git a/bundle/config/mutator/process_environment_mode.go b/bundle/config/mutator/process_environment_mode.go new file mode 100644 index 00000000..3e1b7e81 --- /dev/null +++ b/bundle/config/mutator/process_environment_mode.go @@ -0,0 +1,89 @@ +package mutator + +import ( + "context" + "fmt" + "path" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/ml" +) + +type processEnvironmentMode struct{} + +const developmentConcurrentRuns = 4 + +func ProcessEnvironmentMode() bundle.Mutator { + return &processEnvironmentMode{} +} + +func (m *processEnvironmentMode) Name() string { + return "ProcessEnvironmentMode" +} + +// Mark all resources as being for 'development' purposes, i.e. +// changing their name, adding tags, and (in the future) +// marking them as 'hidden' in the UI. 
+func processDevelopmentMode(b *bundle.Bundle) error { + r := b.Config.Resources + + for i := range r.Jobs { + r.Jobs[i].Name = "[dev] " + r.Jobs[i].Name + if r.Jobs[i].Tags == nil { + r.Jobs[i].Tags = make(map[string]string) + } + r.Jobs[i].Tags["dev"] = "" + if r.Jobs[i].MaxConcurrentRuns == 0 { + r.Jobs[i].MaxConcurrentRuns = developmentConcurrentRuns + } + if r.Jobs[i].Schedule != nil { + r.Jobs[i].Schedule.PauseStatus = jobs.PauseStatusPaused + } + if r.Jobs[i].Continuous != nil { + r.Jobs[i].Continuous.PauseStatus = jobs.PauseStatusPaused + } + if r.Jobs[i].Trigger != nil { + r.Jobs[i].Trigger.PauseStatus = jobs.PauseStatusPaused + } + } + + for i := range r.Pipelines { + r.Pipelines[i].Name = "[dev] " + r.Pipelines[i].Name + r.Pipelines[i].Development = true + // (pipelines don't yet support tags) + } + + for i := range r.Models { + r.Models[i].Name = "[dev] " + r.Models[i].Name + r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "dev", Value: ""}) + } + + for i := range r.Experiments { + filepath := r.Experiments[i].Name + dir := path.Dir(filepath) + base := path.Base(filepath) + if dir == "." 
{ + r.Experiments[i].Name = "[dev] " + base + } else { + r.Experiments[i].Name = dir + "/[dev] " + base + } + r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "dev", Value: ""}) + } + + return nil +} + +func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error { + switch b.Config.Bundle.Mode { + case config.Development: + return processDevelopmentMode(b) + case "": + // No action + default: + return fmt.Errorf("unsupported value specified for 'mode': %s", b.Config.Bundle.Mode) + } + + return nil +} diff --git a/bundle/config/mutator/process_environment_mode_test.go b/bundle/config/mutator/process_environment_mode_test.go new file mode 100644 index 00000000..5342de21 --- /dev/null +++ b/bundle/config/mutator/process_environment_mode_test.go @@ -0,0 +1,77 @@ +package mutator_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/ml" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessEnvironmentModeApplyDebug(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Development, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{Name: "job1"}}, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + }, + Experiments: map[string]*resources.MlflowExperiment{ + "experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}}, + "experiment2": {Experiment: &ml.Experiment{Name: "experiment2"}}, + }, + Models: 
map[string]*resources.MlflowModel{ + "model1": {Model: &ml.Model{Name: "model1"}}, + }, + }, + }, + } + + m := mutator.ProcessEnvironmentMode() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "[dev] job1", bundle.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, "[dev] pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name) + assert.Equal(t, "/Users/lennart.kats@databricks.com/[dev] experiment1", bundle.Config.Resources.Experiments["experiment1"].Name) + assert.Equal(t, "[dev] experiment2", bundle.Config.Resources.Experiments["experiment2"].Name) + assert.Equal(t, "[dev] model1", bundle.Config.Resources.Models["model1"].Name) + assert.Equal(t, "dev", bundle.Config.Resources.Experiments["experiment1"].Experiment.Tags[0].Key) + assert.True(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} + +func TestProcessEnvironmentModeApplyDefault(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: "", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{Name: "job1"}}, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + }, + }, + }, + } + + m := mutator.ProcessEnvironmentMode() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "job1", bundle.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, "pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name) + assert.False(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} diff --git a/bundle/config/root.go b/bundle/config/root.go index 189b1a7f..5ee337d3 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -190,5 +190,13 @@ func (r *Root) MergeEnvironment(env *Environment) error { } } + if env.Mode != "" { + r.Bundle.Mode = env.Mode + } + + if env.ComputeID != "" { + 
r.Bundle.ComputeID = env.ComputeID + } + return nil } diff --git a/bundle/config/root_test.go b/bundle/config/root_test.go index b53f1ab7..818e89a2 100644 --- a/bundle/config/root_test.go +++ b/bundle/config/root_test.go @@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) { err := root.InitializeVariables([]string{"bar=567"}) assert.ErrorContains(t, err, "variable bar has not been defined") } + +func TestRootMergeEnvironmentWithMode(t *testing.T) { + root := &Root{ + Bundle: Bundle{}, + } + env := &Environment{Mode: Development} + require.NoError(t, root.MergeEnvironment(env)) + assert.Equal(t, Development, root.Bundle.Mode) +} diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 36d25154..fc5056f6 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -25,6 +25,8 @@ func Initialize() bundle.Mutator { interpolation.IncludeLookupsInPath("workspace"), interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix), ), + mutator.OverrideCompute(), + mutator.ProcessEnvironmentMode(), mutator.TranslatePaths(), terraform.Initialize(), }, diff --git a/bundle/run/job.go b/bundle/run/job.go index b5ada946..f152a17d 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -243,6 +243,19 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e if err != nil { return nil, fmt.Errorf("cannot start job") } + + if opts.NoWait { + details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{ + RunId: waiter.RunId, + }) + if err != nil { + // No run details means no URL to log; return before dereferencing details. 
+ return nil, err + } + progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl)) + return nil, nil + } + run, err := waiter.OnProgress(func(r *jobs.Run) { pullRunId(r) logDebug(r) diff --git a/bundle/run/options.go b/bundle/run/options.go index cc9dd413..3194fb32 100644 --- a/bundle/run/options.go +++ b/bundle/run/options.go @@ -7,6 +7,7 @@ import ( type Options struct { Job JobOptions Pipeline PipelineOptions + NoWait bool } func (o *Options) Define(fs *flag.FlagSet) { diff 
--git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 621da071..7b82c3ea 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -170,6 +170,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp // Log the pipeline update URL as soon as it is available. progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID)) + if opts.NoWait { + return nil, nil + } + // Poll update for completion and post status. // Note: there is no "StartUpdateAndWait" wrapper for this API. var prevState *pipelines.UpdateInfoState diff --git a/bundle/tests/job_and_pipeline/bundle.yml b/bundle/tests/job_and_pipeline/bundle.yml index f4a5719a..d6942e8a 100644 --- a/bundle/tests/job_and_pipeline/bundle.yml +++ b/bundle/tests/job_and_pipeline/bundle.yml @@ -8,6 +8,7 @@ resources: environments: development: + mode: development resources: pipelines: nyc_taxi_pipeline: diff --git a/bundle/tests/job_and_pipeline_test.go b/bundle/tests/job_and_pipeline_test.go index 8fc032a5..775f415c 100644 --- a/bundle/tests/job_and_pipeline_test.go +++ b/bundle/tests/job_and_pipeline_test.go @@ -4,6 +4,7 @@ import ( "path/filepath" "testing" + "github.com/databricks/cli/bundle/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) { p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"] assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath)) + assert.Equal(t, config.Development, b.Config.Bundle.Mode) assert.True(t, p.Development) require.Len(t, p.Libraries, 1) assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) diff --git a/cmd/bundle/deploy.go b/cmd/bundle/deploy.go index 7dee32da..e8c0d395 100644 --- a/cmd/bundle/deploy.go +++ b/cmd/bundle/deploy.go @@ -16,6 +16,10 @@ var deployCmd = &cobra.Command{ // If `--force` is specified, force acquisition of the deployment lock. 
b.Config.Bundle.Lock.Force = forceDeploy + // Only override compute_id from the configuration when the flag was passed. + if computeID != "" { + b.Config.Bundle.ComputeID = computeID + } return bundle.Apply(cmd.Context(), b, bundle.Seq( phases.Initialize(), @@ -26,8 +30,10 @@ var deployCmd = &cobra.Command{ } var forceDeploy bool +var computeID string func init() { AddCommand(deployCmd) deployCmd.Flags().BoolVar(&forceDeploy, "force", false, "Force acquisition of deployment lock.") + deployCmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.") } diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index 1eb7aa4b..9ca8fe45 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -14,6 +14,7 @@ import ( ) var runOptions run.Options +var noWait bool var runCmd = &cobra.Command{ Use: "run [flags] KEY", @@ -23,6 +24,7 @@ var runCmd = &cobra.Command{ PreRunE: ConfigureBundleWithVariables, RunE: func(cmd *cobra.Command, args []string) error { b := bundle.Get(cmd.Context()) + err := bundle.Apply(cmd.Context(), b, bundle.Seq( phases.Initialize(), terraform.Interpolate(), @@ -39,6 +41,7 @@ var runCmd = &cobra.Command{ return err } + runOptions.NoWait = noWait output, err := runner.Run(cmd.Context(), &runOptions) if err != nil { return err @@ -89,4 +92,5 @@ var runCmd = &cobra.Command{ func init() { runOptions.Define(runCmd.Flags()) rootCmd.AddCommand(runCmd) + runCmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.") }