Mirror of https://github.com/coder/coder.git, synced 2025-03-16 23:40:29 +00:00
chore: add support for peer updates to tailnet.configMaps (#11487)
Adds support to configMaps for handling peer updates, including lost and disconnected peers.
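As a quick orientation before the diff, here is a minimal, hypothetical sketch of how callers inside package tailnet could hand coordinator peer updates to the new configMaps API. The helper names and wiring are assumptions; the update kinds, field names, and locking rule come from the code below.

package tailnet

import (
    "github.com/google/uuid"

    "github.com/coder/coder/v2/tailnet/proto"
)

// applyPeerNodeUpdate is an illustrative helper (not part of this commit).
// A NODE update adds the peer or refreshes its tailcfg node; updatePeers
// takes the lock itself, so callers must NOT hold c.L.
func applyPeerNodeUpdate(c *configMaps, peerID uuid.UUID, node *proto.Node) {
    c.updatePeers([]*proto.CoordinateResponse_PeerUpdate{{
        Id:   peerID[:],
        Kind: proto.CoordinateResponse_PeerUpdate_NODE,
        Node: node,
    }})
}

// markPeerLost is an illustrative helper (not part of this commit). A LOST
// update keeps the peer configured but arms a timer; if no fresh handshake
// is observed within lostTimeout, the peer is dropped from the netmap.
// A DISCONNECTED update would remove it immediately.
func markPeerLost(c *configMaps, peerID uuid.UUID) {
    c.updatePeers([]*proto.CoordinateResponse_PeerUpdate{{
        Id:   peerID[:],
        Kind: proto.CoordinateResponse_PeerUpdate_LOST,
    }})
}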
go.mod
@@ -206,6 +206,8 @@ require (

require go.uber.org/mock v0.4.0

require github.com/benbjohnson/clock v1.3.5 // indirect

require (
    cloud.google.com/go/compute v1.23.3 // indirect
    cloud.google.com/go/logging v1.8.1 // indirect

go.sum
@@ -123,6 +123,8 @@ github.com/aymanbagabas/go-osc52/v2 v2.0.1 h1:HwpRHbFMcZLEVr42D4p7XBqjyuxQH5SMiE
github.com/aymanbagabas/go-osc52/v2 v2.0.1/go.mod h1:uYgXzlJ7ZpABp8OJ+exZzJJhRNQ2ASbcXHWsFqH8hp8=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bep/godartsass v1.2.0 h1:E2VvQrxAHAFwbjyOIExAMmogTItSKodoKuijNrGm5yU=
@@ -3,11 +3,15 @@ package tailnet
import (
    "context"
    "errors"
    "fmt"
    "net/netip"
    "sync"
    "time"

    "github.com/benbjohnson/clock"
    "github.com/google/uuid"
    "go4.org/netipx"
    "tailscale.com/ipn/ipnstate"
    "tailscale.com/net/dns"
    "tailscale.com/tailcfg"
    "tailscale.com/types/ipproto"
@@ -23,10 +27,13 @@ import (
    "github.com/coder/coder/v2/tailnet/proto"
)

const lostTimeout = 15 * time.Minute

// engineConfigurable is the subset of wgengine.Engine that we use for configuration.
//
// This allows us to test configuration code without faking the whole interface.
type engineConfigurable interface {
    UpdateStatus(*ipnstate.StatusBuilder)
    SetNetworkMap(*netmap.NetworkMap)
    Reconfig(*wgcfg.Config, *router.Config, *dns.Config, *tailcfg.Debug) error
    SetDERPMap(*tailcfg.DERPMap)
@@ -49,12 +56,16 @@ type configMaps struct {
    closing bool
    phase   phase

    engine         engineConfigurable
    static         netmap.NetworkMap
    peers          map[uuid.UUID]*peerLifecycle
    addresses      []netip.Prefix
    derpMap        *proto.DERPMap
    logger         slog.Logger
    engine         engineConfigurable
    static         netmap.NetworkMap
    peers          map[uuid.UUID]*peerLifecycle
    addresses      []netip.Prefix
    derpMap        *proto.DERPMap
    logger         slog.Logger
    blockEndpoints bool

    // for testing
    clock clock.Clock
}

func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg.NodeID, nodeKey key.NodePrivate, discoKey key.DiscoPublic, addresses []netip.Prefix) *configMaps {
@@ -101,6 +112,7 @@ func newConfigMaps(logger slog.Logger, engine engineConfigurable, nodeID tailcfg
        },
        peers:     make(map[uuid.UUID]*peerLifecycle),
        addresses: addresses,
        clock:     clock.New(),
    }
    go c.configLoop()
    return c
@@ -165,6 +177,9 @@ func (c *configMaps) configLoop() {
func (c *configMaps) close() {
    c.L.Lock()
    defer c.L.Unlock()
    for _, lc := range c.peers {
        lc.resetTimer()
    }
    c.closing = true
    c.Broadcast()
    for c.phase != closed {
@@ -260,11 +275,208 @@ func (c *configMaps) filterLocked() *filter.Filter {
    )
}

// updatePeers handles protocol updates about peers from the coordinator. c.L MUST NOT be held.
func (c *configMaps) updatePeers(updates []*proto.CoordinateResponse_PeerUpdate) {
    status := c.status()
    c.L.Lock()
    defer c.L.Unlock()

    // Update all the lastHandshake values here. That way we don't have to
    // worry about them being up-to-date when handling updates below, and it covers
    // all peers, not just the ones we got updates about.
    for _, lc := range c.peers {
        if peerStatus, ok := status.Peer[lc.node.Key]; ok {
            lc.lastHandshake = peerStatus.LastHandshake
        }
    }

    for _, update := range updates {
        if dirty := c.updatePeerLocked(update, status); dirty {
            c.netmapDirty = true
        }
    }
    if c.netmapDirty {
        c.Broadcast()
    }
}

// status requests a status update from the engine.
func (c *configMaps) status() *ipnstate.Status {
    sb := &ipnstate.StatusBuilder{WantPeers: true}
    c.engine.UpdateStatus(sb)
    return sb.Status()
}

// updatePeerLocked processes a single update for a single peer. It is intended
// as internal function since it returns whether or not the config is dirtied by
// the update (instead of handling it directly like updatePeers). c.L must be held.
func (c *configMaps) updatePeerLocked(update *proto.CoordinateResponse_PeerUpdate, status *ipnstate.Status) (dirty bool) {
    id, err := uuid.FromBytes(update.Id)
    if err != nil {
        c.logger.Critical(context.Background(), "received update with bad id", slog.F("id", update.Id))
        return false
    }
    logger := c.logger.With(slog.F("peer_id", id))
    lc, ok := c.peers[id]
    var node *tailcfg.Node
    if update.Kind == proto.CoordinateResponse_PeerUpdate_NODE {
        // If no preferred DERP is provided, we can't reach the node.
        if update.Node.PreferredDerp == 0 {
            logger.Warn(context.Background(), "no preferred DERP, peer update", slog.F("node_proto", update.Node))
            return false
        }
        node, err = c.protoNodeToTailcfg(update.Node)
        if err != nil {
            logger.Critical(context.Background(), "failed to convert proto node to tailcfg", slog.F("node_proto", update.Node))
            return false
        }
        logger = logger.With(slog.F("key_id", node.Key.ShortString()), slog.F("node", node))
        peerStatus, ok := status.Peer[node.Key]
        // Starting KeepAlive messages at the initialization of a connection
        // causes a race condition. If we send the handshake before the peer has
        // our node, we'll have to wait for 5 seconds before trying again.
        // Ideally, the first handshake starts when the user first initiates a
        // connection to the peer. After a successful connection we enable
        // keep alives to persist the connection and keep it from becoming idle.
        // SSH connections don't send packets while idle, so we use keep alives
        // to avoid random hangs while we set up the connection again after
        // inactivity.
        node.KeepAlive = ok && peerStatus.Active
        if c.blockEndpoints {
            node.Endpoints = nil
        }
    }
    switch {
    case !ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
        // new!
        var lastHandshake time.Time
        if ps, ok := status.Peer[node.Key]; ok {
            lastHandshake = ps.LastHandshake
        }
        c.peers[id] = &peerLifecycle{
            peerID:        id,
            node:          node,
            lastHandshake: lastHandshake,
            lost:          false,
        }
        logger.Debug(context.Background(), "adding new peer")
        return true
    case ok && update.Kind == proto.CoordinateResponse_PeerUpdate_NODE:
        // update
        node.Created = lc.node.Created
        dirty = !lc.node.Equal(node)
        lc.node = node
        lc.lost = false
        lc.resetTimer()
        logger.Debug(context.Background(), "node update to existing peer", slog.F("dirty", dirty))
        return dirty
    case !ok:
        // disconnected or lost, but we don't have the node. No op
        logger.Debug(context.Background(), "skipping update for peer we don't recognize")
        return false
    case update.Kind == proto.CoordinateResponse_PeerUpdate_DISCONNECTED:
        lc.resetTimer()
        delete(c.peers, id)
        logger.Debug(context.Background(), "disconnected peer")
        return true
    case update.Kind == proto.CoordinateResponse_PeerUpdate_LOST:
        lc.lost = true
        lc.setLostTimer(c)
        logger.Debug(context.Background(), "marked peer lost")
        // marking a node lost doesn't change anything right now, so dirty=false
        return false
    default:
        logger.Warn(context.Background(), "unknown peer update", slog.F("kind", update.Kind))
        return false
    }
}

// peerLostTimeout is the callback that peerLifecycle uses when a peer is lost
// and the timeout to receive a handshake fires.
func (c *configMaps) peerLostTimeout(id uuid.UUID) {
    logger := c.logger.With(slog.F("peer_id", id))
    logger.Debug(context.Background(),
        "peer lost timeout")

    // First do a status update to see if the peer did a handshake while we were
    // waiting
    status := c.status()
    c.L.Lock()
    defer c.L.Unlock()

    lc, ok := c.peers[id]
    if !ok {
        logger.Debug(context.Background(),
            "timeout triggered for peer that is removed from the map")
        return
    }
    if peerStatus, ok := status.Peer[lc.node.Key]; ok {
        lc.lastHandshake = peerStatus.LastHandshake
    }
    logger = logger.With(slog.F("key_id", lc.node.Key.ShortString()))
    if !lc.lost {
        logger.Debug(context.Background(),
            "timeout triggered for peer that is no longer lost")
        return
    }
    since := c.clock.Since(lc.lastHandshake)
    if since >= lostTimeout {
        logger.Info(
            context.Background(), "removing lost peer")
        delete(c.peers, id)
        c.netmapDirty = true
        c.Broadcast()
        return
    }
    logger.Debug(context.Background(),
        "timeout triggered for peer but it had handshake in meantime")
    lc.setLostTimer(c)
}

func (c *configMaps) protoNodeToTailcfg(p *proto.Node) (*tailcfg.Node, error) {
    node, err := ProtoToNode(p)
    if err != nil {
        return nil, err
    }
    return &tailcfg.Node{
        ID:         tailcfg.NodeID(p.GetId()),
        Created:    c.clock.Now(),
        Key:        node.Key,
        DiscoKey:   node.DiscoKey,
        Addresses:  node.Addresses,
        AllowedIPs: node.AllowedIPs,
        Endpoints:  node.Endpoints,
        DERP:       fmt.Sprintf("%s:%d", tailcfg.DerpMagicIP, node.PreferredDERP),
        Hostinfo:   (&tailcfg.Hostinfo{}).View(),
    }, nil
}

type peerLifecycle struct {
    node *tailcfg.Node
    // TODO: implement timers to track lost peers
    // lastHandshake time.Time
    // timer time.Timer
    peerID        uuid.UUID
    node          *tailcfg.Node
    lost          bool
    lastHandshake time.Time
    timer         *clock.Timer
}

func (l *peerLifecycle) resetTimer() {
    if l.timer != nil {
        l.timer.Stop()
        l.timer = nil
    }
}

func (l *peerLifecycle) setLostTimer(c *configMaps) {
    if l.timer != nil {
        l.timer.Stop()
    }
    ttl := lostTimeout - c.clock.Since(l.lastHandshake)
    if ttl <= 0 {
        ttl = time.Nanosecond
    }
    l.timer = c.clock.AfterFunc(ttl, func() {
        c.peerLostTimeout(l.peerID)
    })
}

// prefixesDifferent returns true if the two slices contain different prefixes
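The lost-peer handling above is driven entirely by the injected clock, which is what makes it testable. Below is a minimal, self-contained sketch (not from the commit) of the github.com/benbjohnson/clock mock pattern the new tests rely on: arm a timer for lostTimeout minus the time since the last handshake, then advance the mock clock to fire it deterministically. The constant mirrors the lostTimeout added above; the rest is illustrative.

package main

import (
    "fmt"
    "time"

    "github.com/benbjohnson/clock"
)

func main() {
    // Illustrative only: mirrors how setLostTimer computes its TTL and how
    // the tests drive it with a mock clock instead of real waiting.
    const lostTimeout = 15 * time.Minute

    mClock := clock.NewMock()
    mClock.Set(time.Date(2024, time.January, 1, 8, 0, 0, 0, time.UTC))

    lastHandshake := mClock.Now().Add(-time.Minute) // pretend the last handshake was a minute ago
    ttl := lostTimeout - mClock.Since(lastHandshake)

    fired := make(chan struct{})
    mClock.AfterFunc(ttl, func() { close(fired) })

    mClock.Add(lostTimeout) // advancing past the TTL fires the callback without sleeping
    <-fired
    fmt.Println("lost timeout fired after", ttl)
}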
@@ -1,12 +1,17 @@
package tailnet

import (
    "context"
    "net/netip"
    "sync"
    "testing"
    "time"

    "github.com/benbjohnson/clock"
    "github.com/google/uuid"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "tailscale.com/ipn/ipnstate"
    "tailscale.com/net/dns"
    "tailscale.com/tailcfg"
    "tailscale.com/types/key"
@@ -15,14 +20,16 @@ import (
    "tailscale.com/wgengine/router"
    "tailscale.com/wgengine/wgcfg"

    "cdr.dev/slog"
    "cdr.dev/slog/sloggers/slogtest"
    "github.com/coder/coder/v2/tailnet/proto"
    "github.com/coder/coder/v2/testutil"
)

func TestConfigMaps_setAddresses_different(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
@@ -80,7 +87,7 @@ func TestConfigMaps_setAddresses_different(t *testing.T) {
func TestConfigMaps_setAddresses_same(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
@@ -89,6 +96,494 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
    uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), addrs)
    defer uut.close()

    requireNeverConfigures(ctx, t, uut)

    uut.setAddresses(addrs)

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestConfigMaps_updatePeers_new(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
    discoKey := key.NewDisco()
    uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
    defer uut.close()

    p1ID := uuid.UUID{1}
    p1Node := newTestNode(1)
    p1n, err := NodeToProto(p1Node)
    require.NoError(t, err)
    p2ID := uuid.UUID{2}
    p2Node := newTestNode(2)
    p2n, err := NodeToProto(p2Node)
    require.NoError(t, err)

    go func() {
        b := <-fEng.status
        b.AddPeer(p1Node.Key, &ipnstate.PeerStatus{
            PublicKey:     p1Node.Key,
            LastHandshake: time.Date(2024, 1, 7, 12, 13, 10, 0, time.UTC),
            Active:        true,
        })
        // peer 2 is missing, so it won't have KeepAlives set
        fEng.statusDone <- struct{}{}
    }()

    updates := []*proto.CoordinateResponse_PeerUpdate{
        {
            Id:   p1ID[:],
            Kind: proto.CoordinateResponse_PeerUpdate_NODE,
            Node: p1n,
        },
        {
            Id:   p2ID[:],
            Kind: proto.CoordinateResponse_PeerUpdate_NODE,
            Node: p2n,
        },
    }
    uut.updatePeers(updates)

    nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
    r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)

    require.Len(t, nm.Peers, 2)
    n1 := getNodeWithID(t, nm.Peers, 1)
    require.Equal(t, "127.3.3.40:1", n1.DERP)
    require.Equal(t, p1Node.Endpoints, n1.Endpoints)
    require.True(t, n1.KeepAlive)
    n2 := getNodeWithID(t, nm.Peers, 2)
    require.Equal(t, "127.3.3.40:2", n2.DERP)
    require.Equal(t, p2Node.Endpoints, n2.Endpoints)
    require.False(t, n2.KeepAlive)

    // we rely on nmcfg.WGCfg() to convert the netmap to wireguard config, so just
    // require the right number of peers.
    require.Len(t, r.wg.Peers, 2)

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestConfigMaps_updatePeers_same(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
    discoKey := key.NewDisco()
    uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
    defer uut.close()

    // Then: we don't configure
    requireNeverConfigures(ctx, t, uut)

    p1ID := uuid.UUID{1}
    p1Node := newTestNode(1)
    p1n, err := NodeToProto(p1Node)
    require.NoError(t, err)
    p1tcn, err := uut.protoNodeToTailcfg(p1n)
    p1tcn.KeepAlive = true
    require.NoError(t, err)

    // Given: peer already exists
    uut.L.Lock()
    uut.peers[p1ID] = &peerLifecycle{
        peerID:        p1ID,
        node:          p1tcn,
        lastHandshake: time.Date(2024, 1, 7, 12, 0, 10, 0, time.UTC),
    }
    uut.L.Unlock()

    go func() {
        b := <-fEng.status
        b.AddPeer(p1Node.Key, &ipnstate.PeerStatus{
            PublicKey:     p1Node.Key,
            LastHandshake: time.Date(2024, 1, 7, 12, 13, 10, 0, time.UTC),
            Active:        true,
        })
        fEng.statusDone <- struct{}{}
    }()

    // When: update with no changes
    updates := []*proto.CoordinateResponse_PeerUpdate{
        {
            Id:   p1ID[:],
            Kind: proto.CoordinateResponse_PeerUpdate_NODE,
            Node: p1n,
        },
    }
    uut.updatePeers(updates)

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestConfigMaps_updatePeers_disconnect(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
    discoKey := key.NewDisco()
    uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
    defer uut.close()

    p1ID := uuid.UUID{1}
    p1Node := newTestNode(1)
    p1n, err := NodeToProto(p1Node)
    require.NoError(t, err)
    p1tcn, err := uut.protoNodeToTailcfg(p1n)
    p1tcn.KeepAlive = true
    require.NoError(t, err)

    // set a timer, which should get canceled by the disconnect.
    timer := uut.clock.AfterFunc(testutil.WaitMedium, func() {
        t.Error("this should not be called!")
    })

    // Given: peer already exists
    uut.L.Lock()
    uut.peers[p1ID] = &peerLifecycle{
        peerID:        p1ID,
        node:          p1tcn,
        lastHandshake: time.Date(2024, 1, 7, 12, 0, 10, 0, time.UTC),
        timer:         timer,
    }
    uut.L.Unlock()

    go func() {
        b := <-fEng.status
        b.AddPeer(p1Node.Key, &ipnstate.PeerStatus{
            PublicKey:     p1Node.Key,
            LastHandshake: time.Date(2024, 1, 7, 12, 13, 10, 0, time.UTC),
            Active:        true,
        })
        fEng.statusDone <- struct{}{}
    }()

    // When: update DISCONNECTED
    updates := []*proto.CoordinateResponse_PeerUpdate{
        {
            Id:   p1ID[:],
            Kind: proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
        },
    }
    uut.updatePeers(updates)
    assert.False(t, timer.Stop(), "timer was not stopped")

    // Then, configure engine without the peer.
    nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
    r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
    require.Len(t, nm.Peers, 0)
    require.Len(t, r.wg.Peers, 0)

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestConfigMaps_updatePeers_lost(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
    discoKey := key.NewDisco()
    uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
    defer uut.close()
    start := time.Date(2024, time.January, 1, 8, 0, 0, 0, time.UTC)
    mClock := clock.NewMock()
    mClock.Set(start)
    uut.clock = mClock

    p1ID := uuid.UUID{1}
    p1Node := newTestNode(1)
    p1n, err := NodeToProto(p1Node)
    require.NoError(t, err)

    s1 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)

    updates := []*proto.CoordinateResponse_PeerUpdate{
        {
            Id:   p1ID[:],
            Kind: proto.CoordinateResponse_PeerUpdate_NODE,
            Node: p1n,
        },
    }
    uut.updatePeers(updates)
    nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
    r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
    require.Len(t, nm.Peers, 1)
    require.Len(t, r.wg.Peers, 1)
    _ = testutil.RequireRecvCtx(ctx, t, s1)

    mClock.Add(5 * time.Second)

    s2 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)

    updates[0].Kind = proto.CoordinateResponse_PeerUpdate_LOST
    updates[0].Node = nil
    uut.updatePeers(updates)
    _ = testutil.RequireRecvCtx(ctx, t, s2)

    // No reprogramming yet, since we keep the peer around.
    select {
    case <-fEng.setNetworkMap:
        t.Fatal("should not reprogram")
    default:
        // OK!
    }

    // When we advance the clock, the timeout triggers. However, the new
    // latest handshake has advanced by a minute, so we don't remove the peer.
    lh := start.Add(time.Minute)
    s3 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, lh)
    mClock.Add(lostTimeout)
    _ = testutil.RequireRecvCtx(ctx, t, s3)
    select {
    case <-fEng.setNetworkMap:
        t.Fatal("should not reprogram")
    default:
        // OK!
    }

    // Before we update the clock again, we need to be sure the timeout has
    // completed running. To do that, we check the new lastHandshake has been set
    require.Eventually(t, func() bool {
        uut.L.Lock()
        defer uut.L.Unlock()
        return uut.peers[p1ID].lastHandshake == lh
    }, testutil.WaitShort, testutil.IntervalFast)

    // Advance the clock again by a minute, which should trigger the reprogrammed
    // timeout.
    s4 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, lh)
    mClock.Add(time.Minute)

    nm = testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
    r = testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
    require.Len(t, nm.Peers, 0)
    require.Len(t, r.wg.Peers, 0)
    _ = testutil.RequireRecvCtx(ctx, t, s4)

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

func TestConfigMaps_updatePeers_lost_and_found(t *testing.T) {
    t.Parallel()
    ctx := testutil.Context(t, testutil.WaitShort)
    logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
    fEng := newFakeEngineConfigurable()
    nodePrivateKey := key.NewNode()
    nodeID := tailcfg.NodeID(5)
    discoKey := key.NewDisco()
    uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
    defer uut.close()
    start := time.Date(2024, time.January, 1, 8, 0, 0, 0, time.UTC)
    mClock := clock.NewMock()
    mClock.Set(start)
    uut.clock = mClock

    p1ID := uuid.UUID{1}
    p1Node := newTestNode(1)
    p1n, err := NodeToProto(p1Node)
    require.NoError(t, err)

    s1 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)

    updates := []*proto.CoordinateResponse_PeerUpdate{
        {
            Id:   p1ID[:],
            Kind: proto.CoordinateResponse_PeerUpdate_NODE,
            Node: p1n,
        },
    }
    uut.updatePeers(updates)
    nm := testutil.RequireRecvCtx(ctx, t, fEng.setNetworkMap)
    r := testutil.RequireRecvCtx(ctx, t, fEng.reconfig)
    require.Len(t, nm.Peers, 1)
    require.Len(t, r.wg.Peers, 1)
    _ = testutil.RequireRecvCtx(ctx, t, s1)

    mClock.Add(5 * time.Second)

    s2 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)

    updates[0].Kind = proto.CoordinateResponse_PeerUpdate_LOST
    updates[0].Node = nil
    uut.updatePeers(updates)
    _ = testutil.RequireRecvCtx(ctx, t, s2)

    // No reprogramming yet, since we keep the peer around.
    select {
    case <-fEng.setNetworkMap:
        t.Fatal("should not reprogram")
    default:
        // OK!
    }

    mClock.Add(5 * time.Second)
    s3 := expectStatusWithHandshake(ctx, t, fEng, p1Node.Key, start)

    updates[0].Kind = proto.CoordinateResponse_PeerUpdate_NODE
    updates[0].Node = p1n
    uut.updatePeers(updates)
    _ = testutil.RequireRecvCtx(ctx, t, s3)
    // This does not trigger reprogramming, because we never removed the node
    select {
    case <-fEng.setNetworkMap:
        t.Fatal("should not reprogram")
    default:
        // OK!
    }

    // When we advance the clock, nothing happens because the timeout was
    // canceled
    mClock.Add(lostTimeout)
    select {
    case <-fEng.setNetworkMap:
        t.Fatal("should not reprogram")
    default:
        // OK!
    }

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

func expectStatusWithHandshake(
    ctx context.Context, t testing.TB, fEng *fakeEngineConfigurable, k key.NodePublic, lastHandshake time.Time,
) <-chan struct{} {
    t.Helper()
    called := make(chan struct{})
    go func() {
        select {
        case <-ctx.Done():
            t.Error("timeout waiting for status")
            return
        case b := <-fEng.status:
            b.AddPeer(k, &ipnstate.PeerStatus{
                PublicKey:     k,
                LastHandshake: lastHandshake,
                Active:        true,
            })
            select {
            case <-ctx.Done():
                t.Error("timeout sending done")
            case fEng.statusDone <- struct{}{}:
                close(called)
                return
            }
        }
    }()
    return called
}

func TestConfigMaps_updatePeers_nonexist(t *testing.T) {
    t.Parallel()

    for _, k := range []proto.CoordinateResponse_PeerUpdate_Kind{
        proto.CoordinateResponse_PeerUpdate_DISCONNECTED,
        proto.CoordinateResponse_PeerUpdate_LOST,
    } {
        k := k
        t.Run(k.String(), func(t *testing.T) {
            t.Parallel()
            ctx := testutil.Context(t, testutil.WaitShort)
            logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
            fEng := newFakeEngineConfigurable()
            nodePrivateKey := key.NewNode()
            nodeID := tailcfg.NodeID(5)
            discoKey := key.NewDisco()
            uut := newConfigMaps(logger, fEng, nodeID, nodePrivateKey, discoKey.Public(), nil)
            defer uut.close()

            // Then: we don't configure
            requireNeverConfigures(ctx, t, uut)

            // Given: no known peers
            go func() {
                <-fEng.status
                fEng.statusDone <- struct{}{}
            }()

            // When: update with LOST/DISCONNECTED
            p1ID := uuid.UUID{1}
            updates := []*proto.CoordinateResponse_PeerUpdate{
                {
                    Id:   p1ID[:],
                    Kind: k,
                },
            }
            uut.updatePeers(updates)

            done := make(chan struct{})
            go func() {
                defer close(done)
                uut.close()
            }()
            _ = testutil.RequireRecvCtx(ctx, t, done)
        })
    }
}

func newTestNode(id int) *Node {
    return &Node{
        ID:            tailcfg.NodeID(id),
        AsOf:          time.Date(2024, 1, 7, 12, 13, 14, 15, time.UTC),
        Key:           key.NewNode().Public(),
        DiscoKey:      key.NewDisco().Public(),
        Endpoints:     []string{"192.168.0.55"},
        PreferredDERP: id,
    }
}

func getNodeWithID(t testing.TB, peers []*tailcfg.Node, id tailcfg.NodeID) *tailcfg.Node {
    t.Helper()
    for _, n := range peers {
        if n.ID == id {
            return n
        }
    }
    t.Fatal()
    return nil
}

func requireNeverConfigures(ctx context.Context, t *testing.T, uut *configMaps) {
    t.Helper()
    waiting := make(chan struct{})
    go func() {
        // ensure that we never configure, and go straight to closed
@@ -101,15 +596,6 @@ func TestConfigMaps_setAddresses_same(t *testing.T) {
        assert.Equal(t, closed, uut.phase)
    }()
    _ = testutil.RequireRecvCtx(ctx, t, waiting)

    uut.setAddresses(addrs)

    done := make(chan struct{})
    go func() {
        defer close(done)
        uut.close()
    }()
    _ = testutil.RequireRecvCtx(ctx, t, done)
}

type reconfigCall struct {
@@ -123,6 +609,16 @@ type fakeEngineConfigurable struct {
    setNetworkMap chan *netmap.NetworkMap
    reconfig      chan reconfigCall
    filter        chan *filter.Filter

    // To fake these fields the test should read from status, do stuff to the
    // StatusBuilder, then write to statusDone
    status     chan *ipnstate.StatusBuilder
    statusDone chan struct{}
}

func (f fakeEngineConfigurable) UpdateStatus(status *ipnstate.StatusBuilder) {
    f.status <- status
    <-f.statusDone
}

func newFakeEngineConfigurable() *fakeEngineConfigurable {
@@ -130,6 +626,8 @@ func newFakeEngineConfigurable() *fakeEngineConfigurable {
        setNetworkMap: make(chan *netmap.NetworkMap),
        reconfig:      make(chan reconfigCall),
        filter:        make(chan *filter.Filter),
        status:        make(chan *ipnstate.StatusBuilder),
        statusDone:    make(chan struct{}),
    }
}