feat: add telemetry to user-scoped tailnet API call (#17065)

Adds support for sending telemetry on calls to the user-scoped tailnet RPC endpoint. This is currently used only by Coder Desktop.

Later PRs will fill in the version, OS information, and device ID via HTTP headers.
This commit is contained in:
Spike Curtis
2025-03-24 16:02:33 +04:00
committed by GitHub
parent 6bf22f8dc6
commit e0ecc28638
3 changed files with 122 additions and 7 deletions

View File

@ -52,6 +52,8 @@ import (
"cdr.dev/slog"
"cdr.dev/slog/sloggers/sloghuman"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/quartz"
"github.com/coder/coder/v2/coderd"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/autobuild"
@ -91,7 +93,6 @@ import (
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
)
type Options struct {
@ -170,6 +171,7 @@ type Options struct {
APIKeyEncryptionCache cryptokeys.EncryptionKeycache
OIDCConvertKeyCache cryptokeys.SigningKeycache
Clock quartz.Clock
TelemetryReporter telemetry.Reporter
}
// New constructs a codersdk client connected to an in-memory API instance.
@ -358,6 +360,10 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
hangDetector.Start()
t.Cleanup(hangDetector.Close)
if options.TelemetryReporter == nil {
options.TelemetryReporter = telemetry.NewNoop()
}
// Did last_used_at not update? Scratching your noggin? Here's why.
// Workspace usage tracking must be triggered manually in tests.
// The vast majority of existing tests do not depend on last_used_at
@ -517,7 +523,7 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
LoginRateLimit: options.LoginRateLimit,
FilesRateLimit: options.FilesRateLimit,
Authorizer: options.Authorizer,
Telemetry: telemetry.NewNoop(),
Telemetry: options.TelemetryReporter,
TemplateScheduleStore: &templateScheduleStore,
AccessControlStore: accessControlStore,
TLSCertificates: options.TLSCertificates,

View File

@ -23,6 +23,8 @@ import (
"tailscale.com/tailcfg"
"cdr.dev/slog"
"github.com/coder/websocket"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
@ -34,6 +36,7 @@ import (
"github.com/coder/coder/v2/coderd/jwtutils"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/telemetry"
maputil "github.com/coder/coder/v2/coderd/util/maps"
"github.com/coder/coder/v2/coderd/wspubsub"
"github.com/coder/coder/v2/codersdk"
@ -42,7 +45,6 @@ import (
"github.com/coder/coder/v2/codersdk/wsjson"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/websocket"
)
// @Summary Get workspace agent by ID
@ -1635,6 +1637,33 @@ func (api *API) tailnetRPCConn(rw http.ResponseWriter, r *http.Request) {
defer wsNetConn.Close()
defer conn.Close(websocket.StatusNormalClosure, "")
// Get user ID for telemetry
apiKey := httpmw.APIKey(r)
userID := apiKey.UserID.String()
// Store connection telemetry event
now := time.Now()
connectionTelemetryEvent := telemetry.UserTailnetConnection{
ConnectedAt: now,
DisconnectedAt: nil,
UserID: userID,
PeerID: peerID.String(),
DeviceID: nil,
DeviceOS: nil,
CoderDesktopVersion: nil,
}
api.Telemetry.Report(&telemetry.Snapshot{
UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent},
})
defer func() {
// Update telemetry event with disconnection time
disconnectTime := time.Now()
connectionTelemetryEvent.DisconnectedAt = &disconnectTime
api.Telemetry.Report(&telemetry.Snapshot{
UserTailnetConnections: []telemetry.UserTailnetConnection{connectionTelemetryEvent},
})
}()
go httpapi.Heartbeat(ctx, conn)
err = api.TailnetClientService.ServeClient(ctx, version, wsNetConn, tailnet.StreamID{
Name: "client",

View File

@ -29,6 +29,9 @@ import (
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/quartz"
"github.com/coder/websocket"
"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/agentcontainers"
"github.com/coder/coder/v2/agent/agentcontainers/acmock"
@ -47,6 +50,7 @@ import (
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/jwtutils"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
@ -56,8 +60,6 @@ import (
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/tailnet/tailnettest"
"github.com/coder/coder/v2/testutil"
"github.com/coder/quartz"
"github.com/coder/websocket"
)
func TestWorkspaceAgent(t *testing.T) {
@ -2133,8 +2135,12 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
ctx := testutil.Context(t, testutil.WaitLong)
logger := testutil.Logger(t)
fTelemetry := newFakeTelemetryReporter(ctx, t, 200)
fTelemetry.enabled = false
firstClient, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{
Coordinator: tailnet.NewCoordinator(logger),
Coordinator: tailnet.NewCoordinator(logger),
TelemetryReporter: fTelemetry,
})
firstUser := coderdtest.CreateFirstUser(t, firstClient)
member, memberUser := coderdtest.CreateAnotherUser(t, firstClient, firstUser.OrganizationID, rbac.RoleTemplateAdmin())
@ -2142,12 +2148,17 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
// Create a workspace with an agent
firstWorkspace := buildWorkspaceWithAgent(t, member, firstUser.OrganizationID, memberUser.ID, api.Database, api.Pubsub)
// enable telemetry now that workspace is built; we don't care about snapshots before this.
fTelemetry.enabled = true
u, err := member.URL.Parse("/api/v2/tailnet")
require.NoError(t, err)
q := u.Query()
q.Set("version", "2.0")
u.RawQuery = q.Encode()
predialTime := time.Now()
//nolint:bodyclose // websocket package closes this for you
wsConn, resp, err := websocket.Dial(ctx, u.String(), &websocket.DialOptions{
HTTPHeader: http.Header{
@ -2155,13 +2166,22 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
},
})
if err != nil {
if resp.StatusCode != http.StatusSwitchingProtocols {
if resp != nil && resp.StatusCode != http.StatusSwitchingProtocols {
err = codersdk.ReadBodyAsError(resp)
}
require.NoError(t, err)
}
defer wsConn.Close(websocket.StatusNormalClosure, "done")
// Check telemetry
snapshot := testutil.RequireRecvCtx(ctx, t, fTelemetry.snapshots)
require.Len(t, snapshot.UserTailnetConnections, 1)
telemetryConnection := snapshot.UserTailnetConnections[0]
require.Equal(t, memberUser.ID.String(), telemetryConnection.UserID)
require.GreaterOrEqual(t, telemetryConnection.ConnectedAt, predialTime)
require.LessOrEqual(t, telemetryConnection.ConnectedAt, time.Now())
require.NotEmpty(t, telemetryConnection.PeerID)
rpcClient, err := tailnet.NewDRPCClient(
websocket.NetConn(ctx, wsConn, websocket.MessageBinary),
logger,
@ -2209,6 +2229,23 @@ func TestOwnedWorkspacesCoordinate(t *testing.T) {
NumAgents: 0,
},
})
err = stream.Close()
require.NoError(t, err)
beforeDisconnectTime := time.Now()
err = wsConn.Close(websocket.StatusNormalClosure, "done")
require.NoError(t, err)
snapshot = testutil.RequireRecvCtx(ctx, t, fTelemetry.snapshots)
require.Len(t, snapshot.UserTailnetConnections, 1)
telemetryDisconnection := snapshot.UserTailnetConnections[0]
require.Equal(t, memberUser.ID.String(), telemetryDisconnection.UserID)
require.Equal(t, telemetryConnection.ConnectedAt, telemetryDisconnection.ConnectedAt)
require.Equal(t, telemetryConnection.UserID, telemetryDisconnection.UserID)
require.Equal(t, telemetryConnection.PeerID, telemetryDisconnection.PeerID)
require.NotNil(t, telemetryDisconnection.DisconnectedAt)
require.GreaterOrEqual(t, *telemetryDisconnection.DisconnectedAt, beforeDisconnectTime)
require.LessOrEqual(t, *telemetryDisconnection.DisconnectedAt, time.Now())
}
func buildWorkspaceWithAgent(
@ -2334,3 +2371,46 @@ func waitForUpdates(
t.Fatal("Timeout waiting for desired state", currentState)
}
}
// fakeTelemetryReporter is a test double for telemetry.Reporter. Rather than
// uploading anything, Report queues each snapshot on the snapshots channel so
// a test can receive and inspect it.
type fakeTelemetryReporter struct {
	// t is marked as failed if Report gives up because ctx is done.
	t testing.TB
	// ctx bounds how long Report waits for room on snapshots.
	ctx context.Context
	// snapshots carries every snapshot accepted by Report.
	snapshots chan *telemetry.Snapshot
	// enabled gates Report and is the value returned by Enabled.
	// NOTE(review): tests assign this field directly while Report reads it
	// without synchronization; confirm the two can never overlap.
	enabled bool
}
// newFakeTelemetryReporter returns a fakeTelemetryReporter that starts out
// enabled and queues reported snapshots on a channel with capacity
// bufferSize. ctx and t are retained for use by Report.
func newFakeTelemetryReporter(ctx context.Context, t testing.TB, bufferSize int) *fakeTelemetryReporter {
	reporter := new(fakeTelemetryReporter)
	reporter.t = t
	reporter.ctx = ctx
	reporter.snapshots = make(chan *telemetry.Snapshot, bufferSize)
	reporter.enabled = true
	return reporter
}
// Report implements telemetry.Reporter. While the reporter is enabled it
// queues snapshot on the snapshots channel; while disabled it drops the
// snapshot. If the reporter's context is done before the snapshot can be
// queued, the test is marked as failed instead.
func (f *fakeTelemetryReporter) Report(snapshot *telemetry.Snapshot) {
	if f.enabled {
		select {
		case <-f.ctx.Done():
			f.t.Error("context closed while writing snapshot")
		case f.snapshots <- snapshot:
			// Queued for the test to receive.
		}
	}
}
// Enabled implements telemetry.Reporter. It returns the current value of the
// enabled field, the same flag Report checks before queuing a snapshot.
func (f *fakeTelemetryReporter) Enabled() bool {
	isEnabled := f.enabled
	return isEnabled
}
// Close implements telemetry.Reporter. It does nothing; in particular the
// snapshots channel is left open.
func (*fakeTelemetryReporter) Close() {
	// Intentionally empty.
}