mirror of https://github.com/databricks/cli.git
196 lines
5.5 KiB
Go
196 lines
5.5 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"crypto/md5"
|
|
"encoding/hex"
|
|
|
|
"github.com/databricks/bricks/git"
|
|
"github.com/databricks/bricks/project"
|
|
)
|
|
|
|
// A snapshot is a persistant store of knowledge bricks cli has about state of files
|
|
// in the remote repo. We use the last modified times (mtime) of files to determine
|
|
// whether a files need to be updated in the remote repo.
|
|
//
|
|
// 1. Any stale files in the remote repo are updated. That is if the last modified
|
|
// time recorded in the snapshot is less than the actual last modified time of the file
|
|
//
|
|
// 2. Any files present in snapshot but absent locally are deleted from remote path
|
|
//
|
|
// Changing either the databricks workspace (ie Host) or the remote path (ie RemotePath)
|
|
// local files are being synced to will make bricks cli switch to a different
|
|
// snapshot for persisting/loading sync state
|
|
type Snapshot struct {
|
|
// hostname of the workspace this snapshot is for
|
|
Host string `json:"host"`
|
|
// Path in workspace for project repo
|
|
RemotePath string `json:"remote_path"`
|
|
// Map of all files present in the remote repo with the:
|
|
// key: relative file path from project root
|
|
// value: last time the remote instance of this file was updated
|
|
LastUpdatedTimes map[string]time.Time `json:"last_modified_times"`
|
|
}
|
|
|
|
type diff struct {
|
|
put []string
|
|
delete []string
|
|
}
|
|
|
|
const syncSnapshotDirName = "sync-snapshots"
|
|
|
|
func GetFileName(host, remotePath string) string {
|
|
hash := md5.Sum([]byte(host + remotePath))
|
|
hashString := hex.EncodeToString(hash[:])
|
|
return hashString[:16] + ".json"
|
|
}
|
|
|
|
// Compute path of the snapshot file on the local machine
|
|
// The file name for unique for a tuple of (host, remotePath)
|
|
// precisely it's the first 16 characters of md5(concat(host, remotePath))
|
|
func (s *Snapshot) getPath(ctx context.Context) (string, error) {
|
|
prj := project.Get(ctx)
|
|
cacheDir, err := prj.CacheDir()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
snapshotDir := filepath.Join(cacheDir, syncSnapshotDirName)
|
|
if _, err := os.Stat(snapshotDir); os.IsNotExist(err) {
|
|
err = os.Mkdir(snapshotDir, os.ModeDir|os.ModePerm)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to create config directory: %s", err)
|
|
}
|
|
}
|
|
fileName := GetFileName(s.Host, s.RemotePath)
|
|
return filepath.Join(snapshotDir, fileName), nil
|
|
}
|
|
|
|
func newSnapshot(ctx context.Context, remotePath string) (*Snapshot, error) {
|
|
prj := project.Get(ctx)
|
|
|
|
// Get host this snapshot is for
|
|
wsc := prj.WorkspacesClient()
|
|
|
|
// TODO: The host may be late-initialized in certain Azure setups where we
|
|
// specify the workspace by its resource ID. tracked in: https://databricks.atlassian.net/browse/DECO-194
|
|
host := wsc.Config.Host
|
|
if host == "" {
|
|
return nil, fmt.Errorf("failed to resolve host for snapshot")
|
|
}
|
|
|
|
return &Snapshot{
|
|
Host: host,
|
|
RemotePath: remotePath,
|
|
LastUpdatedTimes: make(map[string]time.Time),
|
|
}, nil
|
|
}
|
|
|
|
func (s *Snapshot) storeSnapshot(ctx context.Context) error {
|
|
snapshotPath, err := s.getPath(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f, err := os.OpenFile(snapshotPath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create/open persisted sync snapshot file: %s", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
// persist snapshot to disk
|
|
bytes, err := json.MarshalIndent(s, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to json marshal in-memory snapshot: %s", err)
|
|
}
|
|
_, err = f.Write(bytes)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write sync snapshot to disk: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Snapshot) loadSnapshot(ctx context.Context) error {
|
|
snapshotPath, err := s.getPath(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// Snapshot file not found. We do not load anything
|
|
if _, err := os.Stat(snapshotPath); os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
|
|
f, err := os.Open(snapshotPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open persisted sync snapshot file: %s", err)
|
|
}
|
|
defer f.Close()
|
|
|
|
bytes, err := io.ReadAll(f)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read sync snapshot from disk: %s", err)
|
|
}
|
|
err = json.Unmarshal(bytes, &s)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to json unmarshal persisted snapshot: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d diff) IsEmpty() bool {
|
|
return len(d.put) == 0 && len(d.delete) == 0
|
|
}
|
|
|
|
func (d diff) String() string {
|
|
if d.IsEmpty() {
|
|
return "no changes"
|
|
}
|
|
var changes []string
|
|
if len(d.put) > 0 {
|
|
changes = append(changes, fmt.Sprintf("PUT: %s", strings.Join(d.put, ", ")))
|
|
}
|
|
if len(d.delete) > 0 {
|
|
changes = append(changes, fmt.Sprintf("DELETE: %s", strings.Join(d.delete, ", ")))
|
|
}
|
|
return strings.Join(changes, ", ")
|
|
}
|
|
|
|
func (s Snapshot) diff(all []git.File) (change diff) {
|
|
currentFilenames := map[string]bool{}
|
|
lastModifiedTimes := s.LastUpdatedTimes
|
|
for _, f := range all {
|
|
// create set of current files to figure out if removals are needed
|
|
currentFilenames[f.Relative] = true
|
|
// get current modified timestamp
|
|
modified := f.Modified()
|
|
lastSeenModified, seen := lastModifiedTimes[f.Relative]
|
|
|
|
if !seen || modified.After(lastSeenModified) {
|
|
change.put = append(change.put, f.Relative)
|
|
lastModifiedTimes[f.Relative] = modified
|
|
}
|
|
}
|
|
// figure out files in the snapshot, but not on local filesystem
|
|
for relative := range lastModifiedTimes {
|
|
_, exists := currentFilenames[relative]
|
|
if exists {
|
|
continue
|
|
}
|
|
// add them to a delete batch
|
|
change.delete = append(change.delete, relative)
|
|
// remove the file from snapshot
|
|
delete(lastModifiedTimes, relative)
|
|
}
|
|
// and remove them from the snapshot
|
|
for _, v := range change.delete {
|
|
delete(lastModifiedTimes, v)
|
|
}
|
|
return
|
|
}
|