mirror of https://github.com/databricks/cli.git
Add filer.Filer implementation backed by the Files API (#474)
## Tests New integration test for the read/write parts of the other filers. The integration test cannot be shared just yet because the Files API doesn't include support for creating/listing/removing directories yet.
This commit is contained in:
parent
5d036ab6b8
commit
e19eaca4d1
|
@ -28,6 +28,11 @@ func filerForPath(ctx context.Context, fullPath string) (filer.Filer, string, er
|
|||
switch scheme {
|
||||
case DbfsScheme:
|
||||
w := root.WorkspaceClient(ctx)
|
||||
// If the specified path has the "Volumes" prefix, use the Files API.
|
||||
if strings.HasPrefix(path, "Volumes/") {
|
||||
f, err := filer.NewFilesClient(w, "/")
|
||||
return f, path, err
|
||||
}
|
||||
f, err := filer.NewDbfsClient(w, "/")
|
||||
return f, path, err
|
||||
|
||||
|
|
|
@ -82,7 +82,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
// Stat on a directory should succeed.
|
||||
// Note: size and modification time behave differently between WSFS and DBFS.
|
||||
// Note: size and modification time behave differently between backends.
|
||||
info, err := f.Stat(ctx, "/foo")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "foo", info.Name())
|
||||
|
@ -90,7 +90,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
|||
assert.Equal(t, true, info.IsDir())
|
||||
|
||||
// Stat on a file should succeed.
|
||||
// Note: size and modification time behave differently between WSFS and DBFS.
|
||||
// Note: size and modification time behave differently between backends.
|
||||
info, err = f.Stat(ctx, "/foo/bar")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "bar", info.Name())
|
||||
|
@ -465,3 +465,123 @@ func TestAccFilerLocalReadDir(t *testing.T) {
|
|||
ctx, f := setupFilerLocalTest(t)
|
||||
runFilerReadDirTest(t, ctx, f)
|
||||
}
|
||||
|
||||
func temporaryVolumeDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
// Assume this test is run against the internal testing workspace.
|
||||
path := RandomName("/Volumes/bogdanghita/default/v3_shared/cli-testing/integration-test-filer-")
|
||||
|
||||
// The Files API doesn't include support for creating and removing directories yet.
|
||||
// Directories are created implicitly by writing a file to a path that doesn't exist.
|
||||
// We therefore assume we can use the specified path without creating it first.
|
||||
t.Logf("using dbfs:%s", path)
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func setupFilerFilesApiTest(t *testing.T) (context.Context, filer.Filer) {
|
||||
t.SkipNow() // until available on prod
|
||||
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||
|
||||
ctx := context.Background()
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := temporaryVolumeDir(t, w)
|
||||
f, err := filer.NewFilesClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
return ctx, f
|
||||
}
|
||||
|
||||
func TestAccFilerFilesApiReadWrite(t *testing.T) {
|
||||
ctx, f := setupFilerFilesApiTest(t)
|
||||
|
||||
// The Files API doesn't know about directories yet.
|
||||
// Below is a copy of [runFilerReadWriteTest] with
|
||||
// assertions that don't work commented out.
|
||||
|
||||
var err error
|
||||
|
||||
// Write should fail because the root path doesn't yet exist.
|
||||
// err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
|
||||
// assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
|
||||
// assert.True(t, errors.Is(err, fs.ErrNotExist))
|
||||
|
||||
// Read should fail because the root path doesn't yet exist.
|
||||
_, err = f.Read(ctx, "/foo/bar")
|
||||
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
|
||||
assert.True(t, errors.Is(err, fs.ErrNotExist))
|
||||
|
||||
// Read should fail because the path points to a directory
|
||||
// err = f.Mkdir(ctx, "/dir")
|
||||
// require.NoError(t, err)
|
||||
// _, err = f.Read(ctx, "/dir")
|
||||
// assert.ErrorIs(t, err, fs.ErrInvalid)
|
||||
|
||||
// Write with CreateParentDirectories flag should succeed.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
|
||||
assert.NoError(t, err)
|
||||
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
|
||||
|
||||
// Write should fail because there is an existing file at the specified path.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
|
||||
assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
|
||||
assert.True(t, errors.Is(err, fs.ErrExist))
|
||||
|
||||
// Write with OverwriteIfExists should succeed.
|
||||
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
|
||||
assert.NoError(t, err)
|
||||
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
|
||||
|
||||
// Write should succeed if there is no existing file at the specified path.
|
||||
err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`))
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Stat on a directory should succeed.
|
||||
// Note: size and modification time behave differently between backends.
|
||||
info, err := f.Stat(ctx, "/foo")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "foo", info.Name())
|
||||
assert.True(t, info.Mode().IsDir())
|
||||
assert.Equal(t, true, info.IsDir())
|
||||
|
||||
// Stat on a file should succeed.
|
||||
// Note: size and modification time behave differently between backends.
|
||||
info, err = f.Stat(ctx, "/foo/bar")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "bar", info.Name())
|
||||
assert.True(t, info.Mode().IsRegular())
|
||||
assert.Equal(t, false, info.IsDir())
|
||||
|
||||
// Delete should fail if the file doesn't exist.
|
||||
err = f.Delete(ctx, "/doesnt_exist")
|
||||
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
|
||||
assert.True(t, errors.Is(err, fs.ErrNotExist))
|
||||
|
||||
// Stat should fail if the file doesn't exist.
|
||||
_, err = f.Stat(ctx, "/doesnt_exist")
|
||||
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
|
||||
assert.True(t, errors.Is(err, fs.ErrNotExist))
|
||||
|
||||
// Delete should succeed for file that does exist.
|
||||
err = f.Delete(ctx, "/foo/bar")
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Delete should fail for a non-empty directory.
|
||||
err = f.Delete(ctx, "/foo")
|
||||
assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{}))
|
||||
assert.True(t, errors.Is(err, fs.ErrInvalid))
|
||||
|
||||
// Delete should succeed for a non-empty directory if the DeleteRecursively flag is set.
|
||||
// err = f.Delete(ctx, "/foo", filer.DeleteRecursively)
|
||||
// assert.NoError(t, err)
|
||||
|
||||
// Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail.
|
||||
// It is not in the filer's purview to delete its root directory.
|
||||
err = f.Delete(ctx, "/")
|
||||
assert.True(t, errors.As(err, &filer.CannotDeleteRootError{}))
|
||||
assert.True(t, errors.Is(err, fs.ErrInvalid))
|
||||
}
|
||||
|
||||
func TestAccFilerFilesApiReadDir(t *testing.T) {
|
||||
t.Skipf("no support for ReadDir yet")
|
||||
ctx, f := setupFilerFilesApiTest(t)
|
||||
runFilerReadDirTest(t, ctx, f)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,241 @@
|
|||
package filer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/client"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
// Type that implements fs.FileInfo for the Files API.
|
||||
type filesApiFileInfo struct {
|
||||
absPath string
|
||||
isDir bool
|
||||
}
|
||||
|
||||
func (info filesApiFileInfo) Name() string {
|
||||
return path.Base(info.absPath)
|
||||
}
|
||||
|
||||
func (info filesApiFileInfo) Size() int64 {
|
||||
// No way to get the file size in the Files API.
|
||||
return 0
|
||||
}
|
||||
|
||||
func (info filesApiFileInfo) Mode() fs.FileMode {
|
||||
mode := fs.ModePerm
|
||||
if info.isDir {
|
||||
mode |= fs.ModeDir
|
||||
}
|
||||
return mode
|
||||
}
|
||||
|
||||
func (info filesApiFileInfo) ModTime() time.Time {
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func (info filesApiFileInfo) IsDir() bool {
|
||||
return info.isDir
|
||||
}
|
||||
|
||||
func (info filesApiFileInfo) Sys() any {
|
||||
return nil
|
||||
}
|
||||
|
||||
// FilesClient implements the [Filer] interface for the Files API backend.
|
||||
type FilesClient struct {
|
||||
workspaceClient *databricks.WorkspaceClient
|
||||
apiClient *client.DatabricksClient
|
||||
|
||||
// File operations will be relative to this path.
|
||||
root RootPath
|
||||
}
|
||||
|
||||
func filesNotImplementedError(fn string) error {
|
||||
return fmt.Errorf("filer.%s is not implemented for the Files API", fn)
|
||||
}
|
||||
|
||||
func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
||||
apiClient, err := client.New(w.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &FilesClient{
|
||||
workspaceClient: w,
|
||||
apiClient: apiClient,
|
||||
|
||||
root: NewRootPath(root),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *FilesClient) urlPath(name string) (string, string, error) {
|
||||
absPath, err := w.root.Join(name)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
// The user specified part of the path must be escaped.
|
||||
urlPath := fmt.Sprintf(
|
||||
"/api/2.0/fs/files/%s",
|
||||
url.PathEscape(strings.TrimLeft(absPath, "/")),
|
||||
)
|
||||
|
||||
return absPath, urlPath, nil
|
||||
}
|
||||
|
||||
func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
||||
absPath, urlPath, err := w.urlPath(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
overwrite := slices.Contains(mode, OverwriteIfExists)
|
||||
urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
|
||||
err = w.apiClient.Do(ctx, http.MethodPut, urlPath, reader, nil,
|
||||
func(r *http.Request) error {
|
||||
r.Header.Set("Content-Type", "application/octet-stream")
|
||||
return nil
|
||||
})
|
||||
|
||||
// Return early on success.
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// This API returns 409 if the file already exists, when the object type is file
|
||||
if aerr.StatusCode == http.StatusConflict {
|
||||
return FileAlreadyExistsError{absPath}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
||||
absPath, urlPath, err := w.urlPath(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &buf)
|
||||
|
||||
// Return early on success.
|
||||
if err == nil {
|
||||
return io.NopCloser(&buf), nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the specified path does not exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
return nil, FileDoesNotExistError{absPath}
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
||||
absPath, urlPath, err := w.urlPath(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Illegal to delete the root path.
|
||||
if absPath == w.root.rootPath {
|
||||
return CannotDeleteRootError{}
|
||||
}
|
||||
|
||||
err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil)
|
||||
|
||||
// Return early on success.
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the specified path does not exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
return FileDoesNotExistError{absPath}
|
||||
}
|
||||
|
||||
// This API returns 409 if the underlying path is a directory.
|
||||
if aerr.StatusCode == http.StatusConflict {
|
||||
return DirectoryNotEmptyError{absPath}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
||||
return nil, filesNotImplementedError("ReadDir")
|
||||
}
|
||||
|
||||
func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
|
||||
// Directories are created implicitly.
|
||||
// No need to do anything.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
||||
absPath, urlPath, err := w.urlPath(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil,
|
||||
func(r *http.Request) error {
|
||||
r.Header.Del("Content-Type")
|
||||
return nil
|
||||
})
|
||||
|
||||
// If the HEAD requests succeeds, the file exists.
|
||||
if err == nil {
|
||||
return filesApiFileInfo{absPath: absPath, isDir: false}, nil
|
||||
}
|
||||
|
||||
// Special handling of this error only if it is an API error.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the specified path does not exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
return nil, FileDoesNotExistError{absPath}
|
||||
}
|
||||
|
||||
// This API returns 409 if the underlying path is a directory.
|
||||
if aerr.StatusCode == http.StatusConflict {
|
||||
return filesApiFileInfo{absPath: absPath, isDir: true}, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
Loading…
Reference in New Issue