diff --git a/acceptance/acceptance_test.go b/acceptance/acceptance_test.go
index 5f1181313..f0cb15c94 100644
--- a/acceptance/acceptance_test.go
+++ b/acceptance/acceptance_test.go
@@ -71,7 +71,7 @@ func TestAccept(t *testing.T) {
 	cloudEnv := os.Getenv("CLOUD_ENV")
 
 	if cloudEnv == "" {
-		server := StartServer(t)
+		server := testutil.StartServer(t)
 		AddHandlers(server)
 		// Redirect API access to local server:
 		t.Setenv("DATABRICKS_HOST", server.URL)
diff --git a/acceptance/server_test.go b/acceptance/server_test.go
index 0d10fbea1..4f46d0689 100644
--- a/acceptance/server_test.go
+++ b/acceptance/server_test.go
@@ -1,73 +1,16 @@
 package acceptance_test
 
 import (
-	"encoding/json"
 	"net/http"
-	"net/http/httptest"
-	"testing"
 
+	"github.com/databricks/cli/internal/testutil"
 	"github.com/databricks/databricks-sdk-go/service/catalog"
 	"github.com/databricks/databricks-sdk-go/service/compute"
 	"github.com/databricks/databricks-sdk-go/service/iam"
 	"github.com/databricks/databricks-sdk-go/service/workspace"
 )
 
-type TestServer struct {
-	*httptest.Server
-	Mux *http.ServeMux
-}
-
-type HandlerFunc func(r *http.Request) (any, error)
-
-func NewTestServer() *TestServer {
-	mux := http.NewServeMux()
-	server := httptest.NewServer(mux)
-
-	return &TestServer{
-		Server: server,
-		Mux:    mux,
-	}
-}
-
-func (s *TestServer) Handle(pattern string, handler HandlerFunc) {
-	s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
-		resp, err := handler(r)
-		if err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-
-		w.Header().Set("Content-Type", "application/json")
-
-		var respBytes []byte
-
-		respString, ok := resp.(string)
-		if ok {
-			respBytes = []byte(respString)
-		} else {
-			respBytes, err = json.MarshalIndent(resp, "", " ")
-			if err != nil {
-				http.Error(w, err.Error(), http.StatusInternalServerError)
-				return
-			}
-		}
-
-		if _, err := w.Write(respBytes); err != nil {
-			http.Error(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-	})
-}
-
-func StartServer(t *testing.T) *TestServer {
-	server := NewTestServer()
-	t.Cleanup(func() {
-		server.Close()
-	})
-	return server
-}
-
-func AddHandlers(server *TestServer) {
+func AddHandlers(server *testutil.Server) {
 	server.Handle("/api/2.0/policies/clusters/list", func(r *http.Request) (any, error) {
 		return compute.ListPoliciesResponse{
 			Policies: []compute.Policy{
diff --git a/bundle/bundle.go b/bundle/bundle.go
index 3bf4ffb62..66cba4c7b 100644
--- a/bundle/bundle.go
+++ b/bundle/bundle.go
@@ -230,6 +230,10 @@ func (b *Bundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) {
 	return append(b.Config.Sync.Include, filepath.ToSlash(filepath.Join(internalDirRel, "*.*"))), nil
 }
 
+// TODO: Add end-to-end tests verifying that the environment variables returned
+// by the AuthEnv function are correct and that the config set in the context is
+// fully resolved (instead of just being the input).
+
 // AuthEnv returns a map with environment variables and their values
 // derived from the workspace client configuration that was resolved
 // in the context of this bundle.
diff --git a/cmd/cmd.go b/cmd/cmd.go
index 5d835409f..a7ee977c3 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -13,6 +13,8 @@ import (
 	"github.com/databricks/cli/cmd/labs"
 	"github.com/databricks/cli/cmd/root"
 	"github.com/databricks/cli/cmd/sync"
+	sendtestevent "github.com/databricks/cli/cmd/telemetry/send_test_event"
+	"github.com/databricks/cli/cmd/telemetry/worker"
 	"github.com/databricks/cli/cmd/version"
 	"github.com/databricks/cli/cmd/workspace"
 	"github.com/spf13/cobra"
@@ -75,5 +77,9 @@ func New(ctx context.Context) *cobra.Command {
 	cli.AddCommand(sync.New())
 	cli.AddCommand(version.New())
 
+	// TODO: Move these under the telemetry subcommand?
+	cli.AddCommand(worker.New())
+	cli.AddCommand(sendtestevent.New())
+
 	return cli
 }
diff --git a/cmd/root/auth.go b/cmd/root/auth.go
index 49abfd414..43f9808c4 100644
--- a/cmd/root/auth.go
+++ b/cmd/root/auth.go
@@ -18,7 +18,10 @@ import (
 var (
 	workspaceClient int
 	accountClient   int
-	configUsed      int
+
+	// TODO: Does the config used have the resolved configuration? That is, has
+	// the profile been loaded?
+	configUsed int
 )
 
 type ErrNoWorkspaceProfiles struct {
@@ -334,6 +337,8 @@ func AccountClient(ctx context.Context) *databricks.AccountClient {
 func ConfigUsed(ctx context.Context) *config.Config {
 	cfg, ok := ctx.Value(&configUsed).(*config.Config)
 	if !ok {
+		// TODO: Remove this; it is a temporary workaround for testing only.
+		return &config.Config{}
 		panic("cannot get *config.Config. Please report it as a bug")
 	}
 	return cfg
diff --git a/cmd/root/is_web_terminal.go b/cmd/root/is_web_terminal.go
new file mode 100644
index 000000000..33b563d42
--- /dev/null
+++ b/cmd/root/is_web_terminal.go
@@ -0,0 +1,63 @@
+package root
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+
+	"github.com/databricks/cli/libs/dbr"
+)
+
+// TODO: Split this into a separate PR and add a test.
+func isWebTerminal(ctx context.Context) bool {
+	if !dbr.RunsOnRuntime(ctx) {
+		return false
+	}
+
+	cur := os.Getpid()
+
+	// Maximum number of ancestors to check when trying to detect whether the
+	// process is running in a web terminal (i.e. launched by ttyd).
+	maxHeight := 10
+
+	for range maxHeight {
+		// If the pid is 0 or 1, we are at the root of the process tree.
+		if cur == 0 || cur == 1 {
+			return false
+		}
+
+		// Read the name of the current process.
+		b, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", cur))
+		if err != nil {
+			return false
+		}
+
+		// If the name of any of the ancestor processes is ttyd, then the
+		// CLI has been run from the web terminal.
+		if strings.TrimSpace(string(b)) == "ttyd" {
+			return true
+		}
+
+		// The 4th field in /proc/<pid>/stat is the parent pid.
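+		// /proc/<pid>/stat is a single line of space-separated fields, e.g.:
+		//   1234 (bash) S 1000 ...
+		// Note that splitting on spaces assumes the process name (the
+		// parenthesized second field) contains no spaces; a name such as
+		// "(tmux: server)" would shift the field offsets.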
+		b, err = os.ReadFile(fmt.Sprintf("/proc/%d/stat", cur))
+		if err != nil {
+			return false
+		}
+
+		stat := strings.Split(string(b), " ")
+		if len(stat) < 4 {
+			return false
+		}
+
+		v, err := strconv.Atoi(stat[3])
+		if err != nil {
+			return false
+		}
+
+		cur = v
+	}
+
+	return false
+}
diff --git a/cmd/root/root.go b/cmd/root/root.go
index 3b37d0176..eabf0c993 100644
--- a/cmd/root/root.go
+++ b/cmd/root/root.go
@@ -2,16 +2,24 @@ package root
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"log/slog"
 	"os"
+	"os/exec"
+	"runtime"
 	"strings"
+	"time"
 
 	"github.com/databricks/cli/internal/build"
 	"github.com/databricks/cli/libs/cmdio"
 	"github.com/databricks/cli/libs/dbr"
+	"github.com/databricks/cli/libs/env"
 	"github.com/databricks/cli/libs/log"
+	"github.com/databricks/cli/libs/telemetry"
+	"github.com/databricks/cli/libs/telemetry/protos"
 	"github.com/spf13/cobra"
 )
 
@@ -73,9 +81,6 @@ func New(ctx context.Context) *cobra.Command {
 		// get the context back
 		ctx = cmd.Context()
 
-		// Detect if the CLI is running on DBR and store this on the context.
-		ctx = dbr.DetectRuntime(ctx)
-
 		// Configure our user agent with the command that's about to be executed.
 		ctx = withCommandInUserAgent(ctx, cmd)
 		ctx = withCommandExecIdInUserAgent(ctx)
@@ -94,32 +99,147 @@ func flagErrorFunc(c *cobra.Command, err error) error {
 	return fmt.Errorf("%w\n\n%s", err, c.UsageString())
 }
 
+// TODO CONTINUE: This setup should mostly work. There are a couple of open questions:
+// 4. I can print the output from the telemetry-worker command and add a waiting
+// mode to the root.Execute method here to see whether the expected output matches.
+
 // Execute adds all child commands to the root command and sets flags appropriately.
 // This is called by main.main(). It only needs to happen once to the rootCmd.
+
+// TODO: The test runner also relies on this function. Create a separate function to
+// avoid logging telemetry in our testcli runner.
 func Execute(ctx context.Context, cmd *cobra.Command) error {
-	// TODO: deferred panic recovery
+	ctx = telemetry.WithNewLogger(ctx)
+	ctx = dbr.DetectRuntime(ctx)
+	start := time.Now()
 
 	// Run the command
-	cmd, err := cmd.ExecuteContextC(ctx)
-	if err != nil && !errors.Is(err, ErrAlreadyPrinted) {
+	cmd, cmdErr := cmd.ExecuteContextC(ctx)
+	if cmdErr != nil && !errors.Is(cmdErr, ErrAlreadyPrinted) {
 		// If cmdio logger initialization succeeds, then this function logs with the
 		// initialized cmdio logger, otherwise with the default cmdio logger
-		cmdio.LogError(cmd.Context(), err)
+		cmdio.LogError(cmd.Context(), cmdErr)
 	}
 
 	// Log exit status and error
 	// We only log if logger initialization succeeded and is stored in command
 	// context
 	if logger, ok := log.FromContext(cmd.Context()); ok {
-		if err == nil {
+		if cmdErr == nil {
 			logger.Info("completed execution",
 				slog.String("exit_code", "0"))
 		} else {
 			logger.Error("failed execution",
 				slog.String("exit_code", "1"),
-				slog.String("error", err.Error()))
+				slog.String("error", cmdErr.Error()))
 		}
 	}
 
-	return err
+	end := time.Now()
+
+	exitCode := 0
+	if cmdErr != nil {
+		exitCode = 1
+	}
+
+	if env.Get(ctx, telemetry.SkipEnvVar) != "true" {
+		logTelemetry(ctx, commandString(cmd), start, end, exitCode)
+	}
+
+	return cmdErr
+}
+
+// TODO: Do not log for integration tests using the CLI.
+// TODO: Skip telemetry if the credentials are invalid.
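+//
+// logTelemetry attaches the execution context to the telemetry logs collected
+// while the command ran, and hands them off as JSON over stdin to a detached
+// telemetry-worker child process, so that uploading the logs does not add
+// latency to the command the user just ran.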
+func logTelemetry(ctx context.Context, cmdStr string, start, end time.Time, exitCode int) {
+	telemetry.SetExecutionContext(ctx, protos.ExecutionContext{
+		CmdExecId:       cmdExecId,
+		Version:         build.GetInfo().Version,
+		Command:         cmdStr,
+		OperatingSystem: runtime.GOOS,
+		DbrVersion:      env.Get(ctx, dbr.EnvVarName),
+		FromWebTerminal: isWebTerminal(ctx),
+		ExecutionTimeMs: end.Sub(start).Milliseconds(),
+		ExitCode:        int64(exitCode),
+	})
+
+	// TODO: Better check?
+	// Do not log telemetry for the telemetry-worker command to avoid fork bombs.
+	if cmdStr == "telemetry-worker" {
+		return
+	}
+
+	execPath, err := os.Executable()
+	if err != nil {
+		log.Debugf(ctx, "failed to get executable path: %s", err)
+		return
+	}
+	telemetryCmd := exec.Command(execPath, "telemetry-worker")
+
+	// TODO: Add a test that ensures that the context key for CLI commands stores a
+	// resolved auth configuration.
+	// TODO: Add a test that the worker inherits the environment variables from the
+	// parent process.
+	in := telemetry.WorkerInput{
+		AuthConfig: ConfigUsed(ctx),
+		Logs:       telemetry.GetLogs(ctx),
+	}
+
+	if len(in.Logs) == 0 {
+		return
+	}
+
+	b, err := json.Marshal(in)
+	if err != nil {
+		log.Debugf(ctx, "failed to marshal telemetry logs: %s", err)
+		return
+	}
+
+	stdin, err := telemetryCmd.StdinPipe()
+	if err != nil {
+		log.Debugf(ctx, "failed to create stdin pipe for telemetry worker: %s", err)
+		return
+	}
+
+	stdout, err := telemetryCmd.StdoutPipe()
+	if err != nil {
+		log.Debugf(ctx, "failed to create stdout pipe for telemetry worker: %s", err)
+		return
+	}
+
+	// Set DATABRICKS_CLI_SKIP_TELEMETRY to true to ensure that the telemetry worker
+	// command does not accidentally call itself, causing a fork bomb. This can happen
+	// if a change starts logging telemetry in the telemetry worker command's code
+	// path. The environment must be configured before the process is started.
+	telemetryCmd.Env = os.Environ()
+	telemetryCmd.Env = append(telemetryCmd.Env, telemetry.SkipEnvVar+"=true")
+
+	err = telemetryCmd.Start()
+	if err != nil {
+		log.Debugf(ctx, "failed to start telemetry worker: %s", err)
+		return
+	}
+
+	_, err = stdin.Write(b)
+	if err != nil {
+		log.Debugf(ctx, "failed to write to telemetry worker: %s", err)
+	}
+
+	err = stdin.Close()
+	if err != nil {
+		log.Debugf(ctx, "failed to close stdin for telemetry worker: %s", err)
+	}
+
+	// This is only meant for testing purposes, to do assertions on the output
+	// of the telemetry worker command.
+	if env.Get(ctx, telemetry.BlockOnUploadEnvVar) == "true" {
+		// Read the worker's output before calling Wait, since Wait closes the
+		// stdout pipe once the process exits.
+		out, err := io.ReadAll(stdout)
+		if err != nil {
+			log.Debugf(ctx, "failed to read telemetry worker output: %s", err)
+		}
+
+		err = telemetryCmd.Wait()
+		if err != nil {
+			log.Debugf(ctx, "failed to wait for telemetry worker: %s", err)
+		}
+
+		cmdio.LogString(ctx, "telemetry-worker output:")
+		cmdio.LogString(ctx, string(out))
+	}
+}
diff --git a/cmd/root/user_agent_command_exec_id.go b/cmd/root/user_agent_command_exec_id.go
index 3bf32b703..e3416983d 100644
--- a/cmd/root/user_agent_command_exec_id.go
+++ b/cmd/root/user_agent_command_exec_id.go
@@ -7,8 +7,10 @@ import (
 	"github.com/google/uuid"
 )
 
+var cmdExecId = uuid.New().String()
+
 func withCommandExecIdInUserAgent(ctx context.Context) context.Context {
 	// A UUID that will allow us to correlate multiple API requests made by
 	// the same CLI invocation.
-	return useragent.InContext(ctx, "cmd-exec-id", uuid.New().String())
+	return useragent.InContext(ctx, "cmd-exec-id", cmdExecId)
 }
diff --git a/cmd/telemetry/send_test_event/send_test_event.go b/cmd/telemetry/send_test_event/send_test_event.go
new file mode 100644
index 000000000..d09d44ba0
--- /dev/null
+++ b/cmd/telemetry/send_test_event/send_test_event.go
@@ -0,0 +1,30 @@
+package sendtestevent
+
+import (
+	"github.com/databricks/cli/cmd/root"
+	"github.com/databricks/cli/libs/telemetry"
+	"github.com/databricks/cli/libs/telemetry/protos"
+	"github.com/spf13/cobra"
+)
+
+func New() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:     "send-test-event",
+		Short:   "Send a test telemetry event to Databricks",
+		Hidden:  true,
+		PreRunE: root.MustWorkspaceClient,
+	}
+
+	cmd.RunE = func(cmd *cobra.Command, args []string) error {
+		for _, v := range []string{"VALUE1", "VALUE2", "VALUE3"} {
+			telemetry.Log(cmd.Context(), protos.DatabricksCliLog{
+				CliTestEvent: &protos.CliTestEvent{
+					Name: protos.DummyCliEnum(v),
+				},
+			})
+		}
+		return nil
+	}
+
+	return cmd
+}
diff --git a/cmd/telemetry/worker/worker.go b/cmd/telemetry/worker/worker.go
new file mode 100644
index 000000000..5cd4c5368
--- /dev/null
+++ b/cmd/telemetry/worker/worker.go
@@ -0,0 +1,136 @@
+package worker
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	"github.com/databricks/cli/cmd/root"
+	"github.com/databricks/cli/libs/telemetry"
+	"github.com/spf13/cobra"
+
+	"github.com/databricks/databricks-sdk-go/client"
+)
+
+// TODO CONTINUE:
+// 2. Add end-to-end integration tests and mocked tests for telemetry upload.
+// 3. Verify that the auth configuration is resolved. Enforce that somehow?
+
+// TODO: What happens here with OAuth? This would then end up spawning a new process
+// to resolve the auth token. Options:
+// 1. Check in with Miles that this is fine.
+// 2. See if we can directly pass a token with enough lifetime left to this
+//    worker process.
+// 3. Right before spawning the child process make sure to refresh the token.
+
+// TODO: Print errors to stderr and assert in tests that the stderr is empty.
+
+// We need to spawn a separate process to upload telemetry logs in order to avoid
+// increasing the latency of CLI commands.
+//
+// TODO: Add a check to ensure this does not become a fork bomb. Maybe a unit test
+// as well.
+func New() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:    "telemetry-worker",
+		Short:  "Upload telemetry logs from stdin to Databricks",
+		Args:   root.NoArgs,
+		Hidden: true,
+	}
+
+	cmd.RunE = func(cmd *cobra.Command, args []string) error {
+		fmt.Printf("Running telemetry worker\n")
+
+		b, err := io.ReadAll(cmd.InOrStdin())
+		if err != nil {
+			return fmt.Errorf("failed to read from stdin: %s", err)
+		}
+
+		in := telemetry.WorkerInput{}
+		err = json.Unmarshal(b, &in)
+		if err != nil {
+			return fmt.Errorf("failed to unmarshal input: %s", err)
+		}
+
+		fmt.Printf("worker input: %#v\n", in)
+
+		logs := in.Logs
+
+		// No logs to upload. The parent process should not have spawned the
+		// worker in this case, so treat it as an error.
+		if len(logs) == 0 {
+			return errors.New("no logs to upload")
+		}
+
+		// The API expects logs to be JSON strings. Serialize the logs to strings
+		// to be set in the request body.
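+		// For example, a single serialized log has the shape:
+		//   {"frontend_log_event_id":"<uuid>","entry":{"databricks_cli_log":{"cli_test_event":{"name":"VALUE1"}}}}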
+		var protoLogs []string
+		for _, event := range logs {
+			s, err := json.Marshal(event)
+			if err != nil {
+				return err
+			}
+
+			protoLogs = append(protoLogs, string(s))
+		}
+
+		apiClient, err := client.New(in.AuthConfig)
+		if err != nil {
+			return fmt.Errorf("failed to create API client: %s", err)
+		}
+
+		// Set a maximum total time to try telemetry uploads.
+		ctx, cancel := context.WithTimeout(cmd.Context(), 60*time.Second)
+		defer cancel()
+
+		resp := &telemetry.ResponseBody{}
+		for {
+			select {
+			case <-ctx.Done():
+				return errors.New("failed to flush telemetry logs due to timeout")
+
+			default:
+				// Proceed.
+			}
+
+			// Log the CLI telemetry events.
+			err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, telemetry.RequestBody{
+				UploadTime: time.Now().Unix(),
+				ProtoLogs:  protoLogs,
+
+				// A bug in the telemetry API requires us to send an empty items array.
+				// Otherwise we get an opaque 500 internal server error.
+				// TODO: Do I need to do this even though "omitempty" is not set?
+				Items: []string{},
+			}, resp)
+			if err != nil {
+				// The SDK automatically performs retries for 429s and 503s. Thus if we
+				// see an error here, do not retry logging the telemetry.
+				return fmt.Errorf("error making the API request to /telemetry-ext: %v", err)
+			}
+
+			// If not all the logs were successfully sent, we'll retry and log everything
+			// again.
+			//
+			// Note: This will result in server-side duplicates but that's fine since
+			// we can always deduplicate in the data pipeline itself.
+			if len(logs) > int(resp.NumProtoSuccess) {
+				continue
+			}
+
+			// TODO: Add an integration acceptance test for this.
+			fmt.Println("Successfully flushed telemetry events")
+			b, err := json.Marshal(resp)
+			if err != nil {
+				return fmt.Errorf("failed to marshal response: %s", err)
+			}
+			fmt.Println("Response: ", string(b))
+			return nil
+		}
+	}
+
+	return cmd
+}
diff --git a/cmd/telemetry/worker/worker_test.go b/cmd/telemetry/worker/worker_test.go
new file mode 100644
index 000000000..6203a2a9d
--- /dev/null
+++ b/cmd/telemetry/worker/worker_test.go
@@ -0,0 +1,94 @@
+package worker
+
+import (
+	"bytes"
+	"encoding/json"
+	"net/http"
+	"testing"
+
+	"github.com/databricks/cli/internal/testutil"
+	"github.com/databricks/cli/libs/telemetry"
+	"github.com/databricks/cli/libs/telemetry/protos"
+	"github.com/databricks/databricks-sdk-go/config"
+	"github.com/stretchr/testify/require"
+)
+
+func TestTelemetryWorker(t *testing.T) {
+	server := testutil.StartServer(t)
+	count := int64(0)
+
+	server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
+		// The auth token should be set.
+		require.Equal(t, "Bearer foobar", r.Header.Get("Authorization"))
+
+		// reqBody should contain all the logs.
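+		// Note that the worker re-sends the full set of logs on every retry,
+		// so each attempt is expected to carry all three events.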
+		reqBody := telemetry.RequestBody{}
+		err := json.NewDecoder(r.Body).Decode(&reqBody)
+		require.NoError(t, err)
+
+		require.Equal(t, []string{
+			"{\"frontend_log_event_id\":\"aaaa\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
+			"{\"frontend_log_event_id\":\"bbbb\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}",
+			"{\"frontend_log_event_id\":\"cccc\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE3\"}}}}",
+		}, reqBody.ProtoLogs)
+
+		count++
+		return telemetry.ResponseBody{
+			NumProtoSuccess: count,
+		}, nil
+	})
+
+	in := telemetry.WorkerInput{
+		Logs: []protos.FrontendLog{
+			{
+				FrontendLogEventID: "aaaa",
+				Entry: protos.FrontendLogEntry{
+					DatabricksCliLog: protos.DatabricksCliLog{
+						CliTestEvent: &protos.CliTestEvent{
+							Name: protos.DummyCliEnumValue1,
+						},
+					},
+				},
+			},
+			{
+				FrontendLogEventID: "bbbb",
+				Entry: protos.FrontendLogEntry{
+					DatabricksCliLog: protos.DatabricksCliLog{
+						CliTestEvent: &protos.CliTestEvent{
+							Name: protos.DummyCliEnumValue2,
+						},
+					},
+				},
+			},
+			{
+				FrontendLogEventID: "cccc",
+				Entry: protos.FrontendLogEntry{
+					DatabricksCliLog: protos.DatabricksCliLog{
+						CliTestEvent: &protos.CliTestEvent{
+							Name: protos.DummyCliEnumValue3,
+						},
+					},
+				},
+			},
+		},
+		AuthConfig: &config.Config{
+			Host:  server.URL,
+			Token: "foobar",
+		},
+	}
+
+	inBytes, err := json.Marshal(in)
+	require.NoError(t, err)
+
+	stdinReader := bytes.NewReader(inBytes)
+
+	cmd := New()
+	cmd.SetIn(stdinReader)
+	cmd.SetArgs([]string{})
+
+	err = cmd.Execute()
+	require.NoError(t, err)
+
+	// The telemetry worker should retry until all logs are uploaded.
+	require.Equal(t, int64(3), count)
+}
diff --git a/integration/libs/telemetry/__debug_bin2084705627 b/integration/libs/telemetry/__debug_bin2084705627
new file mode 100644
index 000000000..e69de29bb
diff --git a/integration/libs/telemetry/build/databricks b/integration/libs/telemetry/build/databricks
new file mode 100755
index 000000000..d37968e0f
Binary files /dev/null and b/integration/libs/telemetry/build/databricks differ
diff --git a/integration/libs/telemetry/telemetry_test.go b/integration/libs/telemetry/telemetry_test.go
index cd9040422..6eb19751a 100644
--- a/integration/libs/telemetry/telemetry_test.go
+++ b/integration/libs/telemetry/telemetry_test.go
@@ -1,91 +1,232 @@
 package telemetry_test
 
 import (
-	"context"
+	"encoding/json"
+	"fmt"
 	"net/http"
-	"reflect"
+	"os/exec"
+	"path/filepath"
 	"testing"
 	"time"
 
-	"github.com/databricks/cli/integration/internal/acc"
+	"github.com/databricks/cli/internal/testutil"
 	"github.com/databricks/cli/libs/telemetry"
 	"github.com/databricks/cli/libs/telemetry/protos"
-	"github.com/databricks/databricks-sdk-go/client"
-	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
-// Wrapper to capture the response from the API client since that's not directly
-// accessible from the logger.
-type apiClientWrapper struct {
-	response  *telemetry.ResponseBody
-	apiClient *client.DatabricksClient
-}
+// // Wrapper to capture the response from the API client since that's not directly
+// // accessible from the logger.
+// type apiClientWrapper struct {
+// 	response  *telemetry.ResponseBody
+// 	apiClient *client.DatabricksClient
+// }
+//
+// func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
+// 	headers map[string]string, request, response any,
+// 	visitors ...func(*http.Request) error,
+// ) error {
+// 	err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...)
+// 	wrapper.response = response.(*telemetry.ResponseBody)
+// 	return err
+// }
 
-func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
-	headers map[string]string, request, response any,
-	visitors ...func(*http.Request) error,
-) error {
-	err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...)
-	wrapper.response = response.(*telemetry.ResponseBody)
-	return err
-}
 
+// func TestTelemetryLogger(t *testing.T) {
+// 	events := []telemetry.DatabricksCliLog{
+// 		{
+// 			CliTestEvent: &protos.CliTestEvent{
+// 				Name: protos.DummyCliEnumValue1,
+// 			},
+// 		},
+// 		{
+// 			BundleInitEvent: &protos.BundleInitEvent{
+// 				Uuid:         uuid.New().String(),
+// 				TemplateName: "abc",
+// 				TemplateEnumArgs: []protos.BundleInitTemplateEnumArg{
+// 					{
+// 						Key:   "a",
+// 						Value: "b",
+// 					},
+// 					{
+// 						Key:   "c",
+// 						Value: "d",
+// 					},
+// 				},
+// 			},
+// 		},
+// 	}
 
-func TestTelemetryLogger(t *testing.T) {
-	events := []telemetry.DatabricksCliLog{
-		{
-			CliTestEvent: &protos.CliTestEvent{
-				Name: protos.DummyCliEnumValue1,
-			},
-		},
-		{
-			BundleInitEvent: &protos.BundleInitEvent{
-				Uuid:         uuid.New().String(),
-				TemplateName: "abc",
-				TemplateEnumArgs: []protos.BundleInitTemplateEnumArg{
-					{
-						Key:   "a",
-						Value: "b",
-					},
-					{
-						Key:   "c",
-						Value: "d",
-					},
-				},
-			},
-		},
-	}
 
+// 	assert.Equal(t, len(events), reflect.TypeOf(telemetry.DatabricksCliLog{}).NumField(),
+// 		"Number of events should match the number of fields in DatabricksCliLog. Please add a new event to this test.")
 
-	assert.Equal(t, len(events), reflect.TypeOf(telemetry.DatabricksCliLog{}).NumField(),
-		"Number of events should match the number of fields in DatabricksCliLog. Please add a new event to this test.")
 
+// 	ctx, w := acc.WorkspaceTest(t)
+// 	ctx = telemetry.WithDefaultLogger(ctx)
 
-	ctx, w := acc.WorkspaceTest(t)
-	ctx = telemetry.WithDefaultLogger(ctx)
 
+// 	// Extend the maximum wait time for the telemetry flush just for this test.
+// 	oldV := telemetry.MaxAdditionalWaitTime
+// 	telemetry.MaxAdditionalWaitTime = 1 * time.Hour
+// 	t.Cleanup(func() {
+// 		telemetry.MaxAdditionalWaitTime = oldV
+// 	})
 
-	// Extend the maximum wait time for the telemetry flush just for this test.
-	oldV := telemetry.MaxAdditionalWaitTime
-	telemetry.MaxAdditionalWaitTime = 1 * time.Hour
-	t.Cleanup(func() {
-		telemetry.MaxAdditionalWaitTime = oldV
-	})
 
+// 	for _, event := range events {
+// 		telemetry.Log(ctx, event)
+// 	}
 
+// 	apiClient, err := client.New(w.W.Config)
+// 	require.NoError(t, err)
 
+// 	// Flush the events.
+// 	wrapper := &apiClientWrapper{
+// 		apiClient: apiClient,
+// 	}
+// 	telemetry.Flush(ctx, wrapper)
 
+// 	// Assert that the events were logged.
+// 	assert.Equal(t, telemetry.ResponseBody{
+// 		NumProtoSuccess: int64(len(events)),
+// 		Errors:          []telemetry.LogError{},
+// 	}, *wrapper.response)
+// }
 
+func TestTelemetryCliPassesAuthCredentials(t *testing.T) {
+	server := testutil.StartServer(t)
+	count := int64(0)
+
+	server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
+		// The auth token should be set. Since the telemetry-worker command does not
+		// load the profile, the token must have been passed explicitly.
+		require.Equal(t, "Bearer mytoken", r.Header.Get("Authorization"))
+
+		// reqBody should contain all the logs.
+		reqBody := telemetry.RequestBody{}
+		err := json.NewDecoder(r.Body).Decode(&reqBody)
+		require.NoError(t, err)
+
+		logs := []protos.FrontendLog{}
+		for _, log := range reqBody.ProtoLogs {
+			var l protos.FrontendLog
+			err := json.Unmarshal([]byte(log), &l)
+			require.NoError(t, err)
+
+			logs = append(logs, l)
+		}
+
+		assert.Len(t, logs, 3)
+		assert.Equal(t, protos.DummyCliEnum("VALUE1"), logs[0].Entry.DatabricksCliLog.CliTestEvent.Name)
+		assert.Equal(t, protos.DummyCliEnum("VALUE2"), logs[1].Entry.DatabricksCliLog.CliTestEvent.Name)
+		assert.Equal(t, protos.DummyCliEnum("VALUE3"), logs[2].Entry.DatabricksCliLog.CliTestEvent.Name)
+
+		count++
+
+		// TODO: Add (or keep) the API tests that ensure that the telemetry API is working correctly.
+		return telemetry.ResponseBody{
+			NumProtoSuccess: count,
+		}, nil
+	})
 
-	for _, event := range events {
-		telemetry.Log(ctx, event)
-	}
 
+	// Set up the .databrickscfg file.
+	tmpDir := t.TempDir()
+	testutil.WriteFile(t, filepath.Join(tmpDir, ".databrickscfg"), fmt.Sprintf(`
+[myprofile]
+host = %s
+token = mytoken`, server.URL))
+	t.Setenv("DATABRICKS_CONFIG_FILE", filepath.Join(tmpDir, ".databrickscfg"))
+	t.Setenv("DATABRICKS_CONFIG_PROFILE", "myprofile")
 
-	apiClient, err := client.New(w.W.Config)
+	execPath := testutil.BuildCLI(t)
+	cmd := exec.Command(execPath, "send-test-event")
+	err := cmd.Run()
 	require.NoError(t, err)
 
-	// Flush the events.
-	wrapper := &apiClientWrapper{
-		apiClient: apiClient,
-	}
-	telemetry.Flush(ctx, wrapper)
-
-	// Assert that the events were logged.
-	assert.Equal(t, telemetry.ResponseBody{
-		NumProtoSuccess: int64(len(events)),
-		Errors:          []telemetry.LogError{},
-	}, *wrapper.response)
+	assert.Eventually(t, func() bool {
+		return count == 3
+	}, 10*time.Second, 1*time.Second)
+}
+
+func TestTelemetry(t *testing.T) {
+	server := testutil.StartServer(t)
+	count := int64(0)
+
+	server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) {
+		// The auth token should be set.
+		require.Equal(t, "Bearer foobar", r.Header.Get("Authorization"))
+
+		// reqBody should contain all the logs.
+		reqBody := telemetry.RequestBody{}
+		err := json.NewDecoder(r.Body).Decode(&reqBody)
+		require.NoError(t, err)
+
+		logs := []protos.FrontendLog{}
+		for _, log := range reqBody.ProtoLogs {
+			var l protos.FrontendLog
+			err := json.Unmarshal([]byte(log), &l)
+			require.NoError(t, err)
+
+			logs = append(logs, l)
+		}
+
+		assert.Len(t, logs, 3)
+		assert.Equal(t, protos.DummyCliEnum("VALUE1"), logs[0].Entry.DatabricksCliLog.CliTestEvent.Name)
+		assert.Equal(t, protos.DummyCliEnum("VALUE2"), logs[1].Entry.DatabricksCliLog.CliTestEvent.Name)
+		assert.Equal(t, protos.DummyCliEnum("VALUE3"), logs[2].Entry.DatabricksCliLog.CliTestEvent.Name)
+
+		count++
+
+		// TODO: Add (or keep) the API tests that ensure that the telemetry API is working correctly.
+		return telemetry.ResponseBody{
+			NumProtoSuccess: count,
+		}, nil
+	})
+
+	// TODO: Also measure how much extra time spawning a separate process takes.
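+	// Unlike the profile-based test above, credentials are passed here via
+	// environment variables, which the spawned telemetry worker is expected
+	// to inherit from the parent CLI process.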
+ t.Setenv("DATABRICKS_HOST", server.URL) + t.Setenv("DATABRICKS_TOKEN", "foobar") + + execPath := testutil.BuildCLI(t) + cmd := exec.Command(execPath, "send-test-event") + err := cmd.Run() + require.NoError(t, err) + + assert.Eventually(t, func() bool { + return count == 3 + }, 10*time.Second, 1*time.Second) +} + +func TestTelemetryDoesNotBlock(t *testing.T) { + server := testutil.StartServer(t) + count := int64(0) + + fire := make(chan struct{}) + + server.Handle("POST /telemetry-ext", func(r *http.Request) (any, error) { + // Block until the channel is closed. + <-fire + + require.Equal(t, "Bearer foobar", r.Header.Get("Authorization")) + + count++ + return telemetry.ResponseBody{ + NumProtoSuccess: 3, + }, nil + }) + + t.Setenv("DATABRICKS_HOST", server.URL) + t.Setenv("DATABRICKS_TOKEN", "foobar") + + execPath := testutil.BuildCLI(t) + cmd := exec.Command(execPath, "send-test-event") + err := cmd.Run() + require.NoError(t, err) + + // No API calls should have been made yet. Even though the main process has + // finished, the telemetry worker should be running in the background. + assert.Equal(t, int64(0), count) + + // Close the channel to allow the API call to go through. + close(fire) + assert.Eventually(t, func() bool { + return count == 1 + }, 10*time.Second, 1*time.Second) } diff --git a/internal/testutil/build/databricks b/internal/testutil/build/databricks new file mode 100755 index 000000000..7b6509b9a Binary files /dev/null and b/internal/testutil/build/databricks differ diff --git a/internal/testutil/build_cli.go b/internal/testutil/build_cli.go new file mode 100644 index 000000000..ce3c69ff5 --- /dev/null +++ b/internal/testutil/build_cli.go @@ -0,0 +1,55 @@ +package testutil + +import ( + "os/exec" + "path/filepath" + "runtime" + "time" + + "github.com/databricks/cli/libs/folders" + "github.com/stretchr/testify/require" +) + +func BuildCLI(t TestingT, flags ...string) string { + repoRoot, err := folders.FindDirWithLeaf(".", ".git") + require.NoError(t, err) + + // Stable path for the CLI binary. This ensures fast builds and cache reuse. + execPath := filepath.Join(repoRoot, "internal", "testutil", "build", "databricks") + if runtime.GOOS == "windows" { + execPath += ".exe" + } + + start := time.Now() + args := []string{ + "go", "build", + "-mod", "vendor", + "-o", execPath, + } + if len(flags) > 0 { + args = append(args, flags...) + } + + if runtime.GOOS == "windows" { + // Get this error on my local Windows: + // error obtaining VCS status: exit status 128 + // Use -buildvcs=false to disable VCS stamping. + args = append(args, "-buildvcs=false") + } + + cmd := exec.Command(args[0], args[1:]...) 
+	cmd.Dir = repoRoot
+	out, err := cmd.CombinedOutput()
+	elapsed := time.Since(start)
+	t.Logf("%s took %s", args, elapsed)
+	require.NoError(t, err, "go build failed: %s: %s\n%s", args, err, out)
+	if len(out) > 0 {
+		t.Logf("go build output: %s: %s", args, out)
+	}
+
+	// Quick check and cache warm-up:
+	cmd = exec.Command(execPath, "--version")
+	out, err = cmd.CombinedOutput()
+	require.NoError(t, err, "%s --version failed: %s\n%s", execPath, err, out)
+	return execPath
+}
diff --git a/internal/testutil/server.go b/internal/testutil/server.go
new file mode 100644
index 000000000..546985528
--- /dev/null
+++ b/internal/testutil/server.go
@@ -0,0 +1,63 @@
+package testutil
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+type Server struct {
+	*httptest.Server
+	Mux *http.ServeMux
+}
+
+type HandlerFunc func(r *http.Request) (any, error)
+
+func NewServer() *Server {
+	mux := http.NewServeMux()
+	server := httptest.NewServer(mux)
+
+	return &Server{
+		Server: server,
+		Mux:    mux,
+	}
+}
+
+func (s *Server) Handle(pattern string, handler HandlerFunc) {
+	s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
+		resp, err := handler(r)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+
+		var respBytes []byte
+
+		respString, ok := resp.(string)
+		if ok {
+			respBytes = []byte(respString)
+		} else {
+			respBytes, err = json.MarshalIndent(resp, "", " ")
+			if err != nil {
+				http.Error(w, err.Error(), http.StatusInternalServerError)
+				return
+			}
+		}
+
+		if _, err := w.Write(respBytes); err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+	})
+}
+
+func StartServer(t *testing.T) *Server {
+	server := NewServer()
+	t.Cleanup(func() {
+		server.Close()
+	})
+	return server
+}
diff --git a/libs/auth/env.go b/libs/auth/env.go
new file mode 100644
index 000000000..c58cc53e3
--- /dev/null
+++ b/libs/auth/env.go
@@ -0,0 +1,26 @@
+package auth
+
+import "github.com/databricks/databricks-sdk-go/config"
+
+// Env generates the authentication environment variables we need to set for
+// downstream applications launched from the CLI to work correctly.
+func Env(cfg *config.Config) map[string]string {
+	out := make(map[string]string)
+	for _, attr := range config.ConfigAttributes {
+		// Ignore the profile so that downstream tools don't try to reload
+		// the profile. We know the current configuration is already valid since
+		// otherwise the CLI would have thrown an error when loading it.
+		if attr.Name == "profile" {
+			continue
+		}
+		if len(attr.EnvVars) == 0 {
+			continue
+		}
+		if attr.IsZero(cfg) {
+			continue
+		}
+		out[attr.EnvVars[0]] = attr.GetString(cfg)
+	}
+
+	return out
+}
diff --git a/libs/dbr/detect.go b/libs/dbr/detect.go
index d8b4dfe20..12b83dc8c 100644
--- a/libs/dbr/detect.go
+++ b/libs/dbr/detect.go
@@ -11,6 +11,8 @@ import (
 // Dereference [os.Stat] to allow mocking in tests.
 var statFunc = os.Stat
 
+const EnvVarName = "DATABRICKS_RUNTIME_VERSION"
+
 // detect returns true if the current process is running on a Databricks Runtime.
 // Its return value is meant to be cached in the context.
 func detect(ctx context.Context) bool {
@@ -21,7 +23,7 @@ func detect(ctx context.Context) bool {
 	}
 
 	// Databricks Runtime always has the DATABRICKS_RUNTIME_VERSION environment variable set.
-	if value, ok := env.Lookup(ctx, "DATABRICKS_RUNTIME_VERSION"); !ok || value == "" {
+	if value, ok := env.Lookup(ctx, EnvVarName); !ok || value == "" {
 		return false
 	}
 
diff --git a/libs/telemetry/api.go b/libs/telemetry/api.go
index e3f80fadb..7a3d242aa 100644
--- a/libs/telemetry/api.go
+++ b/libs/telemetry/api.go
@@ -1,5 +1,10 @@
 package telemetry
 
+import (
+	"github.com/databricks/cli/libs/telemetry/protos"
+	"github.com/databricks/databricks-sdk-go/config"
+)
+
 // RequestBody is the request body type bindings for the /telemetry-ext API endpoint.
 type RequestBody struct {
 	UploadTime int64    `json:"uploadTime"`
@@ -17,3 +22,8 @@ type LogError struct {
 	Message   string `json:"message"`
 	ErrorType string `json:"ErrorType"`
 }
+
+type WorkerInput struct {
+	AuthConfig *config.Config       `json:"authConfig"`
+	Logs       []protos.FrontendLog `json:"logs"`
+}
diff --git a/libs/telemetry/context.go b/libs/telemetry/context.go
index c0aea80bc..529ad9f6e 100644
--- a/libs/telemetry/context.go
+++ b/libs/telemetry/context.go
@@ -11,52 +11,15 @@ type telemetryLogger int
 // Key to store the telemetry logger in the context
 var telemetryLoggerKey telemetryLogger
 
-func WithDefaultLogger(ctx context.Context) context.Context {
-	v := ctx.Value(telemetryLoggerKey)
-
-	// If no logger is set in the context, set the default logger.
-	if v == nil {
-		nctx := context.WithValue(ctx, telemetryLoggerKey, &defaultLogger{})
-		return nctx
-	}
-
-	switch v.(type) {
-	case *defaultLogger:
-		panic(fmt.Errorf("default telemetry logger already set in the context: %T", v))
-	case *mockLogger:
-		// Do nothing. Unit and integration tests set the mock logger in the context
-		// to avoid making actual API calls. Thus WithDefaultLogger should silently
-		// ignore the mock logger.
-	default:
-		panic(fmt.Errorf("unexpected telemetry logger type: %T", v))
-	}
-
-	return ctx
+func WithNewLogger(ctx context.Context) context.Context {
+	return context.WithValue(ctx, telemetryLoggerKey, &logger{})
 }
 
-// WithMockLogger sets a mock telemetry logger in the context. It overrides the
-// default logger if it is already set in the context.
-func WithMockLogger(ctx context.Context) context.Context {
-	v := ctx.Value(telemetryLoggerKey)
-	if v != nil {
-		panic(fmt.Errorf("telemetry logger already set in the context: %T", v))
-	}
-
-	return context.WithValue(ctx, telemetryLoggerKey, &mockLogger{})
-}
-
-func fromContext(ctx context.Context) Logger {
+func fromContext(ctx context.Context) *logger {
 	v := ctx.Value(telemetryLoggerKey)
 	if v == nil {
 		panic(fmt.Errorf("telemetry logger not found in the context"))
 	}
 
-	switch vv := v.(type) {
-	case *defaultLogger:
-		return vv
-	case *mockLogger:
-		return vv
-	default:
-		panic(fmt.Errorf("unexpected telemetry logger type: %T", v))
-	}
+	return v.(*logger)
 }
diff --git a/libs/telemetry/context_test.go b/libs/telemetry/context_test.go
deleted file mode 100644
index ddcdb83de..000000000
--- a/libs/telemetry/context_test.go
+++ /dev/null
@@ -1,77 +0,0 @@
-package telemetry
-
-import (
-	"context"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-)
-
-func TestWithDefaultLogger(t *testing.T) {
-	ctx := context.Background()
-
-	// No default logger set
-	ctx1 := WithDefaultLogger(ctx)
-	assert.Equal(t, &defaultLogger{}, ctx1.Value(telemetryLoggerKey))
-
-	// Default logger already set
-	assert.PanicsWithError(t, "default telemetry logger already set in the context: *telemetry.defaultLogger", func() {
-		WithDefaultLogger(ctx1)
-	})
-
-	// Mock logger already set
-	ctx2 := WithMockLogger(ctx)
-	assert.NotPanics(t, func() {
-		WithDefaultLogger(ctx2)
-	})
-
-	// Unexpected logger type
-	type foobar struct{}
-	ctx3 := context.WithValue(ctx, telemetryLoggerKey, &foobar{})
-	assert.PanicsWithError(t, "unexpected telemetry logger type: *telemetry.foobar", func() {
-		WithDefaultLogger(ctx3)
-	})
-}
-
-func TestWithMockLogger(t *testing.T) {
-	ctx := context.Background()
-
-	// No logger set
-	ctx1 := WithMockLogger(ctx)
-	assert.Equal(t, &mockLogger{}, ctx1.Value(telemetryLoggerKey))
-
-	// Logger already set
-	assert.PanicsWithError(t, "telemetry logger already set in the context: *telemetry.mockLogger", func() {
-		WithMockLogger(ctx1)
-	})
-
-	// Default logger already set
-	ctx2 := WithDefaultLogger(ctx)
-	assert.PanicsWithError(t, "telemetry logger already set in the context: *telemetry.defaultLogger", func() {
-		WithMockLogger(ctx2)
-	})
-}
-
-func TestFromContext(t *testing.T) {
-	ctx := context.Background()
-
-	// No logger set
-	assert.PanicsWithError(t, "telemetry logger not found in the context", func() {
-		fromContext(ctx)
-	})
-
-	// Default logger set
-	ctx1 := WithDefaultLogger(ctx)
-	assert.Equal(t, &defaultLogger{}, fromContext(ctx1))
-
-	// Mock logger set
-	ctx2 := WithMockLogger(ctx)
-	assert.Equal(t, &mockLogger{}, fromContext(ctx2))
-
-	// Unexpected logger type
-	type foobar struct{}
-	ctx3 := context.WithValue(ctx, telemetryLoggerKey, &foobar{})
-	assert.PanicsWithError(t, "unexpected telemetry logger type: *telemetry.foobar", func() {
-		fromContext(ctx3)
-	})
-}
diff --git a/libs/telemetry/logger.go b/libs/telemetry/logger.go
index 77fffac8a..969e55478 100644
--- a/libs/telemetry/logger.go
+++ b/libs/telemetry/logger.go
@@ -2,138 +2,52 @@ package telemetry
 
 import (
 	"context"
-	"encoding/json"
-	"net/http"
-	"time"
 
-	"github.com/databricks/cli/libs/log"
+	"github.com/databricks/cli/libs/telemetry/protos"
 	"github.com/google/uuid"
 )
 
-// Interface abstraction created to mock out the Databricks client for testing.
-type DatabricksApiClient interface {
-	Do(ctx context.Context, method, path string,
-		headers map[string]string, request, response any,
-		visitors ...func(*http.Request) error) error
+const SkipEnvVar = "DATABRICKS_CLI_SKIP_TELEMETRY"
+
+// DATABRICKS_CLI_BLOCK_ON_TELEMETRY_UPLOAD is an environment variable that can be set
+// to make the CLI process block until the telemetry logs are uploaded.
+// Only used for testing.
+const BlockOnUploadEnvVar = "DATABRICKS_CLI_BLOCK_ON_TELEMETRY_UPLOAD"
+
+func Log(ctx context.Context, event protos.DatabricksCliLog) {
+	fromContext(ctx).log(event)
 }
 
-type Logger interface {
-	// Record a telemetry event, to be flushed later.
-	Log(event DatabricksCliLog)
-
-	// Flush all the telemetry events that have been logged so far. We expect
-	// this to be called once per CLI command for the default logger.
-	Flush(ctx context.Context, apiClient DatabricksApiClient)
-
-	// This function is meant to be only to be used in tests to introspect
-	// the telemetry logs that have been logged so far.
-	Introspect() []DatabricksCliLog
+func GetLogs(ctx context.Context) []protos.FrontendLog {
+	return fromContext(ctx).getLogs()
 }
 
-type defaultLogger struct {
-	logs []FrontendLog
+func SetExecutionContext(ctx context.Context, ec protos.ExecutionContext) {
+	fromContext(ctx).setExecutionContext(ec)
 }
 
-func (l *defaultLogger) Log(event DatabricksCliLog) {
+type logger struct {
+	logs []protos.FrontendLog
+}
+
+func (l *logger) log(event protos.DatabricksCliLog) {
 	if l.logs == nil {
-		l.logs = make([]FrontendLog, 0)
+		l.logs = make([]protos.FrontendLog, 0)
 	}
-	l.logs = append(l.logs, FrontendLog{
-		// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
-		// This it's important to generate a unique ID for each log event.
+	l.logs = append(l.logs, protos.FrontendLog{
 		FrontendLogEventID: uuid.New().String(),
-		Entry: FrontendLogEntry{
+		Entry: protos.FrontendLogEntry{
 			DatabricksCliLog: event,
 		},
 	})
 }
 
-// Maximum additional time to wait for the telemetry event to flush. We expect the flush
-// method to be called when the CLI command is about to exit, so this caps the maximum
-// additional time the user will experience because of us logging CLI telemetry.
-var MaxAdditionalWaitTime = 3 * time.Second
+func (l *logger) getLogs() []protos.FrontendLog {
+	return l.logs
+}
 
-// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
-// right about as the CLI command is about to exit. The API endpoint can handle
-// payloads with ~1000 events easily. Thus we log all the events at once instead of
-// batching the logs across multiple API calls.
-func (l *defaultLogger) Flush(ctx context.Context, apiClient DatabricksApiClient) {
-	// Set a maximum time to wait for the telemetry event to flush.
-	ctx, cancel := context.WithTimeout(ctx, MaxAdditionalWaitTime)
-	defer cancel()
-
-	if len(l.logs) == 0 {
-		log.Debugf(ctx, "No telemetry events to flush")
-		return
-	}
-
-	var protoLogs []string
-	for _, event := range l.logs {
-		s, err := json.Marshal(event)
-		if err != nil {
-			log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err)
-			continue
-		}
-
-		protoLogs = append(protoLogs, string(s))
-	}
-
-	resp := &ResponseBody{}
-	for {
-		select {
-		case <-ctx.Done():
-			log.Debugf(ctx, "Timed out before flushing telemetry events")
-			return
-		default:
-			// Proceed
-		}
-
-		// Log the CLI telemetry events.
-		err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
-			UploadTime: time.Now().Unix(),
-			ProtoLogs:  protoLogs,
-
-			// A bug in the telemetry API requires us to send an empty items array.
-			// Otherwise we get an opaque 500 internal server error.
-			Items: []string{},
-		}, resp)
-		if err != nil {
-			// The SDK automatically performs retries for 429s and 503s. Thus if we
-			// see an error here, do not retry logging the telemetry.
-			log.Debugf(ctx, "Error making the API request to /telemetry-ext: %v", err)
-			return
-		}
-		// If not all the logs were successfully sent, we'll retry and log everything
-		// again.
-		//
-		// Note: This will result in server side duplications but that's fine since
-		// we can always deduplicate in the data pipeline itself.
-		if len(l.logs) > int(resp.NumProtoSuccess) {
-			log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
-			continue
-		}
-
-		// All logs were successfully sent. We can exit the function.
-		log.Debugf(ctx, "Successfully flushed telemetry events")
-		return
+func (l *logger) setExecutionContext(ec protos.ExecutionContext) {
+	for i := range l.logs {
+		l.logs[i].Entry.DatabricksCliLog.ExecutionContext = &ec
 	}
 }
-
-func (l *defaultLogger) Introspect() []DatabricksCliLog {
-	panic("not implemented")
-}
-
-func Log(ctx context.Context, event DatabricksCliLog) {
-	l := fromContext(ctx)
-	l.Log(event)
-}
-
-func Flush(ctx context.Context, apiClient DatabricksApiClient) {
-	l := fromContext(ctx)
-	l.Flush(ctx, apiClient)
-}
-
-func Introspect(ctx context.Context) []DatabricksCliLog {
-	l := fromContext(ctx)
-	return l.Introspect()
-}
diff --git a/libs/telemetry/logger_test.go b/libs/telemetry/logger_test.go
deleted file mode 100644
index 0d5eb01ff..000000000
--- a/libs/telemetry/logger_test.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package telemetry
-
-import (
-	"context"
-	"math/rand"
-	"net/http"
-	"testing"
-
-	"github.com/databricks/cli/libs/telemetry/protos"
-	"github.com/google/uuid"
-	"github.com/stretchr/testify/assert"
-)
-
-type mockDatabricksClient struct {
-	numCalls int
-	t        *testing.T
-}
-
-func (m *mockDatabricksClient) Do(ctx context.Context, method, path string, headers map[string]string, request, response any, visitors ...func(*http.Request) error) error {
-	// Block until the fire channel is fired.
-	m.numCalls++
-
-	assertRequestPayload := func(reqb RequestBody) {
-		expectedProtoLogs := []string{
-			"{\"frontend_log_event_id\":\"0194fdc2-fa2f-4cc0-81d3-ff12045b73c8\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
-			"{\"frontend_log_event_id\":\"6e4ff95f-f662-45ee-a82a-bdf44a2d0b75\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}",
-			"{\"frontend_log_event_id\":\"fb180daf-48a7-4ee0-b10d-394651850fd4\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE2\"}}}}",
-			"{\"frontend_log_event_id\":\"a178892e-e285-4ce1-9114-55780875d64e\",\"entry\":{\"databricks_cli_log\":{\"cli_test_event\":{\"name\":\"VALUE3\"}}}}",
-		}
-
-		// Assert payload matches the expected payload.
-		assert.Equal(m.t, expectedProtoLogs, reqb.ProtoLogs)
-	}
-
-	switch m.numCalls {
-	case 1:
-		// The call is successful but not all events are successfully logged.
-		assertRequestPayload(request.(RequestBody))
-		*(response.(*ResponseBody)) = ResponseBody{
-			NumProtoSuccess: 3,
-		}
-	case 2:
-		// The call is successful and all events are successfully logged.
-		assertRequestPayload(request.(RequestBody))
-		*(response.(*ResponseBody)) = ResponseBody{
-			NumProtoSuccess: 4,
-		}
-	default:
-		panic("unexpected number of calls")
-	}
-
-	return nil
-}
-
-func TestTelemetryLoggerFlushesEvents(t *testing.T) {
-	mockClient := &mockDatabricksClient{
-		t: t,
-	}
-
-	// Set the random number generator to a fixed seed to ensure that the UUIDs are deterministic.
-	uuid.SetRand(rand.New(rand.NewSource(0)))
-	t.Cleanup(func() {
-		uuid.SetRand(nil)
-	})
-
-	ctx := WithDefaultLogger(context.Background())
-
-	for _, v := range []protos.DummyCliEnum{protos.DummyCliEnumValue1, protos.DummyCliEnumValue2, protos.DummyCliEnumValue2, protos.DummyCliEnumValue3} {
-		Log(ctx, DatabricksCliLog{
-			CliTestEvent: &protos.CliTestEvent{Name: v},
-		})
-	}
-
-	// Flush the events.
-	Flush(ctx, mockClient)
-
-	// Assert that the .Do method is called twice, because all logs were not
-	// successfully logged in the first call.
-	assert.Equal(t, 2, mockClient.numCalls)
-}
-
-func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
-	// Set the maximum additional wait time to 0 to ensure that the Flush method times out immediately.
-	oldV := MaxAdditionalWaitTime
-	MaxAdditionalWaitTime = 0
-	t.Cleanup(func() {
-		MaxAdditionalWaitTime = oldV
-	})
-
-	mockClient := &mockDatabricksClient{
-		t: t,
-	}
-
-	// Set the random number generator to a fixed seed to ensure that the UUIDs are deterministic.
-	uuid.SetRand(rand.New(rand.NewSource(0)))
-	t.Cleanup(func() {
-		uuid.SetRand(nil)
-	})
-
-	ctx := WithDefaultLogger(context.Background())
-
-	for _, v := range []protos.DummyCliEnum{protos.DummyCliEnumValue1, protos.DummyCliEnumValue2, protos.DummyCliEnumValue2, protos.DummyCliEnumValue3} {
-		Log(ctx, DatabricksCliLog{
-			CliTestEvent: &protos.CliTestEvent{Name: v},
-		})
-	}
-
-	// Flush the events.
-	Flush(ctx, mockClient)
-
-	// Assert that the .Do method is never called since the timeout is set to 0.
-	assert.Equal(t, 0, mockClient.numCalls)
-}
diff --git a/libs/telemetry/mock_logger.go b/libs/telemetry/mock_logger.go
deleted file mode 100644
index be33ee658..000000000
--- a/libs/telemetry/mock_logger.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package telemetry
-
-import "context"
-
-type mockLogger struct {
-	events []DatabricksCliLog
-}
-
-func (l *mockLogger) Log(event DatabricksCliLog) {
-	if l.events == nil {
-		l.events = make([]DatabricksCliLog, 0)
-	}
-	l.events = append(l.events, event)
-}
-
-func (l *mockLogger) Flush(ctx context.Context, apiClient DatabricksApiClient) {
-	// Do nothing
-}
-
-func (l *mockLogger) Introspect() []DatabricksCliLog {
-	return l.events
-}
diff --git a/libs/telemetry/protos/README.md b/libs/telemetry/protos/README.md
index a188a0868..7dcc75e17 100644
--- a/libs/telemetry/protos/README.md
+++ b/libs/telemetry/protos/README.md
@@ -1,3 +1,2 @@
 The types in this package are equivalent to the lumberjack protos defined in Universe.
-You can find all lumberjack protos for the Databricks CLI in the `proto/logs/frontend/databricks_cli`
-directory.
+You can find all lumberjack protos for the Databricks CLI in the `proto/logs/frontend/databricks_cli` directory.
diff --git a/libs/telemetry/protos/bundle_deploy.go b/libs/telemetry/protos/bundle_deploy.go
new file mode 100644
index 000000000..d096cd1bb
--- /dev/null
+++ b/libs/telemetry/protos/bundle_deploy.go
@@ -0,0 +1,77 @@
+package protos
+
+type BundleDeployEvent struct {
+	// UUID associated with the bundle itself. Set in the `bundle.uuid` field in the bundle configuration.
+	BundleUuid string `json:"bundle_uuid,omitempty"`
+
+	ResourceCount                     int64 `json:"resource_count,omitempty"`
+	ResourceJobCount                  int64 `json:"resource_job_count,omitempty"`
+	ResourcePipelineCount             int64 `json:"resource_pipeline_count,omitempty"`
+	ResourceModelCount                int64 `json:"resource_model_count,omitempty"`
+	ResourceExperimentCount           int64 `json:"resource_experiment_count,omitempty"`
+	ResourceModelServingEndpointCount int64 `json:"resource_model_serving_endpoint_count,omitempty"`
+	ResourceRegisteredModelCount      int64 `json:"resource_registered_model_count,omitempty"`
+	ResourceQualityMonitorCount       int64 `json:"resource_quality_monitor_count,omitempty"`
+	ResourceSchemaCount               int64 `json:"resource_schema_count,omitempty"`
+	ResourceVolumeCount               int64 `json:"resource_volume_count,omitempty"`
+	ResourceClusterCount              int64 `json:"resource_cluster_count,omitempty"`
+	ResourceDashboardCount            int64 `json:"resource_dashboard_count,omitempty"`
+	ResourceAppCount                  int64 `json:"resource_app_count,omitempty"`
+
+	// IDs of resources managed by the bundle. Some resources like volumes or schemas
+	// do not expose a numerical or UUID identifier and are tracked by name. Those
+	// resources are not tracked here since the names are PII.
+	ResourceJobIds       []string `json:"resource_job_ids,omitempty"`
+	ResourcePipelineIds  []string `json:"resource_pipeline_ids,omitempty"`
+	ResourceClusterIds   []string `json:"resource_cluster_ids,omitempty"`
+	ResourceDashboardIds []string `json:"resource_dashboard_ids,omitempty"`
+
+	Experimental *BundleDeployExperimental `json:"experimental,omitempty"`
+}
+
+// These metrics are experimental and are often added in an ad hoc manner. There
+// are no guarantees for these metrics and they may be removed in the future without
+// any notice.
+type BundleDeployExperimental struct {
+	// Number of YAML (or JSON) configuration files in the bundle.
+	ConfigurationFileCount int64 `json:"configuration_file_count,omitempty"`
+
+	// Size in bytes of the Terraform state file.
+	TerraformStateSizeBytes int64 `json:"terraform_state_size_bytes,omitempty"`
+
+	// Number of variables in the bundle.
+	VariableCount        int64 `json:"variable_count,omitempty"`
+	ComplexVariableCount int64 `json:"complex_variable_count,omitempty"`
+	LookupVariableCount  int64 `json:"lookup_variable_count,omitempty"`
+
+	// Number of targets in the bundle.
+	TargetCount int64 `json:"target_count,omitempty"`
+
+	// Whether a field is set or not. If a configuration field is not present in this
+	// map then it is not tracked by this field.
+	// Keys are the full path of the field in the configuration tree.
+	// Examples: "bundle.terraform.exec_path", "bundle.git.branch" etc.
+	SetFields []BoolMapEntry `json:"set_fields,omitempty"`
+
+	// Values for boolean configuration fields like `experimental.python_wheel_wrapper`.
+	// We don't need to define protos to track boolean values and can simply write those
+	// values to this map to track them.
+	BoolValues []BoolMapEntry `json:"bool_values,omitempty"`
+
+	BundleMode BundleMode `json:"bundle_mode,omitempty"`
+
+	WorkspaceArtifactPathType BundleDeployArtifactPathType `json:"workspace_artifact_path_type,omitempty"`
+
+	// Execution time per mutator for a selected subset of mutators.
+	BundleMutatorExecutionTimeMs []IntMapEntry `json:"bundle_mutator_execution_time_ms,omitempty"`
+}
+
+type BoolMapEntry struct {
+	Key   string `json:"key,omitempty"`
+	Value bool   `json:"value,omitempty"`
+}
+
+type IntMapEntry struct {
+	Key   string `json:"key,omitempty"`
+	Value int64  `json:"value,omitempty"`
+}
diff --git a/libs/telemetry/protos/bundle_init.go b/libs/telemetry/protos/bundle_init.go
index b02008314..47308a267 100644
--- a/libs/telemetry/protos/bundle_init.go
+++ b/libs/telemetry/protos/bundle_init.go
@@ -1,12 +1,10 @@
 package protos
 
-// Corresponds to the `DatabricksCliBundleInitEvent` proto message in `databricks_cli_log.proto`
-// as of 20 Dec 2024.
 type BundleInitEvent struct {
 	// UUID associated with the DAB itself. This is serialized into the DAB
 	// when a user runs `databricks bundle init` and all subsequent deployments of
 	// that DAB can then be associated with this init event.
-	Uuid string `json:"bundle_uuid,omitempty"`
+	BundleUuid string `json:"bundle_uuid,omitempty"`
 
 	// Name of the template initialized when the user ran `databricks bundle init`
 	// This is only populated when the template is a first party template like
diff --git a/libs/telemetry/protos/databricks_cli_log.go b/libs/telemetry/protos/databricks_cli_log.go
new file mode 100644
index 000000000..7569a7e85
--- /dev/null
+++ b/libs/telemetry/protos/databricks_cli_log.go
@@ -0,0 +1,35 @@
+package protos
+
+type ExecutionContext struct {
+	// UUID generated by the CLI for every CLI command run. This is also set in the HTTP user
+	// agent under the key "cmd-exec-id" and can be used to correlate the frontend_log table
+	// with the http_access_log table.
+	CmdExecId string `json:"cmd_exec_id,omitempty"`
+
+	// Version of the Databricks CLI used.
+	Version string `json:"version,omitempty"`
+
+	// Command that was run by the user. Eg: bundle_deploy, fs_cp etc.
+	Command string `json:"command,omitempty"`
+
+	// Lowercase string name for the operating system. Same value
+	// as the one set in `runtime.GOOS` in Golang.
+	OperatingSystem string `json:"operating_system,omitempty"`
+
+	// Version of DBR from which the CLI is being run.
+	// Only set when the CLI is being run from a Databricks cluster.
+	DbrVersion string `json:"dbr_version,omitempty"`
+
+	// If true, the CLI is being run from a Databricks notebook / cluster web terminal.
+	FromWebTerminal bool `json:"from_web_terminal,omitempty"`
+
+	// Time taken for the CLI command to execute.
+	ExecutionTimeMs int64 `json:"execution_time_ms,omitempty"`
+
+	// Exit code of the CLI command.
+	ExitCode int64 `json:"exit_code,omitempty"`
+}
+
+type CliTestEvent struct {
+	Name DummyCliEnum `json:"name,omitempty"`
+}
diff --git a/libs/telemetry/protos/enum.go b/libs/telemetry/protos/enum.go
new file mode 100644
index 000000000..7f6780cb6
--- /dev/null
+++ b/libs/telemetry/protos/enum.go
@@ -0,0 +1,26 @@
+package protos
+
+type DummyCliEnum string
+
+const (
+	DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
+	DummyCliEnumValue1      DummyCliEnum = "VALUE1"
+	DummyCliEnumValue2      DummyCliEnum = "VALUE2"
+	DummyCliEnumValue3      DummyCliEnum = "VALUE3"
+)
+
+type BundleMode string
+
+const (
+	BundleModeUnspecified BundleMode = "TYPE_UNSPECIFIED"
+	BundleModeDevelopment BundleMode = "DEVELOPMENT"
+	BundleModeProduction  BundleMode = "PRODUCTION"
+)
+
+type BundleDeployArtifactPathType string
+
+const (
+	BundleDeployArtifactPathTypeUnspecified BundleDeployArtifactPathType = "TYPE_UNSPECIFIED"
+	BundleDeployArtifactPathTypeWorkspace   BundleDeployArtifactPathType = "WORKSPACE_FILE_SYSTEM"
+	BundleDeployArtifactPathTypeVolume      BundleDeployArtifactPathType = "UC_VOLUME"
+)
diff --git a/libs/telemetry/frontend_log.go b/libs/telemetry/protos/frontend_log.go
similarity index 60%
rename from libs/telemetry/frontend_log.go
rename to libs/telemetry/protos/frontend_log.go
index 168d61c98..d20d43f6b 100644
--- a/libs/telemetry/frontend_log.go
+++ b/libs/telemetry/protos/frontend_log.go
@@ -1,10 +1,7 @@
-package telemetry
-
-import "github.com/databricks/cli/libs/telemetry/protos"
+package protos
 
 // This corresponds to the FrontendLog lumberjack proto in universe.
-// FrontendLog is the top-level struct for any client-side logs at Databricks
-// regardless of whether they are generated from the CLI or the web UI.
+// FrontendLog is the top-level struct for any client-side logs at Databricks.
 type FrontendLog struct {
 	// A unique identifier for the log event generated from the CLI.
 	FrontendLogEventID string `json:"frontend_log_event_id,omitempty"`
@@ -17,6 +14,9 @@ type FrontendLogEntry struct {
 }
 
 type DatabricksCliLog struct {
-	CliTestEvent    *protos.CliTestEvent    `json:"cli_test_event,omitempty"`
-	BundleInitEvent *protos.BundleInitEvent `json:"bundle_init_event,omitempty"`
+	ExecutionContext *ExecutionContext `json:"execution_context,omitempty"`
+
+	CliTestEvent      *CliTestEvent      `json:"cli_test_event,omitempty"`
+	BundleInitEvent   *BundleInitEvent   `json:"bundle_init_event,omitempty"`
+	BundleDeployEvent *BundleDeployEvent `json:"bundle_deploy_event,omitempty"`
 }
diff --git a/libs/telemetry/protos/test_event.go b/libs/telemetry/protos/test_event.go
deleted file mode 100644
index c4f650cda..000000000
--- a/libs/telemetry/protos/test_event.go
+++ /dev/null
@@ -1,16 +0,0 @@
-package protos
-
-// dummy event for testing the telemetry pipeline. Corresponds to `DatabricksCliTestEvent`
-// proto in `databricks_cli_log.proto` as of 20 Dec 2024.
-type CliTestEvent struct {
-	Name DummyCliEnum `json:"name,omitempty"`
-}
-
-type DummyCliEnum string
-
-const (
-	DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
-	DummyCliEnumValue1      DummyCliEnum = "VALUE1"
-	DummyCliEnumValue2      DummyCliEnum = "VALUE2"
-	DummyCliEnumValue3      DummyCliEnum = "VALUE3"
-)
diff --git a/test.txt b/test.txt
new file mode 100644
index 000000000..36ebcc3f1
--- /dev/null
+++ b/test.txt
@@ -0,0 +1 @@
+databricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricksdatabricks
\ No newline at end of file