fix(codersdk/agentsdk): improve ctx cancel in agent logs flush, fix test (#10214)

Fixes #9719
Related #9865
This commit is contained in:
Mathias Fredriksson
2023-10-11 15:42:30 +03:00
committed by GitHub
parent a1ee4d44aa
commit a2cd6640f3
2 changed files with 31 additions and 9 deletions

View File

@ -90,12 +90,31 @@ func LogsWriter(ctx context.Context, sender func(ctx context.Context, log ...Log
}
}
// LogsSenderFlushTimeout returns an option for LogsSender that overrides
// the default flush timeout (250ms). This is mostly useful for tests.
//
// Non-positive timeouts are ignored and the previously configured value
// is kept: the timeout is used as a ticker interval, and time.NewTicker
// panics when given a non-positive duration.
func LogsSenderFlushTimeout(timeout time.Duration) func(*logsSenderOptions) {
	return func(o *logsSenderOptions) {
		if timeout <= 0 {
			// Keep the existing value rather than letting the sender
			// goroutine panic when it creates its ticker.
			return
		}
		o.flushTimeout = timeout
	}
}

// logsSenderOptions holds the optional settings accepted by LogsSender.
type logsSenderOptions struct {
	// flushTimeout is the interval between automatic flushes of the
	// log backlog.
	flushTimeout time.Duration
}
// LogsSender will send agent startup logs to the server. Calls to
// sendLog are non-blocking and will return an error if flushAndClose
// has been called. Calling sendLog concurrently is not supported. If
// the context passed to flushAndClose is canceled, any remaining logs
// will be discarded.
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
o := logsSenderOptions{
flushTimeout: 250 * time.Millisecond,
}
for _, opt := range opts {
opt(&o)
}
// The main context is used to close the sender goroutine and cancel
// any outbound requests to the API. The shutdown context is used to
// signal the sender goroutine to flush logs and then exit.
@ -109,10 +128,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
// Set backlogLimit so that logs are uploaded once every
// o.flushTimeout (250ms by default) or when 100 logs have been
// added to the backlog, whichever comes first.
flushTimeout := 250 * time.Millisecond
backlogLimit := 100
flush := time.NewTicker(flushTimeout)
flush := time.NewTicker(o.flushTimeout)
var backlog []Log
defer func() {
@ -153,8 +171,9 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
// error occurs. Note that we use the main context here,
// meaning these requests won't be interrupted by
// shutdown.
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx) && ctx.Err() == nil; {
err := patchLogs(ctx, PatchLogs{
var err error
for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); {
err = patchLogs(ctx, PatchLogs{
Logs: backlog,
LogSourceID: sourceID,
})
@ -163,7 +182,7 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
}
if errors.Is(err, context.Canceled) {
return
break
}
// This error is expected to be codersdk.Error, but it has
// private fields so we can't fake it in tests.
@ -171,18 +190,19 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
if errors.As(err, &statusErr) {
if statusErr.StatusCode() == http.StatusRequestEntityTooLarge {
logger.Warn(ctx, "startup logs too large, discarding logs", slog.F("discarded_logs_count", len(backlog)), slog.Error(err))
err = nil
break
}
}
logger.Error(ctx, "startup logs sender failed to upload logs, retrying later", slog.F("logs_count", len(backlog)), slog.Error(err))
}
if ctx.Err() != nil {
if err != nil {
return
}
backlog = nil
// Anchor flush to the last log upload.
flush.Reset(flushTimeout)
flush.Reset(o.flushTimeout)
}
if done {
return

View File

@ -344,7 +344,9 @@ func TestStartupLogsSender(t *testing.T) {
return nil
}
sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug))
// Prevent race between auto-flush and context cancellation with
// a really long timeout.
sendLog, flushAndClose := agentsdk.LogsSender(uuid.New(), patchLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug), agentsdk.LogsSenderFlushTimeout(time.Hour))
defer func() {
_ = flushAndClose(ctx)
}()