mirror of
https://github.com/coder/coder.git
synced 2025-07-15 22:20:27 +00:00
chore: fix TestBatchStats
flake (#8952)
This commit is contained in:
@ -125,6 +125,7 @@ func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) {
|
|||||||
|
|
||||||
// Add adds a stat to the batcher for the given workspace and agent.
|
// Add adds a stat to the batcher for the given workspace and agent.
|
||||||
func (b *Batcher) Add(
|
func (b *Batcher) Add(
|
||||||
|
now time.Time,
|
||||||
agentID uuid.UUID,
|
agentID uuid.UUID,
|
||||||
templateID uuid.UUID,
|
templateID uuid.UUID,
|
||||||
userID uuid.UUID,
|
userID uuid.UUID,
|
||||||
@ -134,7 +135,7 @@ func (b *Batcher) Add(
|
|||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
defer b.mu.Unlock()
|
defer b.mu.Unlock()
|
||||||
|
|
||||||
now := database.Now()
|
now = database.Time(now)
|
||||||
|
|
||||||
b.buf.ID = append(b.buf.ID, uuid.New())
|
b.buf.ID = append(b.buf.ID, uuid.New())
|
||||||
b.buf.CreatedAt = append(b.buf.CreatedAt, now)
|
b.buf.CreatedAt = append(b.buf.CreatedAt, now)
|
||||||
@ -198,15 +199,6 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
|
|||||||
defer func() {
|
defer func() {
|
||||||
b.flushForced.Store(false)
|
b.flushForced.Store(false)
|
||||||
b.mu.Unlock()
|
b.mu.Unlock()
|
||||||
// Notify that a flush has completed. This only happens in tests.
|
|
||||||
if b.flushed != nil {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
close(b.flushed)
|
|
||||||
default:
|
|
||||||
b.flushed <- count
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if count > 0 {
|
if count > 0 {
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
b.log.Debug(ctx, "flush complete",
|
b.log.Debug(ctx, "flush complete",
|
||||||
@ -216,6 +208,15 @@ func (b *Batcher) flush(ctx context.Context, forced bool, reason string) {
|
|||||||
slog.F("reason", reason),
|
slog.F("reason", reason),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
// Notify that a flush has completed. This only happens in tests.
|
||||||
|
if b.flushed != nil {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
close(b.flushed)
|
||||||
|
default:
|
||||||
|
b.flushed <- count
|
||||||
|
}
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if len(b.buf.ID) == 0 {
|
if len(b.buf.ID) == 0 {
|
||||||
|
@ -46,7 +46,7 @@ func TestBatchStats(t *testing.T) {
|
|||||||
|
|
||||||
// Given: no data points are added for workspace
|
// Given: no data points are added for workspace
|
||||||
// When: it becomes time to report stats
|
// When: it becomes time to report stats
|
||||||
t1 := time.Now()
|
t1 := database.Now()
|
||||||
// Signal a tick and wait for a flush to complete.
|
// Signal a tick and wait for a flush to complete.
|
||||||
tick <- t1
|
tick <- t1
|
||||||
f := <-flushed
|
f := <-flushed
|
||||||
@ -59,9 +59,9 @@ func TestBatchStats(t *testing.T) {
|
|||||||
require.Empty(t, stats, "should have no stats for workspace")
|
require.Empty(t, stats, "should have no stats for workspace")
|
||||||
|
|
||||||
// Given: a single data point is added for workspace
|
// Given: a single data point is added for workspace
|
||||||
t2 := time.Now()
|
t2 := t1.Add(time.Second)
|
||||||
t.Logf("inserting 1 stat")
|
t.Logf("inserting 1 stat")
|
||||||
require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
|
require.NoError(t, b.Add(t2.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
|
||||||
|
|
||||||
// When: it becomes time to report stats
|
// When: it becomes time to report stats
|
||||||
// Signal a tick and wait for a flush to complete.
|
// Signal a tick and wait for a flush to complete.
|
||||||
@ -77,7 +77,7 @@ func TestBatchStats(t *testing.T) {
|
|||||||
|
|
||||||
// Given: a lot of data points are added for both workspaces
|
// Given: a lot of data points are added for both workspaces
|
||||||
// (equal to batch size)
|
// (equal to batch size)
|
||||||
t3 := time.Now()
|
t3 := t2.Add(time.Second)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -85,9 +85,9 @@ func TestBatchStats(t *testing.T) {
|
|||||||
t.Logf("inserting %d stats", defaultBufferSize)
|
t.Logf("inserting %d stats", defaultBufferSize)
|
||||||
for i := 0; i < defaultBufferSize; i++ {
|
for i := 0; i < defaultBufferSize; i++ {
|
||||||
if i%2 == 0 {
|
if i%2 == 0 {
|
||||||
require.NoError(t, b.Add(deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
|
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps1.Agent.ID, deps1.User.ID, deps1.Template.ID, deps1.Workspace.ID, randAgentSDKStats(t)))
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, b.Add(deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t)))
|
require.NoError(t, b.Add(t3.Add(time.Millisecond), deps2.Agent.ID, deps2.User.ID, deps2.Template.ID, deps2.Workspace.ID, randAgentSDKStats(t)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -105,7 +105,7 @@ func TestBatchStats(t *testing.T) {
|
|||||||
require.Len(t, stats, 2, "should have stats for both workspaces")
|
require.Len(t, stats, 2, "should have stats for both workspaces")
|
||||||
|
|
||||||
// Ensures that a subsequent flush pushes all the remaining data
|
// Ensures that a subsequent flush pushes all the remaining data
|
||||||
t4 := time.Now()
|
t4 := t3.Add(time.Second)
|
||||||
tick <- t4
|
tick <- t4
|
||||||
f2 := <-flushed
|
f2 := <-flushed
|
||||||
t.Logf("flush 4 completed")
|
t.Logf("flush 4 completed")
|
||||||
@ -113,7 +113,7 @@ func TestBatchStats(t *testing.T) {
|
|||||||
require.Equal(t, expectedCount, f2, "did not flush expected remaining rows")
|
require.Equal(t, expectedCount, f2, "did not flush expected remaining rows")
|
||||||
|
|
||||||
// Ensure that a subsequent flush does not push stale data.
|
// Ensure that a subsequent flush does not push stale data.
|
||||||
t5 := time.Now()
|
t5 := t4.Add(time.Second)
|
||||||
tick <- t5
|
tick <- t5
|
||||||
f = <-flushed
|
f = <-flushed
|
||||||
require.Zero(t, f, "expected zero stats to have been flushed")
|
require.Zero(t, f, "expected zero stats to have been flushed")
|
||||||
|
@ -1414,7 +1414,7 @@ func (api *API) workspaceAgentReportStats(rw http.ResponseWriter, r *http.Reques
|
|||||||
|
|
||||||
var errGroup errgroup.Group
|
var errGroup errgroup.Group
|
||||||
errGroup.Go(func() error {
|
errGroup.Go(func() error {
|
||||||
if err := api.statsBatcher.Add(workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil {
|
if err := api.statsBatcher.Add(time.Now(), workspaceAgent.ID, workspace.TemplateID, workspace.OwnerID, workspace.ID, req); err != nil {
|
||||||
api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err))
|
api.Logger.Error(ctx, "failed to add stats to batcher", slog.Error(err))
|
||||||
return xerrors.Errorf("can't insert workspace agent stat: %w", err)
|
return xerrors.Errorf("can't insert workspace agent stat: %w", err)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user