Use filer.Filer in bundle/deployer/locker (#136)

Summary:
* All remote path arguments for deployer and locker are now relative to
root specified at initialization
* The workspace client is now a struct field so it doesn't have to be
passed around
This commit is contained in:
Pieter Noordhuis 2022-12-15 17:16:07 +01:00 committed by GitHub
parent b111416fe5
commit 32a37c1b83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 130 deletions

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"path"
"path/filepath" "path/filepath"
"strings" "strings"
@ -71,7 +70,7 @@ func Create(ctx context.Context, env, localRoot, remoteRoot string, wsc *databri
if err != nil { if err != nil {
return nil, err return nil, err
} }
newLocker := CreateLocker(user.UserName, remoteRoot) newLocker, err := CreateLocker(user.UserName, remoteRoot, wsc)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -89,7 +88,8 @@ func (b *Deployer) DefaultTerraformRoot() string {
} }
func (b *Deployer) tfStateRemotePath() string { func (b *Deployer) tfStateRemotePath() string {
return path.Join(b.remoteRoot, ".bundle", "terraform.tfstate") // Note: remote paths are scoped to `remoteRoot` through the locker. Also see [Create].
return ".bundle/terraform.tfstate"
} }
func (b *Deployer) tfStateLocalPath() string { func (b *Deployer) tfStateLocalPath() string {
@ -97,7 +97,7 @@ func (b *Deployer) tfStateLocalPath() string {
} }
func (b *Deployer) LoadTerraformState(ctx context.Context) error { func (b *Deployer) LoadTerraformState(ctx context.Context) error {
bytes, err := b.locker.GetRawJsonFileContent(ctx, b.wsc, b.tfStateRemotePath()) bytes, err := b.locker.GetRawJsonFileContent(ctx, b.tfStateRemotePath())
if err != nil { if err != nil {
// If remote tf state is absent, use local tf state // If remote tf state is absent, use local tf state
if strings.Contains(err.Error(), "File not found.") { if strings.Contains(err.Error(), "File not found.") {
@ -119,15 +119,15 @@ func (b *Deployer) SaveTerraformState(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
return b.locker.PutFile(ctx, b.wsc, b.tfStateRemotePath(), bytes) return b.locker.PutFile(ctx, b.tfStateRemotePath(), bytes)
} }
func (d *Deployer) Lock(ctx context.Context, isForced bool) error { func (d *Deployer) Lock(ctx context.Context, isForced bool) error {
return d.locker.Lock(ctx, d.wsc, isForced) return d.locker.Lock(ctx, isForced)
} }
func (d *Deployer) Unlock(ctx context.Context) error { func (d *Deployer) Unlock(ctx context.Context) error {
return d.locker.Unlock(ctx, d.wsc) return d.locker.Unlock(ctx)
} }
func (d *Deployer) ApplyTerraformConfig(ctx context.Context, configPath, terraformBinaryPath string, isForced bool) (DeploymentStatus, error) { func (d *Deployer) ApplyTerraformConfig(ctx context.Context, configPath, terraformBinaryPath string, isForced bool) (DeploymentStatus, error) {

View File

@ -4,16 +4,14 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "io"
"path"
"strings" "strings"
"time" "time"
"github.com/databricks/bricks/utilities" "github.com/databricks/bricks/libs/filer"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/google/uuid" "github.com/google/uuid"
) )
@ -41,6 +39,8 @@ import (
// of exclusive access that other clients with non forcefully acquired // of exclusive access that other clients with non forcefully acquired
// locks might have // locks might have
type Locker struct { type Locker struct {
filer filer.Filer
// scope of the locker // scope of the locker
TargetDir string TargetDir string
// Active == true implies exclusive access to TargetDir for the client. // Active == true implies exclusive access to TargetDir for the client.
@ -63,24 +63,31 @@ type LockState struct {
User string User string
} }
// don't need to hold lock on TargetDir to read locker state // GetActiveLockState returns current lock state, irrespective of us holding it.
func GetActiveLockState(ctx context.Context, wsc *databricks.WorkspaceClient, path string) (*LockState, error) { func (locker *Locker) GetActiveLockState(ctx context.Context) (*LockState, error) {
bytes, err := utilities.GetRawJsonFileContent(ctx, wsc, path) reader, err := locker.filer.Read(ctx, locker.RemotePath())
if err != nil { if err != nil {
return nil, err return nil, err
} }
bytes, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
remoteLock := LockState{} remoteLock := LockState{}
err = json.Unmarshal(bytes, &remoteLock) err = json.Unmarshal(bytes, &remoteLock)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &remoteLock, nil return &remoteLock, nil
} }
// asserts whether lock is held by locker. Returns descriptive error with current // asserts whether lock is held by locker. Returns descriptive error with current
// holder details if locker does not hold the lock // holder details if locker does not hold the lock
func (locker *Locker) assertLockHeld(ctx context.Context, wsc *databricks.WorkspaceClient) error { func (locker *Locker) assertLockHeld(ctx context.Context) error {
activeLockState, err := GetActiveLockState(ctx, wsc, locker.RemotePath()) activeLockState, err := locker.GetActiveLockState(ctx)
if err != nil && strings.Contains(err.Error(), "File not found.") { if err != nil && strings.Contains(err.Error(), "File not found.") {
return fmt.Errorf("no active lock on target dir: %s", err) return fmt.Errorf("no active lock on target dir: %s", err)
} }
@ -97,53 +104,58 @@ func (locker *Locker) assertLockHeld(ctx context.Context, wsc *databricks.Worksp
} }
// idempotent function since overwrite is set to true // idempotent function since overwrite is set to true
func (locker *Locker) PutFile(ctx context.Context, wsc *databricks.WorkspaceClient, pathToFile string, content []byte) error { func (locker *Locker) PutFile(ctx context.Context, pathToFile string, content []byte) error {
if !locker.Active { if !locker.Active {
return fmt.Errorf("failed to put file. deploy lock not held") return fmt.Errorf("failed to put file. deploy lock not held")
} }
apiClient, err := client.New(wsc.Config) return locker.filer.Write(ctx, pathToFile, bytes.NewReader(content), filer.OverwriteIfExists, filer.CreateParentDirectories)
if err != nil {
return err
}
apiPath := fmt.Sprintf(
"/api/2.0/workspace-files/import-file/%s?overwrite=true",
strings.TrimLeft(pathToFile, "/"))
err = apiClient.Do(ctx, http.MethodPost, apiPath, bytes.NewReader(content), nil)
if err != nil {
// retry after creating parent dirs
err = wsc.Workspace.MkdirsByPath(ctx, path.Dir(pathToFile))
if err != nil {
return fmt.Errorf("could not mkdir to put file: %s", err)
}
err = apiClient.Do(ctx, http.MethodPost, apiPath, bytes.NewReader(content), nil)
}
return err
} }
func (locker *Locker) GetRawJsonFileContent(ctx context.Context, wsc *databricks.WorkspaceClient, path string) ([]byte, error) { func (locker *Locker) GetRawJsonFileContent(ctx context.Context, path string) ([]byte, error) {
if !locker.Active { if !locker.Active {
return nil, fmt.Errorf("failed to get file. deploy lock not held") return nil, fmt.Errorf("failed to get file. deploy lock not held")
} }
return utilities.GetRawJsonFileContent(ctx, wsc, path) reader, err := locker.filer.Read(ctx, path)
if err != nil {
return nil, err
}
return io.ReadAll(reader)
} }
func (locker *Locker) Lock(ctx context.Context, wsc *databricks.WorkspaceClient, isForced bool) error { func (locker *Locker) Lock(ctx context.Context, isForced bool) error {
newLockerState := LockState{ newLockerState := LockState{
ID: locker.State.ID, ID: locker.State.ID,
AcquisitionTime: time.Now(), AcquisitionTime: time.Now(),
IsForced: isForced, IsForced: isForced,
User: locker.State.User, User: locker.State.User,
} }
bytes, err := json.Marshal(newLockerState) buf, err := json.Marshal(newLockerState)
if err != nil { if err != nil {
return err return err
} }
err = utilities.WriteFile(ctx, wsc, locker.RemotePath(), bytes, isForced)
if err != nil && !strings.Contains(err.Error(), fmt.Sprintf("%s already exists", locker.RemotePath())) { var modes = []filer.WriteMode{
return err // Always create parent directory if it doesn't yet exist.
filer.CreateParentDirectories,
} }
err = locker.assertLockHeld(ctx, wsc)
// Only overwrite lock file if `isForced`.
if isForced {
modes = append(modes, filer.OverwriteIfExists)
}
err = locker.filer.Write(ctx, locker.RemotePath(), bytes.NewReader(buf), modes...)
if err != nil {
// If the write failed because the lock file already exists, don't return
// the error and instead fall through to [assertLockHeld] below.
// This function will return a more descriptive error message that includes
// details about the current holder of the lock.
if !errors.As(err, &filer.FileAlreadyExistsError{}) {
return err
}
}
err = locker.assertLockHeld(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -153,20 +165,15 @@ func (locker *Locker) Lock(ctx context.Context, wsc *databricks.WorkspaceClient,
return nil return nil
} }
func (locker *Locker) Unlock(ctx context.Context, wsc *databricks.WorkspaceClient) error { func (locker *Locker) Unlock(ctx context.Context) error {
if !locker.Active { if !locker.Active {
return fmt.Errorf("unlock called when lock is not held") return fmt.Errorf("unlock called when lock is not held")
} }
err := locker.assertLockHeld(ctx, wsc) err := locker.assertLockHeld(ctx)
if err != nil { if err != nil {
return fmt.Errorf("unlock called when lock is not held: %s", err) return fmt.Errorf("unlock called when lock is not held: %s", err)
} }
err = wsc.Workspace.Delete(ctx, err = locker.filer.Delete(ctx, locker.RemotePath())
workspace.Delete{
Path: locker.RemotePath(),
Recursive: false,
},
)
if err != nil { if err != nil {
return err return err
} }
@ -175,11 +182,19 @@ func (locker *Locker) Unlock(ctx context.Context, wsc *databricks.WorkspaceClien
} }
func (locker *Locker) RemotePath() string { func (locker *Locker) RemotePath() string {
return path.Join(locker.TargetDir, ".bundle/deploy.lock") // Note: remote paths are scoped to `targetDir`. Also see [CreateLocker].
return ".bundle/deploy.lock"
} }
func CreateLocker(user string, targetDir string) *Locker { func CreateLocker(user string, targetDir string, w *databricks.WorkspaceClient) (*Locker, error) {
return &Locker{ filer, err := filer.NewWorkspaceFilesClient(w, targetDir)
if err != nil {
return nil, err
}
locker := &Locker{
filer: filer,
TargetDir: targetDir, TargetDir: targetDir,
Active: false, Active: false,
State: &LockState{ State: &LockState{
@ -187,4 +202,6 @@ func CreateLocker(user string, targetDir string) *Locker {
User: user, User: user,
}, },
} }
return locker, nil
} }

View File

@ -7,7 +7,6 @@ import (
"math/rand" "math/rand"
"os" "os"
"os/exec" "os/exec"
"path"
"path/filepath" "path/filepath"
"sync" "sync"
"testing" "testing"
@ -68,12 +67,16 @@ func TestAccLock(t *testing.T) {
// 50 lockers try to acquire a lock at the same time // 50 lockers try to acquire a lock at the same time
numConcurrentLocks := 50 numConcurrentLocks := 50
var err error // Keep single locker unlocked.
// We use this to check on the current lock through GetActiveLockState.
locker, err := deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
require.NoError(t, err)
lockerErrs := make([]error, numConcurrentLocks) lockerErrs := make([]error, numConcurrentLocks)
lockers := make([]*deployer.Locker, numConcurrentLocks) lockers := make([]*deployer.Locker, numConcurrentLocks)
for i := 0; i < numConcurrentLocks; i++ { for i := 0; i < numConcurrentLocks; i++ {
lockers[i] = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot) lockers[i], err = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc)
require.NoError(t, err)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@ -83,7 +86,7 @@ func TestAccLock(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, wsc, false) lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, false)
}() }()
} }
wg.Wait() wg.Wait()
@ -107,7 +110,7 @@ func TestAccLock(t *testing.T) {
assert.Equal(t, 1, countActive, "Exactly one locker should successfully acquire the lock") assert.Equal(t, 1, countActive, "Exactly one locker should successfully acquire the lock")
// test remote lock matches active lock // test remote lock matches active lock
remoteLocker, err := deployer.GetActiveLockState(ctx, wsc, lockers[indexOfActiveLocker].RemotePath()) remoteLocker, err := locker.GetActiveLockState(ctx)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, remoteLocker.ID, lockers[indexOfActiveLocker].State.ID, "remote locker id does not match active locker") assert.Equal(t, remoteLocker.ID, lockers[indexOfActiveLocker].State.ID, "remote locker id does not match active locker")
assert.True(t, remoteLocker.AcquisitionTime.Equal(lockers[indexOfActiveLocker].State.AcquisitionTime), "remote locker acquisition time does not match active locker") assert.True(t, remoteLocker.AcquisitionTime.Equal(lockers[indexOfActiveLocker].State.AcquisitionTime), "remote locker acquisition time does not match active locker")
@ -118,7 +121,7 @@ func TestAccLock(t *testing.T) {
continue continue
} }
assert.NotEqual(t, remoteLocker.ID, lockers[i].State.ID) assert.NotEqual(t, remoteLocker.ID, lockers[i].State.ID)
err := lockers[i].Unlock(ctx, wsc) err := lockers[i].Unlock(ctx)
assert.ErrorContains(t, err, "unlock called when lock is not held") assert.ErrorContains(t, err, "unlock called when lock is not held")
} }
@ -127,16 +130,16 @@ func TestAccLock(t *testing.T) {
if i == indexOfActiveLocker { if i == indexOfActiveLocker {
continue continue
} }
err := lockers[i].PutFile(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"), []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`)) err := lockers[i].PutFile(ctx, "foo.json", []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`))
assert.ErrorContains(t, err, "failed to put file. deploy lock not held") assert.ErrorContains(t, err, "failed to put file. deploy lock not held")
} }
// active locker file write succeeds // active locker file write succeeds
err = lockers[indexOfActiveLocker].PutFile(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"), []byte(`{"surname":"Khan", "name":"Shah Rukh"}`)) err = lockers[indexOfActiveLocker].PutFile(ctx, "foo.json", []byte(`{"surname":"Khan", "name":"Shah Rukh"}`))
assert.NoError(t, err) assert.NoError(t, err)
// active locker file read succeeds with expected results // active locker file read succeeds with expected results
bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, wsc, path.Join(remoteProjectRoot, "foo.json")) bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, "foo.json")
var res map[string]string var res map[string]string
json.Unmarshal(bytes, &res) json.Unmarshal(bytes, &res)
assert.NoError(t, err) assert.NoError(t, err)
@ -148,21 +151,21 @@ func TestAccLock(t *testing.T) {
if i == indexOfActiveLocker { if i == indexOfActiveLocker {
continue continue
} }
_, err = lockers[i].GetRawJsonFileContent(ctx, wsc, path.Join(remoteProjectRoot, "foo.json")) _, err = lockers[i].GetRawJsonFileContent(ctx, "foo.json")
assert.ErrorContains(t, err, "failed to get file. deploy lock not held") assert.ErrorContains(t, err, "failed to get file. deploy lock not held")
} }
// Unlock active lock and check it becomes inactive // Unlock active lock and check it becomes inactive
err = lockers[indexOfActiveLocker].Unlock(ctx, wsc) err = lockers[indexOfActiveLocker].Unlock(ctx)
assert.NoError(t, err) assert.NoError(t, err)
remoteLocker, err = deployer.GetActiveLockState(ctx, wsc, lockers[indexOfActiveLocker].RemotePath()) remoteLocker, err = locker.GetActiveLockState(ctx)
assert.ErrorContains(t, err, "File not found.", "remote lock file not deleted on unlock") assert.ErrorContains(t, err, "File not found.", "remote lock file not deleted on unlock")
assert.Nil(t, remoteLocker) assert.Nil(t, remoteLocker)
assert.False(t, lockers[indexOfActiveLocker].Active) assert.False(t, lockers[indexOfActiveLocker].Active)
// A locker that failed to acquire the lock should now be able to acquire it // A locker that failed to acquire the lock should now be able to acquire it
assert.False(t, lockers[indexOfAnInactiveLocker].Active) assert.False(t, lockers[indexOfAnInactiveLocker].Active)
err = lockers[indexOfAnInactiveLocker].Lock(ctx, wsc, false) err = lockers[indexOfAnInactiveLocker].Lock(ctx, false)
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, lockers[indexOfAnInactiveLocker].Active) assert.True(t, lockers[indexOfAnInactiveLocker].Active)
} }

View File

@ -17,7 +17,10 @@ import (
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
) )
// WorkspaceFilesClient implements the Files-in-Workspace API. // WorkspaceFilesClient implements the files-in-workspace API.
// NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled.
// It can access any workspace path if files-in-workspace is enabled.
type WorkspaceFilesClient struct { type WorkspaceFilesClient struct {
workspaceClient *databricks.WorkspaceClient workspaceClient *databricks.WorkspaceClient
apiClient *client.DatabricksClient apiClient *client.DatabricksClient

View File

@ -1,60 +0,0 @@
package utilities
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"path"
"strconv"
"strings"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/client"
)
// NOTE: This API is only available for files in /Repos if a workspace has repos
// in workspace enabled and files in workspace not enabled
//
// Get the file contents of a json file in workspace
// TODO(Nov 2022): add method in go sdk to get the raw bytes from response of an API
//
// TODO(Nov 2022): talk to eng-files team about what the response structure would look like.
//
// This function would probably have to be modified in the future once this
// API goes to public preview
func GetRawJsonFileContent(ctx context.Context, wsc *databricks.WorkspaceClient, path string) ([]byte, error) {
apiClient, err := client.New(wsc.Config)
if err != nil {
return nil, err
}
exportApiPath := fmt.Sprintf(
"/api/2.0/workspace-files/%s",
strings.TrimLeft(path, "/"))
var res json.RawMessage
err = apiClient.Do(ctx, http.MethodGet, exportApiPath, nil, &res)
if err != nil {
return nil, fmt.Errorf("failed to fetch file %s: %s", path, err)
}
return res, nil
}
func WriteFile(ctx context.Context, wsc *databricks.WorkspaceClient, pathToFile string, content []byte, overwrite bool) error {
apiClient, err := client.New(wsc.Config)
if err != nil {
return err
}
err = wsc.Workspace.MkdirsByPath(ctx, path.Dir(pathToFile))
if err != nil {
return fmt.Errorf("could not mkdir to post file: %s", err)
}
importApiPath := fmt.Sprintf(
"/api/2.0/workspace-files/import-file/%s?overwrite=%s",
strings.TrimLeft(pathToFile, "/"), strconv.FormatBool(overwrite))
return apiClient.Do(ctx, http.MethodPost, importApiPath, bytes.NewReader(content), nil)
}