chore: move LogSender to agentsdk (#12158)

Moves the LogSender to agentsdk and deprecates LogsSender based on the v1 API.
This commit is contained in:
Spike Curtis
2024-02-20 10:44:20 +04:00
committed by GitHub
parent 9861830e87
commit 081e37d7d9
3 changed files with 310 additions and 291 deletions

View File

@ -6,17 +6,31 @@ import (
"errors"
"io"
"net/http"
"sync"
"time"
"golang.org/x/xerrors"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid"
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/retry"
)
const (
flushInterval = time.Second
maxBytesPerBatch = 1 << 20 // 1MiB
overheadPerLog = 21 // found by testing
// maxBytesQueued is the maximum length of logs we will queue in memory. The number is taken
// from dump.sql `max_logs_length` constraint, as there is no point queuing more logs than we'll
// accept in the database.
maxBytesQueued = 1048576
)
type startupLogsWriter struct {
buf bytes.Buffer // Buffer to track partial lines.
ctx context.Context
@ -107,6 +121,8 @@ type logsSenderOptions struct {
// has been called. Calling sendLog concurrently is not supported. If
// the context passed to flushAndClose is canceled, any remaining logs
// will be discarded.
//
// Deprecated: Use NewLogSender instead, based on the v2 Agent API.
func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req PatchLogs) error, logger slog.Logger, opts ...func(*logsSenderOptions)) (sendLog func(ctx context.Context, log ...Log) error, flushAndClose func(context.Context) error) {
o := logsSenderOptions{
flushTimeout: 250 * time.Millisecond,
@ -250,3 +266,248 @@ func LogsSender(sourceID uuid.UUID, patchLogs func(ctx context.Context, req Patc
}
return sendLog, flushAndClose
}
type logQueue struct {
logs []*proto.Log
flushRequested bool
lastFlush time.Time
}
// LogSender is a component that handles enqueuing logs and then sending them over the agent API.
// Things that need to log call Enqueue and Flush. When the agent API becomes available, call
// SendLoop to send pending logs.
type LogSender struct {
*sync.Cond
queues map[uuid.UUID]*logQueue
logger slog.Logger
exceededLogLimit bool
outputLen int
}
type logDest interface {
BatchCreateLogs(ctx context.Context, request *proto.BatchCreateLogsRequest) (*proto.BatchCreateLogsResponse, error)
}
func NewLogSender(logger slog.Logger) *LogSender {
return &LogSender{
Cond: sync.NewCond(&sync.Mutex{}),
logger: logger,
queues: make(map[uuid.UUID]*logQueue),
}
}
func (l *LogSender) Enqueue(src uuid.UUID, logs ...Log) {
logger := l.logger.With(slog.F("log_source_id", src))
if len(logs) == 0 {
logger.Debug(context.Background(), "enqueue called with no logs")
return
}
l.L.Lock()
defer l.L.Unlock()
if l.exceededLogLimit {
logger.Warn(context.Background(), "dropping enqueued logs because we have reached the server limit")
// don't error, as we also write to file and don't want the overall write to fail
return
}
defer l.Broadcast()
q, ok := l.queues[src]
if !ok {
q = &logQueue{}
l.queues[src] = q
}
for k, log := range logs {
// Here we check the queue size before adding a log because we want to queue up slightly
// more logs than the database would store to ensure we trigger "logs truncated" at the
// database layer. Otherwise, the end user wouldn't know logs are truncated unless they
// examined the Coder agent logs.
if l.outputLen > maxBytesQueued {
logger.Warn(context.Background(), "log queue full; truncating new logs", slog.F("new_logs", k), slog.F("queued_logs", len(q.logs)))
return
}
pl, err := ProtoFromLog(log)
if err != nil {
logger.Critical(context.Background(), "failed to convert log", slog.Error(err))
pl = &proto.Log{
CreatedAt: timestamppb.Now(),
Level: proto.Log_ERROR,
Output: "**Coder Internal Error**: Failed to convert log",
}
}
if len(pl.Output)+overheadPerLog > maxBytesPerBatch {
logger.Warn(context.Background(), "dropping log line that exceeds our limit", slog.F("len", len(pl.Output)))
continue
}
q.logs = append(q.logs, pl)
l.outputLen += len(pl.Output)
}
logger.Debug(context.Background(), "enqueued agent logs", slog.F("new_logs", len(logs)), slog.F("queued_logs", len(q.logs)))
}
func (l *LogSender) Flush(src uuid.UUID) {
l.L.Lock()
defer l.L.Unlock()
defer l.Broadcast()
q, ok := l.queues[src]
if ok {
q.flushRequested = true
}
// queue might not exist because it's already been flushed and removed from
// the map.
}
var LogLimitExceededError = xerrors.New("Log limit exceeded")
// SendLoop sends any pending logs until it hits an error or the context is canceled. It does not
// retry as it is expected that a higher layer retries establishing connection to the agent API and
// calls SendLoop again.
func (l *LogSender) SendLoop(ctx context.Context, dest logDest) error {
l.L.Lock()
defer l.L.Unlock()
if l.exceededLogLimit {
l.logger.Debug(ctx, "aborting SendLoop because log limit is already exceeded")
return LogLimitExceededError
}
ctxDone := false
defer l.logger.Debug(ctx, "log sender send loop exiting")
// wake 4 times per Flush interval to check if anything needs to be flushed
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
tkr := time.NewTicker(flushInterval / 4)
defer tkr.Stop()
for {
select {
// also monitor the context here, so we notice immediately, rather
// than waiting for the next tick or logs
case <-ctx.Done():
l.L.Lock()
ctxDone = true
l.L.Unlock()
l.Broadcast()
return
case <-tkr.C:
l.Broadcast()
}
}
}()
for {
for !ctxDone && !l.hasPendingWorkLocked() {
l.Wait()
}
if ctxDone {
return ctx.Err()
}
src, q := l.getPendingWorkLocked()
logger := l.logger.With(slog.F("log_source_id", src))
q.flushRequested = false // clear flag since we're now flushing
req := &proto.BatchCreateLogsRequest{
LogSourceId: src[:],
}
// outputToSend keeps track of the size of the protobuf message we send, while
// outputToRemove keeps track of the size of the output we'll remove from the queues on
// success. They are different because outputToSend also counts protocol message overheads.
outputToSend := 0
outputToRemove := 0
n := 0
for n < len(q.logs) {
log := q.logs[n]
outputToSend += len(log.Output) + overheadPerLog
if outputToSend > maxBytesPerBatch {
break
}
req.Logs = append(req.Logs, log)
n++
outputToRemove += len(log.Output)
}
l.L.Unlock()
logger.Debug(ctx, "sending logs to agent API", slog.F("num_logs", len(req.Logs)))
resp, err := dest.BatchCreateLogs(ctx, req)
l.L.Lock()
if err != nil {
return xerrors.Errorf("failed to upload logs: %w", err)
}
if resp.LogLimitExceeded {
l.logger.Warn(ctx, "server log limit exceeded; logs truncated")
l.exceededLogLimit = true
// no point in keeping anything we have queued around, server will not accept them
l.queues = make(map[uuid.UUID]*logQueue)
return LogLimitExceededError
}
// Since elsewhere we only append to the logs, here we can remove them
// since we successfully sent them. First we nil the pointers though,
// so that they can be gc'd.
for i := 0; i < n; i++ {
q.logs[i] = nil
}
q.logs = q.logs[n:]
l.outputLen -= outputToRemove
if len(q.logs) == 0 {
// no empty queues
delete(l.queues, src)
continue
}
q.lastFlush = time.Now()
}
}
func (l *LogSender) hasPendingWorkLocked() bool {
for _, q := range l.queues {
if time.Since(q.lastFlush) > flushInterval {
return true
}
if q.flushRequested {
return true
}
}
return false
}
func (l *LogSender) getPendingWorkLocked() (src uuid.UUID, q *logQueue) {
// take the one it's been the longest since we've flushed, so that we have some sense of
// fairness across sources
var earliestFlush time.Time
for is, iq := range l.queues {
if q == nil || iq.lastFlush.Before(earliestFlush) {
src = is
q = iq
earliestFlush = iq.lastFlush
}
}
return src, q
}
func (l *LogSender) GetScriptLogger(logSourceID uuid.UUID) ScriptLogger {
return ScriptLogger{srcID: logSourceID, sender: l}
}
type ScriptLogger struct {
sender *LogSender
srcID uuid.UUID
}
func (s ScriptLogger) Send(ctx context.Context, log ...Log) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
s.sender.Enqueue(s.srcID, log...)
return nil
}
}
func (s ScriptLogger) Flush(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
s.sender.Flush(s.srcID)
return nil
}
}