feat: expose agent metrics via Prometheus endpoint (#7011)

* WIP

* WIP

* WIP

* Agents

* fix

* 1min

* fix

* WIP

* Test

* docs

* fmt

* Add timer to measure the metrics collection

* Use CachedGaugeVec

* Unit tests

* Address PR comments
This commit is contained in:
Marcin Tojek
2023-04-07 17:48:52 +02:00
committed by GitHub
parent dd85ea8977
commit 0347231bb8
7 changed files with 629 additions and 48 deletions

View File

@ -2,13 +2,24 @@ package prometheusmetrics
import (
"context"
"database/sql"
"errors"
"fmt"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"tailscale.com/tailcfg"
"cdr.dev/slog"
"github.com/coder/coder/coderd"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
"github.com/coder/coder/tailnet"
)
// ActiveUsers tracks the number of users that have authenticated within the past hour.
@ -106,3 +117,175 @@ func Workspaces(ctx context.Context, registerer prometheus.Registerer, db databa
}()
return cancelFunc, nil
}
// Agents tracks the total number of workspaces with labels on status.
func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, coordinator *atomic.Pointer[tailnet.Coordinator], derpMap *tailcfg.DERPMap, agentInactiveDisconnectTimeout, duration time.Duration) (context.CancelFunc, error) {
if duration == 0 {
duration = 1 * time.Minute
}
agentsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "up",
Help: "The number of active agents per workspace.",
}, []string{"username", "workspace_name"}))
err := registerer.Register(agentsGauge)
if err != nil {
return nil, err
}
agentsConnectionsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "connections",
Help: "Agent connections with statuses.",
}, []string{"agent_name", "username", "workspace_name", "status", "lifecycle_state", "tailnet_node"}))
err = registerer.Register(agentsConnectionsGauge)
if err != nil {
return nil, err
}
agentsConnectionLatenciesGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "connection_latencies_seconds",
Help: "Agent connection latencies in seconds.",
}, []string{"agent_id", "username", "workspace_name", "derp_region", "preferred"}))
err = registerer.Register(agentsConnectionLatenciesGauge)
if err != nil {
return nil, err
}
agentsAppsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "coderd",
Subsystem: "agents",
Name: "apps",
Help: "Agent applications with statuses.",
}, []string{"agent_name", "username", "workspace_name", "app_name", "health"}))
err = registerer.Register(agentsAppsGauge)
if err != nil {
return nil, err
}
metricsCollectorAgents := prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "coderd",
Subsystem: "prometheusmetrics",
Name: "agents_execution_seconds",
Help: "Histogram for duration of agents metrics collection in seconds.",
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30},
})
err = registerer.Register(metricsCollectorAgents)
if err != nil {
return nil, err
}
// nolint:gocritic // Prometheus must collect metrics for all Coder users.
ctx, cancelFunc := context.WithCancel(dbauthz.AsSystemRestricted(ctx))
ticker := time.NewTicker(duration)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
logger.Debug(ctx, "Agent metrics collection is starting")
timer := prometheus.NewTimer(metricsCollectorAgents)
workspaceRows, err := db.GetWorkspaces(ctx, database.GetWorkspacesParams{
AgentInactiveDisconnectTimeoutSeconds: int64(agentInactiveDisconnectTimeout.Seconds()),
})
if err != nil {
logger.Error(ctx, "can't get workspace rows", slog.Error(err))
continue
}
for _, workspace := range workspaceRows {
user, err := db.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
logger.Error(ctx, "can't get user", slog.F("user_id", workspace.OwnerID), slog.Error(err))
agentsGauge.WithLabelValues(VectorOperationAdd, 0, user.Username, workspace.Name)
continue
}
agents, err := db.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID)
if err != nil {
logger.Error(ctx, "can't get workspace agents", slog.F("workspace_id", workspace.ID), slog.Error(err))
agentsGauge.WithLabelValues(VectorOperationAdd, 0, user.Username, workspace.Name)
continue
}
if len(agents) == 0 {
logger.Debug(ctx, "workspace agents are unavailable", slog.F("workspace_id", workspace.ID))
agentsGauge.WithLabelValues(VectorOperationAdd, 0, user.Username, workspace.Name)
continue
}
for _, agent := range agents {
// Collect information about agents
agentsGauge.WithLabelValues(VectorOperationAdd, 1, user.Username, workspace.Name)
connectionStatus := agent.Status(agentInactiveDisconnectTimeout)
node := (*coordinator.Load()).Node(agent.ID)
tailnetNode := "unknown"
if node != nil {
tailnetNode = node.ID.String()
}
agentsConnectionsGauge.WithLabelValues(VectorOperationSet, 1, agent.Name, user.Username, workspace.Name, string(connectionStatus.Status), string(agent.LifecycleState), tailnetNode)
if node == nil {
logger.Debug(ctx, "can't read in-memory node for agent", slog.F("agent_id", agent.ID))
} else {
// Collect information about connection latencies
for rawRegion, latency := range node.DERPLatency {
regionParts := strings.SplitN(rawRegion, "-", 2)
regionID, err := strconv.Atoi(regionParts[0])
if err != nil {
logger.Error(ctx, "can't convert DERP region", slog.F("agent_id", agent.ID), slog.F("raw_region", rawRegion), slog.Error(err))
continue
}
region, found := derpMap.Regions[regionID]
if !found {
// It's possible that a workspace agent is using an old DERPMap
// and reports regions that do not exist. If that's the case,
// report the region as unknown!
region = &tailcfg.DERPRegion{
RegionID: regionID,
RegionName: fmt.Sprintf("Unnamed %d", regionID),
}
}
agentsConnectionLatenciesGauge.WithLabelValues(VectorOperationSet, latency, agent.Name, user.Username, workspace.Name, region.RegionName, fmt.Sprintf("%v", node.PreferredDERP == regionID))
}
}
// Collect information about registered applications
apps, err := db.GetWorkspaceAppsByAgentID(ctx, agent.ID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
logger.Error(ctx, "can't get workspace apps", slog.F("agent_id", agent.ID), slog.Error(err))
continue
}
for _, app := range apps {
agentsAppsGauge.WithLabelValues(VectorOperationAdd, 1, agent.Name, user.Username, workspace.Name, app.DisplayName, string(app.Health))
}
}
}
agentsGauge.Commit()
agentsConnectionsGauge.Commit()
agentsConnectionLatenciesGauge.Commit()
agentsAppsGauge.Commit()
logger.Debug(ctx, "Agent metrics collection is done")
metricsCollectorAgents.Observe(timer.ObserveDuration().Seconds())
}
}()
return cancelFunc, nil
}