mirror of
https://github.com/coder/coder.git
synced 2025-07-12 00:14:10 +00:00
feat: fetch prebuilds metrics state in background (#17792)
`Collect()` is called whenever the `/metrics` endpoint is hit to retrieve metrics. The queries used in prebuilds metrics collection are quite heavy, and we want to avoid running them concurrently or too often, to keep database load down. This change moves to background retrieval of the state required to set the metrics; the cached state is refreshed every interval. It also introduces `coderd_prebuilt_workspaces_metrics_last_updated`, which operators can use to determine when these metrics go stale. See also https://github.com/coder/coder/pull/17789.

Signed-off-by: Danny Kopping <dannykopping@gmail.com>
This commit is contained in:
@ -2,14 +2,17 @@ package prebuilds
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"cdr.dev/slog"
|
"cdr.dev/slog"
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
|
||||||
|
|
||||||
"github.com/coder/coder/v2/coderd/database"
|
"github.com/coder/coder/v2/coderd/database"
|
||||||
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
"github.com/coder/coder/v2/coderd/database/dbtime"
|
||||||
"github.com/coder/coder/v2/coderd/prebuilds"
|
"github.com/coder/coder/v2/coderd/prebuilds"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -55,20 +58,34 @@ var (
|
|||||||
labels,
|
labels,
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
|
lastUpdateDesc = prometheus.NewDesc(
|
||||||
|
"coderd_prebuilt_workspaces_metrics_last_updated",
|
||||||
|
"The unix timestamp when the metrics related to prebuilt workspaces were last updated; these metrics are cached.",
|
||||||
|
[]string{},
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
// Settings passed to MetricsCollector.BackgroundFetch when the reconciler
// starts refreshing prebuilds metrics state in the background.
const (
	// metricsUpdateInterval is the period between refreshes of the cached metrics state.
	metricsUpdateInterval = 15 * time.Second
	// metricsUpdateTimeout bounds how long a single refresh may take.
	metricsUpdateTimeout = 10 * time.Second
)
|
||||||
|
|
||||||
type MetricsCollector struct {
|
type MetricsCollector struct {
|
||||||
database database.Store
|
database database.Store
|
||||||
logger slog.Logger
|
logger slog.Logger
|
||||||
snapshotter prebuilds.StateSnapshotter
|
snapshotter prebuilds.StateSnapshotter
|
||||||
|
|
||||||
|
latestState atomic.Pointer[metricsState]
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ prometheus.Collector = new(MetricsCollector)
|
var _ prometheus.Collector = new(MetricsCollector)
|
||||||
|
|
||||||
func NewMetricsCollector(db database.Store, logger slog.Logger, snapshotter prebuilds.StateSnapshotter) *MetricsCollector {
|
func NewMetricsCollector(db database.Store, logger slog.Logger, snapshotter prebuilds.StateSnapshotter) *MetricsCollector {
|
||||||
|
log := logger.Named("prebuilds_metrics_collector")
|
||||||
return &MetricsCollector{
|
return &MetricsCollector{
|
||||||
database: db,
|
database: db,
|
||||||
logger: logger.Named("prebuilds_metrics_collector"),
|
logger: log,
|
||||||
snapshotter: snapshotter,
|
snapshotter: snapshotter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -80,38 +97,34 @@ func (*MetricsCollector) Describe(descCh chan<- *prometheus.Desc) {
|
|||||||
descCh <- desiredPrebuildsDesc
|
descCh <- desiredPrebuildsDesc
|
||||||
descCh <- runningPrebuildsDesc
|
descCh <- runningPrebuildsDesc
|
||||||
descCh <- eligiblePrebuildsDesc
|
descCh <- eligiblePrebuildsDesc
|
||||||
|
descCh <- lastUpdateDesc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Collect uses the cached state to set configured metrics.
|
||||||
|
// The state is cached because this function can be called multiple times per second and retrieving the current state
|
||||||
|
// is an expensive operation.
|
||||||
func (mc *MetricsCollector) Collect(metricsCh chan<- prometheus.Metric) {
|
func (mc *MetricsCollector) Collect(metricsCh chan<- prometheus.Metric) {
|
||||||
// nolint:gocritic // We need to set an authz context to read metrics from the db.
|
currentState := mc.latestState.Load() // Grab a copy; it's ok if it goes stale during the course of this func.
|
||||||
ctx, cancel := context.WithTimeout(dbauthz.AsPrebuildsOrchestrator(context.Background()), 10*time.Second)
|
if currentState == nil {
|
||||||
defer cancel()
|
mc.logger.Warn(context.Background(), "failed to set prebuilds metrics; state not set")
|
||||||
prebuildMetrics, err := mc.database.GetPrebuildMetrics(ctx)
|
metricsCh <- prometheus.MustNewConstMetric(lastUpdateDesc, prometheus.GaugeValue, 0)
|
||||||
if err != nil {
|
|
||||||
mc.logger.Error(ctx, "failed to get prebuild metrics", slog.Error(err))
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, metric := range prebuildMetrics {
|
for _, metric := range currentState.prebuildMetrics {
|
||||||
metricsCh <- prometheus.MustNewConstMetric(createdPrebuildsDesc, prometheus.CounterValue, float64(metric.CreatedCount), metric.TemplateName, metric.PresetName, metric.OrganizationName)
|
metricsCh <- prometheus.MustNewConstMetric(createdPrebuildsDesc, prometheus.CounterValue, float64(metric.CreatedCount), metric.TemplateName, metric.PresetName, metric.OrganizationName)
|
||||||
metricsCh <- prometheus.MustNewConstMetric(failedPrebuildsDesc, prometheus.CounterValue, float64(metric.FailedCount), metric.TemplateName, metric.PresetName, metric.OrganizationName)
|
metricsCh <- prometheus.MustNewConstMetric(failedPrebuildsDesc, prometheus.CounterValue, float64(metric.FailedCount), metric.TemplateName, metric.PresetName, metric.OrganizationName)
|
||||||
metricsCh <- prometheus.MustNewConstMetric(claimedPrebuildsDesc, prometheus.CounterValue, float64(metric.ClaimedCount), metric.TemplateName, metric.PresetName, metric.OrganizationName)
|
metricsCh <- prometheus.MustNewConstMetric(claimedPrebuildsDesc, prometheus.CounterValue, float64(metric.ClaimedCount), metric.TemplateName, metric.PresetName, metric.OrganizationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshot, err := mc.snapshotter.SnapshotState(ctx, mc.database)
|
for _, preset := range currentState.snapshot.Presets {
|
||||||
if err != nil {
|
|
||||||
mc.logger.Error(ctx, "failed to get latest prebuild state", slog.Error(err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, preset := range snapshot.Presets {
|
|
||||||
if !preset.UsingActiveVersion {
|
if !preset.UsingActiveVersion {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
presetSnapshot, err := snapshot.FilterByPreset(preset.ID)
|
presetSnapshot, err := currentState.snapshot.FilterByPreset(preset.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mc.logger.Error(ctx, "failed to filter by preset", slog.Error(err))
|
mc.logger.Error(context.Background(), "failed to filter by preset", slog.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
state := presetSnapshot.CalculateState()
|
state := presetSnapshot.CalculateState()
|
||||||
@ -120,4 +133,57 @@ func (mc *MetricsCollector) Collect(metricsCh chan<- prometheus.Metric) {
|
|||||||
metricsCh <- prometheus.MustNewConstMetric(runningPrebuildsDesc, prometheus.GaugeValue, float64(state.Actual), preset.TemplateName, preset.Name, preset.OrganizationName)
|
metricsCh <- prometheus.MustNewConstMetric(runningPrebuildsDesc, prometheus.GaugeValue, float64(state.Actual), preset.TemplateName, preset.Name, preset.OrganizationName)
|
||||||
metricsCh <- prometheus.MustNewConstMetric(eligiblePrebuildsDesc, prometheus.GaugeValue, float64(state.Eligible), preset.TemplateName, preset.Name, preset.OrganizationName)
|
metricsCh <- prometheus.MustNewConstMetric(eligiblePrebuildsDesc, prometheus.GaugeValue, float64(state.Eligible), preset.TemplateName, preset.Name, preset.OrganizationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricsCh <- prometheus.MustNewConstMetric(lastUpdateDesc, prometheus.GaugeValue, float64(currentState.createdAt.Unix()))
|
||||||
|
}
|
||||||
|
|
||||||
|
type metricsState struct {
|
||||||
|
prebuildMetrics []database.GetPrebuildMetricsRow
|
||||||
|
snapshot *prebuilds.GlobalSnapshot
|
||||||
|
createdAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// BackgroundFetch updates the metrics state every given interval.
|
||||||
|
func (mc *MetricsCollector) BackgroundFetch(ctx context.Context, updateInterval, updateTimeout time.Duration) {
|
||||||
|
tick := time.NewTicker(time.Nanosecond)
|
||||||
|
defer tick.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-tick.C:
|
||||||
|
// Tick immediately, then set regular interval.
|
||||||
|
tick.Reset(updateInterval)
|
||||||
|
|
||||||
|
if err := mc.UpdateState(ctx, updateTimeout); err != nil {
|
||||||
|
mc.logger.Error(ctx, "failed to update prebuilds metrics state", slog.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateState builds the current metrics state.
|
||||||
|
func (mc *MetricsCollector) UpdateState(ctx context.Context, timeout time.Duration) error {
|
||||||
|
start := time.Now()
|
||||||
|
fetchCtx, fetchCancel := context.WithTimeout(ctx, timeout)
|
||||||
|
defer fetchCancel()
|
||||||
|
|
||||||
|
prebuildMetrics, err := mc.database.GetPrebuildMetrics(fetchCtx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("fetch prebuild metrics: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshot, err := mc.snapshotter.SnapshotState(fetchCtx, mc.database)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("snapshot state: %w", err)
|
||||||
|
}
|
||||||
|
mc.logger.Debug(ctx, "fetched prebuilds metrics state", slog.F("duration_secs", fmt.Sprintf("%.2f", time.Since(start).Seconds())))
|
||||||
|
|
||||||
|
mc.latestState.Store(&metricsState{
|
||||||
|
prebuildMetrics: prebuildMetrics,
|
||||||
|
snapshot: snapshot,
|
||||||
|
createdAt: dbtime.Now(),
|
||||||
|
})
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/coder/quartz"
|
"github.com/coder/quartz"
|
||||||
|
|
||||||
"github.com/coder/coder/v2/coderd/database"
|
"github.com/coder/coder/v2/coderd/database"
|
||||||
|
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
||||||
"github.com/coder/coder/v2/coderd/database/dbgen"
|
"github.com/coder/coder/v2/coderd/database/dbgen"
|
||||||
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
"github.com/coder/coder/v2/coderd/database/dbtestutil"
|
||||||
agplprebuilds "github.com/coder/coder/v2/coderd/prebuilds"
|
agplprebuilds "github.com/coder/coder/v2/coderd/prebuilds"
|
||||||
@ -248,6 +249,10 @@ func TestMetricsCollector(t *testing.T) {
|
|||||||
setupTestDBWorkspaceAgent(t, db, workspace.ID, eligible)
|
setupTestDBWorkspaceAgent(t, db, workspace.ID, eligible)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Force an update to the metrics state to allow the collector to collect fresh metrics.
|
||||||
|
// nolint:gocritic // Authz context needed to retrieve state.
|
||||||
|
require.NoError(t, collector.UpdateState(dbauthz.AsPrebuildsOrchestrator(ctx), testutil.WaitLong))
|
||||||
|
|
||||||
metricsFamilies, err := registry.Gather()
|
metricsFamilies, err := registry.Gather()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -67,11 +68,13 @@ func NewStoreReconciler(store database.Store,
|
|||||||
provisionNotifyCh: make(chan database.ProvisionerJob, 10),
|
provisionNotifyCh: make(chan database.ProvisionerJob, 10),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if registerer != nil {
|
||||||
reconciler.metrics = NewMetricsCollector(store, logger, reconciler)
|
reconciler.metrics = NewMetricsCollector(store, logger, reconciler)
|
||||||
if err := registerer.Register(reconciler.metrics); err != nil {
|
if err := registerer.Register(reconciler.metrics); err != nil {
|
||||||
// If the registerer fails to register the metrics collector, it's not fatal.
|
// If the registerer fails to register the metrics collector, it's not fatal.
|
||||||
logger.Error(context.Background(), "failed to register prometheus metrics", slog.Error(err))
|
logger.Error(context.Background(), "failed to register prometheus metrics", slog.Error(err))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return reconciler
|
return reconciler
|
||||||
}
|
}
|
||||||
@ -87,9 +90,11 @@ func (c *StoreReconciler) Run(ctx context.Context) {
|
|||||||
slog.F("backoff_interval", c.cfg.ReconciliationBackoffInterval.String()),
|
slog.F("backoff_interval", c.cfg.ReconciliationBackoffInterval.String()),
|
||||||
slog.F("backoff_lookback", c.cfg.ReconciliationBackoffLookback.String()))
|
slog.F("backoff_lookback", c.cfg.ReconciliationBackoffLookback.String()))
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
ticker := c.clock.NewTicker(reconciliationInterval)
|
ticker := c.clock.NewTicker(reconciliationInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
wg.Wait()
|
||||||
c.done <- struct{}{}
|
c.done <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -97,6 +102,15 @@ func (c *StoreReconciler) Run(ctx context.Context) {
|
|||||||
ctx, cancel := context.WithCancelCause(dbauthz.AsPrebuildsOrchestrator(ctx))
|
ctx, cancel := context.WithCancelCause(dbauthz.AsPrebuildsOrchestrator(ctx))
|
||||||
c.cancelFn = cancel
|
c.cancelFn = cancel
|
||||||
|
|
||||||
|
// Start updating metrics in the background.
|
||||||
|
if c.metrics != nil {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
c.metrics.BackgroundFetch(ctx, metricsUpdateInterval, metricsUpdateTimeout)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// Everything is in place, reconciler can now be considered as running.
|
// Everything is in place, reconciler can now be considered as running.
|
||||||
//
|
//
|
||||||
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
|
// NOTE: without this atomic bool, Stop might race with Run for the c.cancelFn above.
|
||||||
|
Reference in New Issue
Block a user