From 74cb87f1b9403cacfb7c7d6dbdf1262d6db23a3a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:35:50 +0200 Subject: [PATCH] - --- internal/filer_test.go | 52 ++++++++++++++++++++++++-- internal/helpers.go | 85 +++++++++++++++++++++++++----------------- 2 files changed, 100 insertions(+), 37 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index 02a833398..81c3e4aea 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/service/files" + "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -159,9 +160,54 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.Equal(t, "c", entries[0].Name()) assert.True(t, entries[0].IsDir()) - // TODO: split into a separate PR + // Expect an error trying to call ReadDir on a file _, err = f.ReadDir(ctx, "/hello.txt") - assert.ErrorIs(t, err, filer.ErrNotADirectory) + assert.ErrorIs(t, err, fs.ErrInvalid) + + // Expect 0 entries for an empty directory + err = f.Mkdir(ctx, "empty-dir") + require.NoError(t, err) + entries, err = f.ReadDir(ctx, "empty-dir") + assert.NoError(t, err) + assert.Len(t, entries, 0) + + // Expect one entry for a directory with a file in it + err = f.Write(ctx, "dir-with-one-file/my-file.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + require.NoError(t, err) + entries, err = f.ReadDir(ctx, "dir-with-one-file") + assert.NoError(t, err) + assert.Len(t, entries, 1) + assert.Equal(t, entries[0].Name(), "my-file.txt") + assert.False(t, entries[0].IsDir()) +} + +func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + me, err := w.CurrentUser.Me(ctx) + require.NoError(t, err) + + path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-")) + + // Ensure directory exists, but doesn't exist YET! + // Otherwise we could inadvertently remove a directory that already exists on cleanup. + t.Logf("mkdir %s", path) + err = w.Workspace.MkdirsByPath(ctx, path) + require.NoError(t, err) + + // Remove test directory on test completion. + t.Cleanup(func() { + t.Logf("rm -rf %s", path) + err := w.Workspace.Delete(ctx, workspace.Delete{ + Path: path, + Recursive: true, + }) + if err == nil || apierr.IsMissing(err) { + return + } + t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) + }) + + return path } func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { @@ -195,7 +241,7 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() - path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-")) + path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-dbfs-")) // This call fails if the path already exists. t.Logf("mkdir dbfs:%s", path) diff --git a/internal/helpers.go b/internal/helpers.go index f1901f14e..1f7568719 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -5,18 +5,17 @@ import ( "bytes" "context" "fmt" + "io" "math/rand" "os" "path/filepath" "strings" + "sync" "testing" "time" "github.com/databricks/cli/cmd/root" _ "github.com/databricks/cli/cmd/version" - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/require" _ "github.com/databricks/cli/cmd/workspace" @@ -57,18 +56,54 @@ type cobraTestRunner struct { stdout bytes.Buffer stderr bytes.Buffer + // Line-by-line output. + // Background goroutines populate these channels by reading from stdout/stderr pipes. + stdoutLines <-chan string + stderrLines <-chan string + errch <-chan error } +func consumeLines(ctx context.Context, wg *sync.WaitGroup, r io.Reader) <-chan string { + ch := make(chan string, 1000) + wg.Add(1) + go func() { + defer close(ch) + defer wg.Done() + scanner := bufio.NewScanner(r) + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case ch <- scanner.Text(): + } + } + }() + return ch +} + func (t *cobraTestRunner) RunBackground() { + var stdoutR, stderrR io.Reader + var stdoutW, stderrW io.WriteCloser + stdoutR, stdoutW = io.Pipe() + stderrR, stderrW = io.Pipe() root := root.RootCmd - root.SetOut(&t.stdout) - root.SetErr(&t.stderr) + root.SetOut(stdoutW) + root.SetErr(stderrW) root.SetArgs(t.args) errch := make(chan error) ctx, cancel := context.WithCancel(context.Background()) + // Tee stdout/stderr to buffers. + stdoutR = io.TeeReader(stdoutR, &t.stdout) + stderrR = io.TeeReader(stderrR, &t.stderr) + + // Consume stdout/stderr line-by-line. + var wg sync.WaitGroup + t.stdoutLines = consumeLines(ctx, &wg, stdoutR) + t.stderrLines = consumeLines(ctx, &wg, stderrR) + // Run command in background. go func() { cmd, err := root.ExecuteContextC(ctx) @@ -76,6 +111,14 @@ func (t *cobraTestRunner) RunBackground() { t.Logf("Error running command: %s", err) } + // Close pipes to signal EOF. + stdoutW.Close() + stderrW.Close() + + // Wait for the [consumeLines] routines to finish now that + // the pipes they're reading from have closed. + wg.Wait() + if t.stdout.Len() > 0 { // Make a copy of the buffer such that it remains "unread". scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes())) @@ -130,6 +173,9 @@ func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duratio ticker := time.NewTicker(tick) defer ticker.Stop() + // Kick off condition check immediately. + go func() { ch <- condition() }() + for tick := ticker.C; ; { select { case err := <-c.errch: @@ -179,32 +225,3 @@ func writeFile(t *testing.T, name string, body string) string { f.Close() return f.Name() } - -func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { - ctx := context.Background() - me, err := w.CurrentUser.Me(ctx) - require.NoError(t, err) - - path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-")) - - // Ensure directory exists, but doesn't exist YET! - // Otherwise we could inadvertently remove a directory that already exists on cleanup. - t.Logf("mkdir %s", path) - err = w.Workspace.MkdirsByPath(ctx, path) - require.NoError(t, err) - - // Remove test directory on test completion. - t.Cleanup(func() { - t.Logf("rm -rf %s", path) - err := w.Workspace.Delete(ctx, workspace.Delete{ - Path: path, - Recursive: true, - }) - if err == nil || apierr.IsMissing(err) { - return - } - t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) - }) - - return path -}