diff --git a/internal/api_test.go b/internal/api_test.go index ffc01eae..80825fe4 100644 --- a/internal/api_test.go +++ b/internal/api_test.go @@ -15,7 +15,8 @@ import ( func TestAccApiGet(t *testing.T) { t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - stdout, _, err := run(t, "api", "get", "/api/2.0/preview/scim/v2/Me") + c := NewCobraTestRunner(t, "api", "get", "/api/2.0/preview/scim/v2/Me") + stdout, _, err := c.Run() require.NoError(t, err) // Deserialize SCIM API response. @@ -42,13 +43,15 @@ func TestAccApiPost(t *testing.T) { // Post to mkdir { - _, _, err := run(t, "api", "post", "--body=@"+requestPath, "/api/2.0/dbfs/mkdirs") + c := NewCobraTestRunner(t, "api", "post", "--body=@"+requestPath, "/api/2.0/dbfs/mkdirs") + _, _, err := c.Run() require.NoError(t, err) } // Post to delete { - _, _, err := run(t, "api", "post", "--body=@"+requestPath, "/api/2.0/dbfs/delete") + c := NewCobraTestRunner(t, "api", "post", "--body=@"+requestPath, "/api/2.0/dbfs/delete") + _, _, err := c.Run() require.NoError(t, err) } } diff --git a/internal/helpers.go b/internal/helpers.go index 940a0965..515ef3d3 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -1,7 +1,9 @@ package internal import ( + "bufio" "bytes" + "context" "fmt" "math/rand" "os" @@ -39,21 +41,114 @@ func RandomName(prefix ...string) string { return string(b) } -func run(t *testing.T, args ...string) (bytes.Buffer, bytes.Buffer, error) { - var stdout bytes.Buffer - var stderr bytes.Buffer +// Helper for running the bricks root command in the background. +// It ensures that the background goroutine terminates upon +// test completion through cancelling the command context. +type cobraTestRunner struct { + *testing.T + + args []string + stdout bytes.Buffer + stderr bytes.Buffer + + errch <-chan error +} + +func (t *cobraTestRunner) RunBackground() { root := root.RootCmd - root.SetOut(&stdout) - root.SetErr(&stderr) - root.SetArgs(args) - _, err := root.ExecuteC() - if stdout.Len() > 0 { - t.Logf("[stdout]: %s", stdout.String()) + root.SetOut(&t.stdout) + root.SetErr(&t.stderr) + root.SetArgs(t.args) + + errch := make(chan error) + ctx, cancel := context.WithCancel(context.Background()) + + // Run command in background. + go func() { + cmd, err := root.ExecuteContextC(ctx) + if err != nil { + t.Logf("Error running command: %s", err) + } + + if t.stdout.Len() > 0 { + // Make a copy of the buffer such that it remains "unread". + scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes())) + for scanner.Scan() { + t.Logf("[bricks stdout]: %s", scanner.Text()) + } + } + + if t.stderr.Len() > 0 { + // Make a copy of the buffer such that it remains "unread". + scanner := bufio.NewScanner(bytes.NewBuffer(t.stderr.Bytes())) + for scanner.Scan() { + t.Logf("[bricks stderr]: %s", scanner.Text()) + } + } + + // Reset context on command for the next test. + // These commands are globals so we have to clean up to the best of our ability after each run. + // See https://github.com/spf13/cobra/blob/a6f198b635c4b18fff81930c40d464904e55b161/command.go#L1062-L1066 + //lint:ignore SA1012 cobra sets the context and doesn't clear it + cmd.SetContext(nil) + + // Make caller aware of error. + errch <- err + close(errch) + }() + + // Ensure command terminates upon test completion (success or failure). + t.Cleanup(func() { + // Signal termination of command. + cancel() + // Wait for goroutine to finish. + <-errch + }) + + t.errch = errch +} + +func (t *cobraTestRunner) Run() (bytes.Buffer, bytes.Buffer, error) { + t.RunBackground() + err := <-t.errch + return t.stdout, t.stderr, err +} + +// Like [require.Eventually] but errors if the underlying command has failed. +func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duration, tick time.Duration, msgAndArgs ...interface{}) { + ch := make(chan bool, 1) + + timer := time.NewTimer(waitFor) + defer timer.Stop() + + ticker := time.NewTicker(tick) + defer ticker.Stop() + + for tick := ticker.C; ; { + select { + case err := <-c.errch: + require.Fail(c, "Command failed", err) + return + case <-timer.C: + require.Fail(c, "Condition never satisfied", msgAndArgs...) + return + case <-tick: + tick = nil + go func() { ch <- condition() }() + case v := <-ch: + if v { + return + } + tick = ticker.C + } } - if stderr.Len() > 0 { - t.Logf("[stderr]: %s", stderr.String()) +} + +func NewCobraTestRunner(t *testing.T, args ...string) *cobraTestRunner { + return &cobraTestRunner{ + T: t, + args: args, } - return stdout, stderr, err } func writeFile(t *testing.T, name string, body string) string { diff --git a/internal/sync_test.go b/internal/sync_test.go index b42be3e8..19fc660f 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -1,7 +1,6 @@ package internal import ( - "bytes" "context" "encoding/json" "fmt" @@ -9,12 +8,10 @@ import ( "os" "os/exec" "path/filepath" - "strings" "testing" "time" "github.com/databricks/bricks/cmd/sync" - "github.com/databricks/bricks/folders" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/repos" "github.com/databricks/databricks-sdk-go/service/workspace" @@ -26,19 +23,6 @@ import ( func TestAccFullSync(t *testing.T) { t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - // We assume cwd is in the bricks repo - wd, err := os.Getwd() - if err != nil { - t.Log("[WARN] error fetching current working dir: ", err) - } - t.Log("test run dir: ", wd) - bricksRepo, err := folders.FindDirWithLeaf(wd, ".git") - if err != nil { - t.Log("[ERROR] error finding git repo root in : ", wd) - } - t.Log("bricks repo location: : ", bricksRepo) - assert.Equal(t, "bricks", filepath.Base(bricksRepo)) - wsc := databricks.Must(databricks.NewWorkspaceClient()) ctx := context.Background() me, err := wsc.CurrentUser.Me(ctx) @@ -71,39 +55,13 @@ func TestAccFullSync(t *testing.T) { assert.NoError(t, err) defer f.Close() - // start bricks sync process - cmd = exec.Command("go", "run", "main.go", "sync", "--remote-path", repoPath, "--persist-snapshot=false") - - var cmdOut, cmdErr bytes.Buffer - cmd.Stdout = &cmdOut - cmd.Stderr = &cmdErr - cmd.Dir = bricksRepo - // bricks sync command will inherit the env vars from process + // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", projectDir) - err = cmd.Start() - assert.NoError(t, err) - t.Cleanup(func() { - // We wait three seconds to allow the bricks sync process flush its - // stdout buffer - time.Sleep(3 * time.Second) - // terminate the bricks sync process - cmd.Process.Kill() - // Print the stdout and stderr logs from the bricks sync process - t.Log("\n\n\n\n\n\n") - t.Logf("bricks sync logs for command: %s", cmd.String()) - if err != nil { - t.Logf("error in bricks sync process: %s\n", err) - } - for _, line := range strings.Split(strings.TrimSuffix(cmdOut.String(), "\n"), "\n") { - t.Log("[bricks sync stdout]", line) - } - for _, line := range strings.Split(strings.TrimSuffix(cmdErr.String(), "\n"), "\n") { - t.Log("[bricks sync stderr]", line) - } - }) + c := NewCobraTestRunner(t, "sync", "--remote-path", repoPath, "--persist-snapshot=false") + c.RunBackground() // First upload assertion - assert.Eventually(t, func() bool { + c.Eventually(func() bool { objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) @@ -126,7 +84,7 @@ func TestAccFullSync(t *testing.T) { // Create new files and assert os.Create(filepath.Join(projectDir, "hello.txt")) os.Create(filepath.Join(projectDir, "world.txt")) - assert.Eventually(t, func() bool { + c.Eventually(func() bool { objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) @@ -150,7 +108,7 @@ func TestAccFullSync(t *testing.T) { // delete a file and assert os.Remove(filepath.Join(projectDir, "hello.txt")) - assert.Eventually(t, func() bool { + c.Eventually(func() bool { objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) @@ -198,19 +156,6 @@ func assertSnapshotContents(t *testing.T, host, repoPath, projectDir string, lis func TestAccIncrementalSync(t *testing.T) { t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - // We assume cwd is in the bricks repo - wd, err := os.Getwd() - if err != nil { - t.Log("[WARN] error fetching current working dir: ", err) - } - t.Log("test run dir: ", wd) - bricksRepo, err := folders.FindDirWithLeaf(wd, ".git") - if err != nil { - t.Log("[ERROR] error finding git repo root in : ", wd) - } - t.Log("bricks repo location: : ", bricksRepo) - assert.Equal(t, "bricks", filepath.Base(bricksRepo)) - wsc := databricks.Must(databricks.NewWorkspaceClient()) ctx := context.Background() me, err := wsc.CurrentUser.Me(ctx) @@ -247,40 +192,13 @@ func TestAccIncrementalSync(t *testing.T) { _, err = f2.Write(content) assert.NoError(t, err) - // start bricks sync process - cmd = exec.Command("go", "run", "main.go", "sync", "--remote-path", repoPath, "--persist-snapshot=true") - - var cmdOut, cmdErr bytes.Buffer - cmd.Stdout = &cmdOut - cmd.Stderr = &cmdErr - cmd.Dir = bricksRepo - // bricks sync command will inherit the env vars from process + // Run `bricks sync` in the background. t.Setenv("BRICKS_ROOT", projectDir) - err = cmd.Start() - assert.NoError(t, err) - t.Cleanup(func() { - // We wait three seconds to allow the bricks sync process flush its - // stdout buffer - time.Sleep(3 * time.Second) - // terminate the bricks sync process - cmd.Process.Kill() - // Print the stdout and stderr logs from the bricks sync process - // TODO: modify logs to suit multiple sync processes - t.Log("\n\n\n\n\n\n") - t.Logf("bricks sync logs for command: %s", cmd.String()) - if err != nil { - t.Logf("error in bricks sync process: %s\n", err) - } - for _, line := range strings.Split(strings.TrimSuffix(cmdOut.String(), "\n"), "\n") { - t.Log("[bricks sync stdout]", line) - } - for _, line := range strings.Split(strings.TrimSuffix(cmdErr.String(), "\n"), "\n") { - t.Log("[bricks sync stderr]", line) - } - }) + c := NewCobraTestRunner(t, "sync", "--remote-path", repoPath, "--persist-snapshot=true") + c.RunBackground() // First upload assertion - assert.Eventually(t, func() bool { + c.Eventually(func() bool { objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) @@ -306,7 +224,7 @@ func TestAccIncrementalSync(t *testing.T) { defer f.Close() // new file upload assertion - assert.Eventually(t, func() bool { + c.Eventually(func() bool { objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) @@ -329,7 +247,7 @@ func TestAccIncrementalSync(t *testing.T) { // delete a file and assert os.Remove(filepath.Join(projectDir, ".gitkeep")) - assert.Eventually(t, func() bool { + c.Eventually(func() bool { objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, })