From e19eaca4d19034181cac3e8baf3b4131621fa6d0 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 19 Jun 2023 20:29:13 +0200 Subject: [PATCH] Add filer.Filer implementation backed by the Files API (#474) ## Tests New integration test for the read/write parts of the other filers. The integration test cannot be shared just yet because the Files API doesn't include support for creating/listing/removing directories yet. --- cmd/fs/helpers.go | 5 + internal/filer_test.go | 124 ++++++++++++++++++- libs/filer/files_client.go | 241 +++++++++++++++++++++++++++++++++++++ 3 files changed, 368 insertions(+), 2 deletions(-) create mode 100644 libs/filer/files_client.go diff --git a/cmd/fs/helpers.go b/cmd/fs/helpers.go index 72193d9f..aecee9e0 100644 --- a/cmd/fs/helpers.go +++ b/cmd/fs/helpers.go @@ -28,6 +28,11 @@ func filerForPath(ctx context.Context, fullPath string) (filer.Filer, string, er switch scheme { case DbfsScheme: w := root.WorkspaceClient(ctx) + // If the specified path has the "Volumes" prefix, use the Files API. + if strings.HasPrefix(path, "Volumes/") { + f, err := filer.NewFilesClient(w, "/") + return f, path, err + } f, err := filer.NewDbfsClient(w, "/") return f, path, err diff --git a/internal/filer_test.go b/internal/filer_test.go index bc005feb..0e126abc 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -82,7 +82,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.NoError(t, err) // Stat on a directory should succeed. - // Note: size and modification time behave differently between WSFS and DBFS. + // Note: size and modification time behave differently between backends. info, err := f.Stat(ctx, "/foo") require.NoError(t, err) assert.Equal(t, "foo", info.Name()) @@ -90,7 +90,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.Equal(t, true, info.IsDir()) // Stat on a file should succeed. - // Note: size and modification time behave differently between WSFS and DBFS. + // Note: size and modification time behave differently between backends. info, err = f.Stat(ctx, "/foo/bar") require.NoError(t, err) assert.Equal(t, "bar", info.Name()) @@ -465,3 +465,123 @@ func TestAccFilerLocalReadDir(t *testing.T) { ctx, f := setupFilerLocalTest(t) runFilerReadDirTest(t, ctx, f) } + +func temporaryVolumeDir(t *testing.T, w *databricks.WorkspaceClient) string { + // Assume this test is run against the internal testing workspace. + path := RandomName("/Volumes/bogdanghita/default/v3_shared/cli-testing/integration-test-filer-") + + // The Files API doesn't include support for creating and removing directories yet. + // Directories are created implicitly by writing a file to a path that doesn't exist. + // We therefore assume we can use the specified path without creating it first. + t.Logf("using dbfs:%s", path) + + return path +} + +func setupFilerFilesApiTest(t *testing.T) (context.Context, filer.Filer) { + t.SkipNow() // until available on prod + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := temporaryVolumeDir(t, w) + f, err := filer.NewFilesClient(w, tmpdir) + require.NoError(t, err) + return ctx, f +} + +func TestAccFilerFilesApiReadWrite(t *testing.T) { + ctx, f := setupFilerFilesApiTest(t) + + // The Files API doesn't know about directories yet. + // Below is a copy of [runFilerReadWriteTest] with + // assertions that don't work commented out. + + var err error + + // Write should fail because the root path doesn't yet exist. + // err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) + // assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) + // assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // Read should fail because the root path doesn't yet exist. + _, err = f.Read(ctx, "/foo/bar") + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // Read should fail because the path points to a directory + // err = f.Mkdir(ctx, "/dir") + // require.NoError(t, err) + // _, err = f.Read(ctx, "/dir") + // assert.ErrorIs(t, err, fs.ErrInvalid) + + // Write with CreateParentDirectories flag should succeed. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) + assert.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) + + // Write should fail because there is an existing file at the specified path. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) + assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) + assert.True(t, errors.Is(err, fs.ErrExist)) + + // Write with OverwriteIfExists should succeed. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) + assert.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) + + // Write should succeed if there is no existing file at the specified path. + err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`)) + assert.NoError(t, err) + + // Stat on a directory should succeed. + // Note: size and modification time behave differently between backends. + info, err := f.Stat(ctx, "/foo") + require.NoError(t, err) + assert.Equal(t, "foo", info.Name()) + assert.True(t, info.Mode().IsDir()) + assert.Equal(t, true, info.IsDir()) + + // Stat on a file should succeed. + // Note: size and modification time behave differently between backends. + info, err = f.Stat(ctx, "/foo/bar") + require.NoError(t, err) + assert.Equal(t, "bar", info.Name()) + assert.True(t, info.Mode().IsRegular()) + assert.Equal(t, false, info.IsDir()) + + // Delete should fail if the file doesn't exist. + err = f.Delete(ctx, "/doesnt_exist") + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // Stat should fail if the file doesn't exist. + _, err = f.Stat(ctx, "/doesnt_exist") + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.True(t, errors.Is(err, fs.ErrNotExist)) + + // Delete should succeed for file that does exist. + err = f.Delete(ctx, "/foo/bar") + assert.NoError(t, err) + + // Delete should fail for a non-empty directory. + err = f.Delete(ctx, "/foo") + assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{})) + assert.True(t, errors.Is(err, fs.ErrInvalid)) + + // Delete should succeed for a non-empty directory if the DeleteRecursively flag is set. + // err = f.Delete(ctx, "/foo", filer.DeleteRecursively) + // assert.NoError(t, err) + + // Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail. + // It is not in the filer's purview to delete its root directory. + err = f.Delete(ctx, "/") + assert.True(t, errors.As(err, &filer.CannotDeleteRootError{})) + assert.True(t, errors.Is(err, fs.ErrInvalid)) +} + +func TestAccFilerFilesApiReadDir(t *testing.T) { + t.Skipf("no support for ReadDir yet") + ctx, f := setupFilerFilesApiTest(t) + runFilerReadDirTest(t, ctx, f) +} diff --git a/libs/filer/files_client.go b/libs/filer/files_client.go new file mode 100644 index 00000000..6c1f5a97 --- /dev/null +++ b/libs/filer/files_client.go @@ -0,0 +1,241 @@ +package filer + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "net/url" + "path" + "strings" + "time" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" + "golang.org/x/exp/slices" +) + +// Type that implements fs.FileInfo for the Files API. +type filesApiFileInfo struct { + absPath string + isDir bool +} + +func (info filesApiFileInfo) Name() string { + return path.Base(info.absPath) +} + +func (info filesApiFileInfo) Size() int64 { + // No way to get the file size in the Files API. + return 0 +} + +func (info filesApiFileInfo) Mode() fs.FileMode { + mode := fs.ModePerm + if info.isDir { + mode |= fs.ModeDir + } + return mode +} + +func (info filesApiFileInfo) ModTime() time.Time { + return time.Time{} +} + +func (info filesApiFileInfo) IsDir() bool { + return info.isDir +} + +func (info filesApiFileInfo) Sys() any { + return nil +} + +// FilesClient implements the [Filer] interface for the Files API backend. +type FilesClient struct { + workspaceClient *databricks.WorkspaceClient + apiClient *client.DatabricksClient + + // File operations will be relative to this path. + root RootPath +} + +func filesNotImplementedError(fn string) error { + return fmt.Errorf("filer.%s is not implemented for the Files API", fn) +} + +func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, err + } + + return &FilesClient{ + workspaceClient: w, + apiClient: apiClient, + + root: NewRootPath(root), + }, nil +} + +func (w *FilesClient) urlPath(name string) (string, string, error) { + absPath, err := w.root.Join(name) + if err != nil { + return "", "", err + } + + // The user specified part of the path must be escaped. + urlPath := fmt.Sprintf( + "/api/2.0/fs/files/%s", + url.PathEscape(strings.TrimLeft(absPath, "/")), + ) + + return absPath, urlPath, nil +} + +func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { + absPath, urlPath, err := w.urlPath(name) + if err != nil { + return err + } + + overwrite := slices.Contains(mode, OverwriteIfExists) + urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite) + err = w.apiClient.Do(ctx, http.MethodPut, urlPath, reader, nil, + func(r *http.Request) error { + r.Header.Set("Content-Type", "application/octet-stream") + return nil + }) + + // Return early on success. + if err == nil { + return nil + } + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns 409 if the file already exists, when the object type is file + if aerr.StatusCode == http.StatusConflict { + return FileAlreadyExistsError{absPath} + } + + return err +} + +func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { + absPath, urlPath, err := w.urlPath(name) + if err != nil { + return nil, err + } + + var buf bytes.Buffer + err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &buf) + + // Return early on success. + if err == nil { + return io.NopCloser(&buf), nil + } + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // This API returns a 404 if the specified path does not exist. + if aerr.StatusCode == http.StatusNotFound { + return nil, FileDoesNotExistError{absPath} + } + + return nil, err +} + +func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { + absPath, urlPath, err := w.urlPath(name) + if err != nil { + return err + } + + // Illegal to delete the root path. + if absPath == w.root.rootPath { + return CannotDeleteRootError{} + } + + err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil) + + // Return early on success. + if err == nil { + return nil + } + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the specified path does not exist. + if aerr.StatusCode == http.StatusNotFound { + return FileDoesNotExistError{absPath} + } + + // This API returns 409 if the underlying path is a directory. + if aerr.StatusCode == http.StatusConflict { + return DirectoryNotEmptyError{absPath} + } + + return err +} + +func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { + return nil, filesNotImplementedError("ReadDir") +} + +func (w *FilesClient) Mkdir(ctx context.Context, name string) error { + // Directories are created implicitly. + // No need to do anything. + return nil +} + +func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { + absPath, urlPath, err := w.urlPath(name) + if err != nil { + return nil, err + } + + err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil, + func(r *http.Request) error { + r.Header.Del("Content-Type") + return nil + }) + + // If the HEAD requests succeeds, the file exists. + if err == nil { + return filesApiFileInfo{absPath: absPath, isDir: false}, nil + } + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // This API returns a 404 if the specified path does not exist. + if aerr.StatusCode == http.StatusNotFound { + return nil, FileDoesNotExistError{absPath} + } + + // This API returns 409 if the underlying path is a directory. + if aerr.StatusCode == http.StatusConflict { + return filesApiFileInfo{absPath: absPath, isDir: true}, nil + } + + return nil, err +}