mirror of https://github.com/databricks/cli.git
This commit is contained in:
parent
e9a309e85d
commit
74cb87f1b9
|
@ -15,6 +15,7 @@ import (
|
|||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/service/files"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -159,9 +160,54 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
|||
assert.Equal(t, "c", entries[0].Name())
|
||||
assert.True(t, entries[0].IsDir())
|
||||
|
||||
// TODO: split into a separate PR
|
||||
// Expect an error trying to call ReadDir on a file
|
||||
_, err = f.ReadDir(ctx, "/hello.txt")
|
||||
assert.ErrorIs(t, err, filer.ErrNotADirectory)
|
||||
assert.ErrorIs(t, err, fs.ErrInvalid)
|
||||
|
||||
// Expect 0 entries for an empty directory
|
||||
err = f.Mkdir(ctx, "empty-dir")
|
||||
require.NoError(t, err)
|
||||
entries, err = f.ReadDir(ctx, "empty-dir")
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, entries, 0)
|
||||
|
||||
// Expect one entry for a directory with a file in it
|
||||
err = f.Write(ctx, "dir-with-one-file/my-file.txt", strings.NewReader("abc"), filer.CreateParentDirectories)
|
||||
require.NoError(t, err)
|
||||
entries, err = f.ReadDir(ctx, "dir-with-one-file")
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, entries, 1)
|
||||
assert.Equal(t, entries[0].Name(), "my-file.txt")
|
||||
assert.False(t, entries[0].IsDir())
|
||||
}
|
||||
|
||||
func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
me, err := w.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-"))
|
||||
|
||||
// Ensure directory exists, but doesn't exist YET!
|
||||
// Otherwise we could inadvertently remove a directory that already exists on cleanup.
|
||||
t.Logf("mkdir %s", path)
|
||||
err = w.Workspace.MkdirsByPath(ctx, path)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove test directory on test completion.
|
||||
t.Cleanup(func() {
|
||||
t.Logf("rm -rf %s", path)
|
||||
err := w.Workspace.Delete(ctx, workspace.Delete{
|
||||
Path: path,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary workspace directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) {
|
||||
|
@ -195,7 +241,7 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
|
|||
|
||||
func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-"))
|
||||
path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-dbfs-"))
|
||||
|
||||
// This call fails if the path already exists.
|
||||
t.Logf("mkdir dbfs:%s", path)
|
||||
|
|
|
@ -5,18 +5,17 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/cli/cmd/root"
|
||||
_ "github.com/databricks/cli/cmd/version"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
_ "github.com/databricks/cli/cmd/workspace"
|
||||
|
@ -57,18 +56,54 @@ type cobraTestRunner struct {
|
|||
stdout bytes.Buffer
|
||||
stderr bytes.Buffer
|
||||
|
||||
// Line-by-line output.
|
||||
// Background goroutines populate these channels by reading from stdout/stderr pipes.
|
||||
stdoutLines <-chan string
|
||||
stderrLines <-chan string
|
||||
|
||||
errch <-chan error
|
||||
}
|
||||
|
||||
func consumeLines(ctx context.Context, wg *sync.WaitGroup, r io.Reader) <-chan string {
|
||||
ch := make(chan string, 1000)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
defer wg.Done()
|
||||
scanner := bufio.NewScanner(r)
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case ch <- scanner.Text():
|
||||
}
|
||||
}
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (t *cobraTestRunner) RunBackground() {
|
||||
var stdoutR, stderrR io.Reader
|
||||
var stdoutW, stderrW io.WriteCloser
|
||||
stdoutR, stdoutW = io.Pipe()
|
||||
stderrR, stderrW = io.Pipe()
|
||||
root := root.RootCmd
|
||||
root.SetOut(&t.stdout)
|
||||
root.SetErr(&t.stderr)
|
||||
root.SetOut(stdoutW)
|
||||
root.SetErr(stderrW)
|
||||
root.SetArgs(t.args)
|
||||
|
||||
errch := make(chan error)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
// Tee stdout/stderr to buffers.
|
||||
stdoutR = io.TeeReader(stdoutR, &t.stdout)
|
||||
stderrR = io.TeeReader(stderrR, &t.stderr)
|
||||
|
||||
// Consume stdout/stderr line-by-line.
|
||||
var wg sync.WaitGroup
|
||||
t.stdoutLines = consumeLines(ctx, &wg, stdoutR)
|
||||
t.stderrLines = consumeLines(ctx, &wg, stderrR)
|
||||
|
||||
// Run command in background.
|
||||
go func() {
|
||||
cmd, err := root.ExecuteContextC(ctx)
|
||||
|
@ -76,6 +111,14 @@ func (t *cobraTestRunner) RunBackground() {
|
|||
t.Logf("Error running command: %s", err)
|
||||
}
|
||||
|
||||
// Close pipes to signal EOF.
|
||||
stdoutW.Close()
|
||||
stderrW.Close()
|
||||
|
||||
// Wait for the [consumeLines] routines to finish now that
|
||||
// the pipes they're reading from have closed.
|
||||
wg.Wait()
|
||||
|
||||
if t.stdout.Len() > 0 {
|
||||
// Make a copy of the buffer such that it remains "unread".
|
||||
scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes()))
|
||||
|
@ -130,6 +173,9 @@ func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duratio
|
|||
ticker := time.NewTicker(tick)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Kick off condition check immediately.
|
||||
go func() { ch <- condition() }()
|
||||
|
||||
for tick := ticker.C; ; {
|
||||
select {
|
||||
case err := <-c.errch:
|
||||
|
@ -179,32 +225,3 @@ func writeFile(t *testing.T, name string, body string) string {
|
|||
f.Close()
|
||||
return f.Name()
|
||||
}
|
||||
|
||||
func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
me, err := w.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-"))
|
||||
|
||||
// Ensure directory exists, but doesn't exist YET!
|
||||
// Otherwise we could inadvertently remove a directory that already exists on cleanup.
|
||||
t.Logf("mkdir %s", path)
|
||||
err = w.Workspace.MkdirsByPath(ctx, path)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove test directory on test completion.
|
||||
t.Cleanup(func() {
|
||||
t.Logf("rm -rf %s", path)
|
||||
err := w.Workspace.Delete(ctx, workspace.Delete{
|
||||
Path: path,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary workspace directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue