mirror of https://github.com/databricks/cli.git
Merge remote-tracking branch 'origin' into import_dir
This commit is contained in: commit 1244b3a414
@@ -2,14 +2,15 @@ package terraform
 
 import (
 	"context"
+	"errors"
 	"io"
+	"io/fs"
 	"os"
 	"path/filepath"
 
 	"github.com/databricks/cli/bundle"
 	"github.com/databricks/cli/libs/filer"
 	"github.com/databricks/cli/libs/log"
-	"github.com/databricks/databricks-sdk-go/apierr"
 )
 
 type statePull struct{}
@@ -34,7 +35,7 @@ func (l *statePull) Apply(ctx context.Context, b *bundle.Bundle) error {
 	remote, err := f.Read(ctx, TerraformStateFileName)
 	if err != nil {
 		// On first deploy this state file doesn't yet exist.
-		if apierr.IsMissing(err) {
+		if errors.Is(err, fs.ErrNotExist) {
 			log.Infof(ctx, "Remote state file does not exist")
 			return nil
 		}
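The new check relies on the filer error types implementing an Is method, so the standard errors.Is helper can match them against sentinels like fs.ErrNotExist (see the filer package hunks further down). A minimal, self-contained sketch of that mechanism, using a hypothetical error type rather than code from this commit:

package main

import (
	"errors"
	"fmt"
	"io/fs"
)

// fileDoesNotExistError is a hypothetical stand-in for the filer error types
// in this commit. Implementing Is lets errors.Is treat it as fs.ErrNotExist.
type fileDoesNotExistError struct{ path string }

func (e fileDoesNotExistError) Error() string {
	return fmt.Sprintf("file does not exist: %s", e.path)
}

func (e fileDoesNotExistError) Is(other error) bool {
	return other == fs.ErrNotExist
}

func main() {
	var err error = fileDoesNotExistError{path: "/foo/bar"}

	// errors.Is walks the error chain and calls Is, so this prints true.
	fmt.Println(errors.Is(err, fs.ErrNotExist))
}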
go.mod (2 changes)

@@ -14,7 +14,7 @@ require (
 	github.com/hashicorp/terraform-json v0.16.0 // MPL 2.0
 	github.com/imdario/mergo v0.3.13 // BSD-3-Clause
 	github.com/manifoldco/promptui v0.9.0 // BSD-3-Clause
-	github.com/mattn/go-isatty v0.0.18 // MIT
+	github.com/mattn/go-isatty v0.0.19 // MIT
 	github.com/nwidger/jsoncolor v0.3.2 // MIT
 	github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // BSD-2-Clause
 	github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06 // MIT
go.sum (4 changes)

@@ -118,8 +118,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
 github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
 github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
-github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
+github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
-github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/nwidger/jsoncolor v0.3.2 h1:rVJJlwAWDJShnbTYOQ5RM7yTA20INyKXlJ/fg4JMhHQ=
 github.com/nwidger/jsoncolor v0.3.2/go.mod h1:Cs34umxLbJvgBMnVNVqhji9BhoT/N/KinHqZptQ7cf4=
 github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4=
@@ -1,10 +1,12 @@
 package internal
 
 import (
+	"bytes"
 	"context"
 	"errors"
 	"fmt"
 	"io"
+	"io/fs"
 	"net/http"
 	"strings"
 	"testing"
@@ -12,6 +14,7 @@ import (
 	"github.com/databricks/cli/libs/filer"
 	"github.com/databricks/databricks-sdk-go"
 	"github.com/databricks/databricks-sdk-go/apierr"
+	"github.com/databricks/databricks-sdk-go/service/files"
 	"github.com/databricks/databricks-sdk-go/service/workspace"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -28,12 +31,113 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str
 		return
 	}
 
-	body, err := io.ReadAll(reader)
+	var body bytes.Buffer
+	_, err = io.Copy(&body, reader)
 	if !assert.NoError(f, err) {
 		return
 	}
 
-	assert.Equal(f, contents, string(body))
+	assert.Equal(f, contents, body.String())
+}
+
+func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
+	var err error
+
+	// Write should fail because the root path doesn't yet exist.
+	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
+	assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
+	assert.True(t, errors.Is(err, fs.ErrNotExist))
+
+	// Read should fail because the root path doesn't yet exist.
+	_, err = f.Read(ctx, "/foo/bar")
+	assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
+	assert.True(t, errors.Is(err, fs.ErrNotExist))
+
+	// Write with CreateParentDirectories flag should succeed.
+	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
+	assert.NoError(t, err)
+	filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
+
+	// Write should fail because there is an existing file at the specified path.
+	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
+	assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
+	assert.True(t, errors.Is(err, fs.ErrExist))
+
+	// Write with OverwriteIfExists should succeed.
+	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
+	assert.NoError(t, err)
+	filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
+
+	// Delete should fail if the file doesn't exist.
+	err = f.Delete(ctx, "/doesnt_exist")
+	assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
+	assert.True(t, errors.Is(err, fs.ErrNotExist))
+
+	// Delete should succeed for file that does exist.
+	err = f.Delete(ctx, "/foo/bar")
+	assert.NoError(t, err)
+}
+
+func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
+	var err error
+	var info fs.FileInfo
+
+	// We start with an empty directory.
+	entries, err := f.ReadDir(ctx, ".")
+	require.NoError(t, err)
+	assert.Len(t, entries, 0)
+
+	// Write a file.
+	err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`))
+	require.NoError(t, err)
+
+	// Create a directory.
+	err = f.Mkdir(ctx, "/dir")
+	require.NoError(t, err)
+
+	// Write a file.
+	err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`))
+	require.NoError(t, err)
+
+	// Create a nested directory (check that it creates intermediate directories).
+	err = f.Mkdir(ctx, "/dir/a/b/c")
+	require.NoError(t, err)
+
+	// Expect an error if the path doesn't exist.
+	_, err = f.ReadDir(ctx, "/dir/a/b/c/d/e")
+	assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}), err)
+	assert.True(t, errors.Is(err, fs.ErrNotExist))
+
+	// Expect two entries in the root.
+	entries, err = f.ReadDir(ctx, ".")
+	require.NoError(t, err)
+	assert.Len(t, entries, 2)
+	assert.Equal(t, "dir", entries[0].Name())
+	assert.True(t, entries[0].IsDir())
+	assert.Equal(t, "hello.txt", entries[1].Name())
+	assert.False(t, entries[1].IsDir())
+	info, err = entries[1].Info()
+	require.NoError(t, err)
+	assert.Greater(t, info.ModTime().Unix(), int64(0))
+
+	// Expect two entries in the directory.
+	entries, err = f.ReadDir(ctx, "/dir")
+	require.NoError(t, err)
+	assert.Len(t, entries, 2)
+	assert.Equal(t, "a", entries[0].Name())
+	assert.True(t, entries[0].IsDir())
+	assert.Equal(t, "world.txt", entries[1].Name())
+	assert.False(t, entries[1].IsDir())
+	info, err = entries[1].Info()
+	require.NoError(t, err)
+	assert.Greater(t, info.ModTime().Unix(), int64(0))
+
+	// Expect a single entry in the nested path.
+	entries, err = f.ReadDir(ctx, "/dir/a/b")
+	require.NoError(t, err)
+	assert.Len(t, entries, 1)
+	assert.Equal(t, "c", entries[0].Name())
+	assert.True(t, entries[0].IsDir())
 }
 
 func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
@@ -41,7 +145,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
 	me, err := w.CurrentUser.Me(ctx)
 	require.NoError(t, err)
 
-	path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("wsfs-files-"))
+	path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-filer-wsfs-"))
 
 	// Ensure directory exists, but doesn't exist YET!
 	// Otherwise we could inadvertently remove a directory that already exists on cleanup.
@@ -59,7 +163,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
 		if err == nil || apierr.IsMissing(err) {
 			return
 		}
-		t.Logf("unable to remove temporary workspace path %s: %#v", path, err)
+		t.Logf("unable to remove temporary workspace directory %s: %#v", path, err)
 	})
 
 	return path
@@ -85,90 +189,57 @@ func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) {
 }
 
 func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) {
-	var err error
-
 	ctx, f := setupWorkspaceFilesTest(t)
-
-	// Write should fail because the root path doesn't yet exist.
-	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
-	assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
-
-	// Read should fail because the root path doesn't yet exist.
-	_, err = f.Read(ctx, "/foo/bar")
-	assert.True(t, apierr.IsMissing(err))
-
-	// Write with CreateParentDirectories flag should succeed.
-	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
-	assert.NoError(t, err)
-	filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
-
-	// Write should fail because there is an existing file at the specified path.
-	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
-	assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
-
-	// Write with OverwriteIfExists should succeed.
-	err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
-	assert.NoError(t, err)
-	filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
-
-	// Delete should fail if the file doesn't exist.
-	err = f.Delete(ctx, "/doesnt_exist")
-	assert.True(t, apierr.IsMissing(err))
-
-	// Delete should succeed for file that does exist.
-	err = f.Delete(ctx, "/foo/bar")
-	assert.NoError(t, err)
+	runFilerReadWriteTest(t, ctx, f)
 }
 
 func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
-	var err error
-
 	ctx, f := setupWorkspaceFilesTest(t)
+	runFilerReadDirTest(t, ctx, f)
+}
 
-	// We start with an empty directory.
-	entries, err := f.ReadDir(ctx, ".")
-	require.NoError(t, err)
-	assert.Len(t, entries, 0)
-
-	// Write a file.
-	err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`))
-	require.NoError(t, err)
-
-	// Create a directory.
-	err = f.Mkdir(ctx, "/dir")
-	require.NoError(t, err)
-
-	// Write a file.
-	err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`))
-	require.NoError(t, err)
-
-	// Create a nested directory (check that it creates intermediate directories).
-	err = f.Mkdir(ctx, "/dir/a/b/c")
-	require.NoError(t, err)
-
-	// Expect an error if the path doesn't exist.
-	_, err = f.ReadDir(ctx, "/dir/a/b/c/d/e")
-	assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
-
-	// Expect two entries in the root.
-	entries, err = f.ReadDir(ctx, ".")
-	require.NoError(t, err)
-	assert.Len(t, entries, 2)
-	assert.Equal(t, "dir", entries[0].Name)
-	assert.Equal(t, "hello.txt", entries[1].Name)
-	assert.Greater(t, entries[1].ModTime.Unix(), int64(0))
-
-	// Expect two entries in the directory.
-	entries, err = f.ReadDir(ctx, "/dir")
-	require.NoError(t, err)
-	assert.Len(t, entries, 2)
-	assert.Equal(t, "a", entries[0].Name)
-	assert.Equal(t, "world.txt", entries[1].Name)
-	assert.Greater(t, entries[1].ModTime.Unix(), int64(0))
-
-	// Expect a single entry in the nested path.
-	entries, err = f.ReadDir(ctx, "/dir/a/b")
-	require.NoError(t, err)
-	assert.Len(t, entries, 1)
-	assert.Equal(t, "c", entries[0].Name)
+func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
+	ctx := context.Background()
+	path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-"))
+
+	// This call fails if the path already exists.
+	t.Logf("mkdir dbfs:%s", path)
+	err := w.Dbfs.MkdirsByPath(ctx, path)
+	require.NoError(t, err)
+
+	// Remove test directory on test completion.
+	t.Cleanup(func() {
+		t.Logf("rm -rf dbfs:%s", path)
+		err := w.Dbfs.Delete(ctx, files.Delete{
+			Path:      path,
+			Recursive: true,
+		})
+		if err == nil || apierr.IsMissing(err) {
+			return
+		}
+		t.Logf("unable to remove temporary dbfs directory %s: %#v", path, err)
+	})
+
+	return path
+}
+
+func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) {
+	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
+
+	ctx := context.Background()
+	w := databricks.Must(databricks.NewWorkspaceClient())
+	tmpdir := temporaryDbfsDir(t, w)
+	f, err := filer.NewDbfsClient(w, tmpdir)
+	require.NoError(t, err)
+	return ctx, f
+}
+
+func TestAccFilerDbfsReadWrite(t *testing.T) {
+	ctx, f := setupFilerDbfsTest(t)
+	runFilerReadWriteTest(t, ctx, f)
+}
+
+func TestAccFilerDbfsReadDir(t *testing.T) {
+	ctx, f := setupFilerDbfsTest(t)
+	runFilerReadDirTest(t, ctx, f)
 }
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io/fs"
 	"math/rand"
 	"sync"
 	"testing"
@@ -142,7 +143,7 @@ func TestAccLock(t *testing.T) {
 	err = lockers[indexOfActiveLocker].Unlock(ctx)
 	assert.NoError(t, err)
 	remoteLocker, err = locker.GetActiveLockState(ctx)
-	assert.ErrorContains(t, err, "File not found.", "remote lock file not deleted on unlock")
+	assert.ErrorIs(t, err, fs.ErrNotExist, "remote lock file not deleted on unlock")
 	assert.Nil(t, remoteLocker)
 	assert.False(t, lockers[indexOfActiveLocker].Active)
@@ -0,0 +1,242 @@
(new file; all 242 lines added)

package filer

import (
	"context"
	"errors"
	"io"
	"io/fs"
	"net/http"
	"path"
	"sort"
	"time"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/apierr"
	"github.com/databricks/databricks-sdk-go/service/files"
	"golang.org/x/exp/slices"
)

// Type that implements fs.DirEntry for DBFS.
type dbfsDirEntry struct {
	dbfsFileInfo
}

func (entry dbfsDirEntry) Type() fs.FileMode {
	typ := fs.ModePerm
	if entry.fi.IsDir {
		typ |= fs.ModeDir
	}
	return typ
}

func (entry dbfsDirEntry) Info() (fs.FileInfo, error) {
	return entry.dbfsFileInfo, nil
}

// Type that implements fs.FileInfo for DBFS.
type dbfsFileInfo struct {
	fi files.FileInfo
}

func (info dbfsFileInfo) Name() string {
	return path.Base(info.fi.Path)
}

func (info dbfsFileInfo) Size() int64 {
	return info.fi.FileSize
}

func (info dbfsFileInfo) Mode() fs.FileMode {
	return fs.ModePerm
}

func (info dbfsFileInfo) ModTime() time.Time {
	return time.UnixMilli(info.fi.ModificationTime)
}

func (info dbfsFileInfo) IsDir() bool {
	return info.fi.IsDir
}

func (info dbfsFileInfo) Sys() any {
	return nil
}

// DbfsClient implements the [Filer] interface for the DBFS backend.
type DbfsClient struct {
	workspaceClient *databricks.WorkspaceClient

	// File operations will be relative to this path.
	root RootPath
}

func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
	return &DbfsClient{
		workspaceClient: w,

		root: NewRootPath(root),
	}, nil
}

func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	fileMode := files.FileModeWrite
	if slices.Contains(mode, OverwriteIfExists) {
		fileMode |= files.FileModeOverwrite
	}

	// Issue info call before write because it automatically creates parent directories.
	//
	// For discussion: we could decide this is actually convenient, remove the call below,
	// and apply the same semantics for the WSFS filer.
	//
	if !slices.Contains(mode, CreateParentDirectories) {
		_, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, path.Dir(absPath))
		if err != nil {
			var aerr *apierr.APIError
			if !errors.As(err, &aerr) {
				return err
			}

			// This API returns a 404 if the file doesn't exist.
			if aerr.StatusCode == http.StatusNotFound {
				if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
					return NoSuchDirectoryError{path.Dir(absPath)}
				}
			}

			return err
		}
	}

	handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode)
	if err != nil {
		var aerr *apierr.APIError
		if !errors.As(err, &aerr) {
			return err
		}

		// This API returns a 400 if the file already exists.
		if aerr.StatusCode == http.StatusBadRequest {
			if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
				return FileAlreadyExistsError{absPath}
			}
		}

		return err
	}

	_, err = io.Copy(handle, reader)
	cerr := handle.Close()
	if err == nil {
		err = cerr
	}

	return err
}

func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead)
	if err != nil {
		var aerr *apierr.APIError
		if !errors.As(err, &aerr) {
			return nil, err
		}

		// This API returns a 404 if the file doesn't exist.
		if aerr.StatusCode == http.StatusNotFound {
			if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
				return nil, FileDoesNotExistError{absPath}
			}
		}

		return nil, err
	}

	return handle, nil
}

func (w *DbfsClient) Delete(ctx context.Context, name string) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	// Issue info call before delete because delete succeeds if the specified path doesn't exist.
	//
	// For discussion: we could decide this is actually convenient, remove the call below,
	// and apply the same semantics for the WSFS filer.
	//
	_, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath)
	if err != nil {
		var aerr *apierr.APIError
		if !errors.As(err, &aerr) {
			return err
		}

		// This API returns a 404 if the file doesn't exist.
		if aerr.StatusCode == http.StatusNotFound {
			if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
				return FileDoesNotExistError{absPath}
			}
		}

		return err
	}

	return w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
		Path:      absPath,
		Recursive: false,
	})
}

func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	res, err := w.workspaceClient.Dbfs.ListByPath(ctx, absPath)
	if err != nil {
		var aerr *apierr.APIError
		if !errors.As(err, &aerr) {
			return nil, err
		}

		// This API returns a 404 if the file doesn't exist.
		if aerr.StatusCode == http.StatusNotFound {
			if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
				return nil, NoSuchDirectoryError{absPath}
			}
		}

		return nil, err
	}

	info := make([]fs.DirEntry, len(res.Files))
	for i, v := range res.Files {
		info[i] = dbfsDirEntry{dbfsFileInfo: dbfsFileInfo{fi: v}}
	}

	// Sort by name for parity with os.ReadDir.
	sort.Slice(info, func(i, j int) bool { return info[i].Name() < info[j].Name() })
	return info, nil
}

func (w *DbfsClient) Mkdir(ctx context.Context, name string) error {
	dirPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	return w.workspaceClient.Dbfs.MkdirsByPath(ctx, dirPath)
}
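A usage sketch (not part of this commit) showing how the new DBFS filer could be exercised. It assumes workspace credentials are available in the environment for databricks.NewWorkspaceClient, and the DBFS root path "/tmp/filer-example" is made up for the example:

package main

import (
	"context"
	"fmt"
	"io"
	"strings"

	"github.com/databricks/cli/libs/filer"
	"github.com/databricks/databricks-sdk-go"
)

func main() {
	ctx := context.Background()
	w := databricks.Must(databricks.NewWorkspaceClient())

	// All paths passed to the filer are interpreted relative to this root.
	f, err := filer.NewDbfsClient(w, "/tmp/filer-example")
	if err != nil {
		panic(err)
	}

	// CreateParentDirectories skips the parent-existence check and lets DBFS
	// create intermediate directories as part of the write.
	err = f.Write(ctx, "hello.txt", strings.NewReader("hello world"), filer.CreateParentDirectories)
	if err != nil {
		panic(err)
	}

	r, err := f.Read(ctx, "hello.txt")
	if err != nil {
		panic(err)
	}

	body, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}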
@@ -4,7 +4,7 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"time"
+	"io/fs"
 )
 
 type WriteMode int
@@ -14,22 +14,6 @@ const (
 	CreateParentDirectories = iota << 1
 )
 
-// FileInfo abstracts over file information from different file systems.
-// Inspired by https://pkg.go.dev/io/fs#FileInfo.
-type FileInfo struct {
-	// The type of the file in workspace.
-	Type string
-
-	// Base name.
-	Name string
-
-	// Size in bytes.
-	Size int64
-
-	// Modification time.
-	ModTime time.Time
-}
-
 type FileAlreadyExistsError struct {
 	path string
 }
@@ -38,6 +22,22 @@ func (err FileAlreadyExistsError) Error() string {
 	return fmt.Sprintf("file already exists: %s", err.path)
 }
 
+func (err FileAlreadyExistsError) Is(other error) bool {
+	return other == fs.ErrExist
+}
+
+type FileDoesNotExistError struct {
+	path string
+}
+
+func (err FileDoesNotExistError) Is(other error) bool {
+	return other == fs.ErrNotExist
+}
+
+func (err FileDoesNotExistError) Error() string {
+	return fmt.Sprintf("file does not exist: %s", err.path)
+}
+
 type NoSuchDirectoryError struct {
 	path string
 }
@@ -46,6 +46,10 @@ func (err NoSuchDirectoryError) Error() string {
 	return fmt.Sprintf("no such directory: %s", err.path)
 }
 
+func (err NoSuchDirectoryError) Is(other error) bool {
+	return other == fs.ErrNotExist
+}
+
 // Filer is used to access files in a workspace.
 // It has implementations for accessing files in WSFS and in DBFS.
 type Filer interface {
@@ -60,7 +64,7 @@ type Filer interface {
 	Delete(ctx context.Context, path string) error
 
 	// Return contents of directory at `path`.
-	ReadDir(ctx context.Context, path string) ([]FileInfo, error)
+	ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error)
 
 	// Creates directory at `path`, creating any intermediate directories as required.
 	Mkdir(ctx context.Context, path string) error
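With the package-level FileInfo struct removed, ReadDir now returns standard library fs.DirEntry values. A small caller-side sketch (a hypothetical helper, not from this commit) of what consuming the new return type looks like:

package example

import (
	"context"
	"fmt"

	"github.com/databricks/cli/libs/filer"
)

// printDir lists a directory through any Filer implementation using the
// fs.DirEntry accessors (Name, IsDir, Info) instead of struct fields.
func printDir(ctx context.Context, f filer.Filer, dir string) error {
	entries, err := f.ReadDir(ctx, dir)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		info, err := entry.Info()
		if err != nil {
			return err
		}
		fmt.Printf("%s dir=%v size=%d modified=%v\n", entry.Name(), entry.IsDir(), info.Size(), info.ModTime())
	}
	return nil
}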
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"io/fs"
 	"net/http"
 	"net/url"
 	"path"
@@ -20,6 +21,53 @@ import (
 	"golang.org/x/exp/slices"
 )
 
+// Type that implements fs.DirEntry for WSFS.
+type wsfsDirEntry struct {
+	wsfsFileInfo
+}
+
+func (entry wsfsDirEntry) Type() fs.FileMode {
+	return entry.wsfsFileInfo.Mode()
+}
+
+func (entry wsfsDirEntry) Info() (fs.FileInfo, error) {
+	return entry.wsfsFileInfo, nil
+}
+
+// Type that implements fs.FileInfo for WSFS.
+type wsfsFileInfo struct {
+	oi workspace.ObjectInfo
+}
+
+func (info wsfsFileInfo) Name() string {
+	return path.Base(info.oi.Path)
+}
+
+func (info wsfsFileInfo) Size() int64 {
+	return info.oi.Size
+}
+
+func (info wsfsFileInfo) Mode() fs.FileMode {
+	switch info.oi.ObjectType {
+	case workspace.ObjectTypeDirectory:
+		return fs.ModeDir
+	default:
+		return fs.ModePerm
+	}
+}
+
+func (info wsfsFileInfo) ModTime() time.Time {
+	return time.UnixMilli(info.oi.ModifiedAt)
+}
+
+func (info wsfsFileInfo) IsDir() bool {
+	return info.oi.ObjectType == workspace.ObjectTypeDirectory
+}
+
+func (info wsfsFileInfo) Sys() any {
+	return nil
+}
+
 // WorkspaceFilesClient implements the files-in-workspace API.
 
 // NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled.
@@ -68,7 +116,12 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io
 	err = w.apiClient.Do(ctx, http.MethodPost, urlPath, body, nil)
 
-	// If we got an API error we deal with it below.
+	// Return early on success.
+	if err == nil {
+		return nil
+	}
+
+	// Special handling of this error only if it is an API error.
 	var aerr *apierr.APIError
 	if !errors.As(err, &aerr) {
 		return err
@@ -112,11 +165,23 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader
 	var res []byte
 	err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res)
-	if err != nil {
+
+	// Return early on success.
+	if err == nil {
+		return bytes.NewReader(res), nil
+	}
+
+	// Special handling of this error only if it is an API error.
+	var aerr *apierr.APIError
+	if !errors.As(err, &aerr) {
 		return nil, err
 	}
 
-	return bytes.NewReader(res), nil
+	if aerr.StatusCode == http.StatusNotFound {
+		return nil, FileDoesNotExistError{absPath}
+	}
+
+	return nil, err
 }
 
 func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
@@ -125,13 +190,30 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
 		return err
 	}
 
-	return w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
+	err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
 		Path:      absPath,
 		Recursive: false,
 	})
+
+	// Return early on success.
+	if err == nil {
+		return nil
+	}
+
+	// Special handling of this error only if it is an API error.
+	var aerr *apierr.APIError
+	if !errors.As(err, &aerr) {
+		return err
+	}
+
+	if aerr.StatusCode == http.StatusNotFound {
+		return FileDoesNotExistError{absPath}
+	}
+
+	return err
 }
 
-func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) {
+func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
 	absPath, err := w.root.Join(name)
 	if err != nil {
 		return nil, err
@@ -155,18 +237,13 @@ func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]File
 		return nil, err
 	}
 
-	info := make([]FileInfo, len(objects))
+	info := make([]fs.DirEntry, len(objects))
 	for i, v := range objects {
-		info[i] = FileInfo{
-			Type:    string(v.ObjectType),
-			Name:    path.Base(v.Path),
-			Size:    v.Size,
-			ModTime: time.UnixMilli(v.ModifiedAt),
-		}
+		info[i] = wsfsDirEntry{wsfsFileInfo{oi: v}}
 	}
 
 	// Sort by name for parity with os.ReadDir.
-	sort.Slice(info, func(i, j int) bool { return info[i].Name < info[j].Name })
+	sort.Slice(info, func(i, j int) bool { return info[i].Name() < info[j].Name() })
 	return info, nil
 }