package notifications_test

import (
	"context"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	promtest "github.com/prometheus/client_golang/prometheus/testutil"
	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/coder/serpent"

	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/dbtestutil"
	"github.com/coder/coder/v2/coderd/notifications"
	"github.com/coder/coder/v2/coderd/notifications/dispatch"
	"github.com/coder/coder/v2/coderd/notifications/types"
	"github.com/coder/coder/v2/testutil"
)

func TestMetrics(t *testing.T) {
	t.Parallel()

	// SETUP
	if !dbtestutil.WillUsePostgres() {
		t.Skip("This test requires postgres; it relies on business-logic only implemented in the database")
	}

	ctx, logger, store := setup(t)

	reg := prometheus.NewRegistry()
	metrics := notifications.NewMetrics(reg)
	template := notifications.TemplateWorkspaceDeleted

	const (
		method      = database.NotificationMethodSmtp
		maxAttempts = 3
		debug       = false
	)

	// GIVEN: a notification manager whose intervals are tuned low (for test speed) and whose dispatches are intercepted
	cfg := defaultNotificationsConfig(method)
	cfg.MaxSendAttempts = maxAttempts
	// Tune the intervals low to increase test speed.
	cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
	cfg.RetryInterval = serpent.Duration(time.Millisecond * 50)
	cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100) // Twice as long as the fetch interval to ensure we catch pending updates.

	mgr, err := notifications.NewManager(cfg, store, metrics, logger.Named("manager"))
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, mgr.Stop(ctx))
	})
	handler := &fakeHandler{}
	mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
		method: handler,
	})

	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
	require.NoError(t, err)

	user := createSampleUser(t, store)

	// Build fingerprints for the two different series we expect.
	methodTemplateFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String())
	methodFP := fingerprintLabels(notifications.LabelMethod, string(method))

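	// Each key below is a metric family name; its validator is invoked for every series gathered under that family,
	// using the fingerprints above to assert that only the expected label combinations appear.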
	expected := map[string]func(metric *dto.Metric, series string) bool{
		"coderd_notifications_dispatch_attempts_total": func(metric *dto.Metric, series string) bool {
			// This metric has 3 possible dispositions; determine which one matches before checking the metric's value.
			results := map[string]float64{
				notifications.ResultSuccess:  1,               // Only 1 successful delivery.
				notifications.ResultTempFail: maxAttempts - 1, // 2 temp failures; on the 3rd attempt it will be marked a permanent failure.
				notifications.ResultPermFail: 1,               // 1 permanent failure after retries are exhausted.
			}

			var match string
			for result, val := range results {
				seriesFP := fingerprintLabels(notifications.LabelMethod, string(method), notifications.LabelTemplateID, template.String(), notifications.LabelResult, result)
				if !hasMatchingFingerprint(metric, seriesFP) {
					continue
				}

				match = result

				if debug {
					t.Logf("coderd_notifications_dispatch_attempts_total{result=%q} == %v: %v", result, val, metric.Counter.GetValue())
				}

				break
			}

			// Could not find a matching series.
			if match == "" {
				assert.Failf(t, "found unexpected series", "%q", series)
				return false
			}

			//nolint:forcetypeassert // Already checked above.
			target := results[match]
			return metric.Counter.GetValue() == target
		},
		"coderd_notifications_retry_count": func(metric *dto.Metric, series string) bool {
			assert.Truef(t, hasMatchingFingerprint(metric, methodTemplateFP), "found unexpected series %q", series)

			if debug {
				t.Logf("coderd_notifications_retry_count == %v: %v", maxAttempts-1, metric.Counter.GetValue())
			}

			// 1 original attempt + 2 retries = maxAttempts.
			return metric.Counter.GetValue() == maxAttempts-1
		},
		"coderd_notifications_queued_seconds": func(metric *dto.Metric, series string) bool {
			assert.Truef(t, hasMatchingFingerprint(metric, methodFP), "found unexpected series %q", series)

			if debug {
				t.Logf("coderd_notifications_queued_seconds > 0: %v", metric.Histogram.GetSampleSum())
			}

			// Notifications will queue for a non-zero amount of time.
			return metric.Histogram.GetSampleSum() > 0
		},
		"coderd_notifications_dispatcher_send_seconds": func(metric *dto.Metric, series string) bool {
			assert.Truef(t, hasMatchingFingerprint(metric, methodFP), "found unexpected series %q", series)

			if debug {
				t.Logf("coderd_notifications_dispatcher_send_seconds > 0: %v", metric.Histogram.GetSampleSum())
			}

			// Dispatches should take a non-zero amount of time.
			return metric.Histogram.GetSampleSum() > 0
		},
		"coderd_notifications_inflight_dispatches": func(metric *dto.Metric, series string) bool {
			// This is a gauge, so it can be difficult to get the timing right to catch it.
			// See TestInflightDispatchesMetric for a more precise test.
			return true
		},
		"coderd_notifications_pending_updates": func(metric *dto.Metric, series string) bool {
			// This is a gauge, so it can be difficult to get the timing right to catch it.
			// See TestPendingUpdatesMetric for a more precise test.
			return true
		},
		"coderd_notifications_synced_updates_total": func(metric *dto.Metric, series string) bool {
			if debug {
				t.Logf("coderd_notifications_synced_updates_total = %v: %v", maxAttempts+1, metric.Counter.GetValue())
			}

			// 1 message will exceed its maxAttempts, 1 will succeed on the first try.
			return metric.Counter.GetValue() == maxAttempts+1
		},
	}

	// WHEN: 2 notifications are enqueued, 1 of which will fail until its retries are exhausted, and another which will succeed
	_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed
	require.NoError(t, err)
	_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "failure"}, "test2") // this will fail and retry (maxAttempts - 1) times
	require.NoError(t, err)

	mgr.Run(ctx)

	// THEN: expect all the defined metrics to be present and have their expected values
	require.EventuallyWithT(t, func(ct *assert.CollectT) {
		handler.mu.RLock()
		defer handler.mu.RUnlock()

		gathered, err := reg.Gather()
		assert.NoError(t, err)

		succeeded := len(handler.succeeded)
		failed := len(handler.failed)
		if debug {
			t.Logf("SUCCEEDED == 1: %v, FAILED == %v: %v\n", succeeded, maxAttempts, failed)
		}

		// Ensure that all metrics have a) the expected label combinations (series) and b) the expected values.
		for _, family := range gathered {
			hasExpectedValue, ok := expected[family.GetName()]
			if !assert.Truef(ct, ok, "found unexpected metric family %q", family.GetName()) {
				t.Logf("found unexpected metric family %q", family.GetName())
				// Bail out fast if a precondition is not met.
				ct.FailNow()
			}

			for _, metric := range family.Metric {
				assert.True(ct, hasExpectedValue(metric, metric.String()))
			}
		}

		// One message will succeed.
		assert.Equal(ct, succeeded, 1)
		// One message will fail, and exhaust its maxAttempts.
		assert.Equal(ct, failed, maxAttempts)
	}, testutil.WaitShort, testutil.IntervalFast)
}

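// TestPendingUpdatesMetric validates that the pending updates gauge rises while store syncs are held back by the
// updateSignallingInterceptor (defined below), and returns to zero once the intercepted updates are allowed to proceed.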
func TestPendingUpdatesMetric(t *testing.T) {
	t.Parallel()

	// SETUP
	ctx, logger, store := setupInMemory(t)

	reg := prometheus.NewRegistry()
	metrics := notifications.NewMetrics(reg)
	template := notifications.TemplateWorkspaceDeleted

	const method = database.NotificationMethodSmtp

	// GIVEN: a notification manager whose store updates are intercepted so we can read the number of pending updates set in the metric
	cfg := defaultNotificationsConfig(method)
	cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
	cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
	cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)

	syncer := &syncInterceptor{Store: store}
	interceptor := newUpdateSignallingInterceptor(syncer)
	mgr, err := notifications.NewManager(cfg, interceptor, metrics, logger.Named("manager"))
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, mgr.Stop(ctx))
	})
	handler := &fakeHandler{}
	mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
		method: handler,
	})

	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
	require.NoError(t, err)

	user := createSampleUser(t, store)

	// WHEN: 2 notifications are enqueued, one of which will fail and one of which will succeed
	_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test") // this will succeed
	require.NoError(t, err)
	_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "failure"}, "test2") // this will fail (and not retry within this test, given the 1-hour retry interval)
	require.NoError(t, err)

	mgr.Run(ctx)

	// THEN:
	// Wait until the handler has dispatched the given notifications.
	require.Eventually(t, func() bool {
		handler.mu.RLock()
		defer handler.mu.RUnlock()

		return len(handler.succeeded) == 1 && len(handler.failed) == 1
	}, testutil.WaitShort, testutil.IntervalFast)

	// Wait until we intercept the calls to sync the pending updates to the store.
	success := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateSuccess)
	failure := testutil.RequireRecvCtx(testutil.Context(t, testutil.WaitShort), t, interceptor.updateFailure)

	// Wait for the metric to be updated with the expected count of pending updates.
	require.Eventually(t, func() bool {
		return promtest.ToFloat64(metrics.PendingUpdates) == float64(success+failure)
	}, testutil.WaitShort, testutil.IntervalFast)

	// Unpause the interceptor so the updates can proceed.
	interceptor.unpause()

	// Validate that the store synced the expected number of updates.
	require.Eventually(t, func() bool {
		return syncer.sent.Load() == 1 && syncer.failed.Load() == 1
	}, testutil.WaitShort, testutil.IntervalFast)

	// Wait for the updates to be synced and the metric to reflect that.
	require.Eventually(t, func() bool {
		return promtest.ToFloat64(metrics.PendingUpdates) == 0
	}, testutil.WaitShort, testutil.IntervalFast)
}

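// TestInflightDispatchesMetric validates the inflight dispatches gauge by wrapping the handler in a delayingHandler
// (defined below), which holds each delivery open long enough for the gauge to be observed at its peak.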
func TestInflightDispatchesMetric(t *testing.T) {
	t.Parallel()

	// SETUP
	ctx, logger, store := setupInMemory(t)

	reg := prometheus.NewRegistry()
	metrics := notifications.NewMetrics(reg)
	template := notifications.TemplateWorkspaceDeleted

	const method = database.NotificationMethodSmtp

	// GIVEN: a notification manager whose dispatches are intercepted and delayed to measure the number of inflight requests
	cfg := defaultNotificationsConfig(method)
	cfg.LeaseCount = 10
	cfg.FetchInterval = serpent.Duration(time.Millisecond * 50)
	cfg.RetryInterval = serpent.Duration(time.Hour) // Delay retries so they don't interfere.
	cfg.StoreSyncInterval = serpent.Duration(time.Millisecond * 100)

	mgr, err := notifications.NewManager(cfg, store, metrics, logger.Named("manager"))
	require.NoError(t, err)
	t.Cleanup(func() {
		assert.NoError(t, mgr.Stop(ctx))
	})

	handler := &fakeHandler{}
	// The delayer delays all dispatches by 2x the fetch interval to ensure we catch the requests inflight.
	delayer := newDelayingHandler(cfg.FetchInterval.Value()*2, handler)
	mgr.WithHandlers(map[database.NotificationMethod]notifications.Handler{
		method: delayer,
	})

	enq, err := notifications.NewStoreEnqueuer(cfg, store, defaultHelpers(), logger.Named("enqueuer"))
	require.NoError(t, err)

	user := createSampleUser(t, store)

	// WHEN: notifications are enqueued which will succeed (and be delayed during dispatch)
	const msgCount = 2
	for i := 0; i < msgCount; i++ {
		_, err = enq.Enqueue(ctx, user.ID, template, map[string]string{"type": "success"}, "test")
		require.NoError(t, err)
	}

	mgr.Run(ctx)

	// THEN:
	// Ensure we see the dispatches of the messages inflight.
	require.Eventually(t, func() bool {
		return promtest.ToFloat64(metrics.InflightDispatches.WithLabelValues(string(method), template.String())) == msgCount
	}, testutil.WaitShort, testutil.IntervalFast)

	// Wait until the handler has dispatched the given notifications.
	require.Eventually(t, func() bool {
		handler.mu.RLock()
		defer handler.mu.RUnlock()

		return len(handler.succeeded) == msgCount
	}, testutil.WaitShort, testutil.IntervalFast)

	// Wait for the dispatches to complete and the metric to reflect that.
	require.Eventually(t, func() bool {
		return promtest.ToFloat64(metrics.InflightDispatches) == 0
	}, testutil.WaitShort, testutil.IntervalFast)
}

// hasMatchingFingerprint checks if the given metric's series fingerprint matches the reference fingerprint.
func hasMatchingFingerprint(metric *dto.Metric, fp model.Fingerprint) bool {
	return fingerprintLabelPairs(metric.Label) == fp
}

// fingerprintLabelPairs produces a fingerprint unique to the given combination of label pairs.
func fingerprintLabelPairs(lbs []*dto.LabelPair) model.Fingerprint {
	pairs := make([]string, 0, len(lbs)*2)
	for _, lp := range lbs {
		pairs = append(pairs, lp.GetName(), lp.GetValue())
	}

	return fingerprintLabels(pairs...)
}

// fingerprintLabels produces a fingerprint unique to the given pairs of label values.
// MUST contain an even number of arguments (key:value), otherwise it will panic.
func fingerprintLabels(lbs ...string) model.Fingerprint {
	if len(lbs)%2 != 0 {
		panic("imbalanced set of label pairs given")
	}

	lbsSet := make(model.LabelSet, len(lbs)/2)
	for i := 0; i < len(lbs); i += 2 {
		k := lbs[i]
		v := lbs[i+1]
		lbsSet[model.LabelName(k)] = model.LabelValue(v)
	}

	return lbsSet.Fingerprint() // Use Fingerprint rather than FastFingerprint; FastFingerprint does not sort the labels.
}

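// For illustration, the two helpers above combine as follows (the label value is hypothetical):
//
//	fp := fingerprintLabels(notifications.LabelMethod, "smtp")
//	hasMatchingFingerprint(metric, fp) // true only if the metric carries exactly this label set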

// updateSignallingInterceptor intercepts bulk update calls to the store and blocks them on its pause channel until
// unpause is called, giving the caller a chance to read the number of pending updates before they are flushed.
type updateSignallingInterceptor struct {
	notifications.Store

	pause chan any

	updateSuccess chan int
	updateFailure chan int
}

func newUpdateSignallingInterceptor(interceptor notifications.Store) *updateSignallingInterceptor {
	return &updateSignallingInterceptor{
		Store: interceptor,

		pause: make(chan any, 1),

		updateSuccess: make(chan int, 1),
		updateFailure: make(chan int, 1),
	}
}

func (u *updateSignallingInterceptor) unpause() {
	close(u.pause)
}

func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesSent(ctx context.Context, arg database.BulkMarkNotificationMessagesSentParams) (int64, error) {
	u.updateSuccess <- len(arg.IDs)

	// Wait until signaled so we have a chance to read the number of pending updates.
	<-u.pause

	return u.Store.BulkMarkNotificationMessagesSent(ctx, arg)
}

func (u *updateSignallingInterceptor) BulkMarkNotificationMessagesFailed(ctx context.Context, arg database.BulkMarkNotificationMessagesFailedParams) (int64, error) {
	u.updateFailure <- len(arg.IDs)

	// Wait until signaled so we have a chance to read the number of pending updates.
	<-u.pause

	return u.Store.BulkMarkNotificationMessagesFailed(ctx, arg)
}

// delayingHandler wraps a notifications.Handler and sleeps for a fixed delay before each delivery, keeping dispatches
// inflight long enough to be observed.
type delayingHandler struct {
	h notifications.Handler

	delay time.Duration
}

func newDelayingHandler(delay time.Duration, handler notifications.Handler) *delayingHandler {
	return &delayingHandler{
		delay: delay,
		h:     handler,
	}
}

func (d *delayingHandler) Dispatcher(payload types.MessagePayload, title, body string) (dispatch.DeliveryFunc, error) {
	deliverFn, err := d.h.Dispatcher(payload, title, body)
	if err != nil {
		return nil, err
	}

	return func(ctx context.Context, msgID uuid.UUID) (retryable bool, err error) {
		time.Sleep(d.delay)

		return deliverFn(ctx, msgID)
	}, nil
}