From d00dc1f04d3cc61fc12cfbbe94ca2e5871a5bc3b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 30 Dec 2024 12:29:18 +0530 Subject: [PATCH] marshall in flush --- integration/bundle/init_test.go | 6 +-- integration/libs/telemetry/telemetry_test.go | 3 +- libs/telemetry/context.go | 2 +- libs/telemetry/logger.go | 48 ++++++++------------ libs/telemetry/logger_test.go | 7 +-- libs/template/materialize.go | 3 +- 6 files changed, 29 insertions(+), 40 deletions(-) diff --git a/integration/bundle/init_test.go b/integration/bundle/init_test.go index 537d8bf87..9178de667 100644 --- a/integration/bundle/init_test.go +++ b/integration/bundle/init_test.go @@ -69,7 +69,7 @@ func TestBundleInitOnMlopsStacks(t *testing.T) { testcli.RequireSuccessfulRun(t, ctx, "bundle", "init", "mlops-stacks", "--output-dir", tmpDir2, "--config-file", filepath.Join(tmpDir1, "config.json")) // Assert the telemetry payload is correctly logged. - logs, err := telemetry.GetLogs(ctx) + logs := telemetry.GetLogs(ctx) require.NoError(t, err) require.Equal(t, 1, len(logs)) event := logs[0].Entry.DatabricksCliLog.BundleInitEvent @@ -186,7 +186,7 @@ func TestBundleInitTelemetryForDefaultTemplates(t *testing.T) { assert.DirExists(t, filepath.Join(tmpDir2, tc.args["project_name"])) // Assert the telemetry payload is correctly logged. - logs, err := telemetry.GetLogs(ctx) + logs := telemetry.GetLogs(ctx) require.NoError(t, err) require.Equal(t, 1, len(logs)) event := logs[0].Entry.DatabricksCliLog.BundleInitEvent @@ -241,7 +241,7 @@ func TestBundleInitTelemetryForCustomTemplates(t *testing.T) { // Assert the telemetry payload is correctly logged. For custom templates we should // never set template_enum_args. 
- logs, err := telemetry.GetLogs(ctx) + logs := telemetry.GetLogs(ctx) require.NoError(t, err) require.Equal(t, 1, len(logs)) event := logs[0].Entry.DatabricksCliLog.BundleInitEvent diff --git a/integration/libs/telemetry/telemetry_test.go b/integration/libs/telemetry/telemetry_test.go index c08727042..80918ebba 100644 --- a/integration/libs/telemetry/telemetry_test.go +++ b/integration/libs/telemetry/telemetry_test.go @@ -64,8 +64,7 @@ func TestTelemetryLogger(t *testing.T) { }) for _, event := range events { - err := telemetry.Log(ctx, event) - require.NoError(t, err) + telemetry.Log(ctx, event) } apiClient, err := client.New(w.W.Config) diff --git a/libs/telemetry/context.go b/libs/telemetry/context.go index 9ea913f5a..42c7ef870 100644 --- a/libs/telemetry/context.go +++ b/libs/telemetry/context.go @@ -18,7 +18,7 @@ func ContextWithLogger(ctx context.Context) context.Context { return ctx } - return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}}) + return context.WithValue(ctx, telemetryLoggerKey, &logger{logs: []FrontendLog{}}) } func fromContext(ctx context.Context) *logger { diff --git a/libs/telemetry/logger.go b/libs/telemetry/logger.go index 450d6e3d6..ecf11d0ef 100644 --- a/libs/telemetry/logger.go +++ b/libs/telemetry/logger.go @@ -3,7 +3,6 @@ package telemetry import ( "context" "encoding/json" - "fmt" "net/http" "time" @@ -18,46 +17,28 @@ type DatabricksApiClient interface { visitors ...func(*http.Request) error) error } -func Log(ctx context.Context, event DatabricksCliLog) error { +func Log(ctx context.Context, event DatabricksCliLog) { l := fromContext(ctx) - FrontendLog := FrontendLog{ + l.logs = append(l.logs, FrontendLog{ // The telemetry endpoint deduplicates logs based on the FrontendLogEventID. // This it's important to generate a unique ID for each log event. 
FrontendLogEventID: uuid.New().String(), Entry: FrontendLogEntry{ DatabricksCliLog: event, }, - } - - protoLog, err := json.Marshal(FrontendLog) - if err != nil { - return fmt.Errorf("error marshalling the telemetry event: %v", err) - } - - l.protoLogs = append(l.protoLogs, string(protoLog)) - return nil + }) } type logger struct { - protoLogs []string + logs []FrontendLog } // This function is meant to be only to be used in tests to introspect the telemetry logs // that have been logged so far. -func GetLogs(ctx context.Context) ([]FrontendLog, error) { +func GetLogs(ctx context.Context) []FrontendLog { l := fromContext(ctx) - res := []FrontendLog{} - - for _, log := range l.protoLogs { - frontendLog := FrontendLog{} - err := json.Unmarshal([]byte(log), &frontendLog) - if err != nil { - return nil, fmt.Errorf("error unmarshalling the telemetry event: %v", err) - } - res = append(res, frontendLog) - } - return res, nil + return l.logs } // Maximum additional time to wait for the telemetry event to flush. We expect the flush @@ -75,11 +56,22 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) { defer cancel() l := fromContext(ctx) - if len(l.protoLogs) == 0 { + if len(l.logs) == 0 { log.Debugf(ctx, "No telemetry events to flush") return } + var protoLogs []string + for _, event := range l.logs { + s, err := json.Marshal(event) + if err != nil { + log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err) + continue + } + + protoLogs = append(protoLogs, string(s)) + } + resp := &ResponseBody{} for { select { @@ -93,7 +85,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) { // Log the CLI telemetry events. err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{ UploadTime: time.Now().Unix(), - ProtoLogs: l.protoLogs, + ProtoLogs: protoLogs, // A bug in the telemetry API requires us to send an empty items array. // Otherwise we get an opaque 500 internal server error. 
@@ -110,7 +102,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) { // // Note: This will result in server side duplications but that's fine since // we can always deduplicate in the data pipeline itself. - if len(l.protoLogs) > int(resp.NumProtoSuccess) { + if len(protoLogs) > int(resp.NumProtoSuccess) { log.Debugf(ctx, "Not all logs were successfully sent. Retrying...") continue } diff --git a/libs/telemetry/logger_test.go b/libs/telemetry/logger_test.go index 9e4116965..05be23aeb 100644 --- a/libs/telemetry/logger_test.go +++ b/libs/telemetry/logger_test.go @@ -10,7 +10,6 @@ import ( "github.com/databricks/cli/libs/telemetry/events" "github.com/google/uuid" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type mockDatabricksClient struct { @@ -68,10 +67,9 @@ func TestTelemetryLoggerFlushesEvents(t *testing.T) { ctx := ContextWithLogger(context.Background()) for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} { - err := Log(ctx, DatabricksCliLog{ + Log(ctx, DatabricksCliLog{ CliTestEvent: &events.CliTestEvent{Name: v}, }) - require.NoError(t, err) } // Flush the events. @@ -102,10 +100,9 @@ func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) { ctx := ContextWithLogger(context.Background()) for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} { - err := Log(ctx, DatabricksCliLog{ + Log(ctx, DatabricksCliLog{ CliTestEvent: &events.CliTestEvent{Name: v}, }) - require.NoError(t, err) } // Flush the events.
diff --git a/libs/template/materialize.go b/libs/template/materialize.go index 18f08988f..32478c859 100644 --- a/libs/template/materialize.go +++ b/libs/template/materialize.go @@ -124,7 +124,8 @@ func (t *Template) logTelemetry(ctx context.Context) error { }, } - return telemetry.Log(ctx, event) + telemetry.Log(ctx, event) + return nil } // This function materializes the input templates as a project, using user defined