From 7f7165158bee965744a3748b8244866ab861b6c9 Mon Sep 17 00:00:00 2001
From: Shreyas Goenka
Date: Sun, 2 Mar 2025 15:25:03 +0100
Subject: [PATCH] better retrying logic

---
 .../telemetry/upload-fails/out.requests.txt   |  34 ++++++
 .../upload-fails/out.upload_process.txt       |   3 +-
 .../out.requests.txt                          |  51 +++++++++
 .../out.upload_process.txt                    |   1 +
 .../output.txt                                |   0
 .../script                                    |   4 -
 .../test.toml                                 |   2 +-
 .../upload-timeout/out.upload_process.txt     |   1 -
 libs/telemetry/upload.go                      | 107 ++++++++++--------
 libs/telemetry/upload_test.go                 |  68 +++++++----
 10 files changed, 192 insertions(+), 79 deletions(-)
 create mode 100644 acceptance/telemetry/upload-partially-succeeds/out.requests.txt
 create mode 100644 acceptance/telemetry/upload-partially-succeeds/out.upload_process.txt
 rename acceptance/telemetry/{upload-timeout => upload-partially-succeeds}/output.txt (100%)
 rename acceptance/telemetry/{upload-timeout => upload-partially-succeeds}/script (63%)
 rename acceptance/telemetry/{upload-timeout => upload-partially-succeeds}/test.toml (81%)
 delete mode 100644 acceptance/telemetry/upload-timeout/out.upload_process.txt

diff --git a/acceptance/telemetry/upload-fails/out.requests.txt b/acceptance/telemetry/upload-fails/out.requests.txt
index d8260f562..95b21f5c3 100644
--- a/acceptance/telemetry/upload-fails/out.requests.txt
+++ b/acceptance/telemetry/upload-fails/out.requests.txt
@@ -15,3 +15,37 @@
     ]
   }
 }
+{
+  "headers": {
+    "Authorization": [
+      "Bearer [DATABRICKS_TOKEN]"
+    ]
+  },
+  "method": "POST",
+  "path": "/telemetry-ext",
+  "body": {
+    "uploadTime": "UNIX_TIME_MILLIS",
+    "items": [],
+    "protoLogs": [
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
+    ]
+  }
+}
+{
+  "headers": {
+    "Authorization": [
+      "Bearer [DATABRICKS_TOKEN]"
+    ]
+  },
+  "method": "POST",
+  "path": "/telemetry-ext",
+  "body": {
+    "uploadTime": "UNIX_TIME_MILLIS",
+    "items": [],
+    "protoLogs": [
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
+    ]
+  }
+}
diff --git a/acceptance/telemetry/upload-fails/out.upload_process.txt b/acceptance/telemetry/upload-fails/out.upload_process.txt
index 2881d09aa..fd804b280 100644
--- a/acceptance/telemetry/upload-fails/out.upload_process.txt
+++ b/acceptance/telemetry/upload-fails/out.upload_process.txt
@@ -1,2 +1 @@
-error: Failed to upload telemetry logs: Endpoint not implemented.
-
+error: upload did not succeed after three attempts. err: &retries.Err{Err:(*apierr.APIError)(0x140003a20a0), Halt:true}. response body: (*telemetry.ResponseBody)(nil)
diff --git a/acceptance/telemetry/upload-partially-succeeds/out.requests.txt b/acceptance/telemetry/upload-partially-succeeds/out.requests.txt
new file mode 100644
index 000000000..95b21f5c3
--- /dev/null
+++ b/acceptance/telemetry/upload-partially-succeeds/out.requests.txt
@@ -0,0 +1,51 @@
+{
+  "headers": {
+    "Authorization": [
+      "Bearer [DATABRICKS_TOKEN]"
+    ]
+  },
+  "method": "POST",
+  "path": "/telemetry-ext",
+  "body": {
+    "uploadTime": "UNIX_TIME_MILLIS",
+    "items": [],
+    "protoLogs": [
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
+    ]
+  }
+}
+{
+  "headers": {
+    "Authorization": [
+      "Bearer [DATABRICKS_TOKEN]"
+    ]
+  },
+  "method": "POST",
+  "path": "/telemetry-ext",
+  "body": {
+    "uploadTime": "UNIX_TIME_MILLIS",
+    "items": [],
+    "protoLogs": [
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
+    ]
+  }
+}
+{
+  "headers": {
+    "Authorization": [
+      "Bearer [DATABRICKS_TOKEN]"
+    ]
+  },
+  "method": "POST",
+  "path": "/telemetry-ext",
+  "body": {
+    "uploadTime": "UNIX_TIME_MILLIS",
+    "items": [],
+    "protoLogs": [
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
+      "{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
+    ]
+  }
+}
diff --git a/acceptance/telemetry/upload-partially-succeeds/out.upload_process.txt b/acceptance/telemetry/upload-partially-succeeds/out.upload_process.txt
new file mode 100644
index 000000000..d1f9d6f04
--- /dev/null
+++ b/acceptance/telemetry/upload-partially-succeeds/out.upload_process.txt
@@ -0,0 +1 @@
+error: upload did not succeed after three attempts. err: <nil>. response body: &telemetry.ResponseBody{Errors:[]telemetry.LogError{}, NumProtoSuccess:1}
diff --git a/acceptance/telemetry/upload-timeout/output.txt b/acceptance/telemetry/upload-partially-succeeds/output.txt
similarity index 100%
rename from acceptance/telemetry/upload-timeout/output.txt
rename to acceptance/telemetry/upload-partially-succeeds/output.txt
diff --git a/acceptance/telemetry/upload-timeout/script b/acceptance/telemetry/upload-partially-succeeds/script
similarity index 63%
rename from acceptance/telemetry/upload-timeout/script
rename to acceptance/telemetry/upload-partially-succeeds/script
index def1af347..5f29f560f 100644
--- a/acceptance/telemetry/upload-timeout/script
+++ b/acceptance/telemetry/upload-partially-succeeds/script
@@ -1,10 +1,6 @@
 export DATABRICKS_CLI_TELEMETRY_PID_FILE=./telemetry.pid
 export DATABRICKS_CLI_TELEMETRY_UPLOAD_LOGS_FILE=./out.upload_process.txt
 
-# Configure a timeout of 0 seconds. This test validates that the timeout is respected
-# and the telemetry process does not try to upload logs.
-export DATABRICKS_CLI_TELEMETRY_UPLOAD_TIMEOUT="0s"
-
 trace $CLI selftest send-telemetry
 
 echo "waiting for telemetry process to finish"
diff --git a/acceptance/telemetry/upload-timeout/test.toml b/acceptance/telemetry/upload-partially-succeeds/test.toml
similarity index 81%
rename from acceptance/telemetry/upload-timeout/test.toml
rename to acceptance/telemetry/upload-partially-succeeds/test.toml
index cb29099c2..7f43da7d9 100644
--- a/acceptance/telemetry/upload-timeout/test.toml
+++ b/acceptance/telemetry/upload-partially-succeeds/test.toml
@@ -5,6 +5,6 @@ Pattern = "POST /telemetry-ext"
 Response.Body = '''
 {
   "errors": [],
-  "numProtoSuccess": 2
+  "numProtoSuccess": 1
 }
 '''
diff --git a/acceptance/telemetry/upload-timeout/out.upload_process.txt b/acceptance/telemetry/upload-timeout/out.upload_process.txt
deleted file mode 100644
index 8cdfbf20e..000000000
--- a/acceptance/telemetry/upload-timeout/out.upload_process.txt
+++ /dev/null
@@ -1 +0,0 @@
-error: Failed to flush telemetry log due to timeout
diff --git a/libs/telemetry/upload.go b/libs/telemetry/upload.go
index 323138f61..eb2b9c9ae 100644
--- a/libs/telemetry/upload.go
+++ b/libs/telemetry/upload.go
@@ -3,6 +3,7 @@ package telemetry
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -10,6 +11,7 @@ import (
 	"time"
 
 	"github.com/databricks/cli/libs/telemetry/protos"
+	"github.com/databricks/databricks-sdk-go/apierr"
 	"github.com/databricks/databricks-sdk-go/client"
 	"github.com/databricks/databricks-sdk-go/config"
 )
@@ -26,6 +28,7 @@ const (
 	DisableEnvVar = "DATABRICKS_CLI_DISABLE_TELEMETRY"
 
 	// Max time to try and upload the telemetry logs. Useful for testing.
+	// TODO: Remove the test case for this.
 	UploadTimeoutEnvVar = "DATABRICKS_CLI_TELEMETRY_UPLOAD_TIMEOUT"
 )
 
@@ -33,13 +36,8 @@ type UploadConfig struct {
 	Logs []protos.FrontendLog `json:"logs"`
 }
 
-// Upload reads telemetry logs from stdin and uploads them to the telemetry endpoint.
-// This function is always expected to be called in a separate child process from
-// the main CLI process.
-func Upload(ctx context.Context) (*ResponseBody, error) {
-	var err error
-
-	b, err := io.ReadAll(os.Stdin)
+func readLogs(stdin io.Reader) ([]string, error) {
+	b, err := io.ReadAll(stdin)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read from stdin: %s\n", err)
 	}
@@ -63,6 +61,18 @@ func Upload(ctx context.Context) (*ResponseBody, error) {
 		protoLogs[i] = string(b)
 	}
 
+	return protoLogs, nil
+}
+
+// Upload reads telemetry logs from stdin and uploads them to the telemetry endpoint.
+// This function is always expected to be called in a separate child process from
+// the main CLI process.
+func Upload(ctx context.Context) (*ResponseBody, error) {
+	logs, err := readLogs(os.Stdin)
+	if err != nil {
+		return nil, err
+	}
+
 	// Parent process is responsible for setting environment variables to
 	// configure authentication.
 	apiClient, err := client.New(&config.Config{})
@@ -70,55 +80,54 @@ func Upload(ctx context.Context) (*ResponseBody, error) {
 		return nil, fmt.Errorf("Failed to create API client: %s\n", err)
 	}
 
-	maxUploadTime := 30 * time.Second
-	if v, ok := os.LookupEnv(UploadTimeoutEnvVar); ok {
-		maxUploadTime, err = time.ParseDuration(v)
-		if err != nil {
-			return nil, fmt.Errorf("Failed to parse time limit %s: %s\n", UploadTimeoutEnvVar, err)
-		}
-	}
+	var resp *ResponseBody
 
-	// Set a maximum total time to try telemetry uploads.
-	ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
-	defer cancel()
-
-	resp := &ResponseBody{}
-
-	// Retry uploading logs a maximum of 3 times incase the uploads are partially successful.
+	// Only try uploading logs for a maximum of 3 times.
 	for range 3 {
-		// Log the CLI telemetry events.
-		err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
-			UploadTime: time.Now().UnixMilli(),
-			// There is a bug in the `/telemetry-ext` API which requires us to
-			// send an empty array for the `Items` field. Otherwise the API returns
-			// a 500.
-			Items:     []string{},
-			ProtoLogs: protoLogs,
-		}, resp)
-		if err != nil {
-			return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", err)
-		}
-
-		// Skip retrying if the upload fails with an error.
-		if len(resp.Errors) > 0 {
-			return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", resp.Errors)
-		}
+		// TODO: Confirm that the timeout of a request here is indeed one minute.
+		resp, err = attempt(ctx, apiClient, logs)
 
 		// All logs were uploaded successfully.
-		if resp.NumProtoSuccess == int64(len(in.Logs)) {
+		if err == nil && resp.NumProtoSuccess >= int64(len(logs)) {
 			return resp, nil
 		}
 
-		// Add a delay of 1 second before retrying. We avoid retrying immediately
-		// to avoid overwhelming the telemetry endpoint.
-		// We only return incase of partial successful uploads. The SDK layer takes
-		// care of retrying in case of retriable status codes.
-		//
-		// TODO: I think I was wrong about the SDKs automatically doing retries.
-		// Look into this more and confirm with ankit what the 5xx status codes are.
-		// TODO: Confirm that the timeout of a request here is indeed one minute.
-		time.Sleep(1 * time.Second)
+		// Partial success. Retry.
+		if err == nil && resp.NumProtoSuccess < int64(len(logs)) {
+			time.Sleep(2 * time.Second)
+			continue
+		}
+
+		// We retry for all 5xx responses. We explicitly omit 503 in the predicate here
+		// because it is already automatically retried in the SDK layer.
+		// ref: https://github.com/databricks/databricks-sdk-go/blob/cdb28002afacb8b762348534a4c4040a9f19c24b/apierr/errors.go#L91
+		var apiErr *apierr.APIError
+		if errors.As(err, &apiErr) && apiErr.StatusCode >= 500 && apiErr.StatusCode != 503 {
+			time.Sleep(2 * time.Second)
+			continue
+		}
 	}
 
-	return nil, fmt.Errorf("Failed to upload all telemetry logs after 4 tries. Only %d/%d logs uploaded", resp.NumProtoSuccess, len(in.Logs))
+	return resp, fmt.Errorf("upload did not succeed after three attempts. err: %#v. response body: %#v", err, resp)
+}
+
+func attempt(ctx context.Context, apiClient *client.DatabricksClient, protoLogs []string) (*ResponseBody, error) {
+	resp := &ResponseBody{}
+	err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
+		UploadTime: time.Now().UnixMilli(),
+		// There is a bug in the `/telemetry-ext` API which requires us to
+		// send an empty array for the `Items` field. Otherwise the API returns
+		// a 500.
+		Items:     []string{},
+		ProtoLogs: protoLogs,
+	}, resp)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(resp.Errors) > 0 {
+		return nil, fmt.Errorf("uploading telemetry failed: %v", resp.Errors)
+	}
+
+	return resp, nil
 }
diff --git a/libs/telemetry/upload_test.go b/libs/telemetry/upload_test.go
index b340b668e..6560fbb4c 100644
--- a/libs/telemetry/upload_test.go
+++ b/libs/telemetry/upload_test.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
 
 	"github.com/databricks/cli/internal/testutil"
@@ -124,31 +125,18 @@ func uploadRetriesFor(t *testing.T, statusCode int) {
 	assert.Equal(t, 2, count)
 }
 
-// TODO: Confirm that the SDK always parses non-200 status codes as apierr.APIError.
-// Only then is this reliable.
 func TestTelemetryUploadRetriesForStatusCodes(t *testing.T) {
-	// Note: The SDK retries automatically for 429 and 503.
-	// TODO: Are there other status codes we need to retry on? Do we need custom
-	// handler for them?
+	// These retries happen in the CLI itself since the SDK does not automatically
+	// retry for 5xx errors.
+	uploadRetriesFor(t, 500)
+	uploadRetriesFor(t, 504)
+
+	// These retries happen on the SDK layer.
+	// ref: https://github.com/databricks/databricks-sdk-go/blob/cdb28002afacb8b762348534a4c4040a9f19c24b/apierr/errors.go#L91
 	uploadRetriesFor(t, 503)
 	uploadRetriesFor(t, 429)
 }
 
-func TestTelemetryUploadCanceled(t *testing.T) {
-	server := testserver.New(t)
-	t.Cleanup(server.Close)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	cancel()
-
-	configureStdin(t)
-	_, err := Upload(ctx)
-
-	// Since the context is already cancelled, upload should fail immediately
-	// with a timeout error.
-	assert.ErrorContains(t, err, "Failed to flush telemetry log due to timeout")
-}
-
 func TestTelemetryUploadMaxRetries(t *testing.T) {
 	server := testserver.New(t)
 	t.Cleanup(server.Close)
@@ -167,6 +155,42 @@ func TestTelemetryUploadMaxRetries(t *testing.T) {
 	configureStdin(t)
 	_, err := Upload(context.Background())
 
-	assert.EqualError(t, err, "Failed to upload all telemetry logs after 4 tries. Only 1/2 logs uploaded")
-	assert.Equal(t, 4, count)
+	assert.EqualError(t, err, "upload did not succeed after three attempts. err: <nil>. response body: &telemetry.ResponseBody{Errors:[]telemetry.LogError(nil), NumProtoSuccess:1}")
+	assert.Equal(t, 3, count)
+}
+
+func TestReadFiles(t *testing.T) {
+	raw := `{
+  "logs": [
+    {
+      "frontend_log_event_id": "1",
+      "entry": {
+        "databricks_cli_log": {
+          "cli_test_event": {
+            "name": "DummyCliEnumValue1"
+          }
+        }
+      }
+    },
+    {
+      "frontend_log_event_id": "2",
+      "entry": {
+        "databricks_cli_log": {
+          "cli_test_event": {
+            "name": "DummyCliEnumValue2"
+          }
+        }
+      }
+    }
+  ]
+}`
+
+	r := strings.NewReader(raw)
+	logs, err := readLogs(r)
+	require.NoError(t, err)
+
+	assert.Equal(t, []string{
+		`{"frontend_log_event_id":"1","entry":{"databricks_cli_log":{"cli_test_event":{"name":"DummyCliEnumValue1"}}}}`,
+		`{"frontend_log_event_id":"2","entry":{"databricks_cli_log":{"cli_test_event":{"name":"DummyCliEnumValue2"}}}}`,
+	}, logs)
 }
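
Note: for readers skimming the diff, below is a minimal, self-contained sketch of the bounded-retry behaviour the new Upload loop implements: at most three attempts, retrying on a partial success or on a 5xx status other than 503 (which the SDK layer already retries), with a short pause before each retry. The uploadWithRetries helper, the attemptFunc callback, and the apiError type are illustrative stand-ins only; they are not part of this patch or of the Databricks SDK. The real loop additionally returns the last *ResponseBody alongside the error, which is what the acceptance outputs above capture.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// apiError is a stand-in for apierr.APIError; only the status code matters here.
type apiError struct{ StatusCode int }

func (e *apiError) Error() string { return fmt.Sprintf("status %d", e.StatusCode) }

// attemptFunc reports how many logs the endpoint accepted out of total, or an error.
type attemptFunc func() (accepted, total int64, err error)

// uploadWithRetries mirrors the retry loop sketched above: at most three attempts,
// retrying on partial success or on 5xx responses other than 503, with a short
// delay before each retry.
func uploadWithRetries(attempt attemptFunc, delay time.Duration) error {
	var lastErr error
	for range 3 {
		accepted, total, err := attempt()
		if err == nil && accepted >= total {
			return nil // every log was accepted
		}
		lastErr = err

		// Partial success: the endpoint accepted only some logs, so try again.
		if err == nil {
			time.Sleep(delay)
			continue
		}

		// Retry 5xx responses, except 503 which the SDK layer retries itself.
		var apiErr *apiError
		if errors.As(err, &apiErr) && apiErr.StatusCode >= 500 && apiErr.StatusCode != 503 {
			time.Sleep(delay)
			continue
		}
	}
	return fmt.Errorf("upload did not succeed after three attempts: %v", lastErr)
}

func main() {
	calls := 0
	err := uploadWithRetries(func() (int64, int64, error) {
		calls++
		if calls == 1 {
			return 0, 2, &apiError{StatusCode: 500} // retried by the loop above
		}
		if calls == 2 {
			return 1, 2, nil // partial success, retried as well
		}
		return 2, 2, nil // all logs accepted on the third attempt
	}, 10*time.Millisecond)
	fmt.Println("attempts:", calls, "err:", err)
}
```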