From 3b9b06fe5aa647467953ac8378b7cc962873422c Mon Sep 17 00:00:00 2001 From: Mathias Fredriksson Date: Thu, 22 Jun 2023 23:28:59 +0300 Subject: [PATCH] feat(codersdk/agentsdk): add `StartupLogsSender` and `StartupLogsWriter` (#8129) This commit adds two new `agentsdk` functions, `StartupLogsSender` and `StartupLogsWriter` that can be used by any client looking to send startup logs to coderd. We also refactor the `agent` to use these new functions. As a bonus, agent startup logs are separated into "info" and "error" levels to separate stdout and stderr. --------- Co-authored-by: Marcin Tojek --- agent/agent.go | 172 +-------- codersdk/agentsdk/logs.go | 227 +++++++++++ codersdk/agentsdk/logs_test.go | 363 ++++++++++++++++++ .../workspaceAgentLogsXService.ts | 3 +- 4 files changed, 610 insertions(+), 155 deletions(-) create mode 100644 codersdk/agentsdk/logs.go create mode 100644 codersdk/agentsdk/logs_test.go diff --git a/agent/agent.go b/agent/agent.go index 7bcdcbf532..b1218190bb 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1,7 +1,6 @@ package agent import ( - "bufio" "bytes" "context" "encoding/binary" @@ -863,26 +862,30 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er } cmd := cmdPty.AsExec() - var writer io.Writer = fileWriter + var stdout, stderr io.Writer = fileWriter, fileWriter if lifecycle == "startup" { - // Create pipes for startup logs reader and writer - logsReader, logsWriter := io.Pipe() + send, flushAndClose := agentsdk.StartupLogsSender(a.client.PatchStartupLogs, logger) + // If ctx is canceled here (or in a writer below), we may be + // discarding logs, but that's okay because we're shutting down + // anyway. We could consider creating a new context here if we + // want better control over flush during shutdown. 
defer func() { - _ = logsReader.Close() - }() - writer = io.MultiWriter(fileWriter, logsWriter) - flushedLogs, err := a.trackScriptLogs(ctx, logsReader) - if err != nil { - return xerrors.Errorf("track %s script logs: %w", lifecycle, err) - } - defer func() { - _ = logsWriter.Close() - <-flushedLogs + if err := flushAndClose(ctx); err != nil { + logger.Warn(ctx, "flush startup logs failed", slog.Error(err)) + } }() + + infoW := agentsdk.StartupLogsWriter(ctx, send, codersdk.LogLevelInfo) + defer infoW.Close() + errW := agentsdk.StartupLogsWriter(ctx, send, codersdk.LogLevelError) + defer errW.Close() + + stdout = io.MultiWriter(fileWriter, infoW) + stderr = io.MultiWriter(fileWriter, errW) } - cmd.Stdout = writer - cmd.Stderr = writer + cmd.Stdout = stdout + cmd.Stderr = stderr start := time.Now() defer func() { @@ -913,143 +916,6 @@ func (a *agent) runScript(ctx context.Context, lifecycle, script string) (err er return nil } -func (a *agent) trackScriptLogs(ctx context.Context, reader io.ReadCloser) (chan struct{}, error) { - // Synchronous sender, there can only be one outbound send at a time. - // - // It's important that we either flush or drop all logs before returning - // because the startup state is reported after flush. - sendDone := make(chan struct{}) - send := make(chan []agentsdk.StartupLog, 1) - go func() { - // Set flushTimeout and backlogLimit so that logs are uploaded - // once every 250ms or when 100 logs have been added to the - // backlog, whichever comes first. - flushTimeout := 250 * time.Millisecond - backlogLimit := 100 - - flush := time.NewTicker(flushTimeout) - - var backlog []agentsdk.StartupLog - defer func() { - flush.Stop() - _ = reader.Close() // Ensure read routine is closed. 
- if len(backlog) > 0 { - a.logger.Debug(ctx, "track script logs sender exiting, discarding logs", slog.F("discarded_logs_count", len(backlog))) - } - a.logger.Debug(ctx, "track script logs sender exited") - close(sendDone) - }() - - done := false - for { - flushed := false - select { - case <-ctx.Done(): - return - case <-a.closed: - return - // Close (!ok) can be triggered by the reader closing due to - // EOF or due to agent closing, when this happens we attempt - // a final flush. If the context is canceled this will be a - // no-op. - case logs, ok := <-send: - done = !ok - if ok { - backlog = append(backlog, logs...) - flushed = len(backlog) >= backlogLimit - } - case <-flush.C: - flushed = true - } - - if (done || flushed) && len(backlog) > 0 { - flush.Stop() // Lower the chance of a double flush. - - // Retry uploading logs until successful or a specific - // error occurs. - for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); { - err := a.client.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{ - Logs: backlog, - }) - if err == nil { - break - } - - if errors.Is(err, context.Canceled) { - return - } - var sdkErr *codersdk.Error - if errors.As(err, &sdkErr) { - if sdkErr.StatusCode() == http.StatusRequestEntityTooLarge { - a.logger.Warn(ctx, "startup logs too large, dropping logs") - break - } - } - a.logger.Error(ctx, "upload startup logs failed", slog.Error(err), slog.F("to_send", backlog)) - } - if ctx.Err() != nil { - return - } - backlog = nil - - // Anchor flush to the last log upload. - flush.Reset(flushTimeout) - } - if done { - return - } - } - }() - - // Forward read lines to the sender or queue them for when the - // sender is ready to process them. - // - // We only need to track this goroutine since it will ensure that - // the sender has closed before returning. 
- logsDone := make(chan struct{}) - err := a.trackConnGoroutine(func() { - defer func() { - close(send) - <-sendDone - a.logger.Debug(ctx, "track script logs reader exited") - close(logsDone) - }() - - var queue []agentsdk.StartupLog - - s := bufio.NewScanner(reader) - for s.Scan() { - select { - case <-ctx.Done(): - return - case <-a.closed: - return - case queue = <-send: - // Not captured by sender yet, re-use. - default: - } - - queue = append(queue, agentsdk.StartupLog{ - CreatedAt: database.Now(), - Output: s.Text(), - }) - send <- queue - queue = nil - } - if err := s.Err(); err != nil { - a.logger.Warn(ctx, "scan startup logs ended unexpectedly", slog.Error(err)) - } - }) - if err != nil { - close(send) - <-sendDone - close(logsDone) - return logsDone, err - } - - return logsDone, nil -} - func (a *agent) handleReconnectingPTY(ctx context.Context, logger slog.Logger, msg codersdk.WorkspaceAgentReconnectingPTYInit, conn net.Conn) (retErr error) { defer conn.Close() a.metrics.connectionsTotal.Add(1) diff --git a/codersdk/agentsdk/logs.go b/codersdk/agentsdk/logs.go new file mode 100644 index 0000000000..5eb2d33be0 --- /dev/null +++ b/codersdk/agentsdk/logs.go @@ -0,0 +1,227 @@ +package agentsdk + +import ( + "bytes" + "context" + "errors" + "io" + "net/http" + "time" + + "golang.org/x/xerrors" + + "cdr.dev/slog" + "github.com/coder/coder/codersdk" + "github.com/coder/retry" +) + +type startupLogsWriter struct { + buf bytes.Buffer // Buffer to track partial lines. 
+ ctx context.Context + send func(ctx context.Context, log ...StartupLog) error + level codersdk.LogLevel +} + +func (w *startupLogsWriter) Write(p []byte) (int, error) { + n := len(p) + for len(p) > 0 { + nl := bytes.IndexByte(p, '\n') + if nl == -1 { + break + } + cr := 0 + if nl > 0 && p[nl-1] == '\r' { + cr = 1 + } + + var partial []byte + if w.buf.Len() > 0 { + partial = w.buf.Bytes() + w.buf.Reset() + } + err := w.send(w.ctx, StartupLog{ + CreatedAt: time.Now().UTC(), // UTC, like database.Now(). + Level: w.level, + Output: string(partial) + string(p[:nl-cr]), + }) + if err != nil { + return n - len(p), err + } + p = p[nl+1:] + } + if len(p) > 0 { + _, err := w.buf.Write(p) + if err != nil { + return n - len(p), err + } + } + return n, nil +} + +func (w *startupLogsWriter) Close() error { + if w.buf.Len() > 0 { + defer w.buf.Reset() + return w.send(w.ctx, StartupLog{ + CreatedAt: time.Now().UTC(), // UTC, like database.Now(). + Level: w.level, + Output: w.buf.String(), + }) + } + return nil +} + +// StartupLogsWriter returns an io.WriteCloser that sends logs via the +// provided sender. The sender is expected to be non-blocking. Calling +// Close flushes any remaining partially written log lines but is +// otherwise no-op. If the context passed to StartupLogsWriter is +// canceled, any remaining logs will be discarded. +// +// Neither Write nor Close is safe for concurrent use and must be used +// by a single goroutine. +func StartupLogsWriter(ctx context.Context, sender func(ctx context.Context, log ...StartupLog) error, level codersdk.LogLevel) io.WriteCloser { + return &startupLogsWriter{ + ctx: ctx, + send: sender, + level: level, + } +} + +// StartupLogsSender will send agent startup logs to the server. Calls to +// sendLog are non-blocking and will return an error if flushAndClose +// has been called. Calling sendLog concurrently is not supported. If +// the context passed to flushAndClose is canceled, any remaining logs +// will be discarded.
+func StartupLogsSender(patchStartupLogs func(ctx context.Context, req PatchStartupLogs) error, logger slog.Logger) (sendLog func(ctx context.Context, log ...StartupLog) error, flushAndClose func(context.Context) error) { + // The main context is used to close the sender goroutine and cancel + // any outbound requests to the API. The shutdown context is used to + // signal the sender goroutine to flush logs and then exit. + ctx, cancel := context.WithCancel(context.Background()) + shutdownCtx, shutdown := context.WithCancel(ctx) + + // Synchronous sender, there can only be one outbound send at a time. + sendDone := make(chan struct{}) + send := make(chan []StartupLog, 1) + go func() { + // Set flushTimeout and backlogLimit so that logs are uploaded + // once every 250ms or when 100 logs have been added to the + // backlog, whichever comes first. + flushTimeout := 250 * time.Millisecond + backlogLimit := 100 + + flush := time.NewTicker(flushTimeout) + + var backlog []StartupLog + defer func() { + flush.Stop() + if len(backlog) > 0 { + logger.Warn(ctx, "startup logs sender exiting early, discarding logs", slog.F("discarded_logs_count", len(backlog))) + } + logger.Debug(ctx, "startup logs sender exited") + close(sendDone) + }() + + done := false + for { + flushed := false + select { + case <-ctx.Done(): + return + case <-shutdownCtx.Done(): + done = true + + // Check queued logs before flushing. + select { + case logs := <-send: + backlog = append(backlog, logs...) + default: + } + case <-flush.C: + flushed = true + case logs := <-send: + backlog = append(backlog, logs...) + flushed = len(backlog) >= backlogLimit + } + + if (done || flushed) && len(backlog) > 0 { + flush.Stop() // Lower the chance of a double flush. + + // Retry uploading logs until successful or a specific + // error occurs. Note that we use the main context here, + // meaning these requests won't be interrupted by + // shutdown.
+ for r := retry.New(time.Second, 5*time.Second); r.Wait(ctx); { + err := patchStartupLogs(ctx, PatchStartupLogs{ + Logs: backlog, + }) + if err == nil { + break + } + + if errors.Is(err, context.Canceled) { + return + } + // This error is expected to be codersdk.Error, but it has + // private fields so we can't fake it in tests. + var statusErr interface{ StatusCode() int } + if errors.As(err, &statusErr) { + if statusErr.StatusCode() == http.StatusRequestEntityTooLarge { + logger.Warn(ctx, "startup logs too large, discarding logs", slog.F("discarded_logs_count", len(backlog)), slog.Error(err)) + break + } + } + logger.Error(ctx, "startup logs sender failed to upload logs, retrying later", slog.F("logs_count", len(backlog)), slog.Error(err)) + } + if ctx.Err() != nil { + return + } + backlog = nil + + // Anchor flush to the last log upload. + flush.Reset(flushTimeout) + } + if done { + return + } + } + }() + + var queue []StartupLog + sendLog = func(callCtx context.Context, log ...StartupLog) error { + select { + case <-shutdownCtx.Done(): + return xerrors.Errorf("closed: %w", shutdownCtx.Err()) + case <-callCtx.Done(): + return callCtx.Err() + case queue = <-send: + // Recheck to give priority to context cancellation. + select { + case <-shutdownCtx.Done(): + return xerrors.Errorf("closed: %w", shutdownCtx.Err()) + case <-callCtx.Done(): + return callCtx.Err() + default: + } + // Queue has not been captured by sender yet, re-use. + default: + } + + queue = append(queue, log...) + send <- queue // Non-blocking. 
+ queue = nil + + return nil + } + flushAndClose = func(callCtx context.Context) error { + defer cancel() + shutdown() + select { + case <-sendDone: + return nil + case <-callCtx.Done(): + cancel() + <-sendDone + return callCtx.Err() + } + } + return sendLog, flushAndClose +} diff --git a/codersdk/agentsdk/logs_test.go b/codersdk/agentsdk/logs_test.go new file mode 100644 index 0000000000..f14e20a8df --- /dev/null +++ b/codersdk/agentsdk/logs_test.go @@ -0,0 +1,363 @@ +package agentsdk_test + +import ( + "context" + "fmt" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + "github.com/coder/coder/codersdk" + "github.com/coder/coder/codersdk/agentsdk" + "github.com/coder/coder/testutil" +) + +func TestStartupLogsWriter_Write(t *testing.T) { + t.Parallel() + + canceledCtx, cancel := context.WithCancel(context.Background()) + cancel() + + tests := []struct { + name string + ctx context.Context + level codersdk.LogLevel + writes []string + want []agentsdk.StartupLog + wantErr bool + closeFirst bool + }{ + { + name: "single line", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"hello world\n"}, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + }, + }, + { + name: "multiple lines", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"hello world\n", "goodbye world\n"}, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + { + Level: codersdk.LogLevelInfo, + Output: "goodbye world", + }, + }, + }, + { + name: "multiple newlines", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"\n\n", "hello world\n\n\n", "goodbye world\n"}, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "", + }, + { + Level: codersdk.LogLevelInfo, 
+ Output: "", + }, + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + { + Level: codersdk.LogLevelInfo, + Output: "", + }, + { + Level: codersdk.LogLevelInfo, + Output: "", + }, + { + Level: codersdk.LogLevelInfo, + Output: "goodbye world", + }, + }, + }, + { + name: "multiple lines with partial", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"hello world\n", "goodbye world"}, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + }, + }, + { + name: "multiple lines with partial, close flushes", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"hello world\n", "goodbye world"}, + closeFirst: true, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + { + Level: codersdk.LogLevelInfo, + Output: "goodbye world", + }, + }, + }, + { + name: "multiple lines with partial in middle", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"hello world\n", "goodbye", " world\n"}, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + { + Level: codersdk.LogLevelInfo, + Output: "goodbye world", + }, + }, + }, + { + name: "removes carriage return when grouped with newline", + ctx: context.Background(), + level: codersdk.LogLevelInfo, + writes: []string{"hello world\r\n", "\r\r\n", "goodbye world\n"}, + want: []agentsdk.StartupLog{ + { + Level: codersdk.LogLevelInfo, + Output: "hello world", + }, + { + Level: codersdk.LogLevelInfo, + Output: "\r", + }, + { + Level: codersdk.LogLevelInfo, + Output: "goodbye world", + }, + }, + }, + { + name: "cancel context", + ctx: canceledCtx, + writes: []string{ + "hello world\n", + }, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + var got []agentsdk.StartupLog + send := func(ctx context.Context, log ...agentsdk.StartupLog) error { 
+ select { + case <-ctx.Done(): + return ctx.Err() + default: + } + got = append(got, log...) + return nil + } + w := agentsdk.StartupLogsWriter(tt.ctx, send, tt.level) + for _, s := range tt.writes { + _, err := w.Write([]byte(s)) + if err != nil { + if tt.wantErr { + return + } + t.Errorf("startupLogsWriter.Write() error = %v, wantErr %v", err, tt.wantErr) + } + } + + if tt.closeFirst { + err := w.Close() + if err != nil { + t.Errorf("startupLogsWriter.Close() error = %v", err) + return + } + } + + // Compare got and want, but ignore the CreatedAt field. + for i := range got { + got[i].CreatedAt = tt.want[i].CreatedAt + } + require.Equal(t, tt.want, got) + + err := w.Close() + if !tt.closeFirst && (err != nil) != tt.wantErr { + t.Errorf("startupLogsWriter.Close() error = %v, wantErr %v", err, tt.wantErr) + return + } + }) + } +} + +type statusError int + +func (s statusError) StatusCode() int { + return int(s) +} + +func (s statusError) Error() string { + return fmt.Sprintf("status %d", s) +} + +func TestStartupLogsSender(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + sendCount int + discard []int + patchResp func(req agentsdk.PatchStartupLogs) error + }{ + { + name: "single log", + sendCount: 1, + }, + { + name: "multiple logs", + sendCount: 995, + }, + { + name: "too large", + sendCount: 1, + discard: []int{1}, + patchResp: func(req agentsdk.PatchStartupLogs) error { + return statusError(http.StatusRequestEntityTooLarge) + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitMedium) + defer cancel() + + got := []agentsdk.StartupLog{} + patchStartupLogs := func(_ context.Context, req agentsdk.PatchStartupLogs) error { + if tt.patchResp != nil { + err := tt.patchResp(req) + if err != nil { + return err + } + } + got = append(got, req.Logs...) 
+ return nil + } + + sendLog, flushAndClose := agentsdk.StartupLogsSender(patchStartupLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug)) + defer func() { + err := flushAndClose(ctx) + require.NoError(t, err) + }() + + var want []agentsdk.StartupLog + for i := 0; i < tt.sendCount; i++ { + want = append(want, agentsdk.StartupLog{ + CreatedAt: time.Now(), + Level: codersdk.LogLevelInfo, + Output: fmt.Sprintf("hello world %d", i), + }) + err := sendLog(ctx, want[len(want)-1]) + require.NoError(t, err) + } + + err := flushAndClose(ctx) + require.NoError(t, err) + + for _, di := range tt.discard { + want = slices.Delete(want, di-1, di) + } + + require.Equal(t, want, got) + }) + } + + t.Run("context canceled during send", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + + patchStartupLogs := func(_ context.Context, _ agentsdk.PatchStartupLogs) error { + assert.Fail(t, "should not be called") + return nil + } + + sendLog, flushAndClose := agentsdk.StartupLogsSender(patchStartupLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug)) + defer func() { + _ = flushAndClose(ctx) + }() + + cancel() + err := sendLog(ctx, agentsdk.StartupLog{ + CreatedAt: time.Now(), + Level: codersdk.LogLevelInfo, + Output: "hello world", + }) + require.Error(t, err) + + ctx, cancel = context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + err = flushAndClose(ctx) + require.NoError(t, err) + }) + + t.Run("context canceled during flush", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort) + defer cancel() + + var want, got []agentsdk.StartupLog + patchStartupLogs := func(_ context.Context, req agentsdk.PatchStartupLogs) error { + got = append(got, req.Logs...) 
+ return nil + } + + sendLog, flushAndClose := agentsdk.StartupLogsSender(patchStartupLogs, slogtest.Make(t, nil).Leveled(slog.LevelDebug)) + defer func() { + _ = flushAndClose(ctx) + }() + + err := sendLog(ctx, agentsdk.StartupLog{ + CreatedAt: time.Now(), + Level: codersdk.LogLevelInfo, + Output: "hello world", + }) + require.NoError(t, err) + + cancel() + err = flushAndClose(ctx) + require.Error(t, err) + + require.Equal(t, want, got) + }) +} diff --git a/site/src/xServices/workspaceAgentLogs/workspaceAgentLogsXService.ts b/site/src/xServices/workspaceAgentLogs/workspaceAgentLogsXService.ts index 37ca026533..188f628ff7 100644 --- a/site/src/xServices/workspaceAgentLogs/workspaceAgentLogsXService.ts +++ b/site/src/xServices/workspaceAgentLogs/workspaceAgentLogsXService.ts @@ -1,6 +1,5 @@ import * as API from "api/api" import { createMachine, assign } from "xstate" -import * as TypesGen from "api/typesGenerated" import { Line } from "components/Logs/Logs" // Logs are stored as the Line interface to make rendering @@ -97,7 +96,7 @@ export const workspaceAgentLogsMachine = createMachine( type: "ADD_STARTUP_LOGS", logs: logs.map((log) => ({ id: log.id, - level: "info" as TypesGen.LogLevel, + level: log.level || "info", output: log.output, time: log.created_at, })),