feat(coderd): add new dispatch logic for coder inbox (#16764)

This PR is [resolving the dispatch part of Coder
Inbocx](https://github.com/coder/internal/issues/403).

Since the DB layer has been merged - we now want to insert notifications
into Coder Inbox in parallel of the other delivery target.

To do so, we push two messages instead of one using the `Enqueue`
method.
This commit is contained in:
Vincent Vielle
2025-03-05 22:43:18 +01:00
committed by GitHub
parent 32450a2f77
commit 522181fead
42 changed files with 415 additions and 120 deletions

View File

@ -82,7 +82,10 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
cfg.RetryInterval = serpent.Duration(time.Hour) // Ensure retries don't interfere with the test
mgr, err := notifications.NewManager(cfg, interceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
database.NotificationMethodInbox: &fakeHandler{},
})
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
@ -103,14 +106,14 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
require.Eventually(t, func() bool {
handler.mu.RLock()
defer handler.mu.RUnlock()
return slices.Contains(handler.succeeded, sid.String()) &&
slices.Contains(handler.failed, fid.String())
return slices.Contains(handler.succeeded, sid[0].String()) &&
slices.Contains(handler.failed, fid[0].String())
}, testutil.WaitLong, testutil.IntervalFast)
// THEN: we expect the store to be called with the updates of the earlier dispatches
require.Eventually(t, func() bool {
return interceptor.sent.Load() == 1 &&
interceptor.failed.Load() == 1
return interceptor.sent.Load() == 2 &&
interceptor.failed.Load() == 2
}, testutil.WaitLong, testutil.IntervalFast)
// THEN: we verify that the store contains notifications in their expected state
@ -119,13 +122,13 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
Limit: 10,
})
require.NoError(t, err)
require.Len(t, success, 1)
require.Len(t, success, 2)
failed, err := store.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{
Status: database.NotificationMessageStatusTemporaryFailure,
Limit: 10,
})
require.NoError(t, err)
require.Len(t, failed, 1)
require.Len(t, failed, 2)
}
func TestSMTPDispatch(t *testing.T) {
@ -160,7 +163,10 @@ func TestSMTPDispatch(t *testing.T) {
handler := newDispatchInterceptor(dispatch.NewSMTPHandler(cfg.SMTP, logger.Named("smtp")))
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
database.NotificationMethodInbox: &fakeHandler{},
})
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
@ -172,6 +178,7 @@ func TestSMTPDispatch(t *testing.T) {
// WHEN: a message is enqueued
msgID, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
require.NoError(t, err)
require.Len(t, msgID, 2)
mgr.Run(ctx)
@ -187,7 +194,7 @@ func TestSMTPDispatch(t *testing.T) {
require.Len(t, msgs, 1)
require.Contains(t, msgs[0].MsgRequest(), fmt.Sprintf("From: %s", from))
require.Contains(t, msgs[0].MsgRequest(), fmt.Sprintf("To: %s", user.Email))
require.Contains(t, msgs[0].MsgRequest(), fmt.Sprintf("Message-Id: %s", msgID))
require.Contains(t, msgs[0].MsgRequest(), fmt.Sprintf("Message-Id: %s", msgID[0]))
}
func TestWebhookDispatch(t *testing.T) {
@ -255,7 +262,7 @@ func TestWebhookDispatch(t *testing.T) {
// THEN: the webhook is received by the mock server and has the expected contents
payload := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, sent)
require.EqualValues(t, "1.1", payload.Version)
require.Equal(t, *msgID, payload.MsgID)
require.Equal(t, msgID[0], payload.MsgID)
require.Equal(t, payload.Payload.Labels, input)
require.Equal(t, payload.Payload.UserEmail, email)
// UserName is coalesced from `name` and `username`; in this case `name` wins.
@ -315,7 +322,10 @@ func TestBackpressure(t *testing.T) {
mgr, err := notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(),
logger.Named("manager"), notifications.WithTestClock(mClock))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
database.NotificationMethodInbox: handler,
})
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), mClock)
require.NoError(t, err)
@ -463,7 +473,10 @@ func TestRetries(t *testing.T) {
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
database.NotificationMethodInbox: &fakeHandler{},
})
enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"), quartz.NewReal())
require.NoError(t, err)
@ -478,11 +491,14 @@ func TestRetries(t *testing.T) {
mgr.Run(ctx)
// THEN: we expect to see all but the final attempts failing
// the number of tries is equal to the number of messages times the number of attempts
// times 2 as the Enqueue method pushes into both the defined dispatch method and inbox
nbTries := msgCount * maxAttempts * 2
// THEN: we expect to see all but the final attempts failing on webhook, and all messages to fail on inbox
require.Eventually(t, func() bool {
// We expect all messages to fail all attempts but the final;
return storeInterceptor.failed.Load() == msgCount*(maxAttempts-1) &&
// ...and succeed on the final attempt.
// nolint:gosec
return storeInterceptor.failed.Load() == int32(nbTries-msgCount) &&
storeInterceptor.sent.Load() == msgCount
}, testutil.WaitLong, testutil.IntervalFast)
}
@ -533,10 +549,11 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
// WHEN: a few notifications are enqueued which will all succeed
var msgs []string
for i := 0; i < msgCount; i++ {
id, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted,
ids, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted,
map[string]string{"type": "success", "index": fmt.Sprintf("%d", i)}, "test")
require.NoError(t, err)
msgs = append(msgs, id.String())
require.Len(t, ids, 2)
msgs = append(msgs, ids[0].String(), ids[1].String())
}
mgr.Run(mgrCtx)
@ -551,7 +568,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
// Fetch any messages currently in "leased" status, and verify that they're exactly the ones we enqueued.
leased, err := store.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{
Status: database.NotificationMessageStatusLeased,
Limit: msgCount,
Limit: msgCount * 2,
})
require.NoError(t, err)
@ -573,7 +590,10 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
handler := newDispatchInterceptor(&fakeHandler{})
mgr, err = notifications.NewManager(cfg, storeInterceptor, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
database.NotificationMethodInbox: &fakeHandler{},
})
// Use regular context now.
t.Cleanup(func() {
@ -584,7 +604,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
// Wait until all messages are sent & updates flushed to the database.
require.Eventually(t, func() bool {
return handler.sent.Load() == msgCount &&
storeInterceptor.sent.Load() == msgCount
storeInterceptor.sent.Load() == msgCount*2
}, testutil.WaitLong, testutil.IntervalFast)
// Validate that no more messages are in "leased" status.
@ -639,7 +659,10 @@ func TestNotifierPaused(t *testing.T) {
cfg.FetchInterval = serpent.Duration(fetchInterval)
mgr, err := notifications.NewManager(cfg, store, defaultHelpers(), createMetrics(), logger.Named("manager"))
require.NoError(t, err)
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
method: handler,
database.NotificationMethodInbox: &fakeHandler{},
})
t.Cleanup(func() {
assert.NoError(t, mgr.Stop(ctx))
})
@ -667,8 +690,9 @@ func TestNotifierPaused(t *testing.T) {
Limit: 10,
})
require.NoError(t, err)
require.Len(t, pendingMessages, 1)
require.Equal(t, pendingMessages[0].ID.String(), sid.String())
require.Len(t, pendingMessages, 2)
require.Equal(t, pendingMessages[0].ID.String(), sid[0].String())
require.Equal(t, pendingMessages[1].ID.String(), sid[1].String())
// Wait a few fetch intervals to be sure that no new notifications are being sent.
// TODO: use quartz instead.
@ -691,7 +715,7 @@ func TestNotifierPaused(t *testing.T) {
require.Eventually(t, func() bool {
handler.mu.RLock()
defer handler.mu.RUnlock()
return slices.Contains(handler.succeeded, sid.String())
return slices.Contains(handler.succeeded, sid[0].String())
}, fetchInterval*5, testutil.IntervalFast)
}
@ -767,6 +791,10 @@ func TestNotificationTemplates_Golden(t *testing.T) {
"reason": "autodeleted due to dormancy",
"initiator": "autobuild",
},
Targets: []uuid.UUID{
uuid.MustParse("5c6ea841-ca63-46cc-9c37-78734c7a788b"),
uuid.MustParse("b8355e3a-f3c5-4dd1-b382-7eb1fae7db52"),
},
},
},
{
@ -780,6 +808,10 @@ func TestNotificationTemplates_Golden(t *testing.T) {
"name": "bobby-workspace",
"reason": "autostart",
},
Targets: []uuid.UUID{
uuid.MustParse("5c6ea841-ca63-46cc-9c37-78734c7a788b"),
uuid.MustParse("b8355e3a-f3c5-4dd1-b382-7eb1fae7db52"),
},
},
},
{
@ -1298,6 +1330,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
)
require.NoError(t, err)
tc.payload.Targets = append(tc.payload.Targets, user.ID)
_, err = smtpEnqueuer.EnqueueWithData(
ctx,
user.ID,
@ -1305,7 +1338,7 @@ func TestNotificationTemplates_Golden(t *testing.T) {
tc.payload.Labels,
tc.payload.Data,
user.Username,
user.ID,
tc.payload.Targets...,
)
require.NoError(t, err)
@ -1620,8 +1653,8 @@ func TestDisabledAfterEnqueue(t *testing.T) {
Limit: 10,
})
assert.NoError(ct, err)
if assert.Equal(ct, len(m), 1) {
assert.Equal(ct, m[0].ID.String(), msgID.String())
if assert.Equal(ct, len(m), 2) {
assert.Contains(ct, []string{m[0].ID.String(), m[1].ID.String()}, msgID[0].String())
assert.Contains(ct, m[0].StatusReason.String, "disabled by user")
}
}, testutil.WaitLong, testutil.IntervalFast, "did not find the expected inhibited message")
@ -1713,7 +1746,7 @@ func TestCustomNotificationMethod(t *testing.T) {
mgr.Run(ctx)
receivedMsgID := testutil.RequireRecvCtx(ctx, t, received)
require.Equal(t, msgID.String(), receivedMsgID.String())
require.Equal(t, msgID[0].String(), receivedMsgID.String())
// Ensure no messages received by default method (SMTP):
msgs := mockSMTPSrv.MessagesAndPurge()
@ -1725,7 +1758,7 @@ func TestCustomNotificationMethod(t *testing.T) {
require.EventuallyWithT(t, func(ct *assert.CollectT) {
msgs := mockSMTPSrv.MessagesAndPurge()
if assert.Len(ct, msgs, 1) {
assert.Contains(ct, msgs[0].MsgRequest(), fmt.Sprintf("Message-Id: %s", msgID))
assert.Contains(ct, msgs[0].MsgRequest(), fmt.Sprintf("Message-Id: %s", msgID[0]))
}
}, testutil.WaitLong, testutil.IntervalFast)
}