Move sync logic from cmd/sync to libs/sync (#173)

Mechanical change. Ported global variables the logic relied on to a new
`sync.Sync` struct.
This commit is contained in:
Pieter Noordhuis 2023-01-23 13:52:39 +01:00 committed by GitHub
parent f122e29279
commit fc46d21f8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 46 additions and 24 deletions

View File

@ -8,8 +8,8 @@ import (
"time" "time"
"github.com/databricks/bricks/cmd/root" "github.com/databricks/bricks/cmd/root"
"github.com/databricks/bricks/cmd/sync/repofiles"
"github.com/databricks/bricks/git" "github.com/databricks/bricks/git"
"github.com/databricks/bricks/libs/sync"
"github.com/databricks/bricks/project" "github.com/databricks/bricks/project"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/apierr"
@ -120,11 +120,14 @@ var syncCmd = &cobra.Command{
return err return err
} }
root := prj.Root() s := sync.Sync{
repoFiles := repofiles.Create(*remotePath, root, wsc) LocalPath: prj.Root(),
syncCallback := syncCallback(ctx, repoFiles) RemotePath: *remotePath,
err = spawnWatchdog(ctx, *interval, syncCallback, *remotePath) PersistSnapshot: *persistSnapshot,
return err PollInterval: *interval,
}
return s.RunWatchdog(ctx, wsc)
}, },
} }

View File

@ -1,12 +0,0 @@
package sync
import (
"testing"
)
func TestItSyncs(t *testing.T) {
// ctx := context.Background()
// root.RootCmd.SetArgs([]string{"sync"})
// err := root.RootCmd.ExecuteContext(ctx)
// assert.NoError(t, err)
}

View File

@ -14,7 +14,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/databricks/bricks/cmd/sync" _ "github.com/databricks/bricks/cmd/sync"
"github.com/databricks/bricks/libs/sync"
"github.com/databricks/bricks/libs/testfile" "github.com/databricks/bricks/libs/testfile"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/client"

26
libs/sync/sync.go Normal file
View File

@ -0,0 +1,26 @@
package sync
import (
"context"
"time"
"github.com/databricks/bricks/libs/sync/repofiles"
"github.com/databricks/databricks-sdk-go"
)
type Sync struct {
LocalPath string
RemotePath string
PersistSnapshot bool
PollInterval time.Duration
}
// RunWatchdog kicks off a polling loop to monitor local changes and synchronize
// them to the remote workspace path.
func (s *Sync) RunWatchdog(ctx context.Context, wsc *databricks.WorkspaceClient) error {
repoFiles := repofiles.Create(s.RemotePath, s.LocalPath, wsc)
syncCallback := syncCallback(ctx, repoFiles)
return spawnWatchdog(ctx, s.PollInterval, syncCallback, s.RemotePath, s.PersistSnapshot)
}

View File

@ -6,7 +6,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/databricks/bricks/cmd/sync/repofiles" "github.com/databricks/bricks/libs/sync/repofiles"
"github.com/databricks/bricks/project" "github.com/databricks/bricks/project"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -16,6 +16,8 @@ type watchdog struct {
ticker *time.Ticker ticker *time.Ticker
wg sync.WaitGroup wg sync.WaitGroup
failure error // data race? make channel? failure error // data race? make channel?
persistSnapshot bool
} }
// See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api // See https://docs.databricks.com/resources/limits.html#limits-api-rate-limits for per api
@ -70,9 +72,11 @@ func syncCallback(ctx context.Context, repoFiles *repofiles.RepoFiles) func(loca
func spawnWatchdog(ctx context.Context, func spawnWatchdog(ctx context.Context,
interval time.Duration, interval time.Duration,
applyDiff func(diff) error, applyDiff func(diff) error,
remotePath string) error { remotePath string,
persistSnapshot bool) error {
w := &watchdog{ w := &watchdog{
ticker: time.NewTicker(interval), ticker: time.NewTicker(interval),
persistSnapshot: persistSnapshot,
} }
w.wg.Add(1) w.wg.Add(1)
go w.main(ctx, applyDiff, remotePath) go w.main(ctx, applyDiff, remotePath)
@ -90,7 +94,7 @@ func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remoteP
w.failure = err w.failure = err
return return
} }
if *persistSnapshot { if w.persistSnapshot {
err := snapshot.loadSnapshot(ctx) err := snapshot.loadSnapshot(ctx)
if err != nil { if err != nil {
log.Printf("[ERROR] cannot load snapshot: %s", err) log.Printf("[ERROR] cannot load snapshot: %s", err)
@ -128,7 +132,7 @@ func (w *watchdog) main(ctx context.Context, applyDiff func(diff) error, remoteP
w.failure = err w.failure = err
return return
} }
if *persistSnapshot { if w.persistSnapshot {
err = snapshot.storeSnapshot(ctx) err = snapshot.storeSnapshot(ctx)
if err != nil { if err != nil {
log.Printf("[ERROR] cannot store snapshot: %s", err) log.Printf("[ERROR] cannot store snapshot: %s", err)