Add support for "debug runs"

- Add "mode: debug" property for environments
- Add "--deploy", "--compute", "--no-wait" CLI flags
This commit is contained in:
Lennart Kats 2023-06-18 16:47:01 +02:00
parent e4415bfbcf
commit 48d6df3dfa
20 changed files with 323 additions and 6 deletions

View File

@ -28,4 +28,6 @@ type Bundle struct {
// Contains Git information like current commit, current branch and
// origin url. Automatically loaded by reading .git directory if not specified
Git Git `json:"git,omitempty"`
Mode Mode `json:"mode,omitempty"`
}

View File

@ -1,5 +1,13 @@
package config
type Mode string
const (
Debug Mode = "debug"
Default Mode = "default"
PullRequest Mode = "pull-request"
)
// Environment defines overrides for a single environment.
// This structure is recursively merged into the root configuration.
type Environment struct {
@ -7,6 +15,8 @@ type Environment struct {
// by the user (through environment variable or command line argument).
Default bool `json:"default,omitempty"`
Mode Mode `json:"mode,omitempty"`
Bundle *Bundle `json:"bundle,omitempty"`
Workspace *Workspace `json:"workspace,omitempty"`

View File

@ -0,0 +1,50 @@
package mutator
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
)
type overrideCompute struct {
compute string
}
func OverrideCompute(compute string) bundle.Mutator {
return &overrideCompute{compute: compute}
}
func (m *overrideCompute) Name() string {
return "OverrideCompute"
}
func (m *overrideCompute) overrideJobCompute(j *resources.Job) {
for i := range j.Tasks {
task := &j.Tasks[i]
if task.NewCluster != nil {
task.NewCluster = nil
task.ExistingClusterId = m.compute
} else if task.ExistingClusterId != "" {
task.ExistingClusterId = m.compute
}
}
}
func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error {
if m.compute == "" {
return nil
}
if b.Config.Bundle.Mode != config.Debug {
return fmt.Errorf("cannot override compute for an environment that does not use 'mode: debug'")
}
r := b.Config.Resources
for i := range r.Jobs {
m.overrideJobCompute(r.Jobs[i])
}
return nil
}

View File

@ -0,0 +1,47 @@
package mutator_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestOverrideCompute(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Debug,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.JobTaskSettings{
{
NewCluster: &compute.BaseClusterInfo{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}
m := mutator.OverrideCompute("newClusterID")
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId)
}

View File

@ -18,6 +18,10 @@ func (m *populateCurrentUser) Name() string {
}
func (m *populateCurrentUser) Apply(ctx context.Context, b *bundle.Bundle) error {
if b.Config.Workspace.CurrentUser != nil {
return nil
}
w := b.WorkspaceClient()
me, err := w.CurrentUser.Me(ctx)
if err != nil {

View File

@ -0,0 +1,68 @@
package mutator
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go/service/ml"
)
type processEnvironmentMode struct{}
func ProcessEnvironmentMode() bundle.Mutator {
return &processEnvironmentMode{}
}
func (m *processEnvironmentMode) Name() string {
return "ProcessEnvironmentMode"
}
// Mark all resources as being for 'debug' purposes, i.e.
// changing their their name, adding tags, and (in the future)
// marking them as 'hidden' in the UI.
func processDebugMode(b *bundle.Bundle) error {
r := b.Config.Resources
for i := range r.Jobs {
r.Jobs[i].Name = "[debug] " + r.Jobs[i].Name
if r.Jobs[i].Tags == nil {
r.Jobs[i].Tags = make(map[string]string)
}
r.Jobs[i].Tags["debug"] = ""
}
for i := range r.Pipelines {
r.Pipelines[i].Name = "[debug] " + r.Pipelines[i].Name
r.Pipelines[i].Development = true
// (pipelines don't have tags)
}
for i := range r.Models {
r.Models[i].Name = "[debug] " + r.Models[i].Name
r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "debug", Value: ""})
}
for i := range r.Experiments {
r.Experiments[i].Name = "[debug] " + r.Experiments[i].Name
r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "debug", Value: ""})
}
return nil
}
func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error {
switch b.Config.Bundle.Mode {
case config.Debug:
return processDebugMode(b)
case config.Default, "":
// No action
case config.PullRequest:
return fmt.Errorf("not implemented")
default:
return fmt.Errorf("unsupported value specified for 'mode': %s", b.Config.Bundle.Mode)
}
return nil
}

View File

@ -0,0 +1,75 @@
package mutator_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestProcessEnvironmentModeApplyDebug(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Debug,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
},
Experiments: map[string]*resources.MlflowExperiment{
"experiment1": {Experiment: &ml.Experiment{Name: "experiment1"}},
},
Models: map[string]*resources.MlflowModel{
"model1": {Model: &ml.Model{Name: "model1"}},
},
},
},
}
m := mutator.ProcessEnvironmentMode()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "[debug] job1", bundle.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "[debug] pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name)
assert.Equal(t, "[debug] experiment1", bundle.Config.Resources.Experiments["experiment1"].Name)
assert.Equal(t, "[debug] model1", bundle.Config.Resources.Models["model1"].Name)
assert.Equal(t, "debug", bundle.Config.Resources.Experiments["experiment1"].Experiment.Tags[0].Key)
assert.True(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
}
func TestProcessEnvironmentModeApplyDefault(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Default,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
},
},
},
}
m := mutator.ProcessEnvironmentMode()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "job1", bundle.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name)
assert.False(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
}

View File

@ -190,5 +190,9 @@ func (r *Root) MergeEnvironment(env *Environment) error {
}
}
if env.Mode != "" {
r.Bundle.Mode = env.Mode
}
return nil
}

View File

@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) {
err := root.InitializeVariables([]string{"bar=567"})
assert.ErrorContains(t, err, "variable bar has not been defined")
}
func TestRootMergeEnvironmentWithMode(t *testing.T) {
root := &Root{
Bundle: Bundle{},
}
env := &Environment{Mode: Debug}
require.NoError(t, root.MergeEnvironment(env))
assert.Equal(t, Debug, root.Bundle.Mode)
}

View File

@ -11,7 +11,7 @@ import (
// The initialize phase fills in defaults and connects to the workspace.
// Interpolation of fields referring to the "bundle" and "workspace" keys
// happens upon completion of this phase.
func Initialize() bundle.Mutator {
func Initialize(overrideCompute string) bundle.Mutator {
return newPhase(
"initialize",
[]bundle.Mutator{
@ -25,6 +25,8 @@ func Initialize() bundle.Mutator {
interpolation.IncludeLookupsInPath("workspace"),
interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix),
),
mutator.OverrideCompute(overrideCompute),
mutator.ProcessEnvironmentMode(),
mutator.TranslatePaths(),
terraform.Initialize(),
},

View File

@ -255,6 +255,18 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
}
logProgress := logProgressCallback(ctx, progressLogger)
if opts.NoWait {
run, err := w.Jobs.RunNow(ctx, *req)
if err != nil {
return nil, err
}
details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{
RunId: run.RunId,
})
progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl))
return nil, err
}
run, err := w.Jobs.RunNowAndWait(ctx, *req,
retries.Timeout[jobs.Run](jobRunTimeout), pullRunId, logDebug, logProgress)
if err != nil && runId != nil {

View File

@ -7,6 +7,7 @@ import (
type Options struct {
Job JobOptions
Pipeline PipelineOptions
NoWait bool
}
func (o *Options) Define(fs *flag.FlagSet) {

View File

@ -167,6 +167,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp
return nil, fmt.Errorf("no progress logger found")
}
if opts.NoWait {
log.Warnf(ctx, "--no-wait is not yet implemented for pipelines")
}
// Log the pipeline update URL as soon as it is available.
progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))

View File

@ -8,6 +8,7 @@ resources:
environments:
development:
mode: debug
resources:
pipelines:
nyc_taxi_pipeline:

View File

@ -4,6 +4,7 @@ import (
"path/filepath"
"testing"
"github.com/databricks/cli/bundle/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) {
p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.Equal(t, b.Config.Bundle.Mode, config.Debug)
assert.True(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)

View File

@ -18,7 +18,7 @@ var deployCmd = &cobra.Command{
b.Config.Bundle.Lock.Force = force
return bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
phases.Initialize(""),
phases.Build(),
phases.Deploy(),
))
@ -26,8 +26,10 @@ var deployCmd = &cobra.Command{
}
var force bool
var computeID string
func init() {
AddCommand(deployCmd)
deployCmd.Flags().BoolVar(&force, "force", false, "Force acquisition of deployment lock.")
deployCmd.Flags().StringVar(&computeID, "compute", "", "Override compute in the deployment with the given compute ID.")
}

View File

@ -43,7 +43,7 @@ var destroyCmd = &cobra.Command{
}
return bundle.Apply(ctx, b, bundle.Seq(
phases.Initialize(),
phases.Initialize(""),
phases.Build(),
phases.Destroy(),
))

View File

@ -14,6 +14,8 @@ import (
)
var runOptions run.Options
var deploy bool
var noWait bool
var runCmd = &cobra.Command{
Use: "run [flags] KEY",
@ -23,8 +25,25 @@ var runCmd = &cobra.Command{
PreRunE: ConfigureBundleWithVariables,
RunE: func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context())
if deploy {
b.Config.Bundle.Lock.Force = force
err := bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(computeID),
phases.Build(),
phases.Deploy(),
))
if err != nil {
return err
}
} else if computeID != "" {
// Running notebooks is not yet implemented, otherwise we could
// use --compute with a notebook
return fmt.Errorf("not supported: --compute specified without --deploy")
}
err := bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
phases.Initialize(computeID),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
@ -39,6 +58,7 @@ var runCmd = &cobra.Command{
return err
}
runOptions.NoWait = noWait
output, err := runner.Run(cmd.Context(), &runOptions)
if err != nil {
return err
@ -89,4 +109,8 @@ var runCmd = &cobra.Command{
func init() {
runOptions.Define(runCmd.Flags())
rootCmd.AddCommand(runCmd)
runCmd.Flags().BoolVar(&deploy, "deploy", false, "Call deploy before run.")
runCmd.Flags().BoolVar(&force, "force", false, "Force acquisition of deployment lock.")
runCmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.")
runCmd.Flags().StringVar(&computeID, "compute", "", "Override compute in the deployment with the given compute ID.")
}

View File

@ -39,7 +39,7 @@ var syncCmd = &cobra.Command{
b := bundle.Get(cmd.Context())
// Run initialize phase to make sure paths are set.
err := bundle.Apply(cmd.Context(), b, phases.Initialize())
err := bundle.Apply(cmd.Context(), b, phases.Initialize(""))
if err != nil {
return err
}

View File

@ -16,7 +16,7 @@ var validateCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context())
err := bundle.Apply(cmd.Context(), b, phases.Initialize())
err := bundle.Apply(cmd.Context(), b, phases.Initialize(""))
if err != nil {
return err
}