diff --git a/bundle/bundle.go b/bundle/bundle.go index 71ebe268..7539694f 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -13,6 +13,7 @@ import ( "github.com/databricks/bricks/bundle/config" "github.com/databricks/databricks-sdk-go" + "github.com/hashicorp/terraform-exec/tfexec" ) type Bundle struct { @@ -22,6 +23,9 @@ type Bundle struct { // It can be initialized on demand after loading the configuration. clientOnce sync.Once client *databricks.WorkspaceClient + + // Stores an initialized copy of this bundle's Terraform wrapper. + Terraform *tfexec.Terraform } func Load(path string) (*Bundle, error) { diff --git a/bundle/deploy/terraform/convert.go b/bundle/deploy/terraform/convert.go index 571a7da4..6eb4178a 100644 --- a/bundle/deploy/terraform/convert.go +++ b/bundle/deploy/terraform/convert.go @@ -2,9 +2,11 @@ package terraform import ( "encoding/json" + "fmt" "github.com/databricks/bricks/bundle/config" "github.com/databricks/bricks/bundle/internal/tf/schema" + tfjson "github.com/hashicorp/terraform-json" ) func conv(from any, to any) { @@ -81,3 +83,39 @@ func BundleToTerraform(config *config.Root) *schema.Root { return tfroot } + +func TerraformToBundle(state *tfjson.State, config *config.Root) error { + if state.Values == nil { + return fmt.Errorf("state.Values not set") + } + + if state.Values.RootModule == nil { + return fmt.Errorf("state.Values.RootModule not set") + } + + for _, resource := range state.Values.RootModule.Resources { + // Limit to resources. 
+ if resource.Mode != tfjson.ManagedResourceMode { + continue + } + + switch resource.Type { + case "databricks_job": + var tmp schema.ResourceJob + conv(resource.AttributeValues, &tmp) + cur := config.Resources.Jobs[resource.Name] + conv(tmp, &cur) + config.Resources.Jobs[resource.Name] = cur + case "databricks_pipeline": + var tmp schema.ResourcePipeline + conv(resource.AttributeValues, &tmp) + cur := config.Resources.Pipelines[resource.Name] + conv(tmp, &cur) + config.Resources.Pipelines[resource.Name] = cur + default: + return fmt.Errorf("missing mapping for %s", resource.Type) + } + } + + return nil +} diff --git a/bundle/deploy/terraform/init.go b/bundle/deploy/terraform/init.go new file mode 100644 index 00000000..b3badbea --- /dev/null +++ b/bundle/deploy/terraform/init.go @@ -0,0 +1,39 @@ +package terraform + +import ( + "context" + "os/exec" + + "github.com/databricks/bricks/bundle" + "github.com/hashicorp/terraform-exec/tfexec" +) + +type initialize struct{} + +func (m *initialize) Name() string { + return "terraform.Initialize" +} + +func (m *initialize) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) { + workingDir, err := Dir(b) + if err != nil { + return nil, err + } + + execPath, err := exec.LookPath("terraform") + if err != nil { + return nil, err + } + + tf, err := tfexec.NewTerraform(workingDir, execPath) + if err != nil { + return nil, err + } + + b.Terraform = tf + return nil, nil +} + +func Initialize() bundle.Mutator { + return &initialize{} +} diff --git a/bundle/deploy/terraform/load.go b/bundle/deploy/terraform/load.go new file mode 100644 index 00000000..efd329f4 --- /dev/null +++ b/bundle/deploy/terraform/load.go @@ -0,0 +1,32 @@ +package terraform + +import ( + "context" + + "github.com/databricks/bricks/bundle" +) + +type load struct{} + +func (l *load) Name() string { + return "terraform.Load" +} + +func (l *load) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) { + state, err := 
b.Terraform.Show(ctx) + if err != nil { + return nil, err + } + + // Merge state into configuration. + err = TerraformToBundle(state, &b.Config) + if err != nil { + return nil, err + } + + return nil, nil +} + +func Load() bundle.Mutator { + return &load{} +} diff --git a/bundle/run/job.go b/bundle/run/job.go new file mode 100644 index 00000000..45925a75 --- /dev/null +++ b/bundle/run/job.go @@ -0,0 +1,82 @@ +package run + +import ( + "context" + "fmt" + "log" + "strconv" + "time" + + "github.com/databricks/bricks/bundle" + "github.com/databricks/bricks/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/retries" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +// Default timeout for waiting for a job run to complete. +var jobRunTimeout time.Duration = 2 * time.Hour + +type jobRunner struct { + key + + bundle *bundle.Bundle + job *resources.Job +} + +func (r *jobRunner) Run(ctx context.Context) error { + jobID, err := strconv.ParseInt(r.job.ID, 10, 64) + if err != nil { + return fmt.Errorf("job ID is not an integer: %s", r.job.ID) + } + + var prefix = fmt.Sprintf("[INFO] [%s]", r.Key()) + var prevState *jobs.RunState + + // This function is called each time the function below polls the run status. + update := func(info *retries.Info[jobs.Run]) { + state := info.Info.State + if state == nil { + return + } + // Log the job run URL as soon as it is available. + if prevState == nil { + log.Printf("%s Run available at %s", prefix, info.Info.RunPageUrl) + } + if prevState == nil || prevState.LifeCycleState != state.LifeCycleState { + log.Printf("%s Run status: %s", prefix, info.Info.State.LifeCycleState) + prevState = state + } + } + + w := r.bundle.WorkspaceClient() + run, err := w.Jobs.RunNowAndWait(ctx, jobs.RunNow{ + JobId: jobID, + }, retries.Timeout[jobs.Run](jobRunTimeout), update) + if err != nil { + return err + } + + switch run.State.ResultState { + // The run was canceled at user request. 
+ case jobs.RunResultStateCanceled: + log.Printf("%s Run was cancelled!", prefix) + return fmt.Errorf("run canceled: %s", run.State.StateMessage) + + // The task completed with an error. + case jobs.RunResultStateFailed: + log.Printf("%s Run has failed!", prefix) + return fmt.Errorf("run failed: %s", run.State.StateMessage) + + // The task completed successfully. + case jobs.RunResultStateSuccess: + log.Printf("%s Run has completed successfully!", prefix) + return nil + + // The run was stopped after reaching the timeout. + case jobs.RunResultStateTimedout: + log.Printf("%s Run has timed out!", prefix) + return fmt.Errorf("run timed out: %s", run.State.StateMessage) + } + + return err +} diff --git a/bundle/run/keys.go b/bundle/run/keys.go new file mode 100644 index 00000000..581b6587 --- /dev/null +++ b/bundle/run/keys.go @@ -0,0 +1,33 @@ +package run + +import ( + "fmt" + + "github.com/databricks/bricks/bundle" +) + +// RunnerLookup maps identifiers to a list of workloads that match that identifier. +// The list can have more than 1 entry if resources of different types use the +// same key. When this happens, the user should disambiguate between them. 
+type RunnerLookup map[string][]Runner + +// ResourceKeys computes two lookup maps for the resources in the bundle: one +// keyed by the resource key alone, and one keyed by the resource type plus key +// (e.g. "jobs.my_job"). +func ResourceKeys(b *bundle.Bundle) (keyOnly RunnerLookup, keyWithType RunnerLookup) { + keyOnly = make(RunnerLookup) + keyWithType = make(RunnerLookup) + + r := b.Config.Resources + for k, v := range r.Jobs { + kt := fmt.Sprintf("jobs.%s", k) + w := jobRunner{key: key(kt), bundle: b, job: v} + keyOnly[k] = append(keyOnly[k], &w) + keyWithType[kt] = append(keyWithType[kt], &w) + } + for k, v := range r.Pipelines { + kt := fmt.Sprintf("pipelines.%s", k) + w := pipelineRunner{key: key(kt), bundle: b, pipeline: v} + keyOnly[k] = append(keyOnly[k], &w) + keyWithType[kt] = append(keyWithType[kt], &w) + } + return +} diff --git a/bundle/run/pipeline.go b/bundle/run/pipeline.go new file mode 100644 index 00000000..5ea02659 --- /dev/null +++ b/bundle/run/pipeline.go @@ -0,0 +1,76 @@ +package run + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/databricks/bricks/bundle" + "github.com/databricks/bricks/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/pipelines" +) + +type pipelineRunner struct { + key + + bundle *bundle.Bundle + pipeline *resources.Pipeline +} + +func (r *pipelineRunner) Run(ctx context.Context) error { + var prefix = fmt.Sprintf("[INFO] [%s]", r.Key()) + var pipelineID = r.pipeline.ID + + w := r.bundle.WorkspaceClient() + _, err := w.Pipelines.GetPipelineByPipelineId(ctx, pipelineID) + if err != nil { + log.Printf("[WARN] Cannot get pipeline: %s", err) + return err + } + + res, err := w.Pipelines.StartUpdate(ctx, pipelines.StartUpdate{ + PipelineId: pipelineID, + }) + if err != nil { + return err + } + + updateID := res.UpdateId + + // Log the pipeline update URL as soon as it is available. + url := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID) + log.Printf("%s Update available at %s", prefix, url) + + // Poll update for completion and post status.
+ // Note: there is no "StartUpdateAndWait" wrapper for this API. + var prevState *pipelines.UpdateInfoState + for { + update, err := w.Pipelines.GetUpdateByPipelineIdAndUpdateId(ctx, pipelineID, updateID) + if err != nil { + return err + } + + // Log only if the current state is different from the previous state. + state := update.Update.State + if prevState == nil || *prevState != state { + log.Printf("%s Update status: %s", prefix, state) + prevState = &state + } + + if state == pipelines.UpdateInfoStateCanceled { + log.Printf("%s Update was cancelled!", prefix) + return fmt.Errorf("update cancelled") + } + if state == pipelines.UpdateInfoStateFailed { + log.Printf("%s Update has failed!", prefix) + return fmt.Errorf("update failed") + } + if state == pipelines.UpdateInfoStateCompleted { + log.Printf("%s Update has completed successfully!", prefix) + return nil + } + + time.Sleep(time.Second) + } +} diff --git a/bundle/run/runner.go b/bundle/run/runner.go new file mode 100644 index 00000000..f97f19e0 --- /dev/null +++ b/bundle/run/runner.go @@ -0,0 +1,78 @@ +package run + +import ( + "context" + "fmt" + "strings" + + "github.com/databricks/bricks/bundle" +) + +type key string + +func (k key) Key() string { + return string(k) +} + +// Runner defines the interface for a runnable resource (or workload). +type Runner interface { + // Key returns the fully qualified (unique) identifier for this runnable resource. + // This is used for showing the user hints w.r.t. disambiguation. + Key() string + + // Run the underlying workflow. + Run(ctx context.Context) error +} + +// Collect collects a list of runners given a list of arguments. +// +// Its behavior is as follows: +// 1. If no arguments are specified, it returns a runner for the only resource in the bundle. +// 2. If one or more arguments are specified, for each argument: +// 2.1. Try to find a resource with its key identical to the argument. +// 2.2. Try to find a resource with its type-qualified key ("type.key") identical to the argument.
+// +// If an argument resolves to multiple resources, it returns an error. +func Collect(b *bundle.Bundle, args []string) ([]Runner, error) { + keyOnly, keyWithType := ResourceKeys(b) + if len(keyWithType) == 0 { + return nil, fmt.Errorf("bundle defines no resources") + } + + var out []Runner + + // If the bundle contains only a single resource, we know what to run. + if len(args) == 0 { + if len(keyWithType) != 1 { + return nil, fmt.Errorf("bundle defines multiple resources; please specify resource to run") + } + for _, runners := range keyWithType { + if len(runners) != 1 { + // This invariant is covered by [ResourceKeys]. + panic("length of []run.Runner must be 1") + } + out = append(out, runners[0]) + } + return out, nil + } + + for _, arg := range args { + runners, ok := keyOnly[arg] + if !ok { + runners, ok = keyWithType[arg] + if !ok { + return nil, fmt.Errorf("no such resource: %s", arg) + } + } + if len(runners) != 1 { + var keys []string + for _, runner := range runners { + keys = append(keys, runner.Key()) + } + return nil, fmt.Errorf("ambiguous: %s (can resolve to all of %s)", arg, strings.Join(keys, ", ")) + } + out = append(out, runners[0]) + } + + return out, nil +} diff --git a/bundle/run/runner_test.go b/bundle/run/runner_test.go new file mode 100644 index 00000000..e284d979 --- /dev/null +++ b/bundle/run/runner_test.go @@ -0,0 +1,138 @@ +package run + +import ( + "testing" + + "github.com/databricks/bricks/bundle" + "github.com/databricks/bricks/bundle/config" + "github.com/databricks/bricks/bundle/config/resources" + "github.com/stretchr/testify/assert" +) + +func TestCollectNoResources(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{}, + }, + } + + _, err := Collect(b, []string{"foo"}) + assert.ErrorContains(t, err, "bundle defines no resources") +} + +func TestCollectNoArg(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + 
"foo": {}, + }, + }, + }, + } + + out, err := Collect(b, []string{}) + assert.NoError(t, err) + assert.Len(t, out, 1) +} + +func TestCollectNoArgMultipleResources(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": {}, + "bar": {}, + }, + }, + }, + } + + _, err := Collect(b, []string{}) + assert.ErrorContains(t, err, "bundle defines multiple resources") +} + +func TestCollectSingleArg(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": {}, + }, + }, + }, + } + + out, err := Collect(b, []string{"foo"}) + assert.NoError(t, err) + assert.Len(t, out, 1) +} + +func TestCollectSingleArgNotFound(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": {}, + }, + }, + }, + } + + _, err := Collect(b, []string{"bar"}) + assert.ErrorContains(t, err, "no such resource: bar") +} + +func TestCollectSingleArgAmbiguous(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "key": {}, + }, + Pipelines: map[string]*resources.Pipeline{ + "key": {}, + }, + }, + }, + } + + _, err := Collect(b, []string{"key"}) + assert.ErrorContains(t, err, "ambiguous: ") +} + +func TestCollectSingleArgWithType(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "key": {}, + }, + }, + }, + } + + out, err := Collect(b, []string{"jobs.key"}) + assert.NoError(t, err) + assert.Len(t, out, 1) +} + +func TestCollectMultipleArg(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": {}, + "bar": {}, + }, + Pipelines: map[string]*resources.Pipeline{ + "qux": {}, + }, + }, + }, + } + + out, err := Collect(b, []string{"foo", "bar", 
"qux"}) + assert.NoError(t, err) + assert.Len(t, out, 3) +} diff --git a/cmd/bundle/run.go b/cmd/bundle/run.go new file mode 100644 index 00000000..0185e904 --- /dev/null +++ b/cmd/bundle/run.go @@ -0,0 +1,45 @@ +package bundle + +import ( + "github.com/databricks/bricks/bundle" + "github.com/databricks/bricks/bundle/deploy/terraform" + "github.com/databricks/bricks/bundle/phases" + "github.com/databricks/bricks/bundle/run" + "github.com/spf13/cobra" +) + +var runCmd = &cobra.Command{ + Use: "run [flags] KEY...", + Short: "Run a workload (e.g. a job or a pipeline)", + + PreRunE: ConfigureBundle, + RunE: func(cmd *cobra.Command, args []string) error { + b := bundle.Get(cmd.Context()) + err := bundle.Apply(cmd.Context(), b, []bundle.Mutator{ + phases.Initialize(), + terraform.Initialize(), + terraform.Load(), + }) + if err != nil { + return err + } + + runners, err := run.Collect(b, args) + if err != nil { + return err + } + + for _, runner := range runners { + err = runner.Run(cmd.Context()) + if err != nil { + return err + } + } + + return nil + }, +} + +func init() { + rootCmd.AddCommand(runCmd) +}