Add development runs (#522)

This implements the "development run" functionality that we desire for DABs in the workspace / IDE.

## bundle.yml changes

In bundle.yml, there should be a "dev" environment that is marked as
`mode: development`:
```
environments:
  dev:
    default: true
    mode: development # future accepted values might include pull_request, production
```

Setting `mode` to `development` indicates that this environment is used
just for running things for development. This results in several changes
to deployed assets:
* All assets will get '[dev]' in their name and will get a 'dev' tag
* All assets will be hidden from the list of assets (future work; e.g.
for jobs we would have a special job_type that hides it from the list)
* All deployed assets will be ephemeral (future work, we need some form
of garbage collection)
* Pipelines will be marked as 'development: true'
* Jobs can run on development compute through the `--compute-id` parameter
in the CLI
* Jobs get their schedule / triggers paused
* Jobs get concurrent runs (it's really annoying if your runs get
skipped because the last run was still in progress)

Leaving `mode` unset is also accepted (it does nothing). Any other value, such as
`pull-request`, is reserved for future use and is currently rejected with an error.

## CLI changes

To run a single job called "shark_sightings" on existing compute, use the
following commands:
```
$ databricks bundle deploy --compute-id 0617-201942-9yd9g8ix
$ databricks bundle run shark_sightings
```

which would deploy and run a job called "[dev] shark_sightings" on the
compute provided. Note that `--compute-id` is not accepted in production
environments, so we show an error if `mode: development` is not used.

The `run --deploy` command offers a convenient shorthand for the common
combination of deploying & running:
```
$ export DATABRICKS_CLUSTER_ID=0617-201942-9yd9g8ix
$ bundle run --deploy shark_sightings
```
The `--deploy` addition isn't really essential and I welcome feedback 🤔
I played with the idea of a "debug" or "dev" command but that seemed to
only make the option space even broader for users. The above could work
well with an IDE or workspace that automatically sets the target
compute.

One more thing I added is `run --no-wait`, which can now be used to run
something without waiting for it to complete (useful for IDE-like
environments that can display progress themselves).
```
$ bundle run --deploy shark_sightings --no-wait
```
This commit is contained in:
Lennart Kats (databricks) 2023-07-12 08:51:54 +02:00 committed by GitHub
parent e11704618c
commit 57e75d3e22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 424 additions and 0 deletions

View File

@ -28,4 +28,13 @@ type Bundle struct {
// Contains Git information like current commit, current branch and
// origin url. Automatically loaded by reading .git directory if not specified
Git Git `json:"git,omitempty"`
// Determines the mode of the environment.
// For example, 'mode: development' can be used for deployments for
// development purposes.
// Annotated readonly as this should be set at the environment level.
Mode Mode `json:"mode,omitempty" bundle:"readonly"`
// Overrides the compute used for jobs and other supported assets.
ComputeID string `json:"compute_id,omitempty"`
}

View File

@ -1,5 +1,7 @@
package config
// Mode names the deployment mode of an environment, for example
// "development". The empty string means no mode has been set.
type Mode string
// Environment defines overrides for a single environment.
// This structure is recursively merged into the root configuration.
type Environment struct {
@ -7,6 +9,14 @@ type Environment struct {
// by the user (through environment variable or command line argument).
Default bool `json:"default,omitempty"`
// Determines the mode of the environment.
// For example, 'mode: development' can be used for deployments for
// development purposes.
Mode Mode `json:"mode,omitempty"`
// Overrides the compute used for jobs and other supported assets.
ComputeID string `json:"compute_id,omitempty"`
Bundle *Bundle `json:"bundle,omitempty"`
Workspace *Workspace `json:"workspace,omitempty"`
@ -20,3 +30,9 @@ type Environment struct {
// in the scope of an environment
Variables map[string]string `json:"variables,omitempty"`
}
const (
	// Development is the Mode value for environments that are used for
	// development deployments ('mode: development').
	//
	// Right now, we just have a default / "" mode and a "development" mode.
	// Additional modes are expected to come for pull-requests and production.
	Development Mode = "development"
)

View File

@ -0,0 +1,56 @@
package mutator
import (
"context"
"fmt"
"os"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
)
// overrideCompute is the mutator that redirects job tasks to the compute
// configured in Bundle.ComputeID. It carries no state of its own.
type overrideCompute struct{}
// OverrideCompute returns a mutator that replaces the compute used by job
// tasks with the compute ID configured for the bundle.
func OverrideCompute() bundle.Mutator {
	return new(overrideCompute)
}
// Name returns the fixed identifier of this mutator.
func (m *overrideCompute) Name() string {
	return "OverrideCompute"
}
// overrideJobCompute points the tasks of job j at the existing cluster
// identified by compute.
//
// Only tasks that already select compute through NewCluster or
// ExistingClusterId are rewritten: any NewCluster spec is dropped and
// ExistingClusterId is set to compute. All other tasks are left untouched.
func overrideJobCompute(j *resources.Job, compute string) {
	for idx := range j.Tasks {
		// Take the address so the update lands in the slice element itself.
		t := &j.Tasks[idx]
		if t.NewCluster == nil && t.ExistingClusterId == "" {
			continue
		}
		t.NewCluster = nil
		t.ExistingClusterId = compute
	}
}
// Apply rewrites the compute of all jobs in the bundle when a compute
// override is configured.
//
// Outside of 'mode: development' an override is rejected with an error, and
// the absence of one is a no-op. In development mode a non-empty
// DATABRICKS_CLUSTER_ID environment variable takes precedence over the
// configured ComputeID; if neither is set, nothing is changed.
func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error {
	bundleCfg := &b.Config.Bundle

	if bundleCfg.Mode != config.Development {
		if bundleCfg.ComputeID == "" {
			return nil
		}
		return fmt.Errorf("cannot override compute for an environment that does not use 'mode: development'")
	}

	if fromEnv := os.Getenv("DATABRICKS_CLUSTER_ID"); fromEnv != "" {
		bundleCfg.ComputeID = fromEnv
	}
	if bundleCfg.ComputeID == "" {
		return nil
	}

	for _, job := range b.Config.Resources.Jobs {
		overrideJobCompute(job, bundleCfg.ComputeID)
	}
	return nil
}

View File

@ -0,0 +1,134 @@
package mutator_test
import (
"context"
"os"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestOverrideDevelopment(t *testing.T) {
os.Setenv("DATABRICKS_CLUSTER_ID", "")
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Development,
ComputeID: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}
m := mutator.OverrideCompute()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId)
}
func TestOverrideDevelopmentEnv(t *testing.T) {
os.Setenv("DATABRICKS_CLUSTER_ID", "newClusterId")
bundle := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}
m := mutator.OverrideCompute()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "cluster2", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId)
}
func TestOverrideProduction(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
ComputeID: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}
m := mutator.OverrideCompute()
err := m.Apply(context.Background(), bundle)
require.Error(t, err)
}
func TestOverrideProductionEnv(t *testing.T) {
os.Setenv("DATABRICKS_CLUSTER_ID", "newClusterId")
bundle := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}
m := mutator.OverrideCompute()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
}

View File

@ -0,0 +1,89 @@
package mutator
import (
"context"
"fmt"
"path"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
)
// processEnvironmentMode is the mutator that adjusts bundle resources
// according to the bundle's mode. It carries no state of its own.
type processEnvironmentMode struct{}
// developmentConcurrentRuns is the MaxConcurrentRuns value given to jobs in
// development mode that do not configure one themselves.
const developmentConcurrentRuns = 4
// ProcessEnvironmentMode returns a mutator that applies the changes implied
// by the bundle's mode to its resources.
func ProcessEnvironmentMode() bundle.Mutator {
	return new(processEnvironmentMode)
}
// Name returns the fixed identifier of this mutator.
func (m *processEnvironmentMode) Name() string {
	return "ProcessEnvironmentMode"
}
// processDevelopmentMode marks all resources as being for 'development'
// purposes, i.e. changing their name, adding tags, and (in the future)
// marking them as 'hidden' in the UI.
//
// Per resource type:
//   - jobs: "[dev] " name prefix, "dev" tag, a default for MaxConcurrentRuns
//     when it is unset, and any schedule / continuous / trigger paused;
//   - pipelines: "[dev] " name prefix and Development set to true;
//   - models: "[dev] " name prefix and a "dev" tag;
//   - experiments: "[dev] " prefix on the last path element of the name and
//     a "dev" tag.
//
// It currently always returns nil.
func processDevelopmentMode(b *bundle.Bundle) error {
	const prefix = "[dev] "
	res := b.Config.Resources

	for _, job := range res.Jobs {
		job.Name = prefix + job.Name

		if job.Tags == nil {
			job.Tags = make(map[string]string)
		}
		job.Tags["dev"] = ""

		if job.MaxConcurrentRuns == 0 {
			job.MaxConcurrentRuns = developmentConcurrentRuns
		}

		// Pause every way the job could start on its own.
		if s := job.Schedule; s != nil {
			s.PauseStatus = jobs.PauseStatusPaused
		}
		if c := job.Continuous; c != nil {
			c.PauseStatus = jobs.PauseStatusPaused
		}
		if tr := job.Trigger; tr != nil {
			tr.PauseStatus = jobs.PauseStatusPaused
		}
	}

	for _, pipeline := range res.Pipelines {
		pipeline.Name = prefix + pipeline.Name
		pipeline.Development = true
		// (pipelines don't yet support tags)
	}

	for _, model := range res.Models {
		model.Name = prefix + model.Name
		model.Tags = append(model.Tags, ml.ModelTag{Key: "dev", Value: ""})
	}

	for _, experiment := range res.Experiments {
		// The experiment name is treated as a slash-separated path; only
		// its last element receives the prefix.
		dir, base := path.Dir(experiment.Name), path.Base(experiment.Name)
		if dir == "." {
			experiment.Name = prefix + base
		} else {
			experiment.Name = dir + "/" + prefix + base
		}
		experiment.Tags = append(experiment.Tags, ml.ExperimentTag{Key: "dev", Value: ""})
	}

	return nil
}
// Apply adjusts the bundle's resources for the configured mode.
//
// 'development' triggers the development-mode processing, an unset mode is
// a no-op, and any other value yields an error.
func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error {
	mode := b.Config.Bundle.Mode
	if mode == config.Development {
		return processDevelopmentMode(b)
	}
	if mode != "" {
		return fmt.Errorf("unsupported value specified for 'mode': %s", mode)
	}
	// No mode set: no action.
	return nil
}

View File

@ -0,0 +1,77 @@
package mutator_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestProcessEnvironmentModeApplyDebug checks the effect of development mode
// on jobs, pipelines, experiments and models: names get the "[dev] " prefix,
// experiments get a "dev" tag and pipelines are flagged as development.
func TestProcessEnvironmentModeApplyDebug(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Bundle: config.Bundle{
				Mode: config.Development,
			},
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
				},
				Pipelines: map[string]*resources.Pipeline{
					"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
				},
				Experiments: map[string]*resources.MlflowExperiment{
					"experiment1": {Experiment: &ml.Experiment{Name: "/Users/lennart.kats@databricks.com/experiment1"}},
					"experiment2": {Experiment: &ml.Experiment{Name: "experiment2"}},
				},
				Models: map[string]*resources.MlflowModel{
					"model1": {Model: &ml.Model{Name: "model1"}},
				},
			},
		},
	}

	require.NoError(t, mutator.ProcessEnvironmentMode().Apply(context.Background(), b))

	res := b.Config.Resources
	assert.Equal(t, "[dev] job1", res.Jobs["job1"].Name)
	assert.Equal(t, "[dev] pipeline1", res.Pipelines["pipeline1"].Name)
	assert.Equal(t, "/Users/lennart.kats@databricks.com/[dev] experiment1", res.Experiments["experiment1"].Name)
	assert.Equal(t, "[dev] experiment2", res.Experiments["experiment2"].Name)
	assert.Equal(t, "[dev] model1", res.Models["model1"].Name)
	assert.Equal(t, "dev", res.Experiments["experiment1"].Experiment.Tags[0].Key)
	assert.True(t, res.Pipelines["pipeline1"].PipelineSpec.Development)
}
// TestProcessEnvironmentModeApplyDefault checks that an unset mode leaves
// job and pipeline names, and the pipeline development flag, unchanged.
func TestProcessEnvironmentModeApplyDefault(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Bundle: config.Bundle{
				Mode: "",
			},
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
				},
				Pipelines: map[string]*resources.Pipeline{
					"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
				},
			},
		},
	}

	require.NoError(t, mutator.ProcessEnvironmentMode().Apply(context.Background(), b))

	res := b.Config.Resources
	assert.Equal(t, "job1", res.Jobs["job1"].Name)
	assert.Equal(t, "pipeline1", res.Pipelines["pipeline1"].Name)
	assert.False(t, res.Pipelines["pipeline1"].PipelineSpec.Development)
}

View File

@ -190,5 +190,13 @@ func (r *Root) MergeEnvironment(env *Environment) error {
}
}
if env.Mode != "" {
r.Bundle.Mode = env.Mode
}
if env.ComputeID != "" {
r.Bundle.ComputeID = env.ComputeID
}
return nil
}

View File

@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) {
err := root.InitializeVariables([]string{"bar=567"})
assert.ErrorContains(t, err, "variable bar has not been defined")
}
// TestRootMergeEnvironmentWithMode checks that merging an environment copies
// its mode onto the root bundle configuration.
func TestRootMergeEnvironmentWithMode(t *testing.T) {
	var root Root

	err := root.MergeEnvironment(&Environment{Mode: Development})

	require.NoError(t, err)
	assert.Equal(t, Development, root.Bundle.Mode)
}

View File

@ -25,6 +25,8 @@ func Initialize() bundle.Mutator {
interpolation.IncludeLookupsInPath("workspace"),
interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix),
),
mutator.OverrideCompute(),
mutator.ProcessEnvironmentMode(),
mutator.TranslatePaths(),
terraform.Initialize(),
},

View File

@ -243,6 +243,15 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
if err != nil {
return nil, fmt.Errorf("cannot start job")
}
if opts.NoWait {
	// The run has been started; look it up once so its URL can be
	// reported, then return without waiting for completion.
	details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{
		RunId: waiter.RunId,
	})
	// Check the error before touching details: it cannot be relied on
	// when the lookup fails.
	if err != nil {
		return nil, err
	}
	progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl))
	return nil, nil
}
run, err := waiter.OnProgress(func(r *jobs.Run) {
pullRunId(r)
logDebug(r)

View File

@ -7,6 +7,7 @@ import (
// Options holds the settings that are passed to a runner's Run method.
type Options struct {
	// Job holds job-specific options (presumably only used when running a
	// job; confirm against JobOptions).
	Job JobOptions
	// Pipeline holds pipeline-specific options (presumably only used when
	// running a pipeline; confirm against PipelineOptions).
	Pipeline PipelineOptions
	// NoWait makes the runner return once the run has been started instead
	// of waiting for it to complete.
	NoWait bool
}
func (o *Options) Define(fs *flag.FlagSet) {

View File

@ -170,6 +170,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp
// Log the pipeline update URL as soon as it is available.
progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))
if opts.NoWait {
return nil, nil
}
// Poll update for completion and post status.
// Note: there is no "StartUpdateAndWait" wrapper for this API.
var prevState *pipelines.UpdateInfoState

View File

@ -8,6 +8,7 @@ resources:
environments:
development:
mode: development
resources:
pipelines:
nyc_taxi_pipeline:

View File

@ -4,6 +4,7 @@ import (
"path/filepath"
"testing"
"github.com/databricks/cli/bundle/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) {
p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.Equal(t, b.Config.Bundle.Mode, config.Development)
assert.True(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)

View File

@ -16,6 +16,7 @@ var deployCmd = &cobra.Command{
// If `--force` is specified, force acquisition of the deployment lock.
b.Config.Bundle.Lock.Force = forceDeploy
b.Config.Bundle.ComputeID = computeID
return bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
@ -26,8 +27,10 @@ var deployCmd = &cobra.Command{
}
// forceDeploy is bound to the --force flag of the deploy command.
var forceDeploy bool

// computeID is bound to the --compute-id flag of the deploy command.
var computeID string
// init registers the deploy command and its --force and --compute-id flags.
func init() {
	AddCommand(deployCmd)

	flags := deployCmd.Flags()
	flags.BoolVar(&forceDeploy, "force", false, "Force acquisition of deployment lock.")
	flags.StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")
}

View File

@ -14,6 +14,7 @@ import (
)
// runOptions collects the options handed to the runner by the run command.
var runOptions run.Options

// noWait is bound to the --no-wait flag of the run command.
var noWait bool
var runCmd = &cobra.Command{
Use: "run [flags] KEY",
@ -23,6 +24,7 @@ var runCmd = &cobra.Command{
PreRunE: ConfigureBundleWithVariables,
RunE: func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context())
err := bundle.Apply(cmd.Context(), b, bundle.Seq(
phases.Initialize(),
terraform.Interpolate(),
@ -39,6 +41,7 @@ var runCmd = &cobra.Command{
return err
}
runOptions.NoWait = noWait
output, err := runner.Run(cmd.Context(), &runOptions)
if err != nil {
return err
@ -89,4 +92,5 @@ var runCmd = &cobra.Command{
func init() {
runOptions.Define(runCmd.Flags())
rootCmd.AddCommand(runCmd)
runCmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.")
}