[WIP] In-process telemetry logger

This commit is contained in:
Shreyas Goenka 2025-01-22 22:03:13 +01:00
parent ab10720027
commit 3964d8d454
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
33 changed files with 1020 additions and 535 deletions

View File

@@ -71,7 +71,7 @@ func TestAccept(t *testing.T) {
cloudEnv := os.Getenv("CLOUD_ENV")
if cloudEnv == "" {
-server := StartServer(t)
+server := testutil.StartServer(t)
AddHandlers(server)
// Redirect API access to local server:
t.Setenv("DATABRICKS_HOST", server.URL)

View File

@@ -1,73 +1,16 @@
package acceptance_test

import (
-"encoding/json"
"net/http"
-"net/http/httptest"
-"testing"

+"github.com/databricks/cli/internal/testutil"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/workspace"
)

-type TestServer struct {
-*httptest.Server
-Mux *http.ServeMux
-}
-
-type HandlerFunc func(r *http.Request) (any, error)
-
-func NewTestServer() *TestServer {
-mux := http.NewServeMux()
-server := httptest.NewServer(mux)
-
-return &TestServer{
-Server: server,
-Mux:    mux,
-}
-}
-
-func (s *TestServer) Handle(pattern string, handler HandlerFunc) {
-s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
-resp, err := handler(r)
-if err != nil {
-http.Error(w, err.Error(), http.StatusInternalServerError)
-return
-}
-
-w.Header().Set("Content-Type", "application/json")
-
-var respBytes []byte
-
-respString, ok := resp.(string)
-if ok {
-respBytes = []byte(respString)
-} else {
-respBytes, err = json.MarshalIndent(resp, "", "    ")
-if err != nil {
-http.Error(w, err.Error(), http.StatusInternalServerError)
-return
-}
-}
-
-if _, err := w.Write(respBytes); err != nil {
-http.Error(w, err.Error(), http.StatusInternalServerError)
-return
-}
-})
-}
-
-func StartServer(t *testing.T) *TestServer {
-server := NewTestServer()
-t.Cleanup(func() {
-server.Close()
-})
-return server
-}
-
-func AddHandlers(server *TestServer) {
+func AddHandlers(server *testutil.Server) {
server.Handle("/api/2.0/policies/clusters/list", func(r *http.Request) (any, error) {
return compute.ListPoliciesResponse{
Policies: []compute.Policy{

View File

@@ -230,6 +230,10 @@ func (b *Bundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) {
return append(b.Config.Sync.Include, filepath.ToSlash(filepath.Join(internalDirRel, "*.*"))), nil
}

+// TODO: Add end to end tests that the Environment variables returned by the
+// AuthEnv function are correct + the config set in the context is fully resolved
+// (instead of just being the input).
+
// AuthEnv returns a map with environment variables and their values
// derived from the workspace client configuration that was resolved
// in the context of this bundle.

View File

@@ -13,6 +13,8 @@ import (
"github.com/databricks/cli/cmd/labs"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/cmd/sync"
+sendtestevent "github.com/databricks/cli/cmd/telemetry/send_test_event"
+"github.com/databricks/cli/cmd/telemetry/worker"
"github.com/databricks/cli/cmd/version"
"github.com/databricks/cli/cmd/workspace"
"github.com/spf13/cobra"
@@ -75,5 +77,9 @@ func New(ctx context.Context) *cobra.Command {
cli.AddCommand(sync.New())
cli.AddCommand(version.New())

+// TODO: Move these under the telemetry subcommand?
+cli.AddCommand(worker.New())
+cli.AddCommand(sendtestevent.New())
+
return cli
}

View File

@@ -18,7 +18,10 @@ import (
var (
workspaceClient int
accountClient int
+
+// TODO: Does the config used have the resolved configuration? Like has the
+// profile been loaded?
configUsed int
)

type ErrNoWorkspaceProfiles struct {
@@ -334,6 +337,8 @@ func AccountClient(ctx context.Context) *databricks.AccountClient {
func ConfigUsed(ctx context.Context) *config.Config {
cfg, ok := ctx.Value(&configUsed).(*config.Config)
if !ok {
+// todo: remove this, just for testing.
+return &config.Config{}
panic("cannot get *config.Config. Please report it as a bug")
}
return cfg

View File

@@ -0,0 +1,63 @@
package root
import (
"context"
"fmt"
"os"
"strconv"
"strings"
"github.com/databricks/cli/libs/dbr"
)
// TODO: Split this into a separate PR and add a test.
func isWebTerminal(ctx context.Context) bool {
if !dbr.RunsOnRuntime(ctx) {
return false
}
cur := os.Getpid()
// Max number of ancestors to check for trying to detect if the process is
// running in a web terminal (i.e. launched by ttyd).
maxHeight := 10
for range maxHeight {
// If the pid is 0 or 1, we are at the root of the process tree.
if cur == 0 || cur == 1 {
return false
}
// Read the name of the current process
b, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", cur))
if err != nil {
return false
}
// If the name for any of the parent processes is ttyd, then the
// CLI has been run from the web terminal.
if strings.TrimSpace(string(b)) == "ttyd" {
return true
}
// The 4th field in /proc/<pid>/stat is the parent pid.
b, err = os.ReadFile(fmt.Sprintf("/proc/%d/stat", cur))
if err != nil {
return false
}
stat := strings.Split(string(b), " ")
if len(stat) < 4 {
return false
}
v, err := strconv.Atoi(stat[3])
if err != nil {
return false
}
cur = v
}
return false
}
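For context on the /proc lookups above, here is a minimal, self-contained sketch (the stat line and pid values are made up, not taken from this commit) of how the parent pid is pulled out of a /proc/<pid>/stat line. The comm field is the second field and is wrapped in parentheses; the fourth field is the parent pid, which is what the stat[3] index in the loop selects.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Illustrative /proc/<pid>/stat content; the pids and values are made up.
	// Field order: pid, comm (in parentheses), state, ppid, ...
	stat := "4242 (bash) S 4100 4242 4242 34816 4242 4194304"

	fields := strings.Split(stat, " ")
	// fields[3] is the parent pid, the same index used by isWebTerminal above.
	ppid, err := strconv.Atoi(fields[3])
	if err != nil {
		panic(err)
	}
	fmt.Println(ppid) // Output: 4100
}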

View File

@@ -2,16 +2,24 @@ package root

import (
"context"
+"encoding/json"
"errors"
"fmt"
+"io"
"log/slog"
"os"
+"os/exec"
+"runtime"
"strings"
+"time"

"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/dbr"
+"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/log"
+"github.com/databricks/cli/libs/telemetry"
+"github.com/databricks/cli/libs/telemetry/protos"
"github.com/spf13/cobra"
)
@@ -73,9 +81,6 @@ func New(ctx context.Context) *cobra.Command {
// get the context back
ctx = cmd.Context()

-// Detect if the CLI is running on DBR and store this on the context.
-ctx = dbr.DetectRuntime(ctx)
-
// Configure our user agent with the command that's about to be executed.
ctx = withCommandInUserAgent(ctx, cmd)
ctx = withCommandExecIdInUserAgent(ctx)
@@ -94,32 +99,147 @@ func flagErrorFunc(c *cobra.Command, err error) error {
return fmt.Errorf("%w\n\n%s", err, c.UsageString())
}

+// TODO CONTINUE: This setup should mostly work. There are a couple of open questions:
+// 4. I can print the output from the telemetry-worker command and a waiting mode
+// to the root.Execution method here to see whether the expected output matches.
+
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
+// TODO: The test runner also relies on this function. Create a separate function to
+// avoid logging telemetry in our testcli runner.
func Execute(ctx context.Context, cmd *cobra.Command) error {
-// TODO: deferred panic recovery
+ctx = telemetry.WithNewLogger(ctx)
+ctx = dbr.DetectRuntime(ctx)
+start := time.Now()

// Run the command
-cmd, err := cmd.ExecuteContextC(ctx)
-if err != nil && !errors.Is(err, ErrAlreadyPrinted) {
+cmd, cmdErr := cmd.ExecuteContextC(ctx)
+if cmdErr != nil && !errors.Is(cmdErr, ErrAlreadyPrinted) {
// If cmdio logger initialization succeeds, then this function logs with the
// initialized cmdio logger, otherwise with the default cmdio logger
-cmdio.LogError(cmd.Context(), err)
+cmdio.LogError(cmd.Context(), cmdErr)
}

// Log exit status and error
// We only log if logger initialization succeeded and is stored in command
// context
if logger, ok := log.FromContext(cmd.Context()); ok {
-if err == nil {
+if cmdErr == nil {
logger.Info("completed execution",
slog.String("exit_code", "0"))
} else {
logger.Error("failed execution",
slog.String("exit_code", "1"),
-slog.String("error", err.Error()))
+slog.String("error", cmdErr.Error()))
}
}

-return err
+end := time.Now()
+exitCode := 0
+if cmdErr != nil {
+exitCode = 1
+}
+
+if env.Get(ctx, telemetry.SkipEnvVar) != "true" {
+logTelemetry(ctx, commandString(cmd), start, end, exitCode)
+}
+
+return cmdErr
+}
// TODO: Do not log for integration tests using the CLI.
// TODO: Skip telemetry if the credentials are invalid.
func logTelemetry(ctx context.Context, cmdStr string, start, end time.Time, exitCode int) {
telemetry.SetExecutionContext(ctx, protos.ExecutionContext{
CmdExecId: cmdExecId,
Version: build.GetInfo().Version,
Command: cmdStr,
OperatingSystem: runtime.GOOS,
DbrVersion: env.Get(ctx, dbr.EnvVarName),
FromWebTerminal: isWebTerminal(ctx),
ExecutionTimeMs: end.Sub(start).Milliseconds(),
ExitCode: int64(exitCode),
})
// TODO: Better check?
// Do not log telemetry for the telemetry-worker command to avoid fork bombs.
if cmdStr == "telemetry-worker" {
return
}
execPath, err := os.Executable()
if err != nil {
log.Debugf(ctx, "failed to get executable path: %s", err)
}
telemetryCmd := exec.Command(execPath, "telemetry-worker")
// TODO: Add test that ensures that the context key for cli commands stores a
// resolved auth configuration.
// TODO: Add test that the worker inherits the environment variables from the
// parent process.
in := telemetry.WorkerInput{
AuthConfig: ConfigUsed(ctx),
Logs: telemetry.GetLogs(ctx),
}
if len(in.Logs) == 0 {
return
}
b, err := json.Marshal(in)
if err != nil {
log.Debugf(ctx, "failed to marshal telemetry logs: %s", err)
return
}
stdin, err := telemetryCmd.StdinPipe()
if err != nil {
log.Debugf(ctx, "failed to create stdin pipe for telemetry worker: %s", err)
}
stdout, err := telemetryCmd.StdoutPipe()
if err != nil {
log.Debugf(ctx, "failed to create stdout pipe for telemetry worker: %s", err)
}
// Set DATABRICKS_CLI_SKIP_TELEMETRY to true to ensure that the telemetry worker
// command does not accidentally call itself, causing a fork bomb. This can happen
// if a change starts logging telemetry in the telemetry worker command's code
// path. The environment must be configured before the worker process is started.
telemetryCmd.Env = os.Environ()
telemetryCmd.Env = append(telemetryCmd.Env, telemetry.SkipEnvVar+"=true")
err = telemetryCmd.Start()
if err != nil {
log.Debugf(ctx, "failed to start telemetry worker: %s", err)
return
}
_, err = stdin.Write(b)
if err != nil {
log.Debugf(ctx, "failed to write to telemetry worker: %s", err)
}
err = stdin.Close()
if err != nil {
log.Debugf(ctx, "failed to close stdin for telemetry worker: %s", err)
}
// This is only meant for testing purposes, to do assertions on the output
// of the telemetry worker command.
if env.Get(ctx, telemetry.BlockOnUploadEnvVar) == "true" {
err = telemetryCmd.Wait()
if err != nil {
log.Debugf(ctx, "failed to wait for telemetry worker: %s", err)
}
cmdio.LogString(ctx, "telemetry-worker output:")
b, err := io.ReadAll(stdout)
if err != nil {
log.Debugf(ctx, "failed to read telemetry worker output: %s", err)
}
cmdio.LogString(ctx, string(b))
}
}
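A hedged sketch of how a test could exercise the blocking mode mentioned above (DATABRICKS_CLI_BLOCK_ON_TELEMETRY_UPLOAD). The test name is hypothetical, the handler payload is illustrative, and it assumes the testutil helpers introduced elsewhere in this commit.

package telemetry_test

import (
	"net/http"
	"os/exec"
	"testing"

	"github.com/databricks/cli/internal/testutil"
	"github.com/databricks/cli/libs/telemetry"
	"github.com/stretchr/testify/require"
)

func TestTelemetryBlocksOnUpload(t *testing.T) {
	server := testutil.StartServer(t)
	server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
		// Accept all three events from send-test-event in one shot.
		return telemetry.ResponseBody{NumProtoSuccess: 3}, nil
	})

	t.Setenv("DATABRICKS_HOST", server.URL)
	t.Setenv("DATABRICKS_TOKEN", "foobar")
	// Make Execute wait for the worker and echo its output instead of
	// returning immediately.
	t.Setenv("DATABRICKS_CLI_BLOCK_ON_TELEMETRY_UPLOAD", "true")

	execPath := testutil.BuildCLI(t)
	out, err := exec.Command(execPath, "send-test-event").CombinedOutput()
	require.NoError(t, err)

	// root.Execute prints the worker's stdout when the blocking env var is set.
	require.Contains(t, string(out), "telemetry-worker output:")
}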

View File

@@ -7,8 +7,10 @@ import (
"github.com/google/uuid"
)

+var cmdExecId = uuid.New().String()
+
func withCommandExecIdInUserAgent(ctx context.Context) context.Context {
// A UUID that will allow us to correlate multiple API requests made by
// the same CLI invocation.
-return useragent.InContext(ctx, "cmd-exec-id", uuid.New().String())
+return useragent.InContext(ctx, "cmd-exec-id", cmdExecId)
}

View File

@@ -0,0 +1,30 @@
package sendtestevent
import (
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/spf13/cobra"
)
func New() *cobra.Command {
cmd := &cobra.Command{
Use: "send-test-event",
Short: "Send a test telemetry event to Databricks",
Hidden: true,
PreRunE: root.MustWorkspaceClient,
}
cmd.RunE = func(cmd *cobra.Command, args []string) error {
for _, v := range []string{"VALUE1", "VALUE2", "VALUE3"} {
telemetry.Log(cmd.Context(), protos.DatabricksCliLog{
CliTestEvent: &protos.CliTestEvent{
Name: protos.DummyCliEnum(v),
},
})
}
return nil
}
return cmd
}

View File

@@ -0,0 +1,136 @@
package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/telemetry"
"github.com/spf13/cobra"
"github.com/databricks/databricks-sdk-go/client"
)
// TODO CONTINUE:
// 2. Add end to end integration tests and mocked tests for telemetry upload.
// 3. Verify that the auth configuration is resolved. Enforce that somehow?
// TODO: What happens here with OAuth? This then would end up spawning a new process
// to resolve the auth token. Options:
// 1. Check in with miles that this is fine.
// 2. See if we can directly pass a token with enough lifetime left to this
// worker process.
// 3. Right before spawning the child process make sure to refresh the token.
// TODO: Print errors to stderr and assert in tests that the stderr is empty.
// We need to spawn a separate process to upload telemetry logs in order to avoid
// increasing the latency of CLI commands.
//
// TODO: Add check to ensure this does not become a fork bomb. Maybe a unit test
// as well.
func New() *cobra.Command {
cmd := &cobra.Command{
Use: "telemetry-worker",
Short: "Upload telemetry logs from stdin to Databricks",
Args: root.NoArgs,
Hidden: true,
}
cmd.RunE = func(cmd *cobra.Command, args []string) error {
fmt.Printf("Running telemetry worker\n")
b, err := io.ReadAll(cmd.InOrStdin())
if err != nil {
return fmt.Errorf("failed to read from stdin: %s\n", err)
}
in := telemetry.WorkerInput{}
err = json.Unmarshal(b, &in)
if err != nil {
return fmt.Errorf("failed to unmarshal input: %s\n", err)
}
fmt.Printf("worker input: %#v\n", in)
logs := in.Logs
// No logs to upload.
if len(logs) == 0 {
return fmt.Errorf("No logs to upload: %s\n", err)
}
// The API expects logs to be JSON strings. Serialize the logs to a string
// to be set in the request body.
var protoLogs []string
for _, event := range logs {
s, err := json.Marshal(event)
if err != nil {
return err
}
protoLogs = append(protoLogs, string(s))
}
apiClient, err := client.New(in.AuthConfig)
if err != nil {
return fmt.Errorf("Failed to create API client: %s\n", err)
}
// Set a maximum total time to try telemetry uploads.
ctx, cancel := context.WithTimeout(cmd.Context(), 60*time.Second)
defer cancel()
resp := &telemetry.ResponseBody{}
for {
select {
case <-ctx.Done():
return errors.New("Failed to flush telemetry log due to timeout")
default:
// Proceed
}
// Log the CLI telemetry events.
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, telemetry.RequestBody{
UploadTime: time.Now().Unix(),
ProtoLogs: protoLogs,
// A bug in the telemetry API requires us to send an empty items array.
// Otherwise we get an opaque 500 internal server error.
// TODO: Do I need to do this even though "omitempty" is not set?
Items: []string{},
}, resp)
if err != nil {
// The SDK automatically performs retries for 429s and 503s. Thus if we
// see an error here, do not retry logging the telemetry.
return fmt.Errorf("Error making the API request to /telemetry-ext: %v", err)
}
// If not all the logs were successfully sent, we'll retry and log everything
// again.
//
// Note: This will result in server side duplications but that's fine since
// we can always deduplicate in the data pipeline itself.
if len(logs) > int(resp.NumProtoSuccess) {
continue
}
// TODO: Add an integration acceptance test for this.
fmt.Println("Successfully flushed telemetry events")
b, err := json.Marshal(resp)
if err != nil {
return fmt.Errorf("Failed to marshal response: %s\n", err)
}
fmt.Println("Response: ", string(b))
return nil
}
}
return cmd
}

View File

@@ -0,0 +1,94 @@
package worker
import (
"bytes"
"encoding/json"
"net/http"
"testing"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/databricks/databricks-sdk-go/config"
"github.com/stretchr/testify/require"
)
func TestTelemetryWorker(t *testing.T) {
server := testutil.StartServer(t)
count := int64(0)
server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
// auth token should be set.
require.Equal(t, "Bearer foobar", r.Header.Get("Authorization"))
// reqBody should contain all the logs.
reqBody := telemetry.RequestBody{}
err := json.NewDecoder(r.Body).Decode(&reqBody)
require.NoError(t, err)
require.Equal(t, []string{
"{\"frontend_log_event_id\":\"aaaa\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"bbbb\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}",
"{\"frontend_log_event_id\":\"cccc\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE3\"}}}}",
}, reqBody.ProtoLogs)
count++
return telemetry.ResponseBody{
NumProtoSuccess: count,
}, nil
})
in := telemetry.WorkerInput{
Logs: []protos.FrontendLog{
{
FrontendLogEventID: "aaaa",
Entry: protos.FrontendLogEntry{
DatabricksCliLog: protos.DatabricksCliLog{
CliTestEvent: &protos.CliTestEvent{
Name: protos.DummyCliEnumValue1,
},
},
},
},
{
FrontendLogEventID: "bbbb",
Entry: protos.FrontendLogEntry{
DatabricksCliLog: protos.DatabricksCliLog{
CliTestEvent: &protos.CliTestEvent{
Name: protos.DummyCliEnumValue2,
},
},
},
},
{
FrontendLogEventID: "cccc",
Entry: protos.FrontendLogEntry{
DatabricksCliLog: protos.DatabricksCliLog{
CliTestEvent: &protos.CliTestEvent{
Name: protos.DummyCliEnumValue3,
},
},
},
},
},
AuthConfig: &config.Config{
Host: server.URL,
Token: "foobar",
},
}
inBytes, err := json.Marshal(in)
require.NoError(t, err)
stdinReader := bytes.NewReader(inBytes)
cmd := New()
cmd.SetIn(stdinReader)
cmd.SetArgs([]string{})
err = cmd.Execute()
require.NoError(t, err)
// Telemetry worker should retry until all logs are uploaded.
require.Equal(t, int64(3), count)
}

Binary file not shown.

View File

@@ -1,91 +1,232 @@
package telemetry_test

import (
"encoding/json"
"fmt"
"net/http"
"os/exec"
"path/filepath"
"testing"
"time"

"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// // Wrapper to capture the response from the API client since that's not directly
// // accessible from the logger.
// type apiClientWrapper struct {
// response  *telemetry.ResponseBody
// apiClient *client.DatabricksClient
// }
//
// func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
// headers map[string]string, request, response any,
// visitors ...func(*http.Request) error,
// ) error {
// err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...)
// wrapper.response = response.(*telemetry.ResponseBody)
// return err
// }

// func TestTelemetryLogger(t *testing.T) {
// events := []telemetry.DatabricksCliLog{
// {
// CliTestEvent: &protos.CliTestEvent{
// Name: protos.DummyCliEnumValue1,
// },
// },
// {
// BundleInitEvent: &protos.BundleInitEvent{
// Uuid: uuid.New().String(),
// TemplateName: "abc",
// TemplateEnumArgs: []protos.BundleInitTemplateEnumArg{
// {
// Key: "a",
// Value: "b",
// },
// {
// Key: "c",
// Value: "d",
// },
// },
// },
// },
// }

// assert.Equal(t, len(events), reflect.TypeOf(telemetry.DatabricksCliLog{}).NumField(),
// "Number of events should match the number of fields in DatabricksCliLog. Please add a new event to this test.")

// ctx, w := acc.WorkspaceTest(t)
// ctx = telemetry.WithDefaultLogger(ctx)

// // Extend the maximum wait time for the telemetry flush just for this test.
// oldV := telemetry.MaxAdditionalWaitTime
// telemetry.MaxAdditionalWaitTime = 1 * time.Hour
// t.Cleanup(func() {
// telemetry.MaxAdditionalWaitTime = oldV
// })

// for _, event := range events {
// telemetry.Log(ctx, event)
// }

// apiClient, err := client.New(w.W.Config)
// require.NoError(t, err)

// // Flush the events.
// wrapper := &apiClientWrapper{
// apiClient: apiClient,
// }
// telemetry.Flush(ctx, wrapper)

// // Assert that the events were logged.
// assert.Equal(t, telemetry.ResponseBody{
// NumProtoSuccess: int64(len(events)),
// Errors: []telemetry.LogError{},
// }, *wrapper.response)
// }

func TestTelemetryCliPassesAuthCredentials(t *testing.T) {
server := testutil.StartServer(t)
count := int64(0)

server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
// auth token should be set. Since the telemetry-worker command does not
// load the profile, the token must have been passed explicitly.
require.Equal(t, "Bearer mytoken", r.Header.Get("Authorization"))

// reqBody should contain all the logs.
reqBody := telemetry.RequestBody{}
err := json.NewDecoder(r.Body).Decode(&reqBody)
require.NoError(t, err)

logs := []protos.FrontendLog{}
for _, log := range reqBody.ProtoLogs {
var l protos.FrontendLog
err := json.Unmarshal([]byte(log), &l)
require.NoError(t, err)
logs = append(logs, l)
}

assert.Len(t, logs, 3)
assert.Equal(t, protos.DummyCliEnum("VALUE1"), logs[0].Entry.DatabricksCliLog.CliTestEvent.Name)
assert.Equal(t, protos.DummyCliEnum("VALUE2"), logs[1].Entry.DatabricksCliLog.CliTestEvent.Name)
assert.Equal(t, protos.DummyCliEnum("VALUE3"), logs[2].Entry.DatabricksCliLog.CliTestEvent.Name)

count++
// TODO: Add (or keep) the API testing tests that ensure that the telemetry API is working correctly.
return telemetry.ResponseBody{
NumProtoSuccess: count,
}, nil
})

// Setup databrickscfg file.
tmpDir := t.TempDir()
testutil.WriteFile(t, filepath.Join(tmpDir, ".databrickscfg"), fmt.Sprintf(`
[myprofile]
host = %s
token = mytoken`, server.URL))
t.Setenv("DATABRICKS_CONFIG_FILE", filepath.Join(tmpDir, ".databrickscfg"))
t.Setenv("DATABRICKS_CONFIG_PROFILE", "myprofile")

execPath := testutil.BuildCLI(t)
cmd := exec.Command(execPath, "send-test-event")
err := cmd.Run()
require.NoError(t, err)

assert.Eventually(t, func() bool {
return count == 3
}, 10*time.Second, 1*time.Second)
}

func TestTelemetry(t *testing.T) {
server := testutil.StartServer(t)
count := int64(0)

server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
// auth token should be set.
require.Equal(t, "Bearer foobar", r.Header.Get("Authorization"))
// reqBody should contain all the logs.
reqBody := telemetry.RequestBody{}
err := json.NewDecoder(r.Body).Decode(&reqBody)
require.NoError(t, err)
logs := []protos.FrontendLog{}
for _, log := range reqBody.ProtoLogs {
var l protos.FrontendLog
err := json.Unmarshal([]byte(log), &l)
require.NoError(t, err)
logs = append(logs, l)
}
assert.Len(t, logs, 3)
assert.Equal(t, protos.DummyCliEnum("VALUE1"), logs[0].Entry.DatabricksCliLog.CliTestEvent.Name)
assert.Equal(t, protos.DummyCliEnum("VALUE2"), logs[1].Entry.DatabricksCliLog.CliTestEvent.Name)
assert.Equal(t, protos.DummyCliEnum("VALUE3"), logs[2].Entry.DatabricksCliLog.CliTestEvent.Name)
count++
// TODO: Add (or keep) the API testing tests that ensure that the telemetry API is working correctly.
return telemetry.ResponseBody{
NumProtoSuccess: count,
}, nil
})
// TODO: Also see how much extra time does spawning a process take?
t.Setenv("DATABRICKS_HOST", server.URL)
t.Setenv("DATABRICKS_TOKEN", "foobar")
execPath := testutil.BuildCLI(t)
cmd := exec.Command(execPath, "send-test-event")
err := cmd.Run()
require.NoError(t, err)
assert.Eventually(t, func() bool {
return count == 3
}, 10*time.Second, 1*time.Second)
}
func TestTelemetryDoesNotBlock(t *testing.T) {
server := testutil.StartServer(t)
count := int64(0)
fire := make(chan struct{})
server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
// Block until the channel is closed.
<-fire
require.Equal(t, "Bearer foobar", r.Header.Get("Authorization"))
count++
return telemetry.ResponseBody{
NumProtoSuccess: 3,
}, nil
})
t.Setenv("DATABRICKS_HOST", server.URL)
t.Setenv("DATABRICKS_TOKEN", "foobar")
execPath := testutil.BuildCLI(t)
cmd := exec.Command(execPath, "send-test-event")
err := cmd.Run()
require.NoError(t, err)
// No API calls should have been made yet. Even though the main process has
// finished, the telemetry worker should be running in the background.
assert.Equal(t, int64(0), count)
// Close the channel to allow the API call to go through.
close(fire)
assert.Eventually(t, func() bool {
return count == 1
}, 10*time.Second, 1*time.Second)
} }

Binary file not shown.

View File

@@ -0,0 +1,55 @@
package testutil
import (
"os/exec"
"path/filepath"
"runtime"
"time"
"github.com/databricks/cli/libs/folders"
"github.com/stretchr/testify/require"
)
func BuildCLI(t TestingT, flags ...string) string {
repoRoot, err := folders.FindDirWithLeaf(".", ".git")
require.NoError(t, err)
// Stable path for the CLI binary. This ensures fast builds and cache reuse.
execPath := filepath.Join(repoRoot, "internal", "testutil", "build", "databricks")
if runtime.GOOS == "windows" {
execPath += ".exe"
}
start := time.Now()
args := []string{
"go", "build",
"-mod", "vendor",
"-o", execPath,
}
if len(flags) > 0 {
args = append(args, flags...)
}
if runtime.GOOS == "windows" {
// Get this error on my local Windows:
// error obtaining VCS status: exit status 128
// Use -buildvcs=false to disable VCS stamping.
args = append(args, "-buildvcs=false")
}
cmd := exec.Command(args[0], args[1:]...)
cmd.Dir = repoRoot
out, err := cmd.CombinedOutput()
elapsed := time.Since(start)
t.Logf("%s took %s", args, elapsed)
require.NoError(t, err, "go build failed: %s: %s\n%s", args, err, out)
if len(out) > 0 {
t.Logf("go build output: %s: %s", args, out)
}
// Quick check + warm up cache:
cmd = exec.Command(execPath, "--version")
out, err = cmd.CombinedOutput()
require.NoError(t, err, "%s --version failed: %s\n%s", execPath, err, out)
return execPath
}

View File

@@ -0,0 +1,63 @@
package testutil
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
type Server struct {
*httptest.Server
Mux *http.ServeMux
}
type HandlerFunc func(r *http.Request) (any, error)
func NewServer() *Server {
mux := http.NewServeMux()
server := httptest.NewServer(mux)
return &Server{
Server: server,
Mux: mux,
}
}
func (s *Server) Handle(pattern string, handler HandlerFunc) {
s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
resp, err := handler(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
var respBytes []byte
respString, ok := resp.(string)
if ok {
respBytes = []byte(respString)
} else {
respBytes, err = json.MarshalIndent(resp, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
if _, err := w.Write(respBytes); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
})
}
func StartServer(t *testing.T) *Server {
server := NewServer()
t.Cleanup(func() {
server.Close()
})
return server
}
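A short usage sketch for this helper (hypothetical test; the endpoint and payload are illustrative): handlers return either a struct, which is marshalled to JSON for the response, or a raw string, which is written as-is.

package testutil_test

import (
	"io"
	"net/http"
	"testing"

	"github.com/databricks/cli/internal/testutil"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestServerSketch(t *testing.T) {
	server := testutil.StartServer(t)

	// Structs returned by a handler are marshalled to JSON for the response.
	server.Handle("GET /api/2.0/preview/scim/v2/Me", func(r *http.Request) (any, error) {
		return map[string]string{"userName": "tester@example.com"}, nil
	})

	resp, err := http.Get(server.URL + "/api/2.0/preview/scim/v2/Me")
	require.NoError(t, err)
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	require.NoError(t, err)
	assert.Contains(t, string(body), "tester@example.com")
}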

libs/auth/env.go Normal file (+26)
View File

@@ -0,0 +1,26 @@
package auth
import "github.com/databricks/databricks-sdk-go/config"
// Env generates the authentication environment variables we need to set for
// downstream applications from the CLI to work correctly.
func Env(cfg *config.Config) map[string]string {
out := make(map[string]string)
for _, attr := range config.ConfigAttributes {
// Ignore profile so that downstream tools don't try and reload
// the profile. We know the current configuration is already valid since
// otherwise the CLI would have thrown an error when loading it.
if attr.Name == "profile" {
continue
}
if len(attr.EnvVars) == 0 {
continue
}
if attr.IsZero(cfg) {
continue
}
out[attr.EnvVars[0]] = attr.GetString(cfg)
}
return out
}
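A hedged usage sketch (the child command and configuration values are illustrative): the returned map is typically appended to a child process environment so downstream tools pick up the same credentials without reloading a profile.

package main

import (
	"os"
	"os/exec"

	"github.com/databricks/cli/libs/auth"
	"github.com/databricks/databricks-sdk-go/config"
)

func main() {
	// Illustrative configuration; in the CLI this comes from the resolved
	// workspace client config.
	cfg := &config.Config{
		Host:  "https://example.cloud.databricks.com",
		Token: "dapi-example",
	}

	// Export DATABRICKS_* variables to a downstream process.
	child := exec.Command("terraform", "version")
	child.Env = os.Environ()
	for k, v := range auth.Env(cfg) {
		child.Env = append(child.Env, k+"="+v)
	}
	child.Stdout = os.Stdout
	child.Stderr = os.Stderr
	_ = child.Run()
}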

View File

@@ -11,6 +11,8 @@ import (
// Dereference [os.Stat] to allow mocking in tests.
var statFunc = os.Stat

+const EnvVarName = "DATABRICKS_RUNTIME_VERSION"
+
// detect returns true if the current process is running on a Databricks Runtime.
// Its return value is meant to be cached in the context.
func detect(ctx context.Context) bool {
@@ -21,7 +23,7 @@ func detect(ctx context.Context) bool {
}

// Databricks Runtime always has the DATABRICKS_RUNTIME_VERSION environment variable set.
-if value, ok := env.Lookup(ctx, "DATABRICKS_RUNTIME_VERSION"); !ok || value == "" {
+if value, ok := env.Lookup(ctx, EnvVarName); !ok || value == "" {
return false
}

View File

@@ -1,5 +1,10 @@
package telemetry

+import (
+"github.com/databricks/cli/libs/telemetry/protos"
+"github.com/databricks/databricks-sdk-go/config"
+)
+
// RequestBody is the request body type bindings for the /telemetry-ext API endpoint.
type RequestBody struct {
UploadTime int64 `json:"uploadTime"`
@@ -17,3 +22,8 @@ type LogError struct {
Message string `json:"message"`
ErrorType string `json:"ErrorType"`
}
+
+type WorkerInput struct {
+AuthConfig *config.Config `json:"authConfig"`
+Logs []protos.FrontendLog `json:"logs"`
+}

View File

@@ -11,52 +11,15 @@ type telemetryLogger int

// Key to store the telemetry logger in the context
var telemetryLoggerKey telemetryLogger

-func WithDefaultLogger(ctx context.Context) context.Context {
-v := ctx.Value(telemetryLoggerKey)
-
-// If no logger is set in the context, set the default logger.
-if v == nil {
-nctx := context.WithValue(ctx, telemetryLoggerKey, &defaultLogger{})
-return nctx
-}
-
-switch v.(type) {
-case *defaultLogger:
-panic(fmt.Errorf("default telemetry logger already set in the context: %T", v))
-case *mockLogger:
-// Do nothing. Unit and integration tests set the mock logger in the context
-// to avoid making actual API calls. Thus WithDefaultLogger should silently
-// ignore the mock logger.
-default:
-panic(fmt.Errorf("unexpected telemetry logger type: %T", v))
-}
-
-return ctx
+func WithNewLogger(ctx context.Context) context.Context {
+return context.WithValue(ctx, telemetryLoggerKey, &logger{})
}

-// WithMockLogger sets a mock telemetry logger in the context. It overrides the
-// default logger if it is already set in the context.
-func WithMockLogger(ctx context.Context) context.Context {
-v := ctx.Value(telemetryLoggerKey)
-if v != nil {
-panic(fmt.Errorf("telemetry logger already set in the context: %T", v))
-}
-
-return context.WithValue(ctx, telemetryLoggerKey, &mockLogger{})
-}
-
-func fromContext(ctx context.Context) Logger {
+func fromContext(ctx context.Context) *logger {
v := ctx.Value(telemetryLoggerKey)
if v == nil {
panic(fmt.Errorf("telemetry logger not found in the context"))
}
-
-switch vv := v.(type) {
-case *defaultLogger:
-return vv
-case *mockLogger:
-return vv
-default:
-panic(fmt.Errorf("unexpected telemetry logger type: %T", v))
-}
+return v.(*logger)
}

View File

@@ -1,77 +0,0 @@
package telemetry
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWithDefaultLogger(t *testing.T) {
ctx := context.Background()
// No default logger set
ctx1 := WithDefaultLogger(ctx)
assert.Equal(t, &defaultLogger{}, ctx1.Value(telemetryLoggerKey))
// Default logger already set
assert.PanicsWithError(t, "default telemetry logger already set in the context: *telemetry.defaultLogger", func() {
WithDefaultLogger(ctx1)
})
// Mock logger already set
ctx2 := WithMockLogger(ctx)
assert.NotPanics(t, func() {
WithDefaultLogger(ctx2)
})
// Unexpected logger type
type foobar struct{}
ctx3 := context.WithValue(ctx, telemetryLoggerKey, &foobar{})
assert.PanicsWithError(t, "unexpected telemetry logger type: *telemetry.foobar", func() {
WithDefaultLogger(ctx3)
})
}
func TestWithMockLogger(t *testing.T) {
ctx := context.Background()
// No logger set
ctx1 := WithMockLogger(ctx)
assert.Equal(t, &mockLogger{}, ctx1.Value(telemetryLoggerKey))
// Logger already set
assert.PanicsWithError(t, "telemetry logger already set in the context: *telemetry.mockLogger", func() {
WithMockLogger(ctx1)
})
// Default logger already set
ctx2 := WithDefaultLogger(ctx)
assert.PanicsWithError(t, "telemetry logger already set in the context: *telemetry.defaultLogger", func() {
WithMockLogger(ctx2)
})
}
func TestFromContext(t *testing.T) {
ctx := context.Background()
// No logger set
assert.PanicsWithError(t, "telemetry logger not found in the context", func() {
fromContext(ctx)
})
// Default logger set
ctx1 := WithDefaultLogger(ctx)
assert.Equal(t, &defaultLogger{}, fromContext(ctx1))
// Mock logger set
ctx2 := WithMockLogger(ctx)
assert.Equal(t, &mockLogger{}, fromContext(ctx2))
// Unexpected logger type
type foobar struct{}
ctx3 := context.WithValue(ctx, telemetryLoggerKey, &foobar{})
assert.PanicsWithError(t, "unexpected telemetry logger type: *telemetry.foobar", func() {
fromContext(ctx3)
})
}

View File

@@ -2,138 +2,52 @@ package telemetry

import (
"context"
-"encoding/json"
-"net/http"
-"time"

-"github.com/databricks/cli/libs/log"
+"github.com/databricks/cli/libs/telemetry/protos"
"github.com/google/uuid"
)

-// Interface abstraction created to mock out the Databricks client for testing.
-type DatabricksApiClient interface {
-Do(ctx context.Context, method, path string,
-headers map[string]string, request, response any,
-visitors ...func(*http.Request) error) error
-}
+const SkipEnvVar = "DATABRICKS_CLI_SKIP_TELEMETRY"

-type Logger interface {
-// Record a telemetry event, to be flushed later.
-Log(event DatabricksCliLog)
-
-// Flush all the telemetry events that have been logged so far. We expect
-// this to be called once per CLI command for the default logger.
-Flush(ctx context.Context, apiClient DatabricksApiClient)
-
-// This function is meant to be only to be used in tests to introspect
-// the telemetry logs that have been logged so far.
-Introspect() []DatabricksCliLog
-}
+// DATABRICKS_CLI_BLOCK_ON_TELEMETRY_UPLOAD is an environment variable that can be set
+// to make the CLI process block until the telemetry logs are uploaded.
+// Only used for testing.
+const BlockOnUploadEnvVar = "DATABRICKS_CLI_BLOCK_ON_TELEMETRY_UPLOAD"

-type defaultLogger struct {
-logs []FrontendLog
+func Log(ctx context.Context, event protos.DatabricksCliLog) {
+fromContext(ctx).log(event)
}

-func (l *defaultLogger) Log(event DatabricksCliLog) {
+func GetLogs(ctx context.Context) []protos.FrontendLog {
+return fromContext(ctx).getLogs()
+}
+
+func SetExecutionContext(ctx context.Context, ec protos.ExecutionContext) {
+fromContext(ctx).setExecutionContext(ec)
+}
+
+type logger struct {
+logs []protos.FrontendLog
+}
+
+func (l *logger) log(event protos.DatabricksCliLog) {
if l.logs == nil {
-l.logs = make([]FrontendLog, 0)
+l.logs = make([]protos.FrontendLog, 0)
}
-l.logs = append(l.logs, FrontendLog{
+l.logs = append(l.logs, protos.FrontendLog{
+// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
+// Thus it's important to generate a unique ID for each log event.
FrontendLogEventID: uuid.New().String(),
-Entry: FrontendLogEntry{
+Entry: protos.FrontendLogEntry{
DatabricksCliLog: event,
},
})
}

-// Maximum additional time to wait for the telemetry event to flush. We expect the flush
-// method to be called when the CLI command is about to exit, so this caps the maximum
-// additional time the user will experience because of us logging CLI telemetry.
-var MaxAdditionalWaitTime = 3 * time.Second
-
-// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
-// right about as the CLI command is about to exit. The API endpoint can handle
-// payloads with ~1000 events easily. Thus we log all the events at once instead of
-// batching the logs across multiple API calls.
-func (l *defaultLogger) Flush(ctx context.Context, apiClient DatabricksApiClient) {
-// Set a maximum time to wait for the telemetry event to flush.
-ctx, cancel := context.WithTimeout(ctx, MaxAdditionalWaitTime)
-defer cancel()
-
-if len(l.logs) == 0 {
-log.Debugf(ctx, "No telemetry events to flush")
-return
-}
-
-var protoLogs []string
-for _, event := range l.logs {
-s, err := json.Marshal(event)
-if err != nil {
-log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err)
-continue
-}
-protoLogs = append(protoLogs, string(s))
-}
-
-resp := &ResponseBody{}
-for {
-select {
-case <-ctx.Done():
-log.Debugf(ctx, "Timed out before flushing telemetry events")
-return
-default:
-// Proceed
-}
-
-// Log the CLI telemetry events.
-err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
-UploadTime: time.Now().Unix(),
-ProtoLogs: protoLogs,
-
-// A bug in the telemetry API requires us to send an empty items array.
-// Otherwise we get an opaque 500 internal server error.
-Items: []string{},
-}, resp)
-if err != nil {
-// The SDK automatically performs retries for 429s and 503s. Thus if we
-// see an error here, do not retry logging the telemetry.
-log.Debugf(ctx, "Error making the API request to /telemetry-ext: %v", err)
-return
-}
-
-// If not all the logs were successfully sent, we'll retry and log everything
-// again.
-//
-// Note: This will result in server side duplications but that's fine since
-// we can always deduplicate in the data pipeline itself.
-if len(l.logs) > int(resp.NumProtoSuccess) {
-log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
-continue
-}
-
-// All logs were successfully sent. We can exit the function.
-log.Debugf(ctx, "Successfully flushed telemetry events")
-return
-}
+func (l *logger) getLogs() []protos.FrontendLog {
+return l.logs
}

-func (l *defaultLogger) Introspect() []DatabricksCliLog {
-panic("not implemented")
-}
-
-func Log(ctx context.Context, event DatabricksCliLog) {
-l := fromContext(ctx)
-l.Log(event)
-}
-
-func Flush(ctx context.Context, apiClient DatabricksApiClient) {
-l := fromContext(ctx)
-l.Flush(ctx, apiClient)
-}
-
-func Introspect(ctx context.Context) []DatabricksCliLog {
-l := fromContext(ctx)
-return l.Introspect()
+func (l *logger) setExecutionContext(ec protos.ExecutionContext) {
+for i := range l.logs {
+l.logs[i].Entry.DatabricksCliLog.ExecutionContext = &ec
+}
}

View File

@@ -1,113 +0,0 @@
package telemetry
import (
"context"
"math/rand"
"net/http"
"testing"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
type mockDatabricksClient struct {
numCalls int
t *testing.T
}
func (m *mockDatabricksClient) Do(ctx context.Context, method, path string, headers map[string]string, request, response any, visitors ...func(*http.Request) error) error {
// Block until the fire channel is fired.
m.numCalls++
assertRequestPayload := func(reqb RequestBody) {
expectedProtoLogs := []string{
"{\"frontend_log_event_id\":\"0194fdc2-fa2f-4cc0-81d3-ff12045b73c8\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"6e4ff95f-f662-45ee-a82a-bdf44a2d0b75\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}",
"{\"frontend_log_event_id\":\"fb180daf-48a7-4ee0-b10d-394651850fd4\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}",
"{\"frontend_log_event_id\":\"a178892e-e285-4ce1-9114-55780875d64e\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE3\"}}}}",
}
// Assert payload matches the expected payload.
assert.Equal(m.t, expectedProtoLogs, reqb.ProtoLogs)
}
switch m.numCalls {
case 1:
// The call is successful but not all events are successfully logged.
assertRequestPayload(request.(RequestBody))
*(response.(*ResponseBody)) = ResponseBody{
NumProtoSuccess: 3,
}
case 2:
// The call is successful and all events are successfully logged.
assertRequestPayload(request.(RequestBody))
*(response.(*ResponseBody)) = ResponseBody{
NumProtoSuccess: 4,
}
default:
panic("unexpected number of calls")
}
return nil
}
func TestTelemetryLoggerFlushesEvents(t *testing.T) {
mockClient := &mockDatabricksClient{
t: t,
}
// Set the random number generator to a fixed seed to ensure that the UUIDs are deterministic.
uuid.SetRand(rand.New(rand.NewSource(0)))
t.Cleanup(func() {
uuid.SetRand(nil)
})
ctx := WithDefaultLogger(context.Background())
for _, v := range []protos.DummyCliEnum{protos.DummyCliEnumValue1, protos.DummyCliEnumValue2, protos.DummyCliEnumValue2, protos.DummyCliEnumValue3} {
Log(ctx, DatabricksCliLog{
CliTestEvent: &protos.CliTestEvent{Name: v},
})
}
// Flush the events.
Flush(ctx, mockClient)
// Assert that the .Do method is called twice, because all logs were not
// successfully logged in the first call.
assert.Equal(t, 2, mockClient.numCalls)
}
func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
// Set the maximum additional wait time to 0 to ensure that the Flush method times out immediately.
oldV := MaxAdditionalWaitTime
MaxAdditionalWaitTime = 0
t.Cleanup(func() {
MaxAdditionalWaitTime = oldV
})
mockClient := &mockDatabricksClient{
t: t,
}
// Set the random number generator to a fixed seed to ensure that the UUIDs are deterministic.
uuid.SetRand(rand.New(rand.NewSource(0)))
t.Cleanup(func() {
uuid.SetRand(nil)
})
ctx := WithDefaultLogger(context.Background())
for _, v := range []protos.DummyCliEnum{protos.DummyCliEnumValue1, protos.DummyCliEnumValue2, protos.DummyCliEnumValue2, protos.DummyCliEnumValue3} {
Log(ctx, DatabricksCliLog{
CliTestEvent: &protos.CliTestEvent{Name: v},
})
}
// Flush the events.
Flush(ctx, mockClient)
// Assert that the .Do method is never called since the timeout is set to 0.
assert.Equal(t, 0, mockClient.numCalls)
}

View File

@@ -1,22 +0,0 @@
package telemetry
import "context"
type mockLogger struct {
events []DatabricksCliLog
}
func (l *mockLogger) Log(event DatabricksCliLog) {
if l.events == nil {
l.events = make([]DatabricksCliLog, 0)
}
l.events = append(l.events, event)
}
func (l *mockLogger) Flush(ctx context.Context, apiClient DatabricksApiClient) {
// Do nothing
}
func (l *mockLogger) Introspect() []DatabricksCliLog {
return l.events
}

View File

@@ -1,3 +1,2 @@
The types in this package are equivalent to the lumberjack protos defined in Universe.
-You can find all lumberjack protos for the Databricks CLI in the `proto/logs/frontend/databricks_cli`
-directory.
+You can find all lumberjack protos for the Databricks CLI in the `proto/logs/frontend/databricks_cli` directory.

View File

@@ -0,0 +1,77 @@
package protos
type BundleDeployEvent struct {
// UUID associated with the bundle itself. Set in the `bundle.uuid` field in the bundle configuration.
BundleUuid string `json:"bundle_uuid,omitempty"`
ResourceCount int64 `json:"resource_count,omitempty"`
ResourceJobCount int64 `json:"resource_job_count,omitempty"`
ResourcePipelineCount int64 `json:"resource_pipeline_count,omitempty"`
ResourceModelCount int64 `json:"resource_model_count,omitempty"`
ResourceExperimentCount int64 `json:"resource_experiment_count,omitempty"`
ResourceModelServingEndpointCount int64 `json:"resource_model_serving_endpoint_count,omitempty"`
ResourceRegisteredModelCount int64 `json:"resource_registered_model_count,omitempty"`
ResourceQualityMonitorCount int64 `json:"resource_quality_monitor_count,omitempty"`
ResourceSchemaCount int64 `json:"resource_schema_count,omitempty"`
ResourceVolumeCount int64 `json:"resource_volume_count,omitempty"`
ResourceClusterCount int64 `json:"resource_cluster_count,omitempty"`
ResourceDashboardCount int64 `json:"resource_dashboard_count,omitempty"`
ResourceAppCount int64 `json:"resource_app_count,omitempty"`
// IDs of resources managed by the bundle. Some resources like volumes or schemas
// do not expose a numerical or UUID identifier and are tracked by name. Those
// resources are not tracked here since the names are PII.
ResourceJobIds []string `json:"resource_job_ids,omitempty"`
ResourcePipelineIds []string `json:"resource_pipeline_ids,omitempty"`
ResourceClusterIds []string `json:"resource_cluster_ids,omitempty"`
ResourceDashboardIds []string `json:"resource_dashboard_ids,omitempty"`
Experimental *BundleDeployExperimental `json:"experimental,omitempty"`
}
// These metrics are experimental and are often added in an ad hoc manner. There
// are no guarantees for these metrics and they may be removed in the future without
// any notice.
type BundleDeployExperimental struct {
// Number of YAML (or JSON) configuration files in the bundle.
ConfigurationFileCount int64 `json:"configuration_file_count,omitempty"`
// Size in bytes of the Terraform state file
TerraformStateSizeBytes int64 `json:"terraform_state_size_bytes,omitempty"`
// Number of variables in the bundle
VariableCount int64 `json:"variable_count,omitempty"`
ComplexVariableCount int64 `json:"complex_variable_count,omitempty"`
LookupVariableCount int64 `json:"lookup_variable_count,omitempty"`
// Number of targets in the bundle
TargetCount int64 `json:"target_count,omitempty"`
// Whether a field is set or not. If a configuration field is not present in this
// map then it is not tracked by this field.
// Keys are the full path of the field in the configuration tree.
// Examples: "bundle.terraform.exec_path", "bundle.git.branch" etc.
SetFields []BoolMapEntry `json:"set_fields,omitempty"`
// Values for boolean configuration fields like `experimental.python_wheel_wrapper`
// We don't need to define protos to track boolean values and can simply write those
// values to this map to track them.
BoolValues []BoolMapEntry `json:"bool_values,omitempty"`
BundleMode BundleMode `json:"bundle_mode,omitempty"`
WorkspaceArtifactPathType BundleDeployArtifactPathType `json:"workspace_artifact_path_type,omitempty"`
// Execution time per mutator for a selected subset of mutators.
BundleMutatorExecutionTimeMs []IntMapEntry `json:"bundle_mutator_execution_time_ms,omitempty"`
}
type BoolMapEntry struct {
Key string `json:"key,omitempty"`
Value bool `json:"value,omitempty"`
}
type IntMapEntry struct {
Key string `json:"key,omitempty"`
Value int64 `json:"value,omitempty"`
}

View File

@@ -1,12 +1,10 @@
package protos

-// Corresponds to the `DatabricksCliBundleInitEvent` proto message in `databricks_cli_log.proto`
-// as of 20 Dec 2024.
type BundleInitEvent struct {
// UUID associated with the DAB itself. This is serialized into the DAB
// when a user runs `databricks bundle init` and all subsequent deployments of
// that DAB can then be associated with this init event.
-Uuid string `json:"bundle_uuid,omitempty"`
+BundleUuid string `json:"bundle_uuid,omitempty"`

// Name of the template initialized when the user ran `databricks bundle init`
// This is only populated when the template is a first party template like

View File

@@ -0,0 +1,35 @@
package protos
type ExecutionContext struct {
// UUID generated by the CLI for every CLI command run. This is also set in the HTTP user
// agent under the key "cmd-exec-id" and can be used to correlate frontend_log table
// with the http_access_log table.
CmdExecId string `json:"cmd_exec_id,omitempty"`
// Version of the Databricks CLI used.
Version string `json:"version,omitempty"`
// Command that was run by the user. Eg: bundle_deploy, fs_cp etc.
Command string `json:"command,omitempty"`
// Lowercase string name for the operating system. Same value
// as the one set in `runtime.GOOS` in Golang.
OperatingSystem string `json:"operating_system,omitempty"`
// Version of DBR from which CLI is being run.
// Only set when the CLI is being run from a Databricks cluster.
DbrVersion string `json:"dbr_version,omitempty"`
// If true, the CLI is being run from a Databricks notebook / cluster web terminal.
FromWebTerminal bool `json:"from_web_terminal,omitempty"`
// Time taken for the CLI command to execute.
ExecutionTimeMs int64 `json:"execution_time_ms,omitempty"`
// Exit code of the CLI command.
ExitCode int64 `json:"exit_code,omitempty"`
}
type CliTestEvent struct {
Name DummyCliEnum `json:"name,omitempty"`
}

View File

@@ -0,0 +1,26 @@
package protos
type DummyCliEnum string
const (
DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
DummyCliEnumValue1 DummyCliEnum = "VALUE1"
DummyCliEnumValue2 DummyCliEnum = "VALUE2"
DummyCliEnumValue3 DummyCliEnum = "VALUE3"
)
type BundleMode string
const (
BundleModeUnspecified BundleMode = "TYPE_UNSPECIFIED"
BundleModeDevelopment BundleMode = "DEVELOPMENT"
BundleModeProduction BundleMode = "PRODUCTION"
)
type BundleDeployArtifactPathType string
const (
BundleDeployArtifactPathTypeUnspecified BundleDeployArtifactPathType = "TYPE_UNSPECIFIED"
BundleDeployArtifactPathTypeWorkspace BundleDeployArtifactPathType = "WORKSPACE_FILE_SYSTEM"
BundleDeployArtifactPathTypeVolume BundleDeployArtifactPathType = "UC_VOLUME"
)

View File

@@ -1,10 +1,7 @@
-package telemetry
-
-import "github.com/databricks/cli/libs/telemetry/protos"
+package protos

// This corresponds to the FrontendLog lumberjack proto in universe.
-// FrontendLog is the top-level struct for any client-side logs at Databricks
-// regardless of whether they are generated from the CLI or the web UI.
+// FrontendLog is the top-level struct for any client-side logs at Databricks.
type FrontendLog struct {
// A unique identifier for the log event generated from the CLI.
FrontendLogEventID string `json:"frontend_log_event_id,omitempty"`
@@ -17,6 +14,9 @@ type FrontendLogEntry struct {
}

type DatabricksCliLog struct {
-CliTestEvent *protos.CliTestEvent `json:"cli_test_event,omitempty"`
-BundleInitEvent *protos.BundleInitEvent `json:"bundle_init_event,omitempty"`
+ExecutionContext *ExecutionContext `json:"execution_context,omitempty"`
+CliTestEvent *CliTestEvent `json:"cli_test_event,omitempty"`
+BundleInitEvent *BundleInitEvent `json:"bundle_init_event,omitempty"`
+BundleDeployEvent *BundleDeployEvent `json:"bundle_deploy_event,omitempty"`
}

View File

@@ -1,16 +0,0 @@
package protos
// dummy event for testing the telemetry pipeline. Corresponds to `DatabricksCliTestEvent`
// proto in `databricks_cli_log.proto` as of 20 Dec 2024.
type CliTestEvent struct {
Name DummyCliEnum `json:"name,omitempty"`
}
type DummyCliEnum string
const (
DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
DummyCliEnumValue1 DummyCliEnum = "VALUE1"
DummyCliEnumValue2 DummyCliEnum = "VALUE2"
DummyCliEnumValue3 DummyCliEnum = "VALUE3"
)

test.txt Normal file (+1)
View File

@@ -0,0 +1 @@
databricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricks