Compare commits

..

No commits in common. "198690d050cbc5d97f508705684422bdd6ebeb34" and "4563397edc81d8e3a5294f6f03bd45f16fb8f53e" have entirely different histories.

7 changed files with 59 additions and 46 deletions

View File

@ -15,7 +15,6 @@ import (
"github.com/databricks/cli/libs/dbr" "github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/git"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/telemetry" "github.com/databricks/cli/libs/telemetry"
"github.com/databricks/cli/libs/template" "github.com/databricks/cli/libs/template"
"github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/client"
@ -213,7 +212,6 @@ See https://docs.databricks.com/en/dev-tools/bundles/templates.html for more inf
apiClient, err := client.New(w.Config) apiClient, err := client.New(w.Config)
if err != nil { if err != nil {
// Uploading telemetry is best effort. Do not error. // Uploading telemetry is best effort. Do not error.
log.Debugf(ctx, "Could not create API client to send telemetry using: %v", err)
return return
} }

View File

@ -69,7 +69,7 @@ func TestBundleInitOnMlopsStacks(t *testing.T) {
testcli.RequireSuccessfulRun(t, ctx, "bundle", "init", "mlops-stacks", "--output-dir", tmpDir2, "--config-file", filepath.Join(tmpDir1, "config.json")) testcli.RequireSuccessfulRun(t, ctx, "bundle", "init", "mlops-stacks", "--output-dir", tmpDir2, "--config-file", filepath.Join(tmpDir1, "config.json"))
// Assert the telemetry payload is correctly logged. // Assert the telemetry payload is correctly logged.
logs := telemetry.GetLogs(ctx) logs, err := telemetry.GetLogs(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(logs)) require.Equal(t, 1, len(logs))
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
@ -186,7 +186,7 @@ func TestBundleInitTelemetryForDefaultTemplates(t *testing.T) {
assert.DirExists(t, filepath.Join(tmpDir2, tc.args["project_name"])) assert.DirExists(t, filepath.Join(tmpDir2, tc.args["project_name"]))
// Assert the telemetry payload is correctly logged. // Assert the telemetry payload is correctly logged.
logs := telemetry.GetLogs(ctx) logs, err := telemetry.GetLogs(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(logs)) require.Equal(t, 1, len(logs))
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
@ -241,7 +241,7 @@ func TestBundleInitTelemetryForCustomTemplates(t *testing.T) {
// Assert the telemetry payload is correctly logged. For custom templates we should // Assert the telemetry payload is correctly logged. For custom templates we should
// never set template_enum_args. // never set template_enum_args.
logs := telemetry.GetLogs(ctx) logs, err := telemetry.GetLogs(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(logs)) require.Equal(t, 1, len(logs))
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent event := logs[0].Entry.DatabricksCliLog.BundleInitEvent

View File

@ -33,19 +33,23 @@ func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
} }
func TestTelemetryLogger(t *testing.T) { func TestTelemetryLogger(t *testing.T) {
events := []telemetry.DatabricksCliLog{ events := []telemetry.FrontendLogEntry{
{ {
CliTestEvent: &events.CliTestEvent{ DatabricksCliLog: telemetry.DatabricksCliLog{
Name: events.DummyCliEnumValue1, CliTestEvent: &events.CliTestEvent{
Name: events.DummyCliEnumValue1,
},
}, },
}, },
{ {
BundleInitEvent: &events.BundleInitEvent{ DatabricksCliLog: telemetry.DatabricksCliLog{
Uuid: uuid.New().String(), BundleInitEvent: &events.BundleInitEvent{
TemplateName: "abc", Uuid: uuid.New().String(),
TemplateEnumArgs: map[string]string{ TemplateName: "abc",
"a": "b", TemplateEnumArgs: map[string]string{
"c": "d", "a": "b",
"c": "d",
},
}, },
}, },
}, },
@ -64,7 +68,8 @@ func TestTelemetryLogger(t *testing.T) {
}) })
for _, event := range events { for _, event := range events {
telemetry.Log(ctx, event) err := telemetry.Log(ctx, event)
require.NoError(t, err)
} }
apiClient, err := client.New(w.W.Config) apiClient, err := client.New(w.W.Config)

View File

@ -18,7 +18,7 @@ func ContextWithLogger(ctx context.Context) context.Context {
return ctx return ctx
} }
return context.WithValue(ctx, telemetryLoggerKey, &logger{logs: []FrontendLog{}}) return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}})
} }
func fromContext(ctx context.Context) *logger { func fromContext(ctx context.Context) *logger {

View File

@ -3,6 +3,7 @@ package telemetry
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"time" "time"
@ -17,34 +18,50 @@ type DatabricksApiClient interface {
visitors ...func(*http.Request) error) error visitors ...func(*http.Request) error) error
} }
func Log(ctx context.Context, event DatabricksCliLog) { func Log(ctx context.Context, event FrontendLogEntry) error {
l := fromContext(ctx) l := fromContext(ctx)
l.logs = append(l.logs, FrontendLog{ FrontendLog := FrontendLog{
// The telemetry endpoint deduplicates logs based on the FrontendLogEventID. // The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
// Thus it's important to generate a unique ID for each log event. // Thus it's important to generate a unique ID for each log event.
FrontendLogEventID: uuid.New().String(), FrontendLogEventID: uuid.New().String(),
Entry: FrontendLogEntry{ Entry: event,
DatabricksCliLog: event, }
},
}) protoLog, err := json.Marshal(FrontendLog)
if err != nil {
return fmt.Errorf("error marshalling the telemetry event: %v", err)
}
l.protoLogs = append(l.protoLogs, string(protoLog))
return nil
} }
type logger struct { type logger struct {
logs []FrontendLog protoLogs []string
} }
// This function is meant only to be used in tests to introspect the telemetry logs // This function is meant only to be used in tests to introspect the telemetry logs
// that have been logged so far. // that have been logged so far.
func GetLogs(ctx context.Context) []FrontendLog { func GetLogs(ctx context.Context) ([]FrontendLog, error) {
l := fromContext(ctx) l := fromContext(ctx)
return l.logs res := []FrontendLog{}
for _, log := range l.protoLogs {
frontendLog := FrontendLog{}
err := json.Unmarshal([]byte(log), &frontendLog)
if err != nil {
return nil, fmt.Errorf("error unmarshalling the telemetry event: %v", err)
}
res = append(res, frontendLog)
}
return res, nil
} }
// Maximum additional time to wait for the telemetry event to flush. We expect the flush // Maximum additional time to wait for the telemetry event to flush. We expect the flush
// method to be called when the CLI command is about to exit, so this caps the maximum // method to be called when the CLI command is about to exit, so this caps the maximum
// additional time the user will experience because of us logging CLI telemetry. // additional time the user will experience because of us logging CLI telemetry.
var MaxAdditionalWaitTime = 5 * time.Second var MaxAdditionalWaitTime = 2 * time.Second
// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events // We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
// right about as the CLI command is about to exit. The API endpoint can handle // right about as the CLI command is about to exit. The API endpoint can handle
@ -56,22 +73,11 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
defer cancel() defer cancel()
l := fromContext(ctx) l := fromContext(ctx)
if len(l.logs) == 0 { if len(l.protoLogs) == 0 {
log.Debugf(ctx, "No telemetry events to flush") log.Debugf(ctx, "No telemetry events to flush")
return return
} }
var protoLogs []string
for _, event := range l.logs {
s, err := json.Marshal(event)
if err != nil {
log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err)
continue
}
protoLogs = append(protoLogs, string(s))
}
resp := &ResponseBody{} resp := &ResponseBody{}
for { for {
select { select {
@ -85,7 +91,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
// Log the CLI telemetry events. // Log the CLI telemetry events.
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{ err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
UploadTime: time.Now().Unix(), UploadTime: time.Now().Unix(),
ProtoLogs: protoLogs, ProtoLogs: l.protoLogs,
// A bug in the telemetry API requires us to send an empty items array. // A bug in the telemetry API requires us to send an empty items array.
// Otherwise we get an opaque 500 internal server error. // Otherwise we get an opaque 500 internal server error.
@ -102,7 +108,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
// //
// Note: This will result in server side duplications but that's fine since // Note: This will result in server side duplications but that's fine since
// we can always deduplicate in the data pipeline itself. // we can always deduplicate in the data pipeline itself.
if len(l.logs) > int(resp.NumProtoSuccess) { if len(l.protoLogs) > int(resp.NumProtoSuccess) {
log.Debugf(ctx, "Not all logs were successfully sent. Retrying...") log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
continue continue
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/databricks/cli/libs/telemetry/events" "github.com/databricks/cli/libs/telemetry/events"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
) )
type mockDatabricksClient struct { type mockDatabricksClient struct {
@ -67,9 +68,10 @@ func TestTelemetryLoggerFlushesEvents(t *testing.T) {
ctx := ContextWithLogger(context.Background()) ctx := ContextWithLogger(context.Background())
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} { for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
Log(ctx, DatabricksCliLog{ err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
CliTestEvent: &events.CliTestEvent{Name: v}, CliTestEvent: &events.CliTestEvent{Name: v},
}) }})
require.NoError(t, err)
} }
// Flush the events. // Flush the events.
@ -100,9 +102,10 @@ func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
ctx := ContextWithLogger(context.Background()) ctx := ContextWithLogger(context.Background())
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} { for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
Log(ctx, DatabricksCliLog{ err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
CliTestEvent: &events.CliTestEvent{Name: v}, CliTestEvent: &events.CliTestEvent{Name: v},
}) }})
require.NoError(t, err)
} }
// Flush the events. // Flush the events.

View File

@ -124,8 +124,9 @@ func (t *Template) logTelemetry(ctx context.Context) error {
}, },
} }
telemetry.Log(ctx, event) return telemetry.Log(ctx, telemetry.FrontendLogEntry{
return nil DatabricksCliLog: event,
})
} }
// This function materializes the input templates as a project, using user defined // This function materializes the input templates as a project, using user defined