From b3a479ce735b309648bcbaecd438e134033d227f Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Wed, 5 Mar 2025 15:21:12 +0100 Subject: [PATCH] add sync logger --- libs/telemetry/logger.go | 92 +++++++++++++++++++++ libs/telemetry/logger_test.go | 149 ++++++++++++++++++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 libs/telemetry/logger_test.go diff --git a/libs/telemetry/logger.go b/libs/telemetry/logger.go index 4fbce2f52..b93ab94dd 100644 --- a/libs/telemetry/logger.go +++ b/libs/telemetry/logger.go @@ -2,8 +2,17 @@ package telemetry import ( "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/telemetry/protos" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" + "github.com/databricks/databricks-sdk-go/config" "github.com/google/uuid" ) @@ -48,3 +57,86 @@ func (l *logger) setExecutionContext(ec protos.ExecutionContext) { l.logs[i].Entry.DatabricksCliLog.ExecutionContext = &ec } } + +// TODO: Test that the max timeout here is indeed 3 seconds. + +func Upload(ctx context.Context, cfg *config.Config) error { + l := fromContext(ctx) + if len(l.logs) == 0 { + return nil + } + + protoLogs := make([]string, len(l.logs)) + for i, log := range l.logs { + b, err := json.Marshal(log) + if err != nil { + return fmt.Errorf("failed to marshal log: %s", err) + } + protoLogs[i] = string(b) + } + + apiClient, err := client.New(cfg) + if err != nil { + return fmt.Errorf("failed to create API client: %w", err) + } + + ctx, _ = context.WithTimeout(ctx, 3*time.Second) + var resp *ResponseBody + + // Only try uploading logs for a maximum of 3 times. + for i := range 3 { + select { + case <-ctx.Done(): + return fmt.Errorf("uploading telemetry logs timed out: %w", ctx.Err()) + default: + // proceed + } + + resp, err = attempt(ctx, apiClient, protoLogs) + + // All logs were uploaded successfully. + if err == nil && resp.NumProtoSuccess >= int64(len(protoLogs)) { + return nil + } + + // Partial success. Retry. + if err == nil && resp.NumProtoSuccess < int64(len(protoLogs)) { + log.Debugf(ctx, "Attempt %d was a partial success. Number of logs uploaded: %d out of %d\n", i+1, resp.NumProtoSuccess, len(protoLogs)) + time.Sleep(200 * time.Millisecond) + continue + } + + // We retry for all 5xx responses. We explicitly omit 503 in the predicate here + // because it is already automatically retried in the SDK layer. + // ref: https://github.com/databricks/databricks-sdk-go/blob/cdb28002afacb8b762348534a4c4040a9f19c24b/apierr/errors.go#L91 + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode >= 500 && apiErr.StatusCode != 503 { + log.Debugf(ctx, "Attempt %d failed due to a server side error. Retrying status code: %d\n", i+1, apiErr.StatusCode) + time.Sleep(200 * time.Millisecond) + continue + } + } + + return errors.New("failed to upload telemetry logs after three attempts") +} + +func attempt(ctx context.Context, apiClient *client.DatabricksClient, protoLogs []string) (*ResponseBody, error) { + resp := &ResponseBody{} + err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{ + UploadTime: time.Now().UnixMilli(), + // There is a bug in the `/telemetry-ext` API which requires us to + // send an empty array for the `Items` field. Otherwise the API returns + // a 500. + Items: []string{}, + ProtoLogs: protoLogs, + }, resp) + if err != nil { + return nil, err + } + + if len(resp.Errors) > 0 { + return nil, fmt.Errorf("uploading telemetry failed: %v", resp.Errors) + } + + return resp, nil +} diff --git a/libs/telemetry/logger_test.go b/libs/telemetry/logger_test.go new file mode 100644 index 000000000..a38812f48 --- /dev/null +++ b/libs/telemetry/logger_test.go @@ -0,0 +1,149 @@ +package telemetry + +import ( + "context" + "testing" + + "github.com/databricks/cli/libs/telemetry/protos" + "github.com/databricks/cli/libs/testserver" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTelemetryUploadRetriesOnPartialSuccess(t *testing.T) { + server := testserver.New(t) + t.Cleanup(server.Close) + + count := 0 + server.Handle("POST", "/telemetry-ext", func(req testserver.Request) any { + count++ + if count == 1 { + return ResponseBody{ + NumProtoSuccess: 1, + } + } + if count == 2 { + return ResponseBody{ + NumProtoSuccess: 2, + } + } + return nil + }) + + ctx := WithNewLogger(context.Background()) + + Log(ctx, protos.DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue1, + }, + }) + Log(ctx, protos.DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue2, + }, + }) + + err := Upload(ctx, &config.Config{ + Host: server.URL, + Token: "token", + }) + require.NoError(t, err) + assert.Equal(t, 2, count) +} + +func uploadRetriesFor(t *testing.T, statusCode int) { + server := testserver.New(t) + t.Cleanup(server.Close) + + count := 0 + server.Handle("POST", "/telemetry-ext", func(req testserver.Request) any { + count++ + if count == 1 { + return testserver.Response{ + StatusCode: statusCode, + Body: apierr.APIError{ + StatusCode: statusCode, + Message: "Some error", + }, + } + } + if count == 2 { + return ResponseBody{ + NumProtoSuccess: 2, + } + } + return nil + }) + + t.Setenv("DATABRICKS_HOST", server.URL) + t.Setenv("DATABRICKS_TOKEN", "token") + + ctx := WithNewLogger(context.Background()) + + Log(ctx, protos.DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue1, + }, + }) + Log(ctx, protos.DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue2, + }, + }) + + err := Upload(ctx, &config.Config{ + Host: server.URL, + Token: "token", + }) + require.NoError(t, err) + assert.Equal(t, 2, count) +} + +func TestTelemetryUploadRetriesForStatusCodes(t *testing.T) { + // These retries happen in the CLI itself since the SDK does not automatically + // retry for 5xx errors. + uploadRetriesFor(t, 500) + uploadRetriesFor(t, 504) + + // These retries happen on the SDK layer. + // ref: https://github.com/databricks/databricks-sdk-go/blob/cdb28002afacb8b762348534a4c4040a9f19c24b/apierr/errors.go#L91 + uploadRetriesFor(t, 503) + uploadRetriesFor(t, 429) +} + +func TestTelemetryUploadMaxRetries(t *testing.T) { + server := testserver.New(t) + t.Cleanup(server.Close) + count := 0 + + server.Handle("POST", "/telemetry-ext", func(req testserver.Request) any { + count++ + return ResponseBody{ + NumProtoSuccess: 1, + } + }) + + t.Setenv("DATABRICKS_HOST", server.URL) + t.Setenv("DATABRICKS_TOKEN", "token") + ctx := WithNewLogger(context.Background()) + + Log(ctx, protos.DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue1, + }, + }) + Log(ctx, protos.DatabricksCliLog{ + CliTestEvent: &protos.CliTestEvent{ + Name: protos.DummyCliEnumValue2, + }, + }) + + err := Upload(ctx, &config.Config{ + Host: server.URL, + Token: "token", + }) + assert.EqualError(t, err, "failed to upload telemetry logs after three attempts") + assert.Equal(t, 3, count) +}