Include recursive deletion in filer interface (#442)

## Changes

This captures the recursive deletion of a directory tree in the filer interface.

Prompted by #433.

## Tests

Integration tests pass (ran the filer ones on AWS and Azure).
This commit is contained in:
Pieter Noordhuis 2023-06-06 08:27:47 +02:00 committed by GitHub
parent d6d35e314f
commit be10ff9a75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 103 additions and 9 deletions

View File

@ -68,6 +68,10 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
// Write should succeed if there is no existing file at the specified path.
err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`))
assert.NoError(t, err)
// Stat on a directory should succeed.
// Note: size and modification time behave differently between WSFS and DBFS.
info, err := f.Stat(ctx, "/foo")
@ -97,6 +101,21 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
// Delete should succeed for file that does exist.
err = f.Delete(ctx, "/foo/bar")
assert.NoError(t, err)
// Delete should fail for a non-empty directory.
err = f.Delete(ctx, "/foo")
assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
// Delete should succeed for a non-empty directory if the DeleteRecursively flag is set.
err = f.Delete(ctx, "/foo", filer.DeleteRecursively)
assert.NoError(t, err)
// Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail.
// It is not in the filer's purview to delete its root directory.
err = f.Delete(ctx, "/")
assert.True(t, errors.As(err, &filer.CannotDeleteRootError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
}
func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {

View File

@ -165,12 +165,17 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) {
return handle, nil
}
func (w *DbfsClient) Delete(ctx context.Context, name string) error {
func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}
// Issue info call before delete because delete succeeds if the specified path doesn't exist.
//
// For discussion: we could decide this is actually convenient, remove the call below,
@ -193,10 +198,36 @@ func (w *DbfsClient) Delete(ctx context.Context, name string) error {
return err
}
return w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
recursive := false
if slices.Contains(mode, DeleteRecursively) {
recursive = true
}
err = w.workspaceClient.Dbfs.Delete(ctx, files.Delete{
Path: absPath,
Recursive: false,
Recursive: recursive,
})
// Return early on success.
if err == nil {
return nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
switch aerr.StatusCode {
case http.StatusBadRequest:
// Anecdotally, this error is returned when attempting to delete a non-empty directory.
if aerr.ErrorCode == "IO_ERROR" {
return DirectoryNotEmptyError{absPath}
}
}
return err
}
func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {

View File

@ -14,6 +14,12 @@ const (
CreateParentDirectories = iota << 1
)
type DeleteMode int
const (
DeleteRecursively DeleteMode = iota
)
type FileAlreadyExistsError struct {
path string
}
@ -62,6 +68,29 @@ func (err NotADirectory) Is(other error) bool {
return other == fs.ErrInvalid
}
type DirectoryNotEmptyError struct {
path string
}
func (err DirectoryNotEmptyError) Error() string {
return fmt.Sprintf("directory not empty: %s", err.path)
}
func (err DirectoryNotEmptyError) Is(other error) bool {
return other == fs.ErrInvalid
}
type CannotDeleteRootError struct {
}
func (err CannotDeleteRootError) Error() string {
return "unable to delete filer root"
}
func (err CannotDeleteRootError) Is(other error) bool {
return other == fs.ErrInvalid
}
// Filer is used to access files in a workspace.
// It has implementations for accessing files in WSFS and in DBFS.
type Filer interface {
@ -72,8 +101,8 @@ type Filer interface {
// Read file at `path`.
Read(ctx context.Context, path string) (io.Reader, error)
// Delete file at `path`.
Delete(ctx context.Context, path string) error
// Delete file or directory at `path`.
Delete(ctx context.Context, path string, mode ...DeleteMode) error
// Return contents of directory at `path`.
ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error)

View File

@ -79,7 +79,7 @@ func (f *fakeFiler) Read(ctx context.Context, p string) (io.Reader, error) {
return strings.NewReader("foo"), nil
}
func (f *fakeFiler) Delete(ctx context.Context, p string) error {
func (f *fakeFiler) Delete(ctx context.Context, p string, mode ...DeleteMode) error {
return fmt.Errorf("not implemented")
}

View File

@ -184,15 +184,25 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader
return nil, err
}
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}
recursive := false
if slices.Contains(mode, DeleteRecursively) {
recursive = true
}
err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
Path: absPath,
Recursive: false,
Recursive: recursive,
})
// Return early on success.
@ -206,7 +216,12 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error {
return err
}
if aerr.StatusCode == http.StatusNotFound {
switch aerr.StatusCode {
case http.StatusBadRequest:
if aerr.ErrorCode == "DIRECTORY_NOT_EMPTY" {
return DirectoryNotEmptyError{absPath}
}
case http.StatusNotFound:
return FileDoesNotExistError{absPath}
}