Use new logger throughout codebase (#256)

This commit is contained in:
Pieter Noordhuis 2023-03-17 15:17:31 +01:00 committed by GitHub
parent c9340d6317
commit ad666ff796
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 82 additions and 87 deletions

View File

@ -3,13 +3,13 @@ package terraform
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config" "github.com/databricks/bricks/bundle/config"
"github.com/databricks/bricks/libs/log"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/hc-install/product" "github.com/hashicorp/hc-install/product"
"github.com/hashicorp/hc-install/releases" "github.com/hashicorp/hc-install/releases"
@ -30,7 +30,7 @@ func (m *initialize) findExecPath(ctx context.Context, b *bundle.Bundle, tf *con
return "", err return "", err
} }
tf.ExecPath = execPath tf.ExecPath = execPath
log.Printf("[DEBUG] Using Terraform at %s", tf.ExecPath) log.Debugf(ctx, "Using Terraform at %s", tf.ExecPath)
return tf.ExecPath, nil return tf.ExecPath, nil
} }
@ -47,7 +47,7 @@ func (m *initialize) findExecPath(ctx context.Context, b *bundle.Bundle, tf *con
} }
if err == nil { if err == nil {
tf.ExecPath = execPath tf.ExecPath = execPath
log.Printf("[DEBUG] Using Terraform at %s", tf.ExecPath) log.Debugf(ctx, "Using Terraform at %s", tf.ExecPath)
return tf.ExecPath, nil return tf.ExecPath, nil
} }
@ -63,7 +63,7 @@ func (m *initialize) findExecPath(ctx context.Context, b *bundle.Bundle, tf *con
} }
tf.ExecPath = execPath tf.ExecPath = execPath
log.Printf("[DEBUG] Using Terraform at %s", tf.ExecPath) log.Debugf(ctx, "Using Terraform at %s", tf.ExecPath)
return tf.ExecPath, nil return tf.ExecPath, nil
} }

View File

@ -3,11 +3,11 @@ package deployer
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/hashicorp/terraform-exec/tfexec" "github.com/hashicorp/terraform-exec/tfexec"
) )
@ -138,30 +138,30 @@ func (d *Deployer) ApplyTerraformConfig(ctx context.Context, configPath, terrafo
defer func() { defer func() {
applyErr = d.Unlock(ctx) applyErr = d.Unlock(ctx)
if applyErr != nil { if applyErr != nil {
log.Printf("[ERROR] failed to unlock deployment mutex: %s", applyErr) log.Errorf(ctx, "failed to unlock deployment mutex: %s", applyErr)
} }
}() }()
applyErr = d.LoadTerraformState(ctx) applyErr = d.LoadTerraformState(ctx)
if applyErr != nil { if applyErr != nil {
log.Printf("[DEBUG] failed to load terraform state from workspace: %s", applyErr) log.Debugf(ctx, "failed to load terraform state from workspace: %s", applyErr)
return Failed, applyErr return Failed, applyErr
} }
tf, applyErr := tfexec.NewTerraform(configPath, terraformBinaryPath) tf, applyErr := tfexec.NewTerraform(configPath, terraformBinaryPath)
if applyErr != nil { if applyErr != nil {
log.Printf("[DEBUG] failed to construct terraform object: %s", applyErr) log.Debugf(ctx, "failed to construct terraform object: %s", applyErr)
return Failed, applyErr return Failed, applyErr
} }
isPlanNotEmpty, applyErr := tf.Plan(ctx) isPlanNotEmpty, applyErr := tf.Plan(ctx)
if applyErr != nil { if applyErr != nil {
log.Printf("[DEBUG] failed to compute terraform plan: %s", applyErr) log.Debugf(ctx, "failed to compute terraform plan: %s", applyErr)
return Failed, applyErr return Failed, applyErr
} }
if !isPlanNotEmpty { if !isPlanNotEmpty {
log.Printf("[DEBUG] terraform plan returned a empty diff") log.Debugf(ctx, "terraform plan returned a empty diff")
return NoChanges, nil return NoChanges, nil
} }
@ -170,16 +170,16 @@ func (d *Deployer) ApplyTerraformConfig(ctx context.Context, configPath, terrafo
saveStateErr := d.SaveTerraformState(ctx) saveStateErr := d.SaveTerraformState(ctx)
if applyErr != nil && saveStateErr != nil { if applyErr != nil && saveStateErr != nil {
log.Printf("[ERROR] terraform apply failed: %s", applyErr) log.Errorf(ctx, "terraform apply failed: %s", applyErr)
log.Printf("[ERROR] failed to upload terraform state after partial terraform apply: %s", saveStateErr) log.Errorf(ctx, "failed to upload terraform state after partial terraform apply: %s", saveStateErr)
return PartialButUntracked, fmt.Errorf("deploymented failed: %s", applyErr) return PartialButUntracked, fmt.Errorf("deploymented failed: %s", applyErr)
} }
if applyErr != nil { if applyErr != nil {
log.Printf("[ERROR] terraform apply failed: %s", applyErr) log.Errorf(ctx, "terraform apply failed: %s", applyErr)
return Partial, fmt.Errorf("deploymented failed: %s", applyErr) return Partial, fmt.Errorf("deploymented failed: %s", applyErr)
} }
if saveStateErr != nil { if saveStateErr != nil {
log.Printf("[ERROR] failed to upload terraform state after completing terraform apply: %s", saveStateErr) log.Errorf(ctx, "failed to upload terraform state after completing terraform apply: %s", saveStateErr)
return CompleteButUntracked, fmt.Errorf("failed to upload terraform state file: %s", saveStateErr) return CompleteButUntracked, fmt.Errorf("failed to upload terraform state file: %s", saveStateErr)
} }
return Complete, nil return Complete, nil

View File

@ -3,12 +3,12 @@ package run
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"strconv" "strconv"
"time" "time"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources" "github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/databricks-sdk-go/retries" "github.com/databricks/databricks-sdk-go/retries"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/fatih/color" "github.com/fatih/color"
@ -109,13 +109,11 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
red := color.New(color.FgRed).SprintFunc() red := color.New(color.FgRed).SprintFunc()
green := color.New(color.FgGreen).SprintFunc() green := color.New(color.FgGreen).SprintFunc()
yellow := color.New(color.FgYellow).SprintFunc() yellow := color.New(color.FgYellow).SprintFunc()
errorPrefix := fmt.Sprintf("%s [%s]", red("[ERROR]"), r.Key())
infoPrefix := fmt.Sprintf("%s [%s]", "[INFO]", r.Key())
run, err := w.Jobs.GetRun(ctx, jobs.GetRun{ run, err := w.Jobs.GetRun(ctx, jobs.GetRun{
RunId: runId, RunId: runId,
}) })
if err != nil { if err != nil {
log.Printf("%s failed to log job run. Error: %s", errorPrefix, err) log.Errorf(ctx, "failed to log job run. Error: %s", err)
return return
} }
if run.State.ResultState == jobs.RunResultStateSuccess { if run.State.ResultState == jobs.RunResultStateSuccess {
@ -123,20 +121,19 @@ func (r *jobRunner) logFailedTasks(ctx context.Context, runId int64) {
} }
for _, task := range run.Tasks { for _, task := range run.Tasks {
if isSuccess(task) { if isSuccess(task) {
log.Printf("%s task %s completed successfully", infoPrefix, green(task.TaskKey)) log.Infof(ctx, "task %s completed successfully", green(task.TaskKey))
} else if isFailed(task) { } else if isFailed(task) {
taskInfo, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{ taskInfo, err := w.Jobs.GetRunOutput(ctx, jobs.GetRunOutput{
RunId: task.RunId, RunId: task.RunId,
}) })
if err != nil { if err != nil {
log.Printf("%s task %s failed. Unable to fetch error trace: %s", log.Errorf(ctx, "task %s failed. Unable to fetch error trace: %s", red(task.TaskKey), err)
errorPrefix, red(task.TaskKey), err)
continue continue
} }
log.Printf("%s Task %s failed!\nError:\n%s\nTrace:\n%s", errorPrefix, log.Errorf(ctx, "Task %s failed!\nError:\n%s\nTrace:\n%s",
red(task.TaskKey), taskInfo.Error, taskInfo.ErrorTrace) red(task.TaskKey), taskInfo.Error, taskInfo.ErrorTrace)
} else { } else {
log.Printf("%s task %s is in state %s", infoPrefix, log.Infof(ctx, "task %s is in state %s",
yellow(task.TaskKey), task.State.LifeCycleState) yellow(task.TaskKey), task.State.LifeCycleState)
} }
} }
@ -149,7 +146,6 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error {
return fmt.Errorf("job ID is not an integer: %s", r.job.ID) return fmt.Errorf("job ID is not an integer: %s", r.job.ID)
} }
var prefix = fmt.Sprintf("[INFO] [%s]", r.Key())
var prevState *jobs.RunState var prevState *jobs.RunState
var runId *int64 var runId *int64
@ -167,10 +163,10 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error {
// Log the job run URL as soon as it is available. // Log the job run URL as soon as it is available.
if prevState == nil { if prevState == nil {
log.Printf("%s Run available at %s", prefix, info.Info.RunPageUrl) log.Infof(ctx, "Run available at %s", info.Info.RunPageUrl)
} }
if prevState == nil || prevState.LifeCycleState != state.LifeCycleState { if prevState == nil || prevState.LifeCycleState != state.LifeCycleState {
log.Printf("%s Run status: %s", prefix, info.Info.State.LifeCycleState) log.Infof(ctx, "Run status: %s", info.Info.State.LifeCycleState)
prevState = state prevState = state
} }
if runId == nil { if runId == nil {
@ -183,8 +179,9 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error {
return err return err
} }
// Include resource key in logger.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))
w := r.bundle.WorkspaceClient() w := r.bundle.WorkspaceClient()
run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), update) run, err := w.Jobs.RunNowAndWait(ctx, *req, retries.Timeout[jobs.Run](jobRunTimeout), update)
if err != nil && runId != nil { if err != nil && runId != nil {
r.logFailedTasks(ctx, *runId) r.logFailedTasks(ctx, *runId)
@ -197,22 +194,22 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) error {
switch run.State.ResultState { switch run.State.ResultState {
// The run was canceled at user request. // The run was canceled at user request.
case jobs.RunResultStateCanceled: case jobs.RunResultStateCanceled:
log.Printf("%s Run was cancelled!", prefix) log.Infof(ctx, "Run was cancelled!")
return fmt.Errorf("run canceled: %s", run.State.StateMessage) return fmt.Errorf("run canceled: %s", run.State.StateMessage)
// The task completed with an error. // The task completed with an error.
case jobs.RunResultStateFailed: case jobs.RunResultStateFailed:
log.Printf("%s Run has failed!", prefix) log.Infof(ctx, "Run has failed!")
return fmt.Errorf("run failed: %s", run.State.StateMessage) return fmt.Errorf("run failed: %s", run.State.StateMessage)
// The task completed successfully. // The task completed successfully.
case jobs.RunResultStateSuccess: case jobs.RunResultStateSuccess:
log.Printf("%s Run has completed successfully!", prefix) log.Infof(ctx, "Run has completed successfully!")
return nil return nil
// The run was stopped after reaching the timeout. // The run was stopped after reaching the timeout.
case jobs.RunResultStateTimedout: case jobs.RunResultStateTimedout:
log.Printf("%s Run has timed out!", prefix) log.Infof(ctx, "Run has timed out!")
return fmt.Errorf("run timed out: %s", run.State.StateMessage) return fmt.Errorf("run timed out: %s", run.State.StateMessage)
} }

View File

@ -3,14 +3,13 @@ package run
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"strings" "strings"
"time" "time"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/config/resources" "github.com/databricks/bricks/bundle/config/resources"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/fatih/color"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
) )
@ -24,12 +23,8 @@ func filterEventsByUpdateId(events []pipelines.PipelineEvent, updateId string) [
return result return result
} }
func (r *pipelineRunner) logEvent(event pipelines.PipelineEvent) { func (r *pipelineRunner) logEvent(ctx context.Context, event pipelines.PipelineEvent) {
red := color.New(color.FgRed).SprintFunc() logString := ""
errorPrefix := red("[ERROR]")
pipelineKeyPrefix := red(fmt.Sprintf("[%s]", r.Key()))
eventTypePrefix := red(fmt.Sprintf("[%s]", event.EventType))
logString := errorPrefix + pipelineKeyPrefix + eventTypePrefix
if event.Message != "" { if event.Message != "" {
logString += fmt.Sprintf(" %s\n", event.Message) logString += fmt.Sprintf(" %s\n", event.Message)
} }
@ -39,13 +34,12 @@ func (r *pipelineRunner) logEvent(event pipelines.PipelineEvent) {
logString += fmt.Sprintf("%s\n", event.Error.Exceptions[i].Message) logString += fmt.Sprintf("%s\n", event.Error.Exceptions[i].Message)
} }
} }
if logString != errorPrefix { if logString != "" {
log.Print(logString) log.Errorf(ctx, fmt.Sprintf("[%s] %s", event.EventType, logString))
} }
} }
func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error { func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, updateId string) error {
w := r.bundle.WorkspaceClient() w := r.bundle.WorkspaceClient()
// Note: For a 100 percent correct and complete solution we should use the // Note: For a 100 percent correct and complete solution we should use the
@ -69,7 +63,7 @@ func (r *pipelineRunner) logErrorEvent(ctx context.Context, pipelineId string, u
// The events API returns most recent events first. We iterate in a reverse order // The events API returns most recent events first. We iterate in a reverse order
// to print the events chronologically // to print the events chronologically
for i := len(updateEvents) - 1; i >= 0; i-- { for i := len(updateEvents) - 1; i >= 0; i-- {
r.logEvent(updateEvents[i]) r.logEvent(ctx, updateEvents[i])
} }
return nil return nil
} }
@ -140,13 +134,14 @@ type pipelineRunner struct {
} }
func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error { func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
var prefix = fmt.Sprintf("[INFO] [%s]", r.Key())
var pipelineID = r.pipeline.ID var pipelineID = r.pipeline.ID
// Include resource key in logger.
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("resource", r.Key()))
w := r.bundle.WorkspaceClient() w := r.bundle.WorkspaceClient()
_, err := w.Pipelines.GetByPipelineId(ctx, pipelineID) _, err := w.Pipelines.GetByPipelineId(ctx, pipelineID)
if err != nil { if err != nil {
log.Printf("[WARN] Cannot get pipeline: %s", err) log.Warnf(ctx, "Cannot get pipeline: %s", err)
return err return err
} }
@ -164,7 +159,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
// Log the pipeline update URL as soon as it is available. // Log the pipeline update URL as soon as it is available.
updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID) updateUrl := fmt.Sprintf("%s/#joblist/pipelines/%s/updates/%s", w.Config.Host, pipelineID, updateID)
log.Printf("%s Update available at %s", prefix, updateUrl) log.Infof(ctx, "Update available at %s", updateUrl)
// Poll update for completion and post status. // Poll update for completion and post status.
// Note: there is no "StartUpdateAndWait" wrapper for this API. // Note: there is no "StartUpdateAndWait" wrapper for this API.
@ -178,16 +173,16 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
// Log only if the current state is different from the previous state. // Log only if the current state is different from the previous state.
state := update.Update.State state := update.Update.State
if prevState == nil || *prevState != state { if prevState == nil || *prevState != state {
log.Printf("%s Update status: %s", prefix, state) log.Infof(ctx, "Update status: %s", state)
prevState = &state prevState = &state
} }
if state == pipelines.UpdateInfoStateCanceled { if state == pipelines.UpdateInfoStateCanceled {
log.Printf("%s Update was cancelled!", prefix) log.Infof(ctx, "Update was cancelled!")
return fmt.Errorf("update cancelled") return fmt.Errorf("update cancelled")
} }
if state == pipelines.UpdateInfoStateFailed { if state == pipelines.UpdateInfoStateFailed {
log.Printf("%s Update has failed!", prefix) log.Infof(ctx, "Update has failed!")
err := r.logErrorEvent(ctx, pipelineID, updateID) err := r.logErrorEvent(ctx, pipelineID, updateID)
if err != nil { if err != nil {
return err return err
@ -195,7 +190,7 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) error {
return fmt.Errorf("update failed") return fmt.Errorf("update failed")
} }
if state == pipelines.UpdateInfoStateCompleted { if state == pipelines.UpdateInfoStateCompleted {
log.Printf("%s Update has completed successfully!", prefix) log.Infof(ctx, "Update has completed successfully!")
return nil return nil
} }

View File

@ -1,12 +1,12 @@
package deploy package deploy
import ( import (
"log"
"os" "os"
"path/filepath" "path/filepath"
"github.com/databricks/bricks/bundle/deployer" "github.com/databricks/bricks/bundle/deployer"
"github.com/databricks/bricks/cmd/bundle/debug" "github.com/databricks/bricks/cmd/bundle/debug"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/hc-install/product" "github.com/hashicorp/hc-install/product"
@ -37,7 +37,7 @@ var deployTerraformCmd = &cobra.Command{
} }
execPath, err := installer.Install(ctx) execPath, err := installer.Install(ctx)
if err != nil { if err != nil {
log.Printf("[ERROR] error installing Terraform: %s", err) log.Errorf(ctx, "error installing Terraform: %s", err)
} }
*terraformBinaryPath = execPath *terraformBinaryPath = execPath
defer installer.Remove(ctx) defer installer.Remove(ctx)
@ -62,17 +62,17 @@ var deployTerraformCmd = &cobra.Command{
status, err := d.ApplyTerraformConfig(ctx, *terraformHcl, *terraformBinaryPath, *isForced) status, err := d.ApplyTerraformConfig(ctx, *terraformHcl, *terraformBinaryPath, *isForced)
switch status { switch status {
case deployer.Failed: case deployer.Failed:
log.Printf("[ERROR] failed to initiate deployment") log.Errorf(ctx, "failed to initiate deployment")
case deployer.NoChanges: case deployer.NoChanges:
log.Printf("[INFO] no changes detected") log.Infof(ctx, "no changes detected")
case deployer.Partial: case deployer.Partial:
log.Printf("[ERROR] started deployment, but failed to complete") log.Errorf(ctx, "started deployment, but failed to complete")
case deployer.PartialButUntracked: case deployer.PartialButUntracked:
log.Printf("[ERROR] started deployment, but failed to complete. Any partially deployed resources in this run are untracked in the databricks workspace and might not be cleaned up on future deployments") log.Errorf(ctx, "started deployment, but failed to complete. Any partially deployed resources in this run are untracked in the databricks workspace and might not be cleaned up on future deployments")
case deployer.CompleteButUntracked: case deployer.CompleteButUntracked:
log.Printf("[ERROR] deployment complete. Failed to track deployed resources. Any deployed resources in this run are untracked in the databricks workspace and might not be cleaned up on future deployments") log.Errorf(ctx, "deployment complete. Failed to track deployed resources. Any deployed resources in this run are untracked in the databricks workspace and might not be cleaned up on future deployments")
case deployer.Complete: case deployer.Complete:
log.Printf("[INFO] deployment complete") log.Infof(ctx, "deployment complete")
} }
return err return err
}, },

View File

@ -2,12 +2,12 @@ package bundle
import ( import (
"fmt" "fmt"
"log"
"time" "time"
"github.com/databricks/bricks/bundle" "github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/phases" "github.com/databricks/bricks/bundle/phases"
"github.com/databricks/bricks/cmd/root" "github.com/databricks/bricks/cmd/root"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/bricks/libs/sync" "github.com/databricks/bricks/libs/sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -58,7 +58,7 @@ var syncCmd = &cobra.Command{
return err return err
} }
log.Printf("[INFO] Remote file sync location: %v", opts.RemotePath) log.Infof(ctx, "Remote file sync location: %v", opts.RemotePath)
if watch { if watch {
return s.RunContinuous(ctx) return s.RunContinuous(ctx)

View File

@ -3,10 +3,10 @@ package sync
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"path" "path"
"strings" "strings"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/scim" "github.com/databricks/databricks-sdk-go/service/scim"
@ -101,8 +101,9 @@ func EnsureRemotePathIsUsable(ctx context.Context, wsc *databricks.WorkspaceClie
} }
} }
log.Printf( log.Debugf(
"[DEBUG] Path %s has type %s (ID: %d)", ctx,
"Path %s has type %s (ID: %d)",
info.Path, info.Path,
strings.ToLower(info.ObjectType.String()), strings.ToLower(info.ObjectType.String()),
info.ObjectId, info.ObjectId,

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
@ -14,6 +13,7 @@ import (
"encoding/hex" "encoding/hex"
"github.com/databricks/bricks/libs/fileset" "github.com/databricks/bricks/libs/fileset"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/bricks/libs/notebook" "github.com/databricks/bricks/libs/notebook"
) )
@ -88,7 +88,7 @@ func SnapshotPath(opts *SyncOptions) (string, error) {
return filepath.Join(snapshotDir, fileName), nil return filepath.Join(snapshotDir, fileName), nil
} }
func newSnapshot(opts *SyncOptions) (*Snapshot, error) { func newSnapshot(ctx context.Context, opts *SyncOptions) (*Snapshot, error) {
path, err := SnapshotPath(opts) path, err := SnapshotPath(opts)
if err != nil { if err != nil {
return nil, err return nil, err
@ -126,8 +126,8 @@ func (s *Snapshot) Save(ctx context.Context) error {
return nil return nil
} }
func loadOrNewSnapshot(opts *SyncOptions) (*Snapshot, error) { func loadOrNewSnapshot(ctx context.Context, opts *SyncOptions) (*Snapshot, error) {
snapshot, err := newSnapshot(opts) snapshot, err := newSnapshot(ctx, opts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -150,8 +150,8 @@ func loadOrNewSnapshot(opts *SyncOptions) (*Snapshot, error) {
// invalidate old snapshot with schema versions // invalidate old snapshot with schema versions
if fromDisk.Version != LatestSnapshotVersion { if fromDisk.Version != LatestSnapshotVersion {
log.Printf("Did not load existing snapshot because its version is %s while the latest version is %s", snapshot.Version, LatestSnapshotVersion) log.Warnf(ctx, "Did not load existing snapshot because its version is %s while the latest version is %s", snapshot.Version, LatestSnapshotVersion)
return newSnapshot(opts) return newSnapshot(ctx, opts)
} }
// unmarshal again over the existing snapshot instance // unmarshal again over the existing snapshot instance

View File

@ -1,6 +1,7 @@
package sync package sync
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
@ -296,7 +297,7 @@ func defaultOptions(t *testing.T) *SyncOptions {
func TestNewSnapshotDefaults(t *testing.T) { func TestNewSnapshotDefaults(t *testing.T) {
opts := defaultOptions(t) opts := defaultOptions(t)
snapshot, err := newSnapshot(opts) snapshot, err := newSnapshot(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, LatestSnapshotVersion, snapshot.Version) assert.Equal(t, LatestSnapshotVersion, snapshot.Version)
@ -325,7 +326,7 @@ func TestOldSnapshotInvalidation(t *testing.T) {
snapshotFile.Close(t) snapshotFile.Close(t)
// assert snapshot did not get loaded // assert snapshot did not get loaded
snapshot, err := loadOrNewSnapshot(opts) snapshot, err := loadOrNewSnapshot(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, snapshot.New) assert.True(t, snapshot.New)
} }
@ -347,7 +348,7 @@ func TestNoVersionSnapshotInvalidation(t *testing.T) {
snapshotFile.Close(t) snapshotFile.Close(t)
// assert snapshot did not get loaded // assert snapshot did not get loaded
snapshot, err := loadOrNewSnapshot(opts) snapshot, err := loadOrNewSnapshot(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
assert.True(t, snapshot.New) assert.True(t, snapshot.New)
} }
@ -370,7 +371,7 @@ func TestLatestVersionSnapshotGetsLoaded(t *testing.T) {
snapshotFile.Close(t) snapshotFile.Close(t)
// assert snapshot gets loaded // assert snapshot gets loaded
snapshot, err := loadOrNewSnapshot(opts) snapshot, err := loadOrNewSnapshot(context.Background(), opts)
require.NoError(t, err) require.NoError(t, err)
assert.False(t, snapshot.New) assert.False(t, snapshot.New)
assert.Equal(t, LatestSnapshotVersion, snapshot.Version) assert.Equal(t, LatestSnapshotVersion, snapshot.Version)

View File

@ -3,10 +3,10 @@ package sync
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"time" "time"
"github.com/databricks/bricks/libs/git" "github.com/databricks/bricks/libs/git"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/bricks/libs/sync/repofiles" "github.com/databricks/bricks/libs/sync/repofiles"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
) )
@ -66,12 +66,12 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
// For incremental sync, we try to load an existing snapshot to start from. // For incremental sync, we try to load an existing snapshot to start from.
var snapshot *Snapshot var snapshot *Snapshot
if opts.Full { if opts.Full {
snapshot, err = newSnapshot(&opts) snapshot, err = newSnapshot(ctx, &opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err) return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err)
} }
} else { } else {
snapshot, err = loadOrNewSnapshot(&opts) snapshot, err = loadOrNewSnapshot(ctx, &opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to load sync snapshot: %w", err) return nil, fmt.Errorf("unable to load sync snapshot: %w", err)
} }
@ -130,7 +130,7 @@ func (s *Sync) RunOnce(ctx context.Context) error {
// https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
all, err := s.fileSet.All() all, err := s.fileSet.All()
if err != nil { if err != nil {
log.Printf("[ERROR] cannot list files: %s", err) log.Errorf(ctx, "cannot list files: %s", err)
return err return err
} }
@ -152,7 +152,7 @@ func (s *Sync) RunOnce(ctx context.Context) error {
err = s.snapshot.Save(ctx) err = s.snapshot.Save(ctx)
if err != nil { if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err) log.Errorf(ctx, "cannot store snapshot: %s", err)
return err return err
} }

View File

@ -4,9 +4,9 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"strings" "strings"
"github.com/databricks/bricks/libs/log"
"golang.org/x/mod/semver" "golang.org/x/mod/semver"
) )
@ -49,7 +49,7 @@ func DependencyFromSpec(raw string) (d Dependency) {
// TODO: write a normal parser for this // TODO: write a normal parser for this
rawSplit := strings.Split(raw, "==") rawSplit := strings.Split(raw, "==")
if len(rawSplit) != 2 { if len(rawSplit) != 2 {
log.Printf("[DEBUG] Skipping invalid dep: %s", raw) log.Debugf(context.Background(), "Skipping invalid dep: %s", raw)
return return
} }
d.Name = rawSplit[0] d.Name = rawSplit[0]

View File

@ -4,11 +4,11 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"log"
"os" "os"
"path" "path"
"strings" "strings"
"github.com/databricks/bricks/libs/log"
"github.com/databricks/bricks/project" "github.com/databricks/bricks/project"
"github.com/databricks/databricks-sdk-go/service/dbfs" "github.com/databricks/databricks-sdk-go/service/dbfs"
) )
@ -24,7 +24,7 @@ func BuildWheel(ctx context.Context, dir string) (string, error) {
if err != nil { if err != nil {
return "", err return "", err
} }
log.Printf("[DEBUG] Built wheel: %s", out) log.Debugf(ctx, "Built wheel: %s", out)
// and cleanup afterwards // and cleanup afterwards
silentlyCleanupWheelFolder(".") silentlyCleanupWheelFolder(".")
@ -93,12 +93,12 @@ func silentlyCleanupWheelFolder(dir string) {
func silentChildWithSuffix(dir, suffix string) string { func silentChildWithSuffix(dir, suffix string) string {
f, err := os.Open(dir) f, err := os.Open(dir)
if err != nil { if err != nil {
log.Printf("[DEBUG] open dir %s: %s", dir, err) log.Debugf(context.Background(), "open dir %s: %s", dir, err)
return "" return ""
} }
entries, err := f.ReadDir(0) entries, err := f.ReadDir(0)
if err != nil { if err != nil {
log.Printf("[DEBUG] read dir %s: %s", dir, err) log.Debugf(context.Background(), "read dir %s: %s", dir, err)
// todo: log // todo: log
return "" return ""
} }

View File

@ -3,10 +3,11 @@ package retries
import ( import (
"context" "context"
"fmt" "fmt"
"log"
"math/rand" "math/rand"
"strings" "strings"
"time" "time"
"github.com/databricks/bricks/libs/log"
) )
type Err struct { type Err struct {
@ -61,7 +62,7 @@ func Wait(pctx context.Context, timeout time.Duration, fn WaitFn) error {
jitter := rand.Intn(int(maxJitter)-int(minJitter)+1) + int(minJitter) jitter := rand.Intn(int(maxJitter)-int(minJitter)+1) + int(minJitter)
wait += time.Duration(jitter) wait += time.Duration(jitter)
timer := time.NewTimer(wait) timer := time.NewTimer(wait)
log.Printf("[TRACE] %s. Sleeping %s", log.Tracef(ctx, "%s. Sleeping %s",
strings.TrimSuffix(res.Err.Error(), "."), strings.TrimSuffix(res.Err.Error(), "."),
wait.Round(time.Millisecond)) wait.Round(time.Millisecond))
select { select {

View File

@ -150,7 +150,7 @@ const DeploymentStateRemoteLocation = "dbfs:/FileStore/deployment-state"
// if err != nil { // if err != nil {
// return err // return err
// } // }
// log.Printf("[DEBUG] remote serial is %d", serialDeployed) // log.Debugf(ctx, "remote serial is %d", serialDeployed)
// local, err := d.openLocalState() // local, err := d.openLocalState()
// if err != nil { // if err != nil {
// return err // return err