diff --git a/cmd/workspace/workspace/events.go b/cmd/workspace/workspace/events.go new file mode 100644 index 00000000..c4eb0f74 --- /dev/null +++ b/cmd/workspace/workspace/events.go @@ -0,0 +1,46 @@ +package workspace + +type fileIOEvent struct { + SourcePath string `json:"source_path,omitempty"` + TargetPath string `json:"target_path,omitempty"` + Type EventType `json:"type"` +} + +type EventType string + +const ( + EventTypeFileExported = EventType("FILE_EXPORTED") + EventTypeExportStarted = EventType("EXPORT_STARTED") + EventTypeExportCompleted = EventType("EXPORT_COMPLETED") + EventTypeFileSkipped = EventType("FILE_SKIPPED") +) + +func newFileExportedEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + TargetPath: targetPath, + Type: EventTypeFileExported, + } +} + +func newExportCompletedEvent(targetPath string) fileIOEvent { + return fileIOEvent{ + TargetPath: targetPath, + Type: EventTypeExportCompleted, + } +} + +func newFileSkippedEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + TargetPath: targetPath, + Type: EventTypeFileSkipped, + } +} + +func newExportStartedEvent(sourcePath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + Type: EventTypeExportStarted, + } +} diff --git a/cmd/workspace/workspace/export_dir.go b/cmd/workspace/workspace/export_dir.go new file mode 100644 index 00000000..1c3fe968 --- /dev/null +++ b/cmd/workspace/workspace/export_dir.go @@ -0,0 +1,125 @@ +package workspace + +import ( + "context" + "io" + "io/fs" + "os" + "path" + "path/filepath" + + "github.com/databricks/cli/cmd/root" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go/service/workspace" + "github.com/spf13/cobra" +) + +// The callback function exports the file specified at relPath. This function is +// meant to be used in conjunction with fs.WalkDir +func exportFileCallback(ctx context.Context, workspaceFiler filer.Filer, sourceDir, targetDir string) func(string, fs.DirEntry, error) error { + return func(relPath string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + sourcePath := path.Join(sourceDir, relPath) + targetPath := filepath.Join(targetDir, relPath) + + // create directory and return early + if d.IsDir() { + return os.MkdirAll(targetPath, 0755) + } + + // Add extension to local file path if the file is a notebook + info, err := d.Info() + if err != nil { + return err + } + objectInfo := info.Sys().(workspace.ObjectInfo) + if objectInfo.ObjectType == workspace.ObjectTypeNotebook { + switch objectInfo.Language { + case workspace.LanguagePython: + targetPath += ".py" + case workspace.LanguageR: + targetPath += ".r" + case workspace.LanguageScala: + targetPath += ".scala" + case workspace.LanguageSql: + targetPath += ".sql" + default: + // Do not add any extension to the file name + } + } + + // Skip file if a file already exists in path. + // os.Stat returns a fs.ErrNotExist if a file does not exist at path. 
+ // If a file exists, and overwrite is not set, we skip exporting the file + if _, err := os.Stat(targetPath); err == nil && !exportOverwrite { + // Log event that this file/directory has been skipped + return cmdio.RenderWithTemplate(ctx, newFileSkippedEvent(relPath, targetPath), "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n") + } + + // create the file + f, err := os.Create(targetPath) + if err != nil { + return err + } + defer f.Close() + + // Write content to the local file + r, err := workspaceFiler.Read(ctx, relPath) + if err != nil { + return err + } + _, err = io.Copy(f, r) + if err != nil { + return err + } + return cmdio.RenderWithTemplate(ctx, newFileExportedEvent(sourcePath, targetPath), "{{.SourcePath}} -> {{.TargetPath}}\n") + } +} + +var exportDirCommand = &cobra.Command{ + Use: "export-dir SOURCE_PATH TARGET_PATH", + Short: `Export a directory from a Databricks workspace to the local file system.`, + Long: ` +Export a directory recursively from a Databricks workspace to the local file system. +Notebooks will have one of the following extensions added .scala, .py, .sql, or .r +based on the language type. +`, + PreRunE: root.MustWorkspaceClient, + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) (err error) { + ctx := cmd.Context() + w := root.WorkspaceClient(ctx) + sourceDir := args[0] + targetDir := args[1] + + // Initialize a filer and a file system on the source directory + workspaceFiler, err := filer.NewWorkspaceFilesClient(w, sourceDir) + if err != nil { + return err + } + workspaceFS := filer.NewFS(ctx, workspaceFiler) + + // TODO: print progress events on stderr instead: https://github.com/databricks/cli/issues/448 + err = cmdio.RenderJson(ctx, newExportStartedEvent(sourceDir)) + if err != nil { + return err + } + + err = fs.WalkDir(workspaceFS, ".", exportFileCallback(ctx, workspaceFiler, sourceDir, targetDir)) + if err != nil { + return err + } + return cmdio.RenderJson(ctx, newExportCompletedEvent(targetDir)) + }, +} + +var exportOverwrite bool + +func init() { + exportDirCommand.Flags().BoolVar(&exportOverwrite, "overwrite", false, "overwrite existing local files") + Cmd.AddCommand(exportDirCommand) +} diff --git a/internal/filer_test.go b/internal/filer_test.go index 317d7c85..3042693a 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -53,6 +53,12 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.True(t, errors.Is(err, fs.ErrNotExist)) + // Read should fail because the path points to a directory + err = f.Mkdir(ctx, "/dir") + require.NoError(t, err) + _, err = f.Read(ctx, "/dir") + assert.ErrorIs(t, err, fs.ErrInvalid) + // Write with CreateParentDirectories flag should succeed. 
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) assert.NoError(t, err) diff --git a/internal/workspace_test.go b/internal/workspace_test.go index f8830990..bfa323c5 100644 --- a/internal/workspace_test.go +++ b/internal/workspace_test.go @@ -1,9 +1,19 @@ package internal import ( + "context" + "errors" + "net/http" + "os" + "path/filepath" + "strings" "testing" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAccWorkspaceList(t *testing.T) { @@ -27,3 +37,104 @@ func TestWorkpaceGetStatusErrorWhenNoArguments(t *testing.T) { _, _, err := RequireErrorRun(t, "workspace", "get-status") assert.Equal(t, "accepts 1 arg(s), received 0", err.Error()) } + +func setupWorkspaceImportExportTest(t *testing.T) (context.Context, filer.Filer, string) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := temporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + require.NoError(t, err) + + // Check if we can use this API here, skip test if we cannot. + _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { + t.Skip(aerr.Message) + } + + return ctx, f, tmpdir +} + +// TODO: add tests for the progress event output logs: https://github.com/databricks/cli/issues/447 +func assertFileContents(t *testing.T, path string, content string) { + require.FileExists(t, path) + b, err := os.ReadFile(path) + require.NoError(t, err) + assert.Contains(t, string(b), content) +} + +func TestAccExportDir(t *testing.T) { + ctx, f, sourceDir := setupWorkspaceImportExportTest(t) + targetDir := t.TempDir() + + var err error + + // Write test data to the workspace + err = f.Write(ctx, "file-a", strings.NewReader("abc")) + require.NoError(t, err) + err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source")) + require.NoError(t, err) + err = f.Write(ctx, "sqlNotebook.sql", strings.NewReader("-- Databricks notebook source")) + require.NoError(t, err) + err = f.Write(ctx, "scalaNotebook.scala", strings.NewReader("// Databricks notebook source")) + require.NoError(t, err) + err = f.Write(ctx, "rNotebook.r", strings.NewReader("# Databricks notebook source")) + require.NoError(t, err) + err = f.Write(ctx, "a/b/c/file-b", strings.NewReader("def"), filer.CreateParentDirectories) + require.NoError(t, err) + + // Run Export + RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir) + + // Assert files were exported + assertFileContents(t, filepath.Join(targetDir, "file-a"), "abc") + assertFileContents(t, filepath.Join(targetDir, "pyNotebook.py"), "# Databricks notebook source") + assertFileContents(t, filepath.Join(targetDir, "sqlNotebook.sql"), "-- Databricks notebook source") + assertFileContents(t, filepath.Join(targetDir, "rNotebook.r"), "# Databricks notebook source") + assertFileContents(t, filepath.Join(targetDir, "scalaNotebook.scala"), "// Databricks notebook source") + assertFileContents(t, filepath.Join(targetDir, "a/b/c/file-b"), "def") +} + +func TestAccExportDirDoesNotOverwrite(t *testing.T) { + ctx, f, sourceDir := setupWorkspaceImportExportTest(t) + targetDir := t.TempDir() + + var err error + + // Write remote file + err = f.Write(ctx, 
"file-a", strings.NewReader("content from workspace")) + require.NoError(t, err) + + // Write local file + err = os.WriteFile(filepath.Join(targetDir, "file-a"), []byte("local content"), os.ModePerm) + require.NoError(t, err) + + // Run Export + RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir) + + // Assert file is not overwritten + assertFileContents(t, filepath.Join(targetDir, "file-a"), "local content") +} + +func TestAccExportDirWithOverwriteFlag(t *testing.T) { + ctx, f, sourceDir := setupWorkspaceImportExportTest(t) + targetDir := t.TempDir() + + var err error + + // Write remote file + err = f.Write(ctx, "file-a", strings.NewReader("content from workspace")) + require.NoError(t, err) + + // Write local file + err = os.WriteFile(filepath.Join(targetDir, "file-a"), []byte("local content"), os.ModePerm) + require.NoError(t, err) + + // Run Export + RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir, "--overwrite") + + // Assert file has been overwritten + assertFileContents(t, filepath.Join(targetDir, "file-a"), "content from workspace") +} diff --git a/libs/cmdio/io.go b/libs/cmdio/io.go index 32637b1d..762a9455 100644 --- a/libs/cmdio/io.go +++ b/libs/cmdio/io.go @@ -87,6 +87,14 @@ func RenderWithTemplate(ctx context.Context, v any, template string) error { } } +func RenderJson(ctx context.Context, v any) error { + c := fromContext(ctx) + if c.outputFormat == flags.OutputJSON { + return renderJson(c.out, v) + } + return nil +} + func RenderReader(ctx context.Context, r io.Reader) error { c := fromContext(ctx) switch c.outputFormat { diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index b165852d..dbf3cf60 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -59,7 +59,7 @@ func (info dbfsFileInfo) IsDir() bool { } func (info dbfsFileInfo) Sys() any { - return nil + return info.fi } // DbfsClient implements the [Filer] interface for the DBFS backend. @@ -145,24 +145,21 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { return nil, err } - handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead) + // This stat call serves two purposes: + // 1. Checks file at path exists, and throws an error if it does not + // 2. Allows us to error out if the path is a directory. This is needed + // because the Dbfs.Open method on the SDK does not error when the path is + // a directory + // TODO(added 8 June 2023): remove this stat call on go sdk bump. https://github.com/databricks/cli/issues/450 + stat, err := w.Stat(ctx, name) if err != nil { - var aerr *apierr.APIError - if !errors.As(err, &aerr) { - return nil, err - } - - // This API returns a 404 if the file doesn't exist. 
- if aerr.StatusCode == http.StatusNotFound { - if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - return nil, FileDoesNotExistError{absPath} - } - } - return nil, err } + if stat.IsDir() { + return nil, NotAFile{absPath} + } - return handle, nil + return w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead) } func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { diff --git a/libs/filer/filer.go b/libs/filer/filer.go index dcee3596..1c5ec2b9 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -68,6 +68,18 @@ func (err NotADirectory) Is(other error) bool { return other == fs.ErrInvalid } +type NotAFile struct { + path string +} + +func (err NotAFile) Error() string { + return fmt.Sprintf("not a file: %s", err.path) +} + +func (err NotAFile) Is(other error) bool { + return other == fs.ErrInvalid +} + type DirectoryNotEmptyError struct { path string } diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 9b4178af..4d310b1a 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -3,6 +3,7 @@ package filer import ( "bytes" "context" + "encoding/base64" "errors" "fmt" "io" @@ -65,7 +66,7 @@ func (info wsfsFileInfo) IsDir() bool { } func (info wsfsFileInfo) Sys() any { - return nil + return info.oi } // WorkspaceFilesClient implements the files-in-workspace API. @@ -157,31 +158,33 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader return nil, err } - // Remove leading "/" so we can use it in the URL. - urlPath := fmt.Sprintf( - "/api/2.0/workspace-files/%s", - strings.TrimLeft(absPath, "/"), - ) - - var res []byte - err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res) - - // Return early on success. - if err == nil { - return bytes.NewReader(res), nil - } - - // Special handling of this error only if it is an API error. - var aerr *apierr.APIError - if !errors.As(err, &aerr) { + // This stat call serves two purposes: + // 1. Checks file at path exists, and throws an error if it does not + // 2. Allows us to error out if the path is a directory. This is needed + // because the /workspace/export API does not error out, and returns the directory + // as a DBC archive even if format "SOURCE" is specified + stat, err := w.Stat(ctx, name) + if err != nil { return nil, err } - - if aerr.StatusCode == http.StatusNotFound { - return nil, FileDoesNotExistError{absPath} + if stat.IsDir() { + return nil, NotAFile{absPath} } - return nil, err + // Export file contents. Note the /workspace/export API has a limit of 10MBs + // for the file size + // TODO: use direct download once it's fixed. see: https://github.com/databricks/cli/issues/452 + res, err := w.workspaceClient.Workspace.Export(ctx, workspace.ExportRequest{ + Path: absPath, + }) + if err != nil { + return nil, err + } + b, err := base64.StdEncoding.DecodeString(res.Content) + if err != nil { + return nil, err + } + return bytes.NewReader(b), nil } func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
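
Both Read implementations above now follow the same error contract: a missing file surfaces FileDoesNotExistError, which unwraps to fs.ErrNotExist, while a directory target surfaces the new NotAFile error, which unwraps to fs.ErrInvalid, matching the assertions added in internal/filer_test.go. The following is a minimal caller-side sketch of that contract; the package name, helper name, and error messages are illustrative and not part of this diff.

// Illustrative only: a hypothetical caller that distinguishes the error
// cases returned by filer.Filer.Read after this change. Assumes an
// initialized Filer (DBFS or workspace files) and a context.
package example

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"

	"github.com/databricks/cli/libs/filer"
)

func readWorkspaceFile(ctx context.Context, f filer.Filer, name string) ([]byte, error) {
	r, err := f.Read(ctx, name)
	if errors.Is(err, fs.ErrNotExist) {
		// No file exists at this path (FileDoesNotExistError).
		return nil, fmt.Errorf("no file at %q: %w", name, err)
	}
	if errors.Is(err, fs.ErrInvalid) {
		// The path exists but points to a directory (NotAFile).
		return nil, fmt.Errorf("%q is a directory, not a file: %w", name, err)
	}
	if err != nil {
		return nil, err
	}
	return io.ReadAll(r)
}

Both clients implement this by calling Stat before fetching content, trading one extra API round trip for consistent directory handling across backends.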