diff --git a/internal/helpers.go b/internal/helpers.go index b51d005b..1f756871 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -5,10 +5,12 @@ import ( "bytes" "context" "fmt" + "io" "math/rand" "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -54,18 +56,54 @@ type cobraTestRunner struct { stdout bytes.Buffer stderr bytes.Buffer + // Line-by-line output. + // Background goroutines populate these channels by reading from stdout/stderr pipes. + stdoutLines <-chan string + stderrLines <-chan string + errch <-chan error } +func consumeLines(ctx context.Context, wg *sync.WaitGroup, r io.Reader) <-chan string { + ch := make(chan string, 1000) + wg.Add(1) + go func() { + defer close(ch) + defer wg.Done() + scanner := bufio.NewScanner(r) + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case ch <- scanner.Text(): + } + } + }() + return ch +} + func (t *cobraTestRunner) RunBackground() { + var stdoutR, stderrR io.Reader + var stdoutW, stderrW io.WriteCloser + stdoutR, stdoutW = io.Pipe() + stderrR, stderrW = io.Pipe() root := root.RootCmd - root.SetOut(&t.stdout) - root.SetErr(&t.stderr) + root.SetOut(stdoutW) + root.SetErr(stderrW) root.SetArgs(t.args) errch := make(chan error) ctx, cancel := context.WithCancel(context.Background()) + // Tee stdout/stderr to buffers. + stdoutR = io.TeeReader(stdoutR, &t.stdout) + stderrR = io.TeeReader(stderrR, &t.stderr) + + // Consume stdout/stderr line-by-line. + var wg sync.WaitGroup + t.stdoutLines = consumeLines(ctx, &wg, stdoutR) + t.stderrLines = consumeLines(ctx, &wg, stderrR) + // Run command in background. go func() { cmd, err := root.ExecuteContextC(ctx) @@ -73,6 +111,14 @@ func (t *cobraTestRunner) RunBackground() { t.Logf("Error running command: %s", err) } + // Close pipes to signal EOF. + stdoutW.Close() + stderrW.Close() + + // Wait for the [consumeLines] routines to finish now that + // the pipes they're reading from have closed. + wg.Wait() + if t.stdout.Len() > 0 { // Make a copy of the buffer such that it remains "unread". scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes())) @@ -127,6 +173,9 @@ func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duratio ticker := time.NewTicker(tick) defer ticker.Stop() + // Kick off condition check immediately. + go func() { ch <- condition() }() + for tick := ticker.C; ; { select { case err := <-c.errch: diff --git a/internal/sync_test.go b/internal/sync_test.go index bcff7b22..89b318a9 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -26,7 +26,7 @@ import ( var ( repoUrl = "https://github.com/databricks/databricks-empty-ide-project.git" - repoFiles = []string{"README-IDE.md"} + repoFiles = []string{} ) // This test needs auth env vars to run. @@ -59,7 +59,7 @@ func setupRepo(t *testing.T, wsc *databricks.WorkspaceClient, ctx context.Contex return localRoot, remoteRoot } -type assertSync struct { +type syncTest struct { t *testing.T c *cobraTestRunner w *databricks.WorkspaceClient @@ -67,7 +67,54 @@ type assertSync struct { remoteRoot string } -func (a *assertSync) remoteDirContent(ctx context.Context, relativeDir string, expectedFiles []string) { +func setupSyncTest(t *testing.T, args ...string) *syncTest { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + w := databricks.Must(databricks.NewWorkspaceClient()) + localRoot := t.TempDir() + remoteRoot := temporaryWorkspaceDir(t, w) + + // Prepend common arguments. + args = append([]string{ + "sync", + localRoot, + remoteRoot, + "--output", + "json", + }, args...) + + c := NewCobraTestRunner(t, args...) + c.RunBackground() + + return &syncTest{ + t: t, + c: c, + w: w, + localRoot: localRoot, + remoteRoot: remoteRoot, + } +} + +func (s *syncTest) waitForCompletionMarker() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + for { + select { + case <-ctx.Done(): + s.t.Fatal("timed out waiting for sync to complete") + case line := <-s.c.stdoutLines: + var event sync.EventBase + err := json.Unmarshal([]byte(line), &event) + require.NoError(s.t, err) + if event.Type == sync.EventTypeComplete { + return + } + } + } +} + +func (a *syncTest) remoteDirContent(ctx context.Context, relativeDir string, expectedFiles []string) { remoteDir := path.Join(a.remoteRoot, relativeDir) a.c.Eventually(func() bool { objects, err := a.w.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ @@ -92,7 +139,7 @@ func (a *assertSync) remoteDirContent(ctx context.Context, relativeDir string, e } } -func (a *assertSync) remoteFileContent(ctx context.Context, relativePath string, expectedContent string) { +func (a *syncTest) remoteFileContent(ctx context.Context, relativePath string, expectedContent string) { filePath := path.Join(a.remoteRoot, relativePath) // Remove leading "/" so we can use it in the URL. @@ -113,7 +160,7 @@ func (a *assertSync) remoteFileContent(ctx context.Context, relativePath string, }, 30*time.Second, 5*time.Second) } -func (a *assertSync) objectType(ctx context.Context, relativePath string, expected string) { +func (a *syncTest) objectType(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) a.c.Eventually(func() bool { @@ -125,7 +172,7 @@ func (a *assertSync) objectType(ctx context.Context, relativePath string, expect }, 30*time.Second, 5*time.Second) } -func (a *assertSync) language(ctx context.Context, relativePath string, expected string) { +func (a *syncTest) language(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) a.c.Eventually(func() bool { @@ -137,7 +184,7 @@ func (a *assertSync) language(ctx context.Context, relativePath string, expected }, 30*time.Second, 5*time.Second) } -func (a *assertSync) snapshotContains(files []string) { +func (a *syncTest) snapshotContains(files []string) { snapshotPath := filepath.Join(a.localRoot, ".databricks/sync-snapshots", sync.GetFileName(a.w.Config.Host, a.remoteRoot)) assert.FileExists(a.t, snapshotPath) @@ -160,123 +207,87 @@ func (a *assertSync) snapshotContains(files []string) { assert.Equal(a.t, len(files), len(s.LastUpdatedTimes)) } -func TestAccFullFileSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncFullFileSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--full", "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--full", "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file - localFilePath := filepath.Join(localRepoPath, "foo.txt") + localFilePath := filepath.Join(assertSync.localRoot, "foo.txt") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, "foo.txt", ".gitignore")) assertSync.remoteFileContent(ctx, "foo.txt", "") // Write to file f.Overwrite(t, `{"statement": "Mi Gente"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Mi Gente"}`) // Write again f.Overwrite(t, `{"statement": "Young Dumb & Broke"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Young Dumb & Broke"}`) // delete f.Remove(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) } -func TestAccIncrementalFileSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalFileSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file - localFilePath := filepath.Join(localRepoPath, "foo.txt") + localFilePath := filepath.Join(assertSync.localRoot, "foo.txt") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, "foo.txt", ".gitignore")) assertSync.remoteFileContent(ctx, "foo.txt", "") assertSync.snapshotContains(append(repoFiles, "foo.txt", ".gitignore")) // Write to file f.Overwrite(t, `{"statement": "Mi Gente"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Mi Gente"}`) // Write again f.Overwrite(t, `{"statement": "Young Dumb & Broke"}`) + assertSync.waitForCompletionMarker() assertSync.remoteFileContent(ctx, "foo.txt", `{"statement": "Young Dumb & Broke"}`) // delete f.Remove(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) assertSync.snapshotContains(append(repoFiles, ".gitignore")) } -func TestAccNestedFolderSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncNestedFolderSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file - localFilePath := filepath.Join(localRepoPath, "dir1/dir2/dir3/foo.txt") + localFilePath := filepath.Join(assertSync.localRoot, "dir1/dir2/dir3/foo.txt") err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) assert.NoError(t, err) f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) assertSync.remoteDirContent(ctx, "dir1", []string{"dir2"}) assertSync.remoteDirContent(ctx, "dir1/dir2", []string{"dir3"}) @@ -285,40 +296,27 @@ func TestAccNestedFolderSync(t *testing.T) { // delete f.Remove(t) + assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) assertSync.snapshotContains(append(repoFiles, ".gitignore")) } -func TestAccNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) // New file - localFilePath := filepath.Join(localRepoPath, "dir1/a b+c/c+d e/e+f g#i.txt") + localFilePath := filepath.Join(assertSync.localRoot, "dir1/a b+c/c+d e/e+f g#i.txt") err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) assert.NoError(t, err) f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "dir1")) assertSync.remoteDirContent(ctx, "dir1", []string{"a b+c"}) assertSync.remoteDirContent(ctx, "dir1/a b+c", []string{"c+d e"}) @@ -327,6 +325,7 @@ func TestAccNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // delete f.Remove(t) + assertSync.waitForCompletionMarker() // directories are not cleaned up right now. This is not ideal assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{}) assertSync.snapshotContains(append(repoFiles, ".gitignore")) @@ -341,77 +340,49 @@ func TestAccNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // // In the above scenario sync should delete the empty folder and add foo to the remote // file system -func TestAccIncrementalFileOverwritesFolder(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // create foo/bar.txt - localFilePath := filepath.Join(localRepoPath, "foo/bar.txt") + localFilePath := filepath.Join(assertSync.localRoot, "foo/bar.txt") err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) assert.NoError(t, err) f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.remoteDirContent(ctx, "foo", []string{"bar.txt"}) assertSync.snapshotContains(append(repoFiles, ".gitignore", filepath.FromSlash("foo/bar.txt"))) // delete foo/bar.txt f.Remove(t) - os.Remove(filepath.Join(localRepoPath, "foo")) + os.Remove(filepath.Join(assertSync.localRoot, "foo")) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "foo", []string{}) assertSync.objectType(ctx, "foo", "DIRECTORY") assertSync.snapshotContains(append(repoFiles, ".gitignore")) - f2 := testfile.CreateFile(t, filepath.Join(localRepoPath, "foo")) + f2 := testfile.CreateFile(t, filepath.Join(assertSync.localRoot, "foo")) defer f2.Close(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.objectType(ctx, "foo", "FILE") assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo")) } -func TestAccIncrementalSyncPythonNotebookToFile(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalSyncPythonNotebookToFile(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) + assertSync := setupSyncTest(t, "--watch") // create python notebook - localFilePath := filepath.Join(localRepoPath, "foo.py") + localFilePath := filepath.Join(assertSync.localRoot, "foo.py") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) f.Overwrite(t, "# Databricks notebook source") - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } - // notebook was uploaded properly + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") @@ -419,40 +390,27 @@ func TestAccIncrementalSyncPythonNotebookToFile(t *testing.T) { // convert to vanilla python file f.Overwrite(t, "# No longer a python notebook") + assertSync.waitForCompletionMarker() assertSync.objectType(ctx, "foo.py", "FILE") assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo.py")) assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) // delete the vanilla python file f.Remove(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) assertSync.snapshotContains(append(repoFiles, ".gitignore")) } -func TestAccIncrementalSyncFileToPythonNotebook(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalSyncFileToPythonNotebook(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync := setupSyncTest(t, "--watch") // create vanilla python file - localFilePath := filepath.Join(localRepoPath, "foo.py") + localFilePath := filepath.Join(assertSync.localRoot, "foo.py") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) + assertSync.waitForCompletionMarker() // assert file upload assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo.py")) @@ -461,37 +419,23 @@ func TestAccIncrementalSyncFileToPythonNotebook(t *testing.T) { // convert to notebook f.Overwrite(t, "# Databricks notebook source") + assertSync.waitForCompletionMarker() assertSync.objectType(ctx, "foo", "NOTEBOOK") assertSync.language(ctx, "foo", "PYTHON") assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) assertSync.snapshotContains(append(repoFiles, ".gitignore", "foo.py")) } -func TestAccIncrementalSyncPythonNotebookDelete(t *testing.T) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - wsc := databricks.Must(databricks.NewWorkspaceClient()) +func TestAccSyncIncrementalSyncPythonNotebookDelete(t *testing.T) { ctx := context.Background() - - localRepoPath, remoteRepoPath := setupRepo(t, wsc, ctx) + assertSync := setupSyncTest(t, "--watch") // create python notebook - localFilePath := filepath.Join(localRepoPath, "foo.py") + localFilePath := filepath.Join(assertSync.localRoot, "foo.py") f := testfile.CreateFile(t, localFilePath) defer f.Close(t) f.Overwrite(t, "# Databricks notebook source") - - // Run `databricks sync` in the background. - c := NewCobraTestRunner(t, "sync", localRepoPath, remoteRepoPath, "--watch") - c.RunBackground() - - assertSync := assertSync{ - t: t, - c: c, - w: wsc, - localRoot: localRepoPath, - remoteRoot: remoteRepoPath, - } + assertSync.waitForCompletionMarker() // notebook was uploaded properly assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore", "foo")) @@ -500,6 +444,7 @@ func TestAccIncrementalSyncPythonNotebookDelete(t *testing.T) { // Delete notebook f.Remove(t) + assertSync.waitForCompletionMarker() assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) }