databricks-cli/libs/locker/locker.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

218 lines
6.3 KiB
Go
Raw Normal View History

package locker
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
Upgraded Go version to 1.21 (#664) ## Changes Upgraded Go version to 1.21 Upgraded to use `slices` and `slog` from core instead of experimental. Still use `exp/maps` as our code relies on `maps.Keys` which is not part of core package and therefore refactoring required. ### Tests Integration tests passed ``` [DEBUG] Test execution command: /opt/homebrew/opt/go@1.21/bin/go test ./... -json -timeout 1h -run ^TestAcc [DEBUG] Test execution directory: /Users/andrew.nester/cli 2023/08/15 13:20:51 [INFO] ✅ TestAccAlertsCreateErrWhenNoArguments (2.150s) 2023/08/15 13:20:52 [INFO] ✅ TestAccApiGet (0.580s) 2023/08/15 13:20:53 [INFO] ✅ TestAccClustersList (0.900s) 2023/08/15 13:20:54 [INFO] ✅ TestAccClustersGet (0.870s) 2023/08/15 13:21:06 [INFO] ✅ TestAccFilerWorkspaceFilesReadWrite (11.980s) 2023/08/15 13:21:13 [INFO] ✅ TestAccFilerWorkspaceFilesReadDir (7.060s) 2023/08/15 13:21:25 [INFO] ✅ TestAccFilerDbfsReadWrite (12.810s) 2023/08/15 13:21:33 [INFO] ✅ TestAccFilerDbfsReadDir (7.380s) 2023/08/15 13:21:41 [INFO] ✅ TestAccFilerWorkspaceNotebookConflict (7.760s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerWorkspaceNotebookWithOverwriteFlag (8.660s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerLocalReadWrite (0.020s) 2023/08/15 13:21:49 [INFO] ✅ TestAccFilerLocalReadDir (0.010s) 2023/08/15 13:21:52 [INFO] ✅ TestAccFsCatForDbfs (3.190s) 2023/08/15 13:21:53 [INFO] ✅ TestAccFsCatForDbfsOnNonExistentFile (0.890s) 2023/08/15 13:21:54 [INFO] ✅ TestAccFsCatForDbfsInvalidScheme (0.600s) 2023/08/15 13:21:57 [INFO] ✅ TestAccFsCatDoesNotSupportOutputModeJson (2.960s) 2023/08/15 13:22:28 [INFO] ✅ TestAccFsCpDir (31.480s) 2023/08/15 13:22:43 [INFO] ✅ TestAccFsCpFileToFile (14.530s) 2023/08/15 13:22:58 [INFO] ✅ TestAccFsCpFileToDir (14.610s) 2023/08/15 13:23:29 [INFO] ✅ TestAccFsCpDirToDirFileNotOverwritten (31.810s) 2023/08/15 13:23:47 [INFO] ✅ TestAccFsCpFileToDirFileNotOverwritten (17.500s) 2023/08/15 13:24:04 [INFO] ✅ TestAccFsCpFileToFileFileNotOverwritten (17.260s) 2023/08/15 13:24:37 
[INFO] ✅ TestAccFsCpDirToDirWithOverwriteFlag (32.690s) 2023/08/15 13:24:56 [INFO] ✅ TestAccFsCpFileToFileWithOverwriteFlag (19.290s) 2023/08/15 13:25:15 [INFO] ✅ TestAccFsCpFileToDirWithOverwriteFlag (19.230s) 2023/08/15 13:25:17 [INFO] ✅ TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag (2.010s) 2023/08/15 13:25:18 [INFO] ✅ TestAccFsCpErrorsOnInvalidScheme (0.610s) 2023/08/15 13:25:33 [INFO] ✅ TestAccFsCpSourceIsDirectoryButTargetIsFile (14.900s) 2023/08/15 13:25:37 [INFO] ✅ TestAccFsLsForDbfs (3.770s) 2023/08/15 13:25:41 [INFO] ✅ TestAccFsLsForDbfsWithAbsolutePaths (4.160s) 2023/08/15 13:25:44 [INFO] ✅ TestAccFsLsForDbfsOnFile (2.990s) 2023/08/15 13:25:46 [INFO] ✅ TestAccFsLsForDbfsOnEmptyDir (1.870s) 2023/08/15 13:25:46 [INFO] ✅ TestAccFsLsForDbfsForNonexistingDir (0.850s) 2023/08/15 13:25:47 [INFO] ✅ TestAccFsLsWithoutScheme (0.560s) 2023/08/15 13:25:49 [INFO] ✅ TestAccFsMkdirCreatesDirectory (2.310s) 2023/08/15 13:25:52 [INFO] ✅ TestAccFsMkdirCreatesMultipleDirectories (2.920s) 2023/08/15 13:25:55 [INFO] ✅ TestAccFsMkdirWhenDirectoryAlreadyExists (2.320s) 2023/08/15 13:25:57 [INFO] ✅ TestAccFsMkdirWhenFileExistsAtPath (2.820s) 2023/08/15 13:26:01 [INFO] ✅ TestAccFsRmForFile (4.030s) 2023/08/15 13:26:05 [INFO] ✅ TestAccFsRmForEmptyDirectory (3.530s) 2023/08/15 13:26:08 [INFO] ✅ TestAccFsRmForNonEmptyDirectory (3.190s) 2023/08/15 13:26:09 [INFO] ✅ TestAccFsRmForNonExistentFile (0.830s) 2023/08/15 13:26:13 [INFO] ✅ TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag (3.580s) 2023/08/15 13:26:13 [INFO] ✅ TestAccGitClone (0.800s) 2023/08/15 13:26:14 [INFO] ✅ TestAccGitCloneWithOnlyRepoNameOnAlternateBranch (0.790s) 2023/08/15 13:26:15 [INFO] ✅ TestAccGitCloneErrorsWhenRepositoryDoesNotExist (0.540s) 2023/08/15 13:26:23 [INFO] ✅ TestAccLock (8.630s) 2023/08/15 13:26:27 [INFO] ✅ TestAccLockUnlockWithoutAllowsLockFileNotExist (3.490s) 2023/08/15 13:26:30 [INFO] ✅ TestAccLockUnlockWithAllowsLockFileNotExist (3.130s) 2023/08/15 13:26:39 [INFO] ✅ TestAccSyncFullFileSync 
(9.370s) 2023/08/15 13:26:50 [INFO] ✅ TestAccSyncIncrementalFileSync (10.390s) 2023/08/15 13:27:00 [INFO] ✅ TestAccSyncNestedFolderSync (10.680s) 2023/08/15 13:27:11 [INFO] ✅ TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory (10.970s) 2023/08/15 13:27:22 [INFO] ✅ TestAccSyncNestedSpacePlusAndHashAreEscapedSync (10.930s) 2023/08/15 13:27:29 [INFO] ✅ TestAccSyncIncrementalFileOverwritesFolder (7.020s) 2023/08/15 13:27:37 [INFO] ✅ TestAccSyncIncrementalSyncPythonNotebookToFile (7.380s) 2023/08/15 13:27:43 [INFO] ✅ TestAccSyncIncrementalSyncFileToPythonNotebook (6.050s) 2023/08/15 13:27:48 [INFO] ✅ TestAccSyncIncrementalSyncPythonNotebookDelete (5.390s) 2023/08/15 13:27:51 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist (2.570s) 2023/08/15 13:27:56 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableIfRepoExists (5.540s) 2023/08/15 13:27:58 [INFO] ✅ TestAccSyncEnsureRemotePathIsUsableInWorkspace (1.840s) 2023/08/15 13:27:59 [INFO] ✅ TestAccWorkspaceList (0.790s) 2023/08/15 13:28:08 [INFO] ✅ TestAccExportDir (8.860s) 2023/08/15 13:28:11 [INFO] ✅ TestAccExportDirDoesNotOverwrite (3.090s) 2023/08/15 13:28:14 [INFO] ✅ TestAccExportDirWithOverwriteFlag (3.500s) 2023/08/15 13:28:23 [INFO] ✅ TestAccImportDir (8.330s) 2023/08/15 13:28:34 [INFO] ✅ TestAccImportDirDoesNotOverwrite (10.970s) 2023/08/15 13:28:44 [INFO] ✅ TestAccImportDirWithOverwriteFlag (10.130s) 2023/08/15 13:28:44 [INFO] ✅ 68/68 passed, 0 failed, 3 skipped ```
2023-08-15 13:50:40 +00:00
"slices"
"time"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/google/uuid"
)
// UnlockOption is an option that callers may pass to Locker.Unlock to adjust
// its behavior.
type UnlockOption int
const (
// AllowLockFileNotExist makes Unlock return nil (and mark the locker as
// inactive) when the lock file no longer exists, instead of reporting an error.
AllowLockFileNotExist UnlockOption = iota
)
// LockFileName is the path, as passed to the locker's filer, of the lock file.
const LockFileName = "deploy.lock"
// Locker enables exclusive access to TargetDir's scope for a client. This
// lets multiple clients deploy to the same scope (i.e. TargetDir) in an atomic
// manner.
//
// Here are some of the details of the locking protocol used here:
//
// 1. Potentially multiple clients race to create a deploy.lock file in
// TargetDir/.bundle directory with unique ID. The deploy.lock file
// is a JSON file containing the State from the locker.
// NOTE(review): in this file the lock is written at LockFileName relative to
// a filer created from targetDir; confirm whether ".bundle" is still accurate.
//
// 2. Clients read the remote deploy.lock file and if its ID matches, the client
// assumes it has the lock on TargetDir. The client is now free to read/write code
// assets and deploy databricks assets scoped under TargetDir.
//
// 3. To sidestep clients failing to relinquish a lock during a failed deploy attempt
// we allow clients to forcefully acquire a lock on TargetDir. However, forcefully
// acquired locks come with the following caveats:
//
// a. a forcefully acquired lock does not guarantee exclusive access to
// TargetDir's scope
// b. forcefully acquiring a lock(s) on TargetDir can break the assumption
// of exclusive access that other clients with non-forcefully acquired
// locks might have
type Locker struct {
// filer performs all remote reads, writes, stats and deletes for this locker,
// including those on the lock file itself.
filer filer.Filer
// scope of the locker
TargetDir string
// Active == true implies exclusive access to TargetDir for the client.
// This implication breaks down if locks are forcefully acquired by a user.
// It is set by a successful Lock and cleared by a successful Unlock.
Active bool
// If the locker is active, this information about the locker is uploaded onto
// the workspace so that other clients can see details about the active locker.
State *LockState
}
// LockState describes a lock and its holder. It is serialized with
// encoding/json to form the content of the lock file and decoded again by
// GetActiveLockState. There are no struct tags, so the field names below are
// the JSON keys.
type LockState struct {
// unique identifier for the locker
ID uuid.UUID
// time at which Lock built this state (time.Now() at the moment of the call)
AcquisitionTime time.Time
// Only relevant for active lockers
// IsForced == true implies the lock was acquired forcefully
IsForced bool
// creator of this locker
User string
}
// GetActiveLockState fetches the lock file through the locker's filer and
// decodes its JSON content into a LockState. The result describes whichever
// lock is currently recorded remotely, whether or not this locker owns it.
//
// Read, I/O and decode errors are returned unchanged so that callers can
// inspect them (assertLockHeld tests for fs.ErrNotExist).
func (locker *Locker) GetActiveLockState(ctx context.Context) (*LockState, error) {
	lockFile, err := locker.filer.Read(ctx, LockFileName)
	if err != nil {
		return nil, err
	}
	defer lockFile.Close()

	raw, err := io.ReadAll(lockFile)
	if err != nil {
		return nil, err
	}

	state := new(LockState)
	if err := json.Unmarshal(raw, state); err != nil {
		return nil, err
	}
	return state, nil
}
// assertLockHeld verifies that the remote lock file carries this locker's ID.
//
// It returns nil when the IDs match. Otherwise it returns an error naming the
// current holder and the acquisition time, with separate wording for a
// regular and a forcefully acquired lock. A missing lock file is reported as
// "no active lock on target dir" wrapping the underlying error; any other
// failure to load the lock state is returned as is.
func (locker *Locker) assertLockHeld(ctx context.Context) error {
	remote, err := locker.GetActiveLockState(ctx)
	switch {
	case errors.Is(err, fs.ErrNotExist):
		return fmt.Errorf("no active lock on target dir: %w", err)
	case err != nil:
		return err
	}

	// Matching IDs mean the remote lock is ours.
	if remote.ID == locker.State.ID {
		return nil
	}

	// Somebody else holds the lock; pick the message by how it was acquired.
	if remote.IsForced {
		return fmt.Errorf("deploy lock force acquired by %s at %v. Use --force-lock to override", remote.User, remote.AcquisitionTime)
	}
	return fmt.Errorf("deploy lock acquired by %s at %v. Use --force-lock to override", remote.User, remote.AcquisitionTime)
}
// Write stores content at pathToFile through the locker's filer. It refuses
// to run unless the locker is Active.
//
// The write uses filer.OverwriteIfExists and filer.CreateParentDirectories,
// so calling it again with the same arguments is idempotent.
func (locker *Locker) Write(ctx context.Context, pathToFile string, content []byte) error {
	if !locker.Active {
		return errors.New("failed to put file. deploy lock not held")
	}
	payload := bytes.NewReader(content)
	return locker.filer.Write(
		ctx,
		pathToFile,
		payload,
		filer.OverwriteIfExists,
		filer.CreateParentDirectories,
	)
}
// Read opens the file at path through the locker's filer and hands the
// resulting reader back to the caller. It refuses to run unless the locker
// is Active.
func (locker *Locker) Read(ctx context.Context, path string) (io.ReadCloser, error) {
	if locker.Active {
		return locker.filer.Read(ctx, path)
	}
	return nil, errors.New("failed to get file. deploy lock not held")
}
// Lock tries to acquire the deploy lock by writing a lock file that carries
// this locker's ID and user, the current time, and the isForced flag.
//
// Parent directories are always created. The lock file is only overwritten
// when isForced is true. After the write, the remote lock file is read back
// via assertLockHeld; only if it carries this locker's ID are State and
// Active updated. On any error the locker is left unchanged.
func (locker *Locker) Lock(ctx context.Context, isForced bool) error {
	candidate := LockState{
		ID:              locker.State.ID,
		AcquisitionTime: time.Now(),
		IsForced:        isForced,
		User:            locker.State.User,
	}
	payload, err := json.Marshal(candidate)
	if err != nil {
		return err
	}

	// Always create the parent directory; overwrite only for forced locks.
	writeModes := []filer.WriteMode{filer.CreateParentDirectories}
	if isForced {
		writeModes = append(writeModes, filer.OverwriteIfExists)
	}

	// An "already exists" failure is deliberately not returned here: the
	// check below reports a more descriptive error that includes details
	// about the current holder of the lock.
	writeErr := locker.filer.Write(ctx, LockFileName, bytes.NewReader(payload), writeModes...)
	if writeErr != nil && !errors.As(writeErr, &filer.FileAlreadyExistsError{}) {
		return writeErr
	}

	if heldErr := locker.assertLockHeld(ctx); heldErr != nil {
		return heldErr
	}

	locker.State = &candidate
	locker.Active = true
	return nil
}
// Unlock releases the deploy lock by deleting the remote lock file.
//
// It returns an error when the locker is not Active, when assertLockHeld
// reports that the remote lock is missing or owned by another locker, or
// when deleting the lock file fails. On success Active is set to false.
//
// If AllowLockFileNotExist is passed in opts, a lock file that no longer
// exists is not an error: the locker is marked inactive and nil is returned.
// This is helpful when destroying a bundle, in which case the lock file is
// deleted before there is a chance to unlock.
func (locker *Locker) Unlock(ctx context.Context, opts ...UnlockOption) error {
	if !locker.Active {
		return errors.New("unlock called when lock is not held")
	}

	// Only probe for the lock file when the caller opted in. The result of
	// Stat is irrelevant otherwise, so this avoids a needless remote call.
	if slices.Contains(opts, AllowLockFileNotExist) {
		if _, err := locker.filer.Stat(ctx, LockFileName); errors.Is(err, fs.ErrNotExist) {
			locker.Active = false
			return nil
		}
	}

	if err := locker.assertLockHeld(ctx); err != nil {
		return fmt.Errorf("unlock called when lock is not held: %w", err)
	}
	if err := locker.filer.Delete(ctx, LockFileName); err != nil {
		return err
	}
	locker.Active = false
	return nil
}
// CreateLocker builds an inactive Locker scoped to targetDir. The locker gets
// a workspace-files filer created from w and targetDir, and an initial State
// holding a freshly generated ID and the given user. It returns an error only
// if the filer cannot be created.
func CreateLocker(user, targetDir string, w *databricks.WorkspaceClient) (*Locker, error) {
	client, err := filer.NewWorkspaceFilesClient(w, targetDir)
	if err != nil {
		return nil, err
	}

	initial := &LockState{
		ID:   uuid.New(),
		User: user,
	}
	return &Locker{
		filer:     client,
		TargetDir: targetDir,
		Active:    false,
		State:     initial,
	}, nil
}