feat(coderd/database): add template_usage_stats table and rollup query (#12664)

Add `template_usage_stats` table for aggregating template usage data.
Data is rolled up by the `UpsertTemplateUsageStats` query, which fetches
data from the `workspace_agent_stats` and `workspace_app_stats` tables.
Author: Mathias Fredriksson
Date: 2024-03-22 18:33:34 +02:00
Committed by: GitHub
Parent: a6b8f381f0
Commit: 04f0510b09
16 changed files with 1459 additions and 0 deletions
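As a usage sketch (all wiring below is assumed, not part of this commit): a background job could drive the rollup on a schedule and rely on the upsert's idempotency. Only UpsertTemplateUsageStats itself comes from this change; database.Store, the 5-minute cadence, and the slog logger are assumptions.

// Hypothetical rollup driver, not part of this commit.
func runTemplateUsageRollup(ctx context.Context, db database.Store, logger slog.Logger) {
	ticker := time.NewTicker(5 * time.Minute) // cadence is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Roll up workspace_agent_stats and workspace_app_stats into
			// template_usage_stats; matching rows are upserted, so re-runs
			// are safe.
			if err := db.UpsertTemplateUsageStats(ctx); err != nil {
				logger.Error(ctx, "upsert template usage stats", slog.Error(err))
			}
		}
	}
}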


@@ -162,6 +162,7 @@ type data struct {
templateVersionParameters []database.TemplateVersionParameter
templateVersionVariables []database.TemplateVersionVariable
templates []database.TemplateTable
templateUsageStats []database.TemplateUsageStat
workspaceAgents []database.WorkspaceAgent
workspaceAgentMetadata []database.WorkspaceAgentMetadatum
workspaceAgentLogs []database.WorkspaceAgentLog
@@ -3584,6 +3585,34 @@ func (q *FakeQuerier) GetTemplateParameterInsights(ctx context.Context, arg data
return rows, nil
}
func (q *FakeQuerier) GetTemplateUsageStats(_ context.Context, arg database.GetTemplateUsageStatsParams) ([]database.TemplateUsageStat, error) {
err := validateDatabaseType(arg)
if err != nil {
return nil, err
}
q.mutex.RLock()
defer q.mutex.RUnlock()
var stats []database.TemplateUsageStat
for _, stat := range q.templateUsageStats {
// Exclude all chunks that don't fall exactly within the range.
if stat.StartTime.Before(arg.StartTime) || stat.EndTime.After(arg.EndTime) {
continue
}
if len(arg.TemplateIDs) > 0 && !slices.Contains(arg.TemplateIDs, stat.TemplateID) {
continue
}
stats = append(stats, stat)
}
if len(stats) == 0 {
return nil, sql.ErrNoRows
}
return stats, nil
}
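// Example call (ctx and templateID are hypothetical): fetch the last day
// of half-hour chunks for one template. Chunks that don't fall entirely
// within [StartTime, EndTime] are excluded, and an empty result yields
// sql.ErrNoRows, per the implementation above.
//
//	stats, err := q.GetTemplateUsageStats(ctx, database.GetTemplateUsageStatsParams{
//		StartTime:   time.Now().Add(-24 * time.Hour).Truncate(time.Hour),
//		EndTime:     time.Now().Truncate(time.Hour),
//		TemplateIDs: []uuid.UUID{templateID},
//	})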
func (q *FakeQuerier) GetTemplateVersionByID(ctx context.Context, templateVersionID uuid.UUID) (database.TemplateVersion, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@@ -7923,6 +7952,598 @@ func (*FakeQuerier) UpsertTailnetTunnel(_ context.Context, arg database.UpsertTa
return database.TailnetTunnel{}, ErrUnimplemented
}
func (q *FakeQuerier) UpsertTemplateUsageStats(ctx context.Context) error {
q.mutex.Lock()
defer q.mutex.Unlock()
/*
WITH
*/
/*
latest_start AS (
SELECT
-- Truncate to hour so that we always look at even ranges of data.
date_trunc('hour', COALESCE(
			MAX(start_time) - '1 hour'::interval,
			-- Fallback when there are no template usage stats yet.
			-- App stats can exist before this, but not agent stats;
			-- limit the lookback to avoid inconsistency.
(SELECT MIN(created_at) FROM workspace_agent_stats)
)) AS t
FROM
template_usage_stats
),
*/
now := time.Now()
	latestStart := time.Time{}
	for _, stat := range q.templateUsageStats {
		if stat.StartTime.After(latestStart) {
			latestStart = stat.StartTime
		}
	}
	if !latestStart.IsZero() {
		// MAX(start_time) - '1 hour'::interval
		latestStart = latestStart.Add(-time.Hour)
	}
if latestStart.IsZero() {
for _, stat := range q.workspaceAgentStats {
if latestStart.IsZero() || stat.CreatedAt.Before(latestStart) {
latestStart = stat.CreatedAt
}
}
}
if latestStart.IsZero() {
return nil
}
latestStart = latestStart.Truncate(time.Hour)
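	// Illustrative example: with MAX(start_time) = 2024-03-22 17:30, the
	// -1h shift gives 16:30 and Truncate(time.Hour) gives 16:00, so the
	// rollup always reprocesses whole hours; the upsert at the end makes
	// reprocessing safe.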
/*
workspace_app_stat_buckets AS (
SELECT
		-- Truncate the minute to the nearest half hour; this is the bucket size
-- for the data.
date_trunc('hour', s.minute_bucket) + trunc(date_part('minute', s.minute_bucket) / 30) * 30 * '1 minute'::interval AS time_bucket,
w.template_id,
was.user_id,
-- Both app stats and agent stats track web terminal usage, but
-- by different means. The app stats value should be more
-- accurate so we don't want to discard it just yet.
CASE
WHEN was.access_method = 'terminal'
THEN '[terminal]' -- Unique name, app names can't contain brackets.
ELSE was.slug_or_port
END AS app_name,
COUNT(DISTINCT s.minute_bucket) AS app_minutes,
-- Store each unique minute bucket for later merge between datasets.
array_agg(DISTINCT s.minute_bucket) AS minute_buckets
FROM
workspace_app_stats AS was
JOIN
workspaces AS w
ON
w.id = was.workspace_id
-- Generate a series of minute buckets for each session for computing the
		-- minutes/bucket.
CROSS JOIN
generate_series(
date_trunc('minute', was.session_started_at),
-- Subtract 1 microsecond to avoid creating an extra series.
date_trunc('minute', was.session_ended_at - '1 microsecond'::interval),
'1 minute'::interval
) AS s(minute_bucket)
WHERE
-- s.minute_bucket >= @start_time::timestamptz
-- AND s.minute_bucket < @end_time::timestamptz
s.minute_bucket >= (SELECT t FROM latest_start)
AND s.minute_bucket < NOW()
GROUP BY
time_bucket, w.template_id, was.user_id, was.access_method, was.slug_or_port
),
*/
type workspaceAppStatGroupBy struct {
TimeBucket time.Time
TemplateID uuid.UUID
UserID uuid.UUID
AccessMethod string
SlugOrPort string
}
type workspaceAppStatRow struct {
workspaceAppStatGroupBy
AppName string
AppMinutes int
MinuteBuckets map[time.Time]struct{}
}
workspaceAppStatRows := make(map[workspaceAppStatGroupBy]workspaceAppStatRow)
for _, was := range q.workspaceAppStats {
// Preflight: s.minute_bucket >= (SELECT t FROM latest_start)
if was.SessionEndedAt.Before(latestStart) {
continue
}
// JOIN workspaces
w, err := q.getWorkspaceByIDNoLock(ctx, was.WorkspaceID)
if err != nil {
return err
}
// CROSS JOIN generate_series
		for t := was.SessionStartedAt.Truncate(time.Minute); t.Before(was.SessionEndedAt); t = t.Add(time.Minute) {
// WHERE
if t.Before(latestStart) || t.After(now) || t.Equal(now) {
continue
}
bucket := t.Truncate(30 * time.Minute)
// GROUP BY
key := workspaceAppStatGroupBy{
TimeBucket: bucket,
TemplateID: w.TemplateID,
UserID: was.UserID,
AccessMethod: was.AccessMethod,
SlugOrPort: was.SlugOrPort,
}
// SELECT
row, ok := workspaceAppStatRows[key]
if !ok {
row = workspaceAppStatRow{
workspaceAppStatGroupBy: key,
AppName: was.SlugOrPort,
AppMinutes: 0,
MinuteBuckets: make(map[time.Time]struct{}),
}
if was.AccessMethod == "terminal" {
row.AppName = "[terminal]"
}
}
row.MinuteBuckets[t] = struct{}{}
row.AppMinutes = len(row.MinuteBuckets)
workspaceAppStatRows[key] = row
}
}
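	// Illustrative example: a session from 10:13 to 10:47 yields minute
	// buckets 10:13..10:46; Truncate(30*time.Minute) assigns 10:13..10:29
	// to the 10:00 bucket and 10:30..10:46 to the 10:30 bucket, so each
	// bucket gets AppMinutes = 17.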
/*
agent_stats_buckets AS (
SELECT
		-- Truncate the minute to the nearest half hour; this is the bucket size
-- for the data.
date_trunc('hour', created_at) + trunc(date_part('minute', created_at) / 30) * 30 * '1 minute'::interval AS time_bucket,
template_id,
user_id,
-- Store each unique minute bucket for later merge between datasets.
array_agg(
DISTINCT CASE
WHEN
session_count_ssh > 0
-- TODO(mafredri): Enable when we have the column.
-- OR session_count_sftp > 0
OR session_count_reconnecting_pty > 0
OR session_count_vscode > 0
OR session_count_jetbrains > 0
THEN
date_trunc('minute', created_at)
ELSE
NULL
END
) AS minute_buckets,
COUNT(DISTINCT CASE WHEN session_count_ssh > 0 THEN date_trunc('minute', created_at) ELSE NULL END) AS ssh_mins,
-- TODO(mafredri): Enable when we have the column.
-- COUNT(DISTINCT CASE WHEN session_count_sftp > 0 THEN date_trunc('minute', created_at) ELSE NULL END) AS sftp_mins,
COUNT(DISTINCT CASE WHEN session_count_reconnecting_pty > 0 THEN date_trunc('minute', created_at) ELSE NULL END) AS reconnecting_pty_mins,
COUNT(DISTINCT CASE WHEN session_count_vscode > 0 THEN date_trunc('minute', created_at) ELSE NULL END) AS vscode_mins,
COUNT(DISTINCT CASE WHEN session_count_jetbrains > 0 THEN date_trunc('minute', created_at) ELSE NULL END) AS jetbrains_mins,
-- NOTE(mafredri): The agent stats are currently very unreliable, and
-- sometimes the connections are missing, even during active sessions.
-- Since we can't fully rely on this, we check for "any connection
-- during this half-hour". A better solution here would be preferable.
MAX(connection_count) > 0 AS has_connection
FROM
workspace_agent_stats
WHERE
-- created_at >= @start_time::timestamptz
-- AND created_at < @end_time::timestamptz
created_at >= (SELECT t FROM latest_start)
AND created_at < NOW()
-- Inclusion criteria to filter out empty results.
AND (
session_count_ssh > 0
-- TODO(mafredri): Enable when we have the column.
-- OR session_count_sftp > 0
OR session_count_reconnecting_pty > 0
OR session_count_vscode > 0
OR session_count_jetbrains > 0
)
GROUP BY
time_bucket, template_id, user_id
),
*/
type agentStatGroupBy struct {
TimeBucket time.Time
TemplateID uuid.UUID
UserID uuid.UUID
}
type agentStatRow struct {
agentStatGroupBy
MinuteBuckets map[time.Time]struct{}
SSHMinuteBuckets map[time.Time]struct{}
SSHMins int
SFTPMinuteBuckets map[time.Time]struct{}
SFTPMins int
ReconnectingPTYMinuteBuckets map[time.Time]struct{}
ReconnectingPTYMins int
VSCodeMinuteBuckets map[time.Time]struct{}
VSCodeMins int
JetBrainsMinuteBuckets map[time.Time]struct{}
JetBrainsMins int
HasConnection bool
}
agentStatRows := make(map[agentStatGroupBy]agentStatRow)
for _, was := range q.workspaceAgentStats {
// WHERE
if was.CreatedAt.Before(latestStart) || was.CreatedAt.After(now) || was.CreatedAt.Equal(now) {
continue
}
if was.SessionCountSSH == 0 && was.SessionCountReconnectingPTY == 0 && was.SessionCountVSCode == 0 && was.SessionCountJetBrains == 0 {
continue
}
// GROUP BY
key := agentStatGroupBy{
TimeBucket: was.CreatedAt.Truncate(30 * time.Minute),
TemplateID: was.TemplateID,
UserID: was.UserID,
}
// SELECT
row, ok := agentStatRows[key]
if !ok {
row = agentStatRow{
agentStatGroupBy: key,
MinuteBuckets: make(map[time.Time]struct{}),
SSHMinuteBuckets: make(map[time.Time]struct{}),
SFTPMinuteBuckets: make(map[time.Time]struct{}),
ReconnectingPTYMinuteBuckets: make(map[time.Time]struct{}),
VSCodeMinuteBuckets: make(map[time.Time]struct{}),
JetBrainsMinuteBuckets: make(map[time.Time]struct{}),
}
}
minute := was.CreatedAt.Truncate(time.Minute)
row.MinuteBuckets[minute] = struct{}{}
if was.SessionCountSSH > 0 {
row.SSHMinuteBuckets[minute] = struct{}{}
row.SSHMins = len(row.SSHMinuteBuckets)
}
// TODO(mafredri): Enable when we have the column.
// if was.SessionCountSFTP > 0 {
// row.SFTPMinuteBuckets[minute] = struct{}{}
// row.SFTPMins = len(row.SFTPMinuteBuckets)
// }
_ = row.SFTPMinuteBuckets
if was.SessionCountReconnectingPTY > 0 {
row.ReconnectingPTYMinuteBuckets[minute] = struct{}{}
row.ReconnectingPTYMins = len(row.ReconnectingPTYMinuteBuckets)
}
if was.SessionCountVSCode > 0 {
row.VSCodeMinuteBuckets[minute] = struct{}{}
row.VSCodeMins = len(row.VSCodeMinuteBuckets)
}
if was.SessionCountJetBrains > 0 {
row.JetBrainsMinuteBuckets[minute] = struct{}{}
row.JetBrainsMins = len(row.JetBrainsMinuteBuckets)
}
if !row.HasConnection {
row.HasConnection = was.ConnectionCount > 0
}
agentStatRows[key] = row
}
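	// Illustrative example: two agent stat rows at 10:00:10 and 10:00:40,
	// both with SessionCountSSH > 0, truncate to the same minute 10:00, so
	// SSHMins counts 1, not 2, mirroring COUNT(DISTINCT date_trunc('minute', ...)).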
/*
stats AS (
SELECT
stats.time_bucket AS start_time,
stats.time_bucket + '30 minutes'::interval AS end_time,
stats.template_id,
stats.user_id,
		-- Sum/distinct to handle zero/duplicate values due to union and unnest.
COUNT(DISTINCT minute_bucket) AS usage_mins,
array_agg(DISTINCT minute_bucket) AS minute_buckets,
SUM(DISTINCT stats.ssh_mins) AS ssh_mins,
SUM(DISTINCT stats.sftp_mins) AS sftp_mins,
SUM(DISTINCT stats.reconnecting_pty_mins) AS reconnecting_pty_mins,
SUM(DISTINCT stats.vscode_mins) AS vscode_mins,
SUM(DISTINCT stats.jetbrains_mins) AS jetbrains_mins,
		-- This is what we unnested; re-nest as JSON.
jsonb_object_agg(stats.app_name, stats.app_minutes) FILTER (WHERE stats.app_name IS NOT NULL) AS app_usage_mins
FROM (
SELECT
time_bucket,
template_id,
user_id,
0 AS ssh_mins,
0 AS sftp_mins,
0 AS reconnecting_pty_mins,
0 AS vscode_mins,
0 AS jetbrains_mins,
app_name,
app_minutes,
minute_buckets
FROM
workspace_app_stat_buckets
UNION ALL
SELECT
time_bucket,
template_id,
user_id,
ssh_mins,
-- TODO(mafredri): Enable when we have the column.
0 AS sftp_mins,
reconnecting_pty_mins,
vscode_mins,
jetbrains_mins,
NULL AS app_name,
NULL AS app_minutes,
minute_buckets
FROM
agent_stats_buckets
WHERE
-- See note in the agent_stats_buckets CTE.
has_connection
) AS stats, unnest(minute_buckets) AS minute_bucket
GROUP BY
stats.time_bucket, stats.template_id, stats.user_id
),
*/
type statsGroupBy struct {
TimeBucket time.Time
TemplateID uuid.UUID
UserID uuid.UUID
}
type statsRow struct {
statsGroupBy
UsageMinuteBuckets map[time.Time]struct{}
UsageMins int
SSHMins int
SFTPMins int
ReconnectingPTYMins int
VSCodeMins int
JetBrainsMins int
AppUsageMinutes map[string]int
}
statsRows := make(map[statsGroupBy]statsRow)
for _, was := range workspaceAppStatRows {
// GROUP BY
key := statsGroupBy{
TimeBucket: was.TimeBucket,
TemplateID: was.TemplateID,
UserID: was.UserID,
}
// SELECT
row, ok := statsRows[key]
if !ok {
row = statsRow{
statsGroupBy: key,
UsageMinuteBuckets: make(map[time.Time]struct{}),
AppUsageMinutes: make(map[string]int),
}
}
for t := range was.MinuteBuckets {
row.UsageMinuteBuckets[t] = struct{}{}
}
row.UsageMins = len(row.UsageMinuteBuckets)
row.AppUsageMinutes[was.AppName] = was.AppMinutes
statsRows[key] = row
}
	for _, was := range agentStatRows {
		// WHERE has_connection (see note in the agent_stats_buckets CTE).
		if !was.HasConnection {
			continue
		}
		// GROUP BY
key := statsGroupBy{
TimeBucket: was.TimeBucket,
TemplateID: was.TemplateID,
UserID: was.UserID,
}
// SELECT
row, ok := statsRows[key]
if !ok {
row = statsRow{
statsGroupBy: key,
UsageMinuteBuckets: make(map[time.Time]struct{}),
AppUsageMinutes: make(map[string]int),
}
}
for t := range was.MinuteBuckets {
row.UsageMinuteBuckets[t] = struct{}{}
}
row.UsageMins = len(row.UsageMinuteBuckets)
row.SSHMins += was.SSHMins
row.SFTPMins += was.SFTPMins
row.ReconnectingPTYMins += was.ReconnectingPTYMins
row.VSCodeMins += was.VSCodeMins
row.JetBrainsMins += was.JetBrainsMins
statsRows[key] = row
}
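	// Illustrative example: if app rows contribute minutes {10:00, 10:01}
	// and agent rows contribute {10:01, 10:02} to the same bucket, the
	// union gives UsageMins = 3, matching COUNT(DISTINCT minute_bucket).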
/*
minute_buckets AS (
-- Create distinct minute buckets for user-activity, so we can filter out
-- irrelevant latencies.
SELECT DISTINCT ON (stats.start_time, stats.template_id, stats.user_id, minute_bucket)
stats.start_time,
stats.template_id,
stats.user_id,
minute_bucket
FROM
stats, unnest(minute_buckets) AS minute_bucket
),
latencies AS (
-- Select all non-zero latencies for all the minutes that a user used the
-- workspace in some way.
SELECT
mb.start_time,
mb.template_id,
mb.user_id,
-- TODO(mafredri): We're doing medians on medians here, we may want to
-- improve upon this at some point.
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY was.connection_median_latency_ms)::real AS median_latency_ms
FROM
minute_buckets AS mb
JOIN
workspace_agent_stats AS was
ON
date_trunc('minute', was.created_at) = mb.minute_bucket
AND was.template_id = mb.template_id
AND was.user_id = mb.user_id
AND was.connection_median_latency_ms >= 0
GROUP BY
mb.start_time, mb.template_id, mb.user_id
)
*/
type latenciesGroupBy struct {
StartTime time.Time
TemplateID uuid.UUID
UserID uuid.UUID
}
type latenciesRow struct {
latenciesGroupBy
Latencies []float64
MedianLatencyMS float64
}
latenciesRows := make(map[latenciesGroupBy]latenciesRow)
for _, stat := range statsRows {
for t := range stat.UsageMinuteBuckets {
// GROUP BY
key := latenciesGroupBy{
StartTime: stat.TimeBucket,
TemplateID: stat.TemplateID,
UserID: stat.UserID,
}
// JOIN
for _, was := range q.workspaceAgentStats {
if !t.Equal(was.CreatedAt.Truncate(time.Minute)) {
continue
}
if was.TemplateID != stat.TemplateID || was.UserID != stat.UserID {
continue
}
if was.ConnectionMedianLatencyMS < 0 {
continue
}
// SELECT
row, ok := latenciesRows[key]
if !ok {
row = latenciesRow{
latenciesGroupBy: key,
}
}
row.Latencies = append(row.Latencies, was.ConnectionMedianLatencyMS)
sort.Float64s(row.Latencies)
if len(row.Latencies) == 1 {
row.MedianLatencyMS = was.ConnectionMedianLatencyMS
} else if len(row.Latencies)%2 == 0 {
row.MedianLatencyMS = (row.Latencies[len(row.Latencies)/2-1] + row.Latencies[len(row.Latencies)/2]) / 2
} else {
row.MedianLatencyMS = row.Latencies[len(row.Latencies)/2]
}
latenciesRows[key] = row
}
}
}
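	// Illustrative example: latencies [30, 10, 20] sort to [10, 20, 30]
	// with median 20; an even count like [10, 20, 30, 40] averages the
	// middle two, giving 25.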
/*
INSERT INTO template_usage_stats AS tus (
start_time,
end_time,
template_id,
user_id,
usage_mins,
median_latency_ms,
ssh_mins,
sftp_mins,
reconnecting_pty_mins,
vscode_mins,
jetbrains_mins,
app_usage_mins
) (
SELECT
stats.start_time,
stats.end_time,
stats.template_id,
stats.user_id,
stats.usage_mins,
latencies.median_latency_ms,
stats.ssh_mins,
stats.sftp_mins,
stats.reconnecting_pty_mins,
stats.vscode_mins,
stats.jetbrains_mins,
stats.app_usage_mins
FROM
stats
LEFT JOIN
latencies
ON
		-- The latencies group-by ensures there is at most one row.
latencies.start_time = stats.start_time
AND latencies.template_id = stats.template_id
AND latencies.user_id = stats.user_id
)
ON CONFLICT
(start_time, template_id, user_id)
DO UPDATE
SET
usage_mins = EXCLUDED.usage_mins,
median_latency_ms = EXCLUDED.median_latency_ms,
ssh_mins = EXCLUDED.ssh_mins,
sftp_mins = EXCLUDED.sftp_mins,
reconnecting_pty_mins = EXCLUDED.reconnecting_pty_mins,
vscode_mins = EXCLUDED.vscode_mins,
jetbrains_mins = EXCLUDED.jetbrains_mins,
app_usage_mins = EXCLUDED.app_usage_mins
WHERE
(tus.*) IS DISTINCT FROM (EXCLUDED.*);
*/
TemplateUsageStatsInsertLoop:
for _, stat := range statsRows {
// LEFT JOIN latencies
latency, latencyOk := latenciesRows[latenciesGroupBy{
StartTime: stat.TimeBucket,
TemplateID: stat.TemplateID,
UserID: stat.UserID,
}]
// SELECT
tus := database.TemplateUsageStat{
StartTime: stat.TimeBucket,
EndTime: stat.TimeBucket.Add(30 * time.Minute),
TemplateID: stat.TemplateID,
UserID: stat.UserID,
UsageMins: int16(stat.UsageMins),
MedianLatencyMs: sql.NullFloat64{Float64: latency.MedianLatencyMS, Valid: latencyOk},
SshMins: int16(stat.SSHMins),
SftpMins: int16(stat.SFTPMins),
ReconnectingPtyMins: int16(stat.ReconnectingPTYMins),
VscodeMins: int16(stat.VSCodeMins),
JetbrainsMins: int16(stat.JetBrainsMins),
}
if len(stat.AppUsageMinutes) > 0 {
tus.AppUsageMins = make(map[string]int64, len(stat.AppUsageMinutes))
for k, v := range stat.AppUsageMinutes {
tus.AppUsageMins[k] = int64(v)
}
}
// ON CONFLICT
for i, existing := range q.templateUsageStats {
if existing.StartTime.Equal(tus.StartTime) && existing.TemplateID == tus.TemplateID && existing.UserID == tus.UserID {
q.templateUsageStats[i] = tus
continue TemplateUsageStatsInsertLoop
}
}
// INSERT INTO
q.templateUsageStats = append(q.templateUsageStats, tus)
}
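	// Re-running the rollup over the same window is idempotent: matching
	// (start_time, template_id, user_id) rows are overwritten in place,
	// mirroring ON CONFLICT DO UPDATE.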
return nil
}
func (q *FakeQuerier) UpsertWorkspaceAgentPortShare(_ context.Context, arg database.UpsertWorkspaceAgentPortShareParams) (database.WorkspaceAgentPortShare, error) {
err := validateDatabaseType(arg)
if err != nil {