mirror of https://github.com/databricks/cli.git
Implement DBFS filer (#139)
Adds a DBFS implementation of the `filer.Filer` interface. The integration tests are reused between the workspace filesystem and DBFS implementations to ensure identical behavior.
This commit is contained in:
parent
92cb52041d
commit
27df4e765c
|
@ -1,6 +1,7 @@
|
|||
package internal
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -12,6 +13,7 @@ import (
|
|||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/service/files"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -28,12 +30,98 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str
|
|||
return
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(reader)
|
||||
var body bytes.Buffer
|
||||
_, err = io.Copy(&body, reader)
|
||||
if !assert.NoError(f, err) {
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(f, contents, string(body))
|
||||
assert.Equal(f, contents, body.String())
|
||||
}
|
||||
|
||||
func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
||||
var err error
|
||||
|
||||
// Write should fail because the root path doesn't yet exist.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
|
||||
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
|
||||
|
||||
// Read should fail because the root path doesn't yet exist.
|
||||
_, err = f.Read(ctx, "/foo/bar")
|
||||
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
|
||||
|
||||
// Write with CreateParentDirectories flag should succeed.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
|
||||
assert.NoError(t, err)
|
||||
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
|
||||
|
||||
// Write should fail because there is an existing file at the specified path.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
|
||||
assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
|
||||
|
||||
// Write with OverwriteIfExists should succeed.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
|
||||
assert.NoError(t, err)
|
||||
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
|
||||
|
||||
// Delete should fail if the file doesn't exist.
|
||||
err = f.Delete(ctx, "/doesnt_exist")
|
||||
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
|
||||
|
||||
// Delete should succeed for file that does exist.
|
||||
err = f.Delete(ctx, "/foo/bar")
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
||||
var err error
|
||||
|
||||
// We start with an empty directory.
|
||||
entries, err := f.ReadDir(ctx, ".")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 0)
|
||||
|
||||
// Write a file.
|
||||
err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a directory.
|
||||
err = f.Mkdir(ctx, "/dir")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write a file.
|
||||
err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a nested directory (check that it creates intermediate directories).
|
||||
err = f.Mkdir(ctx, "/dir/a/b/c")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Expect an error if the path doesn't exist.
|
||||
_, err = f.ReadDir(ctx, "/dir/a/b/c/d/e")
|
||||
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}), err)
|
||||
|
||||
// Expect two entries in the root.
|
||||
entries, err = f.ReadDir(ctx, ".")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 2)
|
||||
assert.Equal(t, "dir", entries[0].Name)
|
||||
assert.Equal(t, "hello.txt", entries[1].Name)
|
||||
assert.Greater(t, entries[1].ModTime.Unix(), int64(0))
|
||||
|
||||
// Expect two entries in the directory.
|
||||
entries, err = f.ReadDir(ctx, "/dir")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 2)
|
||||
assert.Equal(t, "a", entries[0].Name)
|
||||
assert.Equal(t, "world.txt", entries[1].Name)
|
||||
assert.Greater(t, entries[1].ModTime.Unix(), int64(0))
|
||||
|
||||
// Expect a single entry in the nested path.
|
||||
entries, err = f.ReadDir(ctx, "/dir/a/b")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 1)
|
||||
assert.Equal(t, "c", entries[0].Name)
|
||||
}
|
||||
|
||||
func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
|
@ -41,7 +129,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
|||
me, err := w.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("wsfs-files-"))
|
||||
path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-filer-wsfs-"))
|
||||
|
||||
// Ensure directory exists, but doesn't exist YET!
|
||||
// Otherwise we could inadvertently remove a directory that already exists on cleanup.
|
||||
|
@ -59,7 +147,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
|||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary workspace path %s: %#v", path, err)
|
||||
t.Logf("unable to remove temporary workspace directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
|
@ -85,90 +173,57 @@ func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) {
|
|||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) {
|
||||
var err error
|
||||
|
||||
ctx, f := setupWorkspaceFilesTest(t)
|
||||
|
||||
// Write should fail because the root path doesn't yet exist.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
|
||||
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
|
||||
|
||||
// Read should fail because the root path doesn't yet exist.
|
||||
_, err = f.Read(ctx, "/foo/bar")
|
||||
assert.True(t, apierr.IsMissing(err))
|
||||
|
||||
// Write with CreateParentDirectories flag should succeed.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
|
||||
assert.NoError(t, err)
|
||||
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
|
||||
|
||||
// Write should fail because there is an existing file at the specified path.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
|
||||
assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
|
||||
|
||||
// Write with OverwriteIfExists should succeed.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
|
||||
assert.NoError(t, err)
|
||||
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
|
||||
|
||||
// Delete should fail if the file doesn't exist.
|
||||
err = f.Delete(ctx, "/doesnt_exist")
|
||||
assert.True(t, apierr.IsMissing(err))
|
||||
|
||||
// Delete should succeed for file that does exist.
|
||||
err = f.Delete(ctx, "/foo/bar")
|
||||
assert.NoError(t, err)
|
||||
runFilerReadWriteTest(t, ctx, f)
|
||||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
|
||||
var err error
|
||||
|
||||
ctx, f := setupWorkspaceFilesTest(t)
|
||||
|
||||
// We start with an empty directory.
|
||||
entries, err := f.ReadDir(ctx, ".")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 0)
|
||||
|
||||
// Write a file.
|
||||
err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a directory.
|
||||
err = f.Mkdir(ctx, "/dir")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write a file.
|
||||
err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`))
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a nested directory (check that it creates intermediate directories).
|
||||
err = f.Mkdir(ctx, "/dir/a/b/c")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Expect an error if the path doesn't exist.
|
||||
_, err = f.ReadDir(ctx, "/dir/a/b/c/d/e")
|
||||
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
|
||||
|
||||
// Expect two entries in the root.
|
||||
entries, err = f.ReadDir(ctx, ".")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 2)
|
||||
assert.Equal(t, "dir", entries[0].Name)
|
||||
assert.Equal(t, "hello.txt", entries[1].Name)
|
||||
assert.Greater(t, entries[1].ModTime.Unix(), int64(0))
|
||||
|
||||
// Expect two entries in the directory.
|
||||
entries, err = f.ReadDir(ctx, "/dir")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 2)
|
||||
assert.Equal(t, "a", entries[0].Name)
|
||||
assert.Equal(t, "world.txt", entries[1].Name)
|
||||
assert.Greater(t, entries[1].ModTime.Unix(), int64(0))
|
||||
|
||||
// Expect a single entry in the nested path.
|
||||
entries, err = f.ReadDir(ctx, "/dir/a/b")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, 1)
|
||||
assert.Equal(t, "c", entries[0].Name)
|
||||
runFilerReadDirTest(t, ctx, f)
|
||||
}
|
||||
|
||||
func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-"))
|
||||
|
||||
// This call fails if the path already exists.
|
||||
t.Logf("mkdir dbfs:%s", path)
|
||||
err := w.Dbfs.MkdirsByPath(ctx, path)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove test directory on test completion.
|
||||
t.Cleanup(func() {
|
||||
t.Logf("rm -rf dbfs:%s", path)
|
||||
err := w.Dbfs.Delete(ctx, files.Delete{
|
||||
Path: path,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary dbfs directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) {
|
||||
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||
|
||||
ctx := context.Background()
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := temporaryDbfsDir(t, w)
|
||||
f, err := filer.NewDbfsClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
return ctx, f
|
||||
}
|
||||
|
||||
func TestAccFilerDbfsReadWrite(t *testing.T) {
|
||||
ctx, f := setupFilerDbfsTest(t)
|
||||
runFilerReadWriteTest(t, ctx, f)
|
||||
}
|
||||
|
||||
func TestAccFilerDbfsReadDir(t *testing.T) {
|
||||
ctx, f := setupFilerDbfsTest(t)
|
||||
runFilerReadDirTest(t, ctx, f)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
package filer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/service/files"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
// DbfsClient implements the [Filer] interface for the DBFS backend.
|
||||
type DbfsClient struct {
|
||||
workspaceClient *databricks.WorkspaceClient
|
||||
|
||||
// File operations will be relative to this path.
|
||||
root RootPath
|
||||
}
|
||||
|
||||
func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
||||
return &DbfsClient{
|
||||
workspaceClient: w,
|
||||
|
||||
root: NewRootPath(root),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
||||
absPath, err := w.root.Join(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fileMode := files.FileModeWrite
|
||||
if slices.Contains(mode, OverwriteIfExists) {
|
||||
fileMode |= files.FileModeOverwrite
|
||||
}
|
||||
|
||||
// Issue info call before write because it automatically creates parent directories.
|
||||
//
|
||||
// For discussion: we could decide this is actually convenient, remove the call below,
|
||||
// and apply the same semantics for the WSFS filer.
|
||||
//
|
||||
if !slices.Contains(mode, CreateParentDirectories) {
|
||||
_, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, path.Dir(absPath))
|
||||
if err != nil {
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the file doesn't exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
||||
return NoSuchDirectoryError{path.Dir(absPath)}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode)
|
||||
if err != nil {
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// This API returns a 400 if the file already exists.
|
||||
if aerr.StatusCode == http.StatusBadRequest {
|
||||
if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
|
||||
return FileAlreadyExistsError{absPath}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.Copy(handle, reader)
|
||||
cerr := handle.Close()
|
||||
if err == nil {
|
||||
err = cerr
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) {
|
||||
absPath, err := w.root.Join(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead)
|
||||
if err != nil {
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the file doesn't exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
||||
return nil, FileDoesNotExistError{absPath}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return handle, nil
|
||||
}
|
||||
|
||||
func (w *DbfsClient) Delete(ctx context.Context, name string) error {
|
||||
absPath, err := w.root.Join(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Issue info call before delete because delete succeeds if the specified path doesn't exist.
|
||||
//
|
||||
// For discussion: we could decide this is actually convenient, remove the call below,
|
||||
// and apply the same semantics for the WSFS filer.
|
||||
//
|
||||
_, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath)
|
||||
if err != nil {
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the file doesn't exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
||||
return FileDoesNotExistError{absPath}
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
|
||||
Path: absPath,
|
||||
Recursive: false,
|
||||
})
|
||||
}
|
||||
|
||||
func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) {
|
||||
absPath, err := w.root.Join(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := w.workspaceClient.Dbfs.ListByPath(ctx, absPath)
|
||||
if err != nil {
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the file doesn't exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
|
||||
return nil, NoSuchDirectoryError{absPath}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info := make([]FileInfo, len(res.Files))
|
||||
for i, v := range res.Files {
|
||||
info[i] = FileInfo{
|
||||
Name: path.Base(v.Path),
|
||||
Size: v.FileSize,
|
||||
ModTime: time.UnixMilli(v.ModificationTime),
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by name for parity with os.ReadDir.
|
||||
sort.Slice(info, func(i, j int) bool { return info[i].Name < info[j].Name })
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (w *DbfsClient) Mkdir(ctx context.Context, name string) error {
|
||||
dirPath, err := w.root.Join(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return w.workspaceClient.Dbfs.MkdirsByPath(ctx, dirPath)
|
||||
}
|
|
@ -38,6 +38,14 @@ func (err FileAlreadyExistsError) Error() string {
|
|||
return fmt.Sprintf("file already exists: %s", err.path)
|
||||
}
|
||||
|
||||
// FileDoesNotExistError is returned when an operation targets a file that
// does not exist.
type FileDoesNotExistError struct {
	// path is the path that was looked up.
	path string
}

// Error implements the error interface.
func (e FileDoesNotExistError) Error() string {
	return fmt.Sprintf("file does not exist: %s", e.path)
}
|
||||
|
||||
// NoSuchDirectoryError is returned when an operation targets a directory
// that does not exist.
type NoSuchDirectoryError struct {
	// path is the directory path that was looked up.
	path string
}
|
||||
|
|
|
@ -68,7 +68,12 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io
|
|||
|
||||
err = w.apiClient.Do(ctx, http.MethodPost, urlPath, body, nil)
|
||||
|
||||
// If we got an API error we deal with it below.
|
||||
// Return early on success.
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
|
@ -112,11 +117,23 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader
|
|||
|
||||
var res []byte
|
||||
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res)
|
||||
if err != nil {
|
||||
|
||||
// Return early on success.
|
||||
if err == nil {
|
||||
return bytes.NewReader(res), nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return bytes.NewReader(res), nil
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
return nil, FileDoesNotExistError{absPath}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
|
||||
|
@ -125,10 +142,27 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
return w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
|
||||
err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
|
||||
Path: absPath,
|
||||
Recursive: false,
|
||||
})
|
||||
|
||||
// Return early on success.
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
}
|
||||
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
return FileDoesNotExistError{absPath}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) {
|
||||
|
|
Loading…
Reference in New Issue