package internal

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/fs"
	"math/rand"
	"sync"
	"testing"
	"time"

	"github.com/databricks/cli/libs/filer"
	lockpkg "github.com/databricks/cli/libs/locker"
	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/workspace"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TODO: create a utility function to create an empty test repo for tests and refactor sync_test integration test

// EmptyRepoUrl is the git repository that createRemoteTestProject clones
// into the workspace.
const EmptyRepoUrl = "https://github.com/shreyas-goenka/empty-repo.git"

// createRemoteTestProject creates a repo at /Repos/<current user>/<random name>
// from EmptyRepoUrl and returns its workspace path. The random name is derived
// from projectNamePrefix. The repo is deleted again through t.Cleanup.
func createRemoteTestProject(t *testing.T, projectNamePrefix string, wsc *databricks.WorkspaceClient) string {
	ctx := context.TODO()

	// require (not assert): me is dereferenced right below, so the helper must
	// stop here if the call failed.
	me, err := wsc.CurrentUser.Me(ctx)
	require.NoError(t, err)

	remoteProjectRoot := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName(projectNamePrefix))
	repoInfo, err := wsc.Repos.Create(ctx, workspace.CreateRepo{
		Path:     remoteProjectRoot,
		Url:      EmptyRepoUrl,
		Provider: "gitHub",
	})
	// require (not assert): the cleanup below reads repoInfo.Id.
	require.NoError(t, err)

	t.Cleanup(func() {
		err := wsc.Repos.DeleteByRepoId(ctx, repoInfo.Id)
		assert.NoError(t, err)
	})

	return remoteProjectRoot
}

// TestAccLock lets several lockers race for the same remote lock and checks
// that exactly one of them wins, that only the winner can read and write
// files and unlock, and that a previously losing locker can acquire the lock
// once the winner has released it.
func TestAccLock(t *testing.T) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
	ctx := context.TODO()
	wsc, err := databricks.NewWorkspaceClient()
	require.NoError(t, err)
	remoteProjectRoot := createRemoteTestProject(t, "lock-acc-", wsc)

	// 5 lockers try to acquire a lock at the same time
	numConcurrentLocks := 5

	// Keep single locker unlocked.
	// We use this to check on the current lock through GetActiveLockState.
	locker, err := lockpkg.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
	require.NoError(t, err)

	lockerErrs := make([]error, numConcurrentLocks)
	lockers := make([]*lockpkg.Locker, numConcurrentLocks)

	for i := 0; i < numConcurrentLocks; i++ {
		lockers[i], err = lockpkg.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
		require.NoError(t, err)
	}

	var wg sync.WaitGroup
	for i := 0; i < numConcurrentLocks; i++ {
		wg.Add(1)
		currentIndex := i
		go func() {
			defer wg.Done()
			// Random jitter so the lock attempts are not issued in index order.
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, false)
		}()
	}
	wg.Wait()

	countActive := 0
	indexOfActiveLocker := 0
	indexOfAnInactiveLocker := -1
	for i := 0; i < numConcurrentLocks; i++ {
		if lockers[i].Active {
			countActive++
			assert.NoError(t, lockerErrs[i])
			indexOfActiveLocker = i
		} else {
			if indexOfAnInactiveLocker == -1 {
				indexOfAnInactiveLocker = i
			}
			assert.ErrorContains(t, lockerErrs[i], "lock acquired by")
			assert.ErrorContains(t, lockerErrs[i], "Use --force to override")
		}
	}
	// require (not assert): everything below indexes lockers with
	// indexOfActiveLocker / indexOfAnInactiveLocker, which are only meaningful
	// (and, for the latter, only non-negative) when exactly one locker won.
	require.Equal(t, 1, countActive, "Exactly one locker should successfull acquire the lock")

	// test remote lock matches active lock
	remoteLocker, err := locker.GetActiveLockState(ctx)
	require.NoError(t, err)
	assert.Equal(t, remoteLocker.ID, lockers[indexOfActiveLocker].State.ID, "remote locker id does not match active locker")
	assert.True(t, remoteLocker.AcquisitionTime.Equal(lockers[indexOfActiveLocker].State.AcquisitionTime), "remote locker acquisition time does not match active locker")

	// test all other locks (inactive ones) do not match the remote lock and Unlock fails
	for i := 0; i < numConcurrentLocks; i++ {
		if i == indexOfActiveLocker {
			continue
		}
		assert.NotEqual(t, remoteLocker.ID, lockers[i].State.ID)
		err := lockers[i].Unlock(ctx)
		assert.ErrorContains(t, err, "unlock called when lock is not held")
	}

	// test inactive locks fail to write a file
	for i := 0; i < numConcurrentLocks; i++ {
		if i == indexOfActiveLocker {
			continue
		}
		err := lockers[i].Write(ctx, "foo.json", []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`))
		assert.ErrorContains(t, err, "failed to put file. deploy lock not held")
	}

	// active locker file write succeeds
	err = lockers[indexOfActiveLocker].Write(ctx, "foo.json", []byte(`{"surname":"Khan", "name":"Shah Rukh"}`))
	assert.NoError(t, err)

	// read active locker file
	r, err := lockers[indexOfActiveLocker].Read(ctx, "foo.json")
	require.NoError(t, err)
	defer r.Close()
	b, err := io.ReadAll(r)
	require.NoError(t, err)

	// assert on active locker content
	var res map[string]string
	// Capture the decode error; previously it was dropped and the stale error
	// from io.ReadAll was asserted on instead.
	err = json.Unmarshal(b, &res)
	assert.NoError(t, err)
	assert.Equal(t, "Khan", res["surname"])
	assert.Equal(t, "Shah Rukh", res["name"])

	// inactive locker file reads fail
	for i := 0; i < numConcurrentLocks; i++ {
		if i == indexOfActiveLocker {
			continue
		}
		_, err = lockers[i].Read(ctx, "foo.json")
		assert.ErrorContains(t, err, "failed to get file. deploy lock not held")
	}

	// Unlock active lock and check it becomes inactive
	err = lockers[indexOfActiveLocker].Unlock(ctx)
	assert.NoError(t, err)
	remoteLocker, err = locker.GetActiveLockState(ctx)
	assert.ErrorIs(t, err, fs.ErrNotExist, "remote lock file not deleted on unlock")
	assert.Nil(t, remoteLocker)
	assert.False(t, lockers[indexOfActiveLocker].Active)

	// A locker that failed to acquire the lock should now be able to acquire it
	assert.False(t, lockers[indexOfAnInactiveLocker].Active)
	err = lockers[indexOfAnInactiveLocker].Lock(ctx, false)
	assert.NoError(t, err)
	assert.True(t, lockers[indexOfAnInactiveLocker].Active)
}

// setupLockerTest creates a temporary workspace directory and returns a locker
// targeting that directory together with a workspace files filer rooted at the
// same directory. The test is skipped via GetEnvOrSkipTest when CLOUD_ENV is
// not configured.
func setupLockerTest(ctx context.Context, t *testing.T) (*lockpkg.Locker, filer.Filer) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	w, err := databricks.NewWorkspaceClient()
	require.NoError(t, err)

	// create temp wsfs dir
	tmpDir := temporaryWorkspaceDir(t, w)
	f, err := filer.NewWorkspaceFilesClient(w, tmpDir)
	require.NoError(t, err)

	// create locker
	locker, err := lockpkg.CreateLocker("redfoo@databricks.com", tmpDir, w)
	require.NoError(t, err)

	return locker, f
}

// TestAccLockUnlockWithoutAllowsLockFileNotExist checks that a plain Unlock
// reports fs.ErrNotExist when the lock file was deleted behind the locker's
// back.
func TestAccLockUnlockWithoutAllowsLockFileNotExist(t *testing.T) {
	ctx := context.Background()
	locker, f := setupLockerTest(ctx, t)
	var err error

	// Acquire lock on tmp directory
	err = locker.Lock(ctx, false)
	require.NoError(t, err)

	// Assert lock file is created
	_, err = f.Stat(ctx, "deploy.lock")
	assert.NoError(t, err)

	// Manually delete lock file
	err = f.Delete(ctx, "deploy.lock")
	assert.NoError(t, err)

	// Assert error, because lock file does not exist
	err = locker.Unlock(ctx)
	assert.ErrorIs(t, err, fs.ErrNotExist)
}

// TestAccLockUnlockWithAllowsLockFileNotExist checks that Unlock called with
// lockpkg.AllowLockFileNotExist succeeds and marks the locker inactive even
// though the lock file was deleted behind the locker's back.
func TestAccLockUnlockWithAllowsLockFileNotExist(t *testing.T) {
	ctx := context.Background()
	locker, f := setupLockerTest(ctx, t)
	var err error

	// Acquire lock on tmp directory
	err = locker.Lock(ctx, false)
	require.NoError(t, err)
	assert.True(t, locker.Active)

	// Assert lock file is created
	_, err = f.Stat(ctx, "deploy.lock")
	assert.NoError(t, err)

	// Manually delete lock file
	err = f.Delete(ctx, "deploy.lock")
	assert.NoError(t, err)

	// Assert no error: the missing lock file is tolerated because
	// AllowLockFileNotExist is passed.
	err = locker.Unlock(ctx, lockpkg.AllowLockFileNotExist)
	assert.NoError(t, err)
	assert.False(t, locker.Active)
}