Add workspace export-dir command (#449)

## Changes
This PR:
1. Adds the export-dir command
2. Changes filer.Read to return an error if a user tries to read a
directory
3. Adds returning internal file structures from filer.Stat().Sys()

## Tests
Integration tests and manually
This commit is contained in:
shreyas-goenka 2023-06-08 18:15:12 +02:00 committed by GitHub
parent 53164ae880
commit 4818541062
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 345 additions and 37 deletions

View File

@ -0,0 +1,46 @@
package workspace
type fileIOEvent struct {
SourcePath string `json:"source_path,omitempty"`
TargetPath string `json:"target_path,omitempty"`
Type EventType `json:"type"`
}
type EventType string
const (
EventTypeFileExported = EventType("FILE_EXPORTED")
EventTypeExportStarted = EventType("EXPORT_STARTED")
EventTypeExportCompleted = EventType("EXPORT_COMPLETED")
EventTypeFileSkipped = EventType("FILE_SKIPPED")
)
func newFileExportedEvent(sourcePath, targetPath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
TargetPath: targetPath,
Type: EventTypeFileExported,
}
}
func newExportCompletedEvent(targetPath string) fileIOEvent {
return fileIOEvent{
TargetPath: targetPath,
Type: EventTypeExportCompleted,
}
}
func newFileSkippedEvent(sourcePath, targetPath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
TargetPath: targetPath,
Type: EventTypeFileSkipped,
}
}
func newExportStartedEvent(sourcePath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
Type: EventTypeExportStarted,
}
}

View File

@ -0,0 +1,125 @@
package workspace
import (
"context"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/spf13/cobra"
)
// The callback function exports the file specified at relPath. This function is
// meant to be used in conjunction with fs.WalkDir
func exportFileCallback(ctx context.Context, workspaceFiler filer.Filer, sourceDir, targetDir string) func(string, fs.DirEntry, error) error {
return func(relPath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
sourcePath := path.Join(sourceDir, relPath)
targetPath := filepath.Join(targetDir, relPath)
// create directory and return early
if d.IsDir() {
return os.MkdirAll(targetPath, 0755)
}
// Add extension to local file path if the file is a notebook
info, err := d.Info()
if err != nil {
return err
}
objectInfo := info.Sys().(workspace.ObjectInfo)
if objectInfo.ObjectType == workspace.ObjectTypeNotebook {
switch objectInfo.Language {
case workspace.LanguagePython:
targetPath += ".py"
case workspace.LanguageR:
targetPath += ".r"
case workspace.LanguageScala:
targetPath += ".scala"
case workspace.LanguageSql:
targetPath += ".sql"
default:
// Do not add any extension to the file name
}
}
// Skip file if a file already exists in path.
// os.Stat returns a fs.ErrNotExist if a file does not exist at path.
// If a file exists, and overwrite is not set, we skip exporting the file
if _, err := os.Stat(targetPath); err == nil && !exportOverwrite {
// Log event that this file/directory has been skipped
return cmdio.RenderWithTemplate(ctx, newFileSkippedEvent(relPath, targetPath), "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n")
}
// create the file
f, err := os.Create(targetPath)
if err != nil {
return err
}
defer f.Close()
// Write content to the local file
r, err := workspaceFiler.Read(ctx, relPath)
if err != nil {
return err
}
_, err = io.Copy(f, r)
if err != nil {
return err
}
return cmdio.RenderWithTemplate(ctx, newFileExportedEvent(sourcePath, targetPath), "{{.SourcePath}} -> {{.TargetPath}}\n")
}
}
var exportDirCommand = &cobra.Command{
Use: "export-dir SOURCE_PATH TARGET_PATH",
Short: `Export a directory from a Databricks workspace to the local file system.`,
Long: `
Export a directory recursively from a Databricks workspace to the local file system.
Notebooks will have one of the following extensions added .scala, .py, .sql, or .r
based on the language type.
`,
PreRunE: root.MustWorkspaceClient,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()
w := root.WorkspaceClient(ctx)
sourceDir := args[0]
targetDir := args[1]
// Initialize a filer and a file system on the source directory
workspaceFiler, err := filer.NewWorkspaceFilesClient(w, sourceDir)
if err != nil {
return err
}
workspaceFS := filer.NewFS(ctx, workspaceFiler)
// TODO: print progress events on stderr instead: https://github.com/databricks/cli/issues/448
err = cmdio.RenderJson(ctx, newExportStartedEvent(sourceDir))
if err != nil {
return err
}
err = fs.WalkDir(workspaceFS, ".", exportFileCallback(ctx, workspaceFiler, sourceDir, targetDir))
if err != nil {
return err
}
return cmdio.RenderJson(ctx, newExportCompletedEvent(targetDir))
},
}
var exportOverwrite bool
func init() {
exportDirCommand.Flags().BoolVar(&exportOverwrite, "overwrite", false, "overwrite existing local files")
Cmd.AddCommand(exportDirCommand)
}

View File

@ -53,6 +53,12 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist)) assert.True(t, errors.Is(err, fs.ErrNotExist))
// Read should fail because the path points to a directory
err = f.Mkdir(ctx, "/dir")
require.NoError(t, err)
_, err = f.Read(ctx, "/dir")
assert.ErrorIs(t, err, fs.ErrInvalid)
// Write with CreateParentDirectories flag should succeed. // Write with CreateParentDirectories flag should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -1,9 +1,19 @@
package internal package internal
import ( import (
"context"
"errors"
"net/http"
"os"
"path/filepath"
"strings"
"testing" "testing"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
func TestAccWorkspaceList(t *testing.T) { func TestAccWorkspaceList(t *testing.T) {
@ -27,3 +37,104 @@ func TestWorkpaceGetStatusErrorWhenNoArguments(t *testing.T) {
_, _, err := RequireErrorRun(t, "workspace", "get-status") _, _, err := RequireErrorRun(t, "workspace", "get-status")
assert.Equal(t, "accepts 1 arg(s), received 0", err.Error()) assert.Equal(t, "accepts 1 arg(s), received 0", err.Error())
} }
func setupWorkspaceImportExportTest(t *testing.T) (context.Context, filer.Filer, string) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryWorkspaceDir(t, w)
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
require.NoError(t, err)
// Check if we can use this API here, skip test if we cannot.
_, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled")
var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest {
t.Skip(aerr.Message)
}
return ctx, f, tmpdir
}
// TODO: add tests for the progress event output logs: https://github.com/databricks/cli/issues/447
func assertFileContents(t *testing.T, path string, content string) {
require.FileExists(t, path)
b, err := os.ReadFile(path)
require.NoError(t, err)
assert.Contains(t, string(b), content)
}
func TestAccExportDir(t *testing.T) {
ctx, f, sourceDir := setupWorkspaceImportExportTest(t)
targetDir := t.TempDir()
var err error
// Write test data to the workspace
err = f.Write(ctx, "file-a", strings.NewReader("abc"))
require.NoError(t, err)
err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "sqlNotebook.sql", strings.NewReader("-- Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "scalaNotebook.scala", strings.NewReader("// Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "rNotebook.r", strings.NewReader("# Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "a/b/c/file-b", strings.NewReader("def"), filer.CreateParentDirectories)
require.NoError(t, err)
// Run Export
RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir)
// Assert files were exported
assertFileContents(t, filepath.Join(targetDir, "file-a"), "abc")
assertFileContents(t, filepath.Join(targetDir, "pyNotebook.py"), "# Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "sqlNotebook.sql"), "-- Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "rNotebook.r"), "# Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "scalaNotebook.scala"), "// Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "a/b/c/file-b"), "def")
}
func TestAccExportDirDoesNotOverwrite(t *testing.T) {
ctx, f, sourceDir := setupWorkspaceImportExportTest(t)
targetDir := t.TempDir()
var err error
// Write remote file
err = f.Write(ctx, "file-a", strings.NewReader("content from workspace"))
require.NoError(t, err)
// Write local file
err = os.WriteFile(filepath.Join(targetDir, "file-a"), []byte("local content"), os.ModePerm)
require.NoError(t, err)
// Run Export
RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir)
// Assert file is not overwritten
assertFileContents(t, filepath.Join(targetDir, "file-a"), "local content")
}
func TestAccExportDirWithOverwriteFlag(t *testing.T) {
ctx, f, sourceDir := setupWorkspaceImportExportTest(t)
targetDir := t.TempDir()
var err error
// Write remote file
err = f.Write(ctx, "file-a", strings.NewReader("content from workspace"))
require.NoError(t, err)
// Write local file
err = os.WriteFile(filepath.Join(targetDir, "file-a"), []byte("local content"), os.ModePerm)
require.NoError(t, err)
// Run Export
RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir, "--overwrite")
// Assert file has been overwritten
assertFileContents(t, filepath.Join(targetDir, "file-a"), "content from workspace")
}

View File

@ -87,6 +87,14 @@ func RenderWithTemplate(ctx context.Context, v any, template string) error {
} }
} }
func RenderJson(ctx context.Context, v any) error {
c := fromContext(ctx)
if c.outputFormat == flags.OutputJSON {
return renderJson(c.out, v)
}
return nil
}
func RenderReader(ctx context.Context, r io.Reader) error { func RenderReader(ctx context.Context, r io.Reader) error {
c := fromContext(ctx) c := fromContext(ctx)
switch c.outputFormat { switch c.outputFormat {

View File

@ -59,7 +59,7 @@ func (info dbfsFileInfo) IsDir() bool {
} }
func (info dbfsFileInfo) Sys() any { func (info dbfsFileInfo) Sys() any {
return nil return info.fi
} }
// DbfsClient implements the [Filer] interface for the DBFS backend. // DbfsClient implements the [Filer] interface for the DBFS backend.
@ -145,24 +145,21 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) {
return nil, err return nil, err
} }
handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead) // This stat call serves two purposes:
// 1. Checks file at path exists, and throws an error if it does not
// 2. Allows us to error out if the path is a directory. This is needed
// because the Dbfs.Open method on the SDK does not error when the path is
// a directory
// TODO(added 8 June 2023): remove this stat call on go sdk bump. https://github.com/databricks/cli/issues/450
stat, err := w.Stat(ctx, name)
if err != nil { if err != nil {
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}
// This API returns a 404 if the file doesn't exist.
if aerr.StatusCode == http.StatusNotFound {
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
return nil, FileDoesNotExistError{absPath}
}
}
return nil, err return nil, err
} }
if stat.IsDir() {
return nil, NotAFile{absPath}
}
return handle, nil return w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead)
} }
func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {

View File

@ -68,6 +68,18 @@ func (err NotADirectory) Is(other error) bool {
return other == fs.ErrInvalid return other == fs.ErrInvalid
} }
type NotAFile struct {
path string
}
func (err NotAFile) Error() string {
return fmt.Sprintf("not a file: %s", err.path)
}
func (err NotAFile) Is(other error) bool {
return other == fs.ErrInvalid
}
type DirectoryNotEmptyError struct { type DirectoryNotEmptyError struct {
path string path string
} }

View File

@ -3,6 +3,7 @@ package filer
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -65,7 +66,7 @@ func (info wsfsFileInfo) IsDir() bool {
} }
func (info wsfsFileInfo) Sys() any { func (info wsfsFileInfo) Sys() any {
return nil return info.oi
} }
// WorkspaceFilesClient implements the files-in-workspace API. // WorkspaceFilesClient implements the files-in-workspace API.
@ -157,31 +158,33 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader
return nil, err return nil, err
} }
// Remove leading "/" so we can use it in the URL. // This stat call serves two purposes:
urlPath := fmt.Sprintf( // 1. Checks file at path exists, and throws an error if it does not
"/api/2.0/workspace-files/%s", // 2. Allows us to error out if the path is a directory. This is needed
strings.TrimLeft(absPath, "/"), // because the /workspace/export API does not error out, and returns the directory
) // as a DBC archive even if format "SOURCE" is specified
stat, err := w.Stat(ctx, name)
var res []byte if err != nil {
err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res)
// Return early on success.
if err == nil {
return bytes.NewReader(res), nil
}
// Special handling of this error only if it is an API error.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err return nil, err
} }
if stat.IsDir() {
if aerr.StatusCode == http.StatusNotFound { return nil, NotAFile{absPath}
return nil, FileDoesNotExistError{absPath}
} }
return nil, err // Export file contents. Note the /workspace/export API has a limit of 10MBs
// for the file size
// TODO: use direct download once it's fixed. see: https://github.com/databricks/cli/issues/452
res, err := w.workspaceClient.Workspace.Export(ctx, workspace.ExportRequest{
Path: absPath,
})
if err != nil {
return nil, err
}
b, err := base64.StdEncoding.DecodeString(res.Content)
if err != nil {
return nil, err
}
return bytes.NewReader(b), nil
} }
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {