mirror of
https://github.com/coder/coder.git
synced 2025-07-06 15:41:45 +00:00
chore: add clock pkg for testing time (#13461)
Adds a package for testing time/timer/ticker functions. Implementation is limited to `NewTimer` and `NewContextTicker`, but will eventually be expanded to all `time` functions from the standard library as well as `context.WithTimeout()`, `context.WithDeadline()`. Replaces `benbjohnson/clock` for the pubsub watchdog, as a proof of concept. Eventually, as we expand functionality, we will replace most time-related functions with this library for testing.
This commit is contained in:
@ -7,9 +7,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"github.com/coder/coder/v2/clock"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -36,7 +35,7 @@ type Watchdog struct {
|
||||
}
|
||||
|
||||
func NewWatchdog(ctx context.Context, logger slog.Logger, ps Pubsub) *Watchdog {
|
||||
return NewWatchdogWithClock(ctx, logger, ps, clock.New())
|
||||
return NewWatchdogWithClock(ctx, logger, ps, clock.NewReal())
|
||||
}
|
||||
|
||||
// NewWatchdogWithClock returns a watchdog with the given clock. Product code should always call NewWatchDog.
|
||||
@ -79,32 +78,23 @@ func (w *Watchdog) Timeout() <-chan struct{} {
|
||||
|
||||
func (w *Watchdog) publishLoop() {
|
||||
defer w.wg.Done()
|
||||
tkr := w.clock.Ticker(periodHeartbeat)
|
||||
defer tkr.Stop()
|
||||
// immediate publish after starting the ticker. This helps testing so that we can tell from
|
||||
// the outside that the ticker is started.
|
||||
err := w.ps.Publish(EventPubsubWatchdog, []byte{})
|
||||
if err != nil {
|
||||
w.logger.Warn(w.ctx, "failed to publish heartbeat on pubsub watchdog", slog.Error(err))
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-w.ctx.Done():
|
||||
w.logger.Debug(w.ctx, "context done; exiting publishLoop")
|
||||
return
|
||||
case <-tkr.C:
|
||||
err := w.ps.Publish(EventPubsubWatchdog, []byte{})
|
||||
if err != nil {
|
||||
w.logger.Warn(w.ctx, "failed to publish heartbeat on pubsub watchdog", slog.Error(err))
|
||||
}
|
||||
tkr := w.clock.TickerFunc(w.ctx, periodHeartbeat, func() error {
|
||||
err := w.ps.Publish(EventPubsubWatchdog, []byte{})
|
||||
if err != nil {
|
||||
w.logger.Warn(w.ctx, "failed to publish heartbeat on pubsub watchdog", slog.Error(err))
|
||||
} else {
|
||||
w.logger.Debug(w.ctx, "published heartbeat on pubsub watchdog")
|
||||
}
|
||||
}
|
||||
return err
|
||||
}, "publish")
|
||||
// ignore the error, since we log before returning the error
|
||||
_ = tkr.Wait()
|
||||
}
|
||||
|
||||
func (w *Watchdog) subscribeMonitor() {
|
||||
defer w.wg.Done()
|
||||
tmr := w.clock.Timer(periodTimeout)
|
||||
defer tmr.Stop()
|
||||
tmr := w.clock.NewTimer(periodTimeout)
|
||||
defer tmr.Stop("subscribe")
|
||||
beats := make(chan struct{})
|
||||
unsub, err := w.ps.Subscribe(EventPubsubWatchdog, func(context.Context, []byte) {
|
||||
w.logger.Debug(w.ctx, "got heartbeat for pubsub watchdog")
|
||||
|
@ -4,36 +4,48 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/clock"
|
||||
"github.com/coder/coder/v2/coderd/database/pubsub"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestWatchdog_NoTimeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, time.Hour)
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
mClock := clock.NewMock()
|
||||
start := time.Date(2024, 2, 5, 8, 7, 6, 5, time.UTC)
|
||||
mClock.Set(start)
|
||||
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
|
||||
fPS := newFakePubsub()
|
||||
|
||||
// trap the ticker and timer.Stop() calls
|
||||
pubTrap := mClock.Trap().TickerFunc("publish")
|
||||
defer pubTrap.Close()
|
||||
subTrap := mClock.Trap().TimerStop("subscribe")
|
||||
defer subTrap.Close()
|
||||
|
||||
uut := pubsub.NewWatchdogWithClock(ctx, logger, fPS, mClock)
|
||||
|
||||
// wait for the ticker to be created so that we know it starts from the
|
||||
// right baseline time.
|
||||
pc, err := pubTrap.Wait(ctx)
|
||||
require.NoError(t, err)
|
||||
pc.Release()
|
||||
require.Equal(t, 15*time.Second, pc.Duration)
|
||||
|
||||
// we subscribe after starting the timer, so we know the timer also starts
|
||||
// from the baseline.
|
||||
sub := testutil.RequireRecvCtx(ctx, t, fPS.subs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, sub.event)
|
||||
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, p)
|
||||
|
||||
// 5 min / 15 sec = 20, so do 21 ticks
|
||||
for i := 0; i < 21; i++ {
|
||||
mClock.Add(15 * time.Second)
|
||||
p = testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
mClock.Advance(15 * time.Second)
|
||||
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, p)
|
||||
mClock.Add(30 * time.Millisecond) // reasonable round-trip
|
||||
mClock.Advance(30 * time.Millisecond) // reasonable round-trip
|
||||
// forward the beat
|
||||
sub.listener(ctx, []byte{})
|
||||
// we shouldn't time out
|
||||
@ -45,7 +57,14 @@ func TestWatchdog_NoTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
err := uut.Close()
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- uut.Close()
|
||||
}()
|
||||
sc, err := subTrap.Wait(ctx) // timer.Stop() called
|
||||
require.NoError(t, err)
|
||||
sc.Release()
|
||||
err = testutil.RequireRecvCtx(ctx, t, errCh)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -53,23 +72,33 @@ func TestWatchdog_Timeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
mClock := clock.NewMock()
|
||||
start := time.Date(2024, 2, 5, 8, 7, 6, 5, time.UTC)
|
||||
mClock.Set(start)
|
||||
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
|
||||
fPS := newFakePubsub()
|
||||
|
||||
// trap the ticker and timer calls
|
||||
pubTrap := mClock.Trap().TickerFunc("publish")
|
||||
defer pubTrap.Close()
|
||||
|
||||
uut := pubsub.NewWatchdogWithClock(ctx, logger, fPS, mClock)
|
||||
|
||||
// wait for the ticker to be created so that we know it starts from the
|
||||
// right baseline time.
|
||||
pc, err := pubTrap.Wait(ctx)
|
||||
require.NoError(t, err)
|
||||
pc.Release()
|
||||
require.Equal(t, 15*time.Second, pc.Duration)
|
||||
|
||||
// we subscribe after starting the timer, so we know the timer also starts
|
||||
// from the baseline.
|
||||
sub := testutil.RequireRecvCtx(ctx, t, fPS.subs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, sub.event)
|
||||
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, p)
|
||||
|
||||
// 5 min / 15 sec = 20, so do 19 ticks without timing out
|
||||
for i := 0; i < 19; i++ {
|
||||
mClock.Add(15 * time.Second)
|
||||
p = testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
mClock.Advance(15 * time.Second)
|
||||
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, p)
|
||||
mClock.Add(30 * time.Millisecond) // reasonable round-trip
|
||||
mClock.Advance(30 * time.Millisecond) // reasonable round-trip
|
||||
// we DO NOT forward the heartbeat
|
||||
// we shouldn't time out
|
||||
select {
|
||||
@ -79,12 +108,12 @@ func TestWatchdog_Timeout(t *testing.T) {
|
||||
// OK!
|
||||
}
|
||||
}
|
||||
mClock.Add(15 * time.Second)
|
||||
p = testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
mClock.Advance(15 * time.Second)
|
||||
p := testutil.RequireRecvCtx(ctx, t, fPS.pubs)
|
||||
require.Equal(t, pubsub.EventPubsubWatchdog, p)
|
||||
testutil.RequireRecvCtx(ctx, t, uut.Timeout())
|
||||
|
||||
err := uut.Close()
|
||||
err = uut.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
@ -118,7 +147,7 @@ func (f *fakePubsub) Publish(event string, _ []byte) error {
|
||||
|
||||
func newFakePubsub() *fakePubsub {
|
||||
return &fakePubsub{
|
||||
pubs: make(chan string),
|
||||
pubs: make(chan string, 1),
|
||||
subs: make(chan subscribe),
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user