feat(coderd): add inbox notifications endpoints (#16889)

This PR is part of the inbox notifications topic, and rely on previous
PRs merged - it adds :

- Endpoints to : 
  - WS : watch new inbox notifications
  - REST : list inbox notifications
  - REST : update the read status of a notification

Also, this PR acts as a follow-up PR from previous work and : 

- fix DB query issues
- fix DBMem logic to match DB
This commit is contained in:
Vincent Vielle
2025-03-18 00:02:47 +01:00
committed by GitHub
parent e85c92e7d5
commit 3ae55bbbf4
20 changed files with 2093 additions and 65 deletions

View File

@ -13,8 +13,11 @@ import (
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/notifications/types"
coderdpubsub "github.com/coder/coder/v2/coderd/pubsub"
markdown "github.com/coder/coder/v2/coderd/render"
"github.com/coder/coder/v2/codersdk"
)
type InboxStore interface {
@ -23,12 +26,13 @@ type InboxStore interface {
// InboxHandler is responsible for dispatching notification messages to the Coder Inbox.
type InboxHandler struct {
log slog.Logger
store InboxStore
log slog.Logger
store InboxStore
pubsub pubsub.Pubsub
}
func NewInboxHandler(log slog.Logger, store InboxStore) *InboxHandler {
return &InboxHandler{log: log, store: store}
func NewInboxHandler(log slog.Logger, store InboxStore, ps pubsub.Pubsub) *InboxHandler {
return &InboxHandler{log: log, store: store, pubsub: ps}
}
func (s *InboxHandler) Dispatcher(payload types.MessagePayload, titleTmpl, bodyTmpl string, _ template.FuncMap) (DeliveryFunc, error) {
@ -62,7 +66,7 @@ func (s *InboxHandler) dispatch(payload types.MessagePayload, title, body string
}
// nolint:exhaustruct
_, err = s.store.InsertInboxNotification(ctx, database.InsertInboxNotificationParams{
insertedNotif, err := s.store.InsertInboxNotification(ctx, database.InsertInboxNotificationParams{
ID: msgID,
UserID: userID,
TemplateID: templateID,
@ -76,6 +80,38 @@ func (s *InboxHandler) dispatch(payload types.MessagePayload, title, body string
return false, xerrors.Errorf("insert inbox notification: %w", err)
}
event := coderdpubsub.InboxNotificationEvent{
Kind: coderdpubsub.InboxNotificationEventKindNew,
InboxNotification: codersdk.InboxNotification{
ID: msgID,
UserID: userID,
TemplateID: templateID,
Targets: payload.Targets,
Title: title,
Content: body,
Actions: func() []codersdk.InboxNotificationAction {
var actions []codersdk.InboxNotificationAction
err := json.Unmarshal(insertedNotif.Actions, &actions)
if err != nil {
return actions
}
return actions
}(),
ReadAt: nil, // notification just has been inserted
CreatedAt: insertedNotif.CreatedAt,
},
}
payload, err := json.Marshal(event)
if err != nil {
return false, xerrors.Errorf("marshal event: %w", err)
}
err = s.pubsub.Publish(coderdpubsub.InboxNotificationForOwnerEventChannel(userID), payload)
if err != nil {
return false, xerrors.Errorf("publish event: %w", err)
}
return false, nil
}
}

View File

@ -73,7 +73,7 @@ func TestInbox(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
db, _ := dbtestutil.NewDB(t)
db, pubsub := dbtestutil.NewDB(t)
if tc.payload.UserID == "valid" {
user := dbgen.User(t, db, database.User{})
@ -82,7 +82,7 @@ func TestInbox(t *testing.T) {
ctx := context.Background()
handler := dispatch.NewInboxHandler(logger.Named("smtp"), db)
handler := dispatch.NewInboxHandler(logger.Named("smtp"), db, pubsub)
dispatcherFunc, err := handler.Dispatcher(tc.payload, "", "", nil)
require.NoError(t, err)

View File

@ -14,6 +14,7 @@ import (
"github.com/coder/quartz"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/notifications/dispatch"
"github.com/coder/coder/v2/codersdk"
)
@ -75,8 +76,7 @@ func WithTestClock(clock quartz.Clock) ManagerOption {
//
// helpers is a map of template helpers which are used to customize notification messages to use global settings like
// access URL etc.
func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
// TODO(dannyk): add the ability to use multiple notification methods.
func NewManager(cfg codersdk.NotificationsConfig, store Store, ps pubsub.Pubsub, helpers template.FuncMap, metrics *Metrics, log slog.Logger, opts ...ManagerOption) (*Manager, error) {
var method database.NotificationMethod
if err := method.Scan(cfg.Method.String()); err != nil {
return nil, xerrors.Errorf("notification method %q is invalid", cfg.Method)
@ -109,7 +109,7 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
stop: make(chan any),
done: make(chan any),
handlers: defaultHandlers(cfg, log, store),
handlers: defaultHandlers(cfg, log, store, ps),
helpers: helpers,
clock: quartz.NewReal(),
@ -121,11 +121,11 @@ func NewManager(cfg codersdk.NotificationsConfig, store Store, helpers template.
}
// defaultHandlers builds a set of known handlers; panics if any error occurs as these handlers should be valid at compile time.
func defaultHandlers(cfg codersdk.NotificationsConfig, log slog.Logger, store Store) map[database.NotificationMethod]Handler {
func defaultHandlers(cfg codersdk.NotificationsConfig, log slog.Logger, store Store, ps pubsub.Pubsub) map[database.NotificationMethod]Handler {
return map[database.NotificationMethod]Handler{
database.NotificationMethodSmtp: dispatch.NewSMTPHandler(cfg.SMTP, log.Named("dispatcher.smtp")),
database.NotificationMethodWebhook: dispatch.NewWebhookHandler(cfg.Webhook, log.Named("dispatcher.webhook")),
database.NotificationMethodInbox: dispatch.NewInboxHandler(log.Named("dispatcher.inbox"), store),
database.NotificationMethodInbox: dispatch.NewInboxHandler(log.Named("dispatcher.inbox"), store, ps),
}
}

View File

@ -33,7 +33,7 @@ func TestBufferedUpdates(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, ps := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
interceptor := &syncInterceptor{Store: store}
@ -44,7 +44,7 @@ func TestBufferedUpdates(t *testing.T) {
cfg.StoreSyncInterval = serpent.Duration(time.Hour) // Ensure we don't sync the store automatically.
// GIVEN: a manager which will pass or fail notifications based on their "nice" labels
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(cfg, interceptor, ps, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)
handlers := map[database.NotificationMethod]notifications.Handler{
@ -168,11 +168,11 @@ func TestStopBeforeRun(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, ps := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// GIVEN: a standard manager
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
mgr, err := notifications.NewManager(defaultNotificationsConfig(database.NotificationMethodSmtp), store, ps, defaultHelpers(), createMetrics(), logger.Named("notifications-manager"))
require.NoError(t, err)
// THEN: validate that the manager can be stopped safely without Run() having been called yet

View File

@ -39,7 +39,7 @@ func TestMetrics(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
reg := prometheus.NewRegistry()
@ -60,7 +60,7 @@ func TestMetrics(t *testing.T) {
cfg.RetryInterval = serpent.Duration(time.Millisecond * 50)
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) // Twice as long as fetch interval to ensure we catch pending updates.
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), metrics, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@ -228,7 +228,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
// SETUP
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
reg := prometheus.NewRegistry()
@ -250,7 +250,7 @@ func TestPendingUpdatesMetric(t *testing.T) {
defer trap.Close()
fetchTrap := mClock.Trap().TickerFunc("notifier", "fetchInterval")
defer fetchTrap.Close()
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), metrics, logger.Named("manager"),
mgr, err := notifications.NewManager(cfg, interceptor, pubsub, defaultHelpers(), metrics, logger.Named("manager"),
notifications.WithTestClock(mClock))
require.NoError(t, err)
t.Cleanup(func() {
@ -322,7 +322,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
// SETUP
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
reg := prometheus.NewRegistry()
@ -338,7 +338,7 @@ func TestInflightDispatchesMetric(t *testing.T) {
cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), metrics, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@ -402,7 +402,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsSystemRestricted(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
var (
@ -427,7 +427,7 @@ func TestCustomMethodMetricCollection(t *testing.T) {
// WHEN: two notifications (each with different templates) are enqueued.
cfg := defaultNotificationsConfig(defaultMethod)
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), metrics, logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), metrics, logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))

View File

@ -71,7 +71,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
method := database.NotificationMethodSmtp
@ -80,7 +80,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
interceptor := &syncInterceptor{Store: store}
cfg := defaultNotificationsConfig(method)
cfg.RetryInterval = serpent.Duration(time.Hour) // Ensure retries don't interfere with the test
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, interceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
@ -138,7 +138,7 @@ func TestSMTPDispatch(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// start mock SMTP server
@ -161,7 +161,7 @@ func TestSMTPDispatch(t *testing.T) {
Hello: "localhost",
}
handler := newDispatchInterceptor(dispatch.NewSMTPHandler(cfg.SMTP, logger.Named("smtp")))
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
@ -204,7 +204,7 @@ func TestWebhookDispatch(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
sent := make(chan dispatch.WebhookPayload, 1)
@ -230,7 +230,7 @@ func TestWebhookDispatch(t *testing.T) {
cfg.Webhook = codersdk.NotificationsWebhookConfig{
Endpoint: *serpent.URLOf(endpoint),
}
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@ -284,7 +284,7 @@ func TestBackpressure(t *testing.T) {
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
}
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitShort))
@ -319,7 +319,7 @@ func TestBackpressure(t *testing.T) {
defer fetchTrap.Close()
// GIVEN: a notification manager whose updates will be intercepted
mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(),
mgr, err := notifications.NewManager(cfg, storeInterceptor, pubsub, defaultHelpers(), createMetrics(),
logger.Named("manager"), notifications.WithTestClock(mClock))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
@ -417,7 +417,7 @@ func TestRetries(t *testing.T) {
const maxAttempts = 3
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// GIVEN: a mock HTTP server which will receive webhooksand a map to track the dispatch attempts
@ -468,7 +468,7 @@ func TestRetries(t *testing.T) {
// Intercept calls to submit the buffered updates to the store.
storeInterceptor := &syncInterceptor{Store: store}
mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, storeInterceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@ -517,7 +517,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// GIVEN: a manager which has its updates intercepted and paused until measurements can be taken
@ -539,7 +539,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
mgrCtx, cancelManagerCtx := context.WithCancel(dbauthz.AsNotifier(context.Background()))
t.Cleanup(cancelManagerCtx)
mgr, err := notifications.NewManager(cfg, noopInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, noopInterceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)
@ -588,7 +588,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
// Intercept calls to submit the buffered updates to the store.
storeInterceptor := &syncInterceptor{Store: store}
handler := newDispatchInterceptor(&fakeHandler{})
mgr, err = notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err = notifications.NewManager(cfg, storeInterceptor, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
@ -620,7 +620,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
func TestInvalidConfig(t *testing.T) {
t.Parallel()
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// GIVEN: invalid config with dispatch period <= lease period
@ -633,7 +633,7 @@ func TestInvalidConfig(t *testing.T) {
cfg.DispatchTimeout = serpent.Duration(leasePeriod)
// WHEN: the manager is created with invalid config
_, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
_, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
// THEN: the manager will fail to be created, citing invalid config as error
require.ErrorIs(t, err, notifications.ErrInvalidDispatchTimeout)
@ -646,7 +646,7 @@ func TestNotifierPaused(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
// Prepare the test.
@ -657,7 +657,7 @@ func TestNotifierPaused(t *testing.T) {
const fetchInterval = time.Millisecond * 100
cfg := defaultNotificationsConfig(method)
cfg.FetchInterval = serpent.Duration(fetchInterval)
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
@ -1229,6 +1229,8 @@ func TestNotificationTemplates_Golden(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
_, pubsub := dbtestutil.NewDB(t)
// smtp config shared between client and server
smtpConfig := codersdk.NotificationsEmailConfig{
Hello: hello,
@ -1296,6 +1298,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
smtpManager, err := notifications.NewManager(
smtpCfg,
*db,
pubsub,
defaultHelpers(),
createMetrics(),
logger.Named("manager"),
@ -1410,6 +1413,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
return &db, &api.Logger, &user
}()
_, pubsub := dbtestutil.NewDB(t)
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
@ -1437,6 +1441,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
webhookManager, err := notifications.NewManager(
webhookCfg,
*db,
pubsub,
defaultHelpers(),
createMetrics(),
logger.Named("manager"),
@ -1613,13 +1618,13 @@ func TestDisabledAfterEnqueue(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
method := database.NotificationMethodSmtp
cfg := defaultNotificationsConfig(method)
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
@ -1670,7 +1675,7 @@ func TestCustomNotificationMethod(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
received := make(chan uuid.UUID, 1)
@ -1728,7 +1733,7 @@ func TestCustomNotificationMethod(t *testing.T) {
Endpoint: *serpent.URLOf(endpoint),
}
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
_ = mgr.Stop(ctx)
@ -1811,13 +1816,13 @@ func TestNotificationDuplicates(t *testing.T) {
// nolint:gocritic // Unit test.
ctx := dbauthz.AsNotifier(testutil.Context(t, testutil.WaitSuperLong))
store, _ := dbtestutil.NewDB(t)
store, pubsub := dbtestutil.NewDB(t)
logger := testutil.Logger(t)
method := database.NotificationMethodSmtp
cfg := defaultNotificationsConfig(method)
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
mgr, err := notifications.NewManager(cfg, store, pubsub, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))