mirror of https://github.com/databricks/cli.git
155 lines
5.1 KiB
Go
155 lines
5.1 KiB
Go
package internal
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/databricks/bricks/bundle/deployer"
|
|
"github.com/databricks/databricks-sdk-go"
|
|
"github.com/databricks/databricks-sdk-go/service/repos"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// TODO: create a utility function to create an empty test repo for tests and refactor sync_test integration test
|
|
|
|
const EmptyRepoUrl = "https://github.com/shreyas-goenka/empty-repo.git"
|
|
|
|
func createRemoteTestProject(t *testing.T, projectNamePrefix string, wsc *databricks.WorkspaceClient) string {
|
|
ctx := context.TODO()
|
|
me, err := wsc.CurrentUser.Me(ctx)
|
|
assert.NoError(t, err)
|
|
|
|
remoteProjectRoot := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName(projectNamePrefix))
|
|
repoInfo, err := wsc.Repos.Create(ctx, repos.CreateRepo{
|
|
Path: remoteProjectRoot,
|
|
Url: EmptyRepoUrl,
|
|
Provider: "gitHub",
|
|
})
|
|
assert.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
err := wsc.Repos.DeleteByRepoId(ctx, repoInfo.Id)
|
|
assert.NoError(t, err)
|
|
})
|
|
|
|
return remoteProjectRoot
|
|
}
|
|
|
|
func TestAccLock(t *testing.T) {
|
|
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
|
ctx := context.TODO()
|
|
wsc, err := databricks.NewWorkspaceClient()
|
|
require.NoError(t, err)
|
|
remoteProjectRoot := createRemoteTestProject(t, "lock-acc-", wsc)
|
|
|
|
// 50 lockers try to acquire a lock at the same time
|
|
numConcurrentLocks := 50
|
|
|
|
// Keep single locker unlocked.
|
|
// We use this to check on the current lock through GetActiveLockState.
|
|
locker, err := deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
|
|
require.NoError(t, err)
|
|
|
|
lockerErrs := make([]error, numConcurrentLocks)
|
|
lockers := make([]*deployer.Locker, numConcurrentLocks)
|
|
for i := 0; i < numConcurrentLocks; i++ {
|
|
lockers[i], err = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < numConcurrentLocks; i++ {
|
|
wg.Add(1)
|
|
currentIndex := i
|
|
go func() {
|
|
defer wg.Done()
|
|
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
|
lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, false)
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
|
|
countActive := 0
|
|
indexOfActiveLocker := 0
|
|
indexOfAnInactiveLocker := -1
|
|
for i := 0; i < numConcurrentLocks; i++ {
|
|
if lockers[i].Active {
|
|
countActive += 1
|
|
assert.NoError(t, lockerErrs[i])
|
|
indexOfActiveLocker = i
|
|
} else {
|
|
if indexOfAnInactiveLocker == -1 {
|
|
indexOfAnInactiveLocker = i
|
|
}
|
|
assert.ErrorContains(t, lockerErrs[i], "lock acquired by")
|
|
assert.ErrorContains(t, lockerErrs[i], "Use --force to override")
|
|
}
|
|
}
|
|
assert.Equal(t, 1, countActive, "Exactly one locker should successfull acquire the lock")
|
|
|
|
// test remote lock matches active lock
|
|
remoteLocker, err := locker.GetActiveLockState(ctx)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, remoteLocker.ID, lockers[indexOfActiveLocker].State.ID, "remote locker id does not match active locker")
|
|
assert.True(t, remoteLocker.AcquisitionTime.Equal(lockers[indexOfActiveLocker].State.AcquisitionTime), "remote locker acquisition time does not match active locker")
|
|
|
|
// test all other locks (inactive ones) do not match the remote lock and Unlock fails
|
|
for i := 0; i < numConcurrentLocks; i++ {
|
|
if i == indexOfActiveLocker {
|
|
continue
|
|
}
|
|
assert.NotEqual(t, remoteLocker.ID, lockers[i].State.ID)
|
|
err := lockers[i].Unlock(ctx)
|
|
assert.ErrorContains(t, err, "unlock called when lock is not held")
|
|
}
|
|
|
|
// test inactive locks fail to write a file
|
|
for i := 0; i < numConcurrentLocks; i++ {
|
|
if i == indexOfActiveLocker {
|
|
continue
|
|
}
|
|
err := lockers[i].PutFile(ctx, "foo.json", []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`))
|
|
assert.ErrorContains(t, err, "failed to put file. deploy lock not held")
|
|
}
|
|
|
|
// active locker file write succeeds
|
|
err = lockers[indexOfActiveLocker].PutFile(ctx, "foo.json", []byte(`{"surname":"Khan", "name":"Shah Rukh"}`))
|
|
assert.NoError(t, err)
|
|
|
|
// active locker file read succeeds with expected results
|
|
bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, "foo.json")
|
|
var res map[string]string
|
|
json.Unmarshal(bytes, &res)
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, "Khan", res["surname"])
|
|
assert.Equal(t, "Shah Rukh", res["name"])
|
|
|
|
// inactive locker file reads fail
|
|
for i := 0; i < numConcurrentLocks; i++ {
|
|
if i == indexOfActiveLocker {
|
|
continue
|
|
}
|
|
_, err = lockers[i].GetRawJsonFileContent(ctx, "foo.json")
|
|
assert.ErrorContains(t, err, "failed to get file. deploy lock not held")
|
|
}
|
|
|
|
// Unlock active lock and check it becomes inactive
|
|
err = lockers[indexOfActiveLocker].Unlock(ctx)
|
|
assert.NoError(t, err)
|
|
remoteLocker, err = locker.GetActiveLockState(ctx)
|
|
assert.ErrorContains(t, err, "File not found.", "remote lock file not deleted on unlock")
|
|
assert.Nil(t, remoteLocker)
|
|
assert.False(t, lockers[indexOfActiveLocker].Active)
|
|
|
|
// A locker that failed to acquire the lock should now be able to acquire it
|
|
assert.False(t, lockers[indexOfAnInactiveLocker].Active)
|
|
err = lockers[indexOfAnInactiveLocker].Lock(ctx, false)
|
|
assert.NoError(t, err)
|
|
assert.True(t, lockers[indexOfAnInactiveLocker].Active)
|
|
}
|