From 5ba0aaa5c5ca9074cadcec61370abd76714b0ecf Mon Sep 17 00:00:00 2001
From: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
Date: Tue, 20 Feb 2024 21:44:37 +0530
Subject: [PATCH] Add support for UC Volumes to the `databricks fs` commands
 (#1209)

## Changes

```
shreyas.goenka@THW32HFW6T cli % databricks fs -h
Commands to do file system operations on DBFS and UC Volumes.

Usage:
  databricks fs [command]

Available Commands:
  cat         Show file content.
  cp          Copy files and directories.
  ls          Lists files.
  mkdir       Make directories.
  rm          Remove files and directories.
```

This PR adds support for UC Volumes to the fs commands. The fs commands for
UC Volumes work the same as they currently do for DBFS. This is ensured by
running the same test matrix across both the DBFS and UC Volumes versions of
the fs commands.

## Tests

Support for UC Volumes is tested by running the same tests as we originally
ran for the DBFS commands. The tests require a `main` catalog to exist in the
workspace, which it does in our test workspace environments that have the
`TEST_METASTORE_ID` environment variable set.

For the Files API filer, we do the same by running a mostly common set of
tests to ensure the filers for "local", "wsfs", "dbfs" and "files API" behave
consistently.

The tests all run in parallel to reduce the total time taken. To keep the
tests isolated from each other, each test creates its own UC schema (for the
UC Volumes tests) or DBFS directory (for the DBFS tests).

---
 cmd/fs/cat.go              |   4 +-
 cmd/fs/cp.go               |   9 +-
 cmd/fs/fs.go               |   2 +-
 cmd/fs/ls.go               |   4 +-
 cmd/fs/mkdir.go            |   4 +-
 cmd/fs/rm.go               |   4 +-
 internal/filer_test.go     | 322 +++++++++++++--------------
 internal/fs_cat_test.go    |  68 ++++---
 internal/fs_cp_test.go     | 359 ++++++++++++++++++++++---------------
 internal/fs_ls_test.go     | 236 +++++++++++++-----------
 internal/fs_mkdir_test.go  | 165 +++++++++--------
 internal/fs_rm_test.go     | 203 +++++++++++----------
 internal/helpers.go        |  98 ++++++++++
 libs/filer/files_client.go | 331 +++++++++++++++++++++++++++++++---
 14 files changed, 1138 insertions(+), 671 deletions(-)

diff --git a/cmd/fs/cat.go b/cmd/fs/cat.go
index 8227cd78..be186653 100644
--- a/cmd/fs/cat.go
+++ b/cmd/fs/cat.go
@@ -9,8 +9,8 @@ import (
 func newCatCommand() *cobra.Command {
 	cmd := &cobra.Command{
 		Use:     "cat FILE_PATH",
-		Short:   "Show file content",
-		Long:    `Show the contents of a file.`,
+		Short:   "Show file content.",
+		Long:    `Show the contents of a file in DBFS or a UC Volume.`,
 		Args:    cobra.ExactArgs(1),
 		PreRunE: root.MustWorkspaceClient,
 	}
diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go
index 97fceb93..f0f480fe 100644
--- a/cmd/fs/cp.go
+++ b/cmd/fs/cp.go
@@ -129,10 +129,10 @@ func (c *copy) emitFileCopiedEvent(sourcePath, targetPath string) error {
 func newCpCommand() *cobra.Command {
 	cmd := &cobra.Command{
 		Use:   "cp SOURCE_PATH TARGET_PATH",
-		Short: "Copy files and directories to and from DBFS.",
-		Long: `Copy files to and from DBFS.
+		Short: "Copy files and directories.",
+		Long: `Copy files and directories to and from any paths on DBFS, UC Volumes or your local filesystem.
 
-	For paths in DBFS it is required that you specify the "dbfs" scheme.
+	For paths in DBFS and UC Volumes, it is required that you specify the "dbfs" scheme.
 	For example: dbfs:/foo/bar.
Recursively copying a directory will copy all files inside directory @@ -152,9 +152,6 @@ func newCpCommand() *cobra.Command { cmd.RunE = func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() - // TODO: Error if a user uses '\' as path separator on windows when "file" - // scheme is specified (https://github.com/databricks/cli/issues/485) - // Get source filer and source path without scheme fullSourcePath := args[0] sourceFiler, sourcePath, err := filerForPath(ctx, fullSourcePath) diff --git a/cmd/fs/fs.go b/cmd/fs/fs.go index 01d8a745..1f36696a 100644 --- a/cmd/fs/fs.go +++ b/cmd/fs/fs.go @@ -8,7 +8,7 @@ func New() *cobra.Command { cmd := &cobra.Command{ Use: "fs", Short: "Filesystem related commands", - Long: `Commands to do DBFS operations.`, + Long: `Commands to do file system operations on DBFS and UC Volumes.`, GroupID: "workspace", } diff --git a/cmd/fs/ls.go b/cmd/fs/ls.go index 7ae55e1f..be52b928 100644 --- a/cmd/fs/ls.go +++ b/cmd/fs/ls.go @@ -40,8 +40,8 @@ func toJsonDirEntry(f fs.DirEntry, baseDir string, isAbsolute bool) (*jsonDirEnt func newLsCommand() *cobra.Command { cmd := &cobra.Command{ Use: "ls DIR_PATH", - Short: "Lists files", - Long: `Lists files`, + Short: "Lists files.", + Long: `Lists files in DBFS and UC Volumes.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/cmd/fs/mkdir.go b/cmd/fs/mkdir.go index c6a5e607..dc054d8a 100644 --- a/cmd/fs/mkdir.go +++ b/cmd/fs/mkdir.go @@ -11,8 +11,8 @@ func newMkdirCommand() *cobra.Command { // Alias `mkdirs` for this command exists for legacy purposes. This command // is called databricks fs mkdirs in our legacy CLI: https://github.com/databricks/databricks-cli Aliases: []string{"mkdirs"}, - Short: "Make directories", - Long: `Mkdir will create directories along the path to the argument directory.`, + Short: "Make directories.", + Long: `Make directories in DBFS and UC Volumes. Mkdir will create directories along the path to the argument directory.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/cmd/fs/rm.go b/cmd/fs/rm.go index 3ce8d3b9..8a7b6571 100644 --- a/cmd/fs/rm.go +++ b/cmd/fs/rm.go @@ -9,8 +9,8 @@ import ( func newRmCommand() *cobra.Command { cmd := &cobra.Command{ Use: "rm PATH", - Short: "Remove files and directories from dbfs.", - Long: `Remove files and directories from dbfs.`, + Short: "Remove files and directories.", + Long: `Remove files and directories from DBFS and UC Volumes.`, Args: cobra.ExactArgs(1), PreRunE: root.MustWorkspaceClient, } diff --git a/internal/filer_test.go b/internal/filer_test.go index b1af6886..d333a1b7 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -6,14 +6,11 @@ import ( "errors" "io" "io/fs" - "net/http" "regexp" "strings" "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -40,15 +37,87 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str assert.Equal(f, contents, body.String()) } -func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { +func commonFilerRecursiveDeleteTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error - // Write should fail because the root path doesn't yet exist. 
+ err = f.Write(ctx, "dir/file1", strings.NewReader("content1"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/file1", `content1`) + + err = f.Write(ctx, "dir/file2", strings.NewReader("content2"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/file2", `content2`) + + err = f.Write(ctx, "dir/subdir1/file3", strings.NewReader("content3"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir1/file3", `content3`) + + err = f.Write(ctx, "dir/subdir1/file4", strings.NewReader("content4"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir1/file4", `content4`) + + err = f.Write(ctx, "dir/subdir2/file5", strings.NewReader("content5"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir2/file5", `content5`) + + err = f.Write(ctx, "dir/subdir2/file6", strings.NewReader("content6"), filer.CreateParentDirectories) + require.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "dir/subdir2/file6", `content6`) + + entriesBeforeDelete, err := f.ReadDir(ctx, "dir") + require.NoError(t, err) + assert.Len(t, entriesBeforeDelete, 4) + + names := []string{} + for _, e := range entriesBeforeDelete { + names = append(names, e.Name()) + } + assert.Equal(t, names, []string{"file1", "file2", "subdir1", "subdir2"}) + + err = f.Delete(ctx, "dir") + assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{}) + + err = f.Delete(ctx, "dir", filer.DeleteRecursively) + assert.NoError(t, err) + _, err = f.ReadDir(ctx, "dir") + assert.ErrorAs(t, err, &filer.NoSuchDirectoryError{}) +} + +func TestAccFilerRecursiveDelete(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + name string + f func(t *testing.T) (filer.Filer, string) + }{ + {"local", setupLocalFiler}, + {"workspace files", setupWsfsFiler}, + {"dbfs", setupDbfsFiler}, + {"files", setupUcVolumesFiler}, + } { + tc := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + f, _ := tc.f(t) + ctx := context.Background() + + // Common tests we run across all filers to ensure consistent behavior. + commonFilerRecursiveDeleteTest(t, ctx, f) + }) + } +} + +// Common tests we run across all filers to ensure consistent behavior. +func commonFilerReadWriteTests(t *testing.T, ctx context.Context, f filer.Filer) { + var err error + + // Write should fail because the intermediate directory doesn't exist. err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) assert.True(t, errors.Is(err, fs.ErrNotExist)) - // Read should fail because the root path doesn't yet exist. + // Read should fail because the intermediate directory doesn't yet exist. _, err = f.Read(ctx, "/foo/bar") assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.True(t, errors.Is(err, fs.ErrNotExist)) @@ -96,12 +165,12 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { // Delete should fail if the file doesn't exist. err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.ErrorAs(t, err, &filer.FileDoesNotExistError{}) assert.True(t, errors.Is(err, fs.ErrNotExist)) // Stat should fail if the file doesn't exist. 
_, err = f.Stat(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + assert.ErrorAs(t, err, &filer.FileDoesNotExistError{}) assert.True(t, errors.Is(err, fs.ErrNotExist)) // Delete should succeed for file that does exist. @@ -110,7 +179,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { // Delete should fail for a non-empty directory. err = f.Delete(ctx, "/foo") - assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{})) + assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{}) assert.True(t, errors.Is(err, fs.ErrInvalid)) // Delete should succeed for a non-empty directory if the DeleteRecursively flag is set. @@ -124,7 +193,33 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.True(t, errors.Is(err, fs.ErrInvalid)) } -func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { +func TestAccFilerReadWrite(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + name string + f func(t *testing.T) (filer.Filer, string) + }{ + {"local", setupLocalFiler}, + {"workspace files", setupWsfsFiler}, + {"dbfs", setupDbfsFiler}, + {"files", setupUcVolumesFiler}, + } { + tc := testCase + + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + f, _ := tc.f(t) + ctx := context.Background() + + // Common tests we run across all filers to ensure consistent behavior. + commonFilerReadWriteTests(t, ctx, f) + }) + } +} + +// Common tests we run across all filers to ensure consistent behavior. +func commonFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error var info fs.FileInfo @@ -206,54 +301,28 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.False(t, entries[0].IsDir()) } -func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFilerReadDir(t *testing.T) { + t.Parallel() - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryWorkspaceDir(t, w) - f, err := filer.NewWorkspaceFilesClient(w, tmpdir) - require.NoError(t, err) + for _, testCase := range []struct { + name string + f func(t *testing.T) (filer.Filer, string) + }{ + {"local", setupLocalFiler}, + {"workspace files", setupWsfsFiler}, + {"dbfs", setupDbfsFiler}, + {"files", setupUcVolumesFiler}, + } { + tc := testCase - // Check if we can use this API here, skip test if we cannot. 
- _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { - t.Skip(aerr.Message) + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + f, _ := tc.f(t) + ctx := context.Background() + + commonFilerReadDirTest(t, ctx, f) + }) } - - return ctx, f -} - -func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) - runFilerReadWriteTest(t, ctx, f) -} - -func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) - runFilerReadDirTest(t, ctx, f) -} - -func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryDbfsDir(t, w) - f, err := filer.NewDbfsClient(w, tmpdir) - require.NoError(t, err) - return ctx, f -} - -func TestAccFilerDbfsReadWrite(t *testing.T) { - ctx, f := setupFilerDbfsTest(t) - runFilerReadWriteTest(t, ctx, f) -} - -func TestAccFilerDbfsReadDir(t *testing.T) { - ctx, f := setupFilerDbfsTest(t) - runFilerReadDirTest(t, ctx, f) } var jupyterNotebookContent1 = ` @@ -305,7 +374,8 @@ var jupyterNotebookContent2 = ` ` func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) + f, _ := setupWsfsFiler(t) + ctx := context.Background() var err error // Upload the notebooks @@ -350,7 +420,8 @@ func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { } func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { - ctx, f := setupWorkspaceFilesTest(t) + f, _ := setupWsfsFiler(t) + ctx := context.Background() var err error // Upload notebooks @@ -391,140 +462,3 @@ func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { filerTest{t, f}.assertContents(ctx, "scalaNb", "// Databricks notebook source\n println(\"second upload\"))") filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 2\")") } - -func setupFilerLocalTest(t *testing.T) (context.Context, filer.Filer) { - ctx := context.Background() - f, err := filer.NewLocalClient(t.TempDir()) - require.NoError(t, err) - return ctx, f -} - -func TestAccFilerLocalReadWrite(t *testing.T) { - ctx, f := setupFilerLocalTest(t) - runFilerReadWriteTest(t, ctx, f) -} - -func TestAccFilerLocalReadDir(t *testing.T) { - ctx, f := setupFilerLocalTest(t) - runFilerReadDirTest(t, ctx, f) -} - -func temporaryVolumeDir(t *testing.T, w *databricks.WorkspaceClient) string { - // Assume this test is run against the internal testing workspace. - path := RandomName("/Volumes/bogdanghita/default/v3_shared/cli-testing/integration-test-filer-") - - // The Files API doesn't include support for creating and removing directories yet. - // Directories are created implicitly by writing a file to a path that doesn't exist. - // We therefore assume we can use the specified path without creating it first. 
- t.Logf("using dbfs:%s", path) - - return path -} - -func setupFilerFilesApiTest(t *testing.T) (context.Context, filer.Filer) { - t.SkipNow() // until available on prod - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := temporaryVolumeDir(t, w) - f, err := filer.NewFilesClient(w, tmpdir) - require.NoError(t, err) - return ctx, f -} - -func TestAccFilerFilesApiReadWrite(t *testing.T) { - ctx, f := setupFilerFilesApiTest(t) - - // The Files API doesn't know about directories yet. - // Below is a copy of [runFilerReadWriteTest] with - // assertions that don't work commented out. - - var err error - - // Write should fail because the root path doesn't yet exist. - // err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) - // assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) - // assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Read should fail because the root path doesn't yet exist. - _, err = f.Read(ctx, "/foo/bar") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Read should fail because the path points to a directory - // err = f.Mkdir(ctx, "/dir") - // require.NoError(t, err) - // _, err = f.Read(ctx, "/dir") - // assert.ErrorIs(t, err, fs.ErrInvalid) - - // Write with CreateParentDirectories flag should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) - - // Write should fail because there is an existing file at the specified path. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) - assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) - assert.True(t, errors.Is(err, fs.ErrExist)) - - // Write with OverwriteIfExists should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) - - // Write should succeed if there is no existing file at the specified path. - err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`)) - assert.NoError(t, err) - - // Stat on a directory should succeed. - // Note: size and modification time behave differently between backends. - info, err := f.Stat(ctx, "/foo") - require.NoError(t, err) - assert.Equal(t, "foo", info.Name()) - assert.True(t, info.Mode().IsDir()) - assert.Equal(t, true, info.IsDir()) - - // Stat on a file should succeed. - // Note: size and modification time behave differently between backends. - info, err = f.Stat(ctx, "/foo/bar") - require.NoError(t, err) - assert.Equal(t, "bar", info.Name()) - assert.True(t, info.Mode().IsRegular()) - assert.Equal(t, false, info.IsDir()) - - // Delete should fail if the file doesn't exist. - err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Stat should fail if the file doesn't exist. - _, err = f.Stat(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - assert.True(t, errors.Is(err, fs.ErrNotExist)) - - // Delete should succeed for file that does exist. - err = f.Delete(ctx, "/foo/bar") - assert.NoError(t, err) - - // Delete should fail for a non-empty directory. 
- err = f.Delete(ctx, "/foo") - assert.True(t, errors.As(err, &filer.DirectoryNotEmptyError{})) - assert.True(t, errors.Is(err, fs.ErrInvalid)) - - // Delete should succeed for a non-empty directory if the DeleteRecursively flag is set. - // err = f.Delete(ctx, "/foo", filer.DeleteRecursively) - // assert.NoError(t, err) - - // Delete of the filer root should ALWAYS fail, otherwise subsequent writes would fail. - // It is not in the filer's purview to delete its root directory. - err = f.Delete(ctx, "/") - assert.True(t, errors.As(err, &filer.CannotDeleteRootError{})) - assert.True(t, errors.Is(err, fs.ErrInvalid)) -} - -func TestAccFilerFilesApiReadDir(t *testing.T) { - t.Skipf("no support for ReadDir yet") - ctx, f := setupFilerFilesApiTest(t) - runFilerReadDirTest(t, ctx, f) -} diff --git a/internal/fs_cat_test.go b/internal/fs_cat_test.go index 2c979ea7..6292aef1 100644 --- a/internal/fs_cat_test.go +++ b/internal/fs_cat_test.go @@ -13,31 +13,60 @@ import ( "github.com/stretchr/testify/require" ) -func TestAccFsCatForDbfs(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsCat(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) + err := f.Write(context.Background(), "hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) - - stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "a", "hello.txt")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "abc", stdout.String()) + stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", path.Join(tmpDir, "hello.txt")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "abcd", stdout.String()) + }) + } } -func TestAccFsCatForDbfsOnNonExistentFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsCatOnADir(t *testing.T) { + t.Parallel() - _, _, err := RequireErrorRun(t, "fs", "cat", "dbfs:/non-existent-file") - assert.ErrorIs(t, err, fs.ErrNotExist) + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f, tmpDir := tc.setupFiler(t) + err := f.Mkdir(context.Background(), "dir1") + require.NoError(t, err) + + _, _, err = RequireErrorRun(t, "fs", "cat", path.Join(tmpDir, "dir1")) + assert.ErrorAs(t, err, &filer.NotAFile{}) + }) + } +} + +func TestAccFsCatOnNonExistentFile(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + _, _, err := RequireErrorRun(t, "fs", "cat", path.Join(tmpDir, "non-existent-file")) + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } func TestAccFsCatForDbfsInvalidScheme(t *testing.T) { @@ -65,6 +94,3 @@ func TestAccFsCatDoesNotSupportOutputModeJson(t *testing.T) { _, _, err = RequireErrorRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "hello.txt"), "--output=json") assert.ErrorContains(t, err, "json output not supported") } - -// TODO: Add test asserting an error when cat is called on an directory. 
Need this to be -// fixed in the SDK first (https://github.com/databricks/databricks-sdk-go/issues/414) diff --git a/internal/fs_cp_test.go b/internal/fs_cp_test.go index ab177a36..b69735bc 100644 --- a/internal/fs_cp_test.go +++ b/internal/fs_cp_test.go @@ -2,16 +2,15 @@ package internal import ( "context" - "fmt" "io" "path" "path/filepath" + "regexp" "runtime" "strings" "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -60,84 +59,124 @@ func assertTargetDir(t *testing.T, ctx context.Context, f filer.Filer) { assertFileContent(t, ctx, f, "a/b/c/hello.txt", "hello, world\n") } -func setupLocalFiler(t *testing.T) (filer.Filer, string) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - tmp := t.TempDir() - f, err := filer.NewLocalClient(tmp) - require.NoError(t, err) - - return f, path.Join(filepath.ToSlash(tmp)) -} - -func setupDbfsFiler(t *testing.T) (filer.Filer, string) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - - tmpDir := TemporaryDbfsDir(t, w) - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - - return f, path.Join("dbfs:/", tmpDir) -} - type cpTest struct { + name string setupSource func(*testing.T) (filer.Filer, string) setupTarget func(*testing.T) (filer.Filer, string) } -func setupTable() []cpTest { +func copyTests() []cpTest { return []cpTest{ - {setupSource: setupLocalFiler, setupTarget: setupLocalFiler}, - {setupSource: setupLocalFiler, setupTarget: setupDbfsFiler}, - {setupSource: setupDbfsFiler, setupTarget: setupLocalFiler}, - {setupSource: setupDbfsFiler, setupTarget: setupDbfsFiler}, + // source: local file system + { + name: "local to local", + setupSource: setupLocalFiler, + setupTarget: setupLocalFiler, + }, + { + name: "local to dbfs", + setupSource: setupLocalFiler, + setupTarget: setupDbfsFiler, + }, + { + name: "local to uc-volumes", + setupSource: setupLocalFiler, + setupTarget: setupUcVolumesFiler, + }, + + // source: dbfs + { + name: "dbfs to local", + setupSource: setupDbfsFiler, + setupTarget: setupLocalFiler, + }, + { + name: "dbfs to dbfs", + setupSource: setupDbfsFiler, + setupTarget: setupDbfsFiler, + }, + { + name: "dbfs to uc-volumes", + setupSource: setupDbfsFiler, + setupTarget: setupUcVolumesFiler, + }, + + // source: uc-volumes + { + name: "uc-volumes to local", + setupSource: setupUcVolumesFiler, + setupTarget: setupLocalFiler, + }, + { + name: "uc-volumes to dbfs", + setupSource: setupUcVolumesFiler, + setupTarget: setupDbfsFiler, + }, + { + name: "uc-volumes to uc-volumes", + setupSource: setupUcVolumesFiler, + setupTarget: setupUcVolumesFiler, + }, } } func TestAccFsCpDir(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - RequireSuccessfulRun(t, "fs", "cp", "-r", sourceDir, targetDir) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - assertTargetDir(t, ctx, targetFiler) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") + + assertTargetDir(t, context.Background(), targetFiler) + }) } } func 
TestAccFsCpFileToFile(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceFile(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt")) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - assertTargetFile(t, ctx, targetFiler, "bar.txt") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceFile(t, context.Background(), sourceFiler) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt")) + + assertTargetFile(t, context.Background(), targetFiler, "bar.txt") + }) } } func TestAccFsCpFileToDir(t *testing.T) { - ctx := context.Background() - table := setupTable() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceFile(t, ctx, sourceFiler) + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir) + for _, testCase := range copyTests() { + tc := testCase - assertTargetFile(t, ctx, targetFiler, "foo.txt") + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceFile(t, context.Background(), sourceFiler) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir) + + assertTargetFile(t, context.Background(), targetFiler, "foo.txt") + }) } } @@ -158,125 +197,161 @@ func TestAccFsCpFileToDirForWindowsPaths(t *testing.T) { } func TestAccFsCpDirToDirFileNotOverwritten(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") - assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten") - assertFileContent(t, ctx, targetFiler, "query.sql", "SELECT 1") - assertFileContent(t, ctx, targetFiler, "pyNb.py", "# Databricks notebook source\nprint(123)") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") + assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "this should not be overwritten") + assertFileContent(t, context.Background(), targetFiler, "query.sql", "SELECT 1") + assertFileContent(t, context.Background(), targetFiler, "pyNb.py", "# Databricks notebook source\nprint(123)") + }) } } func TestAccFsCpFileToDirFileNotOverwritten(t *testing.T) { - ctx := 
context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c")) - assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c")) + assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "this should not be overwritten") + }) } } func TestAccFsCpFileToFileFileNotOverwritten(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--recursive") - assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "this should not be overwritten") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/dontoverwrite.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/dontoverwrite.txt")) + assertFileContent(t, context.Background(), targetFiler, "a/b/c/dontoverwrite.txt", "this should not be overwritten") + }) } } func TestAccFsCpDirToDirWithOverwriteFlag(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite") - assertTargetDir(t, ctx, targetFiler) + sourceFiler, sourceDir := 
tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite") + assertTargetDir(t, context.Background(), targetFiler) + }) } } func TestAccFsCpFileToFileWithOverwriteFlag(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this will be overwritten. Such is life."), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--overwrite") - assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "hello, world\n") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/overwritten.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/overwritten.txt"), "--overwrite") + assertFileContent(t, context.Background(), targetFiler, "a/b/c/overwritten.txt", "hello, world\n") + }) } } func TestAccFsCpFileToDirWithOverwriteFlag(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten :') "), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--recursive", "--overwrite") - assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "hello, world\n") + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "a/b/c/hello.txt", strings.NewReader("this should be overwritten"), filer.CreateParentDirectories) + require.NoError(t, err) + + RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--overwrite") + assertFileContent(t, context.Background(), targetFiler, "a/b/c/hello.txt", "hello, world\n") + }) } } func TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + t.Parallel() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + 
for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - _, _, err = RequireErrorRun(t, "fs", "cp", "dbfs:"+tmpDir, "dbfs:/tmp") - assert.Equal(t, fmt.Sprintf("source path %s is a directory. Please specify the --recursive flag", tmpDir), err.Error()) + _, tmpDir := tc.setupFiler(t) + + _, _, err := RequireErrorRun(t, "fs", "cp", path.Join(tmpDir), path.Join(tmpDir, "foobar")) + r := regexp.MustCompile("source path .* is a directory. Please specify the --recursive flag") + assert.Regexp(t, r, err.Error()) + }) + } } func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) { @@ -287,20 +362,24 @@ func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) { } func TestAccFsCpSourceIsDirectoryButTargetIsFile(t *testing.T) { - ctx := context.Background() - table := setupTable() + t.Parallel() - for _, row := range table { - sourceFiler, sourceDir := row.setupSource(t) - targetFiler, targetDir := row.setupTarget(t) - setupSourceDir(t, ctx, sourceFiler) + for _, testCase := range copyTests() { + tc := testCase - // Write a conflicting file to target - err := targetFiler.Write(ctx, "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories) - require.NoError(t, err) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - _, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive", "--overwrite") - assert.Error(t, err) + sourceFiler, sourceDir := tc.setupSource(t) + targetFiler, targetDir := tc.setupTarget(t) + setupSourceDir(t, context.Background(), sourceFiler) + + // Write a conflicting file to target + err := targetFiler.Write(context.Background(), "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories) + require.NoError(t, err) + + _, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive") + assert.Error(t, err) + }) } - } diff --git a/internal/fs_ls_test.go b/internal/fs_ls_test.go index 9e02b09c..994a4a42 100644 --- a/internal/fs_ls_test.go +++ b/internal/fs_ls_test.go @@ -11,131 +11,163 @@ import ( _ "github.com/databricks/cli/cmd/fs" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestAccFsLsForDbfs(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - - tmpDir := TemporaryDbfsDir(t, w) - - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - - err = f.Mkdir(ctx, "a") - require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) - err = f.Write(ctx, "bye.txt", strings.NewReader("def")) - require.NoError(t, err) - - stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") - assert.Equal(t, "", stderr.String()) - var parsedStdout []map[string]any - err = json.Unmarshal(stdout.Bytes(), &parsedStdout) - require.NoError(t, err) - - // assert on ls output - assert.Len(t, parsedStdout, 2) - assert.Equal(t, "a", parsedStdout[0]["name"]) - assert.Equal(t, true, parsedStdout[0]["is_directory"]) - assert.Equal(t, float64(0), parsedStdout[0]["size"]) - assert.Equal(t, "bye.txt", parsedStdout[1]["name"]) - assert.Equal(t, false, parsedStdout[1]["is_directory"]) - assert.Equal(t, float64(3), 
parsedStdout[1]["size"]) +type fsTest struct { + name string + setupFiler func(t *testing.T) (filer.Filer, string) } -func TestAccFsLsForDbfsWithAbsolutePaths(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - - tmpDir := TemporaryDbfsDir(t, w) - - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - - err = f.Mkdir(ctx, "a") - require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) - err = f.Write(ctx, "bye.txt", strings.NewReader("def")) - require.NoError(t, err) - - stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json", "--absolute") - assert.Equal(t, "", stderr.String()) - var parsedStdout []map[string]any - err = json.Unmarshal(stdout.Bytes(), &parsedStdout) - require.NoError(t, err) - - // assert on ls output - assert.Len(t, parsedStdout, 2) - assert.Equal(t, path.Join("dbfs:", tmpDir, "a"), parsedStdout[0]["name"]) - assert.Equal(t, true, parsedStdout[0]["is_directory"]) - assert.Equal(t, float64(0), parsedStdout[0]["size"]) - - assert.Equal(t, path.Join("dbfs:", tmpDir, "bye.txt"), parsedStdout[1]["name"]) - assert.Equal(t, false, parsedStdout[1]["is_directory"]) - assert.Equal(t, float64(3), parsedStdout[1]["size"]) +var fsTests = []fsTest{ + { + name: "dbfs", + setupFiler: setupDbfsFiler, + }, + { + name: "uc-volumes", + setupFiler: setupUcVolumesFiler, + }, } -func TestAccFsLsForDbfsOnFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() +func setupLsFiles(t *testing.T, f filer.Filer) { + err := f.Write(context.Background(), "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) require.NoError(t, err) - - tmpDir := TemporaryDbfsDir(t, w) - - f, err := filer.NewDbfsClient(w, tmpDir) + err = f.Write(context.Background(), "bye.txt", strings.NewReader("def")) require.NoError(t, err) - - err = f.Mkdir(ctx, "a") - require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) - - _, _, err = RequireErrorRun(t, "fs", "ls", "dbfs:"+path.Join(tmpDir, "a", "hello.txt"), "--output=json") - assert.Regexp(t, regexp.MustCompile("not a directory: .*/a/hello.txt"), err.Error()) } -func TestAccFsLsForDbfsOnEmptyDir(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsLs(t *testing.T) { + t.Parallel() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") - assert.Equal(t, "", stderr.String()) - var parsedStdout []map[string]any - err = json.Unmarshal(stdout.Bytes(), &parsedStdout) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) + setupLsFiles(t, f) - // assert on ls output - assert.Equal(t, 0, len(parsedStdout)) + stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json") + assert.Equal(t, "", stderr.String()) + + var parsedStdout []map[string]any + err := json.Unmarshal(stdout.Bytes(), &parsedStdout) + require.NoError(t, err) + + // assert on ls output + assert.Len(t, parsedStdout, 2) + + assert.Equal(t, "a", parsedStdout[0]["name"]) + assert.Equal(t, true, 
parsedStdout[0]["is_directory"]) + assert.Equal(t, float64(0), parsedStdout[0]["size"]) + + assert.Equal(t, "bye.txt", parsedStdout[1]["name"]) + assert.Equal(t, false, parsedStdout[1]["is_directory"]) + assert.Equal(t, float64(3), parsedStdout[1]["size"]) + }) + } } -func TestAccFsLsForDbfsForNonexistingDir(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsLsWithAbsolutePaths(t *testing.T) { + t.Parallel() - _, _, err := RequireErrorRun(t, "fs", "ls", "dbfs:/john-cena", "--output=json") - assert.ErrorIs(t, err, fs.ErrNotExist) + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + f, tmpDir := tc.setupFiler(t) + setupLsFiles(t, f) + + stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json", "--absolute") + assert.Equal(t, "", stderr.String()) + + var parsedStdout []map[string]any + err := json.Unmarshal(stdout.Bytes(), &parsedStdout) + require.NoError(t, err) + + // assert on ls output + assert.Len(t, parsedStdout, 2) + + assert.Equal(t, path.Join(tmpDir, "a"), parsedStdout[0]["name"]) + assert.Equal(t, true, parsedStdout[0]["is_directory"]) + assert.Equal(t, float64(0), parsedStdout[0]["size"]) + + assert.Equal(t, path.Join(tmpDir, "bye.txt"), parsedStdout[1]["name"]) + assert.Equal(t, false, parsedStdout[1]["is_directory"]) + assert.Equal(t, float64(3), parsedStdout[1]["size"]) + }) + } +} + +func TestAccFsLsOnFile(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + f, tmpDir := tc.setupFiler(t) + setupLsFiles(t, f) + + _, _, err := RequireErrorRun(t, "fs", "ls", path.Join(tmpDir, "a", "hello.txt"), "--output=json") + assert.Regexp(t, regexp.MustCompile("not a directory: .*/a/hello.txt"), err.Error()) + assert.ErrorAs(t, err, &filer.NotADirectory{}) + }) + } +} + +func TestAccFsLsOnEmptyDir(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", tmpDir, "--output=json") + assert.Equal(t, "", stderr.String()) + var parsedStdout []map[string]any + err := json.Unmarshal(stdout.Bytes(), &parsedStdout) + require.NoError(t, err) + + // assert on ls output + assert.Equal(t, 0, len(parsedStdout)) + }) + } +} + +func TestAccFsLsForNonexistingDir(t *testing.T) { + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + _, _, err := RequireErrorRun(t, "fs", "ls", path.Join(tmpDir, "nonexistent"), "--output=json") + assert.ErrorIs(t, err, fs.ErrNotExist) + assert.Regexp(t, regexp.MustCompile("no such directory: .*/nonexistent"), err.Error()) + }) + } } func TestAccFsLsWithoutScheme(t *testing.T) { + t.Parallel() + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - _, _, err := RequireErrorRun(t, "fs", "ls", "/ray-mysterio", "--output=json") + _, _, err := RequireErrorRun(t, "fs", "ls", "/path-without-a-dbfs-scheme", "--output=json") assert.ErrorIs(t, err, fs.ErrNotExist) } diff --git a/internal/fs_mkdir_test.go b/internal/fs_mkdir_test.go index af0e9d18..dd75c7c3 100644 --- a/internal/fs_mkdir_test.go +++ b/internal/fs_mkdir_test.go @@ -8,110 +8,127 @@ import ( "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func 
TestAccFsMkdirCreatesDirectory(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsMkdir(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) - // create directory "a" - stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // create directory "a" + stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // assert directory "a" is created - info, err := f.Stat(ctx, "a") - require.NoError(t, err) - assert.Equal(t, "a", info.Name()) - assert.Equal(t, true, info.IsDir()) + // assert directory "a" is created + info, err := f.Stat(context.Background(), "a") + require.NoError(t, err) + assert.Equal(t, "a", info.Name()) + assert.Equal(t, true, info.IsDir()) + }) + } } -func TestAccFsMkdirCreatesMultipleDirectories(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsMkdirCreatesIntermediateDirectories(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) - // create directory /a/b/c - stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a", "b", "c")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // create directory "a/b/c" + stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a", "b", "c")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // assert directory "a" is created - infoA, err := f.Stat(ctx, "a") - require.NoError(t, err) - assert.Equal(t, "a", infoA.Name()) - assert.Equal(t, true, infoA.IsDir()) + // assert directory "a" is created + infoA, err := f.Stat(context.Background(), "a") + require.NoError(t, err) + assert.Equal(t, "a", infoA.Name()) + assert.Equal(t, true, infoA.IsDir()) - // assert directory "b" is created - infoB, err := f.Stat(ctx, "a/b") - require.NoError(t, err) - assert.Equal(t, "b", infoB.Name()) - assert.Equal(t, true, infoB.IsDir()) + // assert directory "b" is created + infoB, err := f.Stat(context.Background(), "a/b") + require.NoError(t, err) + assert.Equal(t, "b", infoB.Name()) + assert.Equal(t, true, infoB.IsDir()) - // assert directory "c" is created - infoC, err := f.Stat(ctx, "a/b/c") - require.NoError(t, err) - assert.Equal(t, "c", infoC.Name()) - assert.Equal(t, true, infoC.IsDir()) + // assert directory "c" is created + infoC, err := f.Stat(context.Background(), "a/b/c") + require.NoError(t, err) + assert.Equal(t, "c", infoC.Name()) + assert.Equal(t, true, infoC.IsDir()) + }) + } } func TestAccFsMkdirWhenDirectoryAlreadyExists(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - 
tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - // create directory "a" - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - err = f.Mkdir(ctx, "a") - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) - // assert run is successful without any errors - stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "a")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // create directory "a" + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) + + // assert run is successful without any errors + stdout, stderr := RequireSuccessfulRun(t, "fs", "mkdir", path.Join(tmpDir, "a")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) + }) + } } func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + t.Run("dbfs", func(t *testing.T) { + t.Parallel() - tmpDir := TemporaryDbfsDir(t, w) + f, tmpDir := setupDbfsFiler(t) - // create file hello - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) - err = f.Write(ctx, "hello", strings.NewReader("abc")) - require.NoError(t, err) + // create file "hello" + err := f.Write(context.Background(), "hello", strings.NewReader("abc")) + require.NoError(t, err) - // assert run fails - _, _, err = RequireErrorRun(t, "fs", "mkdir", "dbfs:"+path.Join(tmpDir, "hello")) - // Different cloud providers return different errors. - regex := regexp.MustCompile(`(^|: )Path is a file: .*$|(^|: )Cannot create directory .* because .* is an existing file\.$|(^|: )mkdirs\(hadoopPath: .*, permission: rwxrwxrwx\): failed$`) - assert.Regexp(t, regex, err.Error()) + // assert mkdir fails + _, _, err = RequireErrorRun(t, "fs", "mkdir", path.Join(tmpDir, "hello")) + + // Different cloud providers return different errors. 
+ regex := regexp.MustCompile(`(^|: )Path is a file: .*$|(^|: )Cannot create directory .* because .* is an existing file\.$|(^|: )mkdirs\(hadoopPath: .*, permission: rwxrwxrwx\): failed$`) + assert.Regexp(t, regex, err.Error()) + }) + + t.Run("uc-volumes", func(t *testing.T) { + t.Parallel() + + f, tmpDir := setupUcVolumesFiler(t) + + // create file "hello" + err := f.Write(context.Background(), "hello", strings.NewReader("abc")) + require.NoError(t, err) + + // assert mkdir fails + _, _, err = RequireErrorRun(t, "fs", "mkdir", path.Join(tmpDir, "hello")) + + assert.ErrorAs(t, err, &filer.FileAlreadyExistsError{}) + }) } diff --git a/internal/fs_rm_test.go b/internal/fs_rm_test.go index d70827d1..e86f5713 100644 --- a/internal/fs_rm_test.go +++ b/internal/fs_rm_test.go @@ -8,139 +8,150 @@ import ( "testing" "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestAccFsRmForFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsRmFile(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + // Create a file + f, tmpDir := tc.setupFiler(t) + err := f.Write(context.Background(), "hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - // create file to delete - err = f.Write(ctx, "hello.txt", strings.NewReader("abc")) - require.NoError(t, err) + // Check file was created + _, err = f.Stat(context.Background(), "hello.txt") + assert.NoError(t, err) - // check file was created - info, err := f.Stat(ctx, "hello.txt") - require.NoError(t, err) - require.Equal(t, "hello.txt", info.Name()) - require.Equal(t, info.IsDir(), false) + // Run rm command + stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "hello.txt")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // Run rm command - stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "hello.txt")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) - - // assert file was deleted - _, err = f.Stat(ctx, "hello.txt") - assert.ErrorIs(t, err, fs.ErrNotExist) + // Assert file was deleted + _, err = f.Stat(context.Background(), "hello.txt") + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } -func TestAccFsRmForEmptyDirectory(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsRmEmptyDir(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + // Create a directory + f, tmpDir := tc.setupFiler(t) + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // create directory to delete - err = f.Mkdir(ctx, "avacado") - require.NoError(t, err) + // Check directory was created + _, err = f.Stat(context.Background(), "a") + assert.NoError(t, err) - // check directory was created - info, err := f.Stat(ctx, "avacado") - require.NoError(t, err) - require.Equal(t, "avacado", info.Name()) - 
require.Equal(t, info.IsDir(), true) + // Run rm command + stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "a")) + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) - // Run rm command - stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado")) - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) - - // assert directory was deleted - _, err = f.Stat(ctx, "avacado") - assert.ErrorIs(t, err, fs.ErrNotExist) + // Assert directory was deleted + _, err = f.Stat(context.Background(), "a") + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } -func TestAccFsRmForNonEmptyDirectory(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsRmNonEmptyDirectory(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + // Create a directory + f, tmpDir := tc.setupFiler(t) + err := f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // create file in dir - err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) + // Create a file in the directory + err = f.Write(context.Background(), "a/hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - // check file was created - info, err := f.Stat(ctx, "avacado/guacamole") - require.NoError(t, err) - require.Equal(t, "guacamole", info.Name()) - require.Equal(t, info.IsDir(), false) + // Check file was created + _, err = f.Stat(context.Background(), "a/hello.txt") + assert.NoError(t, err) - // Run rm command - _, _, err = RequireErrorRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado")) - assert.ErrorIs(t, err, fs.ErrInvalid) - assert.ErrorContains(t, err, "directory not empty") + // Run rm command + _, _, err = RequireErrorRun(t, "fs", "rm", path.Join(tmpDir, "a")) + assert.ErrorIs(t, err, fs.ErrInvalid) + assert.ErrorAs(t, err, &filer.DirectoryNotEmptyError{}) + }) + } } func TestAccFsRmForNonExistentFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + t.Parallel() + + for _, testCase := range fsTests { + tc := testCase + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + _, tmpDir := tc.setupFiler(t) + + // Expect error if file does not exist + _, _, err := RequireErrorRun(t, "fs", "rm", path.Join(tmpDir, "does-not-exist")) + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } - // Expect error if file does not exist - _, _, err := RequireErrorRun(t, "fs", "rm", "dbfs:/does-not-exist") - assert.ErrorIs(t, err, fs.ErrNotExist) } -func TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) +func TestAccFsRmDirRecursively(t *testing.T) { + t.Parallel() - ctx := context.Background() - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) + for _, testCase := range fsTests { + tc := testCase - tmpDir := TemporaryDbfsDir(t, w) + t.Run(tc.name, func(t *testing.T) { + t.Parallel() - f, err := filer.NewDbfsClient(w, tmpDir) - require.NoError(t, err) + f, tmpDir := tc.setupFiler(t) - // create file in dir - err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) - require.NoError(t, err) + // Create a directory + err := 
f.Mkdir(context.Background(), "a") + require.NoError(t, err) - // check file was created - info, err := f.Stat(ctx, "avacado/guacamole") - require.NoError(t, err) - require.Equal(t, "guacamole", info.Name()) - require.Equal(t, info.IsDir(), false) + // Create a file in the directory + err = f.Write(context.Background(), "a/hello.txt", strings.NewReader("abcd"), filer.CreateParentDirectories) + require.NoError(t, err) - // Run rm command - stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", "dbfs:"+path.Join(tmpDir, "avacado"), "--recursive") - assert.Equal(t, "", stderr.String()) - assert.Equal(t, "", stdout.String()) + // Check file was created + _, err = f.Stat(context.Background(), "a/hello.txt") + assert.NoError(t, err) - // assert directory was deleted - _, err = f.Stat(ctx, "avacado") - assert.ErrorIs(t, err, fs.ErrNotExist) + // Run rm command + stdout, stderr := RequireSuccessfulRun(t, "fs", "rm", path.Join(tmpDir, "a"), "--recursive") + assert.Equal(t, "", stderr.String()) + assert.Equal(t, "", stdout.String()) + + // Assert directory was deleted + _, err = f.Stat(context.Background(), "a") + assert.ErrorIs(t, err, fs.ErrNotExist) + }) + } } diff --git a/internal/helpers.go b/internal/helpers.go index 6377ae07..ca5aa25e 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -5,10 +5,13 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "math/rand" + "net/http" "os" + "path" "path/filepath" "reflect" "strings" @@ -19,8 +22,10 @@ import ( "github.com/databricks/cli/cmd" _ "github.com/databricks/cli/cmd/version" "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/files" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -452,6 +457,40 @@ func TemporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { return path } +// Create a new UC volume in a catalog called "main" in the workspace. 
+func temporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + + // Create a schema + schema, err := w.Schemas.Create(ctx, catalog.CreateSchema{ + CatalogName: "main", + Name: RandomName("test-schema-"), + }) + require.NoError(t, err) + t.Cleanup(func() { + w.Schemas.Delete(ctx, catalog.DeleteSchemaRequest{ + FullName: schema.FullName, + }) + }) + + // Create a volume + volume, err := w.Volumes.Create(ctx, catalog.CreateVolumeRequestContent{ + CatalogName: "main", + SchemaName: schema.Name, + Name: "my-volume", + VolumeType: catalog.VolumeTypeManaged, + }) + require.NoError(t, err) + t.Cleanup(func() { + w.Volumes.Delete(ctx, catalog.DeleteVolumeRequest{ + Name: volume.FullName, + }) + }) + + return path.Join("/Volumes", "main", schema.Name, volume.Name) + +} + func TemporaryRepo(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() me, err := w.CurrentUser.Me(ctx) @@ -489,3 +528,62 @@ func GetNodeTypeId(env string) string { } return "Standard_DS4_v2" } + +func setupLocalFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + tmp := t.TempDir() + f, err := filer.NewLocalClient(tmp) + require.NoError(t, err) + + return f, path.Join(filepath.ToSlash(tmp)) +} + +func setupWsfsFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := TemporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + require.NoError(t, err) + + // Check if we can use this API here, skip test if we cannot. + _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { + t.Skip(aerr.Message) + } + + return f, tmpdir +} + +func setupDbfsFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + tmpDir := TemporaryDbfsDir(t, w) + f, err := filer.NewDbfsClient(w, tmpDir) + require.NoError(t, err) + + return f, path.Join("dbfs:/", tmpDir) +} + +func setupUcVolumesFiler(t *testing.T) (filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + if os.Getenv("TEST_METASTORE_ID") == "" { + t.Skip("Skipping tests that require a UC Volume when metastore id is not set.") + } + + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + + tmpDir := temporaryUcVolume(t, w) + f, err := filer.NewFilesClient(w, tmpDir) + require.NoError(t, err) + + return f, path.Join("dbfs:/", tmpDir) +} diff --git a/libs/filer/files_client.go b/libs/filer/files_client.go index 17884d57..9fc68bd5 100644 --- a/libs/filer/files_client.go +++ b/libs/filer/files_client.go @@ -11,18 +11,30 @@ import ( "net/url" "path" "slices" + "sort" "strings" "time" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/client" + "github.com/databricks/databricks-sdk-go/listing" + "github.com/databricks/databricks-sdk-go/service/files" + "golang.org/x/sync/errgroup" ) +// As of 19th Feb 2024, the Files API backend has a rate limit of 10 concurrent +// requests and 100 QPS. We limit the number of concurrent requests to 5 to +// avoid hitting the rate limit. +const maxFilesRequestsInFlight = 5 + // Type that implements fs.FileInfo for the Files API. +// This is required for the filer.Stat() method. 
 type filesApiFileInfo struct {
-	absPath string
-	isDir   bool
+	absPath      string
+	isDir        bool
+	fileSize     int64
+	lastModified int64
 }
 
 func (info filesApiFileInfo) Name() string {
@@ -30,8 +42,7 @@ func (info filesApiFileInfo) Name() string {
 }
 
 func (info filesApiFileInfo) Size() int64 {
-	// No way to get the file size in the Files API.
-	return 0
+	return info.fileSize
 }
 
 func (info filesApiFileInfo) Mode() fs.FileMode {
@@ -43,7 +54,7 @@ func (info filesApiFileInfo) Mode() fs.FileMode {
 }
 
 func (info filesApiFileInfo) ModTime() time.Time {
-	return time.Time{}
+	return time.UnixMilli(info.lastModified)
 }
 
 func (info filesApiFileInfo) IsDir() bool {
@@ -54,6 +65,28 @@ func (info filesApiFileInfo) Sys() any {
 	return nil
 }
 
+// Type that implements fs.DirEntry for the Files API.
+// This is required for the filer.ReadDir() method.
+type filesApiDirEntry struct {
+	i filesApiFileInfo
+}
+
+func (e filesApiDirEntry) Name() string {
+	return e.i.Name()
+}
+
+func (e filesApiDirEntry) IsDir() bool {
+	return e.i.IsDir()
+}
+
+func (e filesApiDirEntry) Type() fs.FileMode {
+	return e.i.Mode()
+}
+
+func (e filesApiDirEntry) Info() (fs.FileInfo, error) {
+	return e.i, nil
+}
+
 // FilesClient implements the [Filer] interface for the Files API backend.
 type FilesClient struct {
 	workspaceClient *databricks.WorkspaceClient
@@ -63,10 +96,6 @@ type FilesClient struct {
 	root WorkspaceRootPath
 }
 
-func filesNotImplementedError(fn string) error {
-	return fmt.Errorf("filer.%s is not implemented for the Files API", fn)
-}
-
 func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
 	apiClient, err := client.New(w.Config)
 	if err != nil {
@@ -102,6 +131,24 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader,
 		return err
 	}
 
+	// Check that the parent directory exists if CreateParentDirectories mode is not set.
+	if !slices.Contains(mode, CreateParentDirectories) {
+		err := w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, path.Dir(absPath))
+		if err != nil {
+			var aerr *apierr.APIError
+			if !errors.As(err, &aerr) {
+				return err
+			}
+
+			// This API returns a 404 if the directory doesn't exist.
+			if aerr.StatusCode == http.StatusNotFound {
+				return NoSuchDirectoryError{path.Dir(absPath)}
+			}
+
+			return err
+		}
+	}
+
 	overwrite := slices.Contains(mode, OverwriteIfExists)
 	urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
 	headers := map[string]string{"Content-Type": "application/octet-stream"}
@@ -119,7 +166,7 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader,
 	}
 
 	// This API returns 409 if the file already exists, when the object type is file
-	if aerr.StatusCode == http.StatusConflict {
+	if aerr.StatusCode == http.StatusConflict && aerr.ErrorCode == "ALREADY_EXISTS" {
 		return FileAlreadyExistsError{absPath}
 	}
 
@@ -148,14 +195,20 @@ func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, err
 
 	// This API returns a 404 if the specified path does not exist.
 	if aerr.StatusCode == http.StatusNotFound {
+		// Check if the path is a directory. If so, return not a file error.
+		if _, err := w.statDir(ctx, name); err == nil {
+			return nil, NotAFile{absPath}
+		}
+
+		// No file or directory exists at the specified path. Return no such file error.
 		return nil, FileDoesNotExistError{absPath}
 	}
 
 	return nil, err
 }
 
-func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
-	absPath, urlPath, err := w.urlPath(name)
+func (w *FilesClient) deleteFile(ctx context.Context, name string) error {
+	absPath, err := w.root.Join(name)
 	if err != nil {
 		return err
 	}
@@ -165,53 +218,232 @@ func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMod
 		return CannotDeleteRootError{}
 	}
 
-	err = w.apiClient.Do(ctx, http.MethodDelete, urlPath, nil, nil, nil)
+	err = w.workspaceClient.Files.DeleteByFilePath(ctx, absPath)
 
 	// Return early on success.
 	if err == nil {
 		return nil
 	}
 
-	// Special handling of this error only if it is an API error.
 	var aerr *apierr.APIError
+	// Special handling of this error only if it is an API error.
 	if !errors.As(err, &aerr) {
 		return err
 	}
 
-	// This API returns a 404 if the specified path does not exist.
+	// The files delete API returns a 404 if the specified path does not exist.
 	if aerr.StatusCode == http.StatusNotFound {
 		return FileDoesNotExistError{absPath}
 	}
 
-	// This API returns 409 if the underlying path is a directory.
-	if aerr.StatusCode == http.StatusConflict {
+	return err
+}
+
+func (w *FilesClient) deleteDirectory(ctx context.Context, name string) error {
+	absPath, err := w.root.Join(name)
+	if err != nil {
+		return err
+	}
+
+	// Illegal to delete the root path.
+	if absPath == w.root.rootPath {
+		return CannotDeleteRootError{}
+	}
+
+	err = w.workspaceClient.Files.DeleteDirectoryByDirectoryPath(ctx, absPath)
+
+	var aerr *apierr.APIError
+	// Special handling of this error only if it is an API error.
+	if !errors.As(err, &aerr) {
+		return err
+	}
+
+	// The directory delete API returns a 400 if the directory is not empty.
+	if aerr.StatusCode == http.StatusBadRequest {
+		reasons := []string{}
+		for _, detail := range aerr.Details {
+			reasons = append(reasons, detail.Reason)
+		}
+		// Status code 400 is generic and can be returned for other reasons. Make
+		// sure one of the reasons for the error is that the directory is not empty.
+		if !slices.Contains(reasons, "FILES_API_DIRECTORY_IS_NOT_EMPTY") {
+			return err
+		}
 		return DirectoryNotEmptyError{absPath}
 	}
 
 	return err
 }
 
-func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
-	return nil, filesNotImplementedError("ReadDir")
-}
+func (w *FilesClient) recursiveDelete(ctx context.Context, name string) error {
+	filerFS := NewFS(ctx, w)
+	dirsToDelete := make([]string, 0)
+	filesToDelete := make([]string, 0)
+	callback := func(path string, d fs.DirEntry, err error) error {
+		if err != nil {
+			return err
+		}
 
-func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
-	// Directories are created implicitly.
-	// No need to do anything.
+		// The Files API does not allow deleting non-empty directories. Instead, we
+		// collect the directories to delete and delete them once all the files have
+		// been deleted.
+		if d.IsDir() {
+			dirsToDelete = append(dirsToDelete, path)
+			return nil
+		}
+
+		filesToDelete = append(filesToDelete, path)
+		return nil
+	}
+
+	// Walk the directory and accumulate the files and directories to delete.
+	err := fs.WalkDir(filerFS, name, callback)
+	if err != nil {
+		return err
+	}
+
+	// Delete the files in parallel.
+	group, groupCtx := errgroup.WithContext(ctx)
+	group.SetLimit(maxFilesRequestsInFlight)
+
+	for _, file := range filesToDelete {
+		file := file
+
+		// Skip the file if the context has already been cancelled.
+ select { + case <-groupCtx.Done(): + continue + default: + // Proceed. + } + + group.Go(func() error { + return w.deleteFile(groupCtx, file) + }) + } + + // Wait for the files to be deleted and return the first non-nil error. + err = group.Wait() + if err != nil { + return err + } + + // Delete the directories in reverse order to ensure that the parent + // directories are deleted after the children. This is possible because + // fs.WalkDir walks the directories in lexicographical order. + for i := len(dirsToDelete) - 1; i >= 0; i-- { + err := w.deleteDirectory(ctx, dirsToDelete[i]) + if err != nil { + return err + } + } return nil } -func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { - absPath, urlPath, err := w.urlPath(name) +func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { + if slices.Contains(mode, DeleteRecursively) { + return w.recursiveDelete(ctx, name) + } + + // Issue a stat call to determine if the path is a file or directory. + info, err := w.Stat(ctx, name) + if err != nil { + return err + } + + // Issue the delete call for a directory + if info.IsDir() { + return w.deleteDirectory(ctx, name) + } + + return w.deleteFile(ctx, name) +} + +func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { + absPath, err := w.root.Join(name) if err != nil { return nil, err } - err = w.apiClient.Do(ctx, http.MethodHead, urlPath, nil, nil, nil) + iter := w.workspaceClient.Files.ListDirectoryContents(ctx, files.ListDirectoryContentsRequest{ + DirectoryPath: absPath, + }) + + files, err := listing.ToSlice(ctx, iter) + + // Return early on success. + if err == nil { + entries := make([]fs.DirEntry, len(files)) + for i, file := range files { + entries[i] = filesApiDirEntry{ + i: filesApiFileInfo{ + absPath: file.Path, + isDir: file.IsDirectory, + fileSize: file.FileSize, + lastModified: file.LastModified, + }, + } + } + + // Sort by name for parity with os.ReadDir. + sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() }) + return entries, nil + } + + // Special handling of this error only if it is an API error. + var apierr *apierr.APIError + if !errors.As(err, &apierr) { + return nil, err + } + + // This API returns a 404 if the specified path does not exist. + if apierr.StatusCode == http.StatusNotFound { + // Check if the path is a file. If so, return not a directory error. + if _, err := w.statFile(ctx, name); err == nil { + return nil, NotADirectory{absPath} + } + + // No file or directory exists at the specified path. Return no such directory error. + return nil, NoSuchDirectoryError{absPath} + } + return nil, err +} + +func (w *FilesClient) Mkdir(ctx context.Context, name string) error { + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + err = w.workspaceClient.Files.CreateDirectory(ctx, files.CreateDirectoryRequest{ + DirectoryPath: absPath, + }) + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.StatusCode == http.StatusConflict { + return FileAlreadyExistsError{absPath} + } + + return err +} + +// Get file metadata for a file using the Files API. +func (w *FilesClient) statFile(ctx context.Context, name string) (fs.FileInfo, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + fileInfo, err := w.workspaceClient.Files.GetMetadataByFilePath(ctx, absPath) // If the HEAD requests succeeds, the file exists. 
 	if err == nil {
-		return filesApiFileInfo{absPath: absPath, isDir: false}, nil
+		return filesApiFileInfo{
+			absPath:  absPath,
+			isDir:    false,
+			fileSize: fileInfo.ContentLength,
+		}, nil
 	}
 
 	// Special handling of this error only if it is an API error.
@@ -225,10 +457,51 @@ func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error
 		return nil, FileDoesNotExistError{absPath}
 	}
 
-	// This API returns 409 if the underlying path is a directory.
-	if aerr.StatusCode == http.StatusConflict {
+	return nil, err
+}
+
+// Get metadata for a directory using the Files API.
+func (w *FilesClient) statDir(ctx context.Context, name string) (fs.FileInfo, error) {
+	absPath, err := w.root.Join(name)
+	if err != nil {
+		return nil, err
+	}
+
+	err = w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, absPath)
+
+	// If the HEAD request succeeds, the directory exists.
+	if err == nil {
 		return filesApiFileInfo{absPath: absPath, isDir: true}, nil
 	}
 
+	// Special handling of this error only if it is an API error.
+	var aerr *apierr.APIError
+	if !errors.As(err, &aerr) {
+		return nil, err
+	}
+
+	// The directory metadata API returns a 404 if the specified path does not exist.
+	if aerr.StatusCode == http.StatusNotFound {
+		return nil, NoSuchDirectoryError{absPath}
+	}
+
 	return nil, err
 }
+
+func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
+	// Assume that the path is a directory and issue a stat call.
+	dirInfo, err := w.statDir(ctx, name)
+
+	// If the directory exists, return early.
+	if err == nil {
+		return dirInfo, nil
+	}
+
+	// Return early if the error is not a NoSuchDirectoryError.
+	if !errors.As(err, &NoSuchDirectoryError{}) {
+		return nil, err
+	}
+
+	// Since the path is not a directory, assume that it is a file and issue a stat call.
+	return w.statFile(ctx, name)
+}
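
A note on the test matrix: the table-driven tests above consume a shared `fsTests` slice through `tc.name` and `tc.setupFiler`, but its definition sits in a part of the diff not shown here. The following is a hedged reconstruction of its likely shape, for orientation only; the package clause, field names, and entries are assumptions inferred from usage:

```
package internal

import (
	"testing"

	"github.com/databricks/cli/libs/filer"
)

// Hypothetical sketch, not the PR's actual definition. setupDbfsFiler and
// setupUcVolumesFiler are the helpers added to internal/helpers.go above.
var fsTests = []struct {
	name       string
	setupFiler func(t *testing.T) (filer.Filer, string)
}{
	{name: "dbfs", setupFiler: setupDbfsFiler},
	{name: "uc-volumes", setupFiler: setupUcVolumesFiler},
}
```

Re-binding `tc := testCase` before each `t.Run` is what keeps the parallel subtests from all capturing the final loop value on Go versions before 1.22.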
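The setup helpers return CLI-facing paths with the `dbfs:` scheme prepended, because the fs commands address UC Volumes through the same scheme as DBFS. A minimal sketch of that composition; the catalog, schema, and volume names are placeholders:

```
package main

import (
	"fmt"
	"path"
)

func main() {
	// UC Volume roots live under /Volumes/<catalog>/<schema>/<volume>.
	volumeRoot := path.Join("/Volumes", "main", "test-schema-abc", "my-volume")

	// The fs commands expect the "dbfs" scheme for both DBFS and UC Volumes.
	cliPath := path.Join("dbfs:/", volumeRoot)
	fmt.Println(cliPath) // dbfs:/Volumes/main/test-schema-abc/my-volume
}
```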
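The pre-flight check added to `Write` surfaces a typed `NoSuchDirectoryError` when the parent directory is missing and `CreateParentDirectories` was not passed. A minimal caller-side sketch, assuming only that `f` is any `filer.Filer` rooted at a writable location; `writeWithFallback` and the file name are placeholders, not part of this PR:

```
package example

import (
	"context"
	"errors"
	"strings"

	"github.com/databricks/cli/libs/filer"
)

// writeWithFallback is a hypothetical helper for illustration.
func writeWithFallback(ctx context.Context, f filer.Filer) error {
	// Without CreateParentDirectories, the parent directory must already exist.
	err := f.Write(ctx, "new-dir/hello.txt", strings.NewReader("abc"))
	if errors.As(err, &filer.NoSuchDirectoryError{}) {
		// Retry and let the filer create the intermediate directories.
		err = f.Write(ctx, "new-dir/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
	}
	return err
}
```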
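`recursiveDelete` bounds its fan-out with `errgroup` and the `maxFilesRequestsInFlight` constant. Here is the concurrency pattern in isolation, as a runnable sketch; the printed "deletion" is a stand-in for `w.deleteFile(groupCtx, file)`:

```
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// At most 5 deletions run at once, mirroring maxFilesRequestsInFlight.
	group, groupCtx := errgroup.WithContext(context.Background())
	group.SetLimit(5)

	for _, p := range []string{"a/1.txt", "a/2.txt", "a/b/3.txt"} {
		p := p // capture the loop variable for the closure

		// Stop scheduling new work once an earlier call has failed and
		// cancelled the group context.
		select {
		case <-groupCtx.Done():
			continue
		default:
		}

		group.Go(func() error {
			fmt.Println("deleting", p) // stand-in for the actual API call
			return nil
		})
	}

	// Wait blocks until all goroutines finish and returns the first error.
	if err := group.Wait(); err != nil {
		fmt.Println("delete failed:", err)
	}
}
```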
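Deleting the collected directories in reverse slice order is safe because `fs.WalkDir` visits entries in lexical order, so every parent is appended before its children. A self-contained demonstration, with an in-memory filesystem standing in for the filer-backed `NewFS(ctx, w)`:

```
package main

import (
	"fmt"
	"io/fs"
	"testing/fstest"
)

func main() {
	// In-memory stand-in; intermediate directories are synthesized.
	fsys := fstest.MapFS{
		"a/b/file.txt": &fstest.MapFile{Data: []byte("x")},
		"a/file.txt":   &fstest.MapFile{Data: []byte("y")},
	}

	var dirs []string
	_ = fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			// Lexical walk order: "." is appended before "a", "a" before "a/b".
			dirs = append(dirs, path)
		}
		return nil
	})

	// Reverse iteration deletes children before their parents:
	// prints "a/b", then "a", then ".".
	for i := len(dirs) - 1; i >= 0; i-- {
		fmt.Println("would delete directory:", dirs[i])
	}
}
```

In the filer itself, deleting the filer root is additionally rejected by the `CannotDeleteRootError` guard in `deleteDirectory`.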
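Since `Stat` probes the directory metadata endpoint first and only falls back to the file endpoint on `NoSuchDirectoryError`, a single call classifies a path as a directory, a file, or absent. A hedged caller-side sketch; `classify`, the volume path, and the file name are all placeholders, not part of this PR:

```
package example

import (
	"context"
	"errors"
	"fmt"

	"github.com/databricks/cli/libs/filer"
	"github.com/databricks/databricks-sdk-go"
)

// classify is a hypothetical helper; w is an authenticated workspace client.
func classify(ctx context.Context, w *databricks.WorkspaceClient) error {
	f, err := filer.NewFilesClient(w, "/Volumes/main/my-schema/my-volume")
	if err != nil {
		return err
	}

	info, err := f.Stat(ctx, "dir/hello.txt")
	switch {
	case errors.As(err, &filer.FileDoesNotExistError{}):
		// Both the directory and the file metadata probes returned 404.
		fmt.Println("nothing at this path")
	case err != nil:
		return err
	case info.IsDir():
		fmt.Println("directory; no file probe was needed")
	default:
		fmt.Printf("file of %d bytes\n", info.Size()) // Size is now backed by ContentLength
	}
	return nil
}
```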