package prometheusmetrics import ( "context" "database/sql" "errors" "fmt" "strconv" "strings" "sync/atomic" "time" "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "golang.org/x/xerrors" "tailscale.com/tailcfg" "cdr.dev/slog" "github.com/coder/coder/v2/coderd/agentmetrics" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/dbauthz" "github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/tailnet" "github.com/coder/quartz" ) const defaultRefreshRate = time.Minute // ActiveUsers tracks the number of users that have authenticated within the past hour. func ActiveUsers(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) { if duration == 0 { duration = defaultRefreshRate } gauge := prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "api", Name: "active_users_duration_hour", Help: "The number of users that have been active within the last hour.", }) err := registerer.Register(gauge) if err != nil { return nil, err } ctx, cancelFunc := context.WithCancel(ctx) done := make(chan struct{}) ticker := time.NewTicker(duration) go func() { defer close(done) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } apiKeys, err := db.GetAPIKeysLastUsedAfter(ctx, dbtime.Now().Add(-1*time.Hour)) if err != nil { logger.Error(ctx, "get api keys for active users prometheus metric", slog.Error(err)) continue } distinctUsers := map[uuid.UUID]struct{}{} for _, apiKey := range apiKeys { distinctUsers[apiKey.UserID] = struct{}{} } gauge.Set(float64(len(distinctUsers))) } }() return func() { cancelFunc() <-done }, nil } // Users tracks the total number of registered users, partitioned by status. func Users(ctx context.Context, logger slog.Logger, clk quartz.Clock, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) { if duration == 0 { // It's not super important this tracks real-time. duration = defaultRefreshRate * 5 } gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "api", Name: "total_user_count", Help: "The total number of registered users, partitioned by status.", }, []string{"status"}) err := registerer.Register(gauge) if err != nil { return nil, xerrors.Errorf("register total_user_count gauge: %w", err) } ctx, cancelFunc := context.WithCancel(ctx) done := make(chan struct{}) ticker := clk.NewTicker(duration) go func() { defer close(done) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } gauge.Reset() //nolint:gocritic // This is a system service that needs full access //to the users table. users, err := db.GetUsers(dbauthz.AsSystemRestricted(ctx), database.GetUsersParams{}) if err != nil { logger.Error(ctx, "get all users for prometheus metrics", slog.Error(err)) continue } for _, user := range users { gauge.WithLabelValues(string(user.Status)).Inc() } } }() return func() { cancelFunc() <-done }, nil } // Workspaces tracks the total number of workspaces with labels on status. func Workspaces(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, duration time.Duration) (func(), error) { if duration == 0 { duration = defaultRefreshRate } // TODO: deprecated: remove in the future // See: https://github.com/coder/coder/issues/12999 // Deprecation reason: gauge metrics should avoid suffix `_total`` workspaceLatestBuildTotalsDeprecated := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "api", Name: "workspace_latest_build_total", Help: "DEPRECATED: use coderd_api_workspace_latest_build instead", }, []string{"status"}) if err := registerer.Register(workspaceLatestBuildTotalsDeprecated); err != nil { return nil, err } workspaceLatestBuildTotals := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "api", Name: "workspace_latest_build", Help: "The current number of workspace builds by status.", }, []string{"status"}) if err := registerer.Register(workspaceLatestBuildTotals); err != nil { return nil, err } workspaceLatestBuildStatuses := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Name: "workspace_latest_build_status", Help: "The current workspace statuses by template, transition, and owner.", }, []string{"status", "template_name", "template_version", "workspace_owner", "workspace_transition"}) if err := registerer.Register(workspaceLatestBuildStatuses); err != nil { return nil, err } ctx, cancelFunc := context.WithCancel(ctx) done := make(chan struct{}) updateWorkspaceTotals := func() { builds, err := db.GetLatestWorkspaceBuilds(ctx) if err != nil { if errors.Is(err, sql.ErrNoRows) { // clear all series if there are no database entries workspaceLatestBuildTotals.Reset() } else { logger.Warn(ctx, "failed to load latest workspace builds", slog.Error(err)) } return } jobIDs := make([]uuid.UUID, 0, len(builds)) for _, build := range builds { jobIDs = append(jobIDs, build.JobID) } jobs, err := db.GetProvisionerJobsByIDs(ctx, jobIDs) if err != nil { ids := make([]string, 0, len(jobIDs)) for _, id := range jobIDs { ids = append(ids, id.String()) } logger.Warn(ctx, "failed to load provisioner jobs", slog.F("ids", ids), slog.Error(err)) return } workspaceLatestBuildTotals.Reset() for _, job := range jobs { status := codersdk.ProvisionerJobStatus(job.JobStatus) workspaceLatestBuildTotals.WithLabelValues(string(status)).Add(1) // TODO: deprecated: remove in the future workspaceLatestBuildTotalsDeprecated.WithLabelValues(string(status)).Add(1) } } updateWorkspaceStatuses := func() { ws, err := db.GetWorkspaces(ctx, database.GetWorkspacesParams{ Deleted: false, WithSummary: false, }) if err != nil { if errors.Is(err, sql.ErrNoRows) { // clear all series if there are no database entries workspaceLatestBuildStatuses.Reset() } logger.Warn(ctx, "failed to load active workspaces", slog.Error(err)) return } workspaceLatestBuildStatuses.Reset() for _, w := range ws { workspaceLatestBuildStatuses.WithLabelValues(string(w.LatestBuildStatus), w.TemplateName, w.TemplateVersionName.String, w.OwnerUsername, string(w.LatestBuildTransition)).Add(1) } } // Use time.Nanosecond to force an initial tick. It will be reset to the // correct duration after executing once. ticker := time.NewTicker(time.Nanosecond) doTick := func() { defer ticker.Reset(duration) updateWorkspaceTotals() updateWorkspaceStatuses() } go func() { defer close(done) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: doTick() } } }() return func() { cancelFunc() <-done }, nil } // Agents tracks the total number of workspaces with labels on status. func Agents(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, coordinator *atomic.Pointer[tailnet.Coordinator], derpMapFn func() *tailcfg.DERPMap, agentInactiveDisconnectTimeout, duration time.Duration) (func(), error) { if duration == 0 { duration = defaultRefreshRate } agentsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agents", Name: "up", Help: "The number of active agents per workspace.", }, []string{agentmetrics.LabelUsername, agentmetrics.LabelWorkspaceName, agentmetrics.LabelTemplateName, "template_version"})) err := registerer.Register(agentsGauge) if err != nil { return nil, err } agentsConnectionsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agents", Name: "connections", Help: "Agent connections with statuses.", }, []string{agentmetrics.LabelAgentName, agentmetrics.LabelUsername, agentmetrics.LabelWorkspaceName, "status", "lifecycle_state", "tailnet_node"})) err = registerer.Register(agentsConnectionsGauge) if err != nil { return nil, err } agentsConnectionLatenciesGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agents", Name: "connection_latencies_seconds", Help: "Agent connection latencies in seconds.", }, []string{agentmetrics.LabelAgentName, agentmetrics.LabelUsername, agentmetrics.LabelWorkspaceName, "derp_region", "preferred"})) err = registerer.Register(agentsConnectionLatenciesGauge) if err != nil { return nil, err } agentsAppsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agents", Name: "apps", Help: "Agent applications with statuses.", }, []string{agentmetrics.LabelAgentName, agentmetrics.LabelUsername, agentmetrics.LabelWorkspaceName, "app_name", "health"})) err = registerer.Register(agentsAppsGauge) if err != nil { return nil, err } metricsCollectorAgents := prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "coderd", Subsystem: "prometheusmetrics", Name: "agents_execution_seconds", Help: "Histogram for duration of agents metrics collection in seconds.", Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30}, }) err = registerer.Register(metricsCollectorAgents) if err != nil { return nil, err } ctx, cancelFunc := context.WithCancel(ctx) // nolint:gocritic // Prometheus must collect metrics for all Coder users. ctx = dbauthz.AsSystemRestricted(ctx) done := make(chan struct{}) // Use time.Nanosecond to force an initial tick. It will be reset to the // correct duration after executing once. ticker := time.NewTicker(time.Nanosecond) go func() { defer close(done) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } logger.Debug(ctx, "agent metrics collection is starting") timer := prometheus.NewTimer(metricsCollectorAgents) derpMap := derpMapFn() workspaceRows, err := db.GetWorkspaces(ctx, database.GetWorkspacesParams{ AgentInactiveDisconnectTimeoutSeconds: int64(agentInactiveDisconnectTimeout.Seconds()), }) if err != nil { logger.Error(ctx, "can't get workspace rows", slog.Error(err)) goto done } for _, workspace := range workspaceRows { templateName := workspace.TemplateName templateVersionName := workspace.TemplateVersionName.String if !workspace.TemplateVersionName.Valid { templateVersionName = "unknown" } user, err := db.GetUserByID(ctx, workspace.OwnerID) if err != nil { logger.Error(ctx, "can't get user from the database", slog.F("user_id", workspace.OwnerID), slog.Error(err)) agentsGauge.WithLabelValues(VectorOperationAdd, 0, user.Username, workspace.Name, templateName, templateVersionName) continue } agents, err := db.GetWorkspaceAgentsInLatestBuildByWorkspaceID(ctx, workspace.ID) if err != nil { logger.Error(ctx, "can't get workspace agents", slog.F("workspace_id", workspace.ID), slog.Error(err)) agentsGauge.WithLabelValues(VectorOperationAdd, 0, user.Username, workspace.Name, templateName, templateVersionName) continue } if len(agents) == 0 { logger.Debug(ctx, "workspace agents are unavailable", slog.F("workspace_id", workspace.ID)) agentsGauge.WithLabelValues(VectorOperationAdd, 0, user.Username, workspace.Name, templateName, templateVersionName) continue } for _, agent := range agents { // Collect information about agents agentsGauge.WithLabelValues(VectorOperationAdd, 1, user.Username, workspace.Name, templateName, templateVersionName) connectionStatus := agent.Status(agentInactiveDisconnectTimeout) node := (*coordinator.Load()).Node(agent.ID) tailnetNode := "unknown" if node != nil { tailnetNode = node.ID.String() } agentsConnectionsGauge.WithLabelValues(VectorOperationSet, 1, agent.Name, user.Username, workspace.Name, string(connectionStatus.Status), string(agent.LifecycleState), tailnetNode) if node == nil { logger.Debug(ctx, "can't read in-memory node for agent", slog.F("agent_id", agent.ID)) } else { // Collect information about connection latencies for rawRegion, latency := range node.DERPLatency { regionParts := strings.SplitN(rawRegion, "-", 2) regionID, err := strconv.Atoi(regionParts[0]) if err != nil { logger.Error(ctx, "can't convert DERP region", slog.F("agent_id", agent.ID), slog.F("raw_region", rawRegion), slog.Error(err)) continue } region, found := derpMap.Regions[regionID] if !found { // It's possible that a workspace agent is using an old DERPMap // and reports regions that do not exist. If that's the case, // report the region as unknown! region = &tailcfg.DERPRegion{ RegionID: regionID, RegionName: fmt.Sprintf("Unnamed %d", regionID), } } agentsConnectionLatenciesGauge.WithLabelValues(VectorOperationSet, latency, agent.Name, user.Username, workspace.Name, region.RegionName, fmt.Sprintf("%v", node.PreferredDERP == regionID)) } } // Collect information about registered applications apps, err := db.GetWorkspaceAppsByAgentID(ctx, agent.ID) if err != nil && !errors.Is(err, sql.ErrNoRows) { logger.Error(ctx, "can't get workspace apps", slog.F("agent_id", agent.ID), slog.Error(err)) continue } for _, app := range apps { agentsAppsGauge.WithLabelValues(VectorOperationAdd, 1, agent.Name, user.Username, workspace.Name, app.DisplayName, string(app.Health)) } } } agentsGauge.Commit() agentsConnectionsGauge.Commit() agentsConnectionLatenciesGauge.Commit() agentsAppsGauge.Commit() done: logger.Debug(ctx, "agent metrics collection is done") timer.ObserveDuration() ticker.Reset(duration) } }() return func() { cancelFunc() <-done }, nil } // nolint:revive // This will be removed alongside the workspaceusage experiment func AgentStats(ctx context.Context, logger slog.Logger, registerer prometheus.Registerer, db database.Store, initialCreateAfter time.Time, duration time.Duration, aggregateByLabels []string, usage bool) (func(), error) { if duration == 0 { duration = defaultRefreshRate } if len(aggregateByLabels) == 0 { aggregateByLabels = agentmetrics.LabelAgentStats } aggregateByLabels = filterAcceptableAgentLabels(aggregateByLabels) metricsCollectorAgentStats := prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "coderd", Subsystem: "prometheusmetrics", Name: "agentstats_execution_seconds", Help: "Histogram for duration of agent stats metrics collection in seconds.", Buckets: []float64{0.001, 0.005, 0.010, 0.025, 0.050, 0.100, 0.500, 1, 5, 10, 30}, }) err := registerer.Register(metricsCollectorAgentStats) if err != nil { return nil, err } agentStatsTxBytesGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "tx_bytes", Help: "Agent Tx bytes", }, aggregateByLabels)) err = registerer.Register(agentStatsTxBytesGauge) if err != nil { return nil, err } agentStatsRxBytesGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "rx_bytes", Help: "Agent Rx bytes", }, aggregateByLabels)) err = registerer.Register(agentStatsRxBytesGauge) if err != nil { return nil, err } agentStatsConnectionCountGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "connection_count", Help: "The number of established connections by agent", }, aggregateByLabels)) err = registerer.Register(agentStatsConnectionCountGauge) if err != nil { return nil, err } agentStatsConnectionMedianLatencyGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "connection_median_latency_seconds", Help: "The median agent connection latency in seconds", }, aggregateByLabels)) err = registerer.Register(agentStatsConnectionMedianLatencyGauge) if err != nil { return nil, err } agentStatsSessionCountJetBrainsGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "session_count_jetbrains", Help: "The number of session established by JetBrains", }, aggregateByLabels)) err = registerer.Register(agentStatsSessionCountJetBrainsGauge) if err != nil { return nil, err } agentStatsSessionCountReconnectingPTYGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "session_count_reconnecting_pty", Help: "The number of session established by reconnecting PTY", }, aggregateByLabels)) err = registerer.Register(agentStatsSessionCountReconnectingPTYGauge) if err != nil { return nil, err } agentStatsSessionCountSSHGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "session_count_ssh", Help: "The number of session established by SSH", }, aggregateByLabels)) err = registerer.Register(agentStatsSessionCountSSHGauge) if err != nil { return nil, err } agentStatsSessionCountVSCodeGauge := NewCachedGaugeVec(prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Subsystem: "agentstats", Name: "session_count_vscode", Help: "The number of session established by VSCode", }, aggregateByLabels)) err = registerer.Register(agentStatsSessionCountVSCodeGauge) if err != nil { return nil, err } ctx, cancelFunc := context.WithCancel(ctx) done := make(chan struct{}) createdAfter := initialCreateAfter // Use time.Nanosecond to force an initial tick. It will be reset to the // correct duration after executing once. ticker := time.NewTicker(time.Nanosecond) go func() { defer close(done) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: } logger.Debug(ctx, "agent metrics collection is starting") timer := prometheus.NewTimer(metricsCollectorAgentStats) checkpoint := time.Now() var ( stats []database.GetWorkspaceAgentStatsAndLabelsRow err error ) if usage { var agentUsageStats []database.GetWorkspaceAgentUsageStatsAndLabelsRow agentUsageStats, err = db.GetWorkspaceAgentUsageStatsAndLabels(ctx, createdAfter) stats = make([]database.GetWorkspaceAgentStatsAndLabelsRow, 0, len(agentUsageStats)) for _, agentUsageStat := range agentUsageStats { stats = append(stats, database.GetWorkspaceAgentStatsAndLabelsRow(agentUsageStat)) } } else { stats, err = db.GetWorkspaceAgentStatsAndLabels(ctx, createdAfter) } if err != nil { logger.Error(ctx, "can't get agent stats", slog.Error(err)) } else { for _, agentStat := range stats { var labelValues []string for _, label := range aggregateByLabels { switch label { case agentmetrics.LabelUsername: labelValues = append(labelValues, agentStat.Username) case agentmetrics.LabelWorkspaceName: labelValues = append(labelValues, agentStat.WorkspaceName) case agentmetrics.LabelAgentName: labelValues = append(labelValues, agentStat.AgentName) } } agentStatsRxBytesGauge.WithLabelValues(VectorOperationAdd, float64(agentStat.RxBytes), labelValues...) agentStatsTxBytesGauge.WithLabelValues(VectorOperationAdd, float64(agentStat.TxBytes), labelValues...) agentStatsConnectionCountGauge.WithLabelValues(VectorOperationSet, float64(agentStat.ConnectionCount), labelValues...) agentStatsConnectionMedianLatencyGauge.WithLabelValues(VectorOperationSet, agentStat.ConnectionMedianLatencyMS/1000.0 /* (to seconds) */, labelValues...) agentStatsSessionCountJetBrainsGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountJetBrains), labelValues...) agentStatsSessionCountReconnectingPTYGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountReconnectingPTY), labelValues...) agentStatsSessionCountSSHGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountSSH), labelValues...) agentStatsSessionCountVSCodeGauge.WithLabelValues(VectorOperationSet, float64(agentStat.SessionCountVSCode), labelValues...) } if len(stats) > 0 { agentStatsRxBytesGauge.Commit() agentStatsTxBytesGauge.Commit() agentStatsConnectionCountGauge.Commit() agentStatsConnectionMedianLatencyGauge.Commit() agentStatsSessionCountJetBrainsGauge.Commit() agentStatsSessionCountReconnectingPTYGauge.Commit() agentStatsSessionCountSSHGauge.Commit() agentStatsSessionCountVSCodeGauge.Commit() } } logger.Debug(ctx, "agent metrics collection is done", slog.F("len", len(stats))) timer.ObserveDuration() createdAfter = checkpoint ticker.Reset(duration) } }() return func() { cancelFunc() <-done }, nil } // Experiments registers a metric which indicates whether each experiment is enabled or not. func Experiments(registerer prometheus.Registerer, active codersdk.Experiments) error { experimentsGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "coderd", Name: "experiments", Help: "Indicates whether each experiment is enabled (1) or not (0)", }, []string{"experiment"}) if err := registerer.Register(experimentsGauge); err != nil { return err } for _, exp := range codersdk.ExperimentsAll { var val float64 for _, enabled := range active { if exp == enabled { val = 1 break } } experimentsGauge.WithLabelValues(string(exp)).Set(val) } return nil } // filterAcceptableAgentLabels handles a slightly messy situation whereby `prometheus-aggregate-agent-stats-by` can control on // which labels agent stats are aggregated, but for these specific metrics in this file there is no `template` label value, // and therefore we have to exclude it from the list of acceptable labels. func filterAcceptableAgentLabels(labels []string) []string { out := make([]string, 0, len(labels)) for _, label := range labels { if label != agentmetrics.LabelTemplateName { out = append(out, label) } } return out }