From 32a37c1b83840e10de21026a23c80306b50c443f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 15 Dec 2022 17:16:07 +0100 Subject: [PATCH] Use filer.Filer in bundle/deployer/locker (#136) Summary: * All remote path arguments for deployer and locker are now relative to root specified at initialization * The workspace client is now a struct field so it doesn't have to be passed around --- bundle/deployer/deployer.go | 14 ++-- bundle/deployer/locker.go | 113 +++++++++++++++------------ internal/locker_test.go | 31 ++++---- libs/filer/workspace_files_client.go | 5 +- utilities/workspace_files.go | 60 -------------- 5 files changed, 93 insertions(+), 130 deletions(-) delete mode 100644 utilities/workspace_files.go diff --git a/bundle/deployer/deployer.go b/bundle/deployer/deployer.go index ef46f65c..e2a112c7 100644 --- a/bundle/deployer/deployer.go +++ b/bundle/deployer/deployer.go @@ -5,7 +5,6 @@ import ( "fmt" "log" "os" - "path" "path/filepath" "strings" @@ -71,7 +70,7 @@ func Create(ctx context.Context, env, localRoot, remoteRoot string, wsc *databri if err != nil { return nil, err } - newLocker := CreateLocker(user.UserName, remoteRoot) + newLocker, err := CreateLocker(user.UserName, remoteRoot, wsc) if err != nil { return nil, err } @@ -89,7 +88,8 @@ func (b *Deployer) DefaultTerraformRoot() string { } func (b *Deployer) tfStateRemotePath() string { - return path.Join(b.remoteRoot, ".bundle", "terraform.tfstate") + // Note: remote paths are scoped to `remoteRoot` through the locker. Also see [Create]. + return ".bundle/terraform.tfstate" } func (b *Deployer) tfStateLocalPath() string { @@ -97,7 +97,7 @@ func (b *Deployer) tfStateLocalPath() string { } func (b *Deployer) LoadTerraformState(ctx context.Context) error { - bytes, err := b.locker.GetRawJsonFileContent(ctx, b.wsc, b.tfStateRemotePath()) + bytes, err := b.locker.GetRawJsonFileContent(ctx, b.tfStateRemotePath()) if err != nil { // If remote tf state is absent, use local tf state if strings.Contains(err.Error(), "File not found.") { @@ -119,15 +119,15 @@ func (b *Deployer) SaveTerraformState(ctx context.Context) error { if err != nil { return err } - return b.locker.PutFile(ctx, b.wsc, b.tfStateRemotePath(), bytes) + return b.locker.PutFile(ctx, b.tfStateRemotePath(), bytes) } func (d *Deployer) Lock(ctx context.Context, isForced bool) error { - return d.locker.Lock(ctx, d.wsc, isForced) + return d.locker.Lock(ctx, isForced) } func (d *Deployer) Unlock(ctx context.Context) error { - return d.locker.Unlock(ctx, d.wsc) + return d.locker.Unlock(ctx) } func (d *Deployer) ApplyTerraformConfig(ctx context.Context, configPath, terraformBinaryPath string, isForced bool) (DeploymentStatus, error) { diff --git a/bundle/deployer/locker.go b/bundle/deployer/locker.go index 4a2498b9..136203e2 100644 --- a/bundle/deployer/locker.go +++ b/bundle/deployer/locker.go @@ -4,16 +4,14 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" - "net/http" - "path" + "io" "strings" "time" - "github.com/databricks/bricks/utilities" + "github.com/databricks/bricks/libs/filer" "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/google/uuid" ) @@ -41,6 +39,8 @@ import ( // of exclusive access that other clients with non forcefully acquired // locks might have type Locker struct { + filer filer.Filer + // scope of the locker TargetDir string // Active == true implies exclusive access to TargetDir for the client. @@ -63,24 +63,31 @@ type LockState struct { User string } -// don't need to hold lock on TargetDir to read locker state -func GetActiveLockState(ctx context.Context, wsc *databricks.WorkspaceClient, path string) (*LockState, error) { - bytes, err := utilities.GetRawJsonFileContent(ctx, wsc, path) +// GetActiveLockState returns current lock state, irrespective of us holding it. +func (locker *Locker) GetActiveLockState(ctx context.Context) (*LockState, error) { + reader, err := locker.filer.Read(ctx, locker.RemotePath()) if err != nil { return nil, err } + + bytes, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + remoteLock := LockState{} err = json.Unmarshal(bytes, &remoteLock) if err != nil { return nil, err } + return &remoteLock, nil } // asserts whether lock is held by locker. Returns descriptive error with current // holder details if locker does not hold the lock -func (locker *Locker) assertLockHeld(ctx context.Context, wsc *databricks.WorkspaceClient) error { - activeLockState, err := GetActiveLockState(ctx, wsc, locker.RemotePath()) +func (locker *Locker) assertLockHeld(ctx context.Context) error { + activeLockState, err := locker.GetActiveLockState(ctx) if err != nil && strings.Contains(err.Error(), "File not found.") { return fmt.Errorf("no active lock on target dir: %s", err) } @@ -97,53 +104,58 @@ func (locker *Locker) assertLockHeld(ctx context.Context, wsc *databricks.Worksp } // idempotent function since overwrite is set to true -func (locker *Locker) PutFile(ctx context.Context, wsc *databricks.WorkspaceClient, pathToFile string, content []byte) error { +func (locker *Locker) PutFile(ctx context.Context, pathToFile string, content []byte) error { if !locker.Active { return fmt.Errorf("failed to put file. deploy lock not held") } - apiClient, err := client.New(wsc.Config) - if err != nil { - return err - } - apiPath := fmt.Sprintf( - "/api/2.0/workspace-files/import-file/%s?overwrite=true", - strings.TrimLeft(pathToFile, "/")) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, bytes.NewReader(content), nil) - if err != nil { - // retry after creating parent dirs - err = wsc.Workspace.MkdirsByPath(ctx, path.Dir(pathToFile)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - err = apiClient.Do(ctx, http.MethodPost, apiPath, bytes.NewReader(content), nil) - } - return err + return locker.filer.Write(ctx, pathToFile, bytes.NewReader(content), filer.OverwriteIfExists, filer.CreateParentDirectories) } -func (locker *Locker) GetRawJsonFileContent(ctx context.Context, wsc *databricks.WorkspaceClient, path string) ([]byte, error) { +func (locker *Locker) GetRawJsonFileContent(ctx context.Context, path string) ([]byte, error) { if !locker.Active { return nil, fmt.Errorf("failed to get file. deploy lock not held") } - return utilities.GetRawJsonFileContent(ctx, wsc, path) + reader, err := locker.filer.Read(ctx, path) + if err != nil { + return nil, err + } + return io.ReadAll(reader) } -func (locker *Locker) Lock(ctx context.Context, wsc *databricks.WorkspaceClient, isForced bool) error { +func (locker *Locker) Lock(ctx context.Context, isForced bool) error { newLockerState := LockState{ ID: locker.State.ID, AcquisitionTime: time.Now(), IsForced: isForced, User: locker.State.User, } - bytes, err := json.Marshal(newLockerState) + buf, err := json.Marshal(newLockerState) if err != nil { return err } - err = utilities.WriteFile(ctx, wsc, locker.RemotePath(), bytes, isForced) - if err != nil && !strings.Contains(err.Error(), fmt.Sprintf("%s already exists", locker.RemotePath())) { - return err + + var modes = []filer.WriteMode{ + // Always create parent directory if it doesn't yet exist. + filer.CreateParentDirectories, } - err = locker.assertLockHeld(ctx, wsc) + + // Only overwrite lock file if `isForced`. + if isForced { + modes = append(modes, filer.OverwriteIfExists) + } + + err = locker.filer.Write(ctx, locker.RemotePath(), bytes.NewReader(buf), modes...) + if err != nil { + // If the write failed because the lock file already exists, don't return + // the error and instead fall through to [assertLockHeld] below. + // This function will return a more descriptive error message that includes + // details about the current holder of the lock. + if !errors.As(err, &filer.FileAlreadyExistsError{}) { + return err + } + } + + err = locker.assertLockHeld(ctx) if err != nil { return err } @@ -153,20 +165,15 @@ func (locker *Locker) Lock(ctx context.Context, wsc *databricks.WorkspaceClient, return nil } -func (locker *Locker) Unlock(ctx context.Context, wsc *databricks.WorkspaceClient) error { +func (locker *Locker) Unlock(ctx context.Context) error { if !locker.Active { return fmt.Errorf("unlock called when lock is not held") } - err := locker.assertLockHeld(ctx, wsc) + err := locker.assertLockHeld(ctx) if err != nil { return fmt.Errorf("unlock called when lock is not held: %s", err) } - err = wsc.Workspace.Delete(ctx, - workspace.Delete{ - Path: locker.RemotePath(), - Recursive: false, - }, - ) + err = locker.filer.Delete(ctx, locker.RemotePath()) if err != nil { return err } @@ -175,11 +182,19 @@ func (locker *Locker) Unlock(ctx context.Context, wsc *databricks.WorkspaceClien } func (locker *Locker) RemotePath() string { - return path.Join(locker.TargetDir, ".bundle/deploy.lock") + // Note: remote paths are scoped to `targetDir`. Also see [CreateLocker]. + return ".bundle/deploy.lock" } -func CreateLocker(user string, targetDir string) *Locker { - return &Locker{ +func CreateLocker(user string, targetDir string, w *databricks.WorkspaceClient) (*Locker, error) { + filer, err := filer.NewWorkspaceFilesClient(w, targetDir) + if err != nil { + return nil, err + } + + locker := &Locker{ + filer: filer, + TargetDir: targetDir, Active: false, State: &LockState{ @@ -187,4 +202,6 @@ func CreateLocker(user string, targetDir string) *Locker { User: user, }, } + + return locker, nil } diff --git a/internal/locker_test.go b/internal/locker_test.go index bd7c6a27..c919a84b 100644 --- a/internal/locker_test.go +++ b/internal/locker_test.go @@ -7,7 +7,6 @@ import ( "math/rand" "os" "os/exec" - "path" "path/filepath" "sync" "testing" @@ -68,12 +67,16 @@ func TestAccLock(t *testing.T) { // 50 lockers try to acquire a lock at the same time numConcurrentLocks := 50 - var err error + // Keep single locker unlocked. + // We use this to check on the current lock through GetActiveLockState. + locker, err := deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc) + require.NoError(t, err) + lockerErrs := make([]error, numConcurrentLocks) lockers := make([]*deployer.Locker, numConcurrentLocks) - for i := 0; i < numConcurrentLocks; i++ { - lockers[i] = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot) + lockers[i], err = deployer.CreateLocker("humpty.dumpty@databricks.com", remoteProjectRoot, wsc) + require.NoError(t, err) } var wg sync.WaitGroup @@ -83,7 +86,7 @@ func TestAccLock(t *testing.T) { go func() { defer wg.Done() time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) - lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, wsc, false) + lockerErrs[currentIndex] = lockers[currentIndex].Lock(ctx, false) }() } wg.Wait() @@ -107,7 +110,7 @@ func TestAccLock(t *testing.T) { assert.Equal(t, 1, countActive, "Exactly one locker should successfull acquire the lock") // test remote lock matches active lock - remoteLocker, err := deployer.GetActiveLockState(ctx, wsc, lockers[indexOfActiveLocker].RemotePath()) + remoteLocker, err := locker.GetActiveLockState(ctx) require.NoError(t, err) assert.Equal(t, remoteLocker.ID, lockers[indexOfActiveLocker].State.ID, "remote locker id does not match active locker") assert.True(t, remoteLocker.AcquisitionTime.Equal(lockers[indexOfActiveLocker].State.AcquisitionTime), "remote locker acquisition time does not match active locker") @@ -118,7 +121,7 @@ func TestAccLock(t *testing.T) { continue } assert.NotEqual(t, remoteLocker.ID, lockers[i].State.ID) - err := lockers[i].Unlock(ctx, wsc) + err := lockers[i].Unlock(ctx) assert.ErrorContains(t, err, "unlock called when lock is not held") } @@ -127,16 +130,16 @@ func TestAccLock(t *testing.T) { if i == indexOfActiveLocker { continue } - err := lockers[i].PutFile(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"), []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`)) + err := lockers[i].PutFile(ctx, "foo.json", []byte(`'{"surname":"Khan", "name":"Shah Rukh"}`)) assert.ErrorContains(t, err, "failed to put file. deploy lock not held") } // active locker file write succeeds - err = lockers[indexOfActiveLocker].PutFile(ctx, wsc, path.Join(remoteProjectRoot, "foo.json"), []byte(`{"surname":"Khan", "name":"Shah Rukh"}`)) + err = lockers[indexOfActiveLocker].PutFile(ctx, "foo.json", []byte(`{"surname":"Khan", "name":"Shah Rukh"}`)) assert.NoError(t, err) // active locker file read succeeds with expected results - bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, wsc, path.Join(remoteProjectRoot, "foo.json")) + bytes, err := lockers[indexOfActiveLocker].GetRawJsonFileContent(ctx, "foo.json") var res map[string]string json.Unmarshal(bytes, &res) assert.NoError(t, err) @@ -148,21 +151,21 @@ func TestAccLock(t *testing.T) { if i == indexOfActiveLocker { continue } - _, err = lockers[i].GetRawJsonFileContent(ctx, wsc, path.Join(remoteProjectRoot, "foo.json")) + _, err = lockers[i].GetRawJsonFileContent(ctx, "foo.json") assert.ErrorContains(t, err, "failed to get file. deploy lock not held") } // Unlock active lock and check it becomes inactive - err = lockers[indexOfActiveLocker].Unlock(ctx, wsc) + err = lockers[indexOfActiveLocker].Unlock(ctx) assert.NoError(t, err) - remoteLocker, err = deployer.GetActiveLockState(ctx, wsc, lockers[indexOfActiveLocker].RemotePath()) + remoteLocker, err = locker.GetActiveLockState(ctx) assert.ErrorContains(t, err, "File not found.", "remote lock file not deleted on unlock") assert.Nil(t, remoteLocker) assert.False(t, lockers[indexOfActiveLocker].Active) // A locker that failed to acquire the lock should now be able to acquire it assert.False(t, lockers[indexOfAnInactiveLocker].Active) - err = lockers[indexOfAnInactiveLocker].Lock(ctx, wsc, false) + err = lockers[indexOfAnInactiveLocker].Lock(ctx, false) assert.NoError(t, err) assert.True(t, lockers[indexOfAnInactiveLocker].Active) } diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 512f5f25..a3efe019 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -17,7 +17,10 @@ import ( "golang.org/x/exp/slices" ) -// WorkspaceFilesClient implements the Files-in-Workspace API. +// WorkspaceFilesClient implements the files-in-workspace API. + +// NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled. +// It can access any workspace path if files-in-workspace is enabled. type WorkspaceFilesClient struct { workspaceClient *databricks.WorkspaceClient apiClient *client.DatabricksClient diff --git a/utilities/workspace_files.go b/utilities/workspace_files.go deleted file mode 100644 index d3428d83..00000000 --- a/utilities/workspace_files.go +++ /dev/null @@ -1,60 +0,0 @@ -package utilities - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "path" - "strconv" - "strings" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/client" -) - -// NOTE: This API is only available for files in /Repos if a workspace has repos -// in workspace enabled and files in workspace not enabled -// -// Get the file contents of a json file in workspace -// TODO(Nov 2022): add method in go sdk to get the raw bytes from response of an API -// -// TODO(Nov 2022): talk to eng-files team about what the response structure would look like. -// -// This function would have to be modfified probably in the future once this -// API goes to public preview -func GetRawJsonFileContent(ctx context.Context, wsc *databricks.WorkspaceClient, path string) ([]byte, error) { - apiClient, err := client.New(wsc.Config) - if err != nil { - return nil, err - } - exportApiPath := fmt.Sprintf( - "/api/2.0/workspace-files/%s", - strings.TrimLeft(path, "/")) - - var res json.RawMessage - - err = apiClient.Do(ctx, http.MethodGet, exportApiPath, nil, &res) - if err != nil { - return nil, fmt.Errorf("failed to fetch file %s: %s", path, err) - } - return res, nil -} - -func WriteFile(ctx context.Context, wsc *databricks.WorkspaceClient, pathToFile string, content []byte, overwrite bool) error { - apiClient, err := client.New(wsc.Config) - if err != nil { - return err - } - err = wsc.Workspace.MkdirsByPath(ctx, path.Dir(pathToFile)) - if err != nil { - return fmt.Errorf("could not mkdir to post file: %s", err) - } - - importApiPath := fmt.Sprintf( - "/api/2.0/workspace-files/import-file/%s?overwrite=%s", - strings.TrimLeft(pathToFile, "/"), strconv.FormatBool(overwrite)) - - return apiClient.Do(ctx, http.MethodPost, importApiPath, bytes.NewReader(content), nil) -}