mirror of https://github.com/databricks/cli.git
parent
b72742a129
commit
bb32067a80
|
@ -3,7 +3,6 @@ package fs
|
||||||
import (
|
import (
|
||||||
"github.com/databricks/cli/cmd/root"
|
"github.com/databricks/cli/cmd/root"
|
||||||
"github.com/databricks/cli/libs/cmdio"
|
"github.com/databricks/cli/libs/cmdio"
|
||||||
"github.com/databricks/cli/libs/filer"
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,14 +15,8 @@ var catCmd = &cobra.Command{
|
||||||
|
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
ctx := cmd.Context()
|
ctx := cmd.Context()
|
||||||
w := root.WorkspaceClient(ctx)
|
|
||||||
|
|
||||||
path, err := resolveDbfsPath(args[0])
|
f, path, err := filerForPath(ctx, args[0])
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := filer.NewDbfsClient(w, "/")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,179 @@
|
||||||
|
package fs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io/fs"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/cmd/root"
|
||||||
|
"github.com/databricks/cli/libs/cmdio"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
type copy struct {
|
||||||
|
ctx context.Context
|
||||||
|
sourceFiler filer.Filer
|
||||||
|
targetFiler filer.Filer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *copy) cpWriteCallback(sourceDir, targetDir string) fs.WalkDirFunc {
|
||||||
|
return func(sourcePath string, d fs.DirEntry, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute path relative to the target directory
|
||||||
|
relPath, err := filepath.Rel(sourceDir, sourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
relPath = filepath.ToSlash(relPath)
|
||||||
|
|
||||||
|
// Compute target path for the file
|
||||||
|
targetPath := path.Join(targetDir, relPath)
|
||||||
|
|
||||||
|
// create directory and return early
|
||||||
|
if d.IsDir() {
|
||||||
|
return c.targetFiler.Mkdir(c.ctx, targetPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.cpFileToFile(sourcePath, targetPath)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *copy) cpDirToDir(sourceDir, targetDir string) error {
|
||||||
|
if !cpRecursive {
|
||||||
|
return fmt.Errorf("source path %s is a directory. Please specify the --recursive flag", sourceDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceFs := filer.NewFS(c.ctx, c.sourceFiler)
|
||||||
|
return fs.WalkDir(sourceFs, sourceDir, c.cpWriteCallback(sourceDir, targetDir))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *copy) cpFileToDir(sourcePath, targetDir string) error {
|
||||||
|
fileName := path.Base(sourcePath)
|
||||||
|
targetPath := path.Join(targetDir, fileName)
|
||||||
|
|
||||||
|
return c.cpFileToFile(sourcePath, targetPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *copy) cpFileToFile(sourcePath, targetPath string) error {
|
||||||
|
// Get reader for file at source path
|
||||||
|
r, err := c.sourceFiler.Read(c.ctx, sourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
if cpOverwrite {
|
||||||
|
err = c.targetFiler.Write(c.ctx, targetPath, r, filer.OverwriteIfExists)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = c.targetFiler.Write(c.ctx, targetPath, r)
|
||||||
|
// skip if file already exists
|
||||||
|
if err != nil && errors.Is(err, fs.ErrExist) {
|
||||||
|
return emitCpFileSkippedEvent(c.ctx, sourcePath, targetPath)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return emitCpFileCopiedEvent(c.ctx, sourcePath, targetPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: emit these events on stderr
|
||||||
|
// TODO: add integration tests for these events
|
||||||
|
func emitCpFileSkippedEvent(ctx context.Context, sourcePath, targetPath string) error {
|
||||||
|
event := newFileSkippedEvent(sourcePath, targetPath)
|
||||||
|
template := "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n"
|
||||||
|
|
||||||
|
return cmdio.RenderWithTemplate(ctx, event, template)
|
||||||
|
}
|
||||||
|
|
||||||
|
func emitCpFileCopiedEvent(ctx context.Context, sourcePath, targetPath string) error {
|
||||||
|
event := newFileCopiedEvent(sourcePath, targetPath)
|
||||||
|
template := "{{.SourcePath}} -> {{.TargetPath}}\n"
|
||||||
|
|
||||||
|
return cmdio.RenderWithTemplate(ctx, event, template)
|
||||||
|
}
|
||||||
|
|
||||||
|
var cpOverwrite bool
|
||||||
|
var cpRecursive bool
|
||||||
|
|
||||||
|
// cpCmd represents the fs cp command
|
||||||
|
var cpCmd = &cobra.Command{
|
||||||
|
Use: "cp SOURCE_PATH TARGET_PATH",
|
||||||
|
Short: "Copy files and directories to and from DBFS.",
|
||||||
|
Long: `Copy files to and from DBFS.
|
||||||
|
|
||||||
|
It is required that you specify the scheme "file" for local files and
|
||||||
|
"dbfs" for dbfs files. For example: file:/foo/bar, file:/c:/foo/bar or dbfs:/foo/bar.
|
||||||
|
|
||||||
|
Recursively copying a directory will copy all files inside directory
|
||||||
|
at SOURCE_PATH to the directory at TARGET_PATH.
|
||||||
|
|
||||||
|
When copying a file, if TARGET_PATH is a directory, the file will be created
|
||||||
|
inside the directory, otherwise the file is created at TARGET_PATH.
|
||||||
|
`,
|
||||||
|
Args: cobra.ExactArgs(2),
|
||||||
|
PreRunE: root.MustWorkspaceClient,
|
||||||
|
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
ctx := cmd.Context()
|
||||||
|
|
||||||
|
// TODO: Error if a user uses '\' as path separator on windows when "file"
|
||||||
|
// scheme is specified (https://github.com/databricks/cli/issues/485)
|
||||||
|
|
||||||
|
// Get source filer and source path without scheme
|
||||||
|
fullSourcePath := args[0]
|
||||||
|
sourceFiler, sourcePath, err := filerForPath(ctx, fullSourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get target filer and target path without scheme
|
||||||
|
fullTargetPath := args[1]
|
||||||
|
targetFiler, targetPath, err := filerForPath(ctx, fullTargetPath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
c := copy{
|
||||||
|
ctx: ctx,
|
||||||
|
sourceFiler: sourceFiler,
|
||||||
|
targetFiler: targetFiler,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get information about file at source path
|
||||||
|
sourceInfo, err := sourceFiler.Stat(ctx, sourcePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 1: source path is a directory, then recursively create files at target path
|
||||||
|
if sourceInfo.IsDir() {
|
||||||
|
return c.cpDirToDir(sourcePath, targetPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 2: source path is a file, and target path is a directory. In this case
|
||||||
|
// we copy the file to inside the directory
|
||||||
|
if targetInfo, err := targetFiler.Stat(ctx, targetPath); err == nil && targetInfo.IsDir() {
|
||||||
|
return c.cpFileToDir(sourcePath, targetPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 3: source path is a file, and target path is a file
|
||||||
|
return c.cpFileToFile(sourcePath, targetPath)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
cpCmd.Flags().BoolVar(&cpOverwrite, "overwrite", false, "overwrite existing files")
|
||||||
|
cpCmd.Flags().BoolVarP(&cpRecursive, "recursive", "r", false, "recursively copy files from directory")
|
||||||
|
fsCmd.AddCommand(cpCmd)
|
||||||
|
}
|
|
@ -0,0 +1,30 @@
|
||||||
|
package fs
|
||||||
|
|
||||||
|
type fileIOEvent struct {
|
||||||
|
SourcePath string `json:"source_path,omitempty"`
|
||||||
|
TargetPath string `json:"target_path,omitempty"`
|
||||||
|
Type EventType `json:"type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
EventTypeFileCopied = EventType("FILE_COPIED")
|
||||||
|
EventTypeFileSkipped = EventType("FILE_SKIPPED")
|
||||||
|
)
|
||||||
|
|
||||||
|
func newFileCopiedEvent(sourcePath, targetPath string) fileIOEvent {
|
||||||
|
return fileIOEvent{
|
||||||
|
SourcePath: sourcePath,
|
||||||
|
TargetPath: targetPath,
|
||||||
|
Type: EventTypeFileCopied,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFileSkippedEvent(sourcePath, targetPath string) fileIOEvent {
|
||||||
|
return fileIOEvent{
|
||||||
|
SourcePath: sourcePath,
|
||||||
|
TargetPath: targetPath,
|
||||||
|
Type: EventTypeFileSkipped,
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,14 +1,51 @@
|
||||||
package fs
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/cmd/root"
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func resolveDbfsPath(path string) (string, error) {
|
type Scheme string
|
||||||
if !strings.HasPrefix(path, "dbfs:/") {
|
|
||||||
return "", fmt.Errorf("expected dbfs path (with the dbfs:/ prefix): %s", path)
|
|
||||||
}
|
|
||||||
|
|
||||||
return strings.TrimPrefix(path, "dbfs:"), nil
|
const (
|
||||||
|
DbfsScheme = Scheme("dbfs")
|
||||||
|
LocalScheme = Scheme("file")
|
||||||
|
NoScheme = Scheme("")
|
||||||
|
)
|
||||||
|
|
||||||
|
func filerForPath(ctx context.Context, fullPath string) (filer.Filer, string, error) {
|
||||||
|
parts := strings.SplitN(fullPath, ":/", 2)
|
||||||
|
if len(parts) < 2 {
|
||||||
|
return nil, "", fmt.Errorf(`no scheme specified for path %s. Please specify scheme "dbfs" or "file". Example: file:/foo/bar or file:/c:/foo/bar`, fullPath)
|
||||||
|
}
|
||||||
|
scheme := Scheme(parts[0])
|
||||||
|
path := parts[1]
|
||||||
|
switch scheme {
|
||||||
|
case DbfsScheme:
|
||||||
|
w := root.WorkspaceClient(ctx)
|
||||||
|
f, err := filer.NewDbfsClient(w, "/")
|
||||||
|
return f, path, err
|
||||||
|
|
||||||
|
case LocalScheme:
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
parts := strings.SplitN(path, ":", 2)
|
||||||
|
if len(parts) < 2 {
|
||||||
|
return nil, "", fmt.Errorf("no volume specfied for path: %s", path)
|
||||||
|
}
|
||||||
|
volume := parts[0] + ":"
|
||||||
|
relPath := parts[1]
|
||||||
|
f, err := filer.NewLocalClient(volume)
|
||||||
|
return f, relPath, err
|
||||||
|
}
|
||||||
|
f, err := filer.NewLocalClient("/")
|
||||||
|
return f, path, err
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, "", fmt.Errorf(`unsupported scheme %s specified for path %s. Please specify scheme "dbfs" or "file". Example: file:/foo/bar or file:/c:/foo/bar`, scheme, fullPath)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,38 +1,26 @@
|
||||||
package fs
|
package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestResolveDbfsPath(t *testing.T) {
|
func TestNotSpecifyingVolumeForWindowsPathErrors(t *testing.T) {
|
||||||
path, err := resolveDbfsPath("dbfs:/")
|
if runtime.GOOS != "windows" {
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
pathWithVolume := `file:/c:/foo/bar`
|
||||||
|
pathWOVolume := `file:/uno/dos`
|
||||||
|
|
||||||
|
_, path, err := filerForPath(ctx, pathWithVolume)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, "/", path)
|
assert.Equal(t, `/foo/bar`, path)
|
||||||
|
|
||||||
path, err = resolveDbfsPath("dbfs:/abc")
|
_, _, err = filerForPath(ctx, pathWOVolume)
|
||||||
assert.NoError(t, err)
|
assert.Equal(t, "no volume specfied for path: uno/dos", err.Error())
|
||||||
assert.Equal(t, "/abc", path)
|
|
||||||
|
|
||||||
path, err = resolveDbfsPath("dbfs:/a/b/c")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, "/a/b/c", path)
|
|
||||||
|
|
||||||
path, err = resolveDbfsPath("dbfs:/a/b/.")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, "/a/b/.", path)
|
|
||||||
|
|
||||||
path, err = resolveDbfsPath("dbfs:/a/../c")
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Equal(t, "/a/../c", path)
|
|
||||||
|
|
||||||
_, err = resolveDbfsPath("dbf:/a/b/c")
|
|
||||||
assert.ErrorContains(t, err, "expected dbfs path (with the dbfs:/ prefix): dbf:/a/b/c")
|
|
||||||
|
|
||||||
_, err = resolveDbfsPath("/a/b/c")
|
|
||||||
assert.ErrorContains(t, err, "expected dbfs path (with the dbfs:/ prefix): /a/b/c")
|
|
||||||
|
|
||||||
_, err = resolveDbfsPath("dbfs:a/b/c")
|
|
||||||
assert.ErrorContains(t, err, "expected dbfs path (with the dbfs:/ prefix): dbfs:a/b/c")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
|
|
||||||
"github.com/databricks/cli/cmd/root"
|
"github.com/databricks/cli/cmd/root"
|
||||||
"github.com/databricks/cli/libs/cmdio"
|
"github.com/databricks/cli/libs/cmdio"
|
||||||
"github.com/databricks/cli/libs/filer"
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -42,14 +41,8 @@ var lsCmd = &cobra.Command{
|
||||||
|
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
ctx := cmd.Context()
|
ctx := cmd.Context()
|
||||||
w := root.WorkspaceClient(ctx)
|
|
||||||
|
|
||||||
path, err := resolveDbfsPath(args[0])
|
f, path, err := filerForPath(ctx, args[0])
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := filer.NewDbfsClient(w, "/")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/databricks/cli/cmd/root"
|
"github.com/databricks/cli/cmd/root"
|
||||||
"github.com/databricks/cli/libs/filer"
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,14 +17,8 @@ var mkdirCmd = &cobra.Command{
|
||||||
|
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
ctx := cmd.Context()
|
ctx := cmd.Context()
|
||||||
w := root.WorkspaceClient(ctx)
|
|
||||||
|
|
||||||
path, err := resolveDbfsPath(args[0])
|
f, path, err := filerForPath(ctx, args[0])
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := filer.NewDbfsClient(w, "/")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
13
cmd/fs/rm.go
13
cmd/fs/rm.go
|
@ -2,7 +2,7 @@ package fs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/databricks/cli/cmd/root"
|
"github.com/databricks/cli/cmd/root"
|
||||||
"github.com/databricks/databricks-sdk-go/service/files"
|
"github.com/databricks/cli/libs/filer"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -15,17 +15,16 @@ var rmCmd = &cobra.Command{
|
||||||
|
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
ctx := cmd.Context()
|
ctx := cmd.Context()
|
||||||
w := root.WorkspaceClient(ctx)
|
|
||||||
|
|
||||||
path, err := resolveDbfsPath(args[0])
|
f, path, err := filerForPath(ctx, args[0])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return w.Dbfs.Delete(ctx, files.Delete{
|
if recursive {
|
||||||
Path: path,
|
return f.Delete(ctx, path, filer.DeleteRecursively)
|
||||||
Recursive: recursive,
|
}
|
||||||
})
|
return f.Delete(ctx, path)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,296 @@
|
||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
"github.com/databricks/databricks-sdk-go"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupSourceDir(t *testing.T, ctx context.Context, f filer.Filer) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint(123)"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = f.Write(ctx, "query.sql", strings.NewReader("SELECT 1"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = f.Write(ctx, "a/b/c/hello.txt", strings.NewReader("hello, world\n"), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupSourceFile(t *testing.T, ctx context.Context, f filer.Filer) {
|
||||||
|
err := f.Write(ctx, "foo.txt", strings.NewReader("abc"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertTargetFile(t *testing.T, ctx context.Context, f filer.Filer, relPath string) {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
r, err := f.Read(ctx, relPath)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
defer r.Close()
|
||||||
|
b, err := io.ReadAll(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, "abc", string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertFileContent(t *testing.T, ctx context.Context, f filer.Filer, path, expectedContent string) {
|
||||||
|
r, err := f.Read(ctx, path)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer r.Close()
|
||||||
|
b, err := io.ReadAll(r)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, expectedContent, string(b))
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertTargetDir(t *testing.T, ctx context.Context, f filer.Filer) {
|
||||||
|
assertFileContent(t, ctx, f, "pyNb.py", "# Databricks notebook source\nprint(123)")
|
||||||
|
assertFileContent(t, ctx, f, "query.sql", "SELECT 1")
|
||||||
|
assertFileContent(t, ctx, f, "a/b/c/hello.txt", "hello, world\n")
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupLocalFiler(t *testing.T) (filer.Filer, string) {
|
||||||
|
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||||
|
|
||||||
|
tmp := t.TempDir()
|
||||||
|
f, err := filer.NewLocalClient(tmp)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return f, path.Join("file:/", filepath.ToSlash(tmp))
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupDbfsFiler(t *testing.T) (filer.Filer, string) {
|
||||||
|
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||||
|
|
||||||
|
w, err := databricks.NewWorkspaceClient()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tmpDir := temporaryDbfsDir(t, w)
|
||||||
|
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return f, path.Join("dbfs:/", tmpDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
type cpTest struct {
|
||||||
|
setupSource func(*testing.T) (filer.Filer, string)
|
||||||
|
setupTarget func(*testing.T) (filer.Filer, string)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupTable() []cpTest {
|
||||||
|
return []cpTest{
|
||||||
|
{setupSource: setupLocalFiler, setupTarget: setupLocalFiler},
|
||||||
|
{setupSource: setupLocalFiler, setupTarget: setupDbfsFiler},
|
||||||
|
{setupSource: setupDbfsFiler, setupTarget: setupLocalFiler},
|
||||||
|
{setupSource: setupDbfsFiler, setupTarget: setupDbfsFiler},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpDir(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", "-r", sourceDir, targetDir)
|
||||||
|
|
||||||
|
assertTargetDir(t, ctx, targetFiler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpFileToFile(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceFile(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), path.Join(targetDir, "bar.txt"))
|
||||||
|
|
||||||
|
assertTargetFile(t, ctx, targetFiler, "bar.txt")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpFileToDir(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceFile(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "foo.txt"), targetDir)
|
||||||
|
|
||||||
|
assertTargetFile(t, ctx, targetFiler, "foo.txt")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpDirToDirFileNotOverwritten(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive")
|
||||||
|
assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten")
|
||||||
|
assertFileContent(t, ctx, targetFiler, "query.sql", "SELECT 1")
|
||||||
|
assertFileContent(t, ctx, targetFiler, "pyNb.py", "# Databricks notebook source\nprint(123)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpFileToDirFileNotOverwritten(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"))
|
||||||
|
assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "this should not be overwritten")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpFileToFileFileNotOverwritten(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--recursive")
|
||||||
|
assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "this should not be overwritten")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpDirToDirWithOverwriteFlag(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten"), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite")
|
||||||
|
assertTargetDir(t, ctx, targetFiler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpFileToFileWithOverwriteFlag(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this will be overwritten. Such is life."), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--overwrite")
|
||||||
|
assertFileContent(t, ctx, targetFiler, "a/b/c/hola.txt", "hello, world\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpFileToDirWithOverwriteFlag(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten :') "), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--recursive", "--overwrite")
|
||||||
|
assertFileContent(t, ctx, targetFiler, "a/b/c/hello.txt", "hello, world\n")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag(t *testing.T) {
|
||||||
|
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||||
|
|
||||||
|
w, err := databricks.NewWorkspaceClient()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
tmpDir := temporaryDbfsDir(t, w)
|
||||||
|
|
||||||
|
_, _, err = RequireErrorRun(t, "fs", "cp", "dbfs:"+tmpDir, "dbfs:/tmp")
|
||||||
|
assert.Equal(t, fmt.Sprintf("source path %s is a directory. Please specify the --recursive flag", strings.TrimPrefix(tmpDir, "/")), err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpErrorsOnNoScheme(t *testing.T) {
|
||||||
|
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||||
|
|
||||||
|
_, _, err := RequireErrorRun(t, "fs", "cp", "/a", "/b")
|
||||||
|
assert.Equal(t, "no scheme specified for path /a. Please specify scheme \"dbfs\" or \"file\". Example: file:/foo/bar or file:/c:/foo/bar", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpErrorsOnInvalidScheme(t *testing.T) {
|
||||||
|
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||||
|
|
||||||
|
_, _, err := RequireErrorRun(t, "fs", "cp", "dbfs:/a", "https:/b")
|
||||||
|
assert.Equal(t, "unsupported scheme https specified for path https:/b. Please specify scheme \"dbfs\" or \"file\". Example: file:/foo/bar or file:/c:/foo/bar", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAccFsCpSourceIsDirectoryButTargetIsFile(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
table := setupTable()
|
||||||
|
|
||||||
|
for _, row := range table {
|
||||||
|
sourceFiler, sourceDir := row.setupSource(t)
|
||||||
|
targetFiler, targetDir := row.setupTarget(t)
|
||||||
|
setupSourceDir(t, ctx, sourceFiler)
|
||||||
|
|
||||||
|
// Write a conflicting file to target
|
||||||
|
err := targetFiler.Write(ctx, "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, _, err = RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive", "--overwrite")
|
||||||
|
assert.Error(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -240,6 +240,7 @@ func NewCobraTestRunner(t *testing.T, args ...string) *cobraTestRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequireSuccessfulRun(t *testing.T, args ...string) (bytes.Buffer, bytes.Buffer) {
|
func RequireSuccessfulRun(t *testing.T, args ...string) (bytes.Buffer, bytes.Buffer) {
|
||||||
|
t.Logf("run args: [%s]", strings.Join(args, ", "))
|
||||||
c := NewCobraTestRunner(t, args...)
|
c := NewCobraTestRunner(t, args...)
|
||||||
stdout, stderr, err := c.Run()
|
stdout, stderr, err := c.Run()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -169,6 +169,5 @@ func (w *LocalClient) Stat(ctx context.Context, name string) (fs.FileInfo, error
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, FileDoesNotExistError{path: absPath}
|
return nil, FileDoesNotExistError{path: absPath}
|
||||||
}
|
}
|
||||||
|
|
||||||
return stat, err
|
return stat, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue