Merge remote-tracking branch 'upstream/main' into add-debug-runs

This commit is contained in:
Lennart Kats 2023-06-20 11:34:47 +02:00
commit b7a80af946
15 changed files with 520 additions and 35 deletions

View File

@ -1,5 +1,29 @@
# Version changelog
## 0.200.0
This version marks the first version available as public preview.
The minor bump to 200 better disambiguates between Databricks CLI "v1" (the Python version)
and this version, Databricks CLI "v2". The minor version of 0.100 may look lower than 0.17
to some, whereas 200 does not. This bump has no other significance.
CLI:
* Add filer.Filer implementation backed by the Files API ([#474](https://github.com/databricks/cli/pull/474)).
* Add fs cp command ([#463](https://github.com/databricks/cli/pull/463)).
* Correctly set ExactArgs if generated command has positional arguments ([#488](https://github.com/databricks/cli/pull/488)).
* Do not use white color as string output ([#489](https://github.com/databricks/cli/pull/489)).
* Update README to reflect public preview status ([#491](https://github.com/databricks/cli/pull/491)).
Bundles:
* Fix force flag not working for bundle destroy ([#434](https://github.com/databricks/cli/pull/434)).
* Fix locker unlock for destroy ([#492](https://github.com/databricks/cli/pull/492)).
* Use better error assertions and clean up locker API ([#490](https://github.com/databricks/cli/pull/490)).
Dependencies:
* Bump golang.org/x/mod from 0.10.0 to 0.11.0 ([#496](https://github.com/databricks/cli/pull/496)).
* Bump golang.org/x/sync from 0.2.0 to 0.3.0 ([#495](https://github.com/databricks/cli/pull/495)).
## 0.100.4
CLI:

View File

@ -2,15 +2,26 @@ package lock
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/locker"
"github.com/databricks/cli/libs/log"
)
type release struct{}
type Goal string
func Release() bundle.Mutator {
return &release{}
const (
GoalDeploy = Goal("deploy")
GoalDestroy = Goal("destroy")
)
type release struct {
goal Goal
}
func Release(goal Goal) bundle.Mutator {
return &release{goal}
}
func (m *release) Name() string {
@ -32,11 +43,12 @@ func (m *release) Apply(ctx context.Context, b *bundle.Bundle) error {
}
log.Infof(ctx, "Releasing deployment lock")
err := b.Locker.Unlock(ctx)
if err != nil {
log.Errorf(ctx, "Failed to release deployment lock: %v", err)
return err
switch m.goal {
case GoalDeploy:
return b.Locker.Unlock(ctx)
case GoalDestroy:
return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist)
default:
return fmt.Errorf("unknown goal for lock release: %s", m.goal)
}
return nil
}

View File

@ -108,6 +108,7 @@ func (d *Deployer) LoadTerraformState(ctx context.Context) error {
if err != nil {
return err
}
defer r.Close()
err = os.MkdirAll(d.DefaultTerraformRoot(), os.ModeDir)
if err != nil {
return err

View File

@ -22,7 +22,7 @@ func Deploy() bundle.Mutator {
terraform.Apply(),
terraform.StatePush(),
),
lock.Release(),
lock.Release(lock.GoalDeploy),
),
)

View File

@ -20,7 +20,7 @@ func Destroy() bundle.Mutator {
terraform.StatePush(),
files.Delete(),
),
lock.Release(),
lock.Release(lock.GoalDestroy),
),
)

View File

@ -22,7 +22,7 @@ var deployCmd = &cobra.Command{
func deploy(cmd *cobra.Command, b *bundle.Bundle) error {
// If `--force` is specified, force acquisition of the deployment lock.
b.Config.Bundle.Lock.Force = force
b.Config.Bundle.Lock.Force = forceDeploy
if computeID == "" {
computeID = os.Getenv("DATABRICKS_COMPUTE")
@ -35,11 +35,11 @@ func deploy(cmd *cobra.Command, b *bundle.Bundle) error {
))
}
var force bool
var forceDeploy bool
var computeID string
func init() {
AddCommand(deployCmd)
deployCmd.Flags().BoolVar(&force, "force", false, "Force acquisition of deployment lock.")
deployCmd.Flags().BoolVar(&forceDeploy, "force", false, "Force acquisition of deployment lock.")
deployCmd.Flags().StringVar(&computeID, "compute", "", "Override compute in the deployment with the given compute ID.")
}

View File

@ -22,7 +22,7 @@ var destroyCmd = &cobra.Command{
b := bundle.Get(ctx)
// If `--force` is specified, force acquisition of the deployment lock.
b.Config.Bundle.Lock.Force = force
b.Config.Bundle.Lock.Force = forceDestroy
// If `--auto-approve`` is specified, we skip confirmation checks
b.AutoApprove = autoApprove
@ -51,8 +51,10 @@ var destroyCmd = &cobra.Command{
}
var autoApprove bool
var forceDestroy bool
func init() {
AddCommand(destroyCmd)
destroyCmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals for deleting resources and files")
destroyCmd.Flags().BoolVar(&forceDestroy, "force", false, "Force acquisition of deployment lock.")
}

View File

@ -105,7 +105,7 @@ func init() {
runOptions.Define(runCmd.Flags())
rootCmd.AddCommand(runCmd)
runCmd.Flags().BoolVar(&deployFlag, "deploy", false, "Call deploy before run.")
runCmd.Flags().BoolVar(&force, "force", false, "Force acquisition of deployment lock.")
runCmd.Flags().BoolVar(&forceDeploy, "force", false, "Force acquisition of deployment lock.")
runCmd.Flags().BoolVar(&noWait, "no-wait", false, "Don't wait for the run to complete.")
runCmd.Flags().StringVar(&computeID, "compute", "", "Override compute in the deployment with the given compute ID.")
}

View File

@ -28,6 +28,11 @@ func filerForPath(ctx context.Context, fullPath string) (filer.Filer, string, er
switch scheme {
case DbfsScheme:
w := root.WorkspaceClient(ctx)
// If the specified path has the "Volumes" prefix, use the Files API.
if strings.HasPrefix(path, "Volumes/") {
f, err := filer.NewFilesClient(w, "/")
return f, path, err
}
f, err := filer.NewDbfsClient(w, "/")
return f, path, err

4
go.mod
View File

@ -23,9 +23,9 @@ require (
github.com/stretchr/testify v1.8.4 // MIT
github.com/whilp/git-urls v1.0.0 // MIT
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0
golang.org/x/mod v0.10.0
golang.org/x/mod v0.11.0
golang.org/x/oauth2 v0.9.0
golang.org/x/sync v0.2.0
golang.org/x/sync v0.3.0
golang.org/x/term v0.9.0
golang.org/x/text v0.10.0
gopkg.in/ini.v1 v1.67.0 // Apache 2.0

8
go.sum
View File

@ -172,8 +172,8 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk=
golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -198,8 +198,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.2.0 h1:PUR+T4wwASmuSTYdKjYHI5TD22Wy5ogLU5qZCOLxBrI=
golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View File

@ -82,7 +82,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.NoError(t, err)
// Stat on a directory should succeed.
// Note: size and modification time behave differently between WSFS and DBFS.
// Note: size and modification time behave differently between backends.
info, err := f.Stat(ctx, "/foo")
require.NoError(t, err)
assert.Equal(t, "foo", info.Name())
@ -90,7 +90,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.Equal(t, true, info.IsDir())
// Stat on a file should succeed.
// Note: size and modification time behave differently between WSFS and DBFS.
// Note: size and modification time behave differently between backends.
info, err = f.Stat(ctx, "/foo/bar")
require.NoError(t, err)
assert.Equal(t, "bar", info.Name())
@ -465,3 +465,123 @@ func TestAccFilerLocalReadDir(t *testing.T) {
ctx, f := setupFilerLocalTest(t)
runFilerReadDirTest(t, ctx, f)
}
func temporaryVolumeDir(t *testing.T, w *databricks.WorkspaceClient) string {
// Assume this test is run against the internal testing workspace.
path := RandomName("/Volumes/bogdanghita/default/v3_shared/cli-testing/integration-test-filer-")
// The Files API doesn't include support for creating and removing directories yet.
// Directories are created implicitly by writing a file to a path that doesn't exist.
// We therefore assume we can use the specified path without creating it first.
t.Logf("using dbfs:%s", path)
return path
}
func setupFilerFilesApiTest(t *testing.T) (context.Context, filer.Filer) {
t.SkipNow() // until available on prod
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryVolumeDir(t, w)
f, err := filer.NewFilesClient(w, tmpdir)
require.NoError(t, err)
return ctx, f
}
func TestAccFilerFilesApiReadWrite(t *testing.T) {
ctx, f := setupFilerFilesApiTest(t)
// The Files API doesn't know about directories yet.
// Below is a copy of [runFilerReadWriteTest] with
// assertions that don't work commented out.
var err error
// Write should fail because the root path doesn't yet exist.
// err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
// assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
// assert.True(t, errors.Is(err, fs.ErrNotExist))
// Read should fail because the root path doesn't yet exist.
_, err = f.Read(ctx, "/foo/bar")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))
// Read should fail because the path points to a directory
// err = f.Mkdir(ctx, "/dir")
// require.NoError(t, err)
// _, err = f.Read(ctx, "/dir")
// assert.ErrorIs(t, err, fs.ErrInvalid)
// Write with CreateParentDirectories flag should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
assert.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
// Write should fail because there is an existing file at the specified path.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
assert.True(t, errors.Is(err, fs.ErrExist))
// Write with OverwriteIfExists should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
assert.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
// Write should succeed if there is no existing file at the specified path.
err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`))
assert.NoError(t, err)
// Stat on a directory should succeed.
// Note: size and modification time behave differently between backends.
info, err := f.Stat(ctx, "/foo")
require.NoError(t, err)
assert.Equal(t, "foo", info.Name())
assert.True(t, info.Mode().IsDir())
assert.Equal(t, true, info.IsDir())
// Stat on a file should succeed.
// Note: size and modification time behave differently between backends.
info, err = f.Stat(ctx, "/foo/bar")
require.NoError(t, err)
assert.Equal(t, "bar", info.Name())
assert.True(t, info.Mode().IsRegular())
assert.Equal(t, false, info.IsDir())
// Delete should fail if the file doesn't exist.
err = f.Delete(ctx, "/doesnt_exist")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))
// Stat should fail if the file doesn't exist.
_, err = f.Stat(ctx, "/doesnt_exist")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))
// Delete should succeed for file that does exist.
err = f.Delete(ctx, "/foo/bar")
assert.NoError(t, err)
// Delete should fail for a non-empty directory.
err = f.Delete(ctx, "/foo")
assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
// Delete should succeed for a non-empty directory if the DeleteRecursively flag is set.
// err = f.Delete(ctx, "/foo", filer.DeleteRecursively)
// assert.NoError(t, err)
// Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail.
// It is not in the filer's purview to delete its root directory.
err = f.Delete(ctx, "/")
assert.True(t, errors.As(err, &filer.CannotDeleteRootError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
}
func TestAccFilerFilesApiReadDir(t *testing.T) {
t.Skipf("no support for ReadDir yet")
ctx, f := setupFilerFilesApiTest(t)
runFilerReadDirTest(t, ctx, f)
}

View File

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/databricks/cli/libs/filer"
lockpkg "github.com/databricks/cli/libs/locker"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/workspace"
@ -126,6 +127,7 @@ func TestAccLock(t *testing.T) {
// read active locker file
r, err := lockers[indexOfActiveLocker].Read(ctx, "foo.json")
require.NoError(t, err)
defer r.Close()
b, err := io.ReadAll(r)
require.NoError(t, err)
@ -159,3 +161,67 @@ func TestAccLock(t *testing.T) {
assert.NoError(t, err)
assert.True(t, lockers[indexOfAnInactiveLocker].Active)
}
func setupLockerTest(ctx context.Context, t *testing.T) (*lockpkg.Locker, filer.Filer) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
// create temp wsfs dir
tmpDir := temporaryWorkspaceDir(t, w)
f, err := filer.NewWorkspaceFilesClient(w, tmpDir)
require.NoError(t, err)
// create locker
locker, err := lockpkg.CreateLocker("redfoo@databricks.com", tmpDir, w)
require.NoError(t, err)
return locker, f
}
func TestAccLockUnlockWithoutAllowsLockFileNotExist(t *testing.T) {
ctx := context.Background()
locker, f := setupLockerTest(ctx, t)
var err error
// Acquire lock on tmp directory
err = locker.Lock(ctx, false)
require.NoError(t, err)
// Assert lock file is created
_, err = f.Stat(ctx, "deploy.lock")
assert.NoError(t, err)
// Manually delete lock file
err = f.Delete(ctx, "deploy.lock")
assert.NoError(t, err)
// Assert error, because lock file does not exist
err = locker.Unlock(ctx)
assert.ErrorIs(t, err, fs.ErrNotExist)
}
func TestAccLockUnlockWithAllowsLockFileNotExist(t *testing.T) {
ctx := context.Background()
locker, f := setupLockerTest(ctx, t)
var err error
// Acquire lock on tmp directory
err = locker.Lock(ctx, false)
require.NoError(t, err)
assert.True(t, locker.Active)
// Assert lock file is created
_, err = f.Stat(ctx, "deploy.lock")
assert.NoError(t, err)
// Manually delete lock file
err = f.Delete(ctx, "deploy.lock")
assert.NoError(t, err)
// Assert error, because lock file does not exist
err = locker.Unlock(ctx, lockpkg.AllowLockFileNotExist)
assert.NoError(t, err)
assert.False(t, locker.Active)
}

241
libs/filer/files_client.go Normal file
View File

@ -0,0 +1,241 @@
package filer
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"path"
"strings"
"time"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"golang.org/x/exp/slices"
)
// Type that implements fs.FileInfo for the Files API.
type filesApiFileInfo struct {
absPath string
isDir bool
}
func (info filesApiFileInfo) Name() string {
return path.Base(info.absPath)
}
func (info filesApiFileInfo) Size() int64 {
// No way to get the file size in the Files API.
return 0
}
func (info filesApiFileInfo) Mode() fs.FileMode {
mode := fs.ModePerm
if info.isDir {
mode |= fs.ModeDir
}
return mode
}
func (info filesApiFileInfo) ModTime() time.Time {
return time.Time{}
}
func (info filesApiFileInfo) IsDir() bool {
return info.isDir
}
func (info filesApiFileInfo) Sys() any {
return nil
}
// FilesClient implements the [Filer] interface for the Files API backend.
type FilesClient struct {
workspaceClient *databricks.WorkspaceClient
apiClient *client.DatabricksClient
// File operations will be relative to this path.
root RootPath
}
func filesNotImplementedError(fn string) error {
return fmt.Errorf("filer.%s is not implemented for the Files API", fn)
}
func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
apiClient, err := client.New(w.Config)
if err != nil {
return nil, err
}
return &FilesClient{
workspaceClient: w,
apiClient: apiClient,
root: NewRootPath(root),
}, nil
}
func (w *FilesClient) urlPath(name string) (string, string, error) {
absPath, err := w.root.Join(name)
if err != nil {
return "", "", err
}
// The user specified part of the path must be escaped.
urlPath := fmt.Sprintf(
"/api/2.0/fs/files/%s",
url.PathEscape(strings.TrimLeft(absPath, "/")),
)
return absPath, urlPath, nil
}
func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
absPath, urlPath, err := w.urlPath(name)
if err != nil {
return err
}
overwrite := slices.Contains(mode, OverwriteIfExists)
urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
err = w.apiClient.Do(ctx, http.MethodPut, urlPath, reader, nil,
func(r *http.Request) error {
r.Header.Set("Content-Type", "application/octet-stream")
return nil
})
// Return early on success.
if err == nil {
return nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns 409 if the file already exists, when the object type is file
if aerr.StatusCode == http.StatusConflict {
return FileAlreadyExistsError{absPath}
}
return err
}
func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
absPath, urlPath, err := w.urlPath(name)
if err != nil {
return nil, err
}
var buf bytes.Buffer
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &buf)
// Return early on success.
if err == nil {
return io.NopCloser(&buf), nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return nil, FileDoesNotExistError{absPath}
}
return nil, err
}
func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
absPath, urlPath, err := w.urlPath(name)
if err != nil {
return err
}
// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}
err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil)
// Return early on success.
if err == nil {
return nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return FileDoesNotExistError{absPath}
}
// This API returns 409 if the underlying path is a directory.
if aerr.StatusCode == http.StatusConflict {
return DirectoryNotEmptyError{absPath}
}
return err
}
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
return nil, filesNotImplementedError("ReadDir")
}
func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
// Directories are created implicitly.
// No need to do anything.
return nil
}
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
absPath, urlPath, err := w.urlPath(name)
if err != nil {
return nil, err
}
err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil,
func(r *http.Request) error {
r.Header.Del("Content-Type")
return nil
})
// If the HEAD requests succeeds, the file exists.
if err == nil {
return filesApiFileInfo{absPath: absPath, isDir: false}, nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return nil, FileDoesNotExistError{absPath}
}
// This API returns 409 if the underlying path is a directory.
if aerr.StatusCode == http.StatusConflict {
return filesApiFileInfo{absPath: absPath, isDir: true}, nil
}
return nil, err
}

View File

@ -13,8 +13,17 @@ import (
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/google/uuid"
"golang.org/x/exp/slices"
)
type UnlockOption int
const (
AllowLockFileNotExist UnlockOption = iota
)
const LockFileName = "deploy.lock"
// Locker object enables exclusive access to TargetDir's scope for a client. This
// enables multiple clients to deploy to the same scope (ie TargetDir) in an atomic
// manner
@ -65,10 +74,11 @@ type LockState struct {
// GetActiveLockState returns current lock state, irrespective of us holding it.
func (locker *Locker) GetActiveLockState(ctx context.Context) (*LockState, error) {
reader, err := locker.filer.Read(ctx, locker.RemotePath())
reader, err := locker.filer.Read(ctx, LockFileName)
if err != nil {
return nil, err
}
defer reader.Close()
bytes, err := io.ReadAll(reader)
if err != nil {
@ -89,7 +99,7 @@ func (locker *Locker) GetActiveLockState(ctx context.Context) (*LockState, error
func (locker *Locker) assertLockHeld(ctx context.Context) error {
activeLockState, err := locker.GetActiveLockState(ctx)
if errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("no active lock on target dir: %s", err)
return fmt.Errorf("no active lock on target dir: %w", err)
}
if err != nil {
return err
@ -140,7 +150,7 @@ func (locker *Locker) Lock(ctx context.Context, isForced bool) error {
modes = append(modes, filer.OverwriteIfExists)
}
err = locker.filer.Write(ctx, locker.RemotePath(), bytes.NewReader(buf), modes...)
err = locker.filer.Write(ctx, LockFileName, bytes.NewReader(buf), modes...)
if err != nil {
// If the write failed because the lock file already exists, don't return
// the error and instead fall through to [assertLockHeld] below.
@ -161,15 +171,24 @@ func (locker *Locker) Lock(ctx context.Context, isForced bool) error {
return nil
}
func (locker *Locker) Unlock(ctx context.Context) error {
func (locker *Locker) Unlock(ctx context.Context, opts ...UnlockOption) error {
if !locker.Active {
return fmt.Errorf("unlock called when lock is not held")
}
// if allowLockFileNotExist is set, do not throw an error if the lock file does
// not exist. This is helpful when destroying a bundle in which case the lock
// file will be deleted before we have a chance to unlock
if _, err := locker.filer.Stat(ctx, LockFileName); errors.Is(err, fs.ErrNotExist) && slices.Contains(opts, AllowLockFileNotExist) {
locker.Active = false
return nil
}
err := locker.assertLockHeld(ctx)
if err != nil {
return fmt.Errorf("unlock called when lock is not held: %s", err)
return fmt.Errorf("unlock called when lock is not held: %w", err)
}
err = locker.filer.Delete(ctx, locker.RemotePath())
err = locker.filer.Delete(ctx, LockFileName)
if err != nil {
return err
}
@ -177,11 +196,6 @@ func (locker *Locker) Unlock(ctx context.Context) error {
return nil
}
func (locker *Locker) RemotePath() string {
// Note: remote paths are scoped to `targetDir`. Also see [CreateLocker].
return "deploy.lock"
}
func CreateLocker(user string, targetDir string, w *databricks.WorkspaceClient) (*Locker, error) {
filer, err := filer.NewWorkspaceFilesClient(w, targetDir)
if err != nil {