fix: handle new agent stat format correctly (#14576)

---------

Co-authored-by: Ethan Dickson <ethan@coder.com>
This commit is contained in:
Garrett Delfosse
2024-09-19 11:52:14 -04:00
committed by GitHub
parent 37885e2e82
commit 922f4c545f
28 changed files with 1495 additions and 70 deletions

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"reflect"
"regexp"
"sort"
@ -254,6 +255,19 @@ type data struct {
defaultProxyIconURL string
}
func tryPercentile(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
sort.Float64s(fs)
pos := p * (float64(len(fs)) - 1) / 100
lower, upper := int(pos), int(math.Ceil(pos))
if lower == upper {
return fs[lower]
}
return fs[lower] + (fs[upper]-fs[lower])*(pos-float64(lower))
}
func validateDatabaseTypeWithValid(v reflect.Value) (handled bool, err error) {
if v.Kind() == reflect.Struct {
return false, nil
@ -2533,20 +2547,68 @@ func (q *FakeQuerier) GetDeploymentWorkspaceAgentStats(_ context.Context, create
latencies = append(latencies, agentStat.ConnectionMedianLatencyMS)
}
tryPercentile := func(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
sort.Float64s(fs)
return fs[int(float64(len(fs))*p/100)]
}
stat.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
return stat, nil
}
func (q *FakeQuerier) GetDeploymentWorkspaceAgentUsageStats(_ context.Context, createdAt time.Time) (database.GetDeploymentWorkspaceAgentUsageStatsRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
stat := database.GetDeploymentWorkspaceAgentUsageStatsRow{}
sessions := make(map[uuid.UUID]database.WorkspaceAgentStat)
agentStatsCreatedAfter := make([]database.WorkspaceAgentStat, 0)
for _, agentStat := range q.workspaceAgentStats {
// WHERE workspace_agent_stats.created_at > $1
if agentStat.CreatedAt.After(createdAt) {
agentStatsCreatedAfter = append(agentStatsCreatedAfter, agentStat)
}
// WHERE
// created_at > $1
// AND created_at < date_trunc('minute', now()) -- Exclude current partial minute
// AND usage = true
if agentStat.Usage &&
(agentStat.CreatedAt.After(createdAt) || agentStat.CreatedAt.Equal(createdAt)) &&
agentStat.CreatedAt.Before(time.Now().Truncate(time.Minute)) {
val, ok := sessions[agentStat.AgentID]
if !ok {
sessions[agentStat.AgentID] = agentStat
} else if agentStat.CreatedAt.After(val.CreatedAt) {
sessions[agentStat.AgentID] = agentStat
} else if agentStat.CreatedAt.Truncate(time.Minute).Equal(val.CreatedAt.Truncate(time.Minute)) {
val.SessionCountVSCode += agentStat.SessionCountVSCode
val.SessionCountJetBrains += agentStat.SessionCountJetBrains
val.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
val.SessionCountSSH += agentStat.SessionCountSSH
sessions[agentStat.AgentID] = val
}
}
}
latencies := make([]float64, 0)
for _, agentStat := range agentStatsCreatedAfter {
if agentStat.ConnectionMedianLatencyMS <= 0 {
continue
}
stat.WorkspaceRxBytes += agentStat.RxBytes
stat.WorkspaceTxBytes += agentStat.TxBytes
latencies = append(latencies, agentStat.ConnectionMedianLatencyMS)
}
stat.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
for _, agentStat := range sessions {
stat.SessionCountVSCode += agentStat.SessionCountVSCode
stat.SessionCountJetBrains += agentStat.SessionCountJetBrains
stat.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
stat.SessionCountSSH += agentStat.SessionCountSSH
}
return stat, nil
}
func (q *FakeQuerier) GetDeploymentWorkspaceStats(ctx context.Context) (database.GetDeploymentWorkspaceStatsRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -4238,14 +4300,6 @@ func (q *FakeQuerier) GetTemplateAverageBuildTime(ctx context.Context, arg datab
}
}
tryPercentile := func(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
sort.Float64s(fs)
return fs[int(float64(len(fs))*p/100)]
}
var row database.GetTemplateAverageBuildTimeRow
row.Delete50, row.Delete95 = tryPercentile(deleteTimes, 50), tryPercentile(deleteTimes, 95)
row.Stop50, row.Stop95 = tryPercentile(stopTimes, 50), tryPercentile(stopTimes, 95)
@ -5273,14 +5327,6 @@ func (q *FakeQuerier) GetUserLatencyInsights(_ context.Context, arg database.Get
seenTemplatesByUserID[stat.UserID] = uniqueSortedUUIDs(append(seenTemplatesByUserID[stat.UserID], stat.TemplateID))
}
tryPercentile := func(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
sort.Float64s(fs)
return fs[int(float64(len(fs))*p/100)]
}
var rows []database.GetUserLatencyInsightsRow
for userID, latencies := range latenciesByUserID {
user, err := q.getUserByIDNoLock(userID)
@ -5794,14 +5840,6 @@ func (q *FakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim
latenciesByAgent[agentStat.AgentID] = append(latenciesByAgent[agentStat.AgentID], agentStat.ConnectionMedianLatencyMS)
}
tryPercentile := func(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
sort.Float64s(fs)
return fs[int(float64(len(fs))*p/100)]
}
for _, stat := range statByAgent {
stat.AggregatedFrom = minimumDateByAgent[stat.AgentID]
statByAgent[stat.AgentID] = stat
@ -5893,6 +5931,218 @@ func (q *FakeQuerier) GetWorkspaceAgentStatsAndLabels(ctx context.Context, creat
return stats, nil
}
func (q *FakeQuerier) GetWorkspaceAgentUsageStats(_ context.Context, createdAt time.Time) ([]database.GetWorkspaceAgentUsageStatsRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
type agentStatsKey struct {
UserID uuid.UUID
AgentID uuid.UUID
WorkspaceID uuid.UUID
TemplateID uuid.UUID
}
type minuteStatsKey struct {
agentStatsKey
MinuteBucket time.Time
}
latestAgentStats := map[agentStatsKey]database.GetWorkspaceAgentUsageStatsRow{}
latestAgentLatencies := map[agentStatsKey][]float64{}
for _, agentStat := range q.workspaceAgentStats {
key := agentStatsKey{
UserID: agentStat.UserID,
AgentID: agentStat.AgentID,
WorkspaceID: agentStat.WorkspaceID,
TemplateID: agentStat.TemplateID,
}
if agentStat.CreatedAt.After(createdAt) {
val, ok := latestAgentStats[key]
if ok {
val.WorkspaceRxBytes += agentStat.RxBytes
val.WorkspaceTxBytes += agentStat.TxBytes
latestAgentStats[key] = val
} else {
latestAgentStats[key] = database.GetWorkspaceAgentUsageStatsRow{
UserID: agentStat.UserID,
AgentID: agentStat.AgentID,
WorkspaceID: agentStat.WorkspaceID,
TemplateID: agentStat.TemplateID,
AggregatedFrom: createdAt,
WorkspaceRxBytes: agentStat.RxBytes,
WorkspaceTxBytes: agentStat.TxBytes,
}
}
latencies, ok := latestAgentLatencies[key]
if !ok {
latestAgentLatencies[key] = []float64{agentStat.ConnectionMedianLatencyMS}
} else {
latestAgentLatencies[key] = append(latencies, agentStat.ConnectionMedianLatencyMS)
}
}
}
for key, latencies := range latestAgentLatencies {
val, ok := latestAgentStats[key]
if ok {
val.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
val.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
}
latestAgentStats[key] = val
}
type bucketRow struct {
database.GetWorkspaceAgentUsageStatsRow
MinuteBucket time.Time
}
minuteBuckets := make(map[minuteStatsKey]bucketRow)
for _, agentStat := range q.workspaceAgentStats {
if agentStat.Usage &&
(agentStat.CreatedAt.After(createdAt) || agentStat.CreatedAt.Equal(createdAt)) &&
agentStat.CreatedAt.Before(time.Now().Truncate(time.Minute)) {
key := minuteStatsKey{
agentStatsKey: agentStatsKey{
UserID: agentStat.UserID,
AgentID: agentStat.AgentID,
WorkspaceID: agentStat.WorkspaceID,
TemplateID: agentStat.TemplateID,
},
MinuteBucket: agentStat.CreatedAt.Truncate(time.Minute),
}
val, ok := minuteBuckets[key]
if ok {
val.SessionCountVSCode += agentStat.SessionCountVSCode
val.SessionCountJetBrains += agentStat.SessionCountJetBrains
val.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
val.SessionCountSSH += agentStat.SessionCountSSH
minuteBuckets[key] = val
} else {
minuteBuckets[key] = bucketRow{
GetWorkspaceAgentUsageStatsRow: database.GetWorkspaceAgentUsageStatsRow{
UserID: agentStat.UserID,
AgentID: agentStat.AgentID,
WorkspaceID: agentStat.WorkspaceID,
TemplateID: agentStat.TemplateID,
SessionCountVSCode: agentStat.SessionCountVSCode,
SessionCountSSH: agentStat.SessionCountSSH,
SessionCountJetBrains: agentStat.SessionCountJetBrains,
SessionCountReconnectingPTY: agentStat.SessionCountReconnectingPTY,
},
MinuteBucket: agentStat.CreatedAt.Truncate(time.Minute),
}
}
}
}
// Get the latest minute bucket for each agent.
latestBuckets := make(map[uuid.UUID]bucketRow)
for key, bucket := range minuteBuckets {
latest, ok := latestBuckets[key.AgentID]
if !ok || key.MinuteBucket.After(latest.MinuteBucket) {
latestBuckets[key.AgentID] = bucket
}
}
for key, stat := range latestAgentStats {
bucket, ok := latestBuckets[stat.AgentID]
if ok {
stat.SessionCountVSCode = bucket.SessionCountVSCode
stat.SessionCountJetBrains = bucket.SessionCountJetBrains
stat.SessionCountReconnectingPTY = bucket.SessionCountReconnectingPTY
stat.SessionCountSSH = bucket.SessionCountSSH
}
latestAgentStats[key] = stat
}
return maps.Values(latestAgentStats), nil
}
func (q *FakeQuerier) GetWorkspaceAgentUsageStatsAndLabels(_ context.Context, createdAt time.Time) ([]database.GetWorkspaceAgentUsageStatsAndLabelsRow, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
type statsKey struct {
AgentID uuid.UUID
UserID uuid.UUID
WorkspaceID uuid.UUID
}
latestAgentStats := map[statsKey]database.WorkspaceAgentStat{}
maxConnMedianLatency := 0.0
for _, agentStat := range q.workspaceAgentStats {
key := statsKey{
AgentID: agentStat.AgentID,
UserID: agentStat.UserID,
WorkspaceID: agentStat.WorkspaceID,
}
// WHERE workspace_agent_stats.created_at > $1
// GROUP BY user_id, agent_id, workspace_id
if agentStat.CreatedAt.After(createdAt) {
val, ok := latestAgentStats[key]
if !ok {
val = agentStat
val.SessionCountJetBrains = 0
val.SessionCountReconnectingPTY = 0
val.SessionCountSSH = 0
val.SessionCountVSCode = 0
} else {
val.RxBytes += agentStat.RxBytes
val.TxBytes += agentStat.TxBytes
}
if agentStat.ConnectionMedianLatencyMS > maxConnMedianLatency {
val.ConnectionMedianLatencyMS = agentStat.ConnectionMedianLatencyMS
}
latestAgentStats[key] = val
}
// WHERE usage = true AND created_at > now() - '1 minute'::interval
// GROUP BY user_id, agent_id, workspace_id
if agentStat.Usage && agentStat.CreatedAt.After(time.Now().Add(-time.Minute)) {
val, ok := latestAgentStats[key]
if !ok {
latestAgentStats[key] = agentStat
} else {
val.SessionCountVSCode += agentStat.SessionCountVSCode
val.SessionCountJetBrains += agentStat.SessionCountJetBrains
val.SessionCountReconnectingPTY += agentStat.SessionCountReconnectingPTY
val.SessionCountSSH += agentStat.SessionCountSSH
val.ConnectionCount += agentStat.ConnectionCount
latestAgentStats[key] = val
}
}
}
stats := make([]database.GetWorkspaceAgentUsageStatsAndLabelsRow, 0, len(latestAgentStats))
for key, agentStat := range latestAgentStats {
user, err := q.getUserByIDNoLock(key.UserID)
if err != nil {
return nil, err
}
workspace, err := q.getWorkspaceByIDNoLock(context.Background(), key.WorkspaceID)
if err != nil {
return nil, err
}
agent, err := q.getWorkspaceAgentByIDNoLock(context.Background(), key.AgentID)
if err != nil {
return nil, err
}
stats = append(stats, database.GetWorkspaceAgentUsageStatsAndLabelsRow{
Username: user.Username,
AgentName: agent.Name,
WorkspaceName: workspace.Name,
RxBytes: agentStat.RxBytes,
TxBytes: agentStat.TxBytes,
SessionCountVSCode: agentStat.SessionCountVSCode,
SessionCountSSH: agentStat.SessionCountSSH,
SessionCountJetBrains: agentStat.SessionCountJetBrains,
SessionCountReconnectingPTY: agentStat.SessionCountReconnectingPTY,
ConnectionCount: agentStat.ConnectionCount,
ConnectionMedianLatencyMS: agentStat.ConnectionMedianLatencyMS,
})
}
return stats, nil
}
func (q *FakeQuerier) GetWorkspaceAgentsByResourceIDs(ctx context.Context, resourceIDs []uuid.UUID) ([]database.WorkspaceAgent, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -7641,6 +7891,7 @@ func (q *FakeQuerier) InsertWorkspaceAgentStats(_ context.Context, arg database.
SessionCountReconnectingPTY: arg.SessionCountReconnectingPTY[i],
SessionCountSSH: arg.SessionCountSSH[i],
ConnectionMedianLatencyMS: arg.ConnectionMedianLatencyMS[i],
Usage: arg.Usage[i],
}
q.workspaceAgentStats = append(q.workspaceAgentStats, stat)
}