mirror of
https://github.com/coder/coder.git
synced 2025-07-09 11:45:56 +00:00
feat: expose agent stats via Prometheus endpoint (#7115)
* WIP * WIP * WIP * Agents * fix * 1min * fix * WIP * Test * docs * fmt * Add timer to measure the metrics collection * Use CachedGaugeVec * Unit tests * WIP * WIP * db: GetWorkspaceAgentStatsAndLabels * fmt * WIP * gauges * feat: collect * fix * fmt * minor fixes * Prometheus flag * fix * WIP * fix tests * WIP * fix json * Rx Tx bytes * CloseFunc * fix * fix * Fixes * fix * fix: IgnoreErrors * Fix: Windows * fix * reflect.DeepEquals
This commit is contained in:
3
coderd/apidoc/docs.go
generated
3
coderd/apidoc/docs.go
generated
@ -7822,6 +7822,9 @@ const docTemplate = `{
|
||||
"address": {
|
||||
"$ref": "#/definitions/clibase.HostPort"
|
||||
},
|
||||
"collect_agent_stats": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"enable": {
|
||||
"type": "boolean"
|
||||
}
|
||||
|
3
coderd/apidoc/swagger.json
generated
3
coderd/apidoc/swagger.json
generated
@ -7008,6 +7008,9 @@
|
||||
"address": {
|
||||
"$ref": "#/definitions/clibase.HostPort"
|
||||
},
|
||||
"collect_agent_stats": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"enable": {
|
||||
"type": "boolean"
|
||||
}
|
||||
|
@ -302,6 +302,10 @@ func (q *querier) GetWorkspaceAgentStats(ctx context.Context, createdAfter time.
|
||||
return q.db.GetWorkspaceAgentStats(ctx, createdAfter)
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStatsAndLabels forwards the call unchanged to the wrapped
// store and returns its rows and error as-is. No additional checks are
// performed in this wrapper.
func (q *querier) GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsAndLabelsRow, error) {
	return q.db.GetWorkspaceAgentStatsAndLabels(ctx, createdAfter)
}
|
||||
|
||||
func (q *querier) GetDeploymentWorkspaceStats(ctx context.Context) (database.GetDeploymentWorkspaceStatsRow, error) {
|
||||
return q.db.GetDeploymentWorkspaceStats(ctx)
|
||||
}
|
||||
|
@ -3998,6 +3998,77 @@ func (q *fakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (q *fakeQuerier) GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsAndLabelsRow, error) {
|
||||
q.mutex.RLock()
|
||||
defer q.mutex.RUnlock()
|
||||
|
||||
agentStatsCreatedAfter := make([]database.WorkspaceAgentStat, 0)
|
||||
latestAgentStats := map[uuid.UUID]database.WorkspaceAgentStat{}
|
||||
|
||||
for _, agentStat := range q.workspaceAgentStats {
|
||||
if agentStat.CreatedAt.After(createdAfter) {
|
||||
agentStatsCreatedAfter = append(agentStatsCreatedAfter, agentStat)
|
||||
latestAgentStats[agentStat.AgentID] = agentStat
|
||||
}
|
||||
}
|
||||
|
||||
statByAgent := map[uuid.UUID]database.GetWorkspaceAgentStatsAndLabelsRow{}
|
||||
|
||||
// Session and connection metrics
|
||||
for _, agentStat := range latestAgentStats {
|
||||
stat := statByAgent[agentStat.AgentID]
|
||||
stat.SessionCountVSCode += agentStat.SessionCountVSCode
|
||||
stat.SessionCountJetBrains += agentStat.SessionCountJetBrains
|
||||
stat.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
|
||||
stat.SessionCountSSH += agentStat.SessionCountSSH
|
||||
stat.ConnectionCount += agentStat.ConnectionCount
|
||||
if agentStat.ConnectionMedianLatencyMS >= 0 && stat.ConnectionMedianLatencyMS < agentStat.ConnectionMedianLatencyMS {
|
||||
stat.ConnectionMedianLatencyMS = agentStat.ConnectionMedianLatencyMS
|
||||
}
|
||||
statByAgent[agentStat.AgentID] = stat
|
||||
}
|
||||
|
||||
// Tx, Rx metrics
|
||||
for _, agentStat := range agentStatsCreatedAfter {
|
||||
stat := statByAgent[agentStat.AgentID]
|
||||
stat.RxBytes += agentStat.RxBytes
|
||||
stat.TxBytes += agentStat.TxBytes
|
||||
statByAgent[agentStat.AgentID] = stat
|
||||
}
|
||||
|
||||
// Labels
|
||||
for _, agentStat := range agentStatsCreatedAfter {
|
||||
stat := statByAgent[agentStat.AgentID]
|
||||
|
||||
user, err := q.getUserByIDNoLock(agentStat.UserID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stat.Username = user.Username
|
||||
|
||||
workspace, err := q.GetWorkspaceByID(ctx, agentStat.WorkspaceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stat.WorkspaceName = workspace.Name
|
||||
|
||||
agent, err := q.GetWorkspaceAgentByID(ctx, agentStat.AgentID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stat.AgentName = agent.Name
|
||||
|
||||
statByAgent[agentStat.AgentID] = stat
|
||||
}
|
||||
|
||||
stats := make([]database.GetWorkspaceAgentStatsAndLabelsRow, 0, len(statByAgent))
|
||||
for _, agent := range statByAgent {
|
||||
stats = append(stats, agent)
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (q *fakeQuerier) GetWorkspacesEligibleForAutoStartStop(ctx context.Context, now time.Time) ([]database.Workspace, error) {
|
||||
q.mutex.RLock()
|
||||
defer q.mutex.RUnlock()
|
||||
|
@ -130,6 +130,7 @@ type sqlcQuerier interface {
|
||||
GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID uuid.UUID) ([]WorkspaceAgentMetadatum, error)
|
||||
GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg GetWorkspaceAgentStartupLogsAfterParams) ([]WorkspaceAgentStartupLog, error)
|
||||
GetWorkspaceAgentStats(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsRow, error)
|
||||
GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsAndLabelsRow, error)
|
||||
GetWorkspaceAgentsByResourceIDs(ctx context.Context, ids []uuid.UUID) ([]WorkspaceAgent, error)
|
||||
GetWorkspaceAgentsCreatedAfter(ctx context.Context, createdAt time.Time) ([]WorkspaceAgent, error)
|
||||
GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]WorkspaceAgent, error)
|
||||
|
@ -6374,6 +6374,108 @@ func (q *sqlQuerier) GetWorkspaceAgentStats(ctx context.Context, createdAt time.
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getWorkspaceAgentStatsAndLabels = `-- name: GetWorkspaceAgentStatsAndLabels :many
|
||||
WITH agent_stats AS (
|
||||
SELECT
|
||||
user_id,
|
||||
agent_id,
|
||||
workspace_id,
|
||||
coalesce(SUM(rx_bytes), 0)::bigint AS rx_bytes,
|
||||
coalesce(SUM(tx_bytes), 0)::bigint AS tx_bytes
|
||||
FROM workspace_agent_stats
|
||||
WHERE workspace_agent_stats.created_at > $1
|
||||
GROUP BY user_id, agent_id, workspace_id
|
||||
), latest_agent_stats AS (
|
||||
SELECT
|
||||
a.agent_id,
|
||||
coalesce(SUM(session_count_vscode), 0)::bigint AS session_count_vscode,
|
||||
coalesce(SUM(session_count_ssh), 0)::bigint AS session_count_ssh,
|
||||
coalesce(SUM(session_count_jetbrains), 0)::bigint AS session_count_jetbrains,
|
||||
coalesce(SUM(session_count_reconnecting_pty), 0)::bigint AS session_count_reconnecting_pty,
|
||||
coalesce(SUM(connection_count), 0)::bigint AS connection_count,
|
||||
coalesce(MAX(connection_median_latency_ms), 0)::float AS connection_median_latency_ms
|
||||
FROM (
|
||||
SELECT id, created_at, user_id, agent_id, workspace_id, template_id, connections_by_proto, connection_count, rx_packets, rx_bytes, tx_packets, tx_bytes, connection_median_latency_ms, session_count_vscode, session_count_jetbrains, session_count_reconnecting_pty, session_count_ssh, ROW_NUMBER() OVER(PARTITION BY agent_id ORDER BY created_at DESC) AS rn
|
||||
FROM workspace_agent_stats
|
||||
-- The greater than 0 is to support legacy agents that don't report connection_median_latency_ms.
|
||||
WHERE created_at > $1 AND connection_median_latency_ms > 0
|
||||
) AS a
|
||||
WHERE a.rn = 1
|
||||
GROUP BY a.user_id, a.agent_id, a.workspace_id
|
||||
)
|
||||
SELECT
|
||||
users.username, workspace_agents.name AS agent_name, workspaces.name AS workspace_name, rx_bytes, tx_bytes,
|
||||
session_count_vscode, session_count_ssh, session_count_jetbrains, session_count_reconnecting_pty,
|
||||
connection_count, connection_median_latency_ms
|
||||
FROM
|
||||
agent_stats
|
||||
JOIN
|
||||
latest_agent_stats
|
||||
ON
|
||||
agent_stats.agent_id = latest_agent_stats.agent_id
|
||||
JOIN
|
||||
users
|
||||
ON
|
||||
users.id = agent_stats.user_id
|
||||
JOIN
|
||||
workspace_agents
|
||||
ON
|
||||
workspace_agents.id = agent_stats.agent_id
|
||||
JOIN
|
||||
workspaces
|
||||
ON
|
||||
workspaces.id = agent_stats.workspace_id
|
||||
`
|
||||
|
||||
// GetWorkspaceAgentStatsAndLabelsRow is a single result row of the
// GetWorkspaceAgentStatsAndLabels query.
type GetWorkspaceAgentStatsAndLabelsRow struct {
	// Label columns, selected from users.username, workspace_agents.name and
	// workspaces.name respectively.
	Username      string `db:"username" json:"username"`
	AgentName     string `db:"agent_name" json:"agent_name"`
	WorkspaceName string `db:"workspace_name" json:"workspace_name"`
	// Sums of rx_bytes/tx_bytes over the stats rows created after the query's
	// timestamp argument.
	RxBytes int64 `db:"rx_bytes" json:"rx_bytes"`
	TxBytes int64 `db:"tx_bytes" json:"tx_bytes"`
	// Session and connection values come from the most recent stats row of
	// the agent (row number 1 when ordered by created_at descending).
	SessionCountVSCode          int64   `db:"session_count_vscode" json:"session_count_vscode"`
	SessionCountSSH             int64   `db:"session_count_ssh" json:"session_count_ssh"`
	SessionCountJetBrains       int64   `db:"session_count_jetbrains" json:"session_count_jetbrains"`
	SessionCountReconnectingPTY int64   `db:"session_count_reconnecting_pty" json:"session_count_reconnecting_pty"`
	ConnectionCount             int64   `db:"connection_count" json:"connection_count"`
	ConnectionMedianLatencyMS   float64 `db:"connection_median_latency_ms" json:"connection_median_latency_ms"`
}
|
||||
|
||||
func (q *sqlQuerier) GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsAndLabelsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getWorkspaceAgentStatsAndLabels, createdAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetWorkspaceAgentStatsAndLabelsRow
|
||||
for rows.Next() {
|
||||
var i GetWorkspaceAgentStatsAndLabelsRow
|
||||
if err := rows.Scan(
|
||||
&i.Username,
|
||||
&i.AgentName,
|
||||
&i.WorkspaceName,
|
||||
&i.RxBytes,
|
||||
&i.TxBytes,
|
||||
&i.SessionCountVSCode,
|
||||
&i.SessionCountSSH,
|
||||
&i.SessionCountJetBrains,
|
||||
&i.SessionCountReconnectingPTY,
|
||||
&i.ConnectionCount,
|
||||
&i.ConnectionMedianLatencyMS,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const insertWorkspaceAgentStat = `-- name: InsertWorkspaceAgentStat :one
|
||||
INSERT INTO
|
||||
workspace_agent_stats (
|
||||
|
@ -103,3 +103,55 @@ WITH agent_stats AS (
|
||||
) AS a WHERE a.rn = 1 GROUP BY a.user_id, a.agent_id, a.workspace_id, a.template_id
|
||||
)
|
||||
SELECT * FROM agent_stats JOIN latest_agent_stats ON agent_stats.agent_id = latest_agent_stats.agent_id;
|
||||
|
||||
-- name: GetWorkspaceAgentStatsAndLabels :many
|
||||
WITH agent_stats AS (
|
||||
SELECT
|
||||
user_id,
|
||||
agent_id,
|
||||
workspace_id,
|
||||
coalesce(SUM(rx_bytes), 0)::bigint AS rx_bytes,
|
||||
coalesce(SUM(tx_bytes), 0)::bigint AS tx_bytes
|
||||
FROM workspace_agent_stats
|
||||
WHERE workspace_agent_stats.created_at > $1
|
||||
GROUP BY user_id, agent_id, workspace_id
|
||||
), latest_agent_stats AS (
|
||||
SELECT
|
||||
a.agent_id,
|
||||
coalesce(SUM(session_count_vscode), 0)::bigint AS session_count_vscode,
|
||||
coalesce(SUM(session_count_ssh), 0)::bigint AS session_count_ssh,
|
||||
coalesce(SUM(session_count_jetbrains), 0)::bigint AS session_count_jetbrains,
|
||||
coalesce(SUM(session_count_reconnecting_pty), 0)::bigint AS session_count_reconnecting_pty,
|
||||
coalesce(SUM(connection_count), 0)::bigint AS connection_count,
|
||||
coalesce(MAX(connection_median_latency_ms), 0)::float AS connection_median_latency_ms
|
||||
FROM (
|
||||
SELECT *, ROW_NUMBER() OVER(PARTITION BY agent_id ORDER BY created_at DESC) AS rn
|
||||
FROM workspace_agent_stats
|
||||
-- The greater than 0 is to support legacy agents that don't report connection_median_latency_ms.
|
||||
WHERE created_at > $1 AND connection_median_latency_ms > 0
|
||||
) AS a
|
||||
WHERE a.rn = 1
|
||||
GROUP BY a.user_id, a.agent_id, a.workspace_id
|
||||
)
|
||||
SELECT
|
||||
users.username, workspace_agents.name AS agent_name, workspaces.name AS workspace_name, rx_bytes, tx_bytes,
|
||||
session_count_vscode, session_count_ssh, session_count_jetbrains, session_count_reconnecting_pty,
|
||||
connection_count, connection_median_latency_ms
|
||||
FROM
|
||||
agent_stats
|
||||
JOIN
|
||||
latest_agent_stats
|
||||
ON
|
||||
agent_stats.agent_id = latest_agent_stats.agent_id
|
||||
JOIN
|
||||
users
|
||||
ON
|
||||
users.id = agent_stats.user_id
|
||||
JOIN
|
||||
workspace_agents
|
||||
ON
|
||||
workspace_agents.id = agent_stats.agent_id
|
||||
JOIN
|
||||
workspaces
|
||||
ON
|
||||
workspaces.id = agent_stats.workspace_id;
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
// ActiveUsers tracks the number of users that have authenticated within the past hour.
|
||||
func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (context.CancelFunc, error) {
|
||||
func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) {
|
||||
if duration == 0 {
|
||||
duration = 5 * time.Minute
|
||||
}
|
||||
@ -40,8 +40,10 @@ func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db datab
|
||||
}
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(ctx)
|
||||
done := make(chan struct{})
|
||||
ticker := time.NewTicker(duration)
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
@ -61,11 +63,14 @@ func ActiveUsers(ctx context.Context, registerer prometheus.Registerer, db datab
|
||||
gauge.Set(float64(len(distinctUsers)))
|
||||
}
|
||||
}()
|
||||
return cancelFunc, nil
|
||||
return func() {
|
||||
cancelFunc()
|
||||
<-done
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Workspaces tracks the total number of workspaces with labels on status.
|
||||
func Workspaces(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (context.CancelFunc, error) {
|
||||
func Workspaces(ctx context.Context, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) {
|
||||
if duration == 0 {
|
||||
duration = 5 * time.Minute
|
||||
}
|
||||
@ -85,8 +90,11 @@ func Workspaces(ctx context.Context, registerer prometheus.Registerer, db databa
|
||||
gauge.WithLabelValues("pending").Set(0)
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(ctx)
|
||||
done := make(chan struct{})
|
||||
|
||||
ticker := time.NewTicker(duration)
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
@ -115,11 +123,14 @@ func Workspaces(ctx context.Context, registerer prometheus.Registerer, db databa
|
||||
}
|
||||
}
|
||||
}()
|
||||
return cancelFunc, nil
|
||||
return func() {
|
||||
cancelFunc()
|
||||
<-done
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Agents tracks workspace agent metrics (agent status, connections, connection latencies and apps).
|
||||
func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, coordinator *atomic.Pointer[tailnet.Coordinator], derpMap *tailcfg.DERPMap, agentInactiveDisconnectTimeout, duration time.Duration) (context.CancelFunc, error) {
|
||||
func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, coordinator *atomic.Pointer[tailnet.Coordinator], derpMap *tailcfg.DERPMap, agentInactiveDisconnectTimeout, duration time.Duration) (func(), error) {
|
||||
if duration == 0 {
|
||||
duration = 1 * time.Minute
|
||||
}
|
||||
@ -151,7 +162,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
Subsystem: "agents",
|
||||
Name: "connection_latencies_seconds",
|
||||
Help: "Agent connection latencies in seconds.",
|
||||
}, []string{"agent_id", "username", "workspace_name", "derp_region", "preferred"}))
|
||||
}, []string{"agent_name", "username", "workspace_name", "derp_region", "preferred"}))
|
||||
err = registerer.Register(agentsConnectionLatenciesGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -180,10 +191,14 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(ctx)
|
||||
// nolint:gocritic // Prometheus must collect metrics for all Coder users.
|
||||
ctx, cancelFunc := context.WithCancel(dbauthz.AsSystemRestricted(ctx))
|
||||
ctx = dbauthz.AsSystemRestricted(ctx)
|
||||
done := make(chan struct{})
|
||||
|
||||
ticker := time.NewTicker(duration)
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
@ -200,7 +215,7 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error(ctx, "can't get workspace rows", slog.Error(err))
|
||||
continue
|
||||
goto done
|
||||
}
|
||||
|
||||
for _, workspace := range workspaceRows {
|
||||
@ -283,9 +298,183 @@ func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Regis
|
||||
agentsConnectionLatenciesGauge.Commit()
|
||||
agentsAppsGauge.Commit()
|
||||
|
||||
done:
|
||||
logger.Debug(ctx, "Agent metrics collection is done")
|
||||
metricsCollectorAgents.Observe(timer.ObserveDuration().Seconds())
|
||||
|
||||
ticker.Reset(duration)
|
||||
}
|
||||
}()
|
||||
return cancelFunc, nil
|
||||
return func() {
|
||||
cancelFunc()
|
||||
<-done
|
||||
}, nil
|
||||
}
|
||||
|
||||
func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, initialCreateAfter time.Time, duration time.Duration) (func(), error) {
|
||||
if duration == 0 {
|
||||
duration = 1 * time.Minute
|
||||
}
|
||||
|
||||
metricsCollectorAgentStats := prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "prometheusmetrics",
|
||||
Name: "agentstats_execution_seconds",
|
||||
Help: "Histogram for duration of agent stats metrics collection in seconds.",
|
||||
Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30},
|
||||
})
|
||||
err := registerer.Register(metricsCollectorAgentStats)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsTxBytesGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "tx_bytes",
|
||||
Help: "Agent Tx bytes",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsTxBytesGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsRxBytesGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "rx_bytes",
|
||||
Help: "Agent Rx bytes",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsRxBytesGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsConnectionCountGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "connection_count",
|
||||
Help: "The number of established connections by agent",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsConnectionCountGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsConnectionMedianLatencyGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "connection_median_latency_seconds",
|
||||
Help: "The median agent connection latency in seconds",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsConnectionMedianLatencyGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsSessionCountJetBrainsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "session_count_jetbrains",
|
||||
Help: "The number of session established by JetBrains",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsSessionCountJetBrainsGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsSessionCountReconnectingPTYGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "session_count_reconnecting_pty",
|
||||
Help: "The number of session established by reconnecting PTY",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsSessionCountReconnectingPTYGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsSessionCountSSHGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "session_count_ssh",
|
||||
Help: "The number of session established by SSH",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsSessionCountSSHGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agentStatsSessionCountVSCodeGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "coderd",
|
||||
Subsystem: "agentstats",
|
||||
Name: "session_count_vscode",
|
||||
Help: "The number of session established by VSCode",
|
||||
}, []string{"agent_name", "username", "workspace_name"}))
|
||||
err = registerer.Register(agentStatsSessionCountVSCodeGauge)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(ctx)
|
||||
done := make(chan struct{})
|
||||
|
||||
createdAfter := initialCreateAfter
|
||||
ticker := time.NewTicker(duration)
|
||||
go func() {
|
||||
defer close(done)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "Agent metrics collection is starting")
|
||||
timer := prometheus.NewTimer(metricsCollectorAgentStats)
|
||||
|
||||
checkpoint := time.Now()
|
||||
stats, err := db.GetWorkspaceAgentStatsAndLabels(ctx, createdAfter)
|
||||
if err != nil {
|
||||
logger.Error(ctx, "can't get agent stats", slog.Error(err))
|
||||
} else {
|
||||
for _, agentStat := range stats {
|
||||
agentStatsRxBytesGauge.WithLabelValues(VectorOperationAdd, float64(agentStat.RxBytes), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
agentStatsTxBytesGauge.WithLabelValues(VectorOperationAdd, float64(agentStat.TxBytes), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
|
||||
agentStatsConnectionCountGauge.WithLabelValues(VectorOperationSet, float64(agentStat.ConnectionCount), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
agentStatsConnectionMedianLatencyGauge.WithLabelValues(VectorOperationSet, agentStat.ConnectionMedianLatencyMS/1000.0 /* (to seconds) */, agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
|
||||
agentStatsSessionCountJetBrainsGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountJetBrains), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
agentStatsSessionCountReconnectingPTYGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountReconnectingPTY), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
agentStatsSessionCountSSHGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountSSH), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
agentStatsSessionCountVSCodeGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountVSCode), agentStat.AgentName, agentStat.Username, agentStat.WorkspaceName)
|
||||
}
|
||||
|
||||
if len(stats) > 0 {
|
||||
agentStatsRxBytesGauge.Commit()
|
||||
agentStatsTxBytesGauge.Commit()
|
||||
|
||||
agentStatsConnectionCountGauge.Commit()
|
||||
agentStatsConnectionMedianLatencyGauge.Commit()
|
||||
|
||||
agentStatsSessionCountJetBrainsGauge.Commit()
|
||||
agentStatsSessionCountReconnectingPTYGauge.Commit()
|
||||
agentStatsSessionCountSSHGauge.Commit()
|
||||
agentStatsSessionCountVSCodeGauge.Commit()
|
||||
}
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "Agent metrics collection is done")
|
||||
metricsCollectorAgentStats.Observe(timer.ObserveDuration().Seconds())
|
||||
|
||||
createdAfter = checkpoint
|
||||
ticker.Reset(duration)
|
||||
}
|
||||
}()
|
||||
return func() {
|
||||
cancelFunc()
|
||||
<-done
|
||||
}, nil
|
||||
}
|
||||
|
@ -3,6 +3,10 @@ package prometheusmetrics_test
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@ -20,6 +24,7 @@ import (
|
||||
"github.com/coder/coder/coderd/database/dbgen"
|
||||
"github.com/coder/coder/coderd/prometheusmetrics"
|
||||
"github.com/coder/coder/codersdk"
|
||||
"github.com/coder/coder/codersdk/agentsdk"
|
||||
"github.com/coder/coder/provisioner/echo"
|
||||
"github.com/coder/coder/provisionersdk/proto"
|
||||
"github.com/coder/coder/tailnet"
|
||||
@ -85,9 +90,9 @@ func TestActiveUsers(t *testing.T) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
registry := prometheus.NewRegistry()
|
||||
cancel, err := prometheusmetrics.ActiveUsers(context.Background(), registry, tc.Database(t), time.Millisecond)
|
||||
closeFunc, err := prometheusmetrics.ActiveUsers(context.Background(), registry, tc.Database(t), time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(cancel)
|
||||
t.Cleanup(closeFunc)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
metrics, err := registry.Gather()
|
||||
@ -217,9 +222,9 @@ func TestWorkspaces(t *testing.T) {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
registry := prometheus.NewRegistry()
|
||||
cancel, err := prometheusmetrics.Workspaces(context.Background(), registry, tc.Database(), time.Millisecond)
|
||||
closeFunc, err := prometheusmetrics.Workspaces(context.Background(), registry, tc.Database(), time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(cancel)
|
||||
t.Cleanup(closeFunc)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
metrics, err := registry.Gather()
|
||||
@ -300,13 +305,17 @@ func TestAgents(t *testing.T) {
|
||||
agentInactiveDisconnectTimeout := 1 * time.Hour // don't need to focus on this value in tests
|
||||
registry := prometheus.NewRegistry()
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
// when
|
||||
cancel, err := prometheusmetrics.Agents(context.Background(), slogtest.Make(t, nil), registry, db, &coordinatorPtr, derpMap, agentInactiveDisconnectTimeout, time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
closeFunc, err := prometheusmetrics.Agents(ctx, slogtest.Make(t, &slogtest.Options{
|
||||
IgnoreErrors: true,
|
||||
}), registry, db, &coordinatorPtr, derpMap, agentInactiveDisconnectTimeout, time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(closeFunc)
|
||||
|
||||
// then
|
||||
require.NoError(t, err)
|
||||
|
||||
var agentsUp bool
|
||||
var agentsConnections bool
|
||||
var agentsApps bool
|
||||
@ -352,3 +361,124 @@ func TestAgents(t *testing.T) {
|
||||
return agentsUp && agentsConnections && agentsApps && agentsExecutionInSeconds
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
}
|
||||
|
||||
func TestAgentStats(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Build sample workspaces with test agents and fake agent client
|
||||
client, _, api := coderdtest.NewWithAPI(t, &coderdtest.Options{IncludeProvisionerDaemon: true})
|
||||
db := api.Database
|
||||
|
||||
user := coderdtest.CreateFirstUser(t, client)
|
||||
|
||||
agent1 := prepareWorkspaceAndAgent(t, client, user, 1)
|
||||
agent2 := prepareWorkspaceAndAgent(t, client, user, 2)
|
||||
agent3 := prepareWorkspaceAndAgent(t, client, user, 3)
|
||||
|
||||
registry := prometheus.NewRegistry()
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
defer cancelFunc()
|
||||
|
||||
// given
|
||||
var err error
|
||||
var i int64
|
||||
for i = 0; i < 3; i++ {
|
||||
_, err = agent1.PostStats(ctx, &agentsdk.Stats{
|
||||
TxBytes: 1 + i, RxBytes: 2 + i,
|
||||
SessionCountVSCode: 3 + i, SessionCountJetBrains: 4 + i, SessionCountReconnectingPTY: 5 + i, SessionCountSSH: 6 + i,
|
||||
ConnectionCount: 7 + i, ConnectionMedianLatencyMS: 8000,
|
||||
ConnectionsByProto: map[string]int64{"TCP": 1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = agent2.PostStats(ctx, &agentsdk.Stats{
|
||||
TxBytes: 2 + i, RxBytes: 4 + i,
|
||||
SessionCountVSCode: 6 + i, SessionCountJetBrains: 8 + i, SessionCountReconnectingPTY: 10 + i, SessionCountSSH: 12 + i,
|
||||
ConnectionCount: 8 + i, ConnectionMedianLatencyMS: 10000,
|
||||
ConnectionsByProto: map[string]int64{"TCP": 1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = agent3.PostStats(ctx, &agentsdk.Stats{
|
||||
TxBytes: 3 + i, RxBytes: 6 + i,
|
||||
SessionCountVSCode: 12 + i, SessionCountJetBrains: 14 + i, SessionCountReconnectingPTY: 16 + i, SessionCountSSH: 18 + i,
|
||||
ConnectionCount: 9 + i, ConnectionMedianLatencyMS: 12000,
|
||||
ConnectionsByProto: map[string]int64{"TCP": 1},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// when
|
||||
//
|
||||
// Set initialCreateAfter to some time in the past, so that AgentStats would include all above PostStats,
|
||||
// and it doesn't depend on the real time.
|
||||
closeFunc, err := prometheusmetrics.AgentStats(ctx, slogtest.Make(t, &slogtest.Options{
|
||||
IgnoreErrors: true,
|
||||
}), registry, db, time.Now().Add(-time.Minute), time.Millisecond)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(closeFunc)
|
||||
|
||||
// then
|
||||
goldenFile, err := os.ReadFile("testdata/agent-stats.json")
|
||||
require.NoError(t, err)
|
||||
golden := map[string]int{}
|
||||
err = json.Unmarshal(goldenFile, &golden)
|
||||
require.NoError(t, err)
|
||||
|
||||
collected := map[string]int{}
|
||||
var executionSeconds bool
|
||||
assert.Eventually(t, func() bool {
|
||||
metrics, err := registry.Gather()
|
||||
assert.NoError(t, err)
|
||||
|
||||
if len(metrics) < 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, metric := range metrics {
|
||||
switch metric.GetName() {
|
||||
case "coderd_prometheusmetrics_agentstats_execution_seconds":
|
||||
executionSeconds = true
|
||||
case "coderd_agentstats_connection_count",
|
||||
"coderd_agentstats_connection_median_latency_seconds",
|
||||
"coderd_agentstats_rx_bytes",
|
||||
"coderd_agentstats_tx_bytes",
|
||||
"coderd_agentstats_session_count_jetbrains",
|
||||
"coderd_agentstats_session_count_reconnecting_pty",
|
||||
"coderd_agentstats_session_count_ssh",
|
||||
"coderd_agentstats_session_count_vscode":
|
||||
for _, m := range metric.Metric {
|
||||
// username:workspace:agent:metric = value
|
||||
collected[m.Label[1].GetValue()+":"+m.Label[2].GetValue()+":"+m.Label[0].GetValue()+":"+metric.GetName()] = int(m.Gauge.GetValue())
|
||||
}
|
||||
default:
|
||||
require.FailNowf(t, "unexpected metric collected", "metric: %s", metric.GetName())
|
||||
}
|
||||
}
|
||||
return executionSeconds && reflect.DeepEqual(golden, collected)
|
||||
}, testutil.WaitShort, testutil.IntervalFast)
|
||||
|
||||
// Keep this assertion, so that "go test" can print differences instead of "Condition never satisfied"
|
||||
assert.EqualValues(t, golden, collected)
|
||||
}
|
||||
|
||||
func prepareWorkspaceAndAgent(t *testing.T, client *codersdk.Client, user codersdk.CreateFirstUserResponse, workspaceNum int) *agentsdk.Client {
|
||||
authToken := uuid.NewString()
|
||||
|
||||
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
|
||||
Parse: echo.ParseComplete,
|
||||
ProvisionPlan: echo.ProvisionComplete,
|
||||
ProvisionApply: echo.ProvisionApplyWithAgent(authToken),
|
||||
})
|
||||
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
|
||||
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
|
||||
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID, func(cwr *codersdk.CreateWorkspaceRequest) {
|
||||
cwr.Name = fmt.Sprintf("workspace-%d", workspaceNum)
|
||||
})
|
||||
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
|
||||
|
||||
agentClient := agentsdk.New(client.URL)
|
||||
agentClient.SetSessionToken(authToken)
|
||||
return agentClient
|
||||
}
|
||||
|
26
coderd/prometheusmetrics/testdata/agent-stats.json
vendored
Normal file
26
coderd/prometheusmetrics/testdata/agent-stats.json
vendored
Normal file
@ -0,0 +1,26 @@
|
||||
{
|
||||
"testuser:workspace-1:example:coderd_agentstats_connection_count": 9,
|
||||
"testuser:workspace-1:example:coderd_agentstats_connection_median_latency_seconds": 8,
|
||||
"testuser:workspace-1:example:coderd_agentstats_rx_bytes": 9,
|
||||
"testuser:workspace-1:example:coderd_agentstats_session_count_jetbrains": 6,
|
||||
"testuser:workspace-1:example:coderd_agentstats_session_count_reconnecting_pty": 7,
|
||||
"testuser:workspace-1:example:coderd_agentstats_session_count_ssh": 8,
|
||||
"testuser:workspace-1:example:coderd_agentstats_session_count_vscode": 5,
|
||||
"testuser:workspace-1:example:coderd_agentstats_tx_bytes": 6,
|
||||
"testuser:workspace-2:example:coderd_agentstats_connection_count": 10,
|
||||
"testuser:workspace-2:example:coderd_agentstats_connection_median_latency_seconds": 10,
|
||||
"testuser:workspace-2:example:coderd_agentstats_rx_bytes": 15,
|
||||
"testuser:workspace-2:example:coderd_agentstats_session_count_jetbrains": 10,
|
||||
"testuser:workspace-2:example:coderd_agentstats_session_count_reconnecting_pty": 12,
|
||||
"testuser:workspace-2:example:coderd_agentstats_session_count_ssh": 14,
|
||||
"testuser:workspace-2:example:coderd_agentstats_session_count_vscode": 8,
|
||||
"testuser:workspace-2:example:coderd_agentstats_tx_bytes": 9,
|
||||
"testuser:workspace-3:example:coderd_agentstats_connection_count": 11,
|
||||
"testuser:workspace-3:example:coderd_agentstats_connection_median_latency_seconds": 12,
|
||||
"testuser:workspace-3:example:coderd_agentstats_rx_bytes": 21,
|
||||
"testuser:workspace-3:example:coderd_agentstats_session_count_jetbrains": 16,
|
||||
"testuser:workspace-3:example:coderd_agentstats_session_count_reconnecting_pty": 18,
|
||||
"testuser:workspace-3:example:coderd_agentstats_session_count_ssh": 20,
|
||||
"testuser:workspace-3:example:coderd_agentstats_session_count_vscode": 14,
|
||||
"testuser:workspace-3:example:coderd_agentstats_tx_bytes": 12
|
||||
}
|
Reference in New Issue
Block a user