1
0
mirror of https://github.com/coder/coder.git synced 2025-03-15 19:19:58 +00:00

feat: measure pubsub latencies and expose metrics ()

This commit is contained in:
Danny Kopping
2024-05-10 14:31:49 +02:00
committed by GitHub
parent e14f8fb64b
commit 4671ebb330
5 changed files with 326 additions and 38 deletions

@ -0,0 +1,74 @@
package pubsub
import (
"bytes"
"context"
"fmt"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
)
// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
type LatencyMeasurer struct {
	// channel is a random UUID, generated once per measurer, that is embedded in the pubsub channel name so that
	// multiple coderd replicas do not clash when performing latency measurements.
	channel uuid.UUID
	// logger records the start of each measurement, unexpected payloads seen on the measurement channel, and
	// measurements that were cut short because the context finished.
	logger slog.Logger
}
// LatencyMessageLength is the length of a UUID in its canonical string form (32 hex digits plus 4 hyphens), which is the payload Measure publishes.
const LatencyMessageLength = 36
// NewLatencyMeasurer returns a LatencyMeasurer that logs to the given logger and
// uses a freshly generated UUID to name its measurement channel.
func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
	lm := new(LatencyMeasurer)
	lm.channel = uuid.New()
	lm.logger = logger
	return lm
}
// Measure performs a single round trip through the given Pubsub: it subscribes
// to this measurer's channel, publishes a freshly generated UUID string, and
// waits until that exact payload is delivered back to the subscriber.
//
// send is the time spent inside the Publish call. recv is the time from just
// before Publish until the subscriber observed the payload. A duration of -1
// means it was not measured: both are -1 when subscribing or publishing fails,
// and recv alone is -1 when ctx finishes before the payload arrives (in which
// case ctx.Err() is returned).
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
	channel := lm.latencyChannelName()
	payload := []byte(uuid.New().String())
	lm.logger.Debug(ctx, "performing measurement", slog.F("msg", payload))

	// publishedAt is only assigned right before publishing, so that the time
	// taken to set up the subscription is not counted in either latency.
	var publishedAt time.Time
	// Buffered so the listener can hand over its result without waiting on us.
	received := make(chan time.Duration, 1)

	onMessage := func(ctx context.Context, in []byte) {
		if bytes.Equal(in, payload) {
			received <- time.Since(publishedAt)
			return
		}
		// Anything else on this channel is not ours; report it and keep waiting.
		lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", payload))
	}

	unsubscribe, err := p.Subscribe(channel, onMessage)
	if err != nil {
		return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
	}
	defer unsubscribe()

	publishedAt = time.Now()
	if err = p.Publish(channel, payload); err != nil {
		return -1, -1, xerrors.Errorf("failed to publish: %w", err)
	}
	send = time.Since(publishedAt)

	select {
	case recv = <-received:
		return send, recv, nil
	case <-ctx.Done():
		lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", payload))
		return send, -1, ctx.Err()
	}
}
// latencyChannelName returns the pubsub channel used for measurements: a fixed
// "latency-measure:" prefix followed by this measurer's UUID.
func (lm *LatencyMeasurer) latencyChannelName() string {
	const format = "latency-measure:%s"
	return fmt.Sprintf(format, lm.channel)
}

@ -7,6 +7,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
@ -28,6 +29,9 @@ type ListenerWithErr func(ctx context.Context, message []byte, err error)
// might have been dropped.
var ErrDroppedMessages = xerrors.New("dropped messages")
// LatencyMeasureTimeout defines the maximum time a single latency measurement may take before it is abandoned.
const LatencyMeasureTimeout = time.Second * 10
// Pubsub is a generic interface for broadcasting and receiving messages.
// Implementors should assume high-availability with the backing implementation.
type Pubsub interface {
@ -205,6 +209,10 @@ type PGPubsub struct {
receivedBytesTotal prometheus.Counter
disconnectionsTotal prometheus.Counter
connected prometheus.Gauge
latencyMeasurer *LatencyMeasurer
latencyMeasureCounter atomic.Int64
latencyErrCounter atomic.Int64
}
// BufferSize is the maximum number of unhandled messages we will buffer
@ -478,6 +486,30 @@ var (
)
)
// additional metrics collected out-of-band
//
// None of these descriptors declare labels (both label arguments are nil).
var (
	// pubsubSendLatencyDesc describes the gauge reporting, in seconds, how long
	// publishing the latency measurement message took.
	pubsubSendLatencyDesc = prometheus.NewDesc(
		"coder_pubsub_send_latency_seconds",
		"The time taken to send a message into a pubsub event channel",
		nil, nil,
	)
	// pubsubRecvLatencyDesc describes the gauge reporting, in seconds, how long
	// it took for the latency measurement message to be received.
	pubsubRecvLatencyDesc = prometheus.NewDesc(
		"coder_pubsub_receive_latency_seconds",
		"The time taken to receive a message from a pubsub event channel",
		nil, nil,
	)
	// pubsubLatencyMeasureCountDesc describes the counter of latency
	// measurements attempted so far.
	pubsubLatencyMeasureCountDesc = prometheus.NewDesc(
		"coder_pubsub_latency_measures_total",
		"The number of pubsub latency measurements",
		nil, nil,
	)
	// pubsubLatencyMeasureErrDesc describes the counter of latency
	// measurements that returned an error.
	pubsubLatencyMeasureErrDesc = prometheus.NewDesc(
		"coder_pubsub_latency_measure_errs_total",
		"The number of pubsub latency measurement failures",
		nil, nil,
	)
)
// We'll track messages as size "normal" and "colossal", where the
// latter are messages larger than 7600 bytes, or 95% of the postgres
// notify limit. If we see a lot of colossal packets that's an indication that
@ -504,6 +536,12 @@ func (p *PGPubsub) Describe(descs chan<- *prometheus.Desc) {
// implicit metrics
descs <- currentSubscribersDesc
descs <- currentEventsDesc
// additional metrics
descs <- pubsubSendLatencyDesc
descs <- pubsubRecvLatencyDesc
descs <- pubsubLatencyMeasureCountDesc
descs <- pubsubLatencyMeasureErrDesc
}
// Collect implements, along with Describe, the prometheus.Collector interface
@ -528,6 +566,20 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
p.qMu.Unlock()
metrics <- prometheus.MustNewConstMetric(currentSubscribersDesc, prometheus.GaugeValue, float64(subs))
metrics <- prometheus.MustNewConstMetric(currentEventsDesc, prometheus.GaugeValue, float64(events))
// additional metrics
ctx, cancel := context.WithTimeout(context.Background(), LatencyMeasureTimeout)
defer cancel()
send, recv, err := p.latencyMeasurer.Measure(ctx, p)
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureCountDesc, prometheus.CounterValue, float64(p.latencyMeasureCounter.Add(1)))
if err != nil {
p.logger.Warn(context.Background(), "failed to measure latency", slog.Error(err))
metrics <- prometheus.MustNewConstMetric(pubsubLatencyMeasureErrDesc, prometheus.CounterValue, float64(p.latencyErrCounter.Add(1)))
return
}
metrics <- prometheus.MustNewConstMetric(pubsubSendLatencyDesc, prometheus.GaugeValue, send.Seconds())
metrics <- prometheus.MustNewConstMetric(pubsubRecvLatencyDesc, prometheus.GaugeValue, recv.Seconds())
}
// New creates a new Pubsub implementation using a PostgreSQL connection.
@ -544,10 +596,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
// newWithoutListener creates a new PGPubsub without creating the pqListener.
func newWithoutListener(logger slog.Logger, database *sql.DB) *PGPubsub {
return &PGPubsub{
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
logger: logger,
listenDone: make(chan struct{}),
db: database,
queues: make(map[string]map[uuid.UUID]*msgQueue),
latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),
publishesTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "coder",

@ -3,6 +3,7 @@
package pubsub_test
import (
"bytes"
"context"
"database/sql"
"fmt"
@ -15,6 +16,8 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"
"cdr.dev/slog/sloggers/sloghuman"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
@ -294,3 +297,111 @@ func TestPubsub_Disconnect(t *testing.T) {
}
require.True(t, gotDroppedErr)
}
// TestMeasureLatency exercises LatencyMeasurer.Measure against a pubsub backed
// by a test database: the happy path, a context that has already expired, and
// an unrelated message arriving on the measurement channel first.
func TestMeasureLatency(t *testing.T) {
	t.Parallel()

	// newPubsub builds a pubsub for the calling test and returns it together
	// with a function that releases everything it created.
	//
	// It takes the caller's t explicitly instead of capturing the parent's:
	// the subtests run in parallel on their own goroutines, and FailNow (used
	// by require) must be called from the goroutine running the test it
	// belongs to. This also attributes failures and log output to the subtest
	// that actually produced them.
	newPubsub := func(t testing.TB) (pubsub.Pubsub, func()) {
		t.Helper()

		ctx, cancel := context.WithCancel(context.Background())
		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		connectionURL, closePg, err := dbtestutil.Open()
		require.NoError(t, err)
		db, err := sql.Open("postgres", connectionURL)
		require.NoError(t, err)
		ps, err := pubsub.New(ctx, logger, db, connectionURL)
		require.NoError(t, err)

		return ps, func() {
			_ = ps.Close()
			_ = db.Close()
			closePg()
			cancel()
		}
	}

	t.Run("MeasureLatency", func(t *testing.T) {
		t.Parallel()

		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ps, done := newPubsub(t)
		defer done()

		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
		defer cancel()

		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
		require.NoError(t, err)
		require.Greater(t, send.Seconds(), 0.0)
		require.Greater(t, recv.Seconds(), 0.0)
	})

	t.Run("MeasureLatencyRecvTimeout", func(t *testing.T) {
		t.Parallel()

		logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
		ps, done := newPubsub(t)
		defer done()

		// A deadline in the past means the context is already done when
		// Measure starts waiting for the message.
		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
		defer cancel()

		send, recv, err := pubsub.NewLatencyMeasurer(logger).Measure(ctx, ps)
		require.ErrorContains(t, err, context.DeadlineExceeded.Error())
		require.Greater(t, send.Seconds(), 0.0)
		// Expected value goes first so a failure message reads correctly.
		require.EqualValues(t, time.Duration(-1), recv)
	})

	t.Run("MeasureLatencyNotifyRace", func(t *testing.T) {
		t.Parallel()

		// Mirror log output into a buffer so the warning about the unexpected
		// message can be asserted on below.
		var buf bytes.Buffer
		logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
		logger = logger.AppendSinks(sloghuman.Sink(&buf))

		lm := pubsub.NewLatencyMeasurer(logger)
		ps, done := newPubsub(t)
		defer done()

		racy := newRacyPubsub(ps)
		ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
		defer cancel()

		send, recv, err := lm.Measure(ctx, racy)
		assert.NoError(t, err)
		assert.Greater(t, send.Seconds(), 0.0)
		assert.Greater(t, recv.Seconds(), 0.0)

		logger.Sync()
		assert.Contains(t, buf.String(), "received unexpected message")
	})
}
// racyPubsub simulates a race on the same channel by publishing two messages (one expected, one not).
// This is used to verify that a subscriber will only listen for the message it explicitly expects.
//
// It embeds a Pubsub and forwards every call to it; only Publish adds behavior, by sending an
// unrelated payload on the same event before the caller's message.
type racyPubsub struct {
	pubsub.Pubsub
}
// newRacyPubsub wraps ps in a racyPubsub.
func newRacyPubsub(ps pubsub.Pubsub) *racyPubsub {
	wrapped := racyPubsub{Pubsub: ps}
	return &wrapped
}
// Subscribe passes the subscription through to the wrapped Pubsub unchanged.
func (s *racyPubsub) Subscribe(event string, listener pubsub.Listener) (cancel func(), err error) {
	cancel, err = s.Pubsub.Subscribe(event, listener)
	return cancel, err
}
// SubscribeWithErr passes the subscription through to the wrapped Pubsub unchanged.
func (s *racyPubsub) SubscribeWithErr(event string, listener pubsub.ListenerWithErr) (cancel func(), err error) {
	cancel, err = s.Pubsub.SubscribeWithErr(event, listener)
	return cancel, err
}
// Publish first sends an unrelated payload on the same event, then the caller's
// message, so that subscribers see a message they did not expect ahead of the
// real one. If the decoy cannot be sent, the real message is not published.
func (s *racyPubsub) Publish(event string, message []byte) error {
	decoy := []byte("nonsense")
	if err := s.Pubsub.Publish(event, decoy); err != nil {
		return xerrors.Errorf("failed to send simulated race: %w", err)
	}
	return s.Pubsub.Publish(event, message)
}
// Close closes the wrapped Pubsub.
func (s *racyPubsub) Close() error {
	err := s.Pubsub.Close()
	return err
}

@ -39,7 +39,11 @@ func TestPGPubsub_Metrics(t *testing.T) {
err = registry.Register(uut)
require.NoError(t, err)
// each Gather measures pubsub latency by publishing a message & subscribing to it
var gatherCount float64
metrics, err := registry.Gather()
gatherCount++
require.NoError(t, err)
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_events"))
require.True(t, testutil.PromGaugeHasValue(t, metrics, 0, "coder_pubsub_current_subscribers"))
@ -59,19 +63,26 @@ func TestPGPubsub_Metrics(t *testing.T) {
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
require.Eventually(t, func() bool {
latencyBytes := gatherCount * pubsub.LatencyMessageLength
metrics, err = registry.Gather()
gatherCount++
assert.NoError(t, err)
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_subscribers") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, 7, "coder_pubsub_published_bytes_total")
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, float64(len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)
colossalData := make([]byte, 7600)
colossalSize := 7600
colossalData := make([]byte, colossalSize)
for i := range colossalData {
colossalData[i] = 'q'
}
@ -89,16 +100,22 @@ func TestPGPubsub_Metrics(t *testing.T) {
_ = testutil.RequireRecvCtx(ctx, t, messageChannel)
require.Eventually(t, func() bool {
latencyBytes := gatherCount * pubsub.LatencyMessageLength
metrics, err = registry.Gather()
gatherCount++
assert.NoError(t, err)
return testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_current_events") &&
testutil.PromGaugeHasValue(t, metrics, 2, "coder_pubsub_current_subscribers") &&
testutil.PromGaugeHasValue(t, metrics, 1, "coder_pubsub_connected") &&
testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 2, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_publishes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, 1+gatherCount, "coder_pubsub_subscribes_total", "true") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_messages_total", "normal") &&
testutil.PromCounterHasValue(t, metrics, 1, "coder_pubsub_messages_total", "colossal") &&
testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, 7607, "coder_pubsub_published_bytes_total")
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_received_bytes_total") &&
testutil.PromCounterHasValue(t, metrics, float64(colossalSize+len(data))+latencyBytes, "coder_pubsub_published_bytes_total") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_send_latency_seconds") &&
testutil.PromGaugeAssertion(t, metrics, func(in float64) bool { return in > 0 }, "coder_pubsub_receive_latency_seconds") &&
testutil.PromCounterHasValue(t, metrics, gatherCount, "coder_pubsub_latency_measures_total") &&
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)
}

@ -7,29 +7,60 @@ import (
"github.com/stretchr/testify/require"
)
func PromGaugeHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, label ...string) bool {
// kind names the Prometheus metric type whose value should be read from a metric.
type kind string

const (
	// counterKind selects a metric's counter value.
	counterKind kind = "counter"
	// gaugeKind selects a metric's gauge value.
	gaugeKind kind = "gauge"
)
func PromGaugeHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, labels ...string) bool {
t.Helper()
for _, family := range metrics {
if family.GetName() != name {
continue
}
ms := family.GetMetric()
metricsLoop:
for _, m := range ms {
require.Equal(t, len(label), len(m.GetLabel()))
for i, lv := range label {
if lv != m.GetLabel()[i].GetValue() {
continue metricsLoop
}
}
return value == m.GetGauge().GetValue()
}
}
return false
return value == getValue(t, metrics, gaugeKind, name, labels...)
}
func PromCounterHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, label ...string) bool {
// PromCounterHasValue reports whether the counter identified by name and labels
// in the gathered metrics has exactly the given value.
func PromCounterHasValue(t testing.TB, metrics []*dto.MetricFamily, value float64, name string, labels ...string) bool {
	t.Helper()
	got := getValue(t, metrics, counterKind, name, labels...)
	return got == value
}
// PromGaugeAssertion looks up the gauge identified by name and labels and
// returns the result of applying assert to its value.
func PromGaugeAssertion(t testing.TB, metrics []*dto.MetricFamily, assert func(in float64) bool, name string, labels ...string) bool {
	t.Helper()
	observed := getValue(t, metrics, gaugeKind, name, labels...)
	return assert(observed)
}
// PromCounterAssertion looks up the counter identified by name and labels and
// returns the result of applying assert to its value.
func PromCounterAssertion(t testing.TB, metrics []*dto.MetricFamily, assert func(in float64) bool, name string, labels ...string) bool {
	t.Helper()
	observed := getValue(t, metrics, counterKind, name, labels...)
	return assert(observed)
}
// PromCounterGathered reports whether a metric with the given name and labels
// is present in the gathered metrics. Only name and labels are matched; the
// metric's type is not inspected.
func PromCounterGathered(t testing.TB, metrics []*dto.MetricFamily, name string, labels ...string) bool {
	t.Helper()
	found := getMetric(t, metrics, name, labels...)
	return found != nil
}
// PromGaugeGathered reports whether a metric with the given name and labels is
// present in the gathered metrics. Only name and labels are matched; the
// metric's type is not inspected.
func PromGaugeGathered(t testing.TB, metrics []*dto.MetricFamily, name string, labels ...string) bool {
	t.Helper()
	found := getMetric(t, metrics, name, labels...)
	return found != nil
}
// getValue returns the counter or gauge value (selected by kind) of the metric
// identified by name and labels. It returns -1 when no such metric is found or
// when kind is not one of the known kinds.
func getValue(t testing.TB, metrics []*dto.MetricFamily, kind kind, name string, labels ...string) float64 {
	// Sentinel returned when there is nothing to read.
	const missing = -1

	found := getMetric(t, metrics, name, labels...)
	if found == nil {
		return missing
	}

	if kind == counterKind {
		return found.GetCounter().GetValue()
	}
	if kind == gaugeKind {
		return found.GetGauge().GetValue()
	}
	return missing
}
func getMetric(t testing.TB, metrics []*dto.MetricFamily, name string, labels ...string) *dto.Metric {
for _, family := range metrics {
if family.GetName() != name {
continue
@ -37,14 +68,16 @@ func PromCounterHasValue(t testing.TB, metrics []*dto.MetricFamily, value float6
ms := family.GetMetric()
metricsLoop:
for _, m := range ms {
require.Equal(t, len(label), len(m.GetLabel()))
for i, lv := range label {
require.Equal(t, len(labels), len(m.GetLabel()))
for i, lv := range labels {
if lv != m.GetLabel()[i].GetValue() {
continue metricsLoop
}
}
return value == m.GetCounter().GetValue()
return m
}
}
return false
return nil
}