feat: add killswitch for notifications (#13794)

This commit is contained in:
Marcin Tojek
2024-07-10 16:15:06 +02:00
committed by GitHub
parent 542fff7df0
commit bf392ffea4
31 changed files with 774 additions and 25 deletions

View File

@ -538,6 +538,71 @@ func TestInvalidConfig(t *testing.T) {
require.ErrorIs(t, err, notifications.ErrInvalidDispatchTimeout)
}
func TestNotifierPaused(t *testing.T) {
t.Parallel()
// setup
ctx, logger, db := setupInMemory(t)
// Prepare the test
handler := &fakeHandler{}
method := database.NotificationMethodSmtp
user := createSampleUser(t, db)
cfg := defaultNotificationsConfig(method)
fetchInterval := time.Nanosecond // Let
cfg.FetchInterval = *serpent.DurationOf(&fetchInterval)
mgr, err := notifications.NewManager(cfg, db, logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
require.NoError(t, err)
mgr.Run(ctx)
// Notifier is on, enqueue the first message.
sid, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "success"}, "test")
require.NoError(t, err)
require.Eventually(t, func() bool {
handler.mu.RLock()
defer handler.mu.RUnlock()
return slices.Contains(handler.succeeded, sid.String())
}, testutil.WaitShort, testutil.IntervalFast)
// Pause the notifier.
settingsJSON, err := json.Marshal(&codersdk.NotificationsSettings{NotifierPaused: true})
require.NoError(t, err)
err = db.UpsertNotificationsSettings(ctx, string(settingsJSON))
require.NoError(t, err)
// Notifier is paused, enqueue the next message.
sid, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "success"}, "test")
require.NoError(t, err)
require.Eventually(t, func() bool {
pendingMessages, err := db.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{
Status: database.NotificationMessageStatusPending,
})
assert.NoError(t, err)
return len(pendingMessages) == 1
}, testutil.WaitShort, testutil.IntervalFast)
// Unpause the notifier.
settingsJSON, err = json.Marshal(&codersdk.NotificationsSettings{NotifierPaused: false})
require.NoError(t, err)
err = db.UpsertNotificationsSettings(ctx, string(settingsJSON))
require.NoError(t, err)
// Notifier is running again, message should be dequeued.
require.Eventually(t, func() bool {
handler.mu.RLock()
defer handler.mu.RUnlock()
return slices.Contains(handler.succeeded, sid.String())
}, testutil.WaitShort, testutil.IntervalFast)
}
type fakeHandler struct {
mu sync.RWMutex
@ -546,7 +611,7 @@ type fakeHandler struct {
}
func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dispatch.DeliveryFunc, error) {
return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
return func(_ context.Context, msgID uuid.UUID) (retryable bool, err error) {
f.mu.Lock()
defer f.mu.Unlock()