Better retrying logic for telemetry uploads

Shreyas Goenka 2025-03-02 15:25:03 +01:00
parent 0ff08776ab
commit 7f7165158b
10 changed files with 192 additions and 79 deletions

View File

@@ -15,3 +15,37 @@
]
}
}
{
"headers": {
"Authorization": [
"Bearer [DATABRICKS_TOKEN]"
]
},
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}
{
"headers": {
"Authorization": [
"Bearer [DATABRICKS_TOKEN]"
]
},
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}

View File

@@ -1,2 +1 @@
error: Failed to upload telemetry logs: Endpoint not implemented.
error: upload did not succeed after three attempts. err: &retries.Err{Err:(*apierr.APIError)(0x140003a20a0), Halt:true}. response body: (*telemetry.ResponseBody)(nil)

View File

@@ -0,0 +1,51 @@
{
"headers": {
"Authorization": [
"Bearer [DATABRICKS_TOKEN]"
]
},
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}
{
"headers": {
"Authorization": [
"Bearer [DATABRICKS_TOKEN]"
]
},
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}
{
"headers": {
"Authorization": [
"Bearer [DATABRICKS_TOKEN]"
]
},
"method": "POST",
"path": "/telemetry-ext",
"body": {
"uploadTime": "UNIX_TIME_MILLIS",
"items": [],
"protoLogs": [
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE1\"}}}}",
"{\"frontend_log_event_id\":\"[UUID]\",\"entry\":{\"databricks_cli_log\":{\"execution_context\":{\"cmd_exec_id\":\"[UUID]\",\"version\":\"[DEV_VERSION]\",\"command\":\"selftest_send-telemetry\",\"operating_system\":\"[OS]\",\"execution_time_ms\":\"SMALL_INT\",\"exit_code\":0},\"cli_test_event\":{\"name\":\"VALUE2\"}}}}"
]
}
}

View File

@@ -0,0 +1 @@
error: upload did not succeed after three attempts. err: <nil>. response body: &telemetry.ResponseBody{Errors:[]telemetry.LogError{}, NumProtoSuccess:1}

View File

@@ -1,10 +1,6 @@
export DATABRICKS_CLI_TELEMETRY_PID_FILE=./telemetry.pid
export DATABRICKS_CLI_TELEMETRY_UPLOAD_LOGS_FILE=./out.upload_process.txt
# Configure a timeout of 0 seconds. This test validates that the timeout is respected
# and the telemetry process does not try to upload logs.
export DATABRICKS_CLI_TELEMETRY_UPLOAD_TIMEOUT="0s"
trace $CLI selftest send-telemetry
echo "waiting for telemetry process to finish"

View File

@@ -5,6 +5,6 @@ Pattern = "POST /telemetry-ext"
Response.Body = '''
{
"errors": [],
"numProtoSuccess": 2
"numProtoSuccess": 1
}
'''

View File

@@ -1 +0,0 @@
error: Failed to flush telemetry log due to timeout

View File

@@ -3,6 +3,7 @@ package telemetry
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@@ -10,6 +11,7 @@ import (
"time"
"github.com/databricks/cli/libs/telemetry/protos"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/config"
)
@@ -26,6 +28,7 @@ const (
DisableEnvVar = "DATABRICKS_CLI_DISABLE_TELEMETRY"
// Max time to try and upload the telemetry logs. Useful for testing.
// TODO: Remove the test case for this.
UploadTimeoutEnvVar = "DATABRICKS_CLI_TELEMETRY_UPLOAD_TIMEOUT"
)
@@ -33,13 +36,8 @@ type UploadConfig struct {
Logs []protos.FrontendLog `json:"logs"`
}
// Upload reads telemetry logs from stdin and uploads them to the telemetry endpoint.
// This function is always expected to be called in a separate child process from
// the main CLI process.
func Upload(ctx context.Context) (*ResponseBody, error) {
var err error
b, err := io.ReadAll(os.Stdin)
func readLogs(stdin io.Reader) ([]string, error) {
b, err := io.ReadAll(stdin)
if err != nil {
return nil, fmt.Errorf("failed to read from stdin: %s\n", err)
}
@@ -63,6 +61,18 @@ func Upload(ctx context.Context) (*ResponseBody, error) {
protoLogs[i] = string(b)
}
return protoLogs, nil
}
// Upload reads telemetry logs from stdin and uploads them to the telemetry endpoint.
// This function is always expected to be called in a separate child process from
// the main CLI process.
func Upload(ctx context.Context) (*ResponseBody, error) {
logs, err := readLogs(os.Stdin)
if err != nil {
return nil, err
}
// Parent process is responsible for setting environment variables to
// configure authentication.
apiClient, err := client.New(&config.Config{})
@@ -70,55 +80,54 @@ func Upload(ctx context.Context) (*ResponseBody, error) {
return nil, fmt.Errorf("Failed to create API client: %s\n", err)
}
maxUploadTime := 30 * time.Second
if v, ok := os.LookupEnv(UploadTimeoutEnvVar); ok {
maxUploadTime, err = time.ParseDuration(v)
if err != nil {
return nil, fmt.Errorf("Failed to parse time limit %s: %s\n", UploadTimeoutEnvVar, err)
}
}
var resp *ResponseBody
// Set a maximum total time to try telemetry uploads.
ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
defer cancel()
resp := &ResponseBody{}
// Retry uploading logs a maximum of 3 times incase the uploads are partially successful.
// Only try uploading logs for a maximum of 3 times.
for range 3 {
// Log the CLI telemetry events.
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
UploadTime: time.Now().UnixMilli(),
// There is a bug in the `/telemetry-ext` API which requires us to
// send an empty array for the `Items` field. Otherwise the API returns
// a 500.
Items: []string{},
ProtoLogs: protoLogs,
}, resp)
if err != nil {
return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", err)
}
// Skip retrying if the upload fails with an error.
if len(resp.Errors) > 0 {
return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", resp.Errors)
}
// TODO: Confirm that the timeout of a request here is indeed one minute.
resp, err = attempt(ctx, apiClient, logs)
// All logs were uploaded successfully.
if resp.NumProtoSuccess == int64(len(in.Logs)) {
if err == nil && resp.NumProtoSuccess >= int64(len(logs)) {
return resp, nil
}
// Add a delay of 1 second before retrying. We avoid retrying immediately
// to avoid overwhelming the telemetry endpoint.
// We only return incase of partial successful uploads. The SDK layer takes
// care of retrying in case of retriable status codes.
//
// TODO: I think I was wrong about the SDKs automatically doing retries.
// Look into this more and confirm with ankit what the 5xx status codes are.
// TODO: Confirm that the timeout of a request here is indeed one minute.
time.Sleep(1 * time.Second)
// Partial success. Retry.
if err == nil && resp.NumProtoSuccess < int64(len(logs)) {
time.Sleep(2 * time.Second)
continue
}
// We retry for all 5xx responses. We explicitly omit 503 in the predicate here
// because it is already automatically retried in the SDK layer.
// ref: https://github.com/databricks/databricks-sdk-go/blob/cdb28002afacb8b762348534a4c4040a9f19c24b/apierr/errors.go#L91
var apiErr *apierr.APIError
if errors.As(err, &apiErr) && apiErr.StatusCode >= 500 && apiErr.StatusCode != 503 {
time.Sleep(2 * time.Second)
continue
}
}
return nil, fmt.Errorf("Failed to upload all telemetry logs after 4 tries. Only %d/%d logs uploaded", resp.NumProtoSuccess, len(in.Logs))
return resp, fmt.Errorf("upload did not succeed after three attempts. err: %#v. response body: %#v", err, resp)
}
func attempt(ctx context.Context, apiClient *client.DatabricksClient, protoLogs []string) (*ResponseBody, error) {
resp := &ResponseBody{}
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
UploadTime: time.Now().UnixMilli(),
// There is a bug in the `/telemetry-ext` API which requires us to
// send an empty array for the `Items` field. Otherwise the API returns
// a 500.
Items: []string{},
ProtoLogs: protoLogs,
}, resp)
if err != nil {
return nil, err
}
if len(resp.Errors) > 0 {
return nil, fmt.Errorf("uploading telemetry failed: %v", resp.Errors)
}
return resp, nil
}
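
For reference, a minimal self-contained sketch of the retry predicate used in the loop above: the CLI retries non-503 5xx responses itself, while 429 and 503 are left to the SDK's built-in retry layer. The apiError type below is a stand-in for apierr.APIError (assumed to expose a StatusCode field) so the sketch compiles without the SDK; it is illustrative only, not the commit's actual code.

package main

import (
	"errors"
	"fmt"
)

// apiError stands in for apierr.APIError so this sketch has no external
// dependencies; only the StatusCode field matters for the predicate.
type apiError struct{ StatusCode int }

func (e *apiError) Error() string { return fmt.Sprintf("status %d", e.StatusCode) }

// shouldRetry mirrors the upload loop's predicate: retry 5xx responses,
// except 503, which the SDK layer already retries automatically.
func shouldRetry(err error) bool {
	var apiErr *apiError
	if errors.As(err, &apiErr) {
		return apiErr.StatusCode >= 500 && apiErr.StatusCode != 503
	}
	return false
}

func main() {
	fmt.Println(shouldRetry(&apiError{StatusCode: 500})) // true: retried by the CLI loop
	fmt.Println(shouldRetry(&apiError{StatusCode: 503})) // false: the SDK handles this
	fmt.Println(shouldRetry(errors.New("no status")))    // false: not an API error
}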

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"os"
"path/filepath"
"strings"
"testing"
"github.com/databricks/cli/internal/testutil"
@@ -124,31 +125,18 @@ func uploadRetriesFor(t *testing.T, statusCode int) {
assert.Equal(t, 2, count)
}
// TODO: Confirm that the SDK always parses non-200 status codes as apierr.APIError.
// Only then is this reliable.
func TestTelemetryUploadRetriesForStatusCodes(t *testing.T) {
// Note: The SDK retries automatically for 429 and 503.
// TODO: Are there other status codes we need to retry on? Do we need custom
// handler for them?
// These retries happen in the CLI itself since the SDK does not automatically
// retry for 5xx errors.
uploadRetriesFor(t, 500)
uploadRetriesFor(t, 504)
// These retries happen on the SDK layer.
// ref: https://github.com/databricks/databricks-sdk-go/blob/cdb28002afacb8b762348534a4c4040a9f19c24b/apierr/errors.go#L91
uploadRetriesFor(t, 503)
uploadRetriesFor(t, 429)
}
func TestTelemetryUploadCanceled(t *testing.T) {
server := testserver.New(t)
t.Cleanup(server.Close)
ctx, cancel := context.WithCancel(context.Background())
cancel()
configureStdin(t)
_, err := Upload(ctx)
// Since the context is already cancelled, upload should fail immediately
// with a timeout error.
assert.ErrorContains(t, err, "Failed to flush telemetry log due to timeout")
}
func TestTelemetryUploadMaxRetries(t *testing.T) {
server := testserver.New(t)
t.Cleanup(server.Close)
@@ -167,6 +155,42 @@ func TestTelemetryUploadMaxRetries(t *testing.T) {
configureStdin(t)
_, err := Upload(context.Background())
assert.EqualError(t, err, "Failed to upload all telemetry logs after 4 tries. Only 1/2 logs uploaded")
assert.Equal(t, 4, count)
assert.EqualError(t, err, "upload did not succeed after three attempts. err: <nil>. response body: &telemetry.ResponseBody{Errors:[]telemetry.LogError(nil), NumProtoSuccess:1}")
assert.Equal(t, 3, count)
}
func TestReadFiles(t *testing.T) {
raw := `{
"logs": [
{
"frontend_log_event_id": "1",
"entry": {
"databricks_cli_log": {
"cli_test_event": {
"name": "DummyCliEnumValue1"
}
}
}
},
{
"frontend_log_event_id": "2",
"entry": {
"databricks_cli_log": {
"cli_test_event": {
"name": "DummyCliEnumValue2"
}
}
}
}
]
}`
r := strings.NewReader(raw)
logs, err := readLogs(r)
require.NoError(t, err)
assert.Equal(t, []string{
`{"frontend_log_event_id":"1","entry":{"databricks_cli_log":{"cli_test_event":{"name":"DummyCliEnumValue1"}}}}`,
`{"frontend_log_event_id":"2","entry":{"databricks_cli_log":{"cli_test_event":{"name":"DummyCliEnumValue2"}}}}`,
}, logs)
}