Files
coder/coderd/database/pubsub/latency.go

75 lines
2.0 KiB
Go

package pubsub
import (
"bytes"
"context"
"fmt"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
)
// LatencyMeasurer is used to measure the send & receive latencies of the underlying Pubsub implementation. We use these
// measurements to export metrics which can indicate when a Pubsub implementation's queue is overloaded and/or full.
type LatencyMeasurer struct {
// Create unique pubsub channel names so that multiple coderd replicas do not clash when performing latency measurements.
channel uuid.UUID
logger slog.Logger
}
// LatencyMessageLength is the length of a UUIDv4 encoded to hex.
const LatencyMessageLength = 36
func NewLatencyMeasurer(logger slog.Logger) *LatencyMeasurer {
return &LatencyMeasurer{
channel: uuid.New(),
logger: logger,
}
}
// Measure takes a given Pubsub implementation, publishes a message & immediately receives it, and returns the observed latency.
func (lm *LatencyMeasurer) Measure(ctx context.Context, p Pubsub) (send, recv time.Duration, err error) {
var (
start time.Time
res = make(chan time.Duration, 1)
)
msg := []byte(uuid.New().String())
lm.logger.Debug(ctx, "performing measurement", slog.F("msg", msg))
cancel, err := p.Subscribe(lm.latencyChannelName(), func(ctx context.Context, in []byte) {
if !bytes.Equal(in, msg) {
lm.logger.Warn(ctx, "received unexpected message", slog.F("got", in), slog.F("expected", msg))
return
}
res <- time.Since(start)
})
if err != nil {
return -1, -1, xerrors.Errorf("failed to subscribe: %w", err)
}
defer cancel()
start = time.Now()
err = p.Publish(lm.latencyChannelName(), msg)
if err != nil {
return -1, -1, xerrors.Errorf("failed to publish: %w", err)
}
send = time.Since(start)
select {
case <-ctx.Done():
lm.logger.Error(ctx, "context canceled before message could be received", slog.Error(ctx.Err()), slog.F("msg", msg))
return send, -1, ctx.Err()
case recv = <-res:
return send, recv, nil
}
}
func (lm *LatencyMeasurer) latencyChannelName() string {
return fmt.Sprintf("latency-measure:%s", lm.channel)
}