fix: fix TestPendingUpdatesMetric flaky assertion (#14534)

Author: Spike Curtis
Date: 2024-09-03 13:47:34 +04:00
Committed by: GitHub
Parent: 2f18f4583b
Commit: 0eca1fcb8b
3 changed files with 58 additions and 43 deletions
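
What changed, in brief: TestPendingUpdatesMetric asserted on the PendingUpdates gauge while the manager's store-sync ticker and the notifier's fetch ticker ran on real time, so background syncs and retries could shift the counts the assertion expected. This commit threads a github.com/coder/quartz clock through Manager and notifier: production code still gets quartz.NewReal(), while the test injects quartz.NewMock(t), traps creation of the "storeSync" ticker, asserts the gauge while time is frozen, and only then advances the clock to trigger the sync (retries are also pushed out to an hour so they cannot interfere). Below is a minimal, self-contained sketch of that trap-and-advance pattern, separate from the coder codebase; the worker, its tags, and the one-second interval are illustrative, and it assumes quartz.Ticker exposes a C channel like time.Ticker's.

package example

import (
	"context"
	"testing"
	"time"

	"github.com/coder/quartz"
)

// worker stands in for the manager's store-sync loop: it creates a tagged
// ticker and reports once per tick.
func worker(ctx context.Context, clock quartz.Clock, synced chan<- struct{}) {
	tick := clock.NewTicker(time.Second, "worker", "sync") // tags let tests trap this call
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			synced <- struct{}{}
		}
	}
}

func TestWorkerSyncsOnlyWhenAdvanced(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	mClock := quartz.NewMock(t)
	trap := mClock.Trap().NewTicker("worker", "sync")
	defer trap.Close()

	synced := make(chan struct{}, 1)
	go worker(ctx, mClock, synced)

	// Block until the worker has created its ticker, then let it proceed.
	trap.MustWait(ctx).Release()

	// Nothing can have synced yet: the mock clock never moves on its own.
	select {
	case <-synced:
		t.Fatal("unexpected sync before Advance")
	default:
	}

	// Advance exactly one interval; the tick fires deterministically.
	mClock.Advance(time.Second).MustWait(ctx)
	<-synced
}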

coderd/notifications/manager.go

@ -11,6 +11,7 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/quartz"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
@ -54,13 +55,25 @@ type Manager struct {
stopOnce sync.Once
stop chan any
done chan any
// clock is for testing only
clock quartz.Clock
}
type ManagerOption func(*Manager)
// WithTestClock is used in testing to set the quartz clock on the manager
func WithTestClock(clock quartz.Clock) ManagerOption {
return func(m *Manager) {
m.clock = clock
}
}
// NewManager instantiates a new Manager instance which coordinates notification enqueuing and delivery.
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL etc.
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger) (*Manager, error) {
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
// TODO(dannyk): add the ability to use multiple notification methods.
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
@ -74,7 +87,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
return nil, ErrInvalidDispatchTimeout
}
return &Manager{
m := &Manager{
log: log,
cfg: cfg,
store: store,
@ -95,7 +108,13 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
done: make(chan any),
handlers: defaultHandlers(cfg, helpers, log),
}, nil
clock: quartz.NewReal(),
}
for _, o := range opts {
o(m)
}
return m, nil
}
// defaultHandlers builds a set of known handlers; panics if any error occurs as these handlers should be valid at compile time.
@ -150,7 +169,7 @@ func (m *Manager) loop(ctx context.Context) error {
var eg errgroup.Group
// Create a notifier to run concurrently, which will handle dequeueing and dispatching notifications.
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics)
m.notifier = newNotifier(m.cfg, uuid.New(), m.log, m.store, m.handlers, m.metrics, m.clock)
eg.Go(func() error {
return m.notifier.run(ctx, m.success, m.failure)
})
@ -158,7 +177,7 @@ func (m *Manager) loop(ctx context.Context) error {
// Periodically flush notification state changes to the store.
eg.Go(func() error {
// Every interval, collect the messages in the channels and bulk update them in the store.
tick := time.NewTicker(m.cfg.StoreSyncInterval.Value())
tick := m.clock.NewTicker(m.cfg.StoreSyncInterval.Value(), "Manager", "storeSync")
defer tick.Stop()
for {
select {
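
On the Manager side, the change above is the familiar functional-options shape: the constructor now defaults to a real clock and WithTestClock swaps in a mock. Reduced to just the clock wiring (Thing, Option, and WithClock below are illustrative stand-ins, not coderd's types):

package example

import "github.com/coder/quartz"

// Thing stands in for Manager; only the clock wiring is shown.
type Thing struct {
	clock quartz.Clock
}

// Option mirrors ManagerOption: a function that mutates the value under construction.
type Option func(*Thing)

// WithClock lets tests swap in quartz.NewMock(t); production code never passes it.
func WithClock(c quartz.Clock) Option {
	return func(th *Thing) { th.clock = c }
}

func New(opts ...Option) *Thing {
	th := &Thing{
		clock: quartz.NewReal(), // default: real wall-clock time
	}
	for _, o := range opts {
		o(th)
	}
	return th
}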

coderd/notifications/metrics_test.go

@ -221,13 +221,16 @@ func TestPendingUpdatesMetric(t *testing.T) {
// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
cfg := defaultNotificationsConfig(method)
cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
syncer := &syncInterceptor{Store: api.Database}
interceptor := newUpdateSignallingInterceptor(syncer)
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"))
mClock := quartz.NewMock(t)
trap := mClock.Trap().NewTicker("Manager", "storeSync")
defer trap.Close()
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, api.Logger.Named("manager"),
notifications.WithTestClock(mClock))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@ -249,6 +252,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
require.NoError(t, err)
mgr.Run(ctx)
trap.MustWait(ctx).Release() // ensures ticker has been set
// THEN:
// Wait until the handler has dispatched the given notifications.
@ -259,17 +263,20 @@ func TestPendingUpdatesMetric(t *testing.T) {
return len(handler.succeeded) == 1 && len(handler.failed) == 1
}, testutil.WaitShort, testutil.IntervalFast)
// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
// Wait for the metric to be updated with the expected count of metrics.
// Both handler calls should be pending in the metrics.
require.Eventually(t, func() bool {
return promtest.ToFloat64(metrics.PendingUpdates) == float64(success+failure)
return promtest.ToFloat64(metrics.PendingUpdates) == float64(2)
}, testutil.WaitShort, testutil.IntervalFast)
// Unpause the interceptor so the updates can proceed.
interceptor.unpause()
// THEN:
// Trigger syncing updates
mClock.Advance(cfg.StoreSyncInterval.Value()).MustWait(ctx)
// Wait until we intercept the calls to sync the pending updates to the store.
success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
require.EqualValues(t, 1, success)
failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)
require.EqualValues(t, 1, failure)
// Validate that the store synced the expected number of updates.
require.Eventually(t, func() bool {
@ -464,43 +471,25 @@ func fingerprintLabels(lbs ...string) model.Fingerprint {
// signaled by the caller so it can continue.
type updateSignallingInterceptor struct {
notifications.Store
pause chan any
updateSuccess chan int
updateFailure chan int
}
func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor {
return &updateSignallingInterceptor{
Store: interceptor,
pause: make(chan any, 1),
Store: interceptor,
updateSuccess: make(chan int, 1),
updateFailure: make(chan int, 1),
}
}
func (u *updateSignallingInterceptor) unpause() {
close(u.pause)
}
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
u.updateSuccess <- len(arg.IDs)
// Wait until signaled so we have a chance to read the number of pending updates.
<-u.pause
return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
}
func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
u.updateFailure <- len(arg.IDs)
// Wait until signaled so we have a chance to read the number of pending updates.
<-u.pause
return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
}
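
Because the mock clock keeps the store-sync ticker from firing until the test advances time, the interceptor no longer needs its pause/unpause machinery to hold updates back while the gauge is read; it only reports how many rows each bulk call carried. The general shape of that kind of interceptor, using placeholder names rather than coderd's store interface:

package example

import "context"

// Store is a placeholder for the slice of the real store the interceptor wraps.
type Store interface {
	BulkMarkSent(ctx context.Context, ids []string) (int64, error)
}

// signallingStore embeds Store so every method it does not override passes
// straight through; the overridden one reports how many rows were synced.
type signallingStore struct {
	Store
	sent chan int
}

func newSignallingStore(wrapped Store) *signallingStore {
	// Buffer of one: room for the single sync this style of test expects,
	// so the store does not block waiting on the test to read.
	return &signallingStore{Store: wrapped, sent: make(chan int, 1)}
}

func (s *signallingStore) BulkMarkSent(ctx context.Context, ids []string) (int64, error) {
	s.sent <- len(ids)
	return s.Store.BulkMarkSent(ctx, ids)
}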

coderd/notifications/notifier.go

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
@ -15,6 +14,7 @@ import (
"github.com/coder/coder/v2/coderd/notifications/render"
"github.com/coder/coder/v2/coderd/notifications/types"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/quartz"
"cdr.dev/slog"
@ -29,26 +29,33 @@ type notifier struct {
log slog.Logger
store Store
tick *time.Ticker
tick *quartz.Ticker
stopOnce sync.Once
quit chan any
done chan any
handlers map[database.NotificationMethod]Handler
metrics *Metrics
// clock is for testing
clock quartz.Clock
}
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store, hr map[database.NotificationMethod]Handler, metrics *Metrics) *notifier {
func newNotifier(cfg codersdk.NotificationsConfig, id uuid.UUID, log slog.Logger, db Store,
hr map[database.NotificationMethod]Handler, metrics *Metrics, clock quartz.Clock,
) *notifier {
tick := clock.NewTicker(cfg.FetchInterval.Value(), "notifier", "fetchInterval")
return &notifier{
id: id,
cfg: cfg,
log: log.Named("notifier").With(slog.F("notifier_id", id)),
quit: make(chan any),
done: make(chan any),
tick: time.NewTicker(cfg.FetchInterval.Value()),
tick: tick,
store: db,
handlers: hr,
metrics: metrics,
clock: clock,
}
}
@ -245,10 +252,10 @@ func (n *notifier) deliver(ctx context.Context, msg database.AcquireNotification
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Inc()
n.metrics.QueuedSeconds.WithLabelValues(string(msg.Method)).Observe(msg.QueuedSeconds)
start := time.Now()
start := n.clock.Now()
retryable, err := deliver(ctx, msg.ID)
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(time.Since(start).Seconds())
n.metrics.DispatcherSendSeconds.WithLabelValues(string(msg.Method)).Observe(n.clock.Since(start).Seconds())
n.metrics.InflightDispatches.WithLabelValues(string(msg.Method), msg.TemplateID.String()).Dec()
if err != nil {
@ -291,7 +298,7 @@ func (n *notifier) newSuccessfulDispatch(msg database.AcquireNotificationMessage
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: dbtime.Now(),
ts: dbtime.Time(n.clock.Now().UTC()),
}
}
@ -311,7 +318,7 @@ func (n *notifier) newFailedDispatch(msg database.AcquireNotificationMessagesRow
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: dbtime.Now(),
ts: dbtime.Time(n.clock.Now().UTC()),
err: err,
retryable: retryable,
}
@ -321,7 +328,7 @@ func (n *notifier) newInhibitedDispatch(msg database.AcquireNotificationMessages
return dispatchResult{
notifier: n.id,
msg: msg.ID,
ts: dbtime.Now(),
ts: dbtime.Time(n.clock.Now().UTC()),
retryable: false,
inhibited: true,
}
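
The notifier changes also route its timestamps (dispatch duration, result ts) through the injected clock, so under a mock they stay consistent with the frozen test time; a mock clock only moves when explicitly advanced, e.g. (a tiny sketch using the quartz calls shown above):

package example

import (
	"testing"
	"time"

	"github.com/coder/quartz"
	"github.com/stretchr/testify/require"
)

func TestMockClockIsFrozen(t *testing.T) {
	mClock := quartz.NewMock(t)
	t0 := mClock.Now()          // does not move on its own
	mClock.Advance(time.Minute) // explicit advance is the only way time passes
	require.True(t, mClock.Now().Equal(t0.Add(time.Minute)))
}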