Log mutator messages using progress logger (#312)

This PR uses progress logger to log messages inside mutators
This commit is contained in:
shreyas-goenka 2023-04-18 16:55:06 +02:00 committed by GitHub
parent d0872b45e2
commit 598ad62688
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 102 additions and 29 deletions

View File

@ -3,7 +3,6 @@ package files
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/libs/cmdio" "github.com/databricks/bricks/libs/cmdio"
@ -23,16 +22,12 @@ func (m *delete) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator,
return nil, nil return nil, nil
} }
// interface to io with the user cmdio.LogString(ctx, "Starting deletion of remote bundle files")
logger, ok := cmdio.FromContext(ctx) cmdio.LogString(ctx, fmt.Sprintf("Bundle remote directory is %s", b.Config.Workspace.RootPath))
if !ok {
return nil, fmt.Errorf("no logger found")
}
red := color.New(color.FgRed).SprintFunc()
fmt.Fprintf(os.Stderr, "\nRemote directory %s will be deleted\n", b.Config.Workspace.RootPath) red := color.New(color.FgRed).SprintFunc()
if !b.AutoApprove { if !b.AutoApprove {
proceed, err := logger.Ask(fmt.Sprintf("%s and all files in it will be %s Proceed?: ", b.Config.Workspace.RootPath, red("deleted permanently!"))) proceed, err := cmdio.Ask(ctx, fmt.Sprintf("\n%s and all files in it will be %s Proceed?: ", b.Config.Workspace.RootPath, red("deleted permanently!")))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -59,7 +54,8 @@ func (m *delete) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator,
return nil, err return nil, err
} }
fmt.Println("Successfully deleted files!") cmdio.LogString(ctx, fmt.Sprintf("Deleted snapshot file at %s", sync.SnapshotPath()))
cmdio.LogString(ctx, "Successfully deleted files!")
return nil, nil return nil, nil
} }

View File

@ -2,8 +2,10 @@ package files
import ( import (
"context" "context"
"fmt"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/libs/cmdio"
) )
type upload struct{} type upload struct{}
@ -13,6 +15,7 @@ func (m *upload) Name() string {
} }
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) { func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
cmdio.LogString(ctx, "Starting upload of bundle files")
sync, err := getSync(ctx, b) sync, err := getSync(ctx, b)
if err != nil { if err != nil {
return nil, err return nil, err
@ -23,6 +26,7 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator,
return nil, err return nil, err
} }
cmdio.LogString(ctx, fmt.Sprintf("Uploaded bundle files at %s!\n", b.Config.Workspace.FilesPath))
return nil, nil return nil, nil
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/libs/cmdio"
"github.com/hashicorp/terraform-exec/tfexec" "github.com/hashicorp/terraform-exec/tfexec"
) )
@ -20,6 +21,8 @@ func (w *apply) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator,
return nil, fmt.Errorf("terraform not initialized") return nil, fmt.Errorf("terraform not initialized")
} }
cmdio.LogString(ctx, "Starting resource deployment")
err := tf.Init(ctx, tfexec.Upgrade(true)) err := tf.Init(ctx, tfexec.Upgrade(true))
if err != nil { if err != nil {
return nil, fmt.Errorf("terraform init: %w", err) return nil, fmt.Errorf("terraform init: %w", err)
@ -30,6 +33,7 @@ func (w *apply) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator,
return nil, fmt.Errorf("terraform apply: %w", err) return nil, fmt.Errorf("terraform apply: %w", err)
} }
cmdio.LogString(ctx, "Resource deployment completed!")
return nil, nil return nil, nil
} }

View File

@ -3,7 +3,6 @@ package terraform
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"strings" "strings"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
@ -13,8 +12,6 @@ import (
tfjson "github.com/hashicorp/terraform-json" tfjson "github.com/hashicorp/terraform-json"
) )
// TODO: This is temporary. Come up with a robust way to log mutator progress and
// status events
type PlanResourceChange struct { type PlanResourceChange struct {
ResourceType string `json:"resource_type"` ResourceType string `json:"resource_type"`
Action string `json:"action"` Action string `json:"action"`
@ -45,12 +42,11 @@ func (c *PlanResourceChange) IsInplaceSupported() bool {
return false return false
} }
func logDestroyPlan(l *cmdio.Logger, changes []*tfjson.ResourceChange) error { func logDestroyPlan(ctx context.Context, changes []*tfjson.ResourceChange) error {
// TODO: remove once we have mutator logging in place cmdio.LogString(ctx, "The following resources will be removed:")
fmt.Fprintln(os.Stderr, "The following resources will be removed: ")
for _, c := range changes { for _, c := range changes {
if c.Change.Actions.Delete() { if c.Change.Actions.Delete() {
l.Log(&PlanResourceChange{ cmdio.Log(ctx, &PlanResourceChange{
ResourceType: c.Type, ResourceType: c.Type,
Action: "delete", Action: "delete",
ResourceName: c.Name, ResourceName: c.Name,
@ -67,14 +63,9 @@ func (w *destroy) Name() string {
} }
func (w *destroy) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) { func (w *destroy) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
// interface to io with the user // return early if plan is empty
logger, ok := cmdio.FromContext(ctx)
if !ok {
return nil, fmt.Errorf("no logger found")
}
if b.Plan.IsEmpty { if b.Plan.IsEmpty {
fmt.Fprintln(os.Stderr, "No resources to destroy!") cmdio.LogString(ctx, "No resources to destroy in plan. Skipping destroy!")
return nil, nil return nil, nil
} }
@ -90,7 +81,7 @@ func (w *destroy) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator
} }
// print the resources that will be destroyed // print the resources that will be destroyed
err = logDestroyPlan(logger, plan.ResourceChanges) err = logDestroyPlan(ctx, plan.ResourceChanges)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -98,7 +89,7 @@ func (w *destroy) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator
// Ask for confirmation, if needed // Ask for confirmation, if needed
if !b.Plan.ConfirmApply { if !b.Plan.ConfirmApply {
red := color.New(color.FgRed).SprintFunc() red := color.New(color.FgRed).SprintFunc()
b.Plan.ConfirmApply, err = logger.Ask(fmt.Sprintf("\nThis will permanently %s resources! Proceed? [y/n]: ", red("destroy"))) b.Plan.ConfirmApply, err = cmdio.Ask(ctx, fmt.Sprintf("\nThis will permanently %s resources! Proceed? [y/n]: ", red("destroy")))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -113,13 +104,15 @@ func (w *destroy) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator
return nil, fmt.Errorf("no plan found") return nil, fmt.Errorf("no plan found")
} }
cmdio.LogString(ctx, "Starting to destroy resources")
// Apply terraform according to the computed destroy plan // Apply terraform according to the computed destroy plan
err = tf.Apply(ctx, tfexec.DirOrPlan(b.Plan.Path)) err = tf.Apply(ctx, tfexec.DirOrPlan(b.Plan.Path))
if err != nil { if err != nil {
return nil, fmt.Errorf("terraform destroy: %w", err) return nil, fmt.Errorf("terraform destroy: %w", err)
} }
fmt.Fprintln(os.Stderr, "Successfully destroyed resources!") cmdio.LogString(ctx, "Successfully destroyed resources!")
return nil, nil return nil, nil
} }

View File

@ -6,6 +6,7 @@ import (
"path/filepath" "path/filepath"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/libs/cmdio"
"github.com/databricks/bricks/libs/terraform" "github.com/databricks/bricks/libs/terraform"
"github.com/hashicorp/terraform-exec/tfexec" "github.com/hashicorp/terraform-exec/tfexec"
) )
@ -31,6 +32,8 @@ func (p *plan) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, e
return nil, fmt.Errorf("terraform not initialized") return nil, fmt.Errorf("terraform not initialized")
} }
cmdio.LogString(ctx, "Starting plan computation")
err := tf.Init(ctx, tfexec.Upgrade(true)) err := tf.Init(ctx, tfexec.Upgrade(true))
if err != nil { if err != nil {
return nil, fmt.Errorf("terraform init: %w", err) return nil, fmt.Errorf("terraform init: %w", err)
@ -43,6 +46,7 @@ func (p *plan) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, e
} }
planPath := filepath.Join(tfDir, "plan") planPath := filepath.Join(tfDir, "plan")
destroy := p.goal == PlanDestroy destroy := p.goal == PlanDestroy
notEmpty, err := tf.Plan(ctx, tfexec.Destroy(destroy), tfexec.Out(planPath)) notEmpty, err := tf.Plan(ctx, tfexec.Destroy(destroy), tfexec.Out(planPath))
if err != nil { if err != nil {
return nil, err return nil, err
@ -54,6 +58,8 @@ func (p *plan) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, e
ConfirmApply: b.AutoApprove, ConfirmApply: b.AutoApprove,
IsEmpty: !notEmpty, IsEmpty: !notEmpty,
} }
cmdio.LogString(ctx, fmt.Sprintf("Planning complete and persisted at %s\n", planPath))
return nil, nil return nil, nil
} }

View File

@ -19,7 +19,8 @@ var destroyCmd = &cobra.Command{
PreRunE: root.MustConfigureBundle, PreRunE: root.MustConfigureBundle,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
b := bundle.Get(cmd.Context()) ctx := cmd.Context()
b := bundle.Get(ctx)
// If `--force` is specified, force acquisition of the deployment lock. // If `--force` is specified, force acquisition of the deployment lock.
b.Config.Bundle.Lock.Force = force b.Config.Bundle.Lock.Force = force
@ -33,7 +34,15 @@ var destroyCmd = &cobra.Command{
return fmt.Errorf("please specify --auto-approve to skip interactive confirmation checks for non tty consoles") return fmt.Errorf("please specify --auto-approve to skip interactive confirmation checks for non tty consoles")
} }
ctx := cmdio.NewContext(cmd.Context(), cmdio.NewLogger(flags.ModeAppend)) // Check auto-approve is selected for json logging
logger, ok := cmdio.FromContext(ctx)
if !ok {
return fmt.Errorf("progress logger not found")
}
if logger.Mode == flags.ModeJson && !autoApprove {
return fmt.Errorf("please specify --auto-approve since selected logging format is json")
}
return bundle.Apply(ctx, b, []bundle.Mutator{ return bundle.Apply(ctx, b, []bundle.Mutator{
phases.Initialize(), phases.Initialize(),
phases.Build(), phases.Build(),

View File

@ -2,6 +2,7 @@ package cmdio
import ( import (
"bufio" "bufio"
"context"
"encoding/json" "encoding/json"
"io" "io"
"os" "os"
@ -9,12 +10,20 @@ import (
"github.com/databricks/bricks/libs/flags" "github.com/databricks/bricks/libs/flags"
) )
// This is the interface for all io interactions with a user
type Logger struct { type Logger struct {
// Mode for the logger. One of (append, inplace, json).
Mode flags.ProgressLogFormat Mode flags.ProgressLogFormat
// Input stream (eg. stdin). Answers to questions prompted using the Ask() method
// are read from here
Reader bufio.Reader Reader bufio.Reader
// Output stream where the logger writes to
Writer io.Writer Writer io.Writer
// If true, indicates no events have been printed by the logger yet. Used
// by inplace logging for formatting
isFirstEvent bool isFirstEvent bool
} }
@ -27,6 +36,41 @@ func NewLogger(mode flags.ProgressLogFormat) *Logger {
} }
} }
func Default() *Logger {
return &Logger{
Mode: flags.ModeAppend,
Writer: os.Stderr,
Reader: *bufio.NewReader(os.Stdin),
isFirstEvent: true,
}
}
func Log(ctx context.Context, event Event) {
logger, ok := FromContext(ctx)
if !ok {
logger = Default()
}
logger.Log(event)
}
func LogString(ctx context.Context, message string) {
logger, ok := FromContext(ctx)
if !ok {
logger = Default()
}
logger.Log(&MessageEvent{
Message: message,
})
}
func Ask(ctx context.Context, question string) (bool, error) {
logger, ok := FromContext(ctx)
if !ok {
logger = Default()
}
return logger.Ask(question)
}
func (l *Logger) Ask(question string) (bool, error) { func (l *Logger) Ask(question string) (bool, error) {
l.Writer.Write([]byte(question)) l.Writer.Write([]byte(question))
ans, err := l.Reader.ReadString('\n') ans, err := l.Reader.ReadString('\n')

View File

@ -0,0 +1,13 @@
package cmdio
type MessageEvent struct {
Message string `json:"message"`
}
func (event *MessageEvent) String() string {
return event.Message
}
func (event *MessageEvent) IsInplaceSupported() bool {
return false
}

View File

@ -164,6 +164,10 @@ func (s *Sync) DestroySnapshot(ctx context.Context) error {
return s.snapshot.Destroy(ctx) return s.snapshot.Destroy(ctx)
} }
func (s *Sync) SnapshotPath() string {
return s.snapshot.SnapshotPath
}
func (s *Sync) RunContinuous(ctx context.Context) error { func (s *Sync) RunContinuous(ctx context.Context) error {
ticker := time.NewTicker(s.PollInterval) ticker := time.NewTicker(s.PollInterval)
defer ticker.Stop() defer ticker.Stop()