mirror of https://github.com/databricks/cli.git
add more test cases for retry on status code
This commit is contained in:
parent
e500bcb456
commit
0ff08776ab
|
@ -3,7 +3,6 @@ package telemetry
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -83,49 +82,43 @@ func Upload(ctx context.Context) (*ResponseBody, error) {
|
||||||
ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
|
ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
maxRetries := 3
|
|
||||||
count := 0
|
|
||||||
|
|
||||||
resp := &ResponseBody{}
|
resp := &ResponseBody{}
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return nil, errors.New("Failed to flush telemetry log due to timeout")
|
|
||||||
|
|
||||||
default:
|
|
||||||
// Proceed
|
|
||||||
}
|
|
||||||
|
|
||||||
count++
|
|
||||||
|
|
||||||
|
// Retry uploading logs a maximum of 3 times incase the uploads are partially successful.
|
||||||
|
for range 3 {
|
||||||
// Log the CLI telemetry events.
|
// Log the CLI telemetry events.
|
||||||
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
|
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, nil, RequestBody{
|
||||||
UploadTime: time.Now().UnixMilli(),
|
UploadTime: time.Now().UnixMilli(),
|
||||||
Items: []string{},
|
// There is a bug in the `/telemetry-ext` API which requires us to
|
||||||
ProtoLogs: protoLogs,
|
// send an empty array for the `Items` field. Otherwise the API returns
|
||||||
|
// a 500.
|
||||||
|
Items: []string{},
|
||||||
|
ProtoLogs: protoLogs,
|
||||||
}, resp)
|
}, resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", err)
|
return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip retrying if the upload fails with an error.
|
||||||
if len(resp.Errors) > 0 {
|
if len(resp.Errors) > 0 {
|
||||||
return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", resp.Errors)
|
return nil, fmt.Errorf("Failed to upload telemetry logs: %s\n", resp.Errors)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// All logs were uploaded successfully.
|
||||||
if resp.NumProtoSuccess == int64(len(in.Logs)) {
|
if resp.NumProtoSuccess == int64(len(in.Logs)) {
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// We retry if the logs were partially uploaded. Subsequent retries have
|
|
||||||
// a chance of uploading all logs successfully. However we limit the number
|
|
||||||
// of retries to avoid excessive load on the telemetry endpoint.
|
|
||||||
if count > maxRetries {
|
|
||||||
return nil, fmt.Errorf("Failed to upload all telemetry logs after 4 tries. Only %d/%d logs uploaded", resp.NumProtoSuccess, len(in.Logs))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add a delay of 1 second before retrying. We avoid retrying immediately
|
// Add a delay of 1 second before retrying. We avoid retrying immediately
|
||||||
// to avoid overwhelming the telemetry endpoint.
|
// to avoid overwhelming the telemetry endpoint.
|
||||||
|
// We only return incase of partial successful uploads. The SDK layer takes
|
||||||
|
// care of retrying in case of retriable status codes.
|
||||||
|
//
|
||||||
|
// TODO: I think I was wrong about the SDKs automatically doing retries.
|
||||||
|
// Look into this more and confirm with ankit what the 5xx status codes are.
|
||||||
|
// TODO: Confirm that the timeout of a request here is indeed one minute.
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("Failed to upload all telemetry logs after 4 tries. Only %d/%d logs uploaded", resp.NumProtoSuccess, len(in.Logs))
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"github.com/databricks/cli/internal/testutil"
|
"github.com/databricks/cli/internal/testutil"
|
||||||
"github.com/databricks/cli/libs/telemetry/protos"
|
"github.com/databricks/cli/libs/telemetry/protos"
|
||||||
"github.com/databricks/cli/libs/testserver"
|
"github.com/databricks/cli/libs/testserver"
|
||||||
|
"github.com/databricks/databricks-sdk-go/apierr"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -57,7 +58,7 @@ func configureStdin(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTelemetryUploadRetries(t *testing.T) {
|
func TestTelemetryUploadRetriesOnPartialSuccess(t *testing.T) {
|
||||||
server := testserver.New(t)
|
server := testserver.New(t)
|
||||||
t.Cleanup(server.Close)
|
t.Cleanup(server.Close)
|
||||||
|
|
||||||
|
@ -88,6 +89,51 @@ func TestTelemetryUploadRetries(t *testing.T) {
|
||||||
assert.Equal(t, 2, count)
|
assert.Equal(t, 2, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func uploadRetriesFor(t *testing.T, statusCode int) {
|
||||||
|
server := testserver.New(t)
|
||||||
|
t.Cleanup(server.Close)
|
||||||
|
|
||||||
|
count := 0
|
||||||
|
server.Handle("POST", "/telemetry-ext", func(req testserver.Request) any {
|
||||||
|
count++
|
||||||
|
if count == 1 {
|
||||||
|
return testserver.Response{
|
||||||
|
StatusCode: statusCode,
|
||||||
|
Body: apierr.APIError{
|
||||||
|
StatusCode: statusCode,
|
||||||
|
Message: "Some error",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count == 2 {
|
||||||
|
return ResponseBody{
|
||||||
|
NumProtoSuccess: 2,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Setenv("DATABRICKS_HOST", server.URL)
|
||||||
|
t.Setenv("DATABRICKS_TOKEN", "token")
|
||||||
|
|
||||||
|
configureStdin(t)
|
||||||
|
|
||||||
|
resp, err := Upload(context.Background())
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, int64(2), resp.NumProtoSuccess)
|
||||||
|
assert.Equal(t, 2, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Confirm that the SDK always parses non-200 status codes as apierr.APIError.
|
||||||
|
// Only then is this reliable.
|
||||||
|
func TestTelemetryUploadRetriesForStatusCodes(t *testing.T) {
|
||||||
|
// Note: The SDK retries automatically for 429 and 503.
|
||||||
|
// TODO: Are there other status codes we need to retry on? Do we need custom
|
||||||
|
// handler for them?
|
||||||
|
uploadRetriesFor(t, 503)
|
||||||
|
uploadRetriesFor(t, 429)
|
||||||
|
}
|
||||||
|
|
||||||
func TestTelemetryUploadCanceled(t *testing.T) {
|
func TestTelemetryUploadCanceled(t *testing.T) {
|
||||||
server := testserver.New(t)
|
server := testserver.New(t)
|
||||||
t.Cleanup(server.Close)
|
t.Cleanup(server.Close)
|
||||||
|
|
Loading…
Reference in New Issue