From 48d6df3dfaac0f8ad30b4b3d4800883206f4e52a Mon Sep 17 00:00:00 2001 From: Lennart Kats Date: Sun, 18 Jun 2023 16:47:01 +0200 Subject: [PATCH] Add support for "debug runs" - Add "mode: debug" property for environments - Add "--deploy", "--compute", "--no-wait" CLI flags --- bundle/config/bundle.go | 2 + bundle/config/environment.go | 10 +++ bundle/config/mutator/override_compute.go | 50 +++++++++++++ .../config/mutator/override_compute_test.go | 47 ++++++++++++ .../config/mutator/populate_current_user.go | 4 + .../mutator/process_environment_mode.go | 68 +++++++++++++++++ .../mutator/process_environment_mode_test.go | 75 +++++++++++++++++++ bundle/config/root.go | 4 + bundle/config/root_test.go | 9 +++ bundle/phases/initialize.go | 4 +- bundle/run/job.go | 12 +++ bundle/run/options.go | 1 + bundle/run/pipeline.go | 4 + bundle/tests/job_and_pipeline/bundle.yml | 1 + bundle/tests/job_and_pipeline_test.go | 2 + cmd/bundle/deploy.go | 4 +- cmd/bundle/destroy.go | 2 +- cmd/bundle/run.go | 26 ++++++- cmd/bundle/sync.go | 2 +- cmd/bundle/validate.go | 2 +- 20 files changed, 323 insertions(+), 6 deletions(-) create mode 100644 bundle/config/mutator/override_compute.go create mode 100644 bundle/config/mutator/override_compute_test.go create mode 100644 bundle/config/mutator/process_environment_mode.go create mode 100644 bundle/config/mutator/process_environment_mode_test.go diff --git a/bundle/config/bundle.go b/bundle/config/bundle.go index ba173f10..ea9eb188 100644 --- a/bundle/config/bundle.go +++ b/bundle/config/bundle.go @@ -28,4 +28,6 @@ type Bundle struct { // Contains Git information like current commit, current branch and // origin url. Automatically loaded by reading .git directory if not specified Git Git `json:"git,omitempty"` + + Mode Mode `json:"mode,omitempty"` } diff --git a/bundle/config/environment.go b/bundle/config/environment.go index 02c6e08c..5288676e 100644 --- a/bundle/config/environment.go +++ b/bundle/config/environment.go @@ -1,5 +1,13 @@ package config +type Mode string + +const ( + Debug Mode = "debug" + Default Mode = "default" + PullRequest Mode = "pull-request" +) + // Environment defines overrides for a single environment. // This structure is recursively merged into the root configuration. type Environment struct { @@ -7,6 +15,8 @@ type Environment struct { // by the user (through environment variable or command line argument). Default bool `json:"default,omitempty"` + Mode Mode `json:"mode,omitempty"` + Bundle *Bundle `json:"bundle,omitempty"` Workspace *Workspace `json:"workspace,omitempty"` diff --git a/bundle/config/mutator/override_compute.go b/bundle/config/mutator/override_compute.go new file mode 100644 index 00000000..0e1aaa21 --- /dev/null +++ b/bundle/config/mutator/override_compute.go @@ -0,0 +1,50 @@ +package mutator + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" +) + +type overrideCompute struct { + compute string +} + +func OverrideCompute(compute string) bundle.Mutator { + return &overrideCompute{compute: compute} +} + +func (m *overrideCompute) Name() string { + return "OverrideCompute" +} + +func (m *overrideCompute) overrideJobCompute(j *resources.Job) { + for i := range j.Tasks { + task := &j.Tasks[i] + if task.NewCluster != nil { + task.NewCluster = nil + task.ExistingClusterId = m.compute + } else if task.ExistingClusterId != "" { + task.ExistingClusterId = m.compute + } + } +} + +func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error { + if m.compute == "" { + return nil + } + if b.Config.Bundle.Mode != config.Debug { + return fmt.Errorf("cannot override compute for an environment that does not use 'mode: debug'") + } + + r := b.Config.Resources + for i := range r.Jobs { + m.overrideJobCompute(r.Jobs[i]) + } + + return nil +} diff --git a/bundle/config/mutator/override_compute_test.go b/bundle/config/mutator/override_compute_test.go new file mode 100644 index 00000000..f6e0eca7 --- /dev/null +++ b/bundle/config/mutator/override_compute_test.go @@ -0,0 +1,47 @@ +package mutator_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOverrideCompute(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Debug, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{ + Name: "job1", + Tasks: []jobs.JobTaskSettings{ + { + NewCluster: &compute.BaseClusterInfo{}, + }, + { + ExistingClusterId: "cluster2", + }, + }, + }}, + }, + }, + }, + } + + m := mutator.OverrideCompute("newClusterID") + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster) + assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId) + assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId) +} diff --git a/bundle/config/mutator/populate_current_user.go b/bundle/config/mutator/populate_current_user.go index 34c6ff6e..c0bf541d 100644 --- a/bundle/config/mutator/populate_current_user.go +++ b/bundle/config/mutator/populate_current_user.go @@ -18,6 +18,10 @@ func (m *populateCurrentUser) Name() string { } func (m *populateCurrentUser) Apply(ctx context.Context, b *bundle.Bundle) error { + if b.Config.Workspace.CurrentUser != nil { + return nil + } + w := b.WorkspaceClient() me, err := w.CurrentUser.Me(ctx) if err != nil { diff --git a/bundle/config/mutator/process_environment_mode.go b/bundle/config/mutator/process_environment_mode.go new file mode 100644 index 00000000..37de53e5 --- /dev/null +++ b/bundle/config/mutator/process_environment_mode.go @@ -0,0 +1,68 @@ +package mutator + +import ( + "context" + "fmt" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/databricks-sdk-go/service/ml" +) + +type processEnvironmentMode struct{} + +func ProcessEnvironmentMode() bundle.Mutator { + return &processEnvironmentMode{} +} + +func (m *processEnvironmentMode) Name() string { + return "ProcessEnvironmentMode" +} + +// Mark all resources as being for 'debug' purposes, i.e. +// changing their their name, adding tags, and (in the future) +// marking them as 'hidden' in the UI. +func processDebugMode(b *bundle.Bundle) error { + r := b.Config.Resources + + for i := range r.Jobs { + r.Jobs[i].Name = "[debug] " + r.Jobs[i].Name + if r.Jobs[i].Tags == nil { + r.Jobs[i].Tags = make(map[string]string) + } + r.Jobs[i].Tags["debug"] = "" + } + + for i := range r.Pipelines { + r.Pipelines[i].Name = "[debug] " + r.Pipelines[i].Name + r.Pipelines[i].Development = true + // (pipelines don't have tags) + } + + for i := range r.Models { + r.Models[i].Name = "[debug] " + r.Models[i].Name + r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "debug", Value: ""}) + } + + for i := range r.Experiments { + r.Experiments[i].Name = "[debug] " + r.Experiments[i].Name + r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "debug", Value: ""}) + } + + return nil +} + +func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error { + switch b.Config.Bundle.Mode { + case config.Debug: + return processDebugMode(b) + case config.Default, "": + // No action + case config.PullRequest: + return fmt.Errorf("not implemented") + default: + return fmt.Errorf("unsupported value specified for 'mode': %s", b.Config.Bundle.Mode) + } + + return nil +} diff --git a/bundle/config/mutator/process_environment_mode_test.go b/bundle/config/mutator/process_environment_mode_test.go new file mode 100644 index 00000000..c48182c7 --- /dev/null +++ b/bundle/config/mutator/process_environment_mode_test.go @@ -0,0 +1,75 @@ +package mutator_test + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/ml" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProcessEnvironmentModeApplyDebug(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Debug, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{Name: "job1"}}, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + }, + Experiments: map[string]*resources.MlflowExperiment{ + "experiment1": {Experiment: &ml.Experiment{Name: "experiment1"}}, + }, + Models: map[string]*resources.MlflowModel{ + "model1": {Model: &ml.Model{Name: "model1"}}, + }, + }, + }, + } + + m := mutator.ProcessEnvironmentMode() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "[debug] job1", bundle.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, "[debug] pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name) + assert.Equal(t, "[debug] experiment1", bundle.Config.Resources.Experiments["experiment1"].Name) + assert.Equal(t, "[debug] model1", bundle.Config.Resources.Models["model1"].Name) + assert.Equal(t, "debug", bundle.Config.Resources.Experiments["experiment1"].Experiment.Tags[0].Key) + assert.True(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} + +func TestProcessEnvironmentModeApplyDefault(t *testing.T) { + bundle := &bundle.Bundle{ + Config: config.Root{ + Bundle: config.Bundle{ + Mode: config.Default, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": {JobSettings: &jobs.JobSettings{Name: "job1"}}, + }, + Pipelines: map[string]*resources.Pipeline{ + "pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}}, + }, + }, + }, + } + + m := mutator.ProcessEnvironmentMode() + err := m.Apply(context.Background(), bundle) + require.NoError(t, err) + assert.Equal(t, "job1", bundle.Config.Resources.Jobs["job1"].Name) + assert.Equal(t, "pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name) + assert.False(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development) +} diff --git a/bundle/config/root.go b/bundle/config/root.go index 28333e98..ac3aa20d 100644 --- a/bundle/config/root.go +++ b/bundle/config/root.go @@ -190,5 +190,9 @@ func (r *Root) MergeEnvironment(env *Environment) error { } } + if env.Mode != "" { + r.Bundle.Mode = env.Mode + } + return nil } diff --git a/bundle/config/root_test.go b/bundle/config/root_test.go index b53f1ab7..3081b899 100644 --- a/bundle/config/root_test.go +++ b/bundle/config/root_test.go @@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) { err := root.InitializeVariables([]string{"bar=567"}) assert.ErrorContains(t, err, "variable bar has not been defined") } + +func TestRootMergeEnvironmentWithMode(t *testing.T) { + root := &Root{ + Bundle: Bundle{}, + } + env := &Environment{Mode: Debug} + require.NoError(t, root.MergeEnvironment(env)) + assert.Equal(t, Debug, root.Bundle.Mode) +} diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index 36d25154..2d71c887 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -11,7 +11,7 @@ import ( // The initialize phase fills in defaults and connects to the workspace. // Interpolation of fields referring to the "bundle" and "workspace" keys // happens upon completion of this phase. -func Initialize() bundle.Mutator { +func Initialize(overrideCompute string) bundle.Mutator { return newPhase( "initialize", []bundle.Mutator{ @@ -25,6 +25,8 @@ func Initialize() bundle.Mutator { interpolation.IncludeLookupsInPath("workspace"), interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix), ), + mutator.OverrideCompute(overrideCompute), + mutator.ProcessEnvironmentMode(), mutator.TranslatePaths(), terraform.Initialize(), }, diff --git a/bundle/run/job.go b/bundle/run/job.go index eeb85689..9bfec220 100644 --- a/bundle/run/job.go +++ b/bundle/run/job.go @@ -255,6 +255,18 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e } logProgress := logProgressCallback(ctx, progressLogger) + if opts.NoWait { + run, err := w.Jobs.RunNow(ctx, *req) + if err != nil { + return nil, err + } + details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{ + RunId: run.RunId, + }) + progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl)) + return nil, err + } + run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), pullRunId, logDebug, logProgress) if err != nil && runId != nil { diff --git a/bundle/run/options.go b/bundle/run/options.go index cc9dd413..3194fb32 100644 --- a/bundle/run/options.go +++ b/bundle/run/options.go @@ -7,6 +7,7 @@ import ( type Options struct { Job JobOptions Pipeline PipelineOptions + NoWait bool } func (o *Options) Define(fs *flag.FlagSet) { diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go index 621da071..dc8fd5ae 100644 --- a/bundle/run/pipeline.go +++ b/bundle/run/pipeline.go @@ -167,6 +167,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp return nil, fmt.Errorf("no progress logger found") } + if opts.NoWait { + log.Warnf(ctx, "--no-wait is not yet implemented for pipelines") + } + // Log the pipeline update URL as soon as it is available. progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID)) diff --git a/bundle/tests/job_and_pipeline/bundle.yml b/bundle/tests/job_and_pipeline/bundle.yml index f4a5719a..9046b10d 100644 --- a/bundle/tests/job_and_pipeline/bundle.yml +++ b/bundle/tests/job_and_pipeline/bundle.yml @@ -8,6 +8,7 @@ resources: environments: development: + mode: debug resources: pipelines: nyc_taxi_pipeline: diff --git a/bundle/tests/job_and_pipeline_test.go b/bundle/tests/job_and_pipeline_test.go index 8fc032a5..d6f05b93 100644 --- a/bundle/tests/job_and_pipeline_test.go +++ b/bundle/tests/job_and_pipeline_test.go @@ -4,6 +4,7 @@ import ( "path/filepath" "testing" + "github.com/databricks/cli/bundle/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) { p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"] assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath)) + assert.Equal(t, b.Config.Bundle.Mode, config.Debug) assert.True(t, p.Development) require.Len(t, p.Libraries, 1) assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path) diff --git a/cmd/bundle/deploy.go b/cmd/bundle/deploy.go index 5ecc72d2..f1179634 100644 --- a/cmd/bundle/deploy.go +++ b/cmd/bundle/deploy.go @@ -18,7 +18,7 @@ var deployCmd = &cobra.Command{ b.Config.Bundle.Lock.Force = force return bundle.Apply(cmd.Context(), b, bundle.Seq( - phases.Initialize(), + phases.Initialize(""), phases.Build(), phases.Deploy(), )) @@ -26,8 +26,10 @@ var deployCmd = &cobra.Command{ } var force bool +var computeID string func init() { AddCommand(deployCmd) deployCmd.Flags().BoolVar(&force, "force", false, "Force acquisition of deployment lock.") + deployCmd.Flags().StringVar(&computeID, "compute", "", "Override compute in the deployment with the given compute ID.") } diff --git a/cmd/bundle/destroy.go b/cmd/bundle/destroy.go index 979f01a9..9369e08a 100644 --- a/cmd/bundle/destroy.go +++ b/cmd/bundle/destroy.go @@ -43,7 +43,7 @@ var destroyCmd = &cobra.Command{ } return bundle.Apply(ctx, b, bundle.Seq( - phases.Initialize(), + phases.Initialize(""), phases.Build(), phases.Destroy(), )) diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go index 1eb7aa4b..65e17f3d 100644 --- a/cmd/bundle/run.go +++ b/cmd/bundle/run.go @@ -14,6 +14,8 @@ import ( ) var runOptions run.Options +var deploy bool +var noWait bool var runCmd = &cobra.Command{ Use: "run [flags] KEY", @@ -23,8 +25,25 @@ var runCmd = &cobra.Command{ PreRunE: ConfigureBundleWithVariables, RunE: func(cmd *cobra.Command, args []string) error { b := bundle.Get(cmd.Context()) + + if deploy { + b.Config.Bundle.Lock.Force = force + err := bundle.Apply(cmd.Context(), b, bundle.Seq( + phases.Initialize(computeID), + phases.Build(), + phases.Deploy(), + )) + if err != nil { + return err + } + } else if computeID != "" { + // Running notebooks is not yet implemented, otherwise we could + // use --compute with a notebook + return fmt.Errorf("not supported: --compute specified without --deploy") + } + err := bundle.Apply(cmd.Context(), b, bundle.Seq( - phases.Initialize(), + phases.Initialize(computeID), terraform.Interpolate(), terraform.Write(), terraform.StatePull(), @@ -39,6 +58,7 @@ var runCmd = &cobra.Command{ return err } + runOptions.NoWait = noWait output, err := runner.Run(cmd.Context(), &runOptions) if err != nil { return err @@ -89,4 +109,8 @@ var runCmd = &cobra.Command{ func init() { runOptions.Define(runCmd.Flags()) rootCmd.AddCommand(runCmd) + runCmd.Flags().BoolVar(&deploy, "deploy", false, "Call deploy before run.") + runCmd.Flags().BoolVar(&force, "force", false, "Force acquisition of deployment lock.") + runCmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.") + runCmd.Flags().StringVar(&computeID, "compute", "", "Override compute in the deployment with the given compute ID.") } diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 19adc2dd..2b7c29a3 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -39,7 +39,7 @@ var syncCmd = &cobra.Command{ b := bundle.Get(cmd.Context()) // Run initialize phase to make sure paths are set. - err := bundle.Apply(cmd.Context(), b, phases.Initialize()) + err := bundle.Apply(cmd.Context(), b, phases.Initialize("")) if err != nil { return err } diff --git a/cmd/bundle/validate.go b/cmd/bundle/validate.go index 65ab3890..35896991 100644 --- a/cmd/bundle/validate.go +++ b/cmd/bundle/validate.go @@ -16,7 +16,7 @@ var validateCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { b := bundle.Get(cmd.Context()) - err := bundle.Apply(cmd.Context(), b, phases.Initialize()) + err := bundle.Apply(cmd.Context(), b, phases.Initialize("")) if err != nil { return err }