chore: add additional network telemetry stats & events (#13800)

This commit is contained in:
Ethan
2024-07-10 14:14:35 +10:00
committed by GitHub
parent 38035da846
commit e8db21c89e
9 changed files with 282 additions and 82 deletions

View File

@ -437,7 +437,7 @@ func (r *RootCmd) ssh() *serpent.Command {
} }
err = sshSession.Wait() err = sshSession.Wait()
conn.SendDisconnectedTelemetry("ssh") conn.SendDisconnectedTelemetry()
if err != nil { if err != nil {
if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) { if exitErr := (&gossh.ExitError{}); errors.As(err, &exitErr) {
// Clear the error since it's not useful beyond // Clear the error since it's not useful beyond

View File

@ -1240,7 +1240,7 @@ type NetworkEvent struct {
NodeIDSelf uint64 `json:"node_id_self"` NodeIDSelf uint64 `json:"node_id_self"`
NodeIDRemote uint64 `json:"node_id_remote"` NodeIDRemote uint64 `json:"node_id_remote"`
P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"` P2PEndpoint NetworkEventP2PEndpoint `json:"p2p_endpoint"`
HomeDERP string `json:"home_derp"` HomeDERP int `json:"home_derp"`
DERPMap DERPMap `json:"derp_map"` DERPMap DERPMap `json:"derp_map"`
LatestNetcheck Netcheck `json:"latest_netcheck"` LatestNetcheck Netcheck `json:"latest_netcheck"`
@ -1286,7 +1286,7 @@ func NetworkEventFromProto(proto *tailnetproto.TelemetryEvent) (NetworkEvent, er
NodeIDSelf: proto.NodeIdSelf, NodeIDSelf: proto.NodeIdSelf,
NodeIDRemote: proto.NodeIdRemote, NodeIDRemote: proto.NodeIdRemote,
P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint), P2PEndpoint: p2pEndpointFromProto(proto.P2PEndpoint),
HomeDERP: proto.HomeDerp, HomeDERP: int(proto.HomeDerp),
DERPMap: derpMapFromProto(proto.DerpMap), DERPMap: derpMapFromProto(proto.DerpMap),
LatestNetcheck: netcheckFromProto(proto.LatestNetcheck), LatestNetcheck: netcheckFromProto(proto.LatestNetcheck),

View File

@ -380,7 +380,3 @@ func (c *AgentConn) apiClient() *http.Client {
func (c *AgentConn) GetPeerDiagnostics() tailnet.PeerDiagnostics { func (c *AgentConn) GetPeerDiagnostics() tailnet.PeerDiagnostics {
return c.Conn.GetPeerDiagnostics(c.opts.AgentID) return c.Conn.GetPeerDiagnostics(c.opts.AgentID)
} }
func (c *AgentConn) SendDisconnectedTelemetry(application string) {
c.Conn.SendDisconnectedTelemetry(c.agentAddress(), application)
}

View File

@ -228,12 +228,12 @@ func TestTailnetAPIConnector_TelemetryUnimplemented(t *testing.T) {
return uut.client != nil return uut.client != nil
}, testutil.WaitShort, testutil.IntervalFast) }, testutil.WaitShort, testutil.IntervalFast)
fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), 0) fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), 0)
uut.SendTelemetryEvent(&proto.TelemetryEvent{}) uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.False(t, uut.telemetryUnavailable.Load()) require.False(t, uut.telemetryUnavailable.Load())
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls)) require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
fakeDRPCClient.telemeteryErorr = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented) fakeDRPCClient.telemetryError = drpcerr.WithCode(xerrors.New("Unimplemented"), drpcerr.Unimplemented)
uut.SendTelemetryEvent(&proto.TelemetryEvent{}) uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.True(t, uut.telemetryUnavailable.Load()) require.True(t, uut.telemetryUnavailable.Load())
uut.SendTelemetryEvent(&proto.TelemetryEvent{}) uut.SendTelemetryEvent(&proto.TelemetryEvent{})
@ -268,12 +268,12 @@ func TestTailnetAPIConnector_TelemetryNotRecognised(t *testing.T) {
return uut.client != nil return uut.client != nil
}, testutil.WaitShort, testutil.IntervalFast) }, testutil.WaitShort, testutil.IntervalFast)
fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("Protocol Error") fakeDRPCClient.telemetryError = drpc.ProtocolError.New("Protocol Error")
uut.SendTelemetryEvent(&proto.TelemetryEvent{}) uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.False(t, uut.telemetryUnavailable.Load()) require.False(t, uut.telemetryUnavailable.Load())
require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls)) require.Equal(t, int64(1), atomic.LoadInt64(&fakeDRPCClient.postTelemetryCalls))
fakeDRPCClient.telemeteryErorr = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry") fakeDRPCClient.telemetryError = drpc.ProtocolError.New("unknown rpc: /coder.tailnet.v2.Tailnet/PostTelemetry")
uut.SendTelemetryEvent(&proto.TelemetryEvent{}) uut.SendTelemetryEvent(&proto.TelemetryEvent{})
require.True(t, uut.telemetryUnavailable.Load()) require.True(t, uut.telemetryUnavailable.Load())
uut.SendTelemetryEvent(&proto.TelemetryEvent{}) uut.SendTelemetryEvent(&proto.TelemetryEvent{})
@ -301,7 +301,7 @@ func newFakeTailnetConn() *fakeTailnetConn {
type fakeDRPCClient struct { type fakeDRPCClient struct {
postTelemetryCalls int64 postTelemetryCalls int64
telemeteryErorr error telemetryError error
fakeDRPPCMapStream fakeDRPPCMapStream
} }
@ -331,7 +331,7 @@ func (*fakeDRPCClient) DRPCConn() drpc.Conn {
// PostTelemetry implements proto.DRPCTailnetClient. // PostTelemetry implements proto.DRPCTailnetClient.
func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) { func (f *fakeDRPCClient) PostTelemetry(_ context.Context, _ *proto.TelemetryRequest) (*proto.TelemetryResponse, error) {
atomic.AddInt64(&f.postTelemetryCalls, 1) atomic.AddInt64(&f.postTelemetryCalls, 1)
return nil, f.telemeteryErorr return nil, f.telemetryError
} }
// StreamDERPMaps implements proto.DRPCTailnetClient. // StreamDERPMaps implements proto.DRPCTailnetClient.

View File

@ -31,6 +31,7 @@ import (
"tailscale.com/types/key" "tailscale.com/types/key"
tslogger "tailscale.com/types/logger" tslogger "tailscale.com/types/logger"
"tailscale.com/types/netlogtype" "tailscale.com/types/netlogtype"
"tailscale.com/types/netmap"
"tailscale.com/wgengine" "tailscale.com/wgengine"
"tailscale.com/wgengine/capture" "tailscale.com/wgengine/capture"
"tailscale.com/wgengine/magicsock" "tailscale.com/wgengine/magicsock"
@ -262,17 +263,8 @@ func NewConn(options *Options) (conn *Conn, err error) {
) )
nodeUp.setAddresses(options.Addresses) nodeUp.setAddresses(options.Addresses)
nodeUp.setBlockEndpoints(options.BlockEndpoints) nodeUp.setBlockEndpoints(options.BlockEndpoints)
wireguardEngine.SetStatusCallback(nodeUp.setStatus)
magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
if telemetryStore != nil {
wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
nodeUp.setNetInfo(ni)
telemetryStore.setNetInfo(ni)
})
} else {
wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
}
ctx, ctxCancel := context.WithCancel(context.Background())
server := &Conn{ server := &Conn{
id: uuid.New(), id: uuid.New(),
closed: make(chan struct{}), closed: make(chan struct{}),
@ -290,13 +282,32 @@ func NewConn(options *Options) (conn *Conn, err error) {
configMaps: cfgMaps, configMaps: cfgMaps,
nodeUpdater: nodeUp, nodeUpdater: nodeUp,
telemetrySink: options.TelemetrySink, telemetrySink: options.TelemetrySink,
telemeteryStore: telemetryStore, telemetryStore: telemetryStore,
createdAt: time.Now(),
watchCtx: ctx,
watchCancel: ctxCancel,
} }
defer func() { defer func() {
if err != nil { if err != nil {
_ = server.Close() _ = server.Close()
} }
}() }()
if server.telemetryStore != nil {
server.wireguardEngine.SetNetInfoCallback(func(ni *tailcfg.NetInfo) {
server.telemetryStore.setNetInfo(ni)
nodeUp.setNetInfo(ni)
server.telemetryStore.pingPeer(server)
})
server.wireguardEngine.AddNetworkMapCallback(func(nm *netmap.NetworkMap) {
server.telemetryStore.updateNetworkMap(nm)
server.telemetryStore.pingPeer(server)
})
go server.watchConnChange()
} else {
server.wireguardEngine.SetNetInfoCallback(nodeUp.setNetInfo)
}
server.wireguardEngine.SetStatusCallback(nodeUp.setStatus)
server.magicConn.SetDERPForcedWebsocketCallback(nodeUp.setDERPForcedWebsocket)
netStack.GetTCPHandlerForFlow = server.forwardTCP netStack.GetTCPHandlerForFlow = server.forwardTCP
@ -351,11 +362,15 @@ type Conn struct {
wireguardEngine wgengine.Engine wireguardEngine wgengine.Engine
listeners map[listenKey]*listener listeners map[listenKey]*listener
clientType proto.TelemetryEvent_ClientType clientType proto.TelemetryEvent_ClientType
createdAt time.Time
telemetrySink TelemetrySink telemetrySink TelemetrySink
// telemeteryStore will be nil if telemetrySink is nil. // telemetryStore will be nil if telemetrySink is nil.
telemeteryStore *TelemetryStore telemetryStore *TelemetryStore
telemetryWg sync.WaitGroup telemetryWg sync.WaitGroup
watchCtx context.Context
watchCancel func()
trafficStats *connstats.Statistics trafficStats *connstats.Statistics
} }
@ -390,8 +405,8 @@ func (c *Conn) SetNodeCallback(callback func(node *Node)) {
// SetDERPMap updates the DERPMap of a connection. // SetDERPMap updates the DERPMap of a connection.
func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) { func (c *Conn) SetDERPMap(derpMap *tailcfg.DERPMap) {
if c.configMaps.setDERPMap(derpMap) && c.telemeteryStore != nil { if c.configMaps.setDERPMap(derpMap) && c.telemetryStore != nil {
c.telemeteryStore.updateDerpMap(derpMap) c.telemetryStore.updateDerpMap(derpMap)
} }
} }
@ -540,6 +555,7 @@ func (c *Conn) Closed() <-chan struct{} {
// Close shuts down the Wireguard connection. // Close shuts down the Wireguard connection.
func (c *Conn) Close() error { func (c *Conn) Close() error {
c.logger.Info(context.Background(), "closing tailnet Conn") c.logger.Info(context.Background(), "closing tailnet Conn")
c.watchCancel()
c.telemetryWg.Wait() c.telemetryWg.Wait()
c.configMaps.close() c.configMaps.close()
c.nodeUpdater.close() c.nodeUpdater.close()
@ -709,40 +725,24 @@ func (c *Conn) MagicsockServeHTTPDebug(w http.ResponseWriter, r *http.Request) {
c.magicConn.ServeHTTPDebug(w, r) c.magicConn.ServeHTTPDebug(w, r)
} }
// SendConnectedTelemetry should be called when connection to a peer with the given IP is established.
func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) { func (c *Conn) SendConnectedTelemetry(ip netip.Addr, application string) {
if c.telemetrySink == nil { if c.telemetrySink == nil {
return return
} }
c.telemetryStore.markConnected(&ip, application)
e := c.newTelemetryEvent() e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_CONNECTED e.Status = proto.TelemetryEvent_CONNECTED
e.Application = application c.sendTelemetryBackground(e)
pip, ok := c.wireguardEngine.PeerForIP(ip)
if ok {
e.NodeIdRemote = uint64(pip.Node.ID)
}
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
} }
func (c *Conn) SendDisconnectedTelemetry(ip netip.Addr, application string) { func (c *Conn) SendDisconnectedTelemetry() {
if c.telemetrySink == nil { if c.telemetrySink == nil {
return return
} }
e := c.newTelemetryEvent() e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_DISCONNECTED e.Status = proto.TelemetryEvent_DISCONNECTED
e.Application = application c.sendTelemetryBackground(e)
pip, ok := c.wireguardEngine.PeerForIP(ip)
if ok {
e.NodeIdRemote = uint64(pip.Node.ID)
}
c.telemetryWg.Add(1)
go func() {
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
} }
func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) { func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) {
@ -750,13 +750,9 @@ func (c *Conn) SendSpeedtestTelemetry(throughputMbits float64) {
return return
} }
e := c.newTelemetryEvent() e := c.newTelemetryEvent()
e.Status = proto.TelemetryEvent_CONNECTED
e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits)) e.ThroughputMbits = wrapperspb.Float(float32(throughputMbits))
c.telemetryWg.Add(1) e.Status = proto.TelemetryEvent_CONNECTED
go func() { c.sendTelemetryBackground(e)
defer c.telemetryWg.Done()
c.telemetrySink.SendTelemetryEvent(e)
}()
} }
// nolint:revive // nolint:revive
@ -769,11 +765,26 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second))) latency := durationpb.New(time.Duration(pr.LatencySeconds * float64(time.Second)))
if pr.Endpoint != "" { if pr.Endpoint != "" {
e.P2PLatency = latency e.P2PLatency = latency
e.P2PEndpoint = c.telemeteryStore.toEndpoint(pr.Endpoint) e.P2PEndpoint = c.telemetryStore.toEndpoint(pr.Endpoint)
} else { } else {
e.DerpLatency = latency e.DerpLatency = latency
} }
e.Status = proto.TelemetryEvent_CONNECTED e.Status = proto.TelemetryEvent_CONNECTED
c.sendTelemetryBackground(e)
}
// The returned telemetry event will not have it's status set.
func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent {
// Infallible
id, _ := c.id.MarshalBinary()
event := c.telemetryStore.newEvent()
event.ClientType = c.clientType
event.Id = id
event.ConnectionAge = durationpb.New(time.Since(c.createdAt))
return event
}
func (c *Conn) sendTelemetryBackground(e *proto.TelemetryEvent) {
c.telemetryWg.Add(1) c.telemetryWg.Add(1)
go func() { go func() {
defer c.telemetryWg.Done() defer c.telemetryWg.Done()
@ -781,17 +792,30 @@ func (c *Conn) sendPingTelemetry(pr *ipnstate.PingResult) {
}() }()
} }
// The returned telemetry event will not have it's status set. // Watch for changes in the connection type (P2P<->DERP) and send telemetry events.
func (c *Conn) newTelemetryEvent() *proto.TelemetryEvent { func (c *Conn) watchConnChange() {
// Infallible ticker := time.NewTicker(time.Millisecond * 50)
id, _ := c.id.MarshalBinary() defer ticker.Stop()
event := c.telemeteryStore.newEvent() for {
event.ClientType = c.clientType select {
event.Id = id case <-c.watchCtx.Done():
selfNode := c.Node() return
event.NodeIdSelf = uint64(selfNode.ID) case <-ticker.C:
event.HomeDerp = strconv.Itoa(selfNode.PreferredDERP) }
return event status := c.Status()
peers := status.Peers()
if len(peers) > 1 {
// Not a CLI<->agent connection, stop watching
return
} else if len(peers) == 0 {
continue
}
peer := status.Peer[peers[0]]
// If the connection type has changed, send a telemetry event with the latest ping stats
if c.telemetryStore.changedConntype(peer.CurAddr) {
c.telemetryStore.pingPeer(c)
}
}
} }
// PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted // PeerDiagnostics is a checklist of human-readable conditions necessary to establish an encrypted

View File

@ -819,7 +819,7 @@ type TelemetryEvent struct {
NodeIdSelf uint64 `protobuf:"varint,7,opt,name=node_id_self,json=nodeIdSelf,proto3" json:"node_id_self,omitempty"` NodeIdSelf uint64 `protobuf:"varint,7,opt,name=node_id_self,json=nodeIdSelf,proto3" json:"node_id_self,omitempty"`
NodeIdRemote uint64 `protobuf:"varint,8,opt,name=node_id_remote,json=nodeIdRemote,proto3" json:"node_id_remote,omitempty"` NodeIdRemote uint64 `protobuf:"varint,8,opt,name=node_id_remote,json=nodeIdRemote,proto3" json:"node_id_remote,omitempty"`
P2PEndpoint *TelemetryEvent_P2PEndpoint `protobuf:"bytes,9,opt,name=p2p_endpoint,json=p2pEndpoint,proto3" json:"p2p_endpoint,omitempty"` P2PEndpoint *TelemetryEvent_P2PEndpoint `protobuf:"bytes,9,opt,name=p2p_endpoint,json=p2pEndpoint,proto3" json:"p2p_endpoint,omitempty"`
HomeDerp string `protobuf:"bytes,10,opt,name=home_derp,json=homeDerp,proto3" json:"home_derp,omitempty"` HomeDerp int32 `protobuf:"varint,10,opt,name=home_derp,json=homeDerp,proto3" json:"home_derp,omitempty"`
DerpMap *DERPMap `protobuf:"bytes,11,opt,name=derp_map,json=derpMap,proto3" json:"derp_map,omitempty"` DerpMap *DERPMap `protobuf:"bytes,11,opt,name=derp_map,json=derpMap,proto3" json:"derp_map,omitempty"`
LatestNetcheck *Netcheck `protobuf:"bytes,12,opt,name=latest_netcheck,json=latestNetcheck,proto3" json:"latest_netcheck,omitempty"` LatestNetcheck *Netcheck `protobuf:"bytes,12,opt,name=latest_netcheck,json=latestNetcheck,proto3" json:"latest_netcheck,omitempty"`
ConnectionAge *durationpb.Duration `protobuf:"bytes,13,opt,name=connection_age,json=connectionAge,proto3" json:"connection_age,omitempty"` ConnectionAge *durationpb.Duration `protobuf:"bytes,13,opt,name=connection_age,json=connectionAge,proto3" json:"connection_age,omitempty"`
@ -925,11 +925,11 @@ func (x *TelemetryEvent) GetP2PEndpoint() *TelemetryEvent_P2PEndpoint {
return nil return nil
} }
func (x *TelemetryEvent) GetHomeDerp() string { func (x *TelemetryEvent) GetHomeDerp() int32 {
if x != nil { if x != nil {
return x.HomeDerp return x.HomeDerp
} }
return "" return 0
} }
func (x *TelemetryEvent) GetDerpMap() *DERPMap { func (x *TelemetryEvent) GetDerpMap() *DERPMap {
@ -2006,7 +2006,7 @@ var file_tailnet_proto_tailnet_proto_rawDesc = []byte{
0x2e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e, 0x2e, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x2e,
0x50, 0x32, 0x50, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x0b, 0x70, 0x32, 0x70, 0x50, 0x32, 0x50, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x0b, 0x70, 0x32, 0x70,
0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x68, 0x6f, 0x6d, 0x65, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x68, 0x6f, 0x6d, 0x65,
0x5f, 0x64, 0x65, 0x72, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x68, 0x6f, 0x6d, 0x5f, 0x64, 0x65, 0x72, 0x70, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x68, 0x6f, 0x6d,
0x65, 0x44, 0x65, 0x72, 0x70, 0x12, 0x34, 0x0a, 0x08, 0x64, 0x65, 0x72, 0x70, 0x5f, 0x6d, 0x61, 0x65, 0x44, 0x65, 0x72, 0x70, 0x12, 0x34, 0x0a, 0x08, 0x64, 0x65, 0x72, 0x70, 0x5f, 0x6d, 0x61,
0x70, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e,
0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x45, 0x52, 0x50, 0x4d, 0x74, 0x61, 0x69, 0x6c, 0x6e, 0x65, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x44, 0x45, 0x52, 0x50, 0x4d,

View File

@ -169,7 +169,7 @@ message TelemetryEvent {
uint64 node_id_self = 7; uint64 node_id_self = 7;
uint64 node_id_remote = 8; uint64 node_id_remote = 8;
P2PEndpoint p2p_endpoint = 9; P2PEndpoint p2p_endpoint = 9;
string home_derp = 10; int32 home_derp = 10;
DERPMap derp_map = 11; DERPMap derp_map = 11;
Netcheck latest_netcheck = 12; Netcheck latest_netcheck = 12;

View File

@ -12,6 +12,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb" "google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb" "google.golang.org/protobuf/types/known/wrapperspb"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/netmap"
"github.com/coder/coder/v2/cryptorand" "github.com/coder/coder/v2/cryptorand"
"github.com/coder/coder/v2/tailnet/proto" "github.com/coder/coder/v2/tailnet/proto"
@ -20,6 +21,7 @@ import (
const ( const (
TelemetryApplicationSSH string = "ssh" TelemetryApplicationSSH string = "ssh"
TelemetryApplicationSpeedtest string = "speedtest" TelemetryApplicationSpeedtest string = "speedtest"
TelemetryApplicationVSCode string = "vscode"
) )
// Responsible for storing and anonymizing networking telemetry state. // Responsible for storing and anonymizing networking telemetry state.
@ -31,6 +33,19 @@ type TelemetryStore struct {
cleanDerpMap *tailcfg.DERPMap cleanDerpMap *tailcfg.DERPMap
cleanNetCheck *proto.Netcheck cleanNetCheck *proto.Netcheck
nodeIDSelf uint64
homeDerp int32
application string
// nil if not connected
connSetupTime *durationpb.Duration
connectedIP *netip.Addr
// 0 if not connected
nodeIDRemote uint64
p2p bool
p2pSetupTime time.Duration
lastDerpTime time.Time
} }
func newTelemetryStore() (*TelemetryStore, error) { func newTelemetryStore() (*TelemetryStore, error) {
@ -49,16 +64,90 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
return &proto.TelemetryEvent{ out := &proto.TelemetryEvent{
Time: timestamppb.Now(), Time: timestamppb.Now(),
DerpMap: DERPMapToProto(b.cleanDerpMap), DerpMap: DERPMapToProto(b.cleanDerpMap),
LatestNetcheck: b.cleanNetCheck, LatestNetcheck: b.cleanNetCheck,
NodeIdSelf: b.nodeIDSelf,
// TODO(ethanndickson): NodeIdRemote: b.nodeIDRemote,
ConnectionAge: &durationpb.Duration{}, HomeDerp: b.homeDerp,
ConnectionSetup: &durationpb.Duration{}, ConnectionSetup: b.connSetupTime,
P2PSetup: &durationpb.Duration{}, Application: b.application,
} }
if b.p2pSetupTime > 0 {
out.P2PSetup = durationpb.New(b.p2pSetupTime)
}
return out
}
func (b *TelemetryStore) markConnected(ip *netip.Addr, application string) {
b.mu.Lock()
defer b.mu.Unlock()
b.lastDerpTime = time.Now()
b.connectedIP = ip
b.application = application
}
func (b *TelemetryStore) pingPeer(conn *Conn) {
b.mu.Lock()
defer b.mu.Unlock()
if b.connectedIP == nil {
return
}
ip := *b.connectedIP
go func() {
_, _, _, _ = conn.Ping(conn.watchCtx, ip)
}()
}
func (b *TelemetryStore) changedConntype(addr string) bool {
b.mu.Lock()
defer b.mu.Unlock()
if b.p2p && addr != "" {
return false
} else if !b.p2p && addr != "" {
b.p2p = true
b.p2pSetupTime = time.Since(b.lastDerpTime)
return true
} else if b.p2p && addr == "" {
b.p2p = false
b.lastDerpTime = time.Now()
b.p2pSetupTime = 0
return true
}
return false
}
func (b *TelemetryStore) updateRemoteNodeIDLocked(nm *netmap.NetworkMap) {
if b.connectedIP == nil {
return
}
ip := *b.connectedIP
for _, p := range nm.Peers {
for _, a := range p.Addresses {
if a.Addr() == ip && a.IsSingleIP() {
b.nodeIDRemote = uint64(p.ID)
}
}
}
}
func (b *TelemetryStore) updateNetworkMap(nm *netmap.NetworkMap) {
b.mu.Lock()
defer b.mu.Unlock()
if nm == nil {
return
}
b.updateDerpMapLocked(nm.DERPMap)
b.updateRemoteNodeIDLocked(nm)
b.updateByNodeLocked(nm.SelfNode)
} }
// Given a DERPMap, anonymise all IPs and hostnames. // Given a DERPMap, anonymise all IPs and hostnames.
@ -67,6 +156,14 @@ func (b *TelemetryStore) newEvent() *proto.TelemetryEvent {
func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) { func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
b.updateDerpMapLocked(cur)
}
func (b *TelemetryStore) updateDerpMapLocked(cur *tailcfg.DERPMap) {
if cur == nil {
return
}
cleanMap := cur.Clone() cleanMap := cur.Clone()
for _, r := range cleanMap.Regions { for _, r := range cleanMap.Regions {
for _, n := range r.Nodes { for _, n := range r.Nodes {
@ -85,6 +182,25 @@ func (b *TelemetryStore) updateDerpMap(cur *tailcfg.DERPMap) {
b.cleanDerpMap = cleanMap b.cleanDerpMap = cleanMap
} }
// Update the telemetry store with the current self node state.
// Returns true if the home DERP has changed.
func (b *TelemetryStore) updateByNodeLocked(n *tailcfg.Node) bool {
if n == nil {
return false
}
b.nodeIDSelf = uint64(n.ID)
derpIP, err := netip.ParseAddrPort(n.DERP)
if err != nil {
return false
}
newHome := int32(derpIP.Port())
if b.homeDerp != newHome {
b.homeDerp = newHome
return true
}
return false
}
// Store an anonymized proto.Netcheck given a tailscale NetInfo. // Store an anonymized proto.Netcheck given a tailscale NetInfo.
func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) { func (b *TelemetryStore) setNetInfo(ni *tailcfg.NetInfo) {
b.mu.Lock() b.mu.Lock()

View File

@ -1,10 +1,13 @@
package tailnet package tailnet
import ( import (
"fmt"
"net/netip"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"tailscale.com/tailcfg" "tailscale.com/tailcfg"
"tailscale.com/types/netmap"
"github.com/coder/coder/v2/tailnet/proto" "github.com/coder/coder/v2/tailnet/proto"
) )
@ -12,6 +15,67 @@ import (
func TestTelemetryStore(t *testing.T) { func TestTelemetryStore(t *testing.T) {
t.Parallel() t.Parallel()
t.Run("CreateEvent", func(t *testing.T) {
t.Parallel()
remotePrefix := netip.PrefixFrom(IP(), 128)
remoteIP := remotePrefix.Addr()
application := "test"
nm := &netmap.NetworkMap{
SelfNode: &tailcfg.Node{
ID: 0,
DERP: "127.3.3.40:999",
},
Peers: []*tailcfg.Node{
{
ID: 1,
Addresses: []netip.Prefix{
netip.PrefixFrom(IP(), 128),
netip.PrefixFrom(IP(), 128),
},
},
{
ID: 2,
Addresses: []netip.Prefix{
remotePrefix,
netip.PrefixFrom(IP(), 128),
netip.PrefixFrom(IP(), 128),
},
},
},
DERPMap: &tailcfg.DERPMap{
HomeParams: &tailcfg.DERPHomeParams{
RegionScore: map[int]float64{
999: 1.0,
},
},
Regions: map[int]*tailcfg.DERPRegion{
999: {
RegionID: 999,
RegionCode: "zzz",
RegionName: "Cool Region",
EmbeddedRelay: true,
Avoid: false,
},
},
OmitDefaultRegions: false,
},
}
telemetry, err := newTelemetryStore()
require.NoError(t, err)
telemetry.markConnected(&remoteIP, application)
telemetry.updateNetworkMap(nm)
e := telemetry.newEvent()
// DERPMapToProto already tested
require.Equal(t, DERPMapToProto(nm.DERPMap), e.DerpMap)
require.Equal(t, uint64(nm.Peers[1].ID), e.NodeIdRemote)
require.Equal(t, uint64(nm.SelfNode.ID), e.NodeIdSelf)
require.Equal(t, application, e.Application)
require.Equal(t, nm.SelfNode.DERP, fmt.Sprintf("127.3.3.40:%d", e.HomeDerp))
})
t.Run("CleanIPs", func(t *testing.T) { t.Run("CleanIPs", func(t *testing.T) {
t.Parallel() t.Parallel()