diff --git a/cli/cliui/agent.go b/cli/cliui/agent.go index acbbca9bef..fcaa1de54a 100644 --- a/cli/cliui/agent.go +++ b/cli/cliui/agent.go @@ -137,26 +137,44 @@ func Agent(ctx context.Context, writer io.Writer, agentID uuid.UUID, opts AgentO } defer logsCloser.Close() + var lastLog codersdk.WorkspaceAgentStartupLog + fetchedAgentWhileFollowing := fetchedAgent + if !follow { + fetchedAgentWhileFollowing = nil + } for { // This select is essentially and inline `fetch()`. select { case <-ctx.Done(): return ctx.Err() - case f := <-fetchedAgent: + case f := <-fetchedAgentWhileFollowing: if f.err != nil { return xerrors.Errorf("fetch: %w", f.err) } - // We could handle changes in the agent status here, like - // if the agent becomes disconnected, we may want to stop. - // But for now, we'll just keep going, hopefully the agent - // will reconnect and update its status. agent = f.agent + + // If the agent is no longer starting, stop following + // logs because FetchLogs will keep streaming forever. + // We do one last non-follow request to ensure we have + // fetched all logs. + if !agent.LifecycleState.Starting() { + _ = logsCloser.Close() + fetchedAgentWhileFollowing = nil + + logStream, logsCloser, err = opts.FetchLogs(ctx, agent.ID, lastLog.ID, false) + if err != nil { + return xerrors.Errorf("fetch workspace agent startup logs: %w", err) + } + // Logs are already primed, so we can call close. + _ = logsCloser.Close() + } case logs, ok := <-logStream: if !ok { return nil } for _, log := range logs { sw.Log(log.CreatedAt, log.Level, log.Output) + lastLog = log } } } diff --git a/cli/cliui/agent_test.go b/cli/cliui/agent_test.go index c08ba163ea..e0e0485778 100644 --- a/cli/cliui/agent_test.go +++ b/cli/cliui/agent_test.go @@ -46,7 +46,6 @@ func TestAgent(t *testing.T) { func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error { agent.Status = codersdk.WorkspaceAgentConnected agent.FirstConnectedAt = ptr.Ref(time.Now()) - close(logs) return nil }, }, @@ -79,7 +78,6 @@ func TestAgent(t *testing.T) { agent.FirstConnectedAt = ptr.Ref(time.Now()) agent.LifecycleState = codersdk.WorkspaceAgentLifecycleReady agent.ReadyAt = ptr.Ref(time.Now()) - close(logs) return nil }, }, @@ -113,10 +111,6 @@ func TestAgent(t *testing.T) { agent.LastConnectedAt = ptr.Ref(time.Now()) return nil }, - func(_ context.Context, _ *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error { - close(logs) - return nil - }, }, want: []string{ "⧗ The workspace agent lost connection", @@ -154,7 +148,6 @@ func TestAgent(t *testing.T) { Output: "Bye now", }, } - close(logs) return nil }, }, @@ -184,7 +177,6 @@ func TestAgent(t *testing.T) { Output: "Hello world", }, } - close(logs) return nil }, }, @@ -205,7 +197,6 @@ func TestAgent(t *testing.T) { func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error { agent.Status = codersdk.WorkspaceAgentDisconnected agent.LifecycleState = codersdk.WorkspaceAgentLifecycleOff - close(logs) return nil }, }, @@ -234,7 +225,6 @@ func TestAgent(t *testing.T) { func(_ context.Context, agent *codersdk.WorkspaceAgent, logs chan []codersdk.WorkspaceAgentStartupLog) error { agent.ReadyAt = ptr.Ref(time.Now()) agent.LifecycleState = codersdk.WorkspaceAgentLifecycleShuttingDown - close(logs) return nil }, }, @@ -316,8 +306,21 @@ func TestAgent(t *testing.T) { } return agent, err } - tc.opts.FetchLogs = func(_ context.Context, _ uuid.UUID, _ int64, _ bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) { - return logs, closeFunc(func() error { return nil }), nil + tc.opts.FetchLogs = func(ctx context.Context, _ uuid.UUID, _ int64, follow bool) (<-chan []codersdk.WorkspaceAgentStartupLog, io.Closer, error) { + if follow { + return logs, closeFunc(func() error { return nil }), nil + } + + fetchLogs := make(chan []codersdk.WorkspaceAgentStartupLog, 1) + select { + case <-ctx.Done(): + return nil, nil, ctx.Err() + case l := <-logs: + fetchLogs <- l + default: + } + close(fetchLogs) + return fetchLogs, closeFunc(func() error { return nil }), nil } err := cliui.Agent(inv.Context(), &buf, uuid.Nil, tc.opts) return err diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 48c97bbd11..f8d5e10f62 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -280,81 +280,61 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R level = append(level, parsedLevel) } - var logs []database.WorkspaceAgentStartupLog - // Ensure logs are not written after script ended. - scriptEndedError := xerrors.New("startup script has ended") - err := api.Database.InTx(func(db database.Store) error { - state, err := db.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID) - if err != nil { - return xerrors.Errorf("workspace agent startup script status: %w", err) - } - - if state.ReadyAt.Valid { - // The agent startup script has already ended, so we don't want to - // process any more logs. - return scriptEndedError - } - - logs, err = db.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{ - AgentID: workspaceAgent.ID, - CreatedAt: createdAt, - Output: output, - Level: level, - OutputLength: int32(outputLength), - }) - return err - }, nil) + logs, err := api.Database.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{ + AgentID: workspaceAgent.ID, + CreatedAt: createdAt, + Output: output, + Level: level, + OutputLength: int32(outputLength), + }) if err != nil { - if errors.Is(err, scriptEndedError) { - httpapi.Write(ctx, rw, http.StatusConflict, codersdk.Response{ - Message: "Failed to upload logs, startup script has already ended.", + if !database.IsStartupLogsLimitError(err) { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Failed to upload startup logs", Detail: err.Error(), }) return } - if database.IsStartupLogsLimitError(err) { - if !workspaceAgent.StartupLogsOverflowed { - err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{ - ID: workspaceAgent.ID, - StartupLogsOverflowed: true, - }) - if err != nil { - // We don't want to return here, because the agent will retry - // on failure and this isn't a huge deal. The overflow state - // is just a hint to the user that the logs are incomplete. - api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err)) - } - - resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID) - if err != nil { - httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ - Message: "Failed to get workspace resource.", - Detail: err.Error(), - }) - return - } - - build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID) - if err != nil { - httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ - Message: "Internal error fetching workspace build job.", - Detail: err.Error(), - }) - return - } - - api.publishWorkspaceUpdate(ctx, build.WorkspaceID) - } - + if workspaceAgent.StartupLogsOverflowed { httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{ Message: "Startup logs limit exceeded", Detail: err.Error(), }) return } - httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ - Message: "Failed to upload startup logs", - Detail: err.Error(), + err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{ + ID: workspaceAgent.ID, + StartupLogsOverflowed: true, + }) + if err != nil { + // We don't want to return here, because the agent will retry + // on failure and this isn't a huge deal. The overflow state + // is just a hint to the user that the logs are incomplete. + api.Logger.Warn(ctx, "failed to update workspace agent startup log overflow", slog.Error(err)) + } + + resource, err := api.Database.GetWorkspaceResourceByID(ctx, workspaceAgent.ResourceID) + if err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Failed to get workspace resource.", + Detail: err.Error(), + }) + return + } + + build, err := api.Database.GetWorkspaceBuildByJobID(ctx, resource.JobID) + if err != nil { + httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{ + Message: "Internal error fetching workspace build job.", + Detail: err.Error(), + }) + return + } + + api.publishWorkspaceUpdate(ctx, build.WorkspaceID) + + httpapi.Write(ctx, rw, http.StatusRequestEntityTooLarge, codersdk.Response{ + Message: "Startup logs limit exceeded", }) return } @@ -497,18 +477,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques return } - if workspaceAgent.ReadyAt.Valid { - // Fast path, the startup script has finished running, so we can close - // the connection. - return - } - if !codersdk.WorkspaceAgentLifecycle(workspaceAgent.LifecycleState).Starting() { - // Backwards compatibility: Avoid waiting forever in case this agent is - // older than the current release and has already reported the ready - // state. - return - } - lastSentLogID := after if len(logs) > 0 { lastSentLogID = logs[len(logs)-1].ID @@ -543,11 +511,9 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques t := time.NewTicker(recheckInterval) defer t.Stop() - var state database.GetWorkspaceAgentLifecycleStateByIDRow go func() { defer close(bufferedLogs) - var err error for { select { case <-ctx.Done(): @@ -557,17 +523,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques t.Reset(recheckInterval) } - if !state.ReadyAt.Valid { - state, err = api.Database.GetWorkspaceAgentLifecycleStateByID(ctx, workspaceAgent.ID) - if err != nil { - if xerrors.Is(err, context.Canceled) { - return - } - logger.Warn(ctx, "failed to get workspace agent lifecycle state", slog.Error(err)) - continue - } - } - logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{ AgentID: workspaceAgent.ID, CreatedAfter: lastSentLogID, @@ -580,9 +535,7 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques continue } if len(logs) == 0 { - if state.ReadyAt.Valid { - return - } + // Just keep listening - more logs might come in the future! continue } @@ -1689,12 +1642,6 @@ func (api *API) workspaceAgentReportLifecycle(rw http.ResponseWriter, r *http.Re return } - if readyAt.Valid { - api.publishWorkspaceAgentStartupLogsUpdate(ctx, workspaceAgent.ID, agentsdk.StartupLogsNotifyMessage{ - EndOfLogs: true, - }) - } - api.publishWorkspaceUpdate(ctx, workspace.ID) httpapi.Write(ctx, rw, http.StatusNoContent, nil) diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index 95d75d1681..6afec803bb 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -301,130 +301,6 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) { } } }) - t.Run("CloseAfterLifecycleStateIsNotRunning", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitMedium) - client := coderdtest.New(t, &coderdtest.Options{ - IncludeProvisionerDaemon: true, - }) - user := coderdtest.CreateFirstUser(t, client) - authToken := uuid.NewString() - version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ - Parse: echo.ParseComplete, - ProvisionPlan: echo.ProvisionComplete, - ProvisionApply: []*proto.Provision_Response{{ - Type: &proto.Provision_Response_Complete{ - Complete: &proto.Provision_Complete{ - Resources: []*proto.Resource{{ - Name: "example", - Type: "aws_instance", - Agents: []*proto.Agent{{ - Id: uuid.NewString(), - Auth: &proto.Agent_Token{ - Token: authToken, - }, - }}, - }}, - }, - }, - }}, - }) - template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) - coderdtest.AwaitTemplateVersionJob(t, client, version.ID) - workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - build := coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) - - agentClient := agentsdk.New(client.URL) - agentClient.SetSessionToken(authToken) - - logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0, true) - require.NoError(t, err) - defer func() { - _ = closer.Close() - }() - - err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{ - Logs: []agentsdk.StartupLog{ - { - CreatedAt: database.Now(), - Output: "testing", - }, - }, - }) - require.NoError(t, err) - - err = agentClient.PostLifecycle(ctx, agentsdk.PostLifecycleRequest{ - State: codersdk.WorkspaceAgentLifecycleReady, - ChangedAt: time.Now(), - }) - require.NoError(t, err) - - var gotLogs []codersdk.WorkspaceAgentStartupLog - for { - select { - case <-ctx.Done(): - require.Fail(t, "timed out waiting for logs to end") - case l, ok := <-logs: - gotLogs = append(gotLogs, l...) - if !ok { - require.Len(t, gotLogs, 1, "expected one log") - return // Success. - } - } - } - }) - t.Run("NoLogAfterScriptEnded", func(t *testing.T) { - t.Parallel() - ctx := testutil.Context(t, testutil.WaitMedium) - client := coderdtest.New(t, &coderdtest.Options{ - IncludeProvisionerDaemon: true, - }) - user := coderdtest.CreateFirstUser(t, client) - authToken := uuid.NewString() - version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{ - Parse: echo.ParseComplete, - ProvisionPlan: echo.ProvisionComplete, - ProvisionApply: []*proto.Provision_Response{{ - Type: &proto.Provision_Response_Complete{ - Complete: &proto.Provision_Complete{ - Resources: []*proto.Resource{{ - Name: "example", - Type: "aws_instance", - Agents: []*proto.Agent{{ - Id: uuid.NewString(), - Auth: &proto.Agent_Token{ - Token: authToken, - }, - }}, - }}, - }, - }, - }}, - }) - template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID) - coderdtest.AwaitTemplateVersionJob(t, client, version.ID) - workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID) - _ = coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID) - - agentClient := agentsdk.New(client.URL) - agentClient.SetSessionToken(authToken) - - err := agentClient.PostLifecycle(ctx, agentsdk.PostLifecycleRequest{ - State: codersdk.WorkspaceAgentLifecycleReady, - ChangedAt: time.Now(), - }) - require.NoError(t, err) - - err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{ - Logs: []agentsdk.StartupLog{ - { - CreatedAt: database.Now(), - Output: "testing", - }, - }, - }) - require.Error(t, err, "insert after script ended should not succeed") - }) } func TestWorkspaceAgentListen(t *testing.T) { diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index bf150cd849..6c60cd2303 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -694,7 +694,6 @@ func StartupLogsNotifyChannel(agentID uuid.UUID) string { type StartupLogsNotifyMessage struct { CreatedAfter int64 `json:"created_after"` - EndOfLogs bool `json:"end_of_logs"` } type closeNetConn struct {