mirror of
https://github.com/coder/coder.git
synced 2025-07-03 16:13:58 +00:00
chore: improve notifications tests (#13863)
This commit is contained in:
@ -13,19 +13,19 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/google/uuid"
|
||||
smtpmock "github.com/mocktools/go-smtp-mock/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/serpent"
|
||||
|
||||
"github.com/coder/coder/v2/coderd/database"
|
||||
"github.com/coder/coder/v2/coderd/database/dbgen"
|
||||
"github.com/coder/coder/v2/coderd/database/dbmem"
|
||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||
"github.com/coder/coder/v2/coderd/notifications"
|
||||
"github.com/coder/coder/v2/coderd/notifications/dispatch"
|
||||
@ -40,23 +40,24 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
// TestBasicNotificationRoundtrip enqueues a message to the store, waits for it to be acquired by a notifier,
|
||||
// and passes it off to a fake handler.
|
||||
// TODO: split this test up into table tests or separate tests.
|
||||
// passes it off to a fake handler, and ensures the results are synchronized to the store.
|
||||
func TestBasicNotificationRoundtrip(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// setup
|
||||
// SETUP
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres")
|
||||
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
|
||||
}
|
||||
|
||||
ctx, logger, db := setup(t)
|
||||
method := database.NotificationMethodSmtp
|
||||
|
||||
// given
|
||||
// GIVEN: a manager with standard config but a faked dispatch handler
|
||||
handler := &fakeHandler{}
|
||||
|
||||
interceptor := &bulkUpdateInterceptor{Store: db}
|
||||
cfg := defaultNotificationsConfig(method)
|
||||
mgr, err := notifications.NewManager(cfg, db, logger.Named("manager"))
|
||||
cfg.RetryInterval = serpent.Duration(time.Hour) // Ensure retries don't interfere with the test
|
||||
mgr, err := notifications.NewManager(cfg, interceptor, logger.Named("manager"))
|
||||
require.NoError(t, err)
|
||||
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
|
||||
t.Cleanup(func() {
|
||||
@ -67,7 +68,7 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
// when
|
||||
// WHEN: 2 messages are enqueued
|
||||
sid, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "success"}, "test")
|
||||
require.NoError(t, err)
|
||||
fid, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "failure"}, "test")
|
||||
@ -75,27 +76,40 @@ func TestBasicNotificationRoundtrip(t *testing.T) {
|
||||
|
||||
mgr.Run(ctx)
|
||||
|
||||
// then
|
||||
// THEN: we expect that the handler will have received the notifications for dispatch
|
||||
require.Eventually(t, func() bool {
|
||||
handler.mu.RLock()
|
||||
defer handler.mu.RUnlock()
|
||||
return handler.succeeded == sid.String()
|
||||
}, testutil.WaitLong, testutil.IntervalMedium)
|
||||
return slices.Contains(handler.succeeded, sid.String()) &&
|
||||
slices.Contains(handler.failed, fid.String())
|
||||
}, testutil.WaitLong, testutil.IntervalFast)
|
||||
|
||||
// THEN: we expect the store to be called with the updates of the earlier dispatches
|
||||
require.Eventually(t, func() bool {
|
||||
handler.mu.RLock()
|
||||
defer handler.mu.RUnlock()
|
||||
return handler.failed == fid.String()
|
||||
}, testutil.WaitLong, testutil.IntervalMedium)
|
||||
return interceptor.sent.Load() == 1 &&
|
||||
interceptor.failed.Load() == 1
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
// THEN: we verify that the store contains notifications in their expected state
|
||||
success, err := db.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{
|
||||
Status: database.NotificationMessageStatusSent,
|
||||
Limit: 10,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, success, 1)
|
||||
failed, err := db.GetNotificationMessagesByStatus(ctx, database.GetNotificationMessagesByStatusParams{
|
||||
Status: database.NotificationMessageStatusTemporaryFailure,
|
||||
Limit: 10,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, failed, 1)
|
||||
}
|
||||
|
||||
func TestSMTPDispatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// setup
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres")
|
||||
}
|
||||
ctx, logger, db := setup(t)
|
||||
// SETUP
|
||||
ctx, logger, db := setupInMemory(t)
|
||||
|
||||
// start mock SMTP server
|
||||
mockSMTPSrv := smtpmock.New(smtpmock.ConfigurationAttr{
|
||||
@ -107,7 +121,7 @@ func TestSMTPDispatch(t *testing.T) {
|
||||
assert.NoError(t, mockSMTPSrv.Stop())
|
||||
})
|
||||
|
||||
// given
|
||||
// GIVEN: an SMTP setup referencing a mock SMTP server
|
||||
const from = "danny@coder.com"
|
||||
method := database.NotificationMethodSmtp
|
||||
cfg := defaultNotificationsConfig(method)
|
||||
@ -128,19 +142,20 @@ func TestSMTPDispatch(t *testing.T) {
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
// when
|
||||
// WHEN: a message is enqueued
|
||||
msgID, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{}, "test")
|
||||
require.NoError(t, err)
|
||||
|
||||
mgr.Run(ctx)
|
||||
|
||||
// then
|
||||
// THEN: wait until the dispatch interceptor validates that the messages were dispatched
|
||||
require.Eventually(t, func() bool {
|
||||
assert.Nil(t, handler.lastErr.Load())
|
||||
assert.True(t, handler.retryable.Load() == 0)
|
||||
return handler.sent.Load() == 1
|
||||
}, testutil.WaitLong, testutil.IntervalMedium)
|
||||
|
||||
// THEN: we verify that the expected message was received by the mock SMTP server
|
||||
msgs := mockSMTPSrv.MessagesAndPurge()
|
||||
require.Len(t, msgs, 1)
|
||||
require.Contains(t, msgs[0].MsgRequest(), fmt.Sprintf("From: %s", from))
|
||||
@ -151,11 +166,8 @@ func TestSMTPDispatch(t *testing.T) {
|
||||
func TestWebhookDispatch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// setup
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres")
|
||||
}
|
||||
ctx, logger, db := setup(t)
|
||||
// SETUP
|
||||
ctx, logger, db := setupInMemory(t)
|
||||
|
||||
sent := make(chan dispatch.WebhookPayload, 1)
|
||||
// Mock server to simulate webhook endpoint.
|
||||
@ -175,7 +187,7 @@ func TestWebhookDispatch(t *testing.T) {
|
||||
endpoint, err := url.Parse(server.URL)
|
||||
require.NoError(t, err)
|
||||
|
||||
// given
|
||||
// GIVEN: a webhook setup referencing a mock HTTP server to receive the webhook
|
||||
cfg := defaultNotificationsConfig(database.NotificationMethodWebhook)
|
||||
cfg.Webhook = codersdk.NotificationsWebhookConfig{
|
||||
Endpoint: *serpent.URLOf(endpoint),
|
||||
@ -188,13 +200,17 @@ func TestWebhookDispatch(t *testing.T) {
|
||||
enq, err := notifications.NewStoreEnqueuer(cfg, db, defaultHelpers(), logger.Named("enqueuer"))
|
||||
require.NoError(t, err)
|
||||
|
||||
const (
|
||||
email = "bob@coder.com"
|
||||
name = "Robert McBobbington"
|
||||
)
|
||||
user := dbgen.User(t, db, database.User{
|
||||
Email: "bob@coder.com",
|
||||
Email: email,
|
||||
Username: "bob",
|
||||
Name: "Robert McBobbington",
|
||||
Name: name,
|
||||
})
|
||||
|
||||
// when
|
||||
// WHEN: a notification is enqueued (including arbitrary labels)
|
||||
input := map[string]string{
|
||||
"a": "b",
|
||||
"c": "d",
|
||||
@ -204,15 +220,18 @@ func TestWebhookDispatch(t *testing.T) {
|
||||
|
||||
mgr.Run(ctx)
|
||||
|
||||
// then
|
||||
// THEN: the webhook is received by the mock server and has the expected contents
|
||||
payload := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, sent)
|
||||
require.EqualValues(t, "1.0", payload.Version)
|
||||
require.Equal(t, *msgID, payload.MsgID)
|
||||
require.Equal(t, payload.Payload.Labels, input)
|
||||
require.Equal(t, payload.Payload.UserEmail, "bob@coder.com")
|
||||
require.Equal(t, payload.Payload.UserEmail, email)
|
||||
// UserName is coalesced from `name` and `username`; in this case `name` wins.
|
||||
require.Equal(t, payload.Payload.UserName, "Robert McBobbington")
|
||||
require.Equal(t, payload.Payload.NotificationName, "Workspace Deleted")
|
||||
// This is not strictly necessary for this test, but it's testing some side logic which is too small for its own test.
|
||||
require.Equal(t, payload.Payload.UserName, name)
|
||||
// Right now we don't have a way to query notification templates by ID in dbmem, and it's not necessary to add this
|
||||
// just to satisfy this test. We can safely assume that as long as this value is not empty that the given value was delivered.
|
||||
require.NotEmpty(t, payload.Payload.NotificationName)
|
||||
}
|
||||
|
||||
// TestBackpressure validates that delays in processing the buffered updates will result in slowed dequeue rates.
|
||||
@ -220,9 +239,9 @@ func TestWebhookDispatch(t *testing.T) {
|
||||
func TestBackpressure(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// setup
|
||||
// SETUP
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres")
|
||||
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
|
||||
}
|
||||
|
||||
ctx, logger, db := setup(t)
|
||||
@ -268,7 +287,7 @@ func TestBackpressure(t *testing.T) {
|
||||
// Intercept calls to submit the buffered updates to the store.
|
||||
storeInterceptor := &bulkUpdateInterceptor{Store: db}
|
||||
|
||||
// given
|
||||
// GIVEN: a notification manager whose updates will be intercepted
|
||||
mgr, err := notifications.NewManager(cfg, storeInterceptor, logger.Named("manager"))
|
||||
require.NoError(t, err)
|
||||
mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{method: handler})
|
||||
@ -277,7 +296,7 @@ func TestBackpressure(t *testing.T) {
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
// when
|
||||
// WHEN: a set of notifications are enqueued, which causes backpressure due to the batchSize which can be processed per fetch
|
||||
const totalMessages = 30
|
||||
for i := 0; i < totalMessages; i++ {
|
||||
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test")
|
||||
@ -287,7 +306,7 @@ func TestBackpressure(t *testing.T) {
|
||||
// Start the notifier.
|
||||
mgr.Run(ctx)
|
||||
|
||||
// then
|
||||
// THEN:
|
||||
|
||||
// Wait for 3 fetch intervals, then check progress.
|
||||
time.Sleep(fetchInterval * 3)
|
||||
@ -308,15 +327,15 @@ func TestBackpressure(t *testing.T) {
|
||||
func TestRetries(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// setup
|
||||
// SETUP
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres")
|
||||
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
|
||||
}
|
||||
|
||||
const maxAttempts = 3
|
||||
ctx, logger, db := setup(t)
|
||||
|
||||
// given
|
||||
// GIVEN: a mock HTTP server which will receive webhooksand a map to track the dispatch attempts
|
||||
|
||||
receivedMap := syncmap.New[uuid.UUID, int]()
|
||||
// Mock server to simulate webhook endpoint.
|
||||
@ -375,7 +394,7 @@ func TestRetries(t *testing.T) {
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
// when
|
||||
// WHEN: a few notifications are enqueued, which will all fail until their final retry (determined by the mock server)
|
||||
const msgCount = 5
|
||||
for i := 0; i < msgCount; i++ {
|
||||
_, err = enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"i": fmt.Sprintf("%d", i)}, "test")
|
||||
@ -384,7 +403,7 @@ func TestRetries(t *testing.T) {
|
||||
|
||||
mgr.Run(ctx)
|
||||
|
||||
// then
|
||||
// THEN: we expect to see all but the final attempts failing
|
||||
require.Eventually(t, func() bool {
|
||||
// We expect all messages to fail all attempts but the final;
|
||||
return storeInterceptor.failed.Load() == msgCount*(maxAttempts-1) &&
|
||||
@ -400,14 +419,14 @@ func TestRetries(t *testing.T) {
|
||||
func TestExpiredLeaseIsRequeued(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// setup
|
||||
// SETUP
|
||||
if !dbtestutil.WillUsePostgres() {
|
||||
t.Skip("This test requires postgres")
|
||||
t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
|
||||
}
|
||||
|
||||
ctx, logger, db := setup(t)
|
||||
|
||||
// given
|
||||
// GIVEN: a manager which has its updates intercepted and paused until measurements can be taken
|
||||
|
||||
const (
|
||||
leasePeriod = time.Second
|
||||
@ -432,7 +451,7 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
|
||||
|
||||
user := createSampleUser(t, db)
|
||||
|
||||
// when
|
||||
// WHEN: a few notifications are enqueued which will all succeed
|
||||
var msgs []string
|
||||
for i := 0; i < msgCount; i++ {
|
||||
id, err := enq.Enqueue(ctx, user.ID, notifications.TemplateWorkspaceDeleted, map[string]string{"type": "success"}, "test")
|
||||
@ -442,6 +461,8 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
|
||||
|
||||
mgr.Run(mgrCtx)
|
||||
|
||||
// THEN:
|
||||
|
||||
// Wait for the messages to be acquired
|
||||
<-noopInterceptor.acquiredChan
|
||||
// Then cancel the context, forcing the notification manager to shutdown ungracefully (simulating a crash); leaving messages in "leased" status.
|
||||
@ -499,29 +520,29 @@ func TestExpiredLeaseIsRequeued(t *testing.T) {
|
||||
func TestInvalidConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
db := dbmem.New()
|
||||
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true, IgnoredErrorIs: []error{}}).Leveled(slog.LevelDebug)
|
||||
|
||||
// given
|
||||
_, logger, db := setupInMemory(t)
|
||||
|
||||
// GIVEN: invalid config with dispatch period <= lease period
|
||||
const (
|
||||
leasePeriod = time.Second
|
||||
method = database.NotificationMethodSmtp
|
||||
)
|
||||
|
||||
cfg := defaultNotificationsConfig(method)
|
||||
cfg.LeasePeriod = serpent.Duration(leasePeriod)
|
||||
cfg.DispatchTimeout = serpent.Duration(leasePeriod)
|
||||
|
||||
// WHEN: the manager is created with invalid config
|
||||
_, err := notifications.NewManager(cfg, db, logger.Named("manager"))
|
||||
|
||||
// THEN: the manager will fail to be created, citing invalid config as error
|
||||
require.ErrorIs(t, err, notifications.ErrInvalidDispatchTimeout)
|
||||
}
|
||||
|
||||
type fakeHandler struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
succeeded string
|
||||
failed string
|
||||
succeeded []string
|
||||
failed []string
|
||||
}
|
||||
|
||||
func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dispatch.DeliveryFunc, error) {
|
||||
@ -530,11 +551,12 @@ func (f *fakeHandler) Dispatcher(payload types.MessagePayload, _, _ string) (dis
|
||||
defer f.mu.Unlock()
|
||||
|
||||
if payload.Labels["type"] == "success" {
|
||||
f.succeeded = msgID.String()
|
||||
} else {
|
||||
f.failed = msgID.String()
|
||||
f.succeeded = append(f.succeeded, msgID.String())
|
||||
return false, nil
|
||||
}
|
||||
return false, nil
|
||||
|
||||
f.failed = append(f.failed, msgID.String())
|
||||
return true, xerrors.New("oops")
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user