marshall in flush

This commit is contained in:
Shreyas Goenka 2024-12-30 12:29:18 +05:30
parent 4fc8c37376
commit d00dc1f04d
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
6 changed files with 29 additions and 40 deletions

View File

@ -69,7 +69,7 @@ func TestBundleInitOnMlopsStacks(t *testing.T) {
testcli.RequireSuccessfulRun(t, ctx, "bundle", "init", "mlops-stacks", "--output-dir", tmpDir2, "--config-file", filepath.Join(tmpDir1, "config.json"))
// Assert the telemetry payload is correctly logged.
logs, err := telemetry.GetLogs(ctx)
logs := telemetry.GetLogs(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(logs))
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
@ -186,7 +186,7 @@ func TestBundleInitTelemetryForDefaultTemplates(t *testing.T) {
assert.DirExists(t, filepath.Join(tmpDir2, tc.args["project_name"]))
// Assert the telemetry payload is correctly logged.
logs, err := telemetry.GetLogs(ctx)
logs := telemetry.GetLogs(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(logs))
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
@ -241,7 +241,7 @@ func TestBundleInitTelemetryForCustomTemplates(t *testing.T) {
// Assert the telemetry payload is correctly logged. For custom templates we should
// never set template_enum_args.
logs, err := telemetry.GetLogs(ctx)
logs := telemetry.GetLogs(ctx)
require.NoError(t, err)
require.Equal(t, 1, len(logs))
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent

View File

@ -64,8 +64,7 @@ func TestTelemetryLogger(t *testing.T) {
})
for _, event := range events {
err := telemetry.Log(ctx, event)
require.NoError(t, err)
telemetry.Log(ctx, event)
}
apiClient, err := client.New(w.W.Config)

View File

@ -18,7 +18,7 @@ func ContextWithLogger(ctx context.Context) context.Context {
return ctx
}
return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}})
return context.WithValue(ctx, telemetryLoggerKey, &logger{logs: []FrontendLog{}})
}
func fromContext(ctx context.Context) *logger {

View File

@ -3,7 +3,6 @@ package telemetry
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
@ -18,46 +17,28 @@ type DatabricksApiClient interface {
visitors ...func(*http.Request) error) error
}
func Log(ctx context.Context, event DatabricksCliLog) error {
func Log(ctx context.Context, event DatabricksCliLog) {
l := fromContext(ctx)
FrontendLog := FrontendLog{
l.logs = append(l.logs, FrontendLog{
// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
// Thus it's important to generate a unique ID for each log event.
FrontendLogEventID: uuid.New().String(),
Entry: FrontendLogEntry{
DatabricksCliLog: event,
},
}
protoLog, err := json.Marshal(FrontendLog)
if err != nil {
return fmt.Errorf("error marshalling the telemetry event: %v", err)
}
l.protoLogs = append(l.protoLogs, string(protoLog))
return nil
})
}
type logger struct {
protoLogs []string
logs []FrontendLog
}
// This function is meant to be used only in tests to introspect the telemetry logs
// that have been logged so far.
func GetLogs(ctx context.Context) ([]FrontendLog, error) {
func GetLogs(ctx context.Context) []FrontendLog {
l := fromContext(ctx)
res := []FrontendLog{}
for _, log := range l.protoLogs {
frontendLog := FrontendLog{}
err := json.Unmarshal([]byte(log), &frontendLog)
if err != nil {
return nil, fmt.Errorf("error unmarshalling the telemetry event: %v", err)
}
res = append(res, frontendLog)
}
return res, nil
return l.logs
}
// Maximum additional time to wait for the telemetry event to flush. We expect the flush
@ -75,11 +56,22 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
defer cancel()
l := fromContext(ctx)
if len(l.protoLogs) == 0 {
if len(l.logs) == 0 {
log.Debugf(ctx, "No telemetry events to flush")
return
}
var protoLogs []string
for _, event := range l.logs {
s, err := json.Marshal(event)
if err != nil {
log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err)
continue
}
protoLogs = append(protoLogs, string(s))
}
resp := &ResponseBody{}
for {
select {
@ -93,7 +85,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
// Log the CLI telemetry events.
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
UploadTime: time.Now().Unix(),
ProtoLogs: l.protoLogs,
ProtoLogs: protoLogs,
// A bug in the telemetry API requires us to send an empty items array.
// Otherwise we get an opaque 500 internal server error.
@ -110,7 +102,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
//
// Note: This will result in server side duplications but that's fine since
// we can always deduplicate in the data pipeline itself.
if len(l.protoLogs) > int(resp.NumProtoSuccess) {
if len(l.logs) > int(resp.NumProtoSuccess) {
log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
continue
}

View File

@ -10,7 +10,6 @@ import (
"github.com/databricks/cli/libs/telemetry/events"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type mockDatabricksClient struct {
@ -68,10 +67,9 @@ func TestTelemetryLoggerFlushesEvents(t *testing.T) {
ctx := ContextWithLogger(context.Background())
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
err := Log(ctx, DatabricksCliLog{
Log(ctx, DatabricksCliLog{
CliTestEvent: &events.CliTestEvent{Name: v},
})
require.NoError(t, err)
}
// Flush the events.
@ -102,10 +100,9 @@ func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
ctx := ContextWithLogger(context.Background())
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
err := Log(ctx, DatabricksCliLog{
Log(ctx, DatabricksCliLog{
CliTestEvent: &events.CliTestEvent{Name: v},
})
require.NoError(t, err)
}
// Flush the events.

View File

@ -124,7 +124,8 @@ func (t *Template) logTelemetry(ctx context.Context) error {
},
}
return telemetry.Log(ctx, event)
telemetry.Log(ctx, event)
return nil
}
// This function materializes the input templates as a project, using user defined