Compare commits

...

60 Commits

Author SHA1 Message Date
shreyas-goenka e435344f80
Merge 79fad7abb1 into f2096eddcc 2025-02-11 15:23:32 +05:30
Shreyas Goenka 79fad7abb1
pass test 2025-02-10 17:12:57 +01:00
Shreyas Goenka 58bf931308
comment 2025-02-10 17:11:49 +01:00
Shreyas Goenka c8ac08c24b
consolidate test.toml 2025-02-10 15:59:24 +01:00
Shreyas Goenka 9d65761122
pretty print 2025-02-10 15:55:31 +01:00
Shreyas Goenka 23b42e9f8f
Merge remote-tracking branch 'origin' into async-logger-clean 2025-02-10 15:53:57 +01:00
Shreyas Goenka a6e8e9285a
Merge remote-tracking branch 'origin' into async-logger-clean 2025-02-10 13:52:26 +01:00
Shreyas Goenka f88db7799d
- 2025-02-10 13:52:21 +01:00
Shreyas Goenka e43a0a0262
skip end to end integration tests 2025-02-06 11:41:50 +01:00
Shreyas Goenka 369faff1b4
return 501 2025-02-05 21:06:16 +01:00
Shreyas Goenka 0253039aa2
- 2025-02-05 20:59:01 +01:00
Shreyas Goenka 5e2e03a90c
fix test 2025-02-05 20:48:08 +01:00
Shreyas Goenka 73fac825ce
remove pid file 2025-02-05 15:32:19 +01:00
Shreyas Goenka fd6b129582
make test generic 2025-02-05 15:30:34 +01:00
Shreyas Goenka c9ebc82232
add error test case' 2025-02-05 15:28:45 +01:00
Shreyas Goenka 1bb45377e0
Merge remote-tracking branch 'origin' into async-logger-clean 2025-02-05 15:20:34 +01:00
Shreyas Goenka 8c90ad0ec4
clean 2025-02-04 16:52:13 +01:00
Shreyas Goenka d5e03f08d5
- 2025-02-04 16:25:59 +01:00
Shreyas Goenka 414a94df3b
- 2025-02-04 16:04:07 +01:00
Shreyas Goenka 407e9e0ef0
- 2025-02-04 16:03:32 +01:00
Shreyas Goenka 0abba860f2
= 2025-02-04 16:03:08 +01:00
Shreyas Goenka 9e2a689619
- 2025-02-04 16:02:14 +01:00
Shreyas Goenka 963022af0f
- 2025-02-04 16:00:43 +01:00
Shreyas Goenka 5b6ffd57bf
- 2025-02-04 15:58:12 +01:00
Shreyas Goenka 17698a5147
- 2025-02-04 15:57:50 +01:00
Shreyas Goenka 33ff865d6e
cleaner output 2025-02-04 15:56:23 +01:00
Shreyas Goenka 39ff2909db
pass test 2025-02-04 15:52:39 +01:00
Shreyas Goenka 918af62827
remove eventually files 2025-02-04 15:44:03 +01:00
Shreyas Goenka 981dbf787d
add bash script for waiting 2025-02-04 15:34:25 +01:00
Shreyas Goenka 0423b09733
add filtering for auth 2025-02-03 20:31:25 +01:00
Shreyas Goenka 403f61228d
address comments 2025-02-03 13:41:21 +01:00
Shreyas Goenka f3e7594f39
fx test 2025-02-03 13:14:38 +01:00
Shreyas Goenka 2cbc39fdc9
- 2025-02-03 13:10:42 +01:00
Shreyas Goenka 2cd25e388e
- 2025-02-03 12:24:32 +01:00
Shreyas Goenka dc0ab300dd
- 2025-02-03 12:22:22 +01:00
Shreyas Goenka 5c2205a6f7
- 2025-02-03 12:17:25 +01:00
Shreyas Goenka a8b366ee79
- 2025-02-03 12:16:49 +01:00
Shreyas Goenka 4f979007af
- 2025-02-03 12:14:23 +01:00
Shreyas Goenka c1a322555a
fx test 2025-02-03 12:09:54 +01:00
Shreyas Goenka 5385faf7d8
fix test 2025-02-03 11:54:01 +01:00
Shreyas Goenka 88015876ad
pass test 2025-02-03 11:42:17 +01:00
Shreyas Goenka da0cf951b9
- 2025-02-03 06:28:37 +01:00
Shreyas Goenka c412eb7666
- 2025-02-03 06:28:16 +01:00
Shreyas Goenka 382efe41f8
- 2025-02-03 06:27:34 +01:00
Shreyas Goenka d7bf1dc87e
add test for upload 2025-02-03 05:19:26 +01:00
Shreyas Goenka e4a1f42737
- 2025-02-03 04:40:01 +01:00
Shreyas Goenka 259a21a120
- 2025-02-03 04:33:21 +01:00
Shreyas Goenka 90148d8a50
replace os 2025-02-03 04:26:58 +01:00
Shreyas Goenka f09a780887
fix panic 2025-02-03 04:24:09 +01:00
Shreyas Goenka b83e57621e
- 2025-02-03 04:20:24 +01:00
Shreyas Goenka 427c755ea7
major cleanup 2025-02-03 04:18:38 +01:00
Shreyas Goenka 5d75c3f098
Merge remote-tracking branch 'origin' into implement-async-logger 2025-02-03 00:21:02 +01:00
Shreyas Goenka f092e21594
add back worker input 2025-01-29 16:45:21 +01:00
Shreyas Goenka 8f8463f665
- 2025-01-29 16:05:25 +01:00
Shreyas Goenka ee3568cf64
-' 2025-01-29 16:02:41 +01:00
Shreyas Goenka acd64fa296
merge 2025-01-29 16:02:09 +01:00
Shreyas Goenka 155fe7b83d
- 2025-01-29 16:00:23 +01:00
Shreyas Goenka 01d63dd20e
Merge remote-tracking branch 'origin' into implement-async-logger 2025-01-23 14:19:46 +01:00
Shreyas Goenka 3964d8d454
[WIP] In process telemetry logger 2025-01-22 22:03:13 +01:00
Shreyas Goenka ab10720027
squash all commits 2025-01-20 17:28:02 +01:00
33 changed files with 749 additions and 9 deletions

27
acceptance/bin/wait_file Executable file
View File

@ -0,0 +1,27 @@
#!/bin/bash

# Poll for the existence of a file, checking every 0.1 seconds for up to
# 10 seconds (100 attempts). Prints a confirmation and returns 0 once the
# file appears; prints a timeout message and returns 1 otherwise.
wait_file() {
    local target="$1"
    local tries=0

    while [ "$tries" -lt 100 ]; do
        if [ -e "$target" ]; then
            echo "File $target exists"
            return 0
        fi
        sleep 0.1
        tries=$((tries + 1))
    done

    echo "Timeout: File $target did not appear within 10 seconds"
    return 1
}

# Require exactly the file-path argument.
if [ $# -eq 0 ]; then
    echo "Usage: $0 <file_path>"
    exit 1
fi

wait_file "$1"
exit $?

42
acceptance/bin/wait_pid Executable file
View File

@ -0,0 +1,42 @@
#!/bin/bash

# wait <pid> in bash only works for processes that are direct children to the calling
# shell. This script is more general purpose.

# Poll until the process with the given PID exits, checking every 0.1 seconds
# for up to 10 seconds. Returns 0 once the process has ended, 1 on timeout.
wait_pid() {
    local pid="$1"
    local max_attempts=100 # 100 * 0.1 seconds = 10 seconds
    local attempt=0
    local sleep_time=0.1

    while [ $attempt -lt $max_attempts ]; do
        if [[ "$OSTYPE" == "msys"* || "$OSTYPE" == "cygwin"* ]]; then
            # Windows approach. Match the PID as a whole word: a bare
            # substring match (grep -q 12) would also match PID 123 and
            # report the wrong process as still running.
            if ! tasklist | grep -qw "$pid"; then
                echo "Process has ended"
                return 0
            fi
        else
            # Linux/macOS approach: signal 0 probes for process existence
            # without actually delivering a signal.
            if ! kill -0 "$pid" 2>/dev/null; then
                echo "Process has ended"
                return 0
            fi
        fi
        sleep $sleep_time
        attempt=$((attempt + 1))
    done

    echo "Timeout: Process $pid did not end within 10 seconds"
    return 1
}

# Usage
if [ $# -eq 0 ]; then
    echo "Usage: $0 <PID>"
    exit 1
fi

wait_pid "$1"
exit $?

View File

@ -0,0 +1,12 @@
{
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"telemetry_dummy\",\"operating_system\":\"OS\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"telemetry_dummy\",\"operating_system\":\"OS\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}

View File

@ -0,0 +1,2 @@
error: Failed to upload telemetry logs: Endpoint not implemented.

View File

@ -0,0 +1,5 @@
>>> [CLI] telemetry dummy
waiting for telemetry process to finish
File ./telemetry.pid exists
Process has ended

View File

@ -0,0 +1,15 @@
# Point the CLI at test-local files for the telemetry worker PID and its logs.
export DATABRICKS_CLI_TELEMETRY_PID_FILE=./telemetry.pid
export DATABRICKS_CLI_TELEMETRY_UPLOAD_LOGS_FILE=./out.upload.txt

# This test ensures that the main CLI command does not error even if
# telemetry upload fails.
trace $CLI telemetry dummy

echo "waiting for telemetry process to finish"

# Wait for the child telemetry process to finish
wait_file ./telemetry.pid
wait_pid $(cat ./telemetry.pid)

# cleanup the pid file
rm -f ./telemetry.pid

View File

@ -0,0 +1,13 @@
[[Server]]
# Stub the telemetry export endpoint: every upload attempt fails with a 501
# so the test can verify that a failed upload does not fail the CLI command.
Pattern = "POST /telemetry-ext"
Response.Body = '''
{
"error_code": "ERROR_CODE",
"message": "Endpoint not implemented."
}
'''
Response.StatusCode = 501

[[Repls]]
# Normalize the measured execution time (a 1-5 digit millisecond value inside
# the JSON-escaped payload) to a stable placeholder for golden-file output.
Old = 'execution_time_ms\\\":\d{1,5},'
New = 'execution_time_ms\":\"SMALL_INT\",'

View File

@ -0,0 +1,12 @@
{
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"telemetry_dummy\",\"operating_system\":\"OS\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"telemetry_dummy\",\"operating_system\":\"OS\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}

View File

@ -0,0 +1,3 @@
Telemetry logs uploaded successfully
Response:
{"errors":null,"numProtoSuccess":2}

View File

@ -0,0 +1,5 @@
>>> [CLI] telemetry dummy
waiting for telemetry process to finish
File ./telemetry.pid exists
Process has ended

View File

@ -0,0 +1,13 @@
# Point the CLI at test-local files for the telemetry worker PID and its logs.
export DATABRICKS_CLI_TELEMETRY_PID_FILE=./telemetry.pid
export DATABRICKS_CLI_TELEMETRY_UPLOAD_LOGS_FILE=./out.upload.txt

# Run a command that emits telemetry; the upload happens in a background
# child process spawned after the command finishes.
trace $CLI telemetry dummy

echo "waiting for telemetry process to finish"

# Wait for the child telemetry process to finish
wait_file ./telemetry.pid
wait_pid $(cat ./telemetry.pid)

# cleanup the pid file
rm -f ./telemetry.pid

View File

@ -0,0 +1,11 @@
[[Server]]
# Stub the telemetry export endpoint: acknowledge both proto logs so the
# upload succeeds on the first attempt.
Pattern = "POST /telemetry-ext"
Response.Body = '''
{
"numProtoSuccess": 2
}
'''

[[Repls]]
# Normalize the measured execution time (a 1-5 digit millisecond value inside
# the JSON-escaped payload) to a stable placeholder for golden-file output.
Old = 'execution_time_ms\\\":\d{1,5},'
New = 'execution_time_ms\":\"SMALL_INT\",'

View File

@ -0,0 +1,10 @@
# Only run these tests against the local stub server, never a real workspace.
LocalOnly = true
# Record incoming requests so tests can assert on the uploaded payload.
RecordRequests = true

[[Repls]]
# Replace epoch-millisecond timestamps (13-digit values beginning with 17)
# with a stable placeholder.
Old = '17\d{11}'
New = '"UNIX_TIME_MILLIS"'

[[Repls]]
# Normalize the operating system name in recorded output.
Old = 'darwin|linux|windows'
New = 'OS'

View File

@ -0,0 +1,12 @@
{
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}

View File

@ -0,0 +1,3 @@
Telemetry logs uploaded successfully
Response:
{"errors":null,"numProtoSuccess":2}

View File

@ -0,0 +1,2 @@
>>> [CLI] telemetry upload

View File

@ -0,0 +1,5 @@
# Capture the upload subprocess's output in a test-local file.
export DATABRICKS_CLI_TELEMETRY_UPLOAD_LOGS_FILE=./out.upload.txt

# This command / test cannot be run in inprocess / debug mode. This is because
# it does not go through the [root.Execute] function.
trace $CLI telemetry upload < stdin

View File

@ -0,0 +1,24 @@
{
"logs": [
{
"frontend_log_event_id": "BB79BB52-96F6-42C5-9E44-E63EEA84888D",
"entry": {
"databricks_cli_log": {
"cli_test_event": {
"name": "VALUE1"
}
}
}
},
{
"frontend_log_event_id": "A7F597B0-66D1-462D-824C-C5C706F232E8",
"entry": {
"databricks_cli_log": {
"cli_test_event": {
"name": "VALUE2"
}
}
}
}
]
}

View File

@ -0,0 +1,7 @@
[[Server]]
# Stub the telemetry export endpoint: acknowledge both proto logs so the
# "telemetry upload" command succeeds on the first attempt.
Pattern = "POST /telemetry-ext"
Response.Body = '''
{
"numProtoSuccess": 2
}
'''

View File

@ -13,6 +13,7 @@ import (
"github.com/databricks/cli/cmd/labs"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/cmd/sync"
"github.com/databricks/cli/cmd/telemetry"
"github.com/databricks/cli/cmd/version"
"github.com/databricks/cli/cmd/workspace"
"github.com/spf13/cobra"
@ -74,6 +75,6 @@ func New(ctx context.Context) *cobra.Command {
cli.AddCommand(labs.New(ctx))
cli.AddCommand(sync.New())
cli.AddCommand(version.New())
cli.AddCommand(telemetry.New())
return cli
}

View File

@ -2,16 +2,26 @@ package root
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"os"
"os/exec"
"runtime"
"slices"
"strconv"
"strings"
"time"
"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/libs/auth"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/spf13/cobra"
)
@ -73,9 +83,6 @@ func New(ctx context.Context) *cobra.Command {
// get the context back
ctx = cmd.Context()
// Detect if the CLI is running on DBR and store this on the context.
ctx = dbr.DetectRuntime(ctx)
// Configure our user agent with the command that's about to be executed.
ctx = withCommandInUserAgent(ctx, cmd)
ctx = withCommandExecIdInUserAgent(ctx)
@ -97,7 +104,9 @@ func flagErrorFunc(c *cobra.Command, err error) error {
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute(ctx context.Context, cmd *cobra.Command) error {
// TODO: deferred panic recovery
ctx = telemetry.WithNewLogger(ctx)
ctx = dbr.DetectRuntime(ctx)
start := time.Now()
// Run the command
cmd, err := cmd.ExecuteContextC(ctx)
@ -126,5 +135,110 @@ func Execute(ctx context.Context, cmd *cobra.Command) error {
}
}
end := time.Now()
exitCode := 0
if err != nil {
exitCode = 1
}
uploadTelemetry(cmd.Context(), commandString(cmd), start, end, exitCode)
return err
}
// inheritEnvVars returns the current process environment minus all
// Databricks auth environment variables. We want child telemetry processes
// to inherit variables like $HOME or $HTTPS_PROXY because they influence
// auth resolution, but the auth variables themselves are stripped.
//
// This is done because the CLI offers multiple modalities of configuring
// authentication like `--profile` or `DATABRICKS_CONFIG_PROFILE` or
// `profile: <profile>` in the bundle config file. Each of these modalities
// has a different priority, so we don't want any auth configuration to
// piggyback into the child process environment. This is a precaution to
// avoid conflicting auth configurations being passed to the child
// telemetry process.
func inheritEnvVars() []string {
	denied := auth.EnvVars()

	// Keep `out` non-nil: a nil Cmd.Env means "inherit the parent
	// environment unchanged", which is exactly what we must avoid.
	out := []string{}
	for _, kv := range os.Environ() {
		name, _, ok := strings.Cut(kv, "=")
		if !ok {
			continue
		}
		if slices.Contains(denied, name) {
			continue
		}
		out = append(out, kv)
	}
	return out
}
// uploadTelemetry spawns a detached child process (`<cli> telemetry upload`)
// to upload the telemetry logs accumulated on ctx, passing them as JSON over
// the child's stdin. All failures are logged at debug level and otherwise
// ignored: telemetry must never break the main CLI command.
func uploadTelemetry(ctx context.Context, cmdStr string, start, end time.Time, exitCode int) {
	// Nothing to upload.
	if !telemetry.HasLogs(ctx) {
		return
	}

	// Stamp every buffered event with this invocation's execution context.
	telemetry.SetExecutionContext(ctx, protos.ExecutionContext{
		CmdExecID:       cmdExecId,
		Version:         build.GetInfo().Version,
		Command:         cmdStr,
		OperatingSystem: runtime.GOOS,
		DbrVersion:      env.Get(ctx, dbr.EnvVarName),
		ExecutionTimeMs: end.Sub(start).Milliseconds(),
		ExitCode:        int64(exitCode),
	})

	logs := telemetry.GetLogs(ctx)

	in := telemetry.UploadConfig{
		Logs: logs,
	}

	execPath, err := os.Executable()
	if err != nil {
		log.Debugf(ctx, "failed to get executable path: %s", err)
		// Without the executable path we cannot spawn the upload process.
		return
	}
	telemetryCmd := exec.Command(execPath, "telemetry", "upload")
	telemetryCmd.Env = inheritEnvVars()
	// Re-add only the auth configuration actually resolved for this command.
	for k, v := range auth.Env(ConfigUsed(ctx)) {
		telemetryCmd.Env = append(telemetryCmd.Env, fmt.Sprintf("%s=%s", k, v))
	}

	b, err := json.Marshal(in)
	if err != nil {
		log.Debugf(ctx, "failed to marshal telemetry logs: %s", err)
		return
	}

	stdin, err := telemetryCmd.StdinPipe()
	if err != nil {
		log.Debugf(ctx, "failed to create stdin pipe for telemetry worker: %s", err)
		// stdin is nil on error; bail out instead of panicking on Write below.
		return
	}

	err = telemetryCmd.Start()
	if err != nil {
		log.Debugf(ctx, "failed to start telemetry worker: %s", err)
		return
	}

	// Expose the worker PID so that tests (or tooling) can wait on the child.
	if pidFilePath := env.Get(ctx, telemetry.PidFileEnvVar); pidFilePath != "" {
		err = os.WriteFile(pidFilePath, []byte(strconv.Itoa(telemetryCmd.Process.Pid)), 0o644)
		if err != nil {
			log.Debugf(ctx, "failed to write telemetry worker PID file: %s", err)
		}
	}

	_, err = stdin.Write(b)
	if err != nil {
		log.Debugf(ctx, "failed to write to telemetry worker: %s", err)
	}

	// Closing stdin signals EOF to the child; it reads the full payload first.
	err = stdin.Close()
	if err != nil {
		log.Debugf(ctx, "failed to close stdin for telemetry worker: %s", err)
	}
}

View File

@ -7,8 +7,10 @@ import (
"github.com/google/uuid"
)
var cmdExecId = uuid.New().String()
func withCommandExecIdInUserAgent(ctx context.Context) context.Context {
// A UUID that will allow us to correlate multiple API requests made by
// the same CLI invocation.
return useragent.InContext(ctx, "cmd-exec-id", uuid.New().String())
return useragent.InContext(ctx, "cmd-exec-id", cmdExecId)
}

31
cmd/telemetry/dummy.go Normal file
View File

@ -0,0 +1,31 @@
package telemetry
import (
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/spf13/cobra"
)
// newDummyCommand returns the hidden "telemetry dummy" command, which logs
// two test telemetry events (VALUE1 and VALUE2). It exists to exercise the
// telemetry pipeline end to end in acceptance tests.
func newDummyCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:     "dummy",
		Short:   "log dummy telemetry events",
		Long:    "Fire a test telemetry event against the configured Databricks workspace.",
		Hidden:  true,
		PreRunE: root.MustWorkspaceClient,
	}

	cmd.RunE = func(cmd *cobra.Command, args []string) error {
		names := []string{"VALUE1", "VALUE2"}
		for _, name := range names {
			event := protos.DatabricksCliLog{
				CliTestEvent: &protos.CliTestEvent{
					Name: protos.DummyCliEnum(name),
				},
			}
			telemetry.Log(cmd.Context(), event)
		}
		return nil
	}

	return cmd
}

View File

@ -0,0 +1,16 @@
package telemetry
import (
"github.com/spf13/cobra"
)
// New returns the parent "telemetry" command group. The group is hidden:
// it exists for internal use and acceptance tests, not for end users.
func New() *cobra.Command {
	cmd := &cobra.Command{
		Use:    "telemetry",
		Short:  "",
		Hidden: true,
	}

	cmd.AddCommand(newDummyCommand())
	return cmd
}

View File

@ -24,3 +24,17 @@ func Env(cfg *config.Config) map[string]string {
return out
}
// EnvVars returns the primary environment variable name for every SDK
// configuration attribute that has one (e.g. DATABRICKS_HOST). Attributes
// with no associated environment variables are skipped. Order follows
// config.ConfigAttributes.
func EnvVars() []string {
	names := []string{}
	for _, attr := range config.ConfigAttributes {
		if vars := attr.EnvVars; len(vars) > 0 {
			names = append(names, vars[0])
		}
	}
	return names
}

View File

@ -40,3 +40,40 @@ func TestAuthEnv(t *testing.T) {
out := Env(in)
assert.Equal(t, expected, out)
}
// TestAuthEnvVars pins the exact list (and order) of environment variable
// names returned by EnvVars. EnvVars iterates config.ConfigAttributes, so
// an SDK upgrade that adds, removes, or reorders attributes will fail this
// test — update the expected list when that happens.
func TestAuthEnvVars(t *testing.T) {
	expected := []string{
		"DATABRICKS_HOST",
		"DATABRICKS_CLUSTER_ID",
		"DATABRICKS_WAREHOUSE_ID",
		"DATABRICKS_SERVERLESS_COMPUTE_ID",
		"DATABRICKS_METADATA_SERVICE_URL",
		"DATABRICKS_ACCOUNT_ID",
		"DATABRICKS_TOKEN",
		"DATABRICKS_USERNAME",
		"DATABRICKS_PASSWORD",
		"DATABRICKS_CONFIG_PROFILE",
		"DATABRICKS_CONFIG_FILE",
		"DATABRICKS_GOOGLE_SERVICE_ACCOUNT",
		"GOOGLE_CREDENTIALS",
		"DATABRICKS_AZURE_RESOURCE_ID",
		"ARM_USE_MSI",
		"ARM_CLIENT_SECRET",
		"ARM_CLIENT_ID",
		"ARM_TENANT_ID",
		"ACTIONS_ID_TOKEN_REQUEST_URL",
		"ACTIONS_ID_TOKEN_REQUEST_TOKEN",
		"ARM_ENVIRONMENT",
		"DATABRICKS_AZURE_LOGIN_APP_ID",
		"DATABRICKS_CLIENT_ID",
		"DATABRICKS_CLIENT_SECRET",
		"DATABRICKS_CLI_PATH",
		"DATABRICKS_AUTH_TYPE",
		"DATABRICKS_DEBUG_TRUNCATE_BYTES",
		"DATABRICKS_DEBUG_HEADERS",
		"DATABRICKS_RATE_LIMIT",
	}

	out := EnvVars()
	assert.Equal(t, expected, out)
}

View File

@ -11,6 +11,8 @@ import (
// Dereference [os.Stat] to allow mocking in tests.
var statFunc = os.Stat
const EnvVarName = "DATABRICKS_RUNTIME_VERSION"
// detect returns true if the current process is running on a Databricks Runtime.
// Its return value is meant to be cached in the context.
func detect(ctx context.Context) bool {
@ -21,7 +23,7 @@ func detect(ctx context.Context) bool {
}
// Databricks Runtime always has the DATABRICKS_RUNTIME_VERSION environment variable set.
if value, ok := env.Lookup(ctx, "DATABRICKS_RUNTIME_VERSION"); !ok || value == "" {
if value, ok := env.Lookup(ctx, EnvVarName); !ok || value == "" {
return false
}

25
libs/telemetry/context.go Normal file
View File

@ -0,0 +1,25 @@
package telemetry
import (
"context"
"errors"
)
// telemetryLogger is a private key type for storing the telemetry logger in
// a context. Using an unexported type prevents collisions with context keys
// defined by other packages.
type telemetryLogger int

// telemetryLoggerKey is the single key under which the logger is stored.
var telemetryLoggerKey telemetryLogger

// WithNewLogger returns a child context carrying a fresh, empty telemetry logger.
func WithNewLogger(ctx context.Context) context.Context {
	return context.WithValue(ctx, telemetryLoggerKey, &logger{})
}

// fromContext retrieves the logger installed by [WithNewLogger]. It panics
// if the context has no logger: callers are expected to always set one up.
func fromContext(ctx context.Context) *logger {
	l, ok := ctx.Value(telemetryLoggerKey).(*logger)
	if !ok {
		panic(errors.New("telemetry logger not found in the context"))
	}
	return l
}

50
libs/telemetry/logger.go Normal file
View File

@ -0,0 +1,50 @@
package telemetry
import (
"context"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/google/uuid"
)
// Log records a single telemetry event on the logger stored in ctx.
// It panics (via fromContext) if WithNewLogger was not called upstream.
func Log(ctx context.Context, event protos.DatabricksCliLog) {
	fromContext(ctx).log(event)
}

// GetLogs returns all telemetry events recorded so far on this context.
func GetLogs(ctx context.Context) []protos.FrontendLog {
	return fromContext(ctx).getLogs()
}

// HasLogs reports whether at least one telemetry event has been recorded.
func HasLogs(ctx context.Context) bool {
	return len(fromContext(ctx).getLogs()) > 0
}

// SetExecutionContext stamps every recorded event with the given execution
// context (command, version, exit code, etc.).
func SetExecutionContext(ctx context.Context, ec protos.ExecutionContext) {
	fromContext(ctx).setExecutionContext(ec)
}
// logger accumulates telemetry events in memory for the duration of a
// single CLI invocation.
type logger struct {
	logs []protos.FrontendLog
}

// log wraps the event in a FrontendLog envelope with a freshly generated
// event ID and appends it to the in-memory buffer.
func (l *logger) log(event protos.DatabricksCliLog) {
	if l.logs == nil {
		l.logs = make([]protos.FrontendLog, 0)
	}
	entry := protos.FrontendLog{
		FrontendLogEventID: uuid.New().String(),
		Entry: protos.FrontendLogEntry{
			DatabricksCliLog: event,
		},
	}
	l.logs = append(l.logs, entry)
}

// getLogs returns the buffered events.
func (l *logger) getLogs() []protos.FrontendLog {
	return l.logs
}

// setExecutionContext stamps every buffered event with the same execution
// context. All entries end up pointing at one shared copy of ec.
func (l *logger) setExecutionContext(ec protos.ExecutionContext) {
	for i := range l.logs {
		l.logs[i].Entry.DatabricksCliLog.ExecutionContext = &ec
	}
}

View File

@ -24,10 +24,12 @@ type ExecutionContext struct {
FromWebTerminal bool `json:"from_web_terminal,omitempty"`
// Time taken for the CLI command to execute.
ExecutionTimeMs int64 `json:"execution_time_ms,omitempty"`
// We want to serialize the zero value as well so the omitempty tag is not set.
ExecutionTimeMs int64 `json:"execution_time_ms"`
// Exit code of the CLI command.
ExitCode int64 `json:"exit_code,omitempty"`
// We want to serialize the zero value as well so the omitempty tag is not set.
ExitCode int64 `json:"exit_code"`
}
type CliTestEvent struct {

99
libs/telemetry/upload.go Normal file
View File

@ -0,0 +1,99 @@
package telemetry
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/config"
)
const (
// File containing debug logs from the upload process.
UploadLogsFileEnvVar = "DATABRICKS_CLI_TELEMETRY_UPLOAD_LOGS_FILE"
// File containing the PID of the telemetry upload process.
PidFileEnvVar = "DATABRICKS_CLI_TELEMETRY_PID_FILE"
)
type UploadConfig struct {
Logs []protos.FrontendLog `json:"logs"`
}
// Upload reads telemetry logs from stdin and uploads them to the telemetry
// endpoint, retrying until the endpoint acknowledges every log or the
// overall 60-second deadline expires. This function is always expected to be
// called in a separate child process from the main CLI process.
//
// NOTE(review): several error strings below ("Failed to upload telemetry
// logs: ...", "Failed to flush telemetry log due to timeout") are matched by
// acceptance-test golden files — do not reword them.
func Upload() (*ResponseBody, error) {
	b, err := io.ReadAll(os.Stdin)
	if err != nil {
		return nil, fmt.Errorf("failed to read from stdin: %s\n", err)
	}

	in := UploadConfig{}
	err = json.Unmarshal(b, &in)
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal input: %s\n", err)
	}

	if len(in.Logs) == 0 {
		// Reaching here means decoding succeeded but the payload carried no
		// logs; there is no underlying error to wrap (err is nil here).
		return nil, errors.New("no logs to upload")
	}

	protoLogs := make([]string, len(in.Logs))
	for i, log := range in.Logs {
		b, err := json.Marshal(log)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal log: %s\n", err)
		}
		protoLogs[i] = string(b)
	}

	// Parent process is responsible for setting environment variables to
	// configure authentication.
	apiClient, err := client.New(&config.Config{})
	if err != nil {
		return nil, fmt.Errorf("Failed to create API client: %s\n", err)
	}

	// Set a maximum total time to try telemetry uploads.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	resp := &ResponseBody{}
	for {
		select {
		case <-ctx.Done():
			return nil, errors.New("Failed to flush telemetry log due to timeout")
		default:
			// Proceed
		}

		// Log the CLI telemetry events.
		err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
			UploadTime: time.Now().UnixMilli(),
			Items:      []string{},
			ProtoLogs:  protoLogs,
		}, resp)
		if err != nil {
			return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", err)
		}

		if len(resp.Errors) > 0 {
			return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", resp.Errors)
		}

		// Retry until the endpoint reports success for every log.
		if resp.NumProtoSuccess == int64(len(in.Logs)) {
			return resp, nil
		}
	}
}

View File

@ -0,0 +1,85 @@
package telemetry
import (
"encoding/json"
"net/http"
"os"
"path/filepath"
"testing"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/databricks/cli/libs/testserver"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestTelemetryUpload verifies that Upload retries until the endpoint
// acknowledges all logs: the stub server reports 1 of 2 successes on the
// first call and 2 of 2 on the second, so Upload must issue exactly two
// requests and return the final response.
func TestTelemetryUpload(t *testing.T) {
	server := testserver.New(t)
	t.Cleanup(server.Close)

	count := 0

	// First call: partial success (1/2). Second call: full success (2/2).
	// Any further call is a test failure and returns a 500.
	server.Handle("POST /telemetry-ext", func(_ *testserver.FakeWorkspace, req *http.Request) (resp any, statusCode int) {
		count++
		if count == 1 {
			return ResponseBody{
				NumProtoSuccess: 1,
			}, http.StatusOK
		}
		if count == 2 {
			return ResponseBody{
				NumProtoSuccess: 2,
			}, http.StatusOK
		}
		return nil, http.StatusInternalServerError
	})

	// Upload builds its client purely from the environment (see the comment
	// in Upload about the parent process configuring auth).
	t.Setenv("DATABRICKS_HOST", server.URL)
	t.Setenv("DATABRICKS_TOKEN", "token")

	logs := []protos.FrontendLog{
		{
			FrontendLogEventID: uuid.New().String(),
			Entry: protos.FrontendLogEntry{
				DatabricksCliLog: protos.DatabricksCliLog{
					CliTestEvent: &protos.CliTestEvent{Name: protos.DummyCliEnumValue1},
				},
			},
		},
		{
			FrontendLogEventID: uuid.New().String(),
			Entry: protos.FrontendLogEntry{
				DatabricksCliLog: protos.DatabricksCliLog{
					CliTestEvent: &protos.CliTestEvent{Name: protos.DummyCliEnumValue2},
				},
			},
		},
	}

	processIn := UploadConfig{
		Logs: logs,
	}

	b, err := json.Marshal(processIn)
	require.NoError(t, err)

	tmpDir := t.TempDir()
	testutil.WriteFile(t, filepath.Join(tmpDir, "stdin"), string(b))

	fd, err := os.OpenFile(filepath.Join(tmpDir, "stdin"), os.O_RDONLY, 0o644)
	require.NoError(t, err)

	// Redirect stdin to the file containing the telemetry logs, since
	// Upload reads its payload from os.Stdin. Restored on cleanup.
	old := os.Stdin
	os.Stdin = fd
	t.Cleanup(func() {
		fd.Close()
		os.Stdin = old
	})

	resp, err := Upload()
	require.NoError(t, err)
	assert.Equal(t, int64(2), resp.NumProtoSuccess)
	assert.Equal(t, 2, count)
}

39
main.go
View File

@ -2,14 +2,53 @@ package main
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"github.com/databricks/cli/cmd"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/telemetry"
)
func main() {
ctx := context.Background()
// Uploading telemetry data spawns a new process. We handle this separately
// from the rest of the CLI commands.
// This is done because [root.Execute] spawns a new process to run the
// "telemetry upload" command if there are logs to be uploaded. Having this outside
// of [root.Execute] ensures that the telemetry upload process is not spawned
// infinitely in a recursive manner.
if len(os.Args) == 3 && os.Args[1] == "telemetry" && os.Args[2] == "upload" {
var err error
// By default, this command should not write anything to stdout or stderr.
outW := io.Discard
errW := io.Discard
// If the environment variable is set, redirect stdout to the file.
// This is useful for testing.
if v := os.Getenv(telemetry.UploadLogsFileEnvVar); v != "" {
outW, _ = os.OpenFile(v, os.O_CREATE|os.O_WRONLY, 0o644)
errW = outW
}
resp, err := telemetry.Upload()
if err != nil {
fmt.Fprintf(errW, "error: %s\n", err)
os.Exit(1)
}
fmt.Fprintf(outW, "Telemetry logs uploaded successfully\n")
fmt.Fprintln(outW, "Response:")
b, err := json.Marshal(resp)
if err == nil {
fmt.Fprintln(outW, string(b))
}
os.Exit(0)
}
err := root.Execute(ctx, cmd.New(ctx))
if err != nil {
os.Exit(1)