Add support for UC Volumes to the `databricks fs` commands (#1209)

## Changes
```
shreyas.goenka@THW32HFW6T cli % databricks fs -h
Commands to do file system operations on DBFS and UC Volumes.

Usage:
  databricks fs [command]

Available Commands:
  cat         Show file content.
  cp          Copy files and directories.
  ls          Lists files.
  mkdir       Make directories.
  rm          Remove files and directories.
```

This PR adds support for UC Volumes to the fs commands. The fs commands
for UC volumes work the same as they currently do for DBFS. This is
ensured by running the same test matrix across both the DBFS and UC
Volumes versions of the fs commands.

## Tests
Support for UC volumes is tested by running the same tests as we did
originally for DBFS commands. The tests require a `main` catalog to
exist in the workspace, which it does in our test workspace environments
that have the `TEST_METASTORE_ID` environment variable set.

For the Files API filer, we do the same by running mostly common tests
to ensure the filers for "local", "wsfs", "dbfs" and "files API" are
consistent.

The tests are also made to all run in parallel to reduce the time taken.
To ensure the separation of the tests, each test creates its own UC
schema (for UC volumes tests) or DBFS directories (for DBFS tests).
This commit is contained in:
shreyas-goenka 2024-02-20 21:44:37 +05:30 committed by GitHub
parent d9f34e6b22
commit 5ba0aaa5c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 1138 additions and 671 deletions

View File

@ -9,8 +9,8 @@ import (
func newCatCommand() *cobra.Command { func newCatCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "cat FILE_PATH", Use: "cat FILE_PATH",
Short: "Show file content", Short: "Show file content.",
Long: `Show the contents of a file.`, Long: `Show the contents of a file in DBFS or a UC Volume.`,
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient, PreRunE: root.MustWorkspaceClient,
} }

View File

@ -129,10 +129,10 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error {
func newCpCommand() *cobra.Command { func newCpCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "cp SOURCE_PATH TARGET_PATH", Use: "cp SOURCE_PATH TARGET_PATH",
Short: "Copy files and directories to and from DBFS.", Short: "Copy files and directories.",
Long: `Copy files to and from DBFS. Long: `Copy files and directories to and from any paths on DBFS, UC Volumes or your local filesystem.
For paths in DBFS it is required that you specify the "dbfs" scheme. For paths in DBFS and UC Volumes, it is required that you specify the "dbfs" scheme.
For example: dbfs:/foo/bar. For example: dbfs:/foo/bar.
Recursively copying a directory will copy all files inside directory Recursively copying a directory will copy all files inside directory
@ -152,9 +152,6 @@ func newCpCommand() *cobra.Command {
cmd.RunE = func(cmd *cobra.Command, args []string) error { cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context() ctx := cmd.Context()
// TODO: Error if a user uses '\' as path separator on windows when "file"
// scheme is specified (https://github.com/databricks/cli/issues/485)
// Get source filer and source path without scheme // Get source filer and source path without scheme
fullSourcePath := args[0] fullSourcePath := args[0]
sourceFiler, sourcePath, err := filerForPath(ctx, fullSourcePath) sourceFiler, sourcePath, err := filerForPath(ctx, fullSourcePath)

View File

@ -8,7 +8,7 @@ func New() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "fs", Use: "fs",
Short: "Filesystem related commands", Short: "Filesystem related commands",
Long: `Commands to do DBFS operations.`, Long: `Commands to do file system operations on DBFS and UC Volumes.`,
GroupID: "workspace", GroupID: "workspace",
} }

View File

@ -40,8 +40,8 @@ func toJsonDirEntry(f fs.DirEntry, baseDir string, isAbsolute bool) (*jsonDirEnt
func newLsCommand() *cobra.Command { func newLsCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "ls DIR_PATH", Use: "ls DIR_PATH",
Short: "Lists files", Short: "Lists files.",
Long: `Lists files`, Long: `Lists files in DBFS and UC Volumes.`,
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient, PreRunE: root.MustWorkspaceClient,
} }

View File

@ -11,8 +11,8 @@ func newMkdirCommand() *cobra.Command {
// Alias `mkdirs` for this command exists for legacy purposes. This command // Alias `mkdirs` for this command exists for legacy purposes. This command
// is called databricks fs mkdirs in our legacy CLI: https://github.com/databricks/databricks-cli // is called databricks fs mkdirs in our legacy CLI: https://github.com/databricks/databricks-cli
Aliases: []string{"mkdirs"}, Aliases: []string{"mkdirs"},
Short: "Make directories", Short: "Make directories.",
Long: `Mkdir will create directories along the path to the argument directory.`, Long: `Make directories in DBFS and UC Volumes. Mkdir will create directories along the path to the argument directory.`,
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient, PreRunE: root.MustWorkspaceClient,
} }

View File

@ -9,8 +9,8 @@ import (
func newRmCommand() *cobra.Command { func newRmCommand() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
Use: "rm PATH", Use: "rm PATH",
Short: "Remove files and directories from dbfs.", Short: "Remove files and directories.",
Long: `Remove files and directories from dbfs.`, Long: `Remove files and directories from DBFS and UC Volumes.`,
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
PreRunE: root.MustWorkspaceClient, PreRunE: root.MustWorkspaceClient,
} }

View File

@ -6,14 +6,11 @@ import (
"errors" "errors"
"io" "io"
"io/fs" "io/fs"
"net/http"
"regexp" "regexp"
"strings" "strings"
"testing" "testing"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -40,15 +37,87 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str
assert.Equal(f, contents, body.String()) assert.Equal(f, contents, body.String())
} }
func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { func commonFilerRecursiveDeleteTest(t *testing.T, ctx context.Context, f filer.Filer) {
var err error var err error
// Write should fail because the root path doesn't yet exist. err = f.Write(ctx, "dir/file1", strings.NewReader("content1"), filer.CreateParentDirectories)
require.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "dir/file1", `content1`)
err = f.Write(ctx, "dir/file2", strings.NewReader("content2"), filer.CreateParentDirectories)
require.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "dir/file2", `content2`)
err = f.Write(ctx, "dir/subdir1/file3", strings.NewReader("content3"), filer.CreateParentDirectories)
require.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "dir/subdir1/file3", `content3`)
err = f.Write(ctx, "dir/subdir1/file4", strings.NewReader("content4"), filer.CreateParentDirectories)
require.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "dir/subdir1/file4", `content4`)
err = f.Write(ctx, "dir/subdir2/file5", strings.NewReader("content5"), filer.CreateParentDirectories)
require.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "dir/subdir2/file5", `content5`)
err = f.Write(ctx, "dir/subdir2/file6", strings.NewReader("content6"), filer.CreateParentDirectories)
require.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "dir/subdir2/file6", `content6`)
entriesBeforeDelete, err := f.ReadDir(ctx, "dir")
require.NoError(t, err)
assert.Len(t, entriesBeforeDelete, 4)
names := []string{}
for _, e := range entriesBeforeDelete {
names = append(names, e.Name())
}
assert.Equal(t, names, []string{"file1", "file2", "subdir1", "subdir2"})
err = f.Delete(ctx, "dir")
assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{})
err = f.Delete(ctx, "dir", filer.DeleteRecursively)
assert.NoError(t, err)
_, err = f.ReadDir(ctx, "dir")
assert.ErrorAs(t, err, &filer.NoSuchDirectoryError{})
}
func TestAccFilerRecursiveDelete(t *testing.T) {
t.Parallel()
for _, testCase := range []struct {
name string
f func(t *testing.T) (filer.Filer, string)
}{
{"local", setupLocalFiler},
{"workspace files", setupWsfsFiler},
{"dbfs", setupDbfsFiler},
{"files", setupUcVolumesFiler},
} {
tc := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
f, _ := tc.f(t)
ctx := context.Background()
// Common tests we run across all filers to ensure consistent behavior.
commonFilerRecursiveDeleteTest(t, ctx, f)
})
}
}
// Common tests we run across all filers to ensure consistent behavior.
func commonFilerReadWriteTests(t *testing.T, ctx context.Context, f filer.Filer) {
var err error
// Write should fail because the intermediate directory doesn't exist.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist)) assert.True(t, errors.Is(err, fs.ErrNotExist))
// Read should fail because the root path doesn't yet exist. // Read should fail because the intermediate directory doesn't yet exist.
_, err = f.Read(ctx, "/foo/bar") _, err = f.Read(ctx, "/foo/bar")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist)) assert.True(t, errors.Is(err, fs.ErrNotExist))
@ -96,12 +165,12 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
// Delete should fail if the file doesn't exist. // Delete should fail if the file doesn't exist.
err = f.Delete(ctx, "/doesnt_exist") err = f.Delete(ctx, "/doesnt_exist")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.ErrorAs(t, err, &filer.FileDoesNotExistError{})
assert.True(t, errors.Is(err, fs.ErrNotExist)) assert.True(t, errors.Is(err, fs.ErrNotExist))
// Stat should fail if the file doesn't exist. // Stat should fail if the file doesn't exist.
_, err = f.Stat(ctx, "/doesnt_exist") _, err = f.Stat(ctx, "/doesnt_exist")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.ErrorAs(t, err, &filer.FileDoesNotExistError{})
assert.True(t, errors.Is(err, fs.ErrNotExist)) assert.True(t, errors.Is(err, fs.ErrNotExist))
// Delete should succeed for file that does exist. // Delete should succeed for file that does exist.
@ -110,7 +179,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
// Delete should fail for a non-empty directory. // Delete should fail for a non-empty directory.
err = f.Delete(ctx, "/foo") err = f.Delete(ctx, "/foo")
assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{})) assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{})
assert.True(t, errors.Is(err, fs.ErrInvalid)) assert.True(t, errors.Is(err, fs.ErrInvalid))
// Delete should succeed for a non-empty directory if the DeleteRecursively flag is set. // Delete should succeed for a non-empty directory if the DeleteRecursively flag is set.
@ -124,7 +193,33 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.True(t, errors.Is(err, fs.ErrInvalid)) assert.True(t, errors.Is(err, fs.ErrInvalid))
} }
func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { func TestAccFilerReadWrite(t *testing.T) {
t.Parallel()
for _, testCase := range []struct {
name string
f func(t *testing.T) (filer.Filer, string)
}{
{"local", setupLocalFiler},
{"workspace files", setupWsfsFiler},
{"dbfs", setupDbfsFiler},
{"files", setupUcVolumesFiler},
} {
tc := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
f, _ := tc.f(t)
ctx := context.Background()
// Common tests we run across all filers to ensure consistent behavior.
commonFilerReadWriteTests(t, ctx, f)
})
}
}
// Common tests we run across all filers to ensure consistent behavior.
func commonFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
var err error var err error
var info fs.FileInfo var info fs.FileInfo
@ -206,54 +301,28 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.False(t, entries[0].IsDir()) assert.False(t, entries[0].IsDir())
} }
func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { func TestAccFilerReadDir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
for _, testCase := range []struct {
name string
f func(t *testing.T) (filer.Filer, string)
}{
{"local", setupLocalFiler},
{"workspace files", setupWsfsFiler},
{"dbfs", setupDbfsFiler},
{"files", setupUcVolumesFiler},
} {
tc := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
f, _ := tc.f(t)
ctx := context.Background() ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := TemporaryWorkspaceDir(t, w)
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
require.NoError(t, err)
// Check if we can use this API here, skip test if we cannot. commonFilerReadDirTest(t, ctx, f)
_, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") })
var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest {
t.Skip(aerr.Message)
} }
return ctx, f
}
func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) {
ctx, f := setupWorkspaceFilesTest(t)
runFilerReadWriteTest(t, ctx, f)
}
func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
ctx, f := setupWorkspaceFilesTest(t)
runFilerReadDirTest(t, ctx, f)
}
func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := TemporaryDbfsDir(t, w)
f, err := filer.NewDbfsClient(w, tmpdir)
require.NoError(t, err)
return ctx, f
}
func TestAccFilerDbfsReadWrite(t *testing.T) {
ctx, f := setupFilerDbfsTest(t)
runFilerReadWriteTest(t, ctx, f)
}
func TestAccFilerDbfsReadDir(t *testing.T) {
ctx, f := setupFilerDbfsTest(t)
runFilerReadDirTest(t, ctx, f)
} }
var jupyterNotebookContent1 = ` var jupyterNotebookContent1 = `
@ -305,7 +374,8 @@ var jupyterNotebookContent2 = `
` `
func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { func TestAccFilerWorkspaceNotebookConflict(t *testing.T) {
ctx, f := setupWorkspaceFilesTest(t) f, _ := setupWsfsFiler(t)
ctx := context.Background()
var err error var err error
// Upload the notebooks // Upload the notebooks
@ -350,7 +420,8 @@ func TestAccFilerWorkspaceNotebookConflict(t *testing.T) {
} }
func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) {
ctx, f := setupWorkspaceFilesTest(t) f, _ := setupWsfsFiler(t)
ctx := context.Background()
var err error var err error
// Upload notebooks // Upload notebooks
@ -391,140 +462,3 @@ func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) {
filerTest{t, f}.assertContents(ctx, "scalaNb", "// Databricks notebook source\n println(\"second upload\"))") filerTest{t, f}.assertContents(ctx, "scalaNb", "// Databricks notebook source\n println(\"second upload\"))")
filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 2\")") filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 2\")")
} }
func setupFilerLocalTest(t *testing.T) (context.Context, filer.Filer) {
ctx := context.Background()
f, err := filer.NewLocalClient(t.TempDir())
require.NoError(t, err)
return ctx, f
}
func TestAccFilerLocalReadWrite(t *testing.T) {
ctx, f := setupFilerLocalTest(t)
runFilerReadWriteTest(t, ctx, f)
}
func TestAccFilerLocalReadDir(t *testing.T) {
ctx, f := setupFilerLocalTest(t)
runFilerReadDirTest(t, ctx, f)
}
func temporaryVolumeDir(t *testing.T, w *databricks.WorkspaceClient) string {
// Assume this test is run against the internal testing workspace.
path := RandomName("/Volumes/bogdanghita/default/v3_shared/cli-testing/integration-test-filer-")
// The Files API doesn't include support for creating and removing directories yet.
// Directories are created implicitly by writing a file to a path that doesn't exist.
// We therefore assume we can use the specified path without creating it first.
t.Logf("using dbfs:%s", path)
return path
}
func setupFilerFilesApiTest(t *testing.T) (context.Context, filer.Filer) {
t.SkipNow() // until available on prod
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryVolumeDir(t, w)
f, err := filer.NewFilesClient(w, tmpdir)
require.NoError(t, err)
return ctx, f
}
func TestAccFilerFilesApiReadWrite(t *testing.T) {
ctx, f := setupFilerFilesApiTest(t)
// The Files API doesn't know about directories yet.
// Below is a copy of [runFilerReadWriteTest] with
// assertions that don't work commented out.
var err error
// Write should fail because the root path doesn't yet exist.
// err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`))
// assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}))
// assert.True(t, errors.Is(err, fs.ErrNotExist))
// Read should fail because the root path doesn't yet exist.
_, err = f.Read(ctx, "/foo/bar")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))
// Read should fail because the path points to a directory
// err = f.Mkdir(ctx, "/dir")
// require.NoError(t, err)
// _, err = f.Read(ctx, "/dir")
// assert.ErrorIs(t, err, fs.ErrInvalid)
// Write with CreateParentDirectories flag should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
assert.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`)
// Write should fail because there is an existing file at the specified path.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`))
assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{}))
assert.True(t, errors.Is(err, fs.ErrExist))
// Write with OverwriteIfExists should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists)
assert.NoError(t, err)
filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`)
// Write should succeed if there is no existing file at the specified path.
err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`))
assert.NoError(t, err)
// Stat on a directory should succeed.
// Note: size and modification time behave differently between backends.
info, err := f.Stat(ctx, "/foo")
require.NoError(t, err)
assert.Equal(t, "foo", info.Name())
assert.True(t, info.Mode().IsDir())
assert.Equal(t, true, info.IsDir())
// Stat on a file should succeed.
// Note: size and modification time behave differently between backends.
info, err = f.Stat(ctx, "/foo/bar")
require.NoError(t, err)
assert.Equal(t, "bar", info.Name())
assert.True(t, info.Mode().IsRegular())
assert.Equal(t, false, info.IsDir())
// Delete should fail if the file doesn't exist.
err = f.Delete(ctx, "/doesnt_exist")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))
// Stat should fail if the file doesn't exist.
_, err = f.Stat(ctx, "/doesnt_exist")
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))
// Delete should succeed for file that does exist.
err = f.Delete(ctx, "/foo/bar")
assert.NoError(t, err)
// Delete should fail for a non-empty directory.
err = f.Delete(ctx, "/foo")
assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
// Delete should succeed for a non-empty directory if the DeleteRecursively flag is set.
// err = f.Delete(ctx, "/foo", filer.DeleteRecursively)
// assert.NoError(t, err)
// Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail.
// It is not in the filer's purview to delete its root directory.
err = f.Delete(ctx, "/")
assert.True(t, errors.As(err, &filer.CannotDeleteRootError{}))
assert.True(t, errors.Is(err, fs.ErrInvalid))
}
func TestAccFilerFilesApiReadDir(t *testing.T) {
t.Skipf("no support for ReadDir yet")
ctx, f := setupFilerFilesApiTest(t)
runFilerReadDirTest(t, ctx, f)
}

View File

@ -13,31 +13,60 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAccFsCatForDbfs(t *testing.T) { func TestAccFsCat(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, tmpDir := tc.setupFiler(t)
err := f.Write(context.Background(), "hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", path.Join(tmpDir, "hello.txt"))
f, err := filer.NewDbfsClient(w, tmpDir)
require.NoError(t, err)
err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
require.NoError(t, err)
stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "a", "hello.txt"))
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "abc", stdout.String()) assert.Equal(t, "abcd", stdout.String())
})
}
} }
func TestAccFsCatForDbfsOnNonExistentFile(t *testing.T) { func TestAccFsCatOnADir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
_, _, err := RequireErrorRun(t, "fs", "cat", "dbfs:/non-existent-file") for _, testCase := range fsTests {
tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, tmpDir := tc.setupFiler(t)
err := f.Mkdir(context.Background(), "dir1")
require.NoError(t, err)
_, _, err = RequireErrorRun(t, "fs", "cat", path.Join(tmpDir, "dir1"))
assert.ErrorAs(t, err, &filer.NotAFile{})
})
}
}
func TestAccFsCatOnNonExistentFile(t *testing.T) {
t.Parallel()
for _, testCase := range fsTests {
tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
_, tmpDir := tc.setupFiler(t)
_, _, err := RequireErrorRun(t, "fs", "cat", path.Join(tmpDir, "non-existent-file"))
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
})
}
} }
func TestAccFsCatForDbfsInvalidScheme(t *testing.T) { func TestAccFsCatForDbfsInvalidScheme(t *testing.T) {
@ -65,6 +94,3 @@ func TestAccFsCatDoesNotSupportOutputModeJson(t *testing.T) {
_, _, err = RequireErrorRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "hello.txt"), "--output=json") _, _, err = RequireErrorRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "hello.txt"), "--output=json")
assert.ErrorContains(t, err, "json output not supported") assert.ErrorContains(t, err, "json output not supported")
} }
// TODO: Add test asserting an error when cat is called on an directory. Need this to be
// fixed in the SDK first (https://github.com/databricks/databricks-sdk-go/issues/414)

View File

@ -2,16 +2,15 @@ package internal
import ( import (
"context" "context"
"fmt"
"io" "io"
"path" "path"
"path/filepath" "path/filepath"
"regexp"
"runtime" "runtime"
"strings" "strings"
"testing" "testing"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -60,84 +59,124 @@ func assertTargetDir(t *testing.T, ctx context.Context, f filer.Filer) {
assertFileContent(t, ctx, f, "a/b/c/hello.txt", "hello, world\n") assertFileContent(t, ctx, f, "a/b/c/hello.txt", "hello, world\n")
} }
func setupLocalFiler(t *testing.T) (filer.Filer, string) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
tmp := t.TempDir()
f, err := filer.NewLocalClient(tmp)
require.NoError(t, err)
return f, path.Join(filepath.ToSlash(tmp))
}
func setupDbfsFiler(t *testing.T) (filer.Filer, string) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w)
f, err := filer.NewDbfsClient(w, tmpDir)
require.NoError(t, err)
return f, path.Join("dbfs:/", tmpDir)
}
type cpTest struct { type cpTest struct {
name string
setupSource func(*testing.T) (filer.Filer, string) setupSource func(*testing.T) (filer.Filer, string)
setupTarget func(*testing.T) (filer.Filer, string) setupTarget func(*testing.T) (filer.Filer, string)
} }
func setupTable() []cpTest { func copyTests() []cpTest {
return []cpTest{ return []cpTest{
{setupSource: setupLocalFiler, setupTarget: setupLocalFiler}, // source: local file system
{setupSource: setupLocalFiler, setupTarget: setupDbfsFiler}, {
{setupSource: setupDbfsFiler, setupTarget: setupLocalFiler}, name: "local to local",
{setupSource: setupDbfsFiler, setupTarget: setupDbfsFiler}, setupSource: setupLocalFiler,
setupTarget: setupLocalFiler,
},
{
name: "local to dbfs",
setupSource: setupLocalFiler,
setupTarget: setupDbfsFiler,
},
{
name: "local to uc-volumes",
setupSource: setupLocalFiler,
setupTarget: setupUcVolumesFiler,
},
// source: dbfs
{
name: "dbfs to local",
setupSource: setupDbfsFiler,
setupTarget: setupLocalFiler,
},
{
name: "dbfs to dbfs",
setupSource: setupDbfsFiler,
setupTarget: setupDbfsFiler,
},
{
name: "dbfs to uc-volumes",
setupSource: setupDbfsFiler,
setupTarget: setupUcVolumesFiler,
},
// source: uc-volumes
{
name: "uc-volumes to local",
setupSource: setupUcVolumesFiler,
setupTarget: setupLocalFiler,
},
{
name: "uc-volumes to dbfs",
setupSource: setupUcVolumesFiler,
setupTarget: setupDbfsFiler,
},
{
name: "uc-volumes to uc-volumes",
setupSource: setupUcVolumesFiler,
setupTarget: setupUcVolumesFiler,
},
} }
} }
func TestAccFsCpDir(t *testing.T) { func TestAccFsCpDir(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler)
RequireSuccessfulRun(t, "fs", "cp", "-r", sourceDir, targetDir) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
assertTargetDir(t, ctx, targetFiler) sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive")
assertTargetDir(t, context.Background(), targetFiler)
})
} }
} }
func TestAccFsCpFileToFile(t *testing.T) { func TestAccFsCpFileToFile(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceFile(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceFile(t, context.Background(), sourceFiler)
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt")) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt"))
assertTargetFile(t, ctx, targetFiler, "bar.txt") assertTargetFile(t, context.Background(), targetFiler, "bar.txt")
})
} }
} }
func TestAccFsCpFileToDir(t *testing.T) { func TestAccFsCpFileToDir(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceFile(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceFile(t, context.Background(), sourceFiler)
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir)
assertTargetFile(t, ctx, targetFiler, "foo.txt") assertTargetFile(t, context.Background(), targetFiler, "foo.txt")
})
} }
} }
@ -158,125 +197,161 @@ func TestAccFsCpFileToDirForWindowsPaths(t *testing.T) {
} }
func TestAccFsCpDirToDirFileNotOverwritten(t *testing.T) { func TestAccFsCpDirToDirFileNotOverwritten(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive")
assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten") assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "this should not be overwritten")
assertFileContent(t, ctx, targetFiler, "query.sql", "SELECT 1") assertFileContent(t, context.Background(), targetFiler, "query.sql", "SELECT 1")
assertFileContent(t, ctx, targetFiler, "pyNb.py", "# Databricks notebook source\nprint(123)") assertFileContent(t, context.Background(), targetFiler, "pyNb.py", "# Databricks notebook source\nprint(123)")
})
} }
} }
func TestAccFsCpFileToDirFileNotOverwritten(t *testing.T) { func TestAccFsCpFileToDirFileNotOverwritten(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c")) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"))
assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten") assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "this should not be overwritten")
})
} }
} }
func TestAccFsCpFileToFileFileNotOverwritten(t *testing.T) { func TestAccFsCpFileToFileFileNotOverwritten(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "a/b/c/dontoverwrite.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--recursive") RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/dontoverwrite.txt"))
assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "this should not be overwritten") assertFileContent(t, context.Background(), targetFiler, "a/b/c/dontoverwrite.txt", "this should not be overwritten")
})
} }
} }
func TestAccFsCpDirToDirWithOverwriteFlag(t *testing.T) { func TestAccFsCpDirToDirWithOverwriteFlag(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten"), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite") RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite")
assertTargetDir(t, ctx, targetFiler) assertTargetDir(t, context.Background(), targetFiler)
})
} }
} }
func TestAccFsCpFileToFileWithOverwriteFlag(t *testing.T) { func TestAccFsCpFileToFileWithOverwriteFlag(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this will be overwritten. Such is life."), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "a/b/c/overwritten.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--overwrite") RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/overwritten.txt"), "--overwrite")
assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "hello, world\n") assertFileContent(t, context.Background(), targetFiler, "a/b/c/overwritten.txt", "hello, world\n")
})
} }
} }
func TestAccFsCpFileToDirWithOverwriteFlag(t *testing.T) { func TestAccFsCpFileToDirWithOverwriteFlag(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten :') "), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--recursive", "--overwrite") RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--overwrite")
assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "hello, world\n") assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "hello, world\n")
})
} }
} }
func TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag(t *testing.T) { func TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
w, err := databricks.NewWorkspaceClient() for _, testCase := range fsTests {
require.NoError(t, err) tc := testCase
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
_, _, err = RequireErrorRun(t, "fs", "cp", "dbfs:"+tmpDir, "dbfs:/tmp") _, tmpDir := tc.setupFiler(t)
assert.Equal(t, fmt.Sprintf("source path %s is a directory. Please specify the --recursive flag", tmpDir), err.Error())
_, _, err := RequireErrorRun(t, "fs", "cp", path.Join(tmpDir), path.Join(tmpDir, "foobar"))
r := regexp.MustCompile("source path .* is a directory. Please specify the --recursive flag")
assert.Regexp(t, r, err.Error())
})
}
} }
func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) { func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) {
@ -287,20 +362,24 @@ func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) {
} }
func TestAccFsCpSourceIsDirectoryButTargetIsFile(t *testing.T) { func TestAccFsCpSourceIsDirectoryButTargetIsFile(t *testing.T) {
ctx := context.Background() t.Parallel()
table := setupTable()
for _, row := range table { for _, testCase := range copyTests() {
sourceFiler, sourceDir := row.setupSource(t) tc := testCase
targetFiler, targetDir := row.setupTarget(t)
setupSourceDir(t, ctx, sourceFiler) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
sourceFiler, sourceDir := tc.setupSource(t)
targetFiler, targetDir := tc.setupTarget(t)
setupSourceDir(t, context.Background(), sourceFiler)
// Write a conflicting file to target // Write a conflicting file to target
err := targetFiler.Write(ctx, "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories) err := targetFiler.Write(context.Background(), "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
_, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive", "--overwrite") _, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive")
assert.Error(t, err) assert.Error(t, err)
})
} }
} }

View File

@ -11,131 +11,163 @@ import (
_ "github.com/databricks/cli/cmd/fs" _ "github.com/databricks/cli/cmd/fs"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAccFsLsForDbfs(t *testing.T) { type fsTest struct {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) name string
setupFiler func(t *testing.T) (filer.Filer, string)
}
ctx := context.Background() var fsTests = []fsTest{
w, err := databricks.NewWorkspaceClient() {
require.NoError(t, err) name: "dbfs",
setupFiler: setupDbfsFiler,
},
{
name: "uc-volumes",
setupFiler: setupUcVolumesFiler,
},
}
tmpDir := TemporaryDbfsDir(t, w) func setupLsFiles(t *testing.T, f filer.Filer) {
err := f.Write(context.Background(), "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
require.NoError(t, err)
err = f.Write(context.Background(), "bye.txt", strings.NewReader("def"))
require.NoError(t, err)
}
f, err := filer.NewDbfsClient(w, tmpDir) func TestAccFsLs(t *testing.T) {
require.NoError(t, err) t.Parallel()
err = f.Mkdir(ctx, "a") for _, testCase := range fsTests {
require.NoError(t, err) tc := testCase
err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
require.NoError(t, err)
err = f.Write(ctx, "bye.txt", strings.NewReader("def"))
require.NoError(t, err)
stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, tmpDir := tc.setupFiler(t)
setupLsFiles(t, f)
stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json")
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
var parsedStdout []map[string]any var parsedStdout []map[string]any
err = json.Unmarshal(stdout.Bytes(), &parsedStdout) err := json.Unmarshal(stdout.Bytes(), &parsedStdout)
require.NoError(t, err) require.NoError(t, err)
// assert on ls output // assert on ls output
assert.Len(t, parsedStdout, 2) assert.Len(t, parsedStdout, 2)
assert.Equal(t, "a", parsedStdout[0]["name"]) assert.Equal(t, "a", parsedStdout[0]["name"])
assert.Equal(t, true, parsedStdout[0]["is_directory"]) assert.Equal(t, true, parsedStdout[0]["is_directory"])
assert.Equal(t, float64(0), parsedStdout[0]["size"]) assert.Equal(t, float64(0), parsedStdout[0]["size"])
assert.Equal(t, "bye.txt", parsedStdout[1]["name"]) assert.Equal(t, "bye.txt", parsedStdout[1]["name"])
assert.Equal(t, false, parsedStdout[1]["is_directory"]) assert.Equal(t, false, parsedStdout[1]["is_directory"])
assert.Equal(t, float64(3), parsedStdout[1]["size"]) assert.Equal(t, float64(3), parsedStdout[1]["size"])
})
}
} }
func TestAccFsLsForDbfsWithAbsolutePaths(t *testing.T) { func TestAccFsLsWithAbsolutePaths(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, err := filer.NewDbfsClient(w, tmpDir) f, tmpDir := tc.setupFiler(t)
require.NoError(t, err) setupLsFiles(t, f)
err = f.Mkdir(ctx, "a") stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json", "--absolute")
require.NoError(t, err)
err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
require.NoError(t, err)
err = f.Write(ctx, "bye.txt", strings.NewReader("def"))
require.NoError(t, err)
stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json", "--absolute")
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
var parsedStdout []map[string]any var parsedStdout []map[string]any
err = json.Unmarshal(stdout.Bytes(), &parsedStdout) err := json.Unmarshal(stdout.Bytes(), &parsedStdout)
require.NoError(t, err) require.NoError(t, err)
// assert on ls output // assert on ls output
assert.Len(t, parsedStdout, 2) assert.Len(t, parsedStdout, 2)
assert.Equal(t, path.Join("dbfs:", tmpDir, "a"), parsedStdout[0]["name"])
assert.Equal(t, path.Join(tmpDir, "a"), parsedStdout[0]["name"])
assert.Equal(t, true, parsedStdout[0]["is_directory"]) assert.Equal(t, true, parsedStdout[0]["is_directory"])
assert.Equal(t, float64(0), parsedStdout[0]["size"]) assert.Equal(t, float64(0), parsedStdout[0]["size"])
assert.Equal(t, path.Join("dbfs:", tmpDir, "bye.txt"), parsedStdout[1]["name"]) assert.Equal(t, path.Join(tmpDir, "bye.txt"), parsedStdout[1]["name"])
assert.Equal(t, false, parsedStdout[1]["is_directory"]) assert.Equal(t, false, parsedStdout[1]["is_directory"])
assert.Equal(t, float64(3), parsedStdout[1]["size"]) assert.Equal(t, float64(3), parsedStdout[1]["size"])
})
}
} }
func TestAccFsLsForDbfsOnFile(t *testing.T) { func TestAccFsLsOnFile(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, tmpDir := tc.setupFiler(t)
setupLsFiles(t, f)
f, err := filer.NewDbfsClient(w, tmpDir) _, _, err := RequireErrorRun(t, "fs", "ls", path.Join(tmpDir, "a", "hello.txt"), "--output=json")
require.NoError(t, err)
err = f.Mkdir(ctx, "a")
require.NoError(t, err)
err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
require.NoError(t, err)
_, _, err = RequireErrorRun(t, "fs", "ls", "dbfs:"+path.Join(tmpDir, "a", "hello.txt"), "--output=json")
assert.Regexp(t, regexp.MustCompile("not a directory: .*/a/hello.txt"), err.Error()) assert.Regexp(t, regexp.MustCompile("not a directory: .*/a/hello.txt"), err.Error())
assert.ErrorAs(t, err, &filer.NotADirectory{})
})
}
} }
func TestAccFsLsForDbfsOnEmptyDir(t *testing.T) { func TestAccFsLsOnEmptyDir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
w, err := databricks.NewWorkspaceClient() for _, testCase := range fsTests {
require.NoError(t, err) tc := testCase
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") _, tmpDir := tc.setupFiler(t)
stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json")
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
var parsedStdout []map[string]any var parsedStdout []map[string]any
err = json.Unmarshal(stdout.Bytes(), &parsedStdout) err := json.Unmarshal(stdout.Bytes(), &parsedStdout)
require.NoError(t, err) require.NoError(t, err)
// assert on ls output // assert on ls output
assert.Equal(t, 0, len(parsedStdout)) assert.Equal(t, 0, len(parsedStdout))
})
}
} }
func TestAccFsLsForDbfsForNonexistingDir(t *testing.T) { func TestAccFsLsForNonexistingDir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
_, _, err := RequireErrorRun(t, "fs", "ls", "dbfs:/john-cena", "--output=json") for _, testCase := range fsTests {
tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
_, tmpDir := tc.setupFiler(t)
_, _, err := RequireErrorRun(t, "fs", "ls", path.Join(tmpDir, "nonexistent"), "--output=json")
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
assert.Regexp(t, regexp.MustCompile("no such directory: .*/nonexistent"), err.Error())
})
}
} }
func TestAccFsLsWithoutScheme(t *testing.T) { func TestAccFsLsWithoutScheme(t *testing.T) {
t.Parallel()
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
_, _, err := RequireErrorRun(t, "fs", "ls", "/ray-mysterio", "--output=json") _, _, err := RequireErrorRun(t, "fs", "ls", "/path-without-a-dbfs-scheme", "--output=json")
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
} }

View File

@ -8,110 +8,127 @@ import (
"testing" "testing"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAccFsMkdirCreatesDirectory(t *testing.T) { func TestAccFsMkdir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, err := filer.NewDbfsClient(w, tmpDir) f, tmpDir := tc.setupFiler(t)
require.NoError(t, err)
// create directory "a" // create directory "a"
stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a")) stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a"))
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "", stdout.String()) assert.Equal(t, "", stdout.String())
// assert directory "a" is created // assert directory "a" is created
info, err := f.Stat(ctx, "a") info, err := f.Stat(context.Background(), "a")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "a", info.Name()) assert.Equal(t, "a", info.Name())
assert.Equal(t, true, info.IsDir()) assert.Equal(t, true, info.IsDir())
})
}
} }
func TestAccFsMkdirCreatesMultipleDirectories(t *testing.T) { func TestAccFsMkdirCreatesIntermediateDirectories(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, err := filer.NewDbfsClient(w, tmpDir) f, tmpDir := tc.setupFiler(t)
require.NoError(t, err)
// create directory /a/b/c // create directory "a/b/c"
stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a", "b", "c")) stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a", "b", "c"))
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "", stdout.String()) assert.Equal(t, "", stdout.String())
// assert directory "a" is created // assert directory "a" is created
infoA, err := f.Stat(ctx, "a") infoA, err := f.Stat(context.Background(), "a")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "a", infoA.Name()) assert.Equal(t, "a", infoA.Name())
assert.Equal(t, true, infoA.IsDir()) assert.Equal(t, true, infoA.IsDir())
// assert directory "b" is created // assert directory "b" is created
infoB, err := f.Stat(ctx, "a/b") infoB, err := f.Stat(context.Background(), "a/b")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "b", infoB.Name()) assert.Equal(t, "b", infoB.Name())
assert.Equal(t, true, infoB.IsDir()) assert.Equal(t, true, infoB.IsDir())
// assert directory "c" is created // assert directory "c" is created
infoC, err := f.Stat(ctx, "a/b/c") infoC, err := f.Stat(context.Background(), "a/b/c")
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, "c", infoC.Name()) assert.Equal(t, "c", infoC.Name())
assert.Equal(t, true, infoC.IsDir()) assert.Equal(t, true, infoC.IsDir())
})
}
} }
func TestAccFsMkdirWhenDirectoryAlreadyExists(t *testing.T) { func TestAccFsMkdirWhenDirectoryAlreadyExists(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, tmpDir := tc.setupFiler(t)
// create directory "a" // create directory "a"
f, err := filer.NewDbfsClient(w, tmpDir) err := f.Mkdir(context.Background(), "a")
require.NoError(t, err)
err = f.Mkdir(ctx, "a")
require.NoError(t, err) require.NoError(t, err)
// assert run is successful without any errors // assert run is successful without any errors
stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a")) stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a"))
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "", stdout.String()) assert.Equal(t, "", stdout.String())
})
}
} }
func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) { func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() t.Run("dbfs", func(t *testing.T) {
w, err := databricks.NewWorkspaceClient() t.Parallel()
f, tmpDir := setupDbfsFiler(t)
// create file "hello"
err := f.Write(context.Background(), "hello", strings.NewReader("abc"))
require.NoError(t, err) require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) // assert mkdir fails
_, _, err = RequireErrorRun(t, "fs", "mkdir", path.Join(tmpDir, "hello"))
// create file hello
f, err := filer.NewDbfsClient(w, tmpDir)
require.NoError(t, err)
err = f.Write(ctx, "hello", strings.NewReader("abc"))
require.NoError(t, err)
// assert run fails
_, _, err = RequireErrorRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "hello"))
// Different cloud providers return different errors. // Different cloud providers return different errors.
regex := regexp.MustCompile(`(^|: )Path is a file: .*$|(^|: )Cannot create directory .* because .* is an existing file\.$|(^|: )mkdirs\(hadoopPath: .*, permission: rwxrwxrwx\): failed$`) regex := regexp.MustCompile(`(^|: )Path is a file: .*$|(^|: )Cannot create directory .* because .* is an existing file\.$|(^|: )mkdirs\(hadoopPath: .*, permission: rwxrwxrwx\): failed$`)
assert.Regexp(t, regex, err.Error()) assert.Regexp(t, regex, err.Error())
})
t.Run("uc-volumes", func(t *testing.T) {
t.Parallel()
f, tmpDir := setupUcVolumesFiler(t)
// create file "hello"
err := f.Write(context.Background(), "hello", strings.NewReader("abc"))
require.NoError(t, err)
// assert mkdir fails
_, _, err = RequireErrorRun(t, "fs", "mkdir", path.Join(tmpDir, "hello"))
assert.ErrorAs(t, err, &filer.FileAlreadyExistsError{})
})
} }

View File

@ -8,139 +8,150 @@ import (
"testing" "testing"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAccFsRmForFile(t *testing.T) { func TestAccFsRmFile(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Create a file
f, tmpDir := tc.setupFiler(t)
err := f.Write(context.Background(), "hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories)
require.NoError(t, err) require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) // Check file was created
_, err = f.Stat(context.Background(), "hello.txt")
f, err := filer.NewDbfsClient(w, tmpDir) assert.NoError(t, err)
require.NoError(t, err)
// create file to delete
err = f.Write(ctx, "hello.txt", strings.NewReader("abc"))
require.NoError(t, err)
// check file was created
info, err := f.Stat(ctx, "hello.txt")
require.NoError(t, err)
require.Equal(t, "hello.txt", info.Name())
require.Equal(t, info.IsDir(), false)
// Run rm command // Run rm command
stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "hello.txt")) stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "hello.txt"))
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "", stdout.String()) assert.Equal(t, "", stdout.String())
// assert file was deleted // Assert file was deleted
_, err = f.Stat(ctx, "hello.txt") _, err = f.Stat(context.Background(), "hello.txt")
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
})
}
} }
func TestAccFsRmForEmptyDirectory(t *testing.T) { func TestAccFsRmEmptyDir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Create a directory
f, tmpDir := tc.setupFiler(t)
err := f.Mkdir(context.Background(), "a")
require.NoError(t, err) require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) // Check directory was created
_, err = f.Stat(context.Background(), "a")
f, err := filer.NewDbfsClient(w, tmpDir) assert.NoError(t, err)
require.NoError(t, err)
// create directory to delete
err = f.Mkdir(ctx, "avacado")
require.NoError(t, err)
// check directory was created
info, err := f.Stat(ctx, "avacado")
require.NoError(t, err)
require.Equal(t, "avacado", info.Name())
require.Equal(t, info.IsDir(), true)
// Run rm command // Run rm command
stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado")) stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "a"))
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "", stdout.String()) assert.Equal(t, "", stdout.String())
// assert directory was deleted // Assert directory was deleted
_, err = f.Stat(ctx, "avacado") _, err = f.Stat(context.Background(), "a")
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
})
}
} }
func TestAccFsRmForNonEmptyDirectory(t *testing.T) { func TestAccFsRmNonEmptyDirectory(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
// Create a directory
f, tmpDir := tc.setupFiler(t)
err := f.Mkdir(context.Background(), "a")
require.NoError(t, err) require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) // Create a file in the directory
err = f.Write(context.Background(), "a/hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories)
f, err := filer.NewDbfsClient(w, tmpDir)
require.NoError(t, err) require.NoError(t, err)
// create file in dir // Check file was created
err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) _, err = f.Stat(context.Background(), "a/hello.txt")
require.NoError(t, err) assert.NoError(t, err)
// check file was created
info, err := f.Stat(ctx, "avacado/guacamole")
require.NoError(t, err)
require.Equal(t, "guacamole", info.Name())
require.Equal(t, info.IsDir(), false)
// Run rm command // Run rm command
_, _, err = RequireErrorRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado")) _, _, err = RequireErrorRun(t, "fs", "rm", path.Join(tmpDir, "a"))
assert.ErrorIs(t, err, fs.ErrInvalid) assert.ErrorIs(t, err, fs.ErrInvalid)
assert.ErrorContains(t, err, "directory not empty") assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{})
})
}
} }
func TestAccFsRmForNonExistentFile(t *testing.T) { func TestAccFsRmForNonExistentFile(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
for _, testCase := range fsTests {
tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
_, tmpDir := tc.setupFiler(t)
// Expect error if file does not exist // Expect error if file does not exist
_, _, err := RequireErrorRun(t, "fs", "rm", "dbfs:/does-not-exist") _, _, err := RequireErrorRun(t, "fs", "rm", path.Join(tmpDir, "does-not-exist"))
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
})
}
} }
func TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag(t *testing.T) { func TestAccFsRmDirRecursively(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) t.Parallel()
ctx := context.Background() for _, testCase := range fsTests {
w, err := databricks.NewWorkspaceClient() tc := testCase
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
f, tmpDir := tc.setupFiler(t)
// Create a directory
err := f.Mkdir(context.Background(), "a")
require.NoError(t, err) require.NoError(t, err)
tmpDir := TemporaryDbfsDir(t, w) // Create a file in the directory
err = f.Write(context.Background(), "a/hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories)
f, err := filer.NewDbfsClient(w, tmpDir)
require.NoError(t, err) require.NoError(t, err)
// create file in dir // Check file was created
err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) _, err = f.Stat(context.Background(), "a/hello.txt")
require.NoError(t, err) assert.NoError(t, err)
// check file was created
info, err := f.Stat(ctx, "avacado/guacamole")
require.NoError(t, err)
require.Equal(t, "guacamole", info.Name())
require.Equal(t, info.IsDir(), false)
// Run rm command // Run rm command
stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado"), "--recursive") stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "a"), "--recursive")
assert.Equal(t, "", stderr.String()) assert.Equal(t, "", stderr.String())
assert.Equal(t, "", stdout.String()) assert.Equal(t, "", stdout.String())
// assert directory was deleted // Assert directory was deleted
_, err = f.Stat(ctx, "avacado") _, err = f.Stat(context.Background(), "a")
assert.ErrorIs(t, err, fs.ErrNotExist) assert.ErrorIs(t, err, fs.ErrNotExist)
})
}
} }

View File

@ -5,10 +5,13 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
"net/http"
"os" "os"
"path"
"path/filepath" "path/filepath"
"reflect" "reflect"
"strings" "strings"
@ -19,8 +22,10 @@ import (
"github.com/databricks/cli/cmd" "github.com/databricks/cli/cmd"
_ "github.com/databricks/cli/cmd/version" _ "github.com/databricks/cli/cmd/version"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/files" "github.com/databricks/databricks-sdk-go/service/files"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
@ -452,6 +457,40 @@ func TemporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
return path return path
} }
// Create a new UC volume in a catalog called "main" in the workspace.
func temporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string {
ctx := context.Background()
// Create a schema
schema, err := w.Schemas.Create(ctx, catalog.CreateSchema{
CatalogName: "main",
Name: RandomName("test-schema-"),
})
require.NoError(t, err)
t.Cleanup(func() {
w.Schemas.Delete(ctx, catalog.DeleteSchemaRequest{
FullName: schema.FullName,
})
})
// Create a volume
volume, err := w.Volumes.Create(ctx, catalog.CreateVolumeRequestContent{
CatalogName: "main",
SchemaName: schema.Name,
Name: "my-volume",
VolumeType: catalog.VolumeTypeManaged,
})
require.NoError(t, err)
t.Cleanup(func() {
w.Volumes.Delete(ctx, catalog.DeleteVolumeRequest{
Name: volume.FullName,
})
})
return path.Join("/Volumes", "main", schema.Name, volume.Name)
}
func TemporaryRepo(t *testing.T, w *databricks.WorkspaceClient) string { func TemporaryRepo(t *testing.T, w *databricks.WorkspaceClient) string {
ctx := context.Background() ctx := context.Background()
me, err := w.CurrentUser.Me(ctx) me, err := w.CurrentUser.Me(ctx)
@ -489,3 +528,62 @@ func GetNodeTypeId(env string) string {
} }
return "Standard_DS4_v2" return "Standard_DS4_v2"
} }
// setupLocalFiler creates a filer backed by a temporary local directory.
// It returns the filer together with the slash-separated path to that
// directory, so the path composes with the URI-style paths used elsewhere.
func setupLocalFiler(t *testing.T) (filer.Filer, string) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	tmp := t.TempDir()
	f, err := filer.NewLocalClient(tmp)
	require.NoError(t, err)

	// path.Clean is what the previous single-argument path.Join reduced to;
	// spell it directly for clarity.
	return f, path.Clean(filepath.ToSlash(tmp))
}
// setupWsfsFiler creates a filer backed by the workspace file system (WSFS),
// rooted at a fresh temporary workspace directory. Returns the filer and
// the directory path.
func setupWsfsFiler(t *testing.T) (filer.Filer, string) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	ctx := context.Background()
	w := databricks.Must(databricks.NewWorkspaceClient())
	dir := TemporaryWorkspaceDir(t, w)
	f, err := filer.NewWorkspaceFilesClient(w, dir)
	require.NoError(t, err)

	// Probe the API with a read; some workspaces have it disabled, in which
	// case the backend answers with a 400 and we skip the test.
	_, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled")
	var apiErr *apierr.APIError
	if errors.As(err, &apiErr) {
		if apiErr.StatusCode == http.StatusBadRequest {
			t.Skip(apiErr.Message)
		}
	}

	return f, dir
}
// setupDbfsFiler creates a filer backed by DBFS, rooted at a fresh
// temporary DBFS directory. Returns the filer and the "dbfs:/"-prefixed
// path of that directory.
func setupDbfsFiler(t *testing.T) (filer.Filer, string) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	w, err := databricks.NewWorkspaceClient()
	require.NoError(t, err)

	dir := TemporaryDbfsDir(t, w)
	f, err := filer.NewDbfsClient(w, dir)
	require.NoError(t, err)
	return f, path.Join("dbfs:/", dir)
}
// setupUcVolumesFiler creates a Files API filer rooted at a fresh temporary
// UC volume. Returns the filer and the "dbfs:/"-prefixed path of the volume
// (UC Volumes are addressed through the dbfs scheme by the fs commands).
func setupUcVolumesFiler(t *testing.T) (filer.Filer, string) {
	t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

	// UC Volumes require a workspace with a metastore attached.
	if os.Getenv("TEST_METASTORE_ID") == "" {
		t.Skip("Skipping tests that require a UC Volume when metastore id is not set.")
	}

	w, err := databricks.NewWorkspaceClient()
	require.NoError(t, err)

	dir := temporaryUcVolume(t, w)
	f, err := filer.NewFilesClient(w, dir)
	require.NoError(t, err)
	return f, path.Join("dbfs:/", dir)
}

View File

@ -11,18 +11,30 @@ import (
"net/url" "net/url"
"path" "path"
"slices" "slices"
"sort"
"strings" "strings"
"time" "time"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/listing"
"github.com/databricks/databricks-sdk-go/service/files"
"golang.org/x/sync/errgroup"
) )
// As of 19th Feb 2024, the Files API backend has a rate limit of 10 concurrent
// requests and 100 QPS. We limit the number of concurrent requests to 5 to
// avoid hitting the rate limit.
const maxFilesRequestsInFlight = 5
// Type that implements fs.FileInfo for the Files API. // Type that implements fs.FileInfo for the Files API.
// This is required for the filer.Stat() method.
type filesApiFileInfo struct { type filesApiFileInfo struct {
absPath string absPath string
isDir bool isDir bool
fileSize int64
lastModified int64
} }
func (info filesApiFileInfo) Name() string { func (info filesApiFileInfo) Name() string {
@ -30,8 +42,7 @@ func (info filesApiFileInfo) Name() string {
} }
func (info filesApiFileInfo) Size() int64 { func (info filesApiFileInfo) Size() int64 {
// No way to get the file size in the Files API. return info.fileSize
return 0
} }
func (info filesApiFileInfo) Mode() fs.FileMode { func (info filesApiFileInfo) Mode() fs.FileMode {
@ -43,7 +54,7 @@ func (info filesApiFileInfo) Mode() fs.FileMode {
} }
func (info filesApiFileInfo) ModTime() time.Time { func (info filesApiFileInfo) ModTime() time.Time {
return time.Time{} return time.UnixMilli(info.lastModified)
} }
func (info filesApiFileInfo) IsDir() bool { func (info filesApiFileInfo) IsDir() bool {
@ -54,6 +65,28 @@ func (info filesApiFileInfo) Sys() any {
return nil return nil
} }
// filesApiDirEntry adapts filesApiFileInfo to the fs.DirEntry interface.
// This is required for the filer.ReadDir() method.
type filesApiDirEntry struct {
	i filesApiFileInfo
}

// Name returns the base name of the entry.
func (d filesApiDirEntry) Name() string { return d.i.Name() }

// IsDir reports whether the entry is a directory.
func (d filesApiDirEntry) IsDir() bool { return d.i.IsDir() }

// Type returns the entry's mode bits.
func (d filesApiDirEntry) Type() fs.FileMode { return d.i.Mode() }

// Info returns the underlying fs.FileInfo; it never fails.
func (d filesApiDirEntry) Info() (fs.FileInfo, error) { return d.i, nil }
// FilesClient implements the [Filer] interface for the Files API backend. // FilesClient implements the [Filer] interface for the Files API backend.
type FilesClient struct { type FilesClient struct {
workspaceClient *databricks.WorkspaceClient workspaceClient *databricks.WorkspaceClient
@ -63,10 +96,6 @@ type FilesClient struct {
root WorkspaceRootPath root WorkspaceRootPath
} }
func filesNotImplementedError(fn string) error {
return fmt.Errorf("filer.%s is not implemented for the Files API", fn)
}
func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
apiClient, err := client.New(w.Config) apiClient, err := client.New(w.Config)
if err != nil { if err != nil {
@ -102,6 +131,24 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader,
return err return err
} }
// Check that target path exists if CreateParentDirectories mode is not set
if !slices.Contains(mode, CreateParentDirectories) {
err := w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, path.Dir(absPath))
if err != nil {
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return err
}
// This API returns a 404 if the file doesn't exist.
if aerr.StatusCode == http.StatusNotFound {
return NoSuchDirectoryError{path.Dir(absPath)}
}
return err
}
}
overwrite := slices.Contains(mode, OverwriteIfExists) overwrite := slices.Contains(mode, OverwriteIfExists)
urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite) urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
headers := map[string]string{"Content-Type": "application/octet-stream"} headers := map[string]string{"Content-Type": "application/octet-stream"}
@ -119,7 +166,7 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader,
} }
// This API returns 409 if the file already exists, when the object type is file // This API returns 409 if the file already exists, when the object type is file
if aerr.StatusCode == http.StatusConflict { if aerr.StatusCode == http.StatusConflict && aerr.ErrorCode == "ALREADY_EXISTS" {
return FileAlreadyExistsError{absPath} return FileAlreadyExistsError{absPath}
} }
@ -148,14 +195,20 @@ func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, err
// This API returns a 404 if the specified path does not exist. // This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound { if aerr.StatusCode == http.StatusNotFound {
// Check if the path is a directory. If so, return not a file error.
if _, err := w.statDir(ctx, name); err == nil {
return nil, NotAFile{absPath}
}
// No file or directory exists at the specified path. Return no such file error.
return nil, FileDoesNotExistError{absPath} return nil, FileDoesNotExistError{absPath}
} }
return nil, err return nil, err
} }
func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { func (w *FilesClient) deleteFile(ctx context.Context, name string) error {
absPath, urlPath, err := w.urlPath(name) absPath, err := w.root.Join(name)
if err != nil { if err != nil {
return err return err
} }
@ -165,53 +218,232 @@ func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMod
return CannotDeleteRootError{} return CannotDeleteRootError{}
} }
err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil, nil) err = w.workspaceClient.Files.DeleteByFilePath(ctx, absPath)
// Return early on success. // Return early on success.
if err == nil { if err == nil {
return nil return nil
} }
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError var aerr *apierr.APIError
// Special handling of this error only if it is an API error.
if !errors.As(err, &aerr) { if !errors.As(err, &aerr) {
return err return err
} }
// This API returns a 404 if the specified path does not exist. // This files delete API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound { if aerr.StatusCode == http.StatusNotFound {
return FileDoesNotExistError{absPath} return FileDoesNotExistError{absPath}
} }
// This API returns 409 if the underlying path is a directory. return err
if aerr.StatusCode == http.StatusConflict { }
func (w *FilesClient) deleteDirectory(ctx context.Context, name string) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
}
// Illegal to delete the root path.
if absPath == w.root.rootPath {
return CannotDeleteRootError{}
}
err = w.workspaceClient.Files.DeleteDirectoryByDirectoryPath(ctx, absPath)
var aerr *apierr.APIError
// Special handling of this error only if it is an API error.
if !errors.As(err, &aerr) {
return err
}
// The directory delete API returns a 400 if the directory is not empty
if aerr.StatusCode == http.StatusBadRequest {
reasons := []string{}
for _, detail := range aerr.Details {
reasons = append(reasons, detail.Reason)
}
// Error code 400 is generic and can be returned for other reasons. Make
// sure one of the reasons for the error is that the directory is not empty.
if !slices.Contains(reasons, "FILES_API_DIRECTORY_IS_NOT_EMPTY") {
return err
}
return DirectoryNotEmptyError{absPath} return DirectoryNotEmptyError{absPath}
} }
return err return err
} }
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { func (w *FilesClient) recursiveDelete(ctx context.Context, name string) error {
return nil, filesNotImplementedError("ReadDir") filerFS := NewFS(ctx, w)
} dirsToDelete := make([]string, 0)
filesToDelete := make([]string, 0)
callback := func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
func (w *FilesClient) Mkdir(ctx context.Context, name string) error { // Files API does not allowing deleting non-empty directories. We instead
// Directories are created implicitly. // collect the directories to delete and delete them once all the files have
// No need to do anything. // been deleted.
if d.IsDir() {
dirsToDelete = append(dirsToDelete, path)
return nil
}
filesToDelete = append(filesToDelete, path)
return nil
}
// Walk the directory and accumulate the files and directories to delete.
err := fs.WalkDir(filerFS, name, callback)
if err != nil {
return err
}
// Delete the files in parallel.
group, groupCtx := errgroup.WithContext(ctx)
group.SetLimit(maxFilesRequestsInFlight)
for _, file := range filesToDelete {
file := file
// Skip the file if the context has already been cancelled.
select {
case <-groupCtx.Done():
continue
default:
// Proceed.
}
group.Go(func() error {
return w.deleteFile(groupCtx, file)
})
}
// Wait for the files to be deleted and return the first non-nil error.
err = group.Wait()
if err != nil {
return err
}
// Delete the directories in reverse order to ensure that the parent
// directories are deleted after the children. This is possible because
// fs.WalkDir walks the directories in lexicographical order.
for i := len(dirsToDelete) - 1; i >= 0; i-- {
err := w.deleteDirectory(ctx, dirsToDelete[i])
if err != nil {
return err
}
}
return nil return nil
} }
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
absPath, urlPath, err := w.urlPath(name) if slices.Contains(mode, DeleteRecursively) {
return w.recursiveDelete(ctx, name)
}
// Issue a stat call to determine if the path is a file or directory.
info, err := w.Stat(ctx, name)
if err != nil {
return err
}
// Issue the delete call for a directory
if info.IsDir() {
return w.deleteDirectory(ctx, name)
}
return w.deleteFile(ctx, name)
}
func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
absPath, err := w.root.Join(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }
err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil, nil) iter := w.workspaceClient.Files.ListDirectoryContents(ctx, files.ListDirectoryContentsRequest{
DirectoryPath: absPath,
})
files, err := listing.ToSlice(ctx, iter)
// Return early on success.
if err == nil {
entries := make([]fs.DirEntry, len(files))
for i, file := range files {
entries[i] = filesApiDirEntry{
i: filesApiFileInfo{
absPath: file.Path,
isDir: file.IsDirectory,
fileSize: file.FileSize,
lastModified: file.LastModified,
},
}
}
// Sort by name for parity with os.ReadDir.
sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
return entries, nil
}
// Special handling of this error only if it is an API error.
var apierr *apierr.APIError
if !errors.As(err, &apierr) {
return nil, err
}
// This API returns a 404 if the specified path does not exist.
if apierr.StatusCode == http.StatusNotFound {
// Check if the path is a file. If so, return not a directory error.
if _, err := w.statFile(ctx, name); err == nil {
return nil, NotADirectory{absPath}
}
// No file or directory exists at the specified path. Return no such directory error.
return nil, NoSuchDirectoryError{absPath}
}
return nil, err
}
// Mkdir creates a directory at the given path via the Files API.
func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	err = w.workspaceClient.Files.CreateDirectory(ctx, files.CreateDirectoryRequest{
		DirectoryPath: absPath,
	})
	if err == nil {
		return nil
	}

	// Map a 409 (an object already occupies this path) to a typed error
	// so callers can detect the collision.
	var apiErr *apierr.APIError
	if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict {
		return FileAlreadyExistsError{absPath}
	}
	return err
}
// Get file metadata for a file using the Files API.
func (w *FilesClient) statFile(ctx context.Context, name string) (fs.FileInfo, error) {
absPath, err := w.root.Join(name)
if err != nil {
return nil, err
}
fileInfo, err := w.workspaceClient.Files.GetMetadataByFilePath(ctx, absPath)
// If the HEAD requests succeeds, the file exists. // If the HEAD requests succeeds, the file exists.
if err == nil { if err == nil {
return filesApiFileInfo{absPath: absPath, isDir: false}, nil return filesApiFileInfo{
absPath: absPath,
isDir: false,
fileSize: fileInfo.ContentLength,
}, nil
} }
// Special handling of this error only if it is an API error. // Special handling of this error only if it is an API error.
@ -225,10 +457,51 @@ func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error
return nil, FileDoesNotExistError{absPath} return nil, FileDoesNotExistError{absPath}
} }
// This API returns 409 if the underlying path is a directory. return nil, err
if aerr.StatusCode == http.StatusConflict { }
// Get file metadata for a directory using the Files API.
func (w *FilesClient) statDir(ctx context.Context, name string) (fs.FileInfo, error) {
absPath, err := w.root.Join(name)
if err != nil {
return nil, err
}
err = w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, absPath)
// If the HEAD requests succeeds, the directory exists.
if err == nil {
return filesApiFileInfo{absPath: absPath, isDir: true}, nil return filesApiFileInfo{absPath: absPath, isDir: true}, nil
} }
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// The directory metadata API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return nil, NoSuchDirectoryError{absPath}
}
return nil, err return nil, err
} }
// Stat returns metadata for the file or directory at the given path.
func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
	// Try the directory metadata endpoint first.
	info, err := w.statDir(ctx, name)
	if err == nil {
		return info, nil
	}

	// Anything other than "no such directory" is a genuine failure.
	if !errors.As(err, &NoSuchDirectoryError{}) {
		return nil, err
	}

	// The path is not a directory; fall back to stat-ing it as a file.
	return w.statFile(ctx, name)
}