Add `bricks bundle run` command (#134)

This commit is contained in:
Pieter Noordhuis 2022-12-15 15:12:47 +01:00 committed by GitHub
parent 72e89bf33c
commit b111416fe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 565 additions and 0 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/databricks/bricks/bundle/config"
"github.com/databricks/databricks-sdk-go"
"github.com/hashicorp/terraform-exec/tfexec"
)
type Bundle struct {
@ -22,6 +23,9 @@ type Bundle struct {
// It can be initialized on demand after loading the configuration.
clientOnce sync.Once
client *databricks.WorkspaceClient
// Stores an initialized copy of this bundle's Terraform wrapper.
Terraform *tfexec.Terraform
}
func Load(path string) (*Bundle, error) {

View File

@ -2,9 +2,11 @@ package terraform
import (
"encoding/json"
"fmt"
"github.com/databricks/bricks/bundle/config"
"github.com/databricks/bricks/bundle/internal/tf/schema"
tfjson "github.com/hashicorp/terraform-json"
)
func conv(from any, to any) {
@ -81,3 +83,39 @@ func BundleToTerraform(config *config.Root) *schema.Root {
return tfroot
}
// TerraformToBundle merges resource attributes recorded in a Terraform
// state back into the bundle configuration (currently jobs and pipelines).
// It returns an error when the state is missing its values/root module,
// or when it contains a managed resource type we have no mapping for.
func TerraformToBundle(state *tfjson.State, config *config.Root) error {
	if state.Values == nil {
		return fmt.Errorf("state.Values not set")
	}

	if state.Values.RootModule == nil {
		return fmt.Errorf("state.Values.RootModule not set")
	}

	for _, res := range state.Values.RootModule.Resources {
		// Limit to resources (skip data sources and the like).
		if res.Mode != tfjson.ManagedResourceMode {
			continue
		}

		switch res.Type {
		case "databricks_job":
			// Round-trip through the Terraform schema type to pick up
			// state attributes (e.g. the job ID), then fold into config.
			var job schema.ResourceJob
			conv(res.AttributeValues, &job)
			entry := config.Resources.Jobs[res.Name]
			conv(job, &entry)
			config.Resources.Jobs[res.Name] = entry
		case "databricks_pipeline":
			var pipeline schema.ResourcePipeline
			conv(res.AttributeValues, &pipeline)
			entry := config.Resources.Pipelines[res.Name]
			conv(pipeline, &entry)
			config.Resources.Pipelines[res.Name] = entry
		default:
			return fmt.Errorf("missing mapping for %s", res.Type)
		}
	}

	return nil
}

View File

@ -0,0 +1,39 @@
package terraform
import (
"context"
"os/exec"
"github.com/databricks/bricks/bundle"
"github.com/hashicorp/terraform-exec/tfexec"
)
type initialize struct{}
// Name returns the identifier of this mutator for logging purposes.
func (m *initialize) Name() string {
	return "terraform.Initialize"
}
// Apply resolves the bundle's Terraform working directory, locates the
// `terraform` binary on PATH, builds a tfexec wrapper for the pair, and
// stores it on the bundle for later mutators (e.g. terraform.Load) to use.
func (m *initialize) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	dir, err := Dir(b)
	if err != nil {
		return nil, err
	}

	binary, err := exec.LookPath("terraform")
	if err != nil {
		return nil, err
	}

	wrapper, err := tfexec.NewTerraform(dir, binary)
	if err != nil {
		return nil, err
	}

	b.Terraform = wrapper
	return nil, nil
}
// Initialize returns a mutator that initializes the bundle's Terraform wrapper.
func Initialize() bundle.Mutator {
	return &initialize{}
}

View File

@ -0,0 +1,32 @@
package terraform
import (
"context"
"github.com/databricks/bricks/bundle"
)
type load struct{}
// Name returns the identifier of this mutator for logging purposes.
func (l *load) Name() string {
	return "terraform.Load"
}
// Apply reads the current Terraform state via the bundle's Terraform
// wrapper and merges it into the bundle configuration. It requires
// b.Terraform to be initialized first (see terraform.Initialize).
func (l *load) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	state, err := b.Terraform.Show(ctx)
	if err != nil {
		return nil, err
	}

	// Merge state into configuration.
	if err := TerraformToBundle(state, &b.Config); err != nil {
		return nil, err
	}

	return nil, nil
}
// Load returns a mutator that merges Terraform state into the bundle configuration.
func Load() bundle.Mutator {
	return &load{}
}

82
bundle/run/job.go Normal file
View File

@ -0,0 +1,82 @@
package run
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/retries"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
// Default timeout for waiting for a job run to complete.
// Used as the retry timeout passed to Jobs.RunNowAndWait in jobRunner.Run.
var jobRunTimeout time.Duration = 2 * time.Hour
// jobRunner is the Runner implementation for job resources.
type jobRunner struct {
	// key provides the Key() method (the type-qualified resource key).
	key

	// bundle the job belongs to; used to obtain a workspace client.
	bundle *bundle.Bundle

	// job resource to run. Its ID field is parsed as an integer job ID in
	// Run (presumably populated from Terraform state — see terraform.Load).
	job *resources.Job
}
// Run triggers a run of the job and blocks until it reaches a terminal
// state or jobRunTimeout expires. It logs the run page URL as soon as it
// is available and logs every life cycle state transition while polling.
// Returns nil only when the run completed successfully.
func (r *jobRunner) Run(ctx context.Context) error {
	jobID, err := strconv.ParseInt(r.job.ID, 10, 64)
	if err != nil {
		return fmt.Errorf("job ID is not an integer: %s", r.job.ID)
	}

	var prefix = fmt.Sprintf("[INFO] [%s]", r.Key())
	var prevState *jobs.RunState

	// This function is called each time the function below polls the run status.
	update := func(info *retries.Info[jobs.Run]) {
		state := info.Info.State
		if state == nil {
			return
		}
		// Log the job run URL as soon as it is available.
		if prevState == nil {
			log.Printf("%s Run available at %s", prefix, info.Info.RunPageUrl)
		}
		// Log only on life cycle state transitions to avoid spam.
		if prevState == nil || prevState.LifeCycleState != state.LifeCycleState {
			log.Printf("%s Run status: %s", prefix, info.Info.State.LifeCycleState)
			prevState = state
		}
	}

	w := r.bundle.WorkspaceClient()
	run, err := w.Jobs.RunNowAndWait(ctx, jobs.RunNow{
		JobId: jobID,
	}, retries.Timeout[jobs.Run](jobRunTimeout), update)
	if err != nil {
		return err
	}

	switch run.State.ResultState {
	// The run was canceled at user request.
	case jobs.RunResultStateCanceled:
		log.Printf("%s Run was cancelled!", prefix)
		return fmt.Errorf("run canceled: %s", run.State.StateMessage)

	// The task completed with an error.
	case jobs.RunResultStateFailed:
		log.Printf("%s Run has failed!", prefix)
		return fmt.Errorf("run failed: %s", run.State.StateMessage)

	// The task completed successfully.
	case jobs.RunResultStateSuccess:
		log.Printf("%s Run has completed successfully!", prefix)
		return nil

	// The run was stopped after reaching the timeout.
	case jobs.RunResultStateTimedout:
		log.Printf("%s Run has timed out!", prefix)
		return fmt.Errorf("run timed out: %s", run.State.StateMessage)
	}

	// Previously this fell through to `return err`, which is always nil at
	// this point and silently reported success for unknown terminal states.
	// Fail loudly instead so new/unmapped result states are not swallowed.
	return fmt.Errorf("unexpected run result state: %s", run.State.ResultState)
}

33
bundle/run/keys.go Normal file
View File

@ -0,0 +1,33 @@
package run
import (
"fmt"
"github.com/databricks/bricks/bundle"
)
// RunnerLookup maps identifiers to a list of workloads that match that identifier.
// The list can have more than 1 entry if resources of different types use the
// same key. When this happens, the user should disambiguate between them
// (e.g. by using the type-qualified "jobs.<key>" form).
type RunnerLookup map[string][]Runner
// ResourceKeys computes two lookup tables for the runnable resources in the
// bundle: one keyed by the bare resource key (e.g. "foo") and one keyed by
// the type-qualified key (e.g. "jobs.foo"). Bare keys may collide across
// resource types, in which case the slice holds multiple runners and the
// caller must disambiguate.
func ResourceKeys(b *bundle.Bundle) (keyOnly RunnerLookup, keyWithType RunnerLookup) {
	keyOnly = make(RunnerLookup)
	keyWithType = make(RunnerLookup)

	// register files a runner under both its bare and qualified keys.
	register := func(bare, qualified string, r Runner) {
		keyOnly[bare] = append(keyOnly[bare], r)
		keyWithType[qualified] = append(keyWithType[qualified], r)
	}

	res := b.Config.Resources
	for name, job := range res.Jobs {
		qualified := fmt.Sprintf("jobs.%s", name)
		register(name, qualified, &jobRunner{key: key(qualified), bundle: b, job: job})
	}
	for name, pipeline := range res.Pipelines {
		qualified := fmt.Sprintf("pipelines.%s", name)
		register(name, qualified, &pipelineRunner{key: key(qualified), bundle: b, pipeline: pipeline})
	}
	return
}

76
bundle/run/pipeline.go Normal file
View File

@ -0,0 +1,76 @@
package run
import (
"context"
"fmt"
"log"
"time"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)
// pipelineRunner is the Runner implementation for pipeline resources.
type pipelineRunner struct {
	// key provides the Key() method (the type-qualified resource key).
	key

	// bundle the pipeline belongs to; used to obtain a workspace client.
	bundle *bundle.Bundle

	// pipeline resource to run; its ID is used to start and poll an update.
	pipeline *resources.Pipeline
}
// Run starts an update of the pipeline and polls until the update reaches
// a terminal state (completed, failed, or canceled). It logs the update URL
// as soon as it is available and logs every state transition while polling.
// Returns nil only when the update completed successfully.
func (r *pipelineRunner) Run(ctx context.Context) error {
	var prefix = fmt.Sprintf("[INFO] [%s]", r.Key())
	var pipelineID = r.pipeline.ID

	w := r.bundle.WorkspaceClient()
	// Fail fast (with a warning) if the pipeline cannot be retrieved.
	_, err := w.Pipelines.GetPipelineByPipelineId(ctx, pipelineID)
	if err != nil {
		log.Printf("[WARN] Cannot get pipeline: %s", err)
		return err
	}

	res, err := w.Pipelines.StartUpdate(ctx, pipelines.StartUpdate{
		PipelineId: pipelineID,
	})
	if err != nil {
		return err
	}

	updateID := res.UpdateId

	// Log the pipeline update URL as soon as it is available.
	url := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
	log.Printf("%s Update available at %s", prefix, url)

	// Poll update for completion and post status.
	// Note: there is no "StartUpdateAndWait" wrapper for this API.
	var prevState *pipelines.UpdateInfoState
	for {
		update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID)
		if err != nil {
			return err
		}

		// Log only if the current state is different from the previous state.
		state := update.Update.State
		if prevState == nil || *prevState != state {
			log.Printf("%s Update status: %s", prefix, state)
			prevState = &state
		}

		if state == pipelines.UpdateInfoStateCanceled {
			log.Printf("%s Update was cancelled!", prefix)
			return fmt.Errorf("update cancelled")
		}
		if state == pipelines.UpdateInfoStateFailed {
			log.Printf("%s Update has failed!", prefix)
			return fmt.Errorf("update failed")
		}
		if state == pipelines.UpdateInfoStateCompleted {
			log.Printf("%s Update has completed successfully!", prefix)
			return nil
		}

		// Wait before polling again, but respect context cancellation.
		// The previous bare time.Sleep could not be interrupted, so a
		// canceled context would keep polling for up to another second
		// and the loop never observed ctx at all between API calls.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Second):
		}
	}
}

78
bundle/run/runner.go Normal file
View File

@ -0,0 +1,78 @@
package run
import (
"context"
"fmt"
"strings"
"github.com/databricks/bricks/bundle"
)
// key holds the fully qualified (type-qualified) identifier of a resource.
// Runner implementations embed it to inherit the Key() method.
type key string

// Key returns the fully qualified (unique) identifier for this resource.
func (k key) Key() string {
	return string(k)
}
// Runner defines the interface for a runnable resource (or workload).
type Runner interface {
	// Key returns the fully qualified (unique) identifier for this runnable resource.
	// This is used for showing the user hints w.r.t. disambiguation.
	Key() string

	// Run the underlying workflow.
	Run(ctx context.Context) error
}
// Collect collects a list of runners given a list of arguments.
//
// Its behavior is as follows:
//  1. If no arguments are specified, it returns a runner for the only resource in the bundle.
//  2. If multiple arguments are specified, for each argument:
//     2.1. Try to find a resource with <key> identical to the argument.
//     2.2. Try to find a resource with <type>.<key> identical to the argument.
//
// If an argument resolves to multiple resources, it returns an error.
func Collect(b *bundle.Bundle, args []string) ([]Runner, error) {
	keyOnly, keyWithType := ResourceKeys(b)
	if len(keyWithType) == 0 {
		return nil, fmt.Errorf("bundle defines no resources")
	}

	var out []Runner

	// Without arguments we can only proceed if the bundle defines exactly
	// one resource; otherwise we cannot know which workload to run.
	if len(args) == 0 {
		if len(keyWithType) != 1 {
			return nil, fmt.Errorf("bundle defines multiple resources; please specify resource to run")
		}
		for _, runners := range keyWithType {
			if len(runners) != 1 {
				// This invariant is covered by [ResourceKeys].
				panic("length of []run.Runner must be 1")
			}
			out = append(out, runners[0])
		}
		return out, nil
	}

	for _, arg := range args {
		// Prefer a bare-key match; fall back to the type-qualified key.
		runners, ok := keyOnly[arg]
		if !ok {
			if runners, ok = keyWithType[arg]; !ok {
				return nil, fmt.Errorf("no such resource: %s", arg)
			}
		}
		if len(runners) != 1 {
			var keys []string
			for _, runner := range runners {
				keys = append(keys, runner.Key())
			}
			return nil, fmt.Errorf("ambiguous: %s (can resolve to all of %s)", arg, strings.Join(keys, ", "))
		}
		out = append(out, runners[0])
	}

	return out, nil
}

138
bundle/run/runner_test.go Normal file
View File

@ -0,0 +1,138 @@
package run
import (
"testing"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config"
"github.com/databricks/bricks/bundle/config/resources"
"github.com/stretchr/testify/assert"
)
func TestCollectNoResources(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{},
},
}
_, err := Collect(b, []string{"foo"})
assert.ErrorContains(t, err, "bundle defines no resources")
}
func TestCollectNoArg(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {},
},
},
},
}
out, err := Collect(b, []string{})
assert.NoError(t, err)
assert.Len(t, out, 1)
}
func TestCollectNoArgMultipleResources(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {},
"bar": {},
},
},
},
}
_, err := Collect(b, []string{})
assert.ErrorContains(t, err, "bundle defines multiple resources")
}
func TestCollectSingleArg(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {},
},
},
},
}
out, err := Collect(b, []string{"foo"})
assert.NoError(t, err)
assert.Len(t, out, 1)
}
func TestCollectSingleArgNotFound(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {},
},
},
},
}
_, err := Collect(b, []string{"bar"})
assert.ErrorContains(t, err, "no such resource: bar")
}
func TestCollectSingleArgAmbiguous(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"key": {},
},
Pipelines: map[string]*resources.Pipeline{
"key": {},
},
},
},
}
_, err := Collect(b, []string{"key"})
assert.ErrorContains(t, err, "ambiguous: ")
}
func TestCollectSingleArgWithType(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"key": {},
},
},
},
}
out, err := Collect(b, []string{"jobs.key"})
assert.NoError(t, err)
assert.Len(t, out, 1)
}
func TestCollectMultipleArg(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {},
"bar": {},
},
Pipelines: map[string]*resources.Pipeline{
"qux": {},
},
},
},
}
out, err := Collect(b, []string{"foo", "bar", "qux"})
assert.NoError(t, err)
assert.Len(t, out, 3)
}

45
cmd/bundle/run.go Normal file
View File

@ -0,0 +1,45 @@
package bundle
import (
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/deploy/terraform"
"github.com/databricks/bricks/bundle/phases"
"github.com/databricks/bricks/bundle/run"
"github.com/spf13/cobra"
)
// runCmd implements `bricks bundle run`: it initializes the bundle, loads
// Terraform state (to resolve resource IDs), and then runs every workload
// selected by the positional arguments, stopping at the first failure.
var runCmd = &cobra.Command{
	Use:   "run [flags] KEY...",
	Short: "Run a workload (e.g. a job or a pipeline)",

	PreRunE: ConfigureBundle,
	RunE: func(cmd *cobra.Command, args []string) error {
		ctx := cmd.Context()
		b := bundle.Get(ctx)

		// Resource IDs come from Terraform state, so the bundle must be
		// initialized and the state loaded before collecting runners.
		mutators := []bundle.Mutator{
			phases.Initialize(),
			terraform.Initialize(),
			terraform.Load(),
		}
		if err := bundle.Apply(ctx, b, mutators); err != nil {
			return err
		}

		runners, err := run.Collect(b, args)
		if err != nil {
			return err
		}

		for _, runner := range runners {
			if err := runner.Run(ctx); err != nil {
				return err
			}
		}
		return nil
	},
}
// init registers the run command with the bundle command group.
func init() {
	rootCmd.AddCommand(runCmd)
}