From 9fd42279d96ed9e738a92f3c9467a733a3fc24ee Mon Sep 17 00:00:00 2001
From: Shreyas Goenka
Date: Fri, 29 Nov 2024 01:57:48 +0100
Subject: [PATCH] Add logger for CLI telemetry

---
 internal/telemetry_test.go     |  72 +++++++++++++++++++++
 libs/telemetry/api.go          |  22 +++++++
 libs/telemetry/context.go      |  28 ++++++++
 libs/telemetry/frontend_log.go |  33 +++++++++
 libs/telemetry/logger.go       | 112 ++++++++++++++++++++++++++++++++
 libs/telemetry/logger_test.go  | 101 +++++++++++++++++++++++++++++
 6 files changed, 368 insertions(+)
 create mode 100644 internal/telemetry_test.go
 create mode 100644 libs/telemetry/api.go
 create mode 100644 libs/telemetry/context.go
 create mode 100644 libs/telemetry/frontend_log.go
 create mode 100644 libs/telemetry/logger.go
 create mode 100644 libs/telemetry/logger_test.go

diff --git a/internal/telemetry_test.go b/internal/telemetry_test.go
new file mode 100644
index 000000000..35bb6c0d5
--- /dev/null
+++ b/internal/telemetry_test.go
@@ -0,0 +1,72 @@
+package internal
+
+import (
+	"context"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/databricks/cli/internal/acc"
+	"github.com/databricks/cli/libs/telemetry"
+	"github.com/databricks/databricks-sdk-go/client"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// Wrapper to capture the response from the API client since that's not directly
+// accessible from the logger.
+type apiClientWrapper struct {
+	response  *telemetry.ResponseBody
+	apiClient *client.DatabricksClient
+}
+
+func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
+	headers map[string]string, request, response any,
+	visitors ...func(*http.Request) error) error {
+
+	err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...)
+	wrapper.response = response.(*telemetry.ResponseBody)
+	return err
+}
+
+func TestAccTelemetryLogger(t *testing.T) {
+	ctx, w := acc.WorkspaceTest(t)
+	ctx = telemetry.NewContext(ctx)
+
+	// Extend the maximum wait time for the telemetry flush just for this test.
+	telemetry.MaxAdditionalWaitTime = 1 * time.Hour
+	t.Cleanup(func() {
+		telemetry.MaxAdditionalWaitTime = 2 * time.Second
+	})
+
+	// Log some events.
+	telemetry.Log(ctx, telemetry.FrontendLogEntry{
+		DatabricksCliLog: telemetry.DatabricksCliLog{
+			CliTestEvent: telemetry.CliTestEvent{
+				Name: telemetry.DummyCliEnumValue1,
+			},
+		},
+	})
+	telemetry.Log(ctx, telemetry.FrontendLogEntry{
+		DatabricksCliLog: telemetry.DatabricksCliLog{
+			CliTestEvent: telemetry.CliTestEvent{
+				Name: telemetry.DummyCliEnumValue2,
+			},
+		},
+	})
+
+	apiClient, err := client.New(w.W.Config)
+	require.NoError(t, err)
+
+	// Flush the events.
+	wrapper := &apiClientWrapper{
+		apiClient: apiClient,
+	}
+	telemetry.Flush(ctx, wrapper)
+
+	// Assert that the events were logged.
+	assert.Equal(t, telemetry.ResponseBody{
+		NumProtoSuccess: 2,
+		Errors:          []telemetry.LogError{},
+	}, *wrapper.response)
+}
diff --git a/libs/telemetry/api.go b/libs/telemetry/api.go
new file mode 100644
index 000000000..3a2d1df12
--- /dev/null
+++ b/libs/telemetry/api.go
@@ -0,0 +1,22 @@
+package telemetry
+
+// RequestBody is the type binding for the request body of the /telemetry-ext API endpoint.
+type RequestBody struct {
+	UploadTime int64    `json:"uploadTime"`
+	Items      []string `json:"items"`
+	ProtoLogs  []string `json:"protoLogs"`
+}
+
+// ResponseBody is the type binding for the response body of the /telemetry-ext API endpoint.
+type ResponseBody struct {
+	Errors          []LogError `json:"errors"`
+	NumProtoSuccess int64      `json:"numProtoSuccess"`
+}
+
+type LogError struct {
+	Message string `json:"message"`
+
+	// TODO: Confirm with Ankit that this signature is accurate. When does this typically
+	// trigger?
+	ErrorType string `json:"ErrorType"`
+}
diff --git a/libs/telemetry/context.go b/libs/telemetry/context.go
new file mode 100644
index 000000000..5625825d8
--- /dev/null
+++ b/libs/telemetry/context.go
@@ -0,0 +1,28 @@
+package telemetry
+
+import (
+	"context"
+)
+
+// Private type to store the telemetry logger in the context.
+type telemetryLogger int
+
+// Key to store the telemetry logger in the context.
+var telemetryLoggerKey telemetryLogger
+
+func NewContext(ctx context.Context) context.Context {
+	_, ok := ctx.Value(telemetryLoggerKey).(*logger)
+	if ok {
+		panic("telemetry logger already exists in the context")
+	}
+
+	return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}})
+}
+
+func fromContext(ctx context.Context) *logger {
+	l, ok := ctx.Value(telemetryLoggerKey).(*logger)
+	if !ok {
+		panic("telemetry logger not found in the context")
+	}
+	return l
+}
diff --git a/libs/telemetry/frontend_log.go b/libs/telemetry/frontend_log.go
new file mode 100644
index 000000000..915aa71ab
--- /dev/null
+++ b/libs/telemetry/frontend_log.go
@@ -0,0 +1,33 @@
+package telemetry
+
+// FrontendLog is the top-level struct for any client-side logs at Databricks,
+// regardless of whether they are generated from the CLI or the web UI.
+// It corresponds to the FrontendLog lumberjack proto in universe.
+type FrontendLog struct {
+	// A unique identifier for the log event generated from the CLI.
+	FrontendLogEventID string `json:"frontend_log_event_id,omitempty"`
+
+	Entry FrontendLogEntry `json:"entry,omitempty"`
+}
+
+type FrontendLogEntry struct {
+	DatabricksCliLog DatabricksCliLog `json:"databricks_cli_log,omitempty"`
+}
+
+type DatabricksCliLog struct {
+	CliTestEvent CliTestEvent `json:"cli_test_event,omitempty"`
+}
+
+// CliTestEvent is a dummy event for testing the telemetry pipeline.
+type CliTestEvent struct {
+	Name DummyCliEnum `json:"name,omitempty"`
+}
+
+type DummyCliEnum string
+
+const (
+	DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
+	DummyCliEnumValue1      DummyCliEnum = "VALUE1"
+	DummyCliEnumValue2      DummyCliEnum = "VALUE2"
+	DummyCliEnumValue3      DummyCliEnum = "VALUE3"
+)
diff --git a/libs/telemetry/logger.go b/libs/telemetry/logger.go
new file mode 100644
index 000000000..624ca243c
--- /dev/null
+++ b/libs/telemetry/logger.go
@@ -0,0 +1,112 @@
+package telemetry
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/databricks/cli/cmd/root"
+	"github.com/databricks/cli/libs/log"
+	"github.com/databricks/databricks-sdk-go/client"
+	"github.com/google/uuid"
+)
+
+// Interface abstraction created to mock out the Databricks client for testing.
+type DatabricksApiClient interface {
+	Do(ctx context.Context, method, path string,
+		headers map[string]string, request, response any,
+		visitors ...func(*http.Request) error) error
+}
+
+func Log(ctx context.Context, event FrontendLogEntry) error {
+	l := fromContext(ctx)
+
+	flog := FrontendLog{
+		// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
+		// Thus it's important to generate a unique ID for each log event.
+		FrontendLogEventID: uuid.New().String(),
+		Entry:              event,
+	}
+
+	protoLog, err := json.Marshal(flog)
+	if err != nil {
+		return fmt.Errorf("error marshalling the telemetry event: %v", err)
+	}
+
+	l.protoLogs = append(l.protoLogs, string(protoLog))
+	return nil
+}
+
+type logger struct {
+	protoLogs []string
+}
+
+// Maximum additional time to wait for the telemetry events to flush. We expect the
+// flush method to be called when the CLI command is about to exit, so this caps the
+// maximum additional time the user will experience because of us logging CLI telemetry.
+var MaxAdditionalWaitTime = 2 * time.Second
+
+// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
+// just before the CLI command exits. The API endpoint can easily handle payloads
+// with ~1000 events, so we log all the events at once instead of batching the
+// API calls.
+func Flush(ctx context.Context, apiClient DatabricksApiClient) {
+	// Set a maximum time to wait for the telemetry events to flush.
+	ctx, cancel := context.WithTimeout(ctx, MaxAdditionalWaitTime)
+	defer cancel()
+	l := fromContext(ctx)
+
+	// We pass the API client as an arg to mock it in unit tests.
+	if apiClient == nil {
+		var err error
+
+		// Create an API client to make the telemetry API call.
+		apiClient, err = client.New(root.WorkspaceClient(ctx).Config)
+		if err != nil {
+			log.Debugf(ctx, "error creating API client for telemetry: %v", err)
+			return
+		}
+	}
+
+	resp := &ResponseBody{}
+	for {
+		select {
+		case <-ctx.Done():
+			log.Debugf(ctx, "Timed out before flushing telemetry events")
+			return
+		default:
+			// Proceed.
+		}
+
+		// Log the CLI telemetry events.
+		err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
+			UploadTime: time.Now().Unix(),
+			ProtoLogs:  l.protoLogs,
+
+			// A bug in the telemetry API requires us to send an empty items array.
+			// Otherwise we get an opaque 500 internal server error.
+			Items: []string{},
+		}, resp)
+		if err != nil {
+			// The SDK automatically performs retries for 429s and 503s. Thus if we
+			// see an error here, do not retry logging the telemetry.
+			log.Debugf(ctx, "Error making the API request to /telemetry-ext: %v", err)
+			return
+		}
+		// If not all the logs were successfully sent, we'll retry and log everything
+		// again.
+		//
+		// Note: This will result in server-side duplicates, but that's fine since
+		// we can always deduplicate in the data pipeline itself.
+		if resp != nil && len(l.protoLogs) > int(resp.NumProtoSuccess) {
+			log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
+			continue
+		}
+
+		// All logs were successfully sent. We can exit the function.
+		log.Debugf(ctx, "Successfully flushed telemetry events")
+		return
+	}
+}
diff --git a/libs/telemetry/logger_test.go b/libs/telemetry/logger_test.go
new file mode 100644
index 000000000..2648e1992
--- /dev/null
+++ b/libs/telemetry/logger_test.go
@@ -0,0 +1,101 @@
+package telemetry
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+type mockDatabricksClient struct {
+	numCalls int
+	t        *testing.T
+}
+
+func (m *mockDatabricksClient) Do(ctx context.Context, method, path string, headers map[string]string, request, response any, visitors ...func(*http.Request) error) error {
+	m.numCalls++
+
+	assertRequestPayload := func(reqb RequestBody) {
+		expectedNames := []string{"VALUE1", "VALUE2", "VALUE2", "VALUE3"}
+
+		// The frontend_log_event_id in each proto log is a random UUID, so
+		// only assert on the event payload embedded in each log.
+		require.Len(m.t, reqb.ProtoLogs, len(expectedNames))
+		for i, protoLog := range reqb.ProtoLogs {
+			expected := fmt.Sprintf(`"cli_test_event":{"name":%q}`, expectedNames[i])
+			assert.Contains(m.t, protoLog, expected)
+		}
+	}
+
+	switch m.numCalls {
+	case 1:
+		// The call is successful but not all events are successfully logged.
+		assertRequestPayload(request.(RequestBody))
+		*(response.(*ResponseBody)) = ResponseBody{
+			NumProtoSuccess: 3,
+		}
+	case 2:
+		// The call is successful and all events are successfully logged.
+		assertRequestPayload(request.(RequestBody))
+		*(response.(*ResponseBody)) = ResponseBody{
+			NumProtoSuccess: 4,
+		}
+	default:
+		panic("unexpected number of calls")
+	}
+
+	return nil
+}
+
+func TestTelemetryLoggerFlushesEvents(t *testing.T) {
+	mockClient := &mockDatabricksClient{
+		t: t,
+	}
+
+	ctx := NewContext(context.Background())
+
+	for _, v := range []DummyCliEnum{DummyCliEnumValue1, DummyCliEnumValue2, DummyCliEnumValue2, DummyCliEnumValue3} {
+		err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
+			CliTestEvent: CliTestEvent{Name: v},
+		}})
+		require.NoError(t, err)
+	}
+
+	// Flush the events.
+	Flush(ctx, mockClient)
+
+	// Assert that flushing the events took two API calls: the first returns
+	// a partial success, after which all events are logged again.
+	assert.Equal(t, 2, mockClient.numCalls)
+}
+
+func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
+	// Set the maximum additional wait time to 0 to ensure that the Flush method times out immediately.
+	MaxAdditionalWaitTime = 0
+	t.Cleanup(func() {
+		MaxAdditionalWaitTime = 2 * time.Second
+	})
+
+	mockClient := &mockDatabricksClient{
+		t: t,
+	}
+
+	ctx := NewContext(context.Background())
+
+	for _, v := range []DummyCliEnum{DummyCliEnumValue1, DummyCliEnumValue2, DummyCliEnumValue2, DummyCliEnumValue3} {
+		err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
+			CliTestEvent: CliTestEvent{Name: v},
+		}})
+		require.NoError(t, err)
+	}
+
+	// Flush the events.
+	Flush(ctx, mockClient)
+
+	// Assert that the .Do method is never called since the timeout is set to 0.
+	assert.Equal(t, 0, mockClient.numCalls)
+}
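
A minimal sketch of how a command might wire this logger up end to end. The
package, handler, and event below are illustrative only; NewContext, Log, and
Flush are the API added by this patch, and the sketch assumes the root command
has already placed a workspace client on the context, which Flush relies on
when called with a nil API client:

	package example

	import (
		"github.com/databricks/cli/libs/telemetry"
		"github.com/spf13/cobra"
	)

	// runE is a hypothetical cobra handler. The call order is the part that
	// matters: NewContext once per command, Log any number of times during
	// execution, and a single Flush just before the command exits.
	func runE(cmd *cobra.Command, args []string) error {
		ctx := telemetry.NewContext(cmd.Context())

		// Flush all buffered events on exit. A nil client makes Flush build
		// one from the workspace configuration on the context.
		defer telemetry.Flush(ctx, nil)

		// Record an event at any point during command execution.
		return telemetry.Log(ctx, telemetry.FrontendLogEntry{
			DatabricksCliLog: telemetry.DatabricksCliLog{
				CliTestEvent: telemetry.CliTestEvent{Name: telemetry.DummyCliEnumValue1},
			},
		})
	}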