implement

This commit is contained in:
Shreyas Goenka 2025-02-17 19:23:02 +01:00
parent d86ccf6b49
commit 6148681ed1
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
20 changed files with 359 additions and 44 deletions

View File

@ -77,6 +77,7 @@ func TestInprocessMode(t *testing.T) {
require.Equal(t, 1, testAccept(t, true, "selftest/server"))
}
// TODO: Maybe add flag to mark tests that cannot be completely debugged in test.toml.
func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
repls := testdiff.ReplacementsContext{}
cwd, err := os.Getwd()

42
acceptance/bin/wait_pid Executable file
View File

@ -0,0 +1,42 @@
#!/bin/bash
# wait <pid> in bash only works for processes that are direct children to the calling
# shell. This script is more general purpose.
# wait_pid <pid>
#
# Polls every 0.1 seconds, for at most 10 seconds, until no process with the
# given PID exists. Prints "Process has ended" and returns 0 once the process
# is gone; prints a timeout message and returns 1 otherwise.
wait_pid() {
    local pid=$1
    local max_attempts=100 # 100 * 0.1 seconds = 10 seconds
    local attempt=0
    local sleep_time=0.1

    while [ "$attempt" -lt "$max_attempts" ]; do
        if [[ "$OSTYPE" == "msys"* || "$OSTYPE" == "cygwin"* ]]; then
            # Windows approach.
            # Ask tasklist for exactly this PID and match the quoted PID column
            # of the CSV output. A plain `tasklist | grep $pid` would also match
            # the PID as a substring of other PIDs or of the memory column.
            # The two environment variables stop MSYS shells from rewriting the
            # /FI style switches into file system paths.
            if ! MSYS_NO_PATHCONV=1 MSYS2_ARG_CONV_EXCL='*' \
                tasklist /FI "PID eq $pid" /NH /FO CSV 2>/dev/null | grep -q "\"$pid\""; then
                echo "Process has ended"
                return 0
            fi
        else
            # Linux/macOS approach: signal 0 only checks that the process exists.
            if ! kill -0 "$pid" 2>/dev/null; then
                echo "Process has ended"
                return 0
            fi
        fi

        sleep "$sleep_time"
        attempt=$((attempt + 1))
    done

    echo "Timeout: Process $pid did not end within 10 seconds"
    return 1
}
# Entry point: the PID to wait for is the first argument. Without any argument
# print the usage line and fail; otherwise exit with wait_pid's status.
if (( $# == 0 )); then
    echo "Usage: $0 <PID>"
    exit 1
fi

wait_pid $1
exit $?

View File

@ -72,7 +72,7 @@
10:07:59 Debug: Apply pid=12345 mutator=initialize mutator=seq mutator=metadata.AnnotatePipelines
10:07:59 Debug: Apply pid=12345 mutator=initialize mutator=seq mutator=terraform.Initialize
10:07:59 Debug: Using Terraform from DATABRICKS_TF_EXEC_PATH at [TERRAFORM] pid=12345 mutator=initialize mutator=seq mutator=terraform.Initialize
10:07:59 Debug: Using Terraform CLI config from DATABRICKS_TF_CLI_CONFIG_FILE at [DATABRICKS_TF_CLI_CONFIG_FILE] pid=12345 mutator=initialize mutator=seq mutator=terraform.Initialize
10:07:59 Debug: DATABRICKS_TF_PROVIDER_VERSION as 1.62.0 does not match the current version 1.65.1, ignoring DATABRICKS_TF_CLI_CONFIG_FILE pid=12345 mutator=initialize mutator=seq mutator=terraform.Initialize
10:07:59 Debug: Environment variables for Terraform: ...redacted... pid=12345 mutator=initialize mutator=seq mutator=terraform.Initialize
10:07:59 Debug: Apply pid=12345 mutator=initialize mutator=seq mutator=scripts.postinit
10:07:59 Debug: No script defined for postinit, skipping pid=12345 mutator=initialize mutator=seq mutator=scripts.postinit

View File

@ -0,0 +1,13 @@
Parent process has started
Started the child process
Provided input: Hello from the other side
Released the child process
Parent process is exiting
====================
All output from this point on is from the child process
Parent process has exited
Received input from parent process:
Hello from the other side

View File

@ -0,0 +1,2 @@
waiting for child process to finish
Process has ended

View File

@ -0,0 +1,6 @@
# File that the parent command passes to the daemon as the child's log file.
export DATABRICKS_CLI_SELFTEST_CHILD_OUTPUT_FILE="out.parentchild.txt"
# Run the parent; its own stdout and stderr are written to that same file.
$CLI selftest parent &> out.parentchild.txt
echo "waiting for child process to finish"
# The parent configures child.pid as the daemon's pid file; wait until the
# process with that PID is gone.
wait_pid $(cat ./child.pid)

View File

@ -129,6 +129,7 @@ Additional Commands:
configure Configure authentication
help Help about any command
labs Manage Databricks Labs installations
selftest Non functional CLI commands that are useful for testing
version Retrieve information about the current version of this CLI
Flags:
@ -139,4 +140,7 @@ Flags:
-t, --target string bundle target to use (if applicable)
-v, --version version for databricks
Additional help topics:
databricks stream-
Use "databricks [command] --help" for more information about a command.

View File

@ -12,6 +12,7 @@ import (
"github.com/databricks/cli/cmd/fs"
"github.com/databricks/cli/cmd/labs"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/cmd/selftest"
"github.com/databricks/cli/cmd/sync"
"github.com/databricks/cli/cmd/version"
"github.com/databricks/cli/cmd/workspace"
@ -74,6 +75,7 @@ func New(ctx context.Context) *cobra.Command {
cli.AddCommand(labs.New(ctx))
cli.AddCommand(sync.New())
cli.AddCommand(version.New())
cli.AddCommand(selftest.New())
cli.AddCommand(&cobra.Command{
Use: "stream-",

70
cmd/selftest/child.go Normal file
View File

@ -0,0 +1,70 @@
package selftest
import (
"errors"
"fmt"
"io"
"os"
"strconv"
"github.com/databricks/cli/libs/daemon"
"github.com/databricks/cli/libs/process"
"github.com/spf13/cobra"
)
// TODO: Look into the release function and ensure whether I need to call it.
const ()
// TODO CONTINUE: Write commands that wait for each other via the PID.
// Ensure to check the process name as well, since the PID can otherwise be
// reused pretty quickly.
//
// Implement dummy child and parent commands, and write acceptance tests to account
// for all variations.
//
// Ensure that a robust timeout mechanism exists for the telemetry process. We
// do not want the daemons to hang indefinitely. Can this also be tested?
//
// TODO: One set of tests will be asserting that the tests have the right
// properties. A thread on my personal slack account will help with that.
// The other set of tests will assert on the functional behaviour, that the
// parent and child process are indeed independent, and that the child process
// does not block the parent process.
//
// All this requires some PID handler which gets the process information based on
// the PID and some "name", since PIDs can be reused, being a source of flakiness.
//
// TODO: Make sure to acknowledge the risk of failing when people try to delete
// the binary in windows.
//
// TODO: Ensure that child stdout / stderr are not sent to the parent process.
// newChildCommand builds the "child" selftest command. The command reads its
// parent's PID from the environment, waits for that process to exit, and then
// prints everything it receives on stdin.
func newChildCommand() *cobra.Command {
	runChild := func(_ *cobra.Command, _ []string) error {
		parentPid, err := strconv.Atoi(os.Getenv(daemon.DatabricksCliParentPid))
		if err != nil {
			return fmt.Errorf("failed to parse parent PID: %w", err)
		}

		// A parent that no longer exists is not a failure: it simply means
		// there is nothing left to wait for.
		if waitErr := process.Wait(parentPid); waitErr != nil {
			var notFound process.ErrProcessNotFound
			if !errors.As(waitErr, &notFound) {
				return fmt.Errorf("failed to wait for parent process: %w", waitErr)
			}
		}

		fmt.Println("\n====================")
		fmt.Println("\nAll output from this point on is from the child process")
		fmt.Println("Parent process has exited")

		// ReadAll returns once stdin reaches EOF.
		input, readErr := io.ReadAll(os.Stdin)
		if readErr != nil {
			return fmt.Errorf("failed to read from stdin: %w", readErr)
		}

		fmt.Println("Received input from parent process:")
		fmt.Println(string(input))
		return nil
	}

	return &cobra.Command{
		Use:  "child",
		RunE: runChild,
	}
}

48
cmd/selftest/parent.go Normal file
View File

@ -0,0 +1,48 @@
package selftest
import (
"fmt"
"os"
"github.com/databricks/cli/libs/daemon"
"github.com/spf13/cobra"
)
const OutputFile = "DATABRICKS_CLI_SELFTEST_CHILD_OUTPUT_FILE"
// newParentCommand builds the "parent" selftest command. The command starts
// "selftest child" as a daemon, sends it one line of input, releases it, and
// returns, printing a progress line after every step.
func newParentCommand() *cobra.Command {
	runParent := func(_ *cobra.Command, _ []string) error {
		fmt.Println("Parent process has started")

		child := &daemon.Daemon{
			Env:  os.Environ(),
			Args: []string{"selftest", "child"},
			// The child's output goes to the file named by the OutputFile
			// environment variable.
			LogFile:     os.Getenv(OutputFile),
			PidFilePath: "child.pid",
		}

		if err := child.Start(); err != nil {
			return fmt.Errorf("failed to start child process: %w", err)
		}
		fmt.Println("Started the child process")

		if err := child.WriteInput([]byte("Hello from the other side\n")); err != nil {
			return fmt.Errorf("failed to write to child process: %w", err)
		}
		fmt.Println("Provided input: Hello from the other side")

		if err := child.Release(); err != nil {
			return fmt.Errorf("failed to release child process: %w", err)
		}
		fmt.Println("Released the child process")

		fmt.Println("Parent process is exiting")
		return nil
	}

	return &cobra.Command{
		Use:  "parent",
		RunE: runParent,
	}
}

View File

@ -1,26 +0,0 @@
package selftest
import (
"os"
"github.com/spf13/cobra"
)
const (
PrintStdinParentPid = "DATABRICKS_CLI_PRINT_STDIN_PARENT_PID"
)
// TODO CONTINUE: Write command that wait for each other via the PID.
// Ensure to check the process name as the PID otherwise can be reused pretty
// quick.
// newPrintStdin builds the "print-stdin" selftest command.
func newPrintStdin() *cobra.Command {
	return &cobra.Command{
		Use: "print-stdin",
		RunE: func(cmd *cobra.Command, args []string) error {
			if os.Getenv(PrintStdinParentPid) != "" {
				return nil
			}
			// The closure previously ended here without a return statement,
			// which does not compile. Nothing is implemented for this path
			// yet, so report success.
			return nil
		},
	}
}

View File

@ -10,6 +10,7 @@ func New() *cobra.Command {
Short: "Non functional CLI commands that are useful for testing",
}
cmd.AddCommand(newPrintStdin())
cmd.AddCommand(newChildCommand())
cmd.AddCommand(newParentCommand())
return cmd
}

View File

@ -1,6 +1,7 @@
package daemon
import (
"context"
"fmt"
"io"
"os"
@ -8,16 +9,26 @@ import (
"strconv"
)
const DatabricksCliParentPid = "DATABRICKS_CLI_PARENT_PID"
// Daemon describes a child process of the CLI binary that is launched by Start.
type Daemon struct {
	// TODO: remove this.
	ctx context.Context

	// If provided, the child process will create a pid file at this path.
	// TODO: Can we remove this?
	PidFilePath string

	// Environment variables to set in the child process. Start appends the
	// DATABRICKS_CLI_PARENT_PID variable holding the PID of the calling process.
	Env []string

	// Arguments to pass to the child process. The main executable is always the CLI
	// binary itself.
	Args []string

	// Log file to write the child process's output to. When set, Start opens it
	// in append mode (creating it if needed) and uses it for stdout and stderr.
	LogFile string

	// Command for the child process, created by Start.
	cmd *exec.Cmd
	// Write end of the child's stdin pipe, obtained by Start.
	stdin io.WriteCloser
}
@ -29,9 +40,30 @@ func (d *Daemon) Start() error {
}
d.cmd = exec.Command(cli, d.Args...)
// Set environment variable so that the child process knows its parent's PID.
d.Env = append(d.Env, fmt.Sprintf("%s=%d", DatabricksCliParentPid, os.Getpid()))
d.cmd.Env = d.Env
d.cmd.SysProcAttr = sysProcAttr()
// By default redirect stdout and stderr to /dev/null.
// TODO: Test that by default stdout and stderr do not leak to the parent process.
d.cmd.Stdout = nil
d.cmd.Stderr = nil
// If a log file is provided, redirect stdout and stderr to the log file.
if d.LogFile != "" {
f, err := os.OpenFile(d.LogFile, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return fmt.Errorf("failed to open log file: %w", err)
}
defer f.Close()
d.cmd.Stdout = f
d.cmd.Stderr = f
}
d.stdin, err = d.cmd.StdinPipe()
if err != nil {
return fmt.Errorf("failed to get stdin pipe: %w", err)
@ -52,14 +84,12 @@ func (d *Daemon) Start() error {
return nil
}
func (d *Daemon) Release() error {
if d.PidFilePath != "" {
err := os.Remove(d.PidFilePath)
if err != nil {
return fmt.Errorf("failed to remove pid file: %w", err)
}
}
// WriteInput writes b to the standard input of the child process.
//
// It returns an error when the stdin pipe has not been set up (Start was not
// called or did not get that far) or when the write itself fails.
func (d *Daemon) WriteInput(b []byte) error {
	// Calling Write on a nil pipe would panic; Release guards against the
	// same condition.
	if d.stdin == nil {
		return fmt.Errorf("failed to write to stdin: daemon has not been started")
	}

	_, err := d.stdin.Write(b)
	return err
}
func (d *Daemon) Release() error {
if d.stdin != nil {
err := d.stdin.Close()
if err != nil {
@ -71,5 +101,7 @@ func (d *Daemon) Release() error {
return nil
}
// This does not seem to be strictly necessary, but the docs recommend
// adding it if Wait is not called. Thus we add it here to be safe.
return d.cmd.Process.Release()
}

View File

@ -12,13 +12,6 @@ func sysProcAttr() *syscall.SysProcAttr {
// Create a new session for the child process. This ensures that the daemon
// is not terminated when the parent session is closed. This can happen
// for example when a ssh session is terminated.
// TODO: Test this.
Setsid: true,
Noctty: true,
// Start a new process group for the child process. This ensures that
// termination signals to the parent's process group are not propagated to
// the child process.
Setpgid: true,
}
}

View File

@ -10,6 +10,7 @@ import (
func sysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
CreationFlags: windows.CREATE_NEW_PROCESS_GROUP | windows.DETACHED_PROCESS | windows.CREATE_NO_WINDOW,
HideWindow: true,
CreationFlags: windows.CREATE_NEW_PROCESS_GROUP | windows.DETACHED_PROCESS,
}
}

15
libs/process/wait.go Normal file
View File

@ -0,0 +1,15 @@
package process
import "fmt"
// ErrProcessNotFound is the error reported when no process with the given PID
// exists.
type ErrProcessNotFound struct {
	// Pid is the process ID that was looked up.
	Pid int
}

// Error implements the error interface.
func (notFound ErrProcessNotFound) Error() string {
	const format = "process with pid %d does not exist"
	return fmt.Sprintf(format, notFound.Pid)
}
// Wait waits for the process with the given PID to exit by delegating to the
// platform-specific waitForPid. The Unix implementation returns
// ErrProcessNotFound when no such process exists at the time of the call.
func Wait(pid int) error {
	return waitForPid(pid)
}

13
libs/process/wait_test.go Normal file
View File

@ -0,0 +1,13 @@
package process
import (
"testing"
"github.com/stretchr/testify/assert"
)
// TODO: Test this in windows. Setup an IDE.
func TestWait(t *testing.T) {
	// Use the largest 32-bit PID. The previously used 1000000 is a valid PID
	// on Linux hosts whose pid_max is raised to 4194304, so a live process
	// could own it and make this test flaky or block forever.
	const missingPid = 1<<31 - 1

	err := Wait(missingPid)
	assert.EqualError(t, err, "process with pid 2147483647 does not exist")
}

36
libs/process/wait_unix.go Normal file
View File

@ -0,0 +1,36 @@
//go:build linux || darwin
package process
import (
"errors"
"os"
"syscall"
"time"
)
// waitForPid polls the process with the given PID every 100 milliseconds and
// returns nil once it has exited.
//
// If the process is already gone on the first check, ErrProcessNotFound is
// returned instead. Any other error from looking up or signalling the process
// is returned unchanged.
func waitForPid(pid int) error {
	proc, err := os.FindProcess(pid)
	if err != nil {
		return err
	}

	// Signal 0 delivers nothing; it only reports whether the process can be
	// signalled.
	probe := func() error {
		return proc.Signal(syscall.Signal(0))
	}

	// Initial existence check.
	switch probeErr := probe(); {
	case probeErr == nil:
		// The process is alive, start polling below.
	case errors.Is(probeErr, os.ErrProcessDone):
		return ErrProcessNotFound{Pid: pid}
	default:
		return probeErr
	}

	// Poll until the process exits.
	for {
		probeErr := probe()
		if errors.Is(probeErr, os.ErrProcessDone) {
			return nil
		}
		if probeErr != nil {
			return probeErr
		}
		time.Sleep(100 * time.Millisecond)
	}
}

View File

@ -0,0 +1,18 @@
package process
import (
"runtime"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWaitForPidUnix(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("Skipping test on windows")
	}

	// Use the largest 32-bit PID. The previously used 1000000 is within
	// bounds on Linux hosts whose pid_max is raised to 4194304, so a live
	// process could own it and make this test flaky or block forever.
	const missingPid = 1<<31 - 1

	err := waitForPid(missingPid)
	assert.EqualError(t, err, "process with pid 2147483647 does not exist")
}

View File

@ -0,0 +1,44 @@
//go:build windows
package process
import (
"errors"
"fmt"
"time"
"golang.org/x/sys/windows"
)
// waitForPid waits for the process with the given PID to exit, for at most
// five minutes.
//
// It returns nil once the process has exited, ErrProcessNotFound when the
// process cannot be opened because the PID is invalid, and an error when
// opening or waiting fails or the wait times out.
func waitForPid(pid int) error {
	const (
		// Upper bound for the wait.
		maxWait = 5 * time.Minute
		// WAIT_TIMEOUT return value of WaitForSingleObject, written as a
		// literal as before.
		// ref: https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitforsingleobject#return-value
		waitTimeout = 0x00000102
	)

	handle, err := windows.OpenProcess(
		windows.SYNCHRONIZE|windows.PROCESS_QUERY_INFORMATION,
		false,
		uint32(pid),
	)
	// ERROR_INVALID_PARAMETER is treated as "no such process". Return the
	// error type that the package defines and that callers match on.
	if errors.Is(err, windows.ERROR_INVALID_PARAMETER) {
		return ErrProcessNotFound{Pid: pid}
	}
	if err != nil {
		return fmt.Errorf("OpenProcess failed: %w", err)
	}
	defer windows.CloseHandle(handle)

	ret, err := windows.WaitForSingleObject(handle, uint32(maxWait.Milliseconds()))
	if err != nil {
		return fmt.Errorf("Wait failed: %w", err)
	}

	switch ret {
	case windows.WAIT_OBJECT_0:
		return nil // Process exited
	case waitTimeout:
		return errors.New("process wait timed out")
	default:
		return fmt.Errorf("unexpected process wait return value: %d", ret)
	}
}