feat: add startup script logs to the ui (#6558)

* Add startup script logs to the database

* Add coderd endpoints for startup script logs

* Push startup script logs from agent

* Pull startup script logs on frontend

* Rename queries

* Add constraint

* Start creating log sending loop

* Add log sending to the agent

* Add tests for streaming logs

* Shorten notify channel name

* Add FE

* Improve bulk log performance

* Finish UI display

* Fix startup log visibility

* Add warning for overflow

* Fix agent queue logs overflow

* Display staartup logs in a virtual DOM for performance

* Fix agent queue with loads of logs

* Fix authorize test

* Remove faulty test

* Fix startup and shutdown reporting error

* Fix gen

* Fix comments

* Periodically purge old database entries

* Add test fixture for migration

* Add Storybook

* Check if there are logs when displaying features

* Fix startup component overflow gap

* Fix startup log wrapping

---------

Co-authored-by: Asher <ash@coder.com>
This commit is contained in:
Kyle Carberry
2023-03-23 14:09:13 -05:00
committed by GitHub
parent a6fa8cac58
commit cb7375450b
57 changed files with 2513 additions and 353 deletions

View File

@ -15,6 +15,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
@ -216,6 +217,318 @@ func (api *API) postWorkspaceAgentStartup(rw http.ResponseWriter, r *http.Reques
httpapi.Write(ctx, rw, http.StatusOK, nil)
}
// @Summary Patch workspace agent startup logs
// @ID patch-workspace-agent-startup-logs
// @Security CoderSessionToken
// @Accept json
// @Produce json
// @Tags Agents
// @Param request body agentsdk.PatchStartupLogs true "Startup logs"
// @Success 200 {object} codersdk.Response
// @Router /workspaceagents/me/startup-logs [patch]
// @x-apidocgen {"skip": true}
func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspaceAgent := httpmw.WorkspaceAgent(r)
var req agentsdk.PatchStartupLogs
if !httpapi.Read(ctx, rw, r, &req) {
return
}
if len(req.Logs) == 0 {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "No logs provided.",
})
return
}
createdAt := make([]time.Time, 0)
output := make([]string, 0)
outputLength := 0
for _, log := range req.Logs {
createdAt = append(createdAt, log.CreatedAt)
output = append(output, log.Output)
outputLength += len(log.Output)
}
logs, err := api.Database.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
AgentID: workspaceAgent.ID,
CreatedAt: createdAt,
Output: output,
OutputLength: int32(outputLength),
})
if err != nil {
if database.IsStartupLogsLimitError(err) {
if !workspaceAgent.StartupLogsOverflowed {
err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
ID: workspaceAgent.ID,
StartupLogsOverflowed: true,
})
if err != nil {
// We don't want to return here, because the agent will retry
// on failure and this isn't a huge deal. The overflow state
// is just a hint to the user that the logs are incomplete.
api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err))
}
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Failed to get workspace resource.",
Detail: err.Error(),
})
return
}
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Internal error fetching workspace build job.",
Detail: err.Error(),
})
return
}
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
}
httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{
Message: "Startup logs limit exceeded",
Detail: err.Error(),
})
return
}
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to upload startup logs",
Detail: err.Error(),
})
return
}
if workspaceAgent.StartupLogsLength == 0 {
// If these are the first logs being appended, we publish a UI update
// to notify the UI that logs are now available.
resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Failed to get workspace resource.",
Detail: err.Error(),
})
return
}
build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Internal error fetching workspace build job.",
Detail: err.Error(),
})
return
}
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
}
lowestID := logs[0].ID
// Publish by the lowest log ID inserted so the
// log stream will fetch everything from that point.
data, err := json.Marshal(agentsdk.StartupLogsNotifyMessage{
CreatedAfter: lowestID - 1,
})
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to marshal startup logs notify message",
Detail: err.Error(),
})
return
}
err = api.Pubsub.Publish(agentsdk.StartupLogsNotifyChannel(workspaceAgent.ID), data)
if err != nil {
// We don't want to return an error to the agent here,
// otherwise it might try to reinsert the logs.
api.Logger.Warn(ctx, "failed to publish startup logs notify message", slog.Error(err))
}
httpapi.Write(ctx, rw, http.StatusOK, nil)
}
// workspaceAgentStartupLogs returns the logs sent from a workspace agent
// during startup.
//
// @Summary Get startup logs by workspace agent
// @ID get-startup-logs-by-workspace-agent
// @Security CoderSessionToken
// @Produce json
// @Tags Agents
// @Param workspaceagent path string true "Workspace agent ID" format(uuid)
// @Param before query int false "Before log id"
// @Param after query int false "After log id"
// @Param follow query bool false "Follow log stream"
// @Success 200 {array} codersdk.WorkspaceAgentStartupLog
// @Router /workspaceagents/{workspaceagent}/startup-logs [get]
func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Request) {
// This mostly copies how provisioner job logs are streamed!
var (
ctx = r.Context()
workspaceAgent = httpmw.WorkspaceAgentParam(r)
workspace = httpmw.WorkspaceParam(r)
logger = api.Logger.With(slog.F("workspace_agent_id", workspaceAgent.ID))
follow = r.URL.Query().Has("follow")
afterRaw = r.URL.Query().Get("after")
)
if !api.Authorize(r, rbac.ActionRead, workspace) {
httpapi.ResourceNotFound(rw)
return
}
var after int64
// Only fetch logs created after the time provided.
if afterRaw != "" {
var err error
after, err = strconv.ParseInt(afterRaw, 10, 64)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Query param \"after\" must be an integer.",
Validations: []codersdk.ValidationError{
{Field: "after", Detail: "Must be an integer"},
},
})
return
}
}
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
AgentID: workspaceAgent.ID,
CreatedAfter: after,
})
if errors.Is(err, sql.ErrNoRows) {
err = nil
}
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner logs.",
Detail: err.Error(),
})
return
}
if logs == nil {
logs = []database.WorkspaceAgentStartupLog{}
}
if !follow {
logger.Debug(ctx, "Finished non-follow job logs")
httpapi.Write(ctx, rw, http.StatusOK, convertWorkspaceAgentStartupLogs(logs))
return
}
api.WebsocketWaitMutex.Lock()
api.WebsocketWaitGroup.Add(1)
api.WebsocketWaitMutex.Unlock()
defer api.WebsocketWaitGroup.Done()
conn, err := websocket.Accept(rw, r, nil)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Failed to accept websocket.",
Detail: err.Error(),
})
return
}
go httpapi.Heartbeat(ctx, conn)
ctx, wsNetConn := websocketNetConn(ctx, conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.
// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(wsNetConn)
err = encoder.Encode(convertWorkspaceAgentStartupLogs(logs))
if err != nil {
return
}
if workspaceAgent.LifecycleState == database.WorkspaceAgentLifecycleStateReady {
// The startup script has finished running, so we can close the connection.
return
}
var (
bufferedLogs = make(chan []database.WorkspaceAgentStartupLog, 128)
endOfLogs atomic.Bool
lastSentLogID atomic.Int64
)
sendLogs := func(logs []database.WorkspaceAgentStartupLog) {
select {
case bufferedLogs <- logs:
lastSentLogID.Store(logs[len(logs)-1].ID)
default:
logger.Warn(ctx, "workspace agent startup log overflowing channel")
}
}
closeSubscribe, err := api.Pubsub.Subscribe(
agentsdk.StartupLogsNotifyChannel(workspaceAgent.ID),
func(ctx context.Context, message []byte) {
if endOfLogs.Load() {
return
}
jlMsg := agentsdk.StartupLogsNotifyMessage{}
err := json.Unmarshal(message, &jlMsg)
if err != nil {
logger.Warn(ctx, "invalid startup logs notify message", slog.Error(err))
return
}
if jlMsg.CreatedAfter != 0 {
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
AgentID: workspaceAgent.ID,
CreatedAfter: jlMsg.CreatedAfter,
})
if err != nil {
logger.Warn(ctx, "failed to get workspace agent startup logs after", slog.Error(err))
return
}
sendLogs(logs)
}
if jlMsg.EndOfLogs {
endOfLogs.Store(true)
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
AgentID: workspaceAgent.ID,
CreatedAfter: lastSentLogID.Load(),
})
if err != nil {
logger.Warn(ctx, "get workspace agent startup logs after", slog.Error(err))
return
}
sendLogs(logs)
bufferedLogs <- nil
}
},
)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Failed to subscribe to startup logs.",
Detail: err.Error(),
})
return
}
defer closeSubscribe()
for {
select {
case <-ctx.Done():
logger.Debug(context.Background(), "job logs context canceled")
return
case logs, ok := <-bufferedLogs:
// A nil log is sent when complete!
if !ok || logs == nil {
logger.Debug(context.Background(), "reached the end of published logs")
return
}
err = encoder.Encode(convertWorkspaceAgentStartupLogs(logs))
if err != nil {
return
}
}
}
}
// workspaceAgentPTY spawns a PTY and pipes it over a WebSocket.
// This is used for the web terminal.
//
@ -851,6 +1164,8 @@ func convertWorkspaceAgent(derpMap *tailcfg.DERPMap, coordinator tailnet.Coordin
Architecture: dbAgent.Architecture,
OperatingSystem: dbAgent.OperatingSystem,
StartupScript: dbAgent.StartupScript.String,
StartupLogsLength: dbAgent.StartupLogsLength,
StartupLogsOverflowed: dbAgent.StartupLogsOverflowed,
Version: dbAgent.Version,
EnvironmentVariables: envs,
Directory: dbAgent.Directory,
@ -1525,3 +1840,19 @@ func websocketNetConn(ctx context.Context, conn *websocket.Conn, msgType websock
Conn: nc,
}
}
func convertWorkspaceAgentStartupLogs(logs []database.WorkspaceAgentStartupLog) []codersdk.WorkspaceAgentStartupLog {
sdk := make([]codersdk.WorkspaceAgentStartupLog, 0, len(logs))
for _, log := range logs {
sdk = append(sdk, convertWorkspaceAgentStartupLog(log))
}
return sdk
}
func convertWorkspaceAgentStartupLog(log database.WorkspaceAgentStartupLog) codersdk.WorkspaceAgentStartupLog {
return codersdk.WorkspaceAgentStartupLog{
ID: log.ID,
CreatedAt: log.CreatedAt,
Output: log.Output,
}
}