mirror of https://github.com/databricks/cli.git
Compare commits
4 Commits
4563397edc
...
198690d050
Author | SHA1 | Date |
---|---|---|
|
198690d050 | |
|
d00dc1f04d | |
|
4fc8c37376 | |
|
9fb2c66c4a |
|
@ -15,6 +15,7 @@ import (
|
||||||
"github.com/databricks/cli/libs/dbr"
|
"github.com/databricks/cli/libs/dbr"
|
||||||
"github.com/databricks/cli/libs/filer"
|
"github.com/databricks/cli/libs/filer"
|
||||||
"github.com/databricks/cli/libs/git"
|
"github.com/databricks/cli/libs/git"
|
||||||
|
"github.com/databricks/cli/libs/log"
|
||||||
"github.com/databricks/cli/libs/telemetry"
|
"github.com/databricks/cli/libs/telemetry"
|
||||||
"github.com/databricks/cli/libs/template"
|
"github.com/databricks/cli/libs/template"
|
||||||
"github.com/databricks/databricks-sdk-go/client"
|
"github.com/databricks/databricks-sdk-go/client"
|
||||||
|
@ -212,6 +213,7 @@ See https://docs.databricks.com/en/dev-tools/bundles/templates.html for more inf
|
||||||
apiClient, err := client.New(w.Config)
|
apiClient, err := client.New(w.Config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Uploading telemetry is best effort. Do not error.
|
// Uploading telemetry is best effort. Do not error.
|
||||||
|
log.Debugf(ctx, "Could not create API client to send telemetry using: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,7 +69,7 @@ func TestBundleInitOnMlopsStacks(t *testing.T) {
|
||||||
testcli.RequireSuccessfulRun(t, ctx, "bundle", "init", "mlops-stacks", "--output-dir", tmpDir2, "--config-file", filepath.Join(tmpDir1, "config.json"))
|
testcli.RequireSuccessfulRun(t, ctx, "bundle", "init", "mlops-stacks", "--output-dir", tmpDir2, "--config-file", filepath.Join(tmpDir1, "config.json"))
|
||||||
|
|
||||||
// Assert the telemetry payload is correctly logged.
|
// Assert the telemetry payload is correctly logged.
|
||||||
logs, err := telemetry.GetLogs(ctx)
|
logs := telemetry.GetLogs(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, len(logs))
|
require.Equal(t, 1, len(logs))
|
||||||
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
|
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
|
||||||
|
@ -186,7 +186,7 @@ func TestBundleInitTelemetryForDefaultTemplates(t *testing.T) {
|
||||||
assert.DirExists(t, filepath.Join(tmpDir2, tc.args["project_name"]))
|
assert.DirExists(t, filepath.Join(tmpDir2, tc.args["project_name"]))
|
||||||
|
|
||||||
// Assert the telemetry payload is correctly logged.
|
// Assert the telemetry payload is correctly logged.
|
||||||
logs, err := telemetry.GetLogs(ctx)
|
logs := telemetry.GetLogs(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, len(logs))
|
require.Equal(t, 1, len(logs))
|
||||||
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
|
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
|
||||||
|
@ -241,7 +241,7 @@ func TestBundleInitTelemetryForCustomTemplates(t *testing.T) {
|
||||||
|
|
||||||
// Assert the telemetry payload is correctly logged. For custom templates we should
|
// Assert the telemetry payload is correctly logged. For custom templates we should
|
||||||
// never set template_enum_args.
|
// never set template_enum_args.
|
||||||
logs, err := telemetry.GetLogs(ctx)
|
logs := telemetry.GetLogs(ctx)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, 1, len(logs))
|
require.Equal(t, 1, len(logs))
|
||||||
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
|
event := logs[0].Entry.DatabricksCliLog.BundleInitEvent
|
||||||
|
|
|
@ -33,16 +33,13 @@ func (wrapper *apiClientWrapper) Do(ctx context.Context, method, path string,
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTelemetryLogger(t *testing.T) {
|
func TestTelemetryLogger(t *testing.T) {
|
||||||
events := []telemetry.FrontendLogEntry{
|
events := []telemetry.DatabricksCliLog{
|
||||||
{
|
{
|
||||||
DatabricksCliLog: telemetry.DatabricksCliLog{
|
|
||||||
CliTestEvent: &events.CliTestEvent{
|
CliTestEvent: &events.CliTestEvent{
|
||||||
Name: events.DummyCliEnumValue1,
|
Name: events.DummyCliEnumValue1,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
|
||||||
{
|
{
|
||||||
DatabricksCliLog: telemetry.DatabricksCliLog{
|
|
||||||
BundleInitEvent: &events.BundleInitEvent{
|
BundleInitEvent: &events.BundleInitEvent{
|
||||||
Uuid: uuid.New().String(),
|
Uuid: uuid.New().String(),
|
||||||
TemplateName: "abc",
|
TemplateName: "abc",
|
||||||
|
@ -52,7 +49,6 @@ func TestTelemetryLogger(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(t, reflect.TypeOf(telemetry.DatabricksCliLog{}).NumField(), len(events),
|
assert.Equal(t, reflect.TypeOf(telemetry.DatabricksCliLog{}).NumField(), len(events),
|
||||||
|
@ -68,8 +64,7 @@ func TestTelemetryLogger(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
err := telemetry.Log(ctx, event)
|
telemetry.Log(ctx, event)
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
apiClient, err := client.New(w.W.Config)
|
apiClient, err := client.New(w.W.Config)
|
||||||
|
|
|
@ -18,7 +18,7 @@ func ContextWithLogger(ctx context.Context) context.Context {
|
||||||
return ctx
|
return ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
return context.WithValue(ctx, telemetryLoggerKey, &logger{protoLogs: []string{}})
|
return context.WithValue(ctx, telemetryLoggerKey, &logger{logs: []FrontendLog{}})
|
||||||
}
|
}
|
||||||
|
|
||||||
func fromContext(ctx context.Context) *logger {
|
func fromContext(ctx context.Context) *logger {
|
||||||
|
|
|
@ -3,7 +3,6 @@ package telemetry
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -18,50 +17,34 @@ type DatabricksApiClient interface {
|
||||||
visitors ...func(*http.Request) error) error
|
visitors ...func(*http.Request) error) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func Log(ctx context.Context, event FrontendLogEntry) error {
|
func Log(ctx context.Context, event DatabricksCliLog) {
|
||||||
l := fromContext(ctx)
|
l := fromContext(ctx)
|
||||||
|
|
||||||
FrontendLog := FrontendLog{
|
l.logs = append(l.logs, FrontendLog{
|
||||||
// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
|
// The telemetry endpoint deduplicates logs based on the FrontendLogEventID.
|
||||||
// Thus it's important to generate a unique ID for each log event.
|
// Thus it's important to generate a unique ID for each log event.
|
||||||
FrontendLogEventID: uuid.New().String(),
|
FrontendLogEventID: uuid.New().String(),
|
||||||
Entry: event,
|
Entry: FrontendLogEntry{
|
||||||
}
|
DatabricksCliLog: event,
|
||||||
|
},
|
||||||
protoLog, err := json.Marshal(FrontendLog)
|
})
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error marshalling the telemetry event: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
l.protoLogs = append(l.protoLogs, string(protoLog))
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type logger struct {
|
type logger struct {
|
||||||
protoLogs []string
|
logs []FrontendLog
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function is meant to be used only in tests to introspect the telemetry logs
|
// This function is meant to be used only in tests to introspect the telemetry logs
|
||||||
// that have been logged so far.
|
// that have been logged so far.
|
||||||
func GetLogs(ctx context.Context) ([]FrontendLog, error) {
|
func GetLogs(ctx context.Context) []FrontendLog {
|
||||||
l := fromContext(ctx)
|
l := fromContext(ctx)
|
||||||
res := []FrontendLog{}
|
return l.logs
|
||||||
|
|
||||||
for _, log := range l.protoLogs {
|
|
||||||
frontendLog := FrontendLog{}
|
|
||||||
err := json.Unmarshal([]byte(log), &frontendLog)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error unmarshalling the telemetry event: %v", err)
|
|
||||||
}
|
|
||||||
res = append(res, frontendLog)
|
|
||||||
}
|
|
||||||
return res, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Maximum additional time to wait for the telemetry event to flush. We expect the flush
|
// Maximum additional time to wait for the telemetry event to flush. We expect the flush
|
||||||
// method to be called when the CLI command is about to exit, so this caps the maximum
|
// method to be called when the CLI command is about to exit, so this caps the maximum
|
||||||
// additional time the user will experience because of us logging CLI telemetry.
|
// additional time the user will experience because of us logging CLI telemetry.
|
||||||
var MaxAdditionalWaitTime = 2 * time.Second
|
var MaxAdditionalWaitTime = 5 * time.Second
|
||||||
|
|
||||||
// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
|
// We make the API call to the /telemetry-ext endpoint to log the CLI telemetry events
|
||||||
// right about as the CLI command is about to exit. The API endpoint can handle
|
// right about as the CLI command is about to exit. The API endpoint can handle
|
||||||
|
@ -73,11 +56,22 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
|
||||||
defer cancel()
|
defer cancel()
|
||||||
l := fromContext(ctx)
|
l := fromContext(ctx)
|
||||||
|
|
||||||
if len(l.protoLogs) == 0 {
|
if len(l.logs) == 0 {
|
||||||
log.Debugf(ctx, "No telemetry events to flush")
|
log.Debugf(ctx, "No telemetry events to flush")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var protoLogs []string
|
||||||
|
for _, event := range l.logs {
|
||||||
|
s, err := json.Marshal(event)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf(ctx, "Error marshalling the telemetry event %v: %v", event, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
protoLogs = append(protoLogs, string(s))
|
||||||
|
}
|
||||||
|
|
||||||
resp := &ResponseBody{}
|
resp := &ResponseBody{}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
@ -91,7 +85,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
|
||||||
// Log the CLI telemetry events.
|
// Log the CLI telemetry events.
|
||||||
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
|
err := apiClient.Do(ctx, http.MethodPost, "/telemetry-ext", nil, RequestBody{
|
||||||
UploadTime: time.Now().Unix(),
|
UploadTime: time.Now().Unix(),
|
||||||
ProtoLogs: l.protoLogs,
|
ProtoLogs: protoLogs,
|
||||||
|
|
||||||
// A bug in the telemetry API requires us to send an empty items array.
|
// A bug in the telemetry API requires us to send an empty items array.
|
||||||
// Otherwise we get an opaque 500 internal server error.
|
// Otherwise we get an opaque 500 internal server error.
|
||||||
|
@ -108,7 +102,7 @@ func Flush(ctx context.Context, apiClient DatabricksApiClient) {
|
||||||
//
|
//
|
||||||
// Note: This will result in server side duplications but that's fine since
|
// Note: This will result in server side duplications but that's fine since
|
||||||
// we can always deduplicate in the data pipeline itself.
|
// we can always deduplicate in the data pipeline itself.
|
||||||
if len(l.protoLogs) > int(resp.NumProtoSuccess) {
|
if len(l.logs) > int(resp.NumProtoSuccess) {
|
||||||
log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
|
log.Debugf(ctx, "Not all logs were successfully sent. Retrying...")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"github.com/databricks/cli/libs/telemetry/events"
|
"github.com/databricks/cli/libs/telemetry/events"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockDatabricksClient struct {
|
type mockDatabricksClient struct {
|
||||||
|
@ -68,10 +67,9 @@ func TestTelemetryLoggerFlushesEvents(t *testing.T) {
|
||||||
ctx := ContextWithLogger(context.Background())
|
ctx := ContextWithLogger(context.Background())
|
||||||
|
|
||||||
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
|
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
|
||||||
err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
|
Log(ctx, DatabricksCliLog{
|
||||||
CliTestEvent: &events.CliTestEvent{Name: v},
|
CliTestEvent: &events.CliTestEvent{Name: v},
|
||||||
}})
|
})
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush the events.
|
// Flush the events.
|
||||||
|
@ -102,10 +100,9 @@ func TestTelemetryLoggerFlushExitsOnTimeout(t *testing.T) {
|
||||||
ctx := ContextWithLogger(context.Background())
|
ctx := ContextWithLogger(context.Background())
|
||||||
|
|
||||||
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
|
for _, v := range []events.DummyCliEnum{events.DummyCliEnumValue1, events.DummyCliEnumValue2, events.DummyCliEnumValue2, events.DummyCliEnumValue3} {
|
||||||
err := Log(ctx, FrontendLogEntry{DatabricksCliLog: DatabricksCliLog{
|
Log(ctx, DatabricksCliLog{
|
||||||
CliTestEvent: &events.CliTestEvent{Name: v},
|
CliTestEvent: &events.CliTestEvent{Name: v},
|
||||||
}})
|
})
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush the events.
|
// Flush the events.
|
||||||
|
|
|
@ -124,9 +124,8 @@ func (t *Template) logTelemetry(ctx context.Context) error {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return telemetry.Log(ctx, telemetry.FrontendLogEntry{
|
telemetry.Log(ctx, event)
|
||||||
DatabricksCliLog: event,
|
return nil
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function materializes the input templates as a project, using user defined
|
// This function materializes the input templates as a project, using user defined
|
||||||
|
|
Loading…
Reference in New Issue