diff --git a/coderd/database/dbauthz/system.go b/coderd/database/dbauthz/system.go
index 5418c57c96..d46aff267d 100644
--- a/coderd/database/dbauthz/system.go
+++ b/coderd/database/dbauthz/system.go
@@ -284,6 +284,10 @@ func (q *querier) GetDeploymentWorkspaceAgentStats(ctx context.Context, createdA
 	return q.db.GetDeploymentWorkspaceAgentStats(ctx, createdAfter)
 }
 
+func (q *querier) GetWorkspaceAgentStats(ctx context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsRow, error) {
+	return q.db.GetWorkspaceAgentStats(ctx, createdAfter)
+}
+
 func (q *querier) GetDeploymentWorkspaceStats(ctx context.Context) (database.GetDeploymentWorkspaceStatsRow, error) {
 	return q.db.GetDeploymentWorkspaceStats(ctx)
 }
diff --git a/coderd/database/dbfake/databasefake.go b/coderd/database/dbfake/databasefake.go
index 8a8eb652dd..cf8fa80df5 100644
--- a/coderd/database/dbfake/databasefake.go
+++ b/coderd/database/dbfake/databasefake.go
@@ -3707,6 +3707,84 @@ func (q *fakeQuerier) GetDeploymentWorkspaceStats(ctx context.Context) (database
 	return stat, nil
 }
 
+// GetWorkspaceAgentStats is the in-memory counterpart of the SQL query with
+// the same name. For every agent with a stat created after createdAfter it
+// returns one row carrying the session counts of the agent's most recently
+// stored stat, plus the traffic totals, earliest timestamp and latency
+// percentiles of its stats that report a positive median latency.
+func (q *fakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsRow, error) {
+	q.mutex.RLock()
+	defer q.mutex.RUnlock()
+
+	agentStatsCreatedAfter := make([]database.WorkspaceAgentStat, 0)
+	// Later entries overwrite earlier ones, so the last stat in slice order
+	// is treated as the latest one for its agent.
+	latestAgentStats := map[uuid.UUID]database.WorkspaceAgentStat{}
+	for _, agentStat := range q.workspaceAgentStats {
+		if !agentStat.CreatedAt.After(createdAfter) {
+			continue
+		}
+		agentStatsCreatedAfter = append(agentStatsCreatedAfter, agentStat)
+		latestAgentStats[agentStat.AgentID] = agentStat
+	}
+
+	statByAgent := map[uuid.UUID]database.GetWorkspaceAgentStatsRow{}
+	for agentID, agentStat := range latestAgentStats {
+		// Copy the identifying columns and key the row by the agent. A fresh
+		// row has a zero AgentID, so keying by stat.AgentID without setting
+		// it first would merge all agents into a single, unidentified row.
+		stat := statByAgent[agentID]
+		stat.UserID = agentStat.UserID
+		stat.AgentID = agentStat.AgentID
+		stat.WorkspaceID = agentStat.WorkspaceID
+		stat.TemplateID = agentStat.TemplateID
+		stat.SessionCountVSCode += agentStat.SessionCountVSCode
+		stat.SessionCountJetBrains += agentStat.SessionCountJetBrains
+		stat.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
+		stat.SessionCountSSH += agentStat.SessionCountSSH
+		statByAgent[agentID] = stat
+	}
+
+	latenciesByAgent := map[uuid.UUID][]float64{}
+	minimumDateByAgent := map[uuid.UUID]time.Time{}
+	for _, agentStat := range agentStatsCreatedAfter {
+		// Stats without a positive latency are skipped, matching the SQL filter.
+		if agentStat.ConnectionMedianLatencyMS <= 0 {
+			continue
+		}
+		stat := statByAgent[agentStat.AgentID]
+		minimumDate := minimumDateByAgent[agentStat.AgentID]
+		if agentStat.CreatedAt.Before(minimumDate) || minimumDate.IsZero() {
+			minimumDateByAgent[agentStat.AgentID] = agentStat.CreatedAt
+		}
+		stat.WorkspaceRxBytes += agentStat.RxBytes
+		stat.WorkspaceTxBytes += agentStat.TxBytes
+		statByAgent[agentStat.AgentID] = stat
+		latenciesByAgent[agentStat.AgentID] = append(latenciesByAgent[agentStat.AgentID], agentStat.ConnectionMedianLatencyMS)
+	}
+
+	// tryPercentile sorts fs in place and returns the value at percentile p,
+	// or -1 when there are no samples. p must be below 100.
+	tryPercentile := func(fs []float64, p float64) float64 {
+		if len(fs) == 0 {
+			return -1
+		}
+		sort.Float64s(fs)
+		return fs[int(float64(len(fs))*p/100)]
+	}
+
+	stats := make([]database.GetWorkspaceAgentStatsRow, 0, len(statByAgent))
+	for agentID, stat := range statByAgent {
+		stat.AggregatedFrom = minimumDateByAgent[agentID]
+		if latencies, ok := latenciesByAgent[agentID]; ok {
+			stat.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
+			stat.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
+		}
+		stats = append(stats, stat)
+	}
+	return stats, nil
+}
+
 func (q *fakeQuerier) UpdateWorkspaceTTLToBeWithinTemplateMax(_ context.Context, arg database.UpdateWorkspaceTTLToBeWithinTemplateMaxParams) error {
 	if err := validateDatabaseType(arg); err != nil {
 		return err
diff --git a/coderd/database/querier.go b/coderd/database/querier.go
index 14a0f2d705..174e5fcec1 100644
--- a/coderd/database/querier.go
+++ b/coderd/database/querier.go
@@ -121,6 +121,7 @@ type sqlcQuerier interface {
 	GetWorkspaceAgentByAuthToken(ctx context.Context, authToken uuid.UUID) (WorkspaceAgent, error)
 	GetWorkspaceAgentByID(ctx context.Context, id uuid.UUID) (WorkspaceAgent, error)
 	GetWorkspaceAgentByInstanceID(ctx context.Context, authInstanceID string) (WorkspaceAgent, error)
+	GetWorkspaceAgentStats(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsRow, error)
 	GetWorkspaceAgentsByResourceIDs(ctx context.Context, ids []uuid.UUID) ([]WorkspaceAgent, error)
 	GetWorkspaceAgentsCreatedAfter(ctx context.Context, createdAt time.Time) ([]WorkspaceAgent, error)
 	GetWorkspaceAppByAgentIDAndSlug(ctx context.Context, arg GetWorkspaceAppByAgentIDAndSlugParams) (WorkspaceApp, error)
diff --git a/coderd/database/queries.sql.go b/coderd/database/queries.sql.go
index b13551978f..952307dc61 100644
--- a/coderd/database/queries.sql.go
+++ b/coderd/database/queries.sql.go
@@ -5632,6 +5632,106 @@ func (q *sqlQuerier) GetTemplateDAUs(ctx context.Context, templateID uuid.UUID)
 	return items, nil
 }
 
+const getWorkspaceAgentStats = `-- name: GetWorkspaceAgentStats :many
+WITH agent_stats AS (
+	SELECT
+		user_id,
+		agent_id,
+		workspace_id,
+		template_id,
+		MIN(created_at)::timestamptz AS aggregated_from,
+		coalesce(SUM(rx_bytes), 0)::bigint AS workspace_rx_bytes,
+		coalesce(SUM(tx_bytes), 0)::bigint AS workspace_tx_bytes,
+		coalesce((PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY connection_median_latency_ms)), -1)::FLOAT AS workspace_connection_latency_50,
+		coalesce((PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY connection_median_latency_ms)), -1)::FLOAT AS workspace_connection_latency_95
+	FROM workspace_agent_stats
+		-- The greater than 0 is to support legacy agents that don't report connection_median_latency_ms.
+		WHERE workspace_agent_stats.created_at > $1 AND connection_median_latency_ms > 0 GROUP BY user_id, agent_id, workspace_id, template_id
+), latest_agent_stats AS (
+	SELECT
+		a.agent_id,
+		coalesce(SUM(session_count_vscode), 0)::bigint AS session_count_vscode,
+		coalesce(SUM(session_count_ssh), 0)::bigint AS session_count_ssh,
+		coalesce(SUM(session_count_jetbrains), 0)::bigint AS session_count_jetbrains,
+		coalesce(SUM(session_count_reconnecting_pty), 0)::bigint AS session_count_reconnecting_pty
+	FROM (
+		SELECT id, created_at, user_id, agent_id, workspace_id, template_id, connections_by_proto, connection_count, rx_packets, rx_bytes, tx_packets, tx_bytes, connection_median_latency_ms, session_count_vscode, session_count_jetbrains, session_count_reconnecting_pty, session_count_ssh, ROW_NUMBER() OVER(PARTITION BY agent_id ORDER BY created_at DESC) AS rn
+		FROM workspace_agent_stats WHERE created_at > $1
+	) AS a WHERE a.rn = 1 GROUP BY a.user_id, a.agent_id, a.workspace_id, a.template_id
+)
+-- Join on agent_id so each agent's aggregates are paired with its own latest
+-- session counts instead of with every agent's (a comma join is a cross join).
+SELECT
+	agent_stats.user_id,
+	agent_stats.agent_id,
+	agent_stats.workspace_id,
+	agent_stats.template_id,
+	agent_stats.aggregated_from,
+	agent_stats.workspace_rx_bytes,
+	agent_stats.workspace_tx_bytes,
+	agent_stats.workspace_connection_latency_50,
+	agent_stats.workspace_connection_latency_95,
+	latest_agent_stats.session_count_vscode,
+	latest_agent_stats.session_count_ssh,
+	latest_agent_stats.session_count_jetbrains,
+	latest_agent_stats.session_count_reconnecting_pty
+FROM agent_stats
+JOIN latest_agent_stats ON agent_stats.agent_id = latest_agent_stats.agent_id
+`
+
+type GetWorkspaceAgentStatsRow struct {
+	UserID                       uuid.UUID `db:"user_id" json:"user_id"`
+	AgentID                      uuid.UUID `db:"agent_id" json:"agent_id"`
+	WorkspaceID                  uuid.UUID `db:"workspace_id" json:"workspace_id"`
+	TemplateID                   uuid.UUID `db:"template_id" json:"template_id"`
+	AggregatedFrom               time.Time `db:"aggregated_from" json:"aggregated_from"`
+	WorkspaceRxBytes             int64     `db:"workspace_rx_bytes" json:"workspace_rx_bytes"`
+	WorkspaceTxBytes             int64     `db:"workspace_tx_bytes" json:"workspace_tx_bytes"`
+	WorkspaceConnectionLatency50 float64   `db:"workspace_connection_latency_50" json:"workspace_connection_latency_50"`
+	WorkspaceConnectionLatency95 float64   `db:"workspace_connection_latency_95" json:"workspace_connection_latency_95"`
+	SessionCountVSCode           int64     `db:"session_count_vscode" json:"session_count_vscode"`
+	SessionCountSSH              int64     `db:"session_count_ssh" json:"session_count_ssh"`
+	SessionCountJetBrains        int64     `db:"session_count_jetbrains" json:"session_count_jetbrains"`
+	SessionCountReconnectingPTY  int64     `db:"session_count_reconnecting_pty" json:"session_count_reconnecting_pty"`
+}
+
+func (q *sqlQuerier) GetWorkspaceAgentStats(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsRow, error) {
+	rows, err := q.db.QueryContext(ctx, getWorkspaceAgentStats, createdAt)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []GetWorkspaceAgentStatsRow
+	for rows.Next() {
+		var i GetWorkspaceAgentStatsRow
+		if err := rows.Scan(
+			&i.UserID,
+			&i.AgentID,
+			&i.WorkspaceID,
+			&i.TemplateID,
+			&i.AggregatedFrom,
+			&i.WorkspaceRxBytes,
+			&i.WorkspaceTxBytes,
+			&i.WorkspaceConnectionLatency50,
+			&i.WorkspaceConnectionLatency95,
+			&i.SessionCountVSCode,
+			&i.SessionCountSSH,
+			&i.SessionCountJetBrains,
+			&i.SessionCountReconnectingPTY,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, i)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, err
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const insertWorkspaceAgentStat = `-- name: InsertWorkspaceAgentStat :one
 INSERT INTO
 	workspace_agent_stats (
diff --git a/coderd/database/queries/workspaceagentstats.sql b/coderd/database/queries/workspaceagentstats.sql
index 6579b04c51..5e3a11d1ca 100644
--- a/coderd/database/queries/workspaceagentstats.sql
+++ b/coderd/database/queries/workspaceagentstats.sql
@@ -74,3 +74,49 @@ WITH agent_stats AS (
 	) AS a WHERE a.rn = 1
 )
 SELECT * FROM agent_stats, latest_agent_stats;
+
+-- name: GetWorkspaceAgentStats :many
+WITH agent_stats AS (
+	SELECT
+		user_id,
+		agent_id,
+		workspace_id,
+		template_id,
+		MIN(created_at)::timestamptz AS aggregated_from,
+		coalesce(SUM(rx_bytes), 0)::bigint AS workspace_rx_bytes,
+		coalesce(SUM(tx_bytes), 0)::bigint AS workspace_tx_bytes,
+		coalesce((PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY connection_median_latency_ms)), -1)::FLOAT AS workspace_connection_latency_50,
+		coalesce((PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY connection_median_latency_ms)), -1)::FLOAT AS workspace_connection_latency_95
+	FROM workspace_agent_stats
+		-- The greater than 0 is to support legacy agents that don't report connection_median_latency_ms.
+		WHERE workspace_agent_stats.created_at > $1 AND connection_median_latency_ms > 0 GROUP BY user_id, agent_id, workspace_id, template_id
+), latest_agent_stats AS (
+	SELECT
+		a.agent_id,
+		coalesce(SUM(session_count_vscode), 0)::bigint AS session_count_vscode,
+		coalesce(SUM(session_count_ssh), 0)::bigint AS session_count_ssh,
+		coalesce(SUM(session_count_jetbrains), 0)::bigint AS session_count_jetbrains,
+		coalesce(SUM(session_count_reconnecting_pty), 0)::bigint AS session_count_reconnecting_pty
+	FROM (
+		SELECT *, ROW_NUMBER() OVER(PARTITION BY agent_id ORDER BY created_at DESC) AS rn
+		FROM workspace_agent_stats WHERE created_at > $1
+	) AS a WHERE a.rn = 1 GROUP BY a.user_id, a.agent_id, a.workspace_id, a.template_id
+)
+-- Join on agent_id so each agent's aggregates are paired with its own latest
+-- session counts instead of with every agent's (a comma join is a cross join).
+SELECT
+	agent_stats.user_id,
+	agent_stats.agent_id,
+	agent_stats.workspace_id,
+	agent_stats.template_id,
+	agent_stats.aggregated_from,
+	agent_stats.workspace_rx_bytes,
+	agent_stats.workspace_tx_bytes,
+	agent_stats.workspace_connection_latency_50,
+	agent_stats.workspace_connection_latency_95,
+	latest_agent_stats.session_count_vscode,
+	latest_agent_stats.session_count_ssh,
+	latest_agent_stats.session_count_jetbrains,
+	latest_agent_stats.session_count_reconnecting_pty
+FROM agent_stats
+JOIN latest_agent_stats ON agent_stats.agent_id = latest_agent_stats.agent_id;
diff --git a/coderd/telemetry/telemetry.go b/coderd/telemetry/telemetry.go
index 1200ddbb42..4ef35e7dd1 100644
--- a/coderd/telemetry/telemetry.go
+++ b/coderd/telemetry/telemetry.go
@@ -465,6 +465,17 @@ func (r *remoteReporter) createSnapshot() (*Snapshot, error) {
 		}
 		return nil
 	})
+	eg.Go(func() error {
+		stats, err := r.options.Database.GetWorkspaceAgentStats(ctx, createdAfter)
+		if err != nil {
+			return xerrors.Errorf("get workspace agent stats: %w", err)
+		}
+		snapshot.WorkspaceAgentStats = make([]WorkspaceAgentStat, 0, len(stats))
+		for _, stat := range stats {
+			snapshot.WorkspaceAgentStats = append(snapshot.WorkspaceAgentStats, ConvertWorkspaceAgentStat(stat))
+		}
+		return nil
+	})
 
 	err := eg.Wait()
 	if err != nil {
@@ -564,6 +575,25 @@ func ConvertWorkspaceAgent(agent database.WorkspaceAgent) WorkspaceAgent {
 	return snapAgent
 }
 
+// ConvertWorkspaceAgentStat converts a workspace agent stats row into its telemetry form, field by field.
+func ConvertWorkspaceAgentStat(stat database.GetWorkspaceAgentStatsRow) WorkspaceAgentStat {
+	return WorkspaceAgentStat{
+		UserID:                      stat.UserID,
+		TemplateID:                  stat.TemplateID,
+		WorkspaceID:                 stat.WorkspaceID,
+		AgentID:                     stat.AgentID,
+		AggregatedFrom:              stat.AggregatedFrom,
+		ConnectionLatency50:         stat.WorkspaceConnectionLatency50,
+		ConnectionLatency95:         stat.WorkspaceConnectionLatency95,
+		RxBytes:                     stat.WorkspaceRxBytes,
+		TxBytes:                     stat.WorkspaceTxBytes,
+		SessionCountVSCode:          stat.SessionCountVSCode,
+		SessionCountJetBrains:       stat.SessionCountJetBrains,
+		SessionCountReconnectingPTY: stat.SessionCountReconnectingPTY,
+		SessionCountSSH:             stat.SessionCountSSH,
+	}
+}
+
 // ConvertWorkspaceApp anonymizes a workspace app.
 func ConvertWorkspaceApp(app database.WorkspaceApp) WorkspaceApp {
 	return WorkspaceApp{
@@ -666,6 +696,7 @@ type Snapshot struct {
 	Workspaces                []Workspace                 `json:"workspaces"`
 	WorkspaceApps             []WorkspaceApp              `json:"workspace_apps"`
 	WorkspaceAgents           []WorkspaceAgent            `json:"workspace_agents"`
+	WorkspaceAgentStats       []WorkspaceAgentStat        `json:"workspace_agent_stats"`
 	WorkspaceBuilds           []WorkspaceBuild            `json:"workspace_build"`
 	WorkspaceResources        []WorkspaceResource         `json:"workspace_resources"`
 	WorkspaceResourceMetadata []WorkspaceResourceMetadata `json:"workspace_resource_metadata"`
@@ -754,6 +785,24 @@ type WorkspaceAgent struct {
 	ShutdownScript bool `json:"shutdown_script"`
 }
 
+// WorkspaceAgentStat is the telemetry representation of one row returned by
+// the GetWorkspaceAgentStats database query.
+type WorkspaceAgentStat struct {
+	UserID                      uuid.UUID `json:"user_id"`
+	TemplateID                  uuid.UUID `json:"template_id"`
+	WorkspaceID                 uuid.UUID `json:"workspace_id"`
+	AggregatedFrom              time.Time `json:"aggregated_from"`
+	AgentID                     uuid.UUID `json:"agent_id"`
+	RxBytes                     int64     `json:"rx_bytes"`
+	TxBytes                     int64     `json:"tx_bytes"`
+	ConnectionLatency50         float64   `json:"connection_latency_50"`
+	ConnectionLatency95         float64   `json:"connection_latency_95"`
+	SessionCountVSCode          int64     `json:"session_count_vscode"`
+	SessionCountJetBrains       int64     `json:"session_count_jetbrains"`
+	SessionCountReconnectingPTY int64     `json:"session_count_reconnecting_pty"`
+	SessionCountSSH             int64     `json:"session_count_ssh"`
+}
+
 type WorkspaceApp struct {
 	ID        uuid.UUID `json:"id"`
 	CreatedAt time.Time `json:"created_at"`
diff --git a/coderd/telemetry/telemetry_test.go b/coderd/telemetry/telemetry_test.go
index cdd1d42f8b..5d55bd4f01 100644
--- a/coderd/telemetry/telemetry_test.go
+++ b/coderd/telemetry/telemetry_test.go
@@ -67,6 +67,7 @@ func TestTelemetry(t *testing.T) {
 		_ = dbgen.WorkspaceResource(t, db, database.WorkspaceResource{
 			Transition: database.WorkspaceTransitionStart,
 		})
+		_ = dbgen.WorkspaceAgentStat(t, db, database.WorkspaceAgentStat{})
 		_, err = db.InsertLicense(ctx, database.InsertLicenseParams{
 			UploadedAt: database.Now(),
 			JWT:        "",
@@ -86,6 +87,7 @@ func TestTelemetry(t *testing.T) {
 		require.Len(t, snapshot.WorkspaceAgents, 1)
 		require.Len(t, snapshot.WorkspaceBuilds, 1)
 		require.Len(t, snapshot.WorkspaceResources, 1)
+		require.Len(t, snapshot.WorkspaceAgentStats, 1)
 	})
 	t.Run("HashedEmail", func(t *testing.T) {
 		t.Parallel()