mirror of https://github.com/databricks/cli.git
Add logger for CLI telemetry
This commit is contained in:
parent 8053e9c4e4
commit 9fd42279d9
@ -0,0 +1,72 @@
package internal

import (
	"context"
	"net/http"
	"testing"
	"time"

	"github.com/databricks/cli/internal/acc"
	"github.com/databricks/cli/libs/telemetry"
	"github.com/databricks/databricks-sdk-go/client"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// Wrapper to capture the response from the API client since that's not directly
// accessible from the logger.
type apiClientWrapper struct {
	response  *telemetry.ResponseBody
	apiClient *client.DatabricksClient
}

func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
	headers map[string]string, request, response any,
	visitors ...func(*http.Request) error) error {

	err := wrapper.apiClient.Do(ctx, method, path, headers, request, response, visitors...)
	wrapper.response = response.(*telemetry.ResponseBody)
	return err
}

func TestAccTelemetryLogger(t *testing.T) {
	ctx, w := acc.WorkspaceTest(t)
	ctx = telemetry.NewContext(ctx)

	// Extend the maximum wait time for the telemetry flush just for this test.
	telemetry.MaxAdditionalWaitTime = 1 * time.Hour
	t.Cleanup(func() {
		// Restore the default of 2 seconds. Assigning the bare literal 2 here
		// would set the wait time to 2 nanoseconds.
		telemetry.MaxAdditionalWaitTime = 2 * time.Second
	})

	// Log some events.
	telemetry.Log(ctx, telemetry.FrontendLogEntry{
		DatabricksCliLog: telemetry.DatabricksCliLog{
			CliTestEvent: telemetry.CliTestEvent{
				Name: telemetry.DummyCliEnumValue1,
			},
		},
	})
	telemetry.Log(ctx, telemetry.FrontendLogEntry{
		DatabricksCliLog: telemetry.DatabricksCliLog{
			CliTestEvent: telemetry.CliTestEvent{
				Name: telemetry.DummyCliEnumValue2,
			},
		},
	})

	apiClient, err := client.New(w.W.Config)
	require.NoError(t, err)

	// Flush the events.
	wrapper := &apiClientWrapper{
		apiClient: apiClient,
	}
	telemetry.Flush(ctx, wrapper)

	// Assert that the events were logged.
	assert.Equal(t, telemetry.ResponseBody{
		NumProtoSuccess: 2,
		Errors:          []telemetry.LogError{},
	}, *wrapper.response)
}
@ -0,0 +1,22 @@
package telemetry

// RequestBody is the request body type bindings for the /telemetry-ext API endpoint.
type RequestBody struct {
	UploadTime int64    `json:"uploadTime"`
	Items      []string `json:"items"`
	ProtoLogs  []string `json:"protoLogs"`
}

// ResponseBody is the response body type bindings for the /telemetry-ext API endpoint.
type ResponseBody struct {
	Errors          []LogError `json:"errors"`
	NumProtoSuccess int64      `json:"numProtoSuccess"`
}

type LogError struct {
	Message string `json:"message"`

	// TODO: Confirm with Ankit that this signature is accurate. When does this typically
	// trigger?
	ErrorType string `json:"ErrorType"`
}
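
For reference, here is a minimal sketch, not part of this commit, of what these bindings produce on the wire. It builds a RequestBody the same way Flush does in logger.go below; the sample protoLogs value and the timestamp in the output comment are made up.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/databricks/cli/libs/telemetry"
)

func main() {
	// Construct the request envelope for /telemetry-ext. The empty Items
	// slice is deliberate; see the workaround note in logger.go.
	body := telemetry.RequestBody{
		UploadTime: time.Now().Unix(),
		Items:      []string{},
		ProtoLogs:  []string{`{"frontend_log_event_id":"<uuid>"}`},
	}

	raw, err := json.Marshal(body)
	if err != nil {
		panic(err)
	}

	// Prints something like:
	// {"uploadTime":1700000000,"items":[],"protoLogs":["{\"frontend_log_event_id\":\"<uuid>\"}"]}
	fmt.Println(string(raw))
}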
@ -0,0 +1,28 @@
package telemetry

import (
	"context"
)

// Private type to store the telemetry logger in the context
type telemetryLogger int

// Key to store the telemetry logger in the context
var telemetryLoggerKey telemetryLogger

func NewContext(ctx context.Context) context.Context {
	_, ok := ctx.Value(telemetryLoggerKey).(*logger)
	if ok {
		panic("telemetry logger already exists in the context")
	}

	return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}})
}

func fromContext(ctx context.Context) *logger {
	l, ok := ctx.Value(telemetryLoggerKey).(*logger)
	if !ok {
		panic("telemetry logger not found in the context")
	}
	return l
}
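
A minimal usage sketch, not part of this commit: a command seeds the context exactly once near the top of its execution, and everything below logs through that context. Since fromContext is unexported, external callers interact with the logger only via Log and Flush.

package main

import (
	"context"

	"github.com/databricks/cli/libs/telemetry"
)

func main() {
	// Seed the context once. Calling NewContext again on the same context
	// chain panics, which surfaces accidental double initialization early.
	ctx := telemetry.NewContext(context.Background())

	// Code anywhere below the seeding point can record events.
	_ = telemetry.Log(ctx, telemetry.FrontendLogEntry{
		DatabricksCliLog: telemetry.DatabricksCliLog{
			CliTestEvent: telemetry.CliTestEvent{Name: telemetry.DummyCliEnumValue1},
		},
	})
}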
@ -0,0 +1,33 @@
package telemetry

// This corresponds to the FrontendLog lumberjack proto in universe.
// FrontendLog is the top-level struct for any client-side logs at Databricks
// regardless of whether they are generated from the CLI or the web UI.
type FrontendLog struct {
	// A unique identifier for the log event generated from the CLI.
	FrontendLogEventID string `json:"frontend_log_event_id,omitempty"`

	Entry FrontendLogEntry `json:"entry,omitempty"`
}

type FrontendLogEntry struct {
	DatabricksCliLog DatabricksCliLog `json:"databricks_cli_log,omitempty"`
}

type DatabricksCliLog struct {
	CliTestEvent CliTestEvent `json:"cli_test_event,omitempty"`
}

// CliTestEvent is a dummy event for testing the telemetry pipeline.
type CliTestEvent struct {
	Name DummyCliEnum `json:"name,omitempty"`
}

type DummyCliEnum string

const (
	DummyCliEnumUnspecified DummyCliEnum = "DUMMY_CLI_ENUM_UNSPECIFIED"
	DummyCliEnumValue1      DummyCliEnum = "VALUE1"
	DummyCliEnumValue2      DummyCliEnum = "VALUE2"
	DummyCliEnumValue3      DummyCliEnum = "VALUE3"
)
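
Marshalled, the nesting above yields one line of JSON per event; this is the string that Log appends to the logger's protoLogs. A sketch with a hypothetical, hard-coded event ID (real IDs are fresh UUIDs):

package main

import (
	"encoding/json"
	"fmt"

	"github.com/databricks/cli/libs/telemetry"
)

func main() {
	fl := telemetry.FrontendLog{
		// Hypothetical fixed ID; Log generates a fresh UUID per event.
		FrontendLogEventID: "00000000-0000-0000-0000-000000000000",
		Entry: telemetry.FrontendLogEntry{
			DatabricksCliLog: telemetry.DatabricksCliLog{
				CliTestEvent: telemetry.CliTestEvent{Name: telemetry.DummyCliEnumValue1},
			},
		},
	}

	raw, err := json.Marshal(fl)
	if err != nil {
		panic(err)
	}

	// Prints:
	// {"frontend_log_event_id":"00000000-0000-0000-0000-000000000000","entry":{"databricks_cli_log":{"cli_test_event":{"name":"VALUE1"}}}}
	fmt.Println(string(raw))
}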
@ -0,0 +1,111 @@
package telemetry

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"time"

	"github.com/databricks/cli/cmd/root"
	"github.com/databricks/cli/libs/log"
	"github.com/databricks/databricks-sdk-go/client"
	"github.com/google/uuid"
)

// Interface abstraction created to mock out the Databricks client for testing.
type DatabricksApiClient interface {
	Do(ctx context.Context, method, path string,
		headers map[string]string, request, response any,
		visitors ...func(*http.Request) error) error
}

func Log(ctx context.Context, event FrontendLogEntry) error {
	l := fromContext(ctx)

	frontendLog := FrontendLog{
		// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
		// Thus it's important to generate a unique ID for each log event.
		FrontendLogEventID: uuid.New().String(),

		Entry: event,
	}

	protoLog, err := json.Marshal(frontendLog)
	if err != nil {
		return fmt.Errorf("error marshalling the telemetry event: %v", err)
	}

	l.protoLogs = append(l.protoLogs, string(protoLog))
	return nil
}

type logger struct {
	protoLogs []string
}

// Maximum additional time to wait for the telemetry events to flush. We expect the
// flush method to be called when the CLI command is about to exit, so this caps the
// maximum additional time the user will experience because of us logging CLI telemetry.
var MaxAdditionalWaitTime = 2 * time.Second

// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
// just as the CLI command is about to exit. The API endpoint can handle payloads with
// ~1000 events easily, so we log all the events at once instead of batching the API calls.
func Flush(ctx context.Context, apiClient DatabricksApiClient) {
	// Set a maximum time to wait for the telemetry events to flush.
	ctx, cancel := context.WithTimeout(ctx, MaxAdditionalWaitTime)
	defer cancel()

	l := fromContext(ctx)

	// We pass the API client as an arg to mock it in unit tests.
	if apiClient == nil {
		var err error

		// Create an API client to make the telemetry API call.
		apiClient, err = client.New(root.WorkspaceClient(ctx).Config)
		if err != nil {
			log.Debugf(ctx, "error creating API client for telemetry: %v", err)
			return
		}
	}

	resp := &ResponseBody{}
	for {
		select {
		case <-ctx.Done():
			log.Debugf(ctx, "Timed out before flushing telemetry events")
			return
		default:
			// Proceed.
		}

		// Log the CLI telemetry events.
		err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
			UploadTime: time.Now().Unix(),
			ProtoLogs:  l.protoLogs,

			// A bug in the telemetry API requires us to send an empty items array.
			// Otherwise we get an opaque 500 internal server error.
			Items: []string{},
		}, resp)
		if err != nil {
			// The SDK automatically performs retries for 429s and 503s. Thus if we
			// see an error here, do not retry logging the telemetry.
			log.Debugf(ctx, "Error making the API request to /telemetry-ext: %v", err)
			return
		}

		// If not all the logs were successfully sent, we'll retry and log everything
		// again.
		//
		// Note: This will result in server-side duplicates, but that's fine since
		// we can always deduplicate in the data pipeline itself.
		if resp != nil && len(l.protoLogs) > int(resp.NumProtoSuccess) {
			log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
			continue
		}

		// All logs were successfully sent. We can exit the function.
		log.Debugf(ctx, "Successfully flushed telemetry events")
		return
	}
}
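
Putting Log and Flush together, a minimal end-to-end sketch of the intended call pattern. This is illustrative rather than how the CLI actually wires it up; run is a hypothetical stand-in for a command's real work, and Flush with a nil client assumes a configured workspace.

package main

import (
	"context"

	"github.com/databricks/cli/libs/telemetry"
)

// run is a hypothetical stand-in for a CLI command's real work.
func run(ctx context.Context) {
	_ = telemetry.Log(ctx, telemetry.FrontendLogEntry{
		DatabricksCliLog: telemetry.DatabricksCliLog{
			CliTestEvent: telemetry.CliTestEvent{Name: telemetry.DummyCliEnumValue2},
		},
	})
}

func main() {
	// Attach the in-memory logger once, at the top of command execution.
	ctx := telemetry.NewContext(context.Background())
	run(ctx)

	// Flush once, just before the process exits. Passing nil makes Flush
	// construct its own API client from the workspace configuration; the
	// unit tests inject a mock instead.
	telemetry.Flush(ctx, nil)
}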
@ -0,0 +1,101 @@
package telemetry

import (
	"context"
	"encoding/json"
	"net/http"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

type mockDatabricksClient struct {
	numCalls int

	t *testing.T
}

func (m *mockDatabricksClient) Do(ctx context.Context, method, path string, headers map[string]string, request, response any, visitors ...func(*http.Request) error) error {
	// Track the number of calls to the mock so the tests can assert on it.
	m.numCalls++

	assertRequestPayload := func(reqb RequestBody) {
		expectedEvents := []DummyCliEnum{DummyCliEnumValue1, DummyCliEnumValue2, DummyCliEnumValue2, DummyCliEnumValue3}

		// Log generates a fresh UUID for every event, so we unmarshal the proto
		// logs and compare the entries rather than comparing raw strings.
		require.Len(m.t, reqb.ProtoLogs, len(expectedEvents))
		for i, protoLog := range reqb.ProtoLogs {
			var fl FrontendLog
			require.NoError(m.t, json.Unmarshal([]byte(protoLog), &fl))
			assert.NotEmpty(m.t, fl.FrontendLogEventID)
			assert.Equal(m.t, expectedEvents[i], fl.Entry.DatabricksCliLog.CliTestEvent.Name)
		}
	}

	switch m.numCalls {
	case 1:
		// The call is successful but not all events are successfully logged.
		assertRequestPayload(request.(RequestBody))
		*(response.(*ResponseBody)) = ResponseBody{
			NumProtoSuccess: 3,
		}
	case 2:
		// The call is successful and all events are successfully logged.
		assertRequestPayload(request.(RequestBody))
		*(response.(*ResponseBody)) = ResponseBody{
			NumProtoSuccess: 4,
		}
	default:
		panic("unexpected number of calls")
	}

	return nil
}

func TestTelemetryLoggerFlushesEvents(t *testing.T) {
	mockClient := &mockDatabricksClient{
		t: t,
	}

	ctx := NewContext(context.Background())

	for _, v := range []DummyCliEnum{DummyCliEnumValue1, DummyCliEnumValue2, DummyCliEnumValue2, DummyCliEnumValue3} {
		err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
			CliTestEvent: CliTestEvent{Name: v},
		}})
		require.NoError(t, err)
	}

	// Flush the events.
	Flush(ctx, mockClient)

	// Assert that the mock client was called twice: once for the partial flush
	// (3 of 4 events accepted) and once for the retry that succeeds.
	assert.Equal(t, 2, mockClient.numCalls)
}

func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
	// Set the maximum additional wait time to 0 to ensure that the Flush method times out immediately.
	MaxAdditionalWaitTime = 0
	t.Cleanup(func() {
		// Restore the default of 2 seconds.
		MaxAdditionalWaitTime = 2 * time.Second
	})

	mockClient := &mockDatabricksClient{
		t: t,
	}

	ctx := NewContext(context.Background())

	for _, v := range []DummyCliEnum{DummyCliEnumValue1, DummyCliEnumValue2, DummyCliEnumValue2, DummyCliEnumValue3} {
		err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
			CliTestEvent: CliTestEvent{Name: v},
		}})
		require.NoError(t, err)
	}

	// Flush the events.
	Flush(ctx, mockClient)

	// Assert that the .Do method is never called since the timeout is set to 0.
	assert.Equal(t, 0, mockClient.numCalls)
}