mirror of
https://github.com/coder/coder.git
synced 2025-07-09 11:45:56 +00:00
fix(coderd/batchstats): fix init race and close flush (#9248)
This commit is contained in:
committed by
GitHub
parent
31ffb566d0
commit
ed2b1236c0
@ -105,6 +105,8 @@ func New(ctx context.Context, opts ...Option) (*Batcher, func(), error) {
|
|||||||
b.tickCh = b.ticker.C
|
b.tickCh = b.ticker.C
|
||||||
}
|
}
|
||||||
|
|
||||||
|
b.initBuf(b.batchSize)
|
||||||
|
|
||||||
cancelCtx, cancelFunc := context.WithCancel(ctx)
|
cancelCtx, cancelFunc := context.WithCancel(ctx)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@ -172,7 +174,6 @@ func (b *Batcher) Add(
|
|||||||
|
|
||||||
// Run runs the batcher.
|
// Run runs the batcher.
|
||||||
func (b *Batcher) run(ctx context.Context) {
|
func (b *Batcher) run(ctx context.Context) {
|
||||||
b.initBuf(b.batchSize)
|
|
||||||
// nolint:gocritic // This is only ever used for one thing - inserting agent stats.
|
// nolint:gocritic // This is only ever used for one thing - inserting agent stats.
|
||||||
authCtx := dbauthz.AsSystemRestricted(ctx)
|
authCtx := dbauthz.AsSystemRestricted(ctx)
|
||||||
for {
|
for {
|
||||||
@ -184,7 +185,13 @@ func (b *Batcher) run(ctx context.Context) {
|
|||||||
b.flush(authCtx, true, "reaching capacity")
|
b.flush(authCtx, true, "reaching capacity")
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
b.log.Debug(ctx, "context done, flushing before exit")
|
b.log.Debug(ctx, "context done, flushing before exit")
|
||||||
b.flush(authCtx, true, "exit")
|
|
||||||
|
// We must create a new context here as the parent context is done.
|
||||||
|
ctxTimeout, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
|
defer cancel() //nolint:revive // We're returning, defer is fine.
|
||||||
|
|
||||||
|
// nolint:gocritic // This is only ever used for one thing - inserting agent stats.
|
||||||
|
b.flush(dbauthz.AsSystemRestricted(ctxTimeout), true, "exit")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,7 @@ func TestBatchStats(t *testing.T) {
|
|||||||
deps1 := setupDeps(t, store)
|
deps1 := setupDeps(t, store)
|
||||||
deps2 := setupDeps(t, store)
|
deps2 := setupDeps(t, store)
|
||||||
tick := make(chan time.Time)
|
tick := make(chan time.Time)
|
||||||
flushed := make(chan int)
|
flushed := make(chan int, 1)
|
||||||
|
|
||||||
b, closer, err := New(ctx,
|
b, closer, err := New(ctx,
|
||||||
WithStore(store),
|
WithStore(store),
|
||||||
|
Reference in New Issue
Block a user