Added `process.Background()` and `process.Forwarded()` (#804)

## Changes
This PR adds higher-level wrappers for calling subprocesses. One of the
steps to get https://github.com/databricks/cli/pull/637 in, as
previously discussed.

The reason to add `process.Forwarded()` is to proxy Python's `input()`
calls from a child process seamlessly. Another use-case is plugging in
`less` as a pager for the list results.

## Tests
`make test`
This commit is contained in:
Serge Smertin 2023-09-27 11:04:44 +02:00 committed by GitHub
parent 3ee89c41da
commit 7171874db0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 390 additions and 24 deletions

View File

@ -4,11 +4,11 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"os/exec"
"path" "path"
"strings" "strings"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/process"
"github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/compute"
) )
@ -56,13 +56,14 @@ func (a *Artifact) Build(ctx context.Context) ([]byte, error) {
commands := strings.Split(a.BuildCommand, " && ") commands := strings.Split(a.BuildCommand, " && ")
for _, command := range commands { for _, command := range commands {
buildParts := strings.Split(command, " ") buildParts := strings.Split(command, " ")
cmd := exec.CommandContext(ctx, buildParts[0], buildParts[1:]...) var buf bytes.Buffer
cmd.Dir = a.Path _, err := process.Background(ctx, buildParts,
res, err := cmd.CombinedOutput() process.WithCombinedOutput(&buf),
process.WithDir(a.Path))
if err != nil { if err != nil {
return res, err return buf.Bytes(), err
} }
out = append(out, res) out = append(out, buf.Bytes())
} }
return bytes.Join(out, []byte{}), nil return bytes.Join(out, []byte{}), nil
} }

View File

@ -61,6 +61,7 @@ func executeHook(ctx context.Context, b *bundle.Bundle, hook config.ScriptHook)
return nil, nil, err return nil, nil, err
} }
// TODO: switch to process.Background(...)
cmd := exec.CommandContext(ctx, interpreter, "-c", string(command)) cmd := exec.CommandContext(ctx, interpreter, "-c", string(command))
cmd.Dir = b.Config.Path cmd.Dir = b.Config.Path

19
libs/env/context.go vendored
View File

@ -3,6 +3,7 @@ package env
import ( import (
"context" "context"
"os" "os"
"strings"
) )
var envContextKey int var envContextKey int
@ -61,3 +62,21 @@ func Set(ctx context.Context, key, value string) context.Context {
m[key] = value m[key] = value
return setMap(ctx, m) return setMap(ctx, m)
} }
// All returns environment variables that are defined in both os.Environ
// and this package. `env.Set(ctx, x, y)` will override x from os.Environ.
func All(ctx context.Context) map[string]string {
m := map[string]string{}
for _, line := range os.Environ() {
split := strings.SplitN(line, "=", 2)
if len(split) != 2 {
continue
}
m[split[0]] = split[1]
}
// override existing environment variables with the ones we set
for k, v := range getMap(ctx) {
m[k] = v
}
return m
}

View File

@ -38,4 +38,12 @@ func TestContext(t *testing.T) {
assert.Equal(t, "qux", Get(ctx2, "FOO")) assert.Equal(t, "qux", Get(ctx2, "FOO"))
assert.Equal(t, "baz", Get(ctx1, "FOO")) assert.Equal(t, "baz", Get(ctx1, "FOO"))
assert.Equal(t, "bar", Get(ctx0, "FOO")) assert.Equal(t, "bar", Get(ctx0, "FOO"))
ctx3 := Set(ctx2, "BAR", "x=y")
all := All(ctx3)
assert.NotNil(t, all)
assert.Equal(t, "qux", all["FOO"])
assert.Equal(t, "x=y", all["BAR"])
assert.NotEmpty(t, all["PATH"])
} }

View File

@ -1,13 +1,14 @@
package git package git
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"os/exec" "os/exec"
"regexp" "regexp"
"strings" "strings"
"github.com/databricks/cli/libs/process"
) )
// source: https://stackoverflow.com/questions/59081778/rules-for-special-characters-in-github-repository-name // source: https://stackoverflow.com/questions/59081778/rules-for-special-characters-in-github-repository-name
@ -42,24 +43,18 @@ func (opts cloneOptions) args() []string {
} }
func (opts cloneOptions) clone(ctx context.Context) error { func (opts cloneOptions) clone(ctx context.Context) error {
cmd := exec.CommandContext(ctx, "git", opts.args()...) // start and wait for git clone to complete
var cmdErr bytes.Buffer _, err := process.Background(ctx, append([]string{"git"}, opts.args()...))
cmd.Stderr = &cmdErr
// start git clone
err := cmd.Start()
if errors.Is(err, exec.ErrNotFound) { if errors.Is(err, exec.ErrNotFound) {
return fmt.Errorf("please install git CLI to clone a repository: %w", err) return fmt.Errorf("please install git CLI to clone a repository: %w", err)
} }
var processErr *process.ProcessError
if errors.As(err, &processErr) {
return fmt.Errorf("git clone failed: %w. %s", err, processErr.Stderr)
}
if err != nil { if err != nil {
return fmt.Errorf("git clone failed: %w", err) return fmt.Errorf("git clone failed: %w", err)
} }
// wait for git clone to complete
err = cmd.Wait()
if err != nil {
return fmt.Errorf("git clone failed: %w. %s", err, cmdErr.String())
}
return nil return nil
} }

View File

@ -0,0 +1,59 @@
package process
import (
"bytes"
"context"
"fmt"
"os/exec"
"strings"
"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/log"
)
type ProcessError struct {
Command string
Err error
Stdout string
Stderr string
}
func (perr *ProcessError) Unwrap() error {
return perr.Err
}
func (perr *ProcessError) Error() string {
return fmt.Sprintf("%s: %s", perr.Command, perr.Err)
}
func Background(ctx context.Context, args []string, opts ...execOption) (string, error) {
commandStr := strings.Join(args, " ")
log.Debugf(ctx, "running: %s", commandStr)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
stdout := bytes.Buffer{}
stderr := bytes.Buffer{}
// For background processes, there's no standard input
cmd.Stdin = nil
cmd.Stdout = &stdout
cmd.Stderr = &stderr
// we pull the env through lib/env such that we can run
// parallel tests with anything using libs/process.
for k, v := range env.All(ctx) {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
for _, o := range opts {
err := o(ctx, cmd)
if err != nil {
return "", err
}
}
if err := cmd.Run(); err != nil {
return stdout.String(), &ProcessError{
Err: err,
Command: commandStr,
Stdout: stdout.String(),
Stderr: stderr.String(),
}
}
return stdout.String(), nil
}

View File

@ -0,0 +1,91 @@
package process
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestBackgroundUnwrapsNotFound(t *testing.T) {
ctx := context.Background()
_, err := Background(ctx, []string{"/bin/meeecho", "1"})
assert.ErrorIs(t, err, os.ErrNotExist)
}
func TestBackground(t *testing.T) {
ctx := context.Background()
res, err := Background(ctx, []string{"echo", "1"}, WithDir("/"))
assert.NoError(t, err)
assert.Equal(t, "1", strings.TrimSpace(res))
}
func TestBackgroundOnlyStdoutGetsoutOnSuccess(t *testing.T) {
ctx := context.Background()
res, err := Background(ctx, []string{
"python3", "-c", "import sys; sys.stderr.write('1'); sys.stdout.write('2')",
})
assert.NoError(t, err)
assert.Equal(t, "2", res)
}
func TestBackgroundCombinedOutput(t *testing.T) {
ctx := context.Background()
buf := bytes.Buffer{}
res, err := Background(ctx, []string{
"python3", "-c", "import sys, time; " +
`sys.stderr.write("1\n"); sys.stderr.flush(); ` +
"time.sleep(0.001); " +
"print('2', flush=True); sys.stdout.flush(); " +
"time.sleep(0.001)",
}, WithCombinedOutput(&buf))
assert.NoError(t, err)
assert.Equal(t, "2", strings.TrimSpace(res))
assert.Equal(t, "1\n2\n", strings.ReplaceAll(buf.String(), "\r", ""))
}
func TestBackgroundCombinedOutputFailure(t *testing.T) {
ctx := context.Background()
buf := bytes.Buffer{}
res, err := Background(ctx, []string{
"python3", "-c", "import sys, time; " +
`sys.stderr.write("1\n"); sys.stderr.flush(); ` +
"time.sleep(0.001); " +
"print('2', flush=True); sys.stdout.flush(); " +
"time.sleep(0.001); " +
"sys.exit(42)",
}, WithCombinedOutput(&buf))
var processErr *ProcessError
if assert.ErrorAs(t, err, &processErr) {
assert.Equal(t, "1", strings.TrimSpace(processErr.Stderr))
assert.Equal(t, "2", strings.TrimSpace(processErr.Stdout))
}
assert.Equal(t, "2", strings.TrimSpace(res))
assert.Equal(t, "1\n2\n", strings.ReplaceAll(buf.String(), "\r", ""))
}
func TestBackgroundNoStdin(t *testing.T) {
ctx := context.Background()
res, err := Background(ctx, []string{"cat"})
assert.NoError(t, err)
assert.Equal(t, "", res)
}
func TestBackgroundFails(t *testing.T) {
ctx := context.Background()
_, err := Background(ctx, []string{"ls", "/dev/null/x"})
assert.NotNil(t, err)
}
func TestBackgroundFailsOnOption(t *testing.T) {
ctx := context.Background()
_, err := Background(ctx, []string{"ls", "/dev/null/x"}, func(_ context.Context, c *exec.Cmd) error {
return fmt.Errorf("nope")
})
assert.EqualError(t, err, "nope")
}

43
libs/process/forwarded.go Normal file
View File

@ -0,0 +1,43 @@
package process
import (
"context"
"fmt"
"io"
"os/exec"
"strings"
"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/log"
)
func Forwarded(ctx context.Context, args []string, src io.Reader, outWriter, errWriter io.Writer, opts ...execOption) error {
commandStr := strings.Join(args, " ")
log.Debugf(ctx, "starting: %s", commandStr)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
// empirical tests showed buffered copies being more responsive
cmd.Stdout = outWriter
cmd.Stderr = errWriter
cmd.Stdin = src
// we pull the env through lib/env such that we can run
// parallel tests with anything using libs/process.
for k, v := range env.All(ctx) {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
// apply common options
for _, o := range opts {
err := o(ctx, cmd)
if err != nil {
return err
}
}
err := cmd.Start()
if err != nil {
return err
}
return cmd.Wait()
}

View File

@ -0,0 +1,43 @@
package process
import (
"bytes"
"context"
"os/exec"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestForwarded(t *testing.T) {
ctx := context.Background()
var buf bytes.Buffer
err := Forwarded(ctx, []string{
"python3", "-c", "print(input('input: '))",
}, strings.NewReader("abc\n"), &buf, &buf)
assert.NoError(t, err)
assert.Equal(t, "input: abc", strings.TrimSpace(buf.String()))
}
func TestForwardedFails(t *testing.T) {
ctx := context.Background()
var buf bytes.Buffer
err := Forwarded(ctx, []string{
"_non_existent_",
}, strings.NewReader("abc\n"), &buf, &buf)
assert.NotNil(t, err)
}
func TestForwardedFailsOnStdinPipe(t *testing.T) {
ctx := context.Background()
var buf bytes.Buffer
err := Forwarded(ctx, []string{
"_non_existent_",
}, strings.NewReader("abc\n"), &buf, &buf, func(_ context.Context, c *exec.Cmd) error {
c.Stdin = strings.NewReader("x")
return nil
})
assert.NotNil(t, err)
}

57
libs/process/opts.go Normal file
View File

@ -0,0 +1,57 @@
package process
import (
"bytes"
"context"
"fmt"
"io"
"os/exec"
)
type execOption func(context.Context, *exec.Cmd) error
func WithEnv(key, value string) execOption {
return func(ctx context.Context, c *exec.Cmd) error {
v := fmt.Sprintf("%s=%s", key, value)
c.Env = append(c.Env, v)
return nil
}
}
func WithEnvs(envs map[string]string) execOption {
return func(ctx context.Context, c *exec.Cmd) error {
for k, v := range envs {
err := WithEnv(k, v)(ctx, c)
if err != nil {
return err
}
}
return nil
}
}
func WithDir(dir string) execOption {
return func(_ context.Context, c *exec.Cmd) error {
c.Dir = dir
return nil
}
}
func WithStdoutPipe(dst *io.ReadCloser) execOption {
return func(_ context.Context, c *exec.Cmd) error {
outPipe, err := c.StdoutPipe()
if err != nil {
return err
}
*dst = outPipe
return nil
}
}
func WithCombinedOutput(buf *bytes.Buffer) execOption {
return func(_ context.Context, c *exec.Cmd) error {
c.Stdout = io.MultiWriter(buf, c.Stdout)
c.Stderr = io.MultiWriter(buf, c.Stderr)
return nil
}
}

47
libs/process/opts_test.go Normal file
View File

@ -0,0 +1,47 @@
package process
import (
"context"
"os/exec"
"runtime"
"sort"
"testing"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/env"
"github.com/stretchr/testify/assert"
)
func TestWithEnvs(t *testing.T) {
if runtime.GOOS == "windows" {
// Skipping test on windows for now because of the following error:
// /bin/sh -c echo $FOO $BAR: exec: "/bin/sh": file does not exist
t.SkipNow()
}
ctx := context.Background()
ctx2 := env.Set(ctx, "FOO", "foo")
res, err := Background(ctx2, []string{"/bin/sh", "-c", "echo $FOO $BAR"}, WithEnvs(map[string]string{
"BAR": "delirium",
}))
assert.NoError(t, err)
assert.Equal(t, "foo delirium\n", res)
}
func TestWorksWithLibsEnv(t *testing.T) {
testutil.CleanupEnvironment(t)
ctx := context.Background()
cmd := &exec.Cmd{}
err := WithEnvs(map[string]string{
"CCC": "DDD",
"EEE": "FFF",
})(ctx, cmd)
assert.NoError(t, err)
vars := cmd.Environ()
sort.Strings(vars)
assert.True(t, len(vars) >= 2)
assert.Equal(t, "CCC=DDD", vars[0])
assert.Equal(t, "EEE=FFF", vars[1])
}

View File

@ -8,6 +8,8 @@ import (
"os/exec" "os/exec"
"runtime" "runtime"
"strings" "strings"
"github.com/databricks/cli/libs/process"
) )
func PyInline(ctx context.Context, inlinePy string) (string, error) { func PyInline(ctx context.Context, inlinePy string) (string, error) {
@ -88,8 +90,8 @@ func DetectExecutable(ctx context.Context) (string, error) {
func execAndPassErr(ctx context.Context, name string, args ...string) ([]byte, error) { func execAndPassErr(ctx context.Context, name string, args ...string) ([]byte, error) {
// TODO: move out to a separate package, once we have Maven integration // TODO: move out to a separate package, once we have Maven integration
out, err := exec.CommandContext(ctx, name, args...).Output() out, err := process.Background(ctx, append([]string{name}, args...))
return out, nicerErr(err) return []byte(out), nicerErr(err)
} }
func getFirstMatch(out string) string { func getFirstMatch(out string) string {

View File

@ -20,7 +20,7 @@ func TestExecAndPassError(t *testing.T) {
} }
_, err := execAndPassErr(context.Background(), "which", "__non_existing__") _, err := execAndPassErr(context.Background(), "which", "__non_existing__")
assert.EqualError(t, err, "exit status 1") assert.EqualError(t, err, "which __non_existing__: exit status 1")
} }
func TestDetectPython(t *testing.T) { func TestDetectPython(t *testing.T) {
@ -77,7 +77,7 @@ func testTempdir(t *testing.T, dir *string) func() {
func TestPyError(t *testing.T) { func TestPyError(t *testing.T) {
_, err := Py(context.Background(), "__non_existing__.py") _, err := Py(context.Background(), "__non_existing__.py")
assert.Contains(t, err.Error(), "can't open file") assert.Contains(t, err.Error(), "exit status 2")
} }
func TestPyInline(t *testing.T) { func TestPyInline(t *testing.T) {
@ -90,5 +90,5 @@ func TestPyInlineStderr(t *testing.T) {
DetectExecutable(context.Background()) DetectExecutable(context.Background())
inline := "import sys; sys.stderr.write('___msg___'); sys.exit(1)" inline := "import sys; sys.stderr.write('___msg___'); sys.exit(1)"
_, err := PyInline(context.Background(), inline) _, err := PyInline(context.Background(), inline)
assert.EqualError(t, err, "___msg___") assert.ErrorContains(t, err, "___msg___")
} }