chore: move stat reporting into workspacestats package (#13386)

Garrett Delfosse
2024-05-29 11:49:08 -04:00
committed by GitHub
parent afd9d3b35f
commit 5789ea5397
19 changed files with 314 additions and 346 deletions
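
For orientation, a minimal sketch of the call pattern this commit introduces, assembled from the hunks below. The helper names newReporter and report are illustrative only; the types, option fields, and method signatures (workspacestats.NewReporter, ReporterOptions, ReportAgentStats, ReportAppStats) all appear in the diffs.

package example

import (
	"context"
	"sync/atomic"
	"time"

	"cdr.dev/slog"

	agentproto "github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/pubsub"
	"github.com/coder/coder/v2/coderd/schedule"
	"github.com/coder/coder/v2/coderd/workspaceapps"
	"github.com/coder/coder/v2/coderd/workspacestats"
)

// newReporter mirrors the wiring added to coderd.New below: every
// stat-reporting dependency is collected into a single Reporter.
func newReporter(
	db database.Store,
	logger slog.Logger,
	ps pubsub.Pubsub,
	tss *atomic.Pointer[schedule.TemplateScheduleStore],
	batcher workspacestats.StatsBatcher,
) *workspacestats.Reporter {
	return workspacestats.NewReporter(workspacestats.ReporterOptions{
		Database:              db,
		Logger:                logger.Named("workspacestats"),
		Pubsub:                ps,
		TemplateScheduleStore: tss,
		StatsBatcher:          batcher,
		AppStatBatchSize:      workspaceapps.DefaultStatsDBReporterBatchSize,
	})
}

// report shows the two entry points that replace the old code paths:
// ReportAgentStats supersedes the errgroup in agentapi.StatsAPI.UpdateStats
// (and the handler in workspaceagents.go), while ReportAppStats supersedes
// workspaceapps.StatsDBReporter.Report.
func report(
	ctx context.Context,
	r *workspacestats.Reporter,
	ws database.Workspace,
	agent database.WorkspaceAgent,
	templateName string,
	agentStats *agentproto.Stats,
	appStats []workspaceapps.StatsReport,
) error {
	if err := r.ReportAgentStats(ctx, time.Now(), ws, agent, templateName, agentStats); err != nil {
		return err
	}
	return r.ReportAppStats(ctx, appStats)
}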

View File

@ -24,6 +24,7 @@ import (
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/tracing"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/tailnet"
tailnetproto "github.com/coder/coder/v2/tailnet/proto"
@ -59,7 +60,7 @@ type Options struct {
DerpMapFn func() *tailcfg.DERPMap
TailnetCoordinator *atomic.Pointer[tailnet.Coordinator]
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsBatcher StatsBatcher
StatsReporter *workspacestats.Reporter
AppearanceFetcher *atomic.Pointer[appearance.Fetcher]
PublishWorkspaceUpdateFn func(ctx context.Context, workspaceID uuid.UUID)
PublishWorkspaceAgentLogsUpdateFn func(ctx context.Context, workspaceAgentID uuid.UUID, msg agentsdk.LogsNotifyMessage)
@ -114,12 +115,9 @@ func New(opts Options) *API {
api.StatsAPI = &StatsAPI{
AgentFn: api.agent,
Database: opts.Database,
Pubsub: opts.Pubsub,
Log: opts.Log,
StatsBatcher: opts.StatsBatcher,
TemplateScheduleStore: opts.TemplateScheduleStore,
StatsReporter: opts.StatsReporter,
AgentStatsRefreshInterval: opts.AgentStatsRefreshInterval,
UpdateAgentMetricsFn: opts.UpdateAgentMetricsFn,
}
api.LifecycleAPI = &LifecycleAPI{

View File

@ -2,10 +2,8 @@ package agentapi
import (
"context"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/durationpb"
@ -15,10 +13,7 @@ import (
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/coderd/workspacestats"
)
type StatsBatcher interface {
@ -28,12 +23,9 @@ type StatsBatcher interface {
type StatsAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store
Pubsub pubsub.Pubsub
Log slog.Logger
StatsBatcher StatsBatcher
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsReporter *workspacestats.Reporter
AgentStatsRefreshInterval time.Duration
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
TimeNowFn func() time.Time // defaults to dbtime.Now()
}
@ -69,80 +61,17 @@ func (a *StatsAPI) UpdateStats(ctx context.Context, req *agentproto.UpdateStatsR
slog.F("payload", req),
)
now := a.now()
if req.Stats.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(a.TemplateScheduleStore.Load())).Get(ctx, a.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
a.Log.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
ActivityBumpWorkspace(ctx, a.Log.Named("activity_bump"), a.Database, workspace.ID, nextAutostart)
}
var errGroup errgroup.Group
errGroup.Go(func() error {
err := a.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req.Stats)
if err != nil {
a.Log.Error(ctx, "add agent stats to batcher", slog.Error(err))
return xerrors.Errorf("insert workspace agent stats batch: %w", err)
}
return nil
})
errGroup.Go(func() error {
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
err := a.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
if err != nil {
return xerrors.Errorf("update workspace LastUsedAt: %w", err)
}
return nil
})
if a.UpdateAgentMetricsFn != nil {
errGroup.Go(func() error {
user, err := a.Database.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
return xerrors.Errorf("get user: %w", err)
}
a.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: getWorkspaceAgentByIDRow.TemplateName,
}, req.Stats.Metrics)
return nil
})
}
err = errGroup.Wait()
err = a.StatsReporter.ReportAgentStats(
ctx,
a.now(),
workspace,
workspaceAgent,
getWorkspaceAgentByIDRow.TemplateName,
req.Stats,
)
if err != nil {
return nil, xerrors.Errorf("update stats in database: %w", err)
return nil, xerrors.Errorf("report agent stats: %w", err)
}
// Tell the frontend about the new agent report, now that everything is updated
a.publishWorkspaceAgentStats(ctx, workspace.ID)
return res, nil
}
func (a *StatsAPI) publishWorkspaceAgentStats(ctx context.Context, workspaceID uuid.UUID) {
err := a.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceID), []byte{})
if err != nil {
a.Log.Warn(ctx, "failed to publish workspace agent stats",
slog.F("workspace_id", workspaceID), slog.Error(err))
}
}

View File

@ -22,6 +22,7 @@ import (
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/testutil"
)
@ -129,21 +130,24 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
}),
AgentStatsRefreshInterval: 10 * time.Second,
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
TimeNowFn: func() time.Time {
return now
},
@ -232,13 +236,16 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
// Ignored when nil.
UpdateAgentMetricsFn: nil,
}),
AgentStatsRefreshInterval: 10 * time.Second,
// Ignored when nil.
UpdateAgentMetricsFn: nil,
TimeNowFn: func() time.Time {
return now
},
@ -274,12 +281,15 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: nil, // should not be called
TemplateScheduleStore: nil, // should not be called
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: nil, // should not be called
TemplateScheduleStore: nil, // should not be called
UpdateAgentMetricsFn: nil, // should not be called
}),
AgentStatsRefreshInterval: 10 * time.Second,
UpdateAgentMetricsFn: nil, // should not be called
TimeNowFn: func() time.Time {
panic("should not be called")
},
@ -343,21 +353,24 @@ func TestUpdateStates(t *testing.T) {
AgentFn: func(context.Context) (database.WorkspaceAgent, error) {
return agent, nil
},
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
Database: dbM,
StatsReporter: workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: dbM,
Pubsub: ps,
StatsBatcher: batcher,
TemplateScheduleStore: templateScheduleStorePtr(templateScheduleStore),
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
}),
AgentStatsRefreshInterval: 15 * time.Second,
UpdateAgentMetricsFn: func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric) {
updateAgentMetricsFnCalled = true
assert.Equal(t, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: agent.Name,
TemplateName: template.Name,
}, labels)
assert.Equal(t, req.Stats.Metrics, metrics)
},
TimeNowFn: func() time.Time {
return now
},

View File

@ -68,6 +68,7 @@ import (
"github.com/coder/coder/v2/coderd/updatecheck"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/coderd/workspaceusage"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/drpc"
@ -550,13 +551,22 @@ func New(options *Options) *API {
api.Logger.Fatal(api.ctx, "failed to initialize tailnet client service", slog.Error(err))
}
api.statsReporter = workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: options.Database,
Logger: options.Logger.Named("workspacestats"),
Pubsub: options.Pubsub,
TemplateScheduleStore: options.TemplateScheduleStore,
StatsBatcher: options.StatsBatcher,
UpdateAgentMetricsFn: options.UpdateAgentMetrics,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
workspaceAppsLogger := options.Logger.Named("workspaceapps")
if options.WorkspaceAppsStatsCollectorOptions.Logger == nil {
named := workspaceAppsLogger.Named("stats_collector")
options.WorkspaceAppsStatsCollectorOptions.Logger = &named
}
if options.WorkspaceAppsStatsCollectorOptions.Reporter == nil {
options.WorkspaceAppsStatsCollectorOptions.Reporter = workspaceapps.NewStatsDBReporter(options.Database, workspaceapps.DefaultStatsDBReporterBatchSize)
options.WorkspaceAppsStatsCollectorOptions.Reporter = api.statsReporter
}
api.workspaceAppServer = &workspaceapps.Server{
@ -626,8 +636,6 @@ func New(options *Options) *API {
cors := httpmw.Cors(options.DeploymentValues.Dangerous.AllowAllCors.Value())
prometheusMW := httpmw.Prometheus(options.PrometheusRegistry)
api.statsBatcher = options.StatsBatcher
r.Use(
httpmw.Recover(api.Logger),
tracing.StatusWriterMiddleware,
@ -1287,7 +1295,7 @@ type API struct {
healthCheckGroup *singleflight.Group[string, *healthsdk.HealthcheckReport]
healthCheckCache atomic.Pointer[healthsdk.HealthcheckReport]
statsBatcher *batchstats.Batcher
statsReporter *workspacestats.Reporter
Acquirer *provisionerdserver.Acquirer
// dbRolluper rolls up template usage stats from raw agent and app

View File

@ -264,7 +264,7 @@ func (s *MethodTestSuite) NotAuthorizedErrorTest(ctx context.Context, az *coderd
// any case where the error is nil and the response is an empty slice.
if err != nil || !hasEmptySliceResponse(resp) {
s.Errorf(err, "method should return an error with cancellation")
s.ErrorIsf(err, context.Canceled, "error should match context.Cancelled")
s.ErrorIsf(err, context.Canceled, "error should match context.Canceled")
}
})
}

View File

@ -31,6 +31,7 @@ import (
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
@ -736,9 +737,12 @@ func TestTemplateInsights_Golden(t *testing.T) {
})
}
}
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats)
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats)
require.NoError(t, err, "want no error inserting app stats")
return client, events
@ -1632,9 +1636,12 @@ func TestUserActivityInsights_Golden(t *testing.T) {
})
}
}
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(ctx), stats)
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(ctx), stats)
require.NoError(t, err, "want no error inserting app stats")
return client, events

View File

@ -25,6 +25,7 @@ import (
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/prometheusmetrics/insights"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/testutil"
)
@ -109,10 +110,13 @@ func TestCollectInsights(t *testing.T) {
require.NoError(t, err, "unable to post fake stats")
// Fake app usage
reporter := workspaceapps.NewStatsDBReporter(db, workspaceapps.DefaultStatsDBReporterBatchSize)
reporter := workspacestats.NewReporter(workspacestats.ReporterOptions{
Database: db,
AppStatBatchSize: workspaceapps.DefaultStatsDBReporterBatchSize,
})
refTime := time.Now().Add(-3 * time.Minute).Truncate(time.Minute)
//nolint:gocritic // This is a test.
err = reporter.Report(dbauthz.AsSystemRestricted(context.Background()), []workspaceapps.StatsReport{
err = reporter.ReportAppStats(dbauthz.AsSystemRestricted(context.Background()), []workspaceapps.StatsReport{
{
UserID: user.ID,
WorkspaceID: workspace1.ID,

View File

@ -34,9 +34,7 @@ import (
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
@ -1167,35 +1165,6 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
slog.F("payload", req),
)
if req.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(api.TemplateScheduleStore.Load())).Get(ctx, api.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping without the next transition and log it.
if err != nil {
// There's nothing we can do if the query was canceled, the
// client most likely went away so we just return an internal
// server error.
if database.IsQueryCanceledError(err) {
httpapi.InternalServerError(rw, err)
return
}
api.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(time.Now(), workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
agentapi.ActivityBumpWorkspace(ctx, api.Logger.Named("activity_bump"), api.Database, workspace.ID, nextAutostart)
}
now := dbtime.Now()
protoStats := &agentproto.Stats{
ConnectionsByProto: req.ConnectionsByProto,
ConnectionCount: req.ConnectionCount,
@ -1232,46 +1201,14 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
}
}
}
var errGroup errgroup.Group
errGroup.Go(func() error {
err := api.statsBatcher.Add(time.Now(), workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, protoStats)
if err != nil {
api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err))
return xerrors.Errorf("can't insert workspace agent stat: %w", err)
}
return nil
})
if req.SessionCount() > 0 {
errGroup.Go(func() error {
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
err := api.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
if err != nil {
return xerrors.Errorf("can't update workspace LastUsedAt: %w", err)
}
return nil
})
}
if api.Options.UpdateAgentMetrics != nil {
errGroup.Go(func() error {
user, err := api.Database.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
return xerrors.Errorf("can't get user: %w", err)
}
api.Options.UpdateAgentMetrics(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: row.TemplateName,
}, protoStats.Metrics)
return nil
})
}
err = errGroup.Wait()
err = api.statsReporter.ReportAgentStats(
ctx,
dbtime.Now(),
workspace,
workspaceAgent,
row.TemplateName,
protoStats,
)
if err != nil {
httpapi.InternalServerError(rw, err)
return

View File

@ -939,32 +939,6 @@ func TestWorkspaceAgentReportStats(t *testing.T) {
agentClient.SetSessionToken(r.AgentToken)
_, err := agentClient.PostStats(context.Background(), &agentsdk.Stats{
ConnectionsByProto: map[string]int64{"TCP": 1},
// Set connection count to 1 but all session counts to zero to
// assert we aren't updating last_used_at for a connections that may
// be spawned passively by the dashboard.
ConnectionCount: 1,
RxPackets: 1,
RxBytes: 1,
TxPackets: 1,
TxBytes: 1,
SessionCountVSCode: 0,
SessionCountJetBrains: 0,
SessionCountReconnectingPTY: 0,
SessionCountSSH: 0,
ConnectionMedianLatencyMS: 10,
})
require.NoError(t, err)
newWorkspace, err := client.Workspace(context.Background(), r.Workspace.ID)
require.NoError(t, err)
assert.True(t,
newWorkspace.LastUsedAt.Equal(r.Workspace.LastUsedAt),
"%s and %s should not differ", newWorkspace.LastUsedAt, r.Workspace.LastUsedAt,
)
_, err = agentClient.PostStats(context.Background(), &agentsdk.Stats{
ConnectionsByProto: map[string]int64{"TCP": 1},
ConnectionCount: 1,
RxPackets: 1,
@ -979,7 +953,7 @@ func TestWorkspaceAgentReportStats(t *testing.T) {
})
require.NoError(t, err)
newWorkspace, err = client.Workspace(context.Background(), r.Workspace.ID)
newWorkspace, err := client.Workspace(context.Background(), r.Workspace.ID)
require.NoError(t, err)
assert.True(t,

View File

@ -132,7 +132,7 @@ func (api *API) workspaceAgentRPC(rw http.ResponseWriter, r *http.Request) {
TailnetCoordinator: &api.TailnetCoordinator,
TemplateScheduleStore: api.TemplateScheduleStore,
AppearanceFetcher: &api.AppearanceFetcher,
StatsBatcher: api.statsBatcher,
StatsReporter: api.statsReporter,
PublishWorkspaceUpdateFn: api.publishWorkspaceUpdate,
PublishWorkspaceAgentLogsUpdateFn: api.publishWorkspaceAgentLogsUpdate,

View File

@ -1688,7 +1688,7 @@ func (r *fakeStatsReporter) stats() []workspaceapps.StatsReport {
return r.s
}
func (r *fakeStatsReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error {
func (r *fakeStatsReporter) ReportAppStats(_ context.Context, stats []workspaceapps.StatsReport) error {
r.mu.Lock()
r.s = append(r.s, stats...)
r.mu.Unlock()

View File

@ -10,10 +10,8 @@ import (
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/util/slice"
)
const (
@ -52,100 +50,7 @@ func newStatsReportFromSignedToken(token SignedToken) StatsReport {
// StatsReporter reports workspace app StatsReports.
type StatsReporter interface {
Report(context.Context, []StatsReport) error
}
var _ StatsReporter = (*StatsDBReporter)(nil)
// StatsDBReporter writes workspace app StatsReports to the database.
type StatsDBReporter struct {
db database.Store
batchSize int
}
// NewStatsDBReporter returns a new StatsDBReporter.
func NewStatsDBReporter(db database.Store, batchSize int) *StatsDBReporter {
return &StatsDBReporter{
db: db,
batchSize: batchSize,
}
}
// Report writes the given StatsReports to the database.
func (r *StatsDBReporter) Report(ctx context.Context, stats []StatsReport) error {
err := r.db.InTx(func(tx database.Store) error {
maxBatchSize := r.batchSize
if len(stats) < maxBatchSize {
maxBatchSize = len(stats)
}
batch := database.InsertWorkspaceAppStatsParams{
UserID: make([]uuid.UUID, 0, maxBatchSize),
WorkspaceID: make([]uuid.UUID, 0, maxBatchSize),
AgentID: make([]uuid.UUID, 0, maxBatchSize),
AccessMethod: make([]string, 0, maxBatchSize),
SlugOrPort: make([]string, 0, maxBatchSize),
SessionID: make([]uuid.UUID, 0, maxBatchSize),
SessionStartedAt: make([]time.Time, 0, maxBatchSize),
SessionEndedAt: make([]time.Time, 0, maxBatchSize),
Requests: make([]int32, 0, maxBatchSize),
}
for _, stat := range stats {
batch.UserID = append(batch.UserID, stat.UserID)
batch.WorkspaceID = append(batch.WorkspaceID, stat.WorkspaceID)
batch.AgentID = append(batch.AgentID, stat.AgentID)
batch.AccessMethod = append(batch.AccessMethod, string(stat.AccessMethod))
batch.SlugOrPort = append(batch.SlugOrPort, stat.SlugOrPort)
batch.SessionID = append(batch.SessionID, stat.SessionID)
batch.SessionStartedAt = append(batch.SessionStartedAt, stat.SessionStartedAt)
batch.SessionEndedAt = append(batch.SessionEndedAt, stat.SessionEndedAt)
batch.Requests = append(batch.Requests, int32(stat.Requests))
if len(batch.UserID) >= r.batchSize {
err := tx.InsertWorkspaceAppStats(ctx, batch)
if err != nil {
return err
}
// Reset batch.
batch.UserID = batch.UserID[:0]
batch.WorkspaceID = batch.WorkspaceID[:0]
batch.AgentID = batch.AgentID[:0]
batch.AccessMethod = batch.AccessMethod[:0]
batch.SlugOrPort = batch.SlugOrPort[:0]
batch.SessionID = batch.SessionID[:0]
batch.SessionStartedAt = batch.SessionStartedAt[:0]
batch.SessionEndedAt = batch.SessionEndedAt[:0]
batch.Requests = batch.Requests[:0]
}
}
if len(batch.UserID) == 0 {
return nil
}
if err := tx.InsertWorkspaceAppStats(ctx, batch); err != nil {
return err
}
// TODO: We currently measure workspace usage based on when we get stats from it.
// There are currently two paths for this:
// 1) From SSH -> workspace agent stats POSTed from agent
// 2) From workspace apps / rpty -> workspace app stats (from coderd / wsproxy)
// Ideally we would have a single code path for this.
uniqueIDs := slice.Unique(batch.WorkspaceID)
if err := tx.BatchUpdateWorkspaceLastUsedAt(ctx, database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: uniqueIDs,
LastUsedAt: dbtime.Now(), // This isn't 100% accurate, but it's good enough.
}); err != nil {
return err
}
return nil
}, nil)
if err != nil {
return xerrors.Errorf("insert workspace app stats failed: %w", err)
}
return nil
ReportAppStats(context.Context, []StatsReport) error
}
// This should match the database unique constraint.
@ -353,7 +258,7 @@ func (sc *StatsCollector) flush(ctx context.Context) (err error) {
// backlog and the stats we're about to report, but it's not worth
// the complexity.
if len(sc.backlog) > 0 {
err = sc.opts.Reporter.Report(ctx, sc.backlog)
err = sc.opts.Reporter.ReportAppStats(ctx, sc.backlog)
if err != nil {
return xerrors.Errorf("report workspace app stats from backlog failed: %w", err)
}
@ -366,7 +271,7 @@ func (sc *StatsCollector) flush(ctx context.Context) (err error) {
return nil
}
err = sc.opts.Reporter.Report(ctx, stats)
err = sc.opts.Reporter.ReportAppStats(ctx, stats)
if err != nil {
sc.backlog = stats
return xerrors.Errorf("report workspace app stats failed: %w", err)

View File

@ -43,7 +43,7 @@ func (r *fakeReporter) setError(err error) {
r.err = err
}
func (r *fakeReporter) Report(_ context.Context, stats []workspaceapps.StatsReport) error {
func (r *fakeReporter) ReportAppStats(_ context.Context, stats []workspaceapps.StatsReport) error {
r.mu.Lock()
if r.err != nil {
r.errN++

View File

@ -1,4 +1,4 @@
package agentapi
package workspacestats
import (
"context"
@ -41,7 +41,6 @@ func ActivityBumpWorkspace(ctx context.Context, log slog.Logger, db database.Sto
// low priority operations fail first.
ctx, cancel := context.WithTimeout(ctx, time.Second*15)
defer cancel()
// nolint:gocritic // (#13146) Will be moved soon as part of refactor.
err := db.ActivityBumpWorkspace(ctx, database.ActivityBumpWorkspaceParams{
NextAutostart: nextAutostart.UTC(),
WorkspaceID: workspaceID,

View File

@ -1,4 +1,4 @@
package agentapi_test
package workspacestats_test
import (
"database/sql"
@ -8,12 +8,12 @@ import (
"github.com/google/uuid"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/coderd/workspacestats"
"github.com/coder/coder/v2/testutil"
"github.com/stretchr/testify/assert"
@ -272,7 +272,7 @@ func Test_ActivityBumpWorkspace(t *testing.T) {
// Bump duration is measured from the time of the bump, so we measure from here.
start := dbtime.Now()
agentapi.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start))
workspacestats.ActivityBumpWorkspace(ctx, log, db, bld.WorkspaceID, nextAutostart(start))
end := dbtime.Now()
// Validate our state after bump

View File

@ -0,0 +1,194 @@
package workspacestats
import (
"context"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
"cdr.dev/slog"
agentproto "github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prometheusmetrics"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/util/slice"
"github.com/coder/coder/v2/coderd/workspaceapps"
"github.com/coder/coder/v2/codersdk"
)
type StatsBatcher interface {
Add(now time.Time, agentID uuid.UUID, templateID uuid.UUID, userID uuid.UUID, workspaceID uuid.UUID, st *agentproto.Stats) error
}
type ReporterOptions struct {
Database database.Store
Logger slog.Logger
Pubsub pubsub.Pubsub
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
StatsBatcher StatsBatcher
UpdateAgentMetricsFn func(ctx context.Context, labels prometheusmetrics.AgentMetricLabels, metrics []*agentproto.Stats_Metric)
AppStatBatchSize int
}
type Reporter struct {
opts ReporterOptions
}
func NewReporter(opts ReporterOptions) *Reporter {
return &Reporter{opts: opts}
}
func (r *Reporter) ReportAppStats(ctx context.Context, stats []workspaceapps.StatsReport) error {
err := r.opts.Database.InTx(func(tx database.Store) error {
maxBatchSize := r.opts.AppStatBatchSize
if len(stats) < maxBatchSize {
maxBatchSize = len(stats)
}
batch := database.InsertWorkspaceAppStatsParams{
UserID: make([]uuid.UUID, 0, maxBatchSize),
WorkspaceID: make([]uuid.UUID, 0, maxBatchSize),
AgentID: make([]uuid.UUID, 0, maxBatchSize),
AccessMethod: make([]string, 0, maxBatchSize),
SlugOrPort: make([]string, 0, maxBatchSize),
SessionID: make([]uuid.UUID, 0, maxBatchSize),
SessionStartedAt: make([]time.Time, 0, maxBatchSize),
SessionEndedAt: make([]time.Time, 0, maxBatchSize),
Requests: make([]int32, 0, maxBatchSize),
}
for _, stat := range stats {
batch.UserID = append(batch.UserID, stat.UserID)
batch.WorkspaceID = append(batch.WorkspaceID, stat.WorkspaceID)
batch.AgentID = append(batch.AgentID, stat.AgentID)
batch.AccessMethod = append(batch.AccessMethod, string(stat.AccessMethod))
batch.SlugOrPort = append(batch.SlugOrPort, stat.SlugOrPort)
batch.SessionID = append(batch.SessionID, stat.SessionID)
batch.SessionStartedAt = append(batch.SessionStartedAt, stat.SessionStartedAt)
batch.SessionEndedAt = append(batch.SessionEndedAt, stat.SessionEndedAt)
batch.Requests = append(batch.Requests, int32(stat.Requests))
if len(batch.UserID) >= r.opts.AppStatBatchSize {
err := tx.InsertWorkspaceAppStats(ctx, batch)
if err != nil {
return err
}
// Reset batch.
batch.UserID = batch.UserID[:0]
batch.WorkspaceID = batch.WorkspaceID[:0]
batch.AgentID = batch.AgentID[:0]
batch.AccessMethod = batch.AccessMethod[:0]
batch.SlugOrPort = batch.SlugOrPort[:0]
batch.SessionID = batch.SessionID[:0]
batch.SessionStartedAt = batch.SessionStartedAt[:0]
batch.SessionEndedAt = batch.SessionEndedAt[:0]
batch.Requests = batch.Requests[:0]
}
}
if len(batch.UserID) == 0 {
return nil
}
if err := tx.InsertWorkspaceAppStats(ctx, batch); err != nil {
return err
}
// TODO: We currently measure workspace usage based on when we get stats from it.
// There are currently two paths for this:
// 1) From SSH -> workspace agent stats POSTed from agent
// 2) From workspace apps / rpty -> workspace app stats (from coderd / wsproxy)
// Ideally we would have a single code path for this.
uniqueIDs := slice.Unique(batch.WorkspaceID)
if err := tx.BatchUpdateWorkspaceLastUsedAt(ctx, database.BatchUpdateWorkspaceLastUsedAtParams{
IDs: uniqueIDs,
LastUsedAt: dbtime.Now(), // This isn't 100% accurate, but it's good enough.
}); err != nil {
return err
}
return nil
}, nil)
if err != nil {
return xerrors.Errorf("insert workspace app stats failed: %w", err)
}
return nil
}
func (r *Reporter) ReportAgentStats(ctx context.Context, now time.Time, workspace database.Workspace, workspaceAgent database.WorkspaceAgent, templateName string, stats *agentproto.Stats) error {
if stats.ConnectionCount > 0 {
var nextAutostart time.Time
if workspace.AutostartSchedule.String != "" {
templateSchedule, err := (*(r.opts.TemplateScheduleStore.Load())).Get(ctx, r.opts.Database, workspace.TemplateID)
// If the template schedule fails to load, just default to bumping
// without the next transition and log it.
if err != nil {
r.opts.Logger.Error(ctx, "failed to load template schedule bumping activity, defaulting to bumping by 60min",
slog.F("workspace_id", workspace.ID),
slog.F("template_id", workspace.TemplateID),
slog.Error(err),
)
} else {
next, allowed := schedule.NextAutostart(now, workspace.AutostartSchedule.String, templateSchedule)
if allowed {
nextAutostart = next
}
}
}
ActivityBumpWorkspace(ctx, r.opts.Logger.Named("activity_bump"), r.opts.Database, workspace.ID, nextAutostart)
}
var errGroup errgroup.Group
errGroup.Go(func() error {
err := r.opts.StatsBatcher.Add(now, workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, stats)
if err != nil {
r.opts.Logger.Error(ctx, "add agent stats to batcher", slog.Error(err))
return xerrors.Errorf("insert workspace agent stats batch: %w", err)
}
return nil
})
errGroup.Go(func() error {
err := r.opts.Database.UpdateWorkspaceLastUsedAt(ctx, database.UpdateWorkspaceLastUsedAtParams{
ID: workspace.ID,
LastUsedAt: now,
})
if err != nil {
return xerrors.Errorf("update workspace LastUsedAt: %w", err)
}
return nil
})
if r.opts.UpdateAgentMetricsFn != nil {
errGroup.Go(func() error {
user, err := r.opts.Database.GetUserByID(ctx, workspace.OwnerID)
if err != nil {
return xerrors.Errorf("get user: %w", err)
}
r.opts.UpdateAgentMetricsFn(ctx, prometheusmetrics.AgentMetricLabels{
Username: user.Username,
WorkspaceName: workspace.Name,
AgentName: workspaceAgent.Name,
TemplateName: templateName,
}, stats.Metrics)
return nil
})
}
err := errGroup.Wait()
if err != nil {
return xerrors.Errorf("update stats in database: %w", err)
}
err = r.opts.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{})
if err != nil {
r.opts.Logger.Warn(ctx, "failed to publish workspace agent stats",
slog.F("workspace_id", workspace.ID), slog.Error(err))
}
return nil
}
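
As a usage note, any type with the Add signature above satisfies the exported StatsBatcher interface, so a Reporter can be constructed outside coderd as well. Below is a hedged test-style sketch assuming dbtestutil.NewDB and testify as used in the test diffs above; fakeBatcher and TestReporterSketch are hypothetical names, not part of this commit.

package workspacestats_test

import (
	"context"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/require"

	agentproto "github.com/coder/coder/v2/agent/proto"
	"github.com/coder/coder/v2/coderd/database/dbtestutil"
	"github.com/coder/coder/v2/coderd/workspacestats"
)

// fakeBatcher satisfies workspacestats.StatsBatcher without touching the
// database; the commit's own tests use a generated mock instead.
type fakeBatcher struct{ added int }

func (f *fakeBatcher) Add(time.Time, uuid.UUID, uuid.UUID, uuid.UUID, uuid.UUID, *agentproto.Stats) error {
	f.added++
	return nil
}

func TestReporterSketch(t *testing.T) {
	db, ps := dbtestutil.NewDB(t)
	r := workspacestats.NewReporter(workspacestats.ReporterOptions{
		Database:         db,
		Pubsub:           ps,
		StatsBatcher:     &fakeBatcher{},
		AppStatBatchSize: 64,
	})
	// An empty app-stats report is a no-op but exercises the batching path.
	require.NoError(t, r.ReportAppStats(context.Background(), nil))
}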