From 57e75d3e22f9deb0c0e05c3794d7a12f8220f5bc Mon Sep 17 00:00:00 2001 From: "Lennart Kats (databricks)" Date: Wed, 12 Jul 2023 08:51:54 +0200 Subject: [PATCH] Add development runs (#522) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implements the "development run" functionality that we desire for DABs in the workspace / IDE. ## bundle.yml changes In bundle.yml, there should be a "dev" environment that is marked as `mode: debug`: ``` environments: dev: default: true mode: development # future accepted values might include pull_request, production ``` Setting `mode` to `development` indicates that this environment is used just for running things for development. This results in several changes to deployed assets: * All assets will get '[dev]' in their name and will get a 'dev' tag * All assets will be hidden from the list of assets (future work; e.g. for jobs we would have a special job_type that hides it from the list) * All deployed assets will be ephemeral (future work, we need some form of garbage collection) * Pipelines will be marked as 'development: true' * Jobs can run on development compute through the `--compute` parameter in the CLI * Jobs get their schedule / triggers paused * Jobs get concurrent runs (it's really annoying if your runs get skipped because the last run was still in progress) Other accepted values for `mode` are `default` (which does nothing) and `pull-request` (which is reserved for future use). ## CLI changes To run a single job called "shark_sighting" on existing compute, use the following commands: ``` $ databricks bundle deploy --compute 0617-201942-9yd9g8ix $ databricks bundle run shark_sighting ``` which would deploy and run a job called "[dev] shark_sightings" on the compute provided. Note that `--compute` is not accepted in production environments, so we show an error if `mode: development` is not used. The `run --deploy` command offers a convenient shorthand for the common combination of deploying & running: ``` $ export DATABRICKS_COMPUTE=0617-201942-9yd9g8ix $ bundle run --deploy shark_sightings ``` The `--deploy` addition isn't really essential and I welcome feedback 🤔 I played with the idea of a "debug" or "dev" command but that seemed to only make the option space even broader for users. The above could work well with an IDE or workspace that automatically sets the target compute. One more thing I added is`run --no-wait` can now be used to run something without waiting for it to be completed (useful for IDE-like environments that can display progress themselves). ``` $ bundle run --deploy shark_sightings --no-wait ``` --- bundle/config/bundle.go | 9 ++ bundle/config/environment.go | 16 +++ bundle/config/mutator/override_compute.go | 56 ++++++++ .../config/mutator/override_compute_test.go | 134 ++++++++++++++++++ .../mutator/process_environment_mode.go | 89 ++++++++++++ .../mutator/process_environment_mode_test.go | 77 ++++++++++ bundle/config/root.go | 8 ++ bundle/config/root_test.go | 9 ++ bundle/phases/initialize.go | 2 + bundle/run/job.go | 9 ++ bundle/run/options.go | 1 + bundle/run/pipeline.go | 4 + bundle/tests/job_and_pipeline/bundle.yml | 1 + bundle/tests/job_and_pipeline_test.go | 2 + cmd/bundle/deploy.go | 3 + cmd/bundle/run.go | 4 + 16 files changed, 424 insertions(+) create mode 100644 bundle/config/mutator/override_compute.go create mode 100644 bundle/config/mutator/override_compute_test.go create mode 100644 bundle/config/mutator/process_environment_mode.go create mode 100644 bundle/config/mutator/process_environment_mode_test.go diff --git a/bundle/config/bundle.go b/bundle/config/bundle.go index ba173f10..cf386477 100644 --- a/bundle/config/bundle.go +++ b/bundle/config/bundle.go @@ -28,4 +28,13 @@ type Bundle struct { // Contains Git information like current commit, current branch and // origin url. Automatically loaded by reading .git directory if not specified Git Git `json:"git,omitempty"` + + // Determines the mode of the environment. + // For example, 'mode: development' can be used for deployments for + // development purposes. + // Annotated readonly as this should be set at the environment level. + Mode Mode `json:"mode,omitempty" bundle:"readonly"` + + // Overrides the compute used for jobs and other supported assets. + ComputeID string `json:"compute_id,omitempty"` } diff --git a/bundle/config/environment.go b/bundle/config/environment.go index 02c6e08c..06a8d890 100644 --- a/bundle/config/environment.go +++ b/bundle/config/environment.go @@ -1,5 +1,7 @@ package config +type Mode string + // Environment defines overrides for a single environment. // This structure is recursively merged into the root configuration. type Environment struct { @@ -7,6 +9,14 @@ type Environment struct { // by the user (through environment variable or command line argument). Default bool `json:"default,omitempty"` + // Determines the mode of the environment. + // For example, 'mode: development' can be used for deployments for + // development purposes. + Mode Mode `json:"mode,omitempty"` + + // Overrides the compute used for jobs and other supported assets. + ComputeID string `json:"compute_id,omitempty"` + Bundle *Bundle `json:"bundle,omitempty"` Workspace *Workspace `json:"workspace,omitempty"` @@ -20,3 +30,9 @@ type Environment struct { // in the scope of an environment Variables map[string]string `json:"variables,omitempty"` } + +const ( + // Right now, we just have a default / "" mode and a "development" mode. + // Additional modes are expected to come for pull-requests and production. + Development Mode = "development" +) diff --git a/bundle/config/mutator/override_compute.go b/bundle/config/mutator/override_compute.go new file mode 100644 index 00000000..ba3fd994 --- /dev/null +++ b/bundle/config/mutator/override_compute.go @@ -0,0 +1,56 @@ +package mutator + +import ( + "context" + "fmt" + "os" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" +) + +type overrideCompute struct{} + +func OverrideCompute() bundle.Mutator { + return &overrideCompute{} +} + +func (m *overrideCompute) Name() string { + return "OverrideCompute" +} + +func overrideJobCompute(j *resources.Job, compute string) { + for i := range j.Tasks { + task := &j.Tasks[i] + if task.NewCluster != nil { + task.NewCluster = nil + task.ExistingClusterId = compute + } else if task.ExistingClusterId != "" { + task.ExistingClusterId = compute + } + } +} + +func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error { + if b.Config.Bundle.Mode != config.Development { + if b.Config.Bundle.ComputeID != "" { + return fmt.Errorf("cannot override compute for an environment that does not use 'mode: development'") + } + return nil + } + if os.Getenv("DATABRICKS_CLUSTER_ID") != "" { + b.Config.Bundle.ComputeID = os.Getenv("DATABRICKS_CLUSTER_ID") + } + + if b.Config.Bundle.ComputeID == "" { + return nil + } + + r := b.Config.Resources + for i := range r.Jobs { + overrideJobCompute(r.Jobs[i], b.Config.Bundle.ComputeID) + } + + return nil +} diff --git a/bundle/config/mutator/override_compute_test.go b/bundle/config/mutator/override_compute_test.go new file mode 100644 index 00000000..9eb99edb --- /dev/null +++ b/bundle/config/mutator/override_compute_test.go @@ -0,0 +1,134 @@ +package mutator_test + +import ( + "context" + "os" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOverrideDevelopment(t *testing.T) { + os.Setenv("DATABRICKS_CLUSTER_ID", "") + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Development, + ComputeID: "newClusterID", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster) + assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId) + assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId) +} + +func TestOverrideDevelopmentEnv(t *testing.T) { + os.Setenv("DATABRICKS_CLUSTER_ID", "newClusterId") + bundle := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "cluster2", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId) +} + +func TestOverrideProduction(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + ComputeID: "newClusterID", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.Error(t, err) +} + +func TestOverrideProductionEnv(t *testing.T) { + os.Setenv("DATABRICKS_CLUSTER_ID", "newClusterId") + bundle := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) +} diff --git a/bundle/config/mutator/process_environment_mode.go b/bundle/config/mutator/process_environment_mode.go new file mode 100644 index 00000000..3e1b7e81 --- /dev/null +++ b/bundle/config/mutator/process_environment_mode.go @@ -0,0 +1,89 @@ +package mutator + +import ( + "context" + "fmt" + "path" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/ml" +) + +type processEnvironmentMode struct{} + +const developmentConcurrentRuns = 4 + +func ProcessEnvironmentMode() bundle.Mutator { + return &processEnvironmentMode{} +} + +func (m *processEnvironmentMode) Name() string { + return "ProcessEnvironmentMode" +} + +// Mark all resources as being for 'development' purposes, i.e. +// changing their their name, adding tags, and (in the future) +// marking them as 'hidden' in the UI. +func processDevelopmentMode(b *bundle.Bundle) error { + r := b.Config.Resources + + for i := range r.Jobs { + r.Jobs[i].Name = "[dev] " + r.Jobs[i].Name + if r.Jobs[i].Tags == nil { + r.Jobs[i].Tags = make(map[string]string) + } + r.Jobs[i].Tags["dev"] = "" + if r.Jobs[i].MaxConcurrentRuns == 0 { + r.Jobs[i].MaxConcurrentRuns = developmentConcurrentRuns + } + if r.Jobs[i].Schedule != nil { + r.Jobs[i].Schedule.PauseStatus = jobs.PauseStatusPaused + } + if r.Jobs[i].Continuous != nil { + r.Jobs[i].Continuous.PauseStatus = jobs.PauseStatusPaused + } + if r.Jobs[i].Trigger != nil { + r.Jobs[i].Trigger.PauseStatus = jobs.PauseStatusPaused + } + } + + for i := range r.Pipelines { + r.Pipelines[i].Name = "[dev] " + r.Pipelines[i].Name + r.Pipelines[i].Development = true + // (pipelines don't yet support tags) + } + + for i := range r.Models { + r.Models[i].Name = "[dev] " + r.Models[i].Name + r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "dev", Value: ""}) + } + + for i := range r.Experiments { + filepath := r.Experiments[i].Name + dir := path.Dir(filepath) + base := path.Base(filepath) + if dir == "." { + r.Experiments[i].Name = "[dev] " + base + } else { + r.Experiments[i].Name = dir + "/[dev] " + base + } + r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "dev", Value: ""}) + } + + return nil +} + +func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error { + switch b.Config.Bundle.Mode { + case config.Development: + return processDevelopmentMode(b) + case "": + // No action + default: + return fmt.Errorf("unsupported value specified for 'mode': %s", b.Config.Bundle.Mode) + } + + return nil +} diff --git a/bundle/config/mutator/process_environment_mode_test.go b/bundle/config/mutator/process_environment_mode_test.go new file mode 100644 index 00000000..5342de21 --- /dev/null +++ b/bundle/config/mutator/process_environment_mode_test.go @@ -0,0 +1,77 @@ +package mutator_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/ml" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessEnvironmentModeApplyDebug(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Development, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{Name: "job1"}}, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + }, + Experiments: map[string]*resources.MlflowExperiment{ + "experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}}, + "experiment2": {Experiment: &ml.Experiment{Name: "experiment2"}}, + }, + Models: map[string]*resources.MlflowModel{ + "model1": {Model: &ml.Model{Name: "model1"}}, + }, + }, + }, + } + + m := mutator.ProcessEnvironmentMode() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "[dev] job1", bundle.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, "[dev] pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name) + assert.Equal(t, "/Users/lennart.kats@databricks.com/[dev] experiment1", bundle.Config.Resources.Experiments["experiment1"].Name) + assert.Equal(t, "[dev] experiment2", bundle.Config.Resources.Experiments["experiment2"].Name) + assert.Equal(t, "[dev] model1", bundle.Config.Resources.Models["model1"].Name) + assert.Equal(t, "dev", bundle.Config.Resources.Experiments["experiment1"].Experiment.Tags[0].Key) + assert.True(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} + +func TestProcessEnvironmentModeApplyDefault(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: "", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{Name: "job1"}}, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + }, + }, + }, + } + + m := mutator.ProcessEnvironmentMode() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "job1", bundle.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, "pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name) + assert.False(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} diff --git a/bundle/config/root.go b/bundle/config/root.go index 189b1a7f..5ee337d3 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -190,5 +190,13 @@ func (r *Root) MergeEnvironment(env *Environment) error { } } + if env.Mode != "" { + r.Bundle.Mode = env.Mode + } + + if env.ComputeID != "" { + r.Bundle.ComputeID = env.ComputeID + } + return nil } diff --git a/bundle/config/root_test.go b/bundle/config/root_test.go index b53f1ab7..818e89a2 100644 --- a/bundle/config/root_test.go +++ b/bundle/config/root_test.go @@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) { err := root.InitializeVariables([]string{"bar=567"}) assert.ErrorContains(t, err, "variable bar has not been defined") } + +func TestRootMergeEnvironmentWithMode(t *testing.T) { + root := &Root{ + Bundle: Bundle{}, + } + env := &Environment{Mode: Development} + require.NoError(t, root.MergeEnvironment(env)) + assert.Equal(t, Development, root.Bundle.Mode) +} diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 36d25154..fc5056f6 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -25,6 +25,8 @@ func Initialize() bundle.Mutator { interpolation.IncludeLookupsInPath("workspace"), interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix), ), + mutator.OverrideCompute(), + mutator.ProcessEnvironmentMode(), mutator.TranslatePaths(), terraform.Initialize(), }, diff --git a/bundle/run/job.go b/bundle/run/job.go index b5ada946..f152a17d 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -243,6 +243,15 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e if err != nil { return nil, fmt.Errorf("cannot start job") } + + if opts.NoWait { + details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{ + RunId: waiter.RunId, + }) + progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl)) + return nil, err + } + run, err := waiter.OnProgress(func(r *jobs.Run) { pullRunId(r) logDebug(r) diff --git a/bundle/run/options.go b/bundle/run/options.go index cc9dd413..3194fb32 100644 --- a/bundle/run/options.go +++ b/bundle/run/options.go @@ -7,6 +7,7 @@ import ( type Options struct { Job JobOptions Pipeline PipelineOptions + NoWait bool } func (o *Options) Define(fs *flag.FlagSet) { diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 621da071..7b82c3ea 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -170,6 +170,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp // Log the pipeline update URL as soon as it is available. progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID)) + if opts.NoWait { + return nil, nil + } + // Poll update for completion and post status. // Note: there is no "StartUpdateAndWait" wrapper for this API. var prevState *pipelines.UpdateInfoState diff --git a/bundle/tests/job_and_pipeline/bundle.yml b/bundle/tests/job_and_pipeline/bundle.yml index f4a5719a..d6942e8a 100644 --- a/bundle/tests/job_and_pipeline/bundle.yml +++ b/bundle/tests/job_and_pipeline/bundle.yml @@ -8,6 +8,7 @@ resources: environments: development: + mode: development resources: pipelines: nyc_taxi_pipeline: diff --git a/bundle/tests/job_and_pipeline_test.go b/bundle/tests/job_and_pipeline_test.go index 8fc032a5..775f415c 100644 --- a/bundle/tests/job_and_pipeline_test.go +++ b/bundle/tests/job_and_pipeline_test.go @@ -4,6 +4,7 @@ import ( "path/filepath" "testing" + "github.com/databricks/cli/bundle/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) { p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"] assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath)) + assert.Equal(t, b.Config.Bundle.Mode, config.Development) assert.True(t, p.Development) require.Len(t, p.Libraries, 1) assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) diff --git a/cmd/bundle/deploy.go b/cmd/bundle/deploy.go index 7dee32da..e8c0d395 100644 --- a/cmd/bundle/deploy.go +++ b/cmd/bundle/deploy.go @@ -16,6 +16,7 @@ var deployCmd = &cobra.Command{ // If `--force` is specified, force acquisition of the deployment lock. b.Config.Bundle.Lock.Force = forceDeploy + b.Config.Bundle.ComputeID = computeID return bundle.Apply(cmd.Context(), b, bundle.Seq( phases.Initialize(), @@ -26,8 +27,10 @@ var deployCmd = &cobra.Command{ } var forceDeploy bool +var computeID string func init() { AddCommand(deployCmd) deployCmd.Flags().BoolVar(&forceDeploy, "force", false, "Force acquisition of deployment lock.") + deployCmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.") } diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index 1eb7aa4b..9ca8fe45 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -14,6 +14,7 @@ import ( ) var runOptions run.Options +var noWait bool var runCmd = &cobra.Command{ Use: "run [flags] KEY", @@ -23,6 +24,7 @@ var runCmd = &cobra.Command{ PreRunE: ConfigureBundleWithVariables, RunE: func(cmd *cobra.Command, args []string) error { b := bundle.Get(cmd.Context()) + err := bundle.Apply(cmd.Context(), b, bundle.Seq( phases.Initialize(), terraform.Interpolate(), @@ -39,6 +41,7 @@ var runCmd = &cobra.Command{ return err } + runOptions.NoWait = noWait output, err := runner.Run(cmd.Context(), &runOptions) if err != nil { return err @@ -89,4 +92,5 @@ var runCmd = &cobra.Command{ func init() { runOptions.Define(runCmd.Flags()) rootCmd.AddCommand(runCmd) + runCmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.") }