feat(coderd): add coder_app usage stats (#9001)

Updates #8658
This commit is contained in:
Mathias Fredriksson
2023-08-16 15:22:00 +03:00
committed by GitHub
parent f3c707648c
commit 6fd9975aed
31 changed files with 1730 additions and 62 deletions

View File

@ -16,6 +16,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -1342,6 +1343,67 @@ func Run(t *testing.T, appHostIsPrimary bool, factory DeploymentFactory) {
require.Equal(t, []string{"Origin", "X-Foobar"}, deduped)
require.Equal(t, []string{"baz"}, resp.Header.Values("X-Foobar"))
})
t.Run("ReportStats", func(t *testing.T) {
t.Parallel()
flush := make(chan chan<- struct{}, 1)
reporter := &fakeStatsReporter{}
appDetails := setupProxyTest(t, &DeploymentOptions{
StatsCollectorOptions: workspaceapps.StatsCollectorOptions{
Reporter: reporter,
ReportInterval: time.Hour,
RollupWindow: time.Minute,
Flush: flush,
},
})
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
u := appDetails.PathAppURL(appDetails.Apps.Owner)
resp, err := requestWithRetries(ctx, t, appDetails.AppClient(t), http.MethodGet, u.String(), nil)
require.NoError(t, err)
defer resp.Body.Close()
_, err = io.Copy(io.Discard, resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
var stats []workspaceapps.StatsReport
require.Eventually(t, func() bool {
// Keep flushing until we get a non-empty stats report.
flushDone := make(chan struct{}, 1)
flush <- flushDone
<-flushDone
stats = reporter.stats()
return len(stats) > 0
}, testutil.WaitLong, testutil.IntervalFast, "stats not reported")
assert.Equal(t, workspaceapps.AccessMethodPath, stats[0].AccessMethod)
assert.Equal(t, "test-app-owner", stats[0].SlugOrPort)
assert.Equal(t, 1, stats[0].Requests)
})
}
type fakeStatsReporter struct {
mu sync.Mutex
s []workspaceapps.StatsReport
}
func (r *fakeStatsReporter) stats() []workspaceapps.StatsReport {
r.mu.Lock()
defer r.mu.Unlock()
return r.s
}
func (r *fakeStatsReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error {
r.mu.Lock()
r.s = append(r.s, stats...)
r.mu.Unlock()
return nil
}
func testReconnectingPTY(ctx context.Context, t *testing.T, client *codersdk.Client, opts codersdk.WorkspaceAgentReconnectingPTYOpts) {

View File

@ -21,6 +21,7 @@ import (
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/agent"
"github.com/coder/coder/coderd/coderdtest"
"github.com/coder/coder/coderd/workspaceapps"
"github.com/coder/coder/codersdk"
"github.com/coder/coder/codersdk/agentsdk"
"github.com/coder/coder/provisioner/echo"
@ -51,6 +52,8 @@ type DeploymentOptions struct {
DangerousAllowPathAppSiteOwnerAccess bool
ServeHTTPS bool
StatsCollectorOptions workspaceapps.StatsCollectorOptions
// The following fields are only used by setupProxyTestWithFactory.
noWorkspace bool
port uint16

View File

@ -19,6 +19,7 @@ import (
"cdr.dev/slog"
"github.com/coder/coder/agent/agentssh"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/coderd/httpmw"
"github.com/coder/coder/coderd/tracing"
@ -109,7 +110,8 @@ type Server struct {
DisablePathApps bool
SecureAuthCookie bool
AgentProvider AgentProvider
AgentProvider AgentProvider
StatsCollector *StatsCollector
websocketWaitMutex sync.Mutex
websocketWaitGroup sync.WaitGroup
@ -122,6 +124,10 @@ func (s *Server) Close() error {
s.websocketWaitGroup.Wait()
s.websocketWaitMutex.Unlock()
if s.StatsCollector != nil {
_ = s.StatsCollector.Close()
}
// The caller must close the SignedTokenProvider and the AgentProvider (if
// necessary).
@ -586,6 +592,14 @@ func (s *Server) proxyWorkspaceApp(rw http.ResponseWriter, r *http.Request, appT
// end span so we don't get long lived trace data
tracing.EndHTTPSpan(r, http.StatusOK, trace.SpanFromContext(ctx))
report := newStatsReportFromSignedToken(appToken)
s.collectStats(report)
defer func() {
// We must use defer here because ServeHTTP may panic.
report.SessionEndedAt = database.Now()
s.collectStats(report)
}()
proxy.ServeHTTP(rw, r)
}
@ -678,10 +692,24 @@ func (s *Server) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
}
defer ptNetConn.Close()
log.Debug(ctx, "obtained PTY")
report := newStatsReportFromSignedToken(*appToken)
s.collectStats(report)
defer func() {
report.SessionEndedAt = database.Now()
s.collectStats(report)
}()
agentssh.Bicopy(ctx, wsNetConn, ptNetConn)
log.Debug(ctx, "pty Bicopy finished")
}
func (s *Server) collectStats(stats StatsReport) {
if s.StatsCollector != nil {
s.StatsCollector.Collect(stats)
}
}
// wsNetConn wraps net.Conn created by websocket.NetConn(). Cancel func
// is called if a read or write error is encountered.
type wsNetConn struct {

View File

@ -0,0 +1,403 @@
package workspaceapps
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/database/dbauthz"
)
const (
DefaultStatsCollectorReportInterval = 30 * time.Second
DefaultStatsCollectorRollupWindow = 1 * time.Minute
DefaultStatsDBReporterBatchSize = 1024
)
// StatsReport is a report of a workspace app session.
type StatsReport struct {
UserID uuid.UUID `json:"user_id"`
WorkspaceID uuid.UUID `json:"workspace_id"`
AgentID uuid.UUID `json:"agent_id"`
AccessMethod AccessMethod `json:"access_method"`
SlugOrPort string `json:"slug_or_port"`
SessionID uuid.UUID `json:"session_id"`
SessionStartedAt time.Time `json:"session_started_at"`
SessionEndedAt time.Time `json:"session_ended_at"` // Updated periodically while app is in use active and when the last connection is closed.
Requests int `json:"requests"`
rolledUp bool // Indicates if this report has been rolled up.
}
func newStatsReportFromSignedToken(token SignedToken) StatsReport {
return StatsReport{
UserID: token.UserID,
WorkspaceID: token.WorkspaceID,
AgentID: token.AgentID,
AccessMethod: token.AccessMethod,
SlugOrPort: token.AppSlugOrPort,
SessionID: uuid.New(),
SessionStartedAt: database.Now(),
Requests: 1,
}
}
// StatsReporter reports workspace app StatsReports.
type StatsReporter interface {
Report(context.Context, []StatsReport) error
}
var _ StatsReporter = (*StatsDBReporter)(nil)
// StatsDBReporter writes workspace app StatsReports to the database.
type StatsDBReporter struct {
db database.Store
batchSize int
}
// NewStatsDBReporter returns a new StatsDBReporter.
func NewStatsDBReporter(db database.Store, batchSize int) *StatsDBReporter {
return &StatsDBReporter{
db: db,
batchSize: batchSize,
}
}
// Report writes the given StatsReports to the database.
func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error {
err := r.db.InTx(func(tx database.Store) error {
maxBatchSize := r.batchSize
if len(stats) < maxBatchSize {
maxBatchSize = len(stats)
}
batch := database.InsertWorkspaceAppStatsParams{
UserID: make([]uuid.UUID, 0, maxBatchSize),
WorkspaceID: make([]uuid.UUID, 0, maxBatchSize),
AgentID: make([]uuid.UUID, 0, maxBatchSize),
AccessMethod: make([]string, 0, maxBatchSize),
SlugOrPort: make([]string, 0, maxBatchSize),
SessionID: make([]uuid.UUID, 0, maxBatchSize),
SessionStartedAt: make([]time.Time, 0, maxBatchSize),
SessionEndedAt: make([]time.Time, 0, maxBatchSize),
Requests: make([]int32, 0, maxBatchSize),
}
for _, stat := range stats {
batch.UserID = append(batch.UserID, stat.UserID)
batch.WorkspaceID = append(batch.WorkspaceID, stat.WorkspaceID)
batch.AgentID = append(batch.AgentID, stat.AgentID)
batch.AccessMethod = append(batch.AccessMethod, string(stat.AccessMethod))
batch.SlugOrPort = append(batch.SlugOrPort, stat.SlugOrPort)
batch.SessionID = append(batch.SessionID, stat.SessionID)
batch.SessionStartedAt = append(batch.SessionStartedAt, stat.SessionStartedAt)
batch.SessionEndedAt = append(batch.SessionEndedAt, stat.SessionEndedAt)
batch.Requests = append(batch.Requests, int32(stat.Requests))
if len(batch.UserID) >= r.batchSize {
err := tx.InsertWorkspaceAppStats(ctx, batch)
if err != nil {
return err
}
// Reset batch.
batch.UserID = batch.UserID[:0]
batch.WorkspaceID = batch.WorkspaceID[:0]
batch.AgentID = batch.AgentID[:0]
batch.AccessMethod = batch.AccessMethod[:0]
batch.SlugOrPort = batch.SlugOrPort[:0]
batch.SessionID = batch.SessionID[:0]
batch.SessionStartedAt = batch.SessionStartedAt[:0]
batch.SessionEndedAt = batch.SessionEndedAt[:0]
batch.Requests = batch.Requests[:0]
}
}
if len(batch.UserID) > 0 {
err := tx.InsertWorkspaceAppStats(ctx, batch)
if err != nil {
return err
}
}
return nil
}, nil)
if err != nil {
return xerrors.Errorf("insert workspace app stats failed: %w", err)
}
return nil
}
// This should match the database unique constraint.
type statsGroupKey struct {
StartTimeTrunc time.Time
UserID uuid.UUID
WorkspaceID uuid.UUID
AgentID uuid.UUID
AccessMethod AccessMethod
SlugOrPort string
}
func (s StatsReport) groupKey(windowSize time.Duration) statsGroupKey {
return statsGroupKey{
StartTimeTrunc: s.SessionStartedAt.Truncate(windowSize),
UserID: s.UserID,
WorkspaceID: s.WorkspaceID,
AgentID: s.AgentID,
AccessMethod: s.AccessMethod,
SlugOrPort: s.SlugOrPort,
}
}
// StatsCollector collects workspace app StatsReports and reports them
// in batches, stats compaction is performed for short-lived sessions.
type StatsCollector struct {
opts StatsCollectorOptions
ctx context.Context
cancel context.CancelFunc
done chan struct{}
mu sync.Mutex // Protects following.
statsBySessionID map[uuid.UUID]*StatsReport // Track unique sessions.
groupedStats map[statsGroupKey][]*StatsReport // Rolled up stats for sessions in close proximity.
backlog []StatsReport // Stats that have not been reported yet (due to error).
}
type StatsCollectorOptions struct {
Logger *slog.Logger
Reporter StatsReporter
// ReportInterval is the interval at which stats are reported, both partial
// and fully formed stats.
ReportInterval time.Duration
// RollupWindow is the window size for rolling up stats, session shorter
// than this will be rolled up and longer than this will be tracked
// individually.
RollupWindow time.Duration
// Options for tests.
Flush <-chan chan<- struct{}
Now func() time.Time
}
func NewStatsCollector(opts StatsCollectorOptions) *StatsCollector {
if opts.Logger == nil {
opts.Logger = &slog.Logger{}
}
if opts.ReportInterval == 0 {
opts.ReportInterval = DefaultStatsCollectorReportInterval
}
if opts.RollupWindow == 0 {
opts.RollupWindow = DefaultStatsCollectorRollupWindow
}
if opts.Now == nil {
opts.Now = time.Now
}
ctx, cancel := context.WithCancel(context.Background())
sc := &StatsCollector{
ctx: ctx,
cancel: cancel,
done: make(chan struct{}),
opts: opts,
statsBySessionID: make(map[uuid.UUID]*StatsReport),
groupedStats: make(map[statsGroupKey][]*StatsReport),
}
go sc.start()
return sc
}
// Collect the given StatsReport for later reporting (non-blocking).
func (sc *StatsCollector) Collect(report StatsReport) {
sc.mu.Lock()
defer sc.mu.Unlock()
r := &report
if _, ok := sc.statsBySessionID[report.SessionID]; !ok {
groupKey := r.groupKey(sc.opts.RollupWindow)
sc.groupedStats[groupKey] = append(sc.groupedStats[groupKey], r)
}
if r.SessionEndedAt.IsZero() {
sc.statsBySessionID[report.SessionID] = r
} else {
if stat, ok := sc.statsBySessionID[report.SessionID]; ok {
// Update in-place.
*stat = *r
}
delete(sc.statsBySessionID, report.SessionID)
}
}
// rollup performs stats rollup for sessions that fall within the
// configured rollup window. For sessions longer than the window,
// we report them individually.
func (sc *StatsCollector) rollup(now time.Time) []StatsReport {
sc.mu.Lock()
defer sc.mu.Unlock()
var report []StatsReport
for g, group := range sc.groupedStats {
if len(group) == 0 {
// Safety check, this should not happen.
sc.opts.Logger.Error(sc.ctx, "empty stats group", "group", g)
delete(sc.groupedStats, g)
continue
}
var rolledUp *StatsReport
if group[0].rolledUp {
rolledUp = group[0]
group = group[1:]
} else {
rolledUp = &StatsReport{
UserID: g.UserID,
WorkspaceID: g.WorkspaceID,
AgentID: g.AgentID,
AccessMethod: g.AccessMethod,
SlugOrPort: g.SlugOrPort,
SessionStartedAt: g.StartTimeTrunc,
SessionEndedAt: g.StartTimeTrunc.Add(sc.opts.RollupWindow),
Requests: 0,
rolledUp: true,
}
}
rollupChanged := false
newGroup := []*StatsReport{rolledUp} // Must be first in slice for future iterations (see group[0] above).
for _, stat := range group {
if !stat.SessionEndedAt.IsZero() && stat.SessionEndedAt.Sub(stat.SessionStartedAt) <= sc.opts.RollupWindow {
// This is a short-lived session, roll it up.
if rolledUp.SessionID == uuid.Nil {
rolledUp.SessionID = stat.SessionID // Borrow the first session ID, useful in tests.
}
rolledUp.Requests += stat.Requests
rollupChanged = true
continue
}
if stat.SessionEndedAt.IsZero() && now.Sub(stat.SessionStartedAt) <= sc.opts.RollupWindow {
// This is an incomplete session, wait and see if it'll be rolled up or not.
newGroup = append(newGroup, stat)
continue
}
// This is a long-lived session, report it individually.
// Make a copy of stat for reporting.
r := *stat
if r.SessionEndedAt.IsZero() {
// Report an end time for incomplete sessions, it will
// be updated later. This ensures that data in the DB
// will have an end time even if the service is stopped.
r.SessionEndedAt = now.UTC() // Use UTC like database.Now().
}
report = append(report, r) // Report it (ended or incomplete).
if stat.SessionEndedAt.IsZero() {
newGroup = append(newGroup, stat) // Keep it for future updates.
}
}
if rollupChanged {
report = append(report, *rolledUp)
}
// Future rollups should only consider the compacted group.
sc.groupedStats[g] = newGroup
// Keep the group around until the next rollup window has passed
// in case data was collected late.
if len(newGroup) == 1 && rolledUp.SessionEndedAt.Add(sc.opts.RollupWindow).Before(now) {
delete(sc.groupedStats, g)
}
}
return report
}
func (sc *StatsCollector) flush(ctx context.Context) (err error) {
sc.opts.Logger.Debug(ctx, "flushing workspace app stats")
defer func() {
if err != nil {
sc.opts.Logger.Error(ctx, "failed to flush workspace app stats", "error", err)
} else {
sc.opts.Logger.Debug(ctx, "flushed workspace app stats")
}
}()
// We keep the backlog as a simple slice so that we don't need to
// attempt to merge it with the stats we're about to report. This
// is because the rollup is a one-way operation and the backlog may
// contain stats that are still in the statsBySessionID map and will
// be reported again in the future. It is possible to merge the
// backlog and the stats we're about to report, but it's not worth
// the complexity.
if len(sc.backlog) > 0 {
err = sc.opts.Reporter.Report(ctx, sc.backlog)
if err != nil {
return xerrors.Errorf("report workspace app stats from backlog failed: %w", err)
}
sc.backlog = nil
}
now := sc.opts.Now()
stats := sc.rollup(now)
if len(stats) == 0 {
return nil
}
err = sc.opts.Reporter.Report(ctx, stats)
if err != nil {
sc.backlog = stats
return xerrors.Errorf("report workspace app stats failed: %w", err)
}
return nil
}
func (sc *StatsCollector) Close() error {
sc.cancel()
<-sc.done
return nil
}
func (sc *StatsCollector) start() {
defer func() {
close(sc.done)
sc.opts.Logger.Debug(sc.ctx, "workspace app stats collector stopped")
}()
sc.opts.Logger.Debug(sc.ctx, "workspace app stats collector started")
t := time.NewTimer(sc.opts.ReportInterval)
defer t.Stop()
var reportFlushDone chan<- struct{}
done := false
for !done {
select {
case <-sc.ctx.Done():
t.Stop()
done = true
case <-t.C:
case reportFlushDone = <-sc.opts.Flush:
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
//nolint:gocritic // Inserting app stats is a system function.
_ = sc.flush(dbauthz.AsSystemRestricted(ctx))
cancel()
if !done {
t.Reset(sc.opts.ReportInterval)
}
// For tests.
if reportFlushDone != nil {
reportFlushDone <- struct{}{}
reportFlushDone = nil
}
}
}

View File

@ -0,0 +1,426 @@
package workspaceapps_test
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"github.com/coder/coder/coderd/database"
"github.com/coder/coder/coderd/workspaceapps"
"github.com/coder/coder/testutil"
)
type fakeReporter struct {
mu sync.Mutex
s []workspaceapps.StatsReport
err error
errN int
}
func (r *fakeReporter) stats() []workspaceapps.StatsReport {
r.mu.Lock()
defer r.mu.Unlock()
return r.s
}
func (r *fakeReporter) errors() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.errN
}
func (r *fakeReporter) setError(err error) {
r.mu.Lock()
defer r.mu.Unlock()
r.err = err
}
func (r *fakeReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error {
r.mu.Lock()
if r.err != nil {
r.errN++
r.mu.Unlock()
return r.err
}
r.s = append(r.s, stats...)
r.mu.Unlock()
return nil
}
func TestStatsCollector(t *testing.T) {
t.Parallel()
rollupUUID := uuid.New()
rollupUUID2 := uuid.New()
someUUID := uuid.New()
rollupWindow := time.Minute
start := database.Now().Truncate(time.Minute).UTC()
end := start.Add(10 * time.Second)
tests := []struct {
name string
flushIncrement time.Duration
flushCount int
stats []workspaceapps.StatsReport
want []workspaceapps.StatsReport
}{
{
name: "Single stat rolled up and reported once",
flushIncrement: 2*rollupWindow + time.Second,
flushCount: 10, // Only reported once.
stats: []workspaceapps.StatsReport{
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: end,
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow),
Requests: 1,
},
},
},
{
name: "Two unique stat rolled up",
flushIncrement: 2*rollupWindow + time.Second,
flushCount: 10, // Only reported once.
stats: []workspaceapps.StatsReport{
{
AccessMethod: workspaceapps.AccessMethodPath,
SlugOrPort: "code-server",
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: end,
Requests: 1,
},
{
AccessMethod: workspaceapps.AccessMethodTerminal,
SessionID: rollupUUID2,
SessionStartedAt: start,
SessionEndedAt: end,
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
AccessMethod: workspaceapps.AccessMethodPath,
SlugOrPort: "code-server",
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow),
Requests: 1,
},
{
AccessMethod: workspaceapps.AccessMethodTerminal,
SessionID: rollupUUID2,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow),
Requests: 1,
},
},
},
{
name: "Multiple stats rolled up",
flushIncrement: 2*rollupWindow + time.Second,
flushCount: 2,
stats: []workspaceapps.StatsReport{
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: end,
Requests: 1,
},
{
SessionID: uuid.New(),
SessionStartedAt: start,
SessionEndedAt: end,
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow),
Requests: 2,
},
},
},
{
name: "Long sessions not rolled up but reported multiple times",
flushIncrement: rollupWindow + time.Second,
flushCount: 4,
stats: []workspaceapps.StatsReport{
{
SessionID: rollupUUID,
SessionStartedAt: start,
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow + time.Second),
Requests: 1,
},
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(2 * (rollupWindow + time.Second)),
Requests: 1,
},
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(3 * (rollupWindow + time.Second)),
Requests: 1,
},
{
SessionID: rollupUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(4 * (rollupWindow + time.Second)),
Requests: 1,
},
},
},
{
name: "Incomplete stats not reported until it exceeds rollup window",
flushIncrement: rollupWindow / 4,
flushCount: 6,
stats: []workspaceapps.StatsReport{
{
SessionID: someUUID,
SessionStartedAt: start,
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
SessionID: someUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow / 4 * 5),
Requests: 1,
},
{
SessionID: someUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow / 4 * 6),
Requests: 1,
},
},
},
{
name: "Same stat reported without and with end time and rolled up",
flushIncrement: rollupWindow + time.Second,
flushCount: 1,
stats: []workspaceapps.StatsReport{
{
SessionID: someUUID,
SessionStartedAt: start,
Requests: 1,
},
{
SessionID: someUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(10 * time.Second),
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
SessionID: someUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow),
Requests: 1,
},
},
},
{
name: "Same non-rolled up stat reported without and with end time",
flushIncrement: rollupWindow * 2,
flushCount: 1,
stats: []workspaceapps.StatsReport{
{
SessionID: someUUID,
SessionStartedAt: start,
Requests: 1,
},
{
SessionID: someUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow * 2),
Requests: 1,
},
},
want: []workspaceapps.StatsReport{
{
SessionID: someUUID,
SessionStartedAt: start,
SessionEndedAt: start.Add(rollupWindow * 2),
Requests: 1,
},
},
},
}
// Run tests.
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
flush := make(chan chan<- struct{}, 1)
var now atomic.Pointer[time.Time]
now.Store(&start)
reporter := &fakeReporter{}
collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{
Reporter: reporter,
ReportInterval: time.Hour,
RollupWindow: rollupWindow,
Flush: flush,
Now: func() time.Time { return *now.Load() },
})
// Collect reports.
for _, report := range tt.stats {
collector.Collect(report)
}
// Advance time.
flushTime := start.Add(tt.flushIncrement)
for i := 0; i < tt.flushCount; i++ {
now.Store(&flushTime)
flushDone := make(chan struct{}, 1)
flush <- flushDone
<-flushDone
flushTime = flushTime.Add(tt.flushIncrement)
}
var gotStats []workspaceapps.StatsReport
require.Eventually(t, func() bool {
gotStats = reporter.stats()
return len(gotStats) == len(tt.want)
}, testutil.WaitMedium, testutil.IntervalFast)
// Order is not guaranteed.
sortBySessionID := func(a, b workspaceapps.StatsReport) int {
if a.SessionID == b.SessionID {
return int(a.SessionEndedAt.Sub(b.SessionEndedAt))
}
if a.SessionID.String() < b.SessionID.String() {
return -1
}
return 1
}
slices.SortFunc(tt.want, sortBySessionID)
slices.SortFunc(gotStats, sortBySessionID)
// Verify reported stats.
for i, got := range gotStats {
want := tt.want[i]
assert.Equal(t, want.SessionID, got.SessionID, "session ID; i = %d", i)
assert.Equal(t, want.SessionStartedAt, got.SessionStartedAt, "session started at; i = %d", i)
assert.Equal(t, want.SessionEndedAt, got.SessionEndedAt, "session ended at; i = %d", i)
assert.Equal(t, want.Requests, got.Requests, "requests; i = %d", i)
}
})
}
}
func TestStatsCollector_backlog(t *testing.T) {
t.Parallel()
rollupWindow := time.Minute
flush := make(chan chan<- struct{}, 1)
start := database.Now().Truncate(time.Minute).UTC()
var now atomic.Pointer[time.Time]
now.Store(&start)
reporter := &fakeReporter{}
collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{
Reporter: reporter,
ReportInterval: time.Hour,
RollupWindow: rollupWindow,
Flush: flush,
Now: func() time.Time { return *now.Load() },
})
reporter.setError(xerrors.New("some error"))
// The first collected stat is "rolled up" and moved into the
// backlog during the first flush. On the second flush nothing is
// rolled up due to being unable to report the backlog.
for i := 0; i < 2; i++ {
collector.Collect(workspaceapps.StatsReport{
SessionID: uuid.New(),
SessionStartedAt: start,
SessionEndedAt: start.Add(10 * time.Second),
Requests: 1,
})
start = start.Add(time.Minute)
now.Store(&start)
flushDone := make(chan struct{}, 1)
flush <- flushDone
<-flushDone
}
// Flush was performed 2 times, 2 reports should have failed.
wantErrors := 2
assert.Equal(t, wantErrors, reporter.errors())
assert.Empty(t, reporter.stats())
reporter.setError(nil)
// Flush again, this time the backlog should be reported in addition
// to the second collected stat being rolled up and reported.
flushDone := make(chan struct{}, 1)
flush <- flushDone
<-flushDone
assert.Equal(t, wantErrors, reporter.errors())
assert.Len(t, reporter.stats(), 2)
}
func TestStatsCollector_Close(t *testing.T) {
t.Parallel()
reporter := &fakeReporter{}
collector := workspaceapps.NewStatsCollector(workspaceapps.StatsCollectorOptions{
Reporter: reporter,
ReportInterval: time.Hour,
RollupWindow: time.Minute,
})
collector.Collect(workspaceapps.StatsReport{
SessionID: uuid.New(),
SessionStartedAt: database.Now(),
SessionEndedAt: database.Now(),
Requests: 1,
})
collector.Close()
// Verify that stats are reported after close.
assert.NotEmpty(t, reporter.stats())
}