mirror of
https://github.com/coder/coder.git
synced 2025-07-13 21:36:50 +00:00
fix: avoid missed logs when streaming startup logs (#8029)
* feat(coderd,agent): send startup log eof at the end * fix(coderd): fix edge case in startup log pubsub * fix(coderd): ensure startup logs are closed on lifecycle state change (fallback) * fix(codersdk): fix startup log channel shared memory bug * fix(site): remove the EOF log line
This commit is contained in:
committed by
GitHub
parent
247f8a973f
commit
0c5077464b
7
coderd/apidoc/docs.go
generated
7
coderd/apidoc/docs.go
generated
@ -5856,6 +5856,9 @@ const docTemplate = `{
|
||||
"created_at": {
|
||||
"type": "string"
|
||||
},
|
||||
"eof": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"level": {
|
||||
"$ref": "#/definitions/codersdk.LogLevel"
|
||||
},
|
||||
@ -9378,6 +9381,10 @@ const docTemplate = `{
|
||||
"type": "string",
|
||||
"format": "date-time"
|
||||
},
|
||||
"eof": {
|
||||
"description": "EOF indicates that this is the last log entry and the file is closed.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"id": {
|
||||
"type": "integer"
|
||||
},
|
||||
|
7
coderd/apidoc/swagger.json
generated
7
coderd/apidoc/swagger.json
generated
@ -5171,6 +5171,9 @@
|
||||
"created_at": {
|
||||
"type": "string"
|
||||
},
|
||||
"eof": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"level": {
|
||||
"$ref": "#/definitions/codersdk.LogLevel"
|
||||
},
|
||||
@ -8474,6 +8477,10 @@
|
||||
"type": "string",
|
||||
"format": "date-time"
|
||||
},
|
||||
"eof": {
|
||||
"description": "EOF indicates that this is the last log entry and the file is closed.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"id": {
|
||||
"type": "integer"
|
||||
},
|
||||
|
@ -1407,6 +1407,14 @@ func (q *querier) GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg dat
|
||||
return q.db.GetWorkspaceAgentStartupLogsAfter(ctx, arg)
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStartupLogsEOF returns the startup-log EOF status for
// agentID as reported by the wrapped store. Before delegating, it fetches the
// agent through q.GetWorkspaceAgentByID and returns (false, err) if that
// lookup fails, so the wrapped query only runs for an agent this querier
// could load.
func (q *querier) GetWorkspaceAgentStartupLogsEOF(ctx context.Context, agentID uuid.UUID) (bool, error) {
|
||||
// The agent itself is discarded; only the lookup's error matters here.
// NOTE(review): presumably this lookup acts as the access check for the
// caller — confirm against GetWorkspaceAgentByID.
_, err := q.GetWorkspaceAgentByID(ctx, agentID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return q.db.GetWorkspaceAgentStartupLogsEOF(ctx, agentID)
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStats forwards the call, with createdAfter unchanged, to
// the wrapped store and returns its result as-is. No preliminary lookup is
// performed in this wrapper.
func (q *querier) GetWorkspaceAgentStats(ctx context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsRow, error) {
|
||||
return q.db.GetWorkspaceAgentStats(ctx, createdAfter)
|
||||
}
|
||||
|
@ -2722,7 +2722,7 @@ func (q *fakeQuerier) GetWorkspaceAgentStartupLogsAfter(_ context.Context, arg d
|
||||
if log.AgentID != arg.AgentID {
|
||||
continue
|
||||
}
|
||||
if arg.CreatedAfter != 0 && log.ID < arg.CreatedAfter {
|
||||
if arg.CreatedAfter != 0 && log.ID <= arg.CreatedAfter {
|
||||
continue
|
||||
}
|
||||
logs = append(logs, log)
|
||||
@ -2730,6 +2730,22 @@ func (q *fakeQuerier) GetWorkspaceAgentStartupLogsAfter(_ context.Context, arg d
|
||||
return logs, nil
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStartupLogsEOF returns the EOF flag of the stored startup
// log with the highest ID that belongs to agentID. When the agent has no
// stored logs the zero-value log is used, so the result is false. The error
// result is always nil.
//
// NOTE(review): the SQL query of the same name returns true when any row for
// the agent has eof = true, whereas this implementation only looks at the
// newest row. The two agree only while an EOF entry is always the last log
// written for an agent — confirm that callers guarantee this.
func (q *fakeQuerier) GetWorkspaceAgentStartupLogsEOF(_ context.Context, agentID uuid.UUID) (bool, error) {
|
||||
// Read-only access to the in-memory log slice.
q.mutex.RLock()
|
||||
defer q.mutex.RUnlock()
|
||||
|
||||
// Starts as the zero value (ID 0, EOF false); only logs with a strictly
// greater ID can replace it.
var lastLog database.WorkspaceAgentStartupLog
|
||||
for _, log := range q.workspaceAgentLogs {
|
||||
// Skip logs that belong to other agents.
if log.AgentID != agentID {
|
||||
continue
|
||||
}
|
||||
// Keep the entry with the largest ID seen so far.
if log.ID > lastLog.ID {
|
||||
lastLog = log
|
||||
}
|
||||
}
|
||||
return lastLog.EOF, nil
|
||||
}
|
||||
|
||||
func (q *fakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter time.Time) ([]database.GetWorkspaceAgentStatsRow, error) {
|
||||
q.mutex.RLock()
|
||||
defer q.mutex.RUnlock()
|
||||
@ -4013,7 +4029,7 @@ func (q *fakeQuerier) InsertWorkspaceAgentStartupLogs(_ context.Context, arg dat
|
||||
defer q.mutex.Unlock()
|
||||
|
||||
logs := []database.WorkspaceAgentStartupLog{}
|
||||
id := int64(1)
|
||||
id := int64(0)
|
||||
if len(q.workspaceAgentLogs) > 0 {
|
||||
id = q.workspaceAgentLogs[len(q.workspaceAgentLogs)-1].ID
|
||||
}
|
||||
@ -4026,6 +4042,7 @@ func (q *fakeQuerier) InsertWorkspaceAgentStartupLogs(_ context.Context, arg dat
|
||||
CreatedAt: arg.CreatedAt[index],
|
||||
Level: arg.Level[index],
|
||||
Output: output,
|
||||
EOF: arg.EOF[index],
|
||||
})
|
||||
outputLength += int32(len(output))
|
||||
}
|
||||
|
@ -738,6 +738,13 @@ func (m metricsStore) GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg
|
||||
return logs, err
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStartupLogsEOF calls the wrapped store's method of the same
// name and records how long the call took, in seconds, on m.queryLatencies
// under the label "GetWorkspaceAgentStartupLogsEOF". The wrapped call's
// results are returned unchanged.
func (m metricsStore) GetWorkspaceAgentStartupLogsEOF(ctx context.Context, agentID uuid.UUID) (bool, error) {
|
||||
start := time.Now()
|
||||
r0, r1 := m.s.GetWorkspaceAgentStartupLogsEOF(ctx, agentID)
|
||||
// The latency is observed whether or not the call returned an error.
m.queryLatencies.WithLabelValues("GetWorkspaceAgentStartupLogsEOF").Observe(time.Since(start).Seconds())
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
func (m metricsStore) GetWorkspaceAgentStats(ctx context.Context, createdAt time.Time) ([]database.GetWorkspaceAgentStatsRow, error) {
|
||||
start := time.Now()
|
||||
stats, err := m.s.GetWorkspaceAgentStats(ctx, createdAt)
|
||||
|
@ -1453,6 +1453,21 @@ func (mr *MockStoreMockRecorder) GetWorkspaceAgentStartupLogsAfter(arg0, arg1 in
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkspaceAgentStartupLogsAfter", reflect.TypeOf((*MockStore)(nil).GetWorkspaceAgentStartupLogsAfter), arg0, arg1)
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStartupLogsEOF mocks base method.
// It forwards the call and its arguments to the mock controller and returns
// the two values the controller hands back, converted to (bool, error).
|
||||
func (m *MockStore) GetWorkspaceAgentStartupLogsEOF(arg0 context.Context, arg1 uuid.UUID) (bool, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetWorkspaceAgentStartupLogsEOF", arg0, arg1)
|
||||
// The assertion results are deliberately unchecked: if a returned value is
// nil or of another type, the corresponding result stays at its zero value.
ret0, _ := ret[0].(bool)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStartupLogsEOF indicates an expected call of GetWorkspaceAgentStartupLogsEOF.
// arg0 and arg1 are passed through to the controller as the expected
// arguments, and the resulting *gomock.Call is returned to the caller.
|
||||
func (mr *MockStoreMockRecorder) GetWorkspaceAgentStartupLogsEOF(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
// The method type is taken from a nil *MockStore method value.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkspaceAgentStartupLogsEOF", reflect.TypeOf((*MockStore)(nil).GetWorkspaceAgentStartupLogsEOF), arg0, arg1)
|
||||
}
|
||||
|
||||
// GetWorkspaceAgentStats mocks base method.
|
||||
func (m *MockStore) GetWorkspaceAgentStats(arg0 context.Context, arg1 time.Time) ([]database.GetWorkspaceAgentStatsRow, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
5
coderd/database/dump.sql
generated
5
coderd/database/dump.sql
generated
@ -548,9 +548,12 @@ CREATE TABLE workspace_agent_startup_logs (
|
||||
created_at timestamp with time zone NOT NULL,
|
||||
output character varying(1024) NOT NULL,
|
||||
id bigint NOT NULL,
|
||||
level log_level DEFAULT 'info'::log_level NOT NULL
|
||||
level log_level DEFAULT 'info'::log_level NOT NULL,
|
||||
eof boolean DEFAULT false NOT NULL
|
||||
);
|
||||
|
||||
COMMENT ON COLUMN workspace_agent_startup_logs.eof IS 'End of file reached';
|
||||
|
||||
CREATE SEQUENCE workspace_agent_startup_logs_id_seq
|
||||
START WITH 1
|
||||
INCREMENT BY 1
|
||||
|
@ -0,0 +1 @@
|
||||
ALTER TABLE workspace_agent_startup_logs DROP COLUMN eof;
|
@ -0,0 +1,3 @@
|
||||
ALTER TABLE workspace_agent_startup_logs ADD COLUMN eof boolean NOT NULL DEFAULT false;
|
||||
|
||||
COMMENT ON COLUMN workspace_agent_startup_logs.eof IS 'End of file reached';
|
@ -1731,6 +1731,8 @@ type WorkspaceAgentStartupLog struct {
|
||||
Output string `db:"output" json:"output"`
|
||||
ID int64 `db:"id" json:"id"`
|
||||
Level LogLevel `db:"level" json:"level"`
|
||||
// End of file reached
|
||||
EOF bool `db:"eof" json:"eof"`
|
||||
}
|
||||
|
||||
type WorkspaceAgentStat struct {
|
||||
|
@ -127,6 +127,7 @@ type sqlcQuerier interface {
|
||||
GetWorkspaceAgentByInstanceID(ctx context.Context, authInstanceID string) (WorkspaceAgent, error)
|
||||
GetWorkspaceAgentMetadata(ctx context.Context, workspaceAgentID uuid.UUID) ([]WorkspaceAgentMetadatum, error)
|
||||
GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg GetWorkspaceAgentStartupLogsAfterParams) ([]WorkspaceAgentStartupLog, error)
|
||||
GetWorkspaceAgentStartupLogsEOF(ctx context.Context, agentID uuid.UUID) (bool, error)
|
||||
GetWorkspaceAgentStats(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsRow, error)
|
||||
GetWorkspaceAgentStatsAndLabels(ctx context.Context, createdAt time.Time) ([]GetWorkspaceAgentStatsAndLabelsRow, error)
|
||||
GetWorkspaceAgentsByResourceIDs(ctx context.Context, ids []uuid.UUID) ([]WorkspaceAgent, error)
|
||||
|
@ -114,6 +114,7 @@ func TestInsertWorkspaceAgentStartupLogs(t *testing.T) {
|
||||
CreatedAt: []time.Time{database.Now()},
|
||||
Output: []string{"first"},
|
||||
Level: []database.LogLevel{database.LogLevelInfo},
|
||||
EOF: []bool{false},
|
||||
// 1 MB is the max
|
||||
OutputLength: 1 << 20,
|
||||
})
|
||||
@ -125,6 +126,7 @@ func TestInsertWorkspaceAgentStartupLogs(t *testing.T) {
|
||||
CreatedAt: []time.Time{database.Now()},
|
||||
Output: []string{"second"},
|
||||
Level: []database.LogLevel{database.LogLevelInfo},
|
||||
EOF: []bool{false},
|
||||
OutputLength: 1,
|
||||
})
|
||||
require.True(t, database.IsStartupLogsLimitError(err))
|
||||
|
@ -5438,7 +5438,7 @@ func (q *sqlQuerier) GetWorkspaceAgentMetadata(ctx context.Context, workspaceAge
|
||||
|
||||
const getWorkspaceAgentStartupLogsAfter = `-- name: GetWorkspaceAgentStartupLogsAfter :many
|
||||
SELECT
|
||||
agent_id, created_at, output, id, level
|
||||
agent_id, created_at, output, id, level, eof
|
||||
FROM
|
||||
workspace_agent_startup_logs
|
||||
WHERE
|
||||
@ -5468,6 +5468,7 @@ func (q *sqlQuerier) GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg
|
||||
&i.Output,
|
||||
&i.ID,
|
||||
&i.Level,
|
||||
&i.EOF,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -5482,6 +5483,26 @@ func (q *sqlQuerier) GetWorkspaceAgentStartupLogsAfter(ctx context.Context, arg
|
||||
return items, nil
|
||||
}
|
||||
|
||||
// getWorkspaceAgentStartupLogsEOF is the SQL text for the
// GetWorkspaceAgentStartupLogsEOF query. It yields a single boolean: TRUE when
// workspace_agent_startup_logs contains at least one row for the agent passed
// as $1 with eof = true, and FALSE otherwise.
const getWorkspaceAgentStartupLogsEOF = `-- name: GetWorkspaceAgentStartupLogsEOF :one
|
||||
SELECT CASE WHEN EXISTS (
|
||||
SELECT
|
||||
agent_id, created_at, output, id, level, eof
|
||||
FROM
|
||||
workspace_agent_startup_logs
|
||||
WHERE
|
||||
agent_id = $1
|
||||
AND eof = true
|
||||
LIMIT 1
|
||||
) THEN TRUE ELSE FALSE END
|
||||
`
|
||||
|
||||
// GetWorkspaceAgentStartupLogsEOF runs the getWorkspaceAgentStartupLogsEOF
// query with agentID as its only argument and scans the single boolean column
// of the resulting row. The scanned value is returned together with any error
// from Scan; on error the boolean is whatever Scan left in place.
func (q *sqlQuerier) GetWorkspaceAgentStartupLogsEOF(ctx context.Context, agentID uuid.UUID) (bool, error) {
|
||||
row := q.db.QueryRowContext(ctx, getWorkspaceAgentStartupLogsEOF, agentID)
|
||||
// The query's only output column is unnamed, hence the generic variable name.
var column_1 bool
|
||||
err := row.Scan(&column_1)
|
||||
return column_1, err
|
||||
}
|
||||
|
||||
const getWorkspaceAgentsByResourceIDs = `-- name: GetWorkspaceAgentsByResourceIDs :many
|
||||
SELECT
|
||||
id, created_at, updated_at, name, first_connected_at, last_connected_at, disconnected_at, resource_id, auth_token, auth_instance_id, architecture, environment_variables, operating_system, startup_script, instance_metadata, resource_metadata, directory, version, last_connected_replica_id, connection_timeout_seconds, troubleshooting_url, motd_file, lifecycle_state, startup_script_timeout_seconds, expanded_directory, shutdown_script, shutdown_script_timeout_seconds, startup_logs_length, startup_logs_overflowed, subsystem, startup_script_behavior
|
||||
@ -5833,16 +5854,17 @@ func (q *sqlQuerier) InsertWorkspaceAgentMetadata(ctx context.Context, arg Inser
|
||||
const insertWorkspaceAgentStartupLogs = `-- name: InsertWorkspaceAgentStartupLogs :many
|
||||
WITH new_length AS (
|
||||
UPDATE workspace_agents SET
|
||||
startup_logs_length = startup_logs_length + $5 WHERE workspace_agents.id = $1
|
||||
startup_logs_length = startup_logs_length + $6 WHERE workspace_agents.id = $1
|
||||
)
|
||||
INSERT INTO
|
||||
workspace_agent_startup_logs (agent_id, created_at, output, level)
|
||||
workspace_agent_startup_logs (agent_id, created_at, output, level, eof)
|
||||
SELECT
|
||||
$1 :: uuid AS agent_id,
|
||||
unnest($2 :: timestamptz [ ]) AS created_at,
|
||||
unnest($3 :: VARCHAR(1024) [ ]) AS output,
|
||||
unnest($4 :: log_level [ ]) AS level
|
||||
RETURNING workspace_agent_startup_logs.agent_id, workspace_agent_startup_logs.created_at, workspace_agent_startup_logs.output, workspace_agent_startup_logs.id, workspace_agent_startup_logs.level
|
||||
unnest($4 :: log_level [ ]) AS level,
|
||||
unnest($5 :: boolean [ ]) AS eof
|
||||
RETURNING workspace_agent_startup_logs.agent_id, workspace_agent_startup_logs.created_at, workspace_agent_startup_logs.output, workspace_agent_startup_logs.id, workspace_agent_startup_logs.level, workspace_agent_startup_logs.eof
|
||||
`
|
||||
|
||||
type InsertWorkspaceAgentStartupLogsParams struct {
|
||||
@ -5850,6 +5872,7 @@ type InsertWorkspaceAgentStartupLogsParams struct {
|
||||
CreatedAt []time.Time `db:"created_at" json:"created_at"`
|
||||
Output []string `db:"output" json:"output"`
|
||||
Level []LogLevel `db:"level" json:"level"`
|
||||
EOF []bool `db:"eof" json:"eof"`
|
||||
OutputLength int32 `db:"output_length" json:"output_length"`
|
||||
}
|
||||
|
||||
@ -5859,6 +5882,7 @@ func (q *sqlQuerier) InsertWorkspaceAgentStartupLogs(ctx context.Context, arg In
|
||||
pq.Array(arg.CreatedAt),
|
||||
pq.Array(arg.Output),
|
||||
pq.Array(arg.Level),
|
||||
pq.Array(arg.EOF),
|
||||
arg.OutputLength,
|
||||
)
|
||||
if err != nil {
|
||||
@ -5874,6 +5898,7 @@ func (q *sqlQuerier) InsertWorkspaceAgentStartupLogs(ctx context.Context, arg In
|
||||
&i.Output,
|
||||
&i.ID,
|
||||
&i.Level,
|
||||
&i.EOF,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -146,18 +146,31 @@ WHERE
|
||||
id > @created_after
|
||||
) ORDER BY id ASC;
|
||||
|
||||
-- name: GetWorkspaceAgentStartupLogsEOF :one
|
||||
-- Returns TRUE when at least one startup log row for the given agent ($1)
-- has eof = true, and FALSE otherwise.
SELECT CASE WHEN EXISTS (
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
workspace_agent_startup_logs
|
||||
WHERE
|
||||
agent_id = $1
|
||||
AND eof = true
|
||||
LIMIT 1
|
||||
) THEN TRUE ELSE FALSE END;
|
||||
|
||||
-- name: InsertWorkspaceAgentStartupLogs :many
|
||||
WITH new_length AS (
|
||||
UPDATE workspace_agents SET
|
||||
startup_logs_length = startup_logs_length + @output_length WHERE workspace_agents.id = @agent_id
|
||||
)
|
||||
INSERT INTO
|
||||
workspace_agent_startup_logs (agent_id, created_at, output, level)
|
||||
workspace_agent_startup_logs (agent_id, created_at, output, level, eof)
|
||||
SELECT
|
||||
@agent_id :: uuid AS agent_id,
|
||||
unnest(@created_at :: timestamptz [ ]) AS created_at,
|
||||
unnest(@output :: VARCHAR(1024) [ ]) AS output,
|
||||
unnest(@level :: log_level [ ]) AS level
|
||||
unnest(@level :: log_level [ ]) AS level,
|
||||
unnest(@eof :: boolean [ ]) AS eof
|
||||
RETURNING workspace_agent_startup_logs.*;
|
||||
|
||||
-- If an agent hasn't connected in the last 7 days, we purge its logs.
|
||||
|
@ -55,6 +55,7 @@ overrides:
|
||||
uuid: UUID
|
||||
failure_ttl: FailureTTL
|
||||
inactivity_ttl: InactivityTTL
|
||||
eof: EOF
|
||||
|
||||
sql:
|
||||
- schema: "./dump.sql"
|
||||
|
@ -259,8 +259,16 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
|
||||
createdAt := make([]time.Time, 0)
|
||||
output := make([]string, 0)
|
||||
level := make([]database.LogLevel, 0)
|
||||
eof := make([]bool, 0)
|
||||
outputLength := 0
|
||||
for _, logEntry := range req.Logs {
|
||||
for i, logEntry := range req.Logs {
|
||||
if logEntry.EOF && i != len(req.Logs)-1 {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "EOF log must be the last log entry.",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
createdAt = append(createdAt, logEntry.CreatedAt)
|
||||
output = append(output, logEntry.Output)
|
||||
outputLength += len(logEntry.Output)
|
||||
@ -277,15 +285,42 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
|
||||
return
|
||||
}
|
||||
level = append(level, parsedLevel)
|
||||
eof = append(eof, logEntry.EOF)
|
||||
}
|
||||
logs, err := api.Database.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAt: createdAt,
|
||||
Output: output,
|
||||
Level: level,
|
||||
OutputLength: int32(outputLength),
|
||||
})
|
||||
|
||||
var logs []database.WorkspaceAgentStartupLog
|
||||
// Ensure logs are not written after EOF.
|
||||
eofError := xerrors.New("EOF log already received")
|
||||
err := api.Database.InTx(func(db database.Store) error {
|
||||
isEOF, err := db.GetWorkspaceAgentStartupLogsEOF(ctx, workspaceAgent.ID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("EOF status: %w", err)
|
||||
}
|
||||
|
||||
if isEOF {
|
||||
// The agent has already sent an EOF log, so we don't need to process
|
||||
// any more logs.
|
||||
return eofError
|
||||
}
|
||||
|
||||
logs, err = db.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAt: createdAt,
|
||||
Output: output,
|
||||
Level: level,
|
||||
EOF: eof,
|
||||
OutputLength: int32(outputLength),
|
||||
})
|
||||
return err
|
||||
}, nil)
|
||||
if err != nil {
|
||||
if errors.Is(err, eofError) {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Startup log has been closed.",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
if database.IsStartupLogsLimitError(err) {
|
||||
if !workspaceAgent.StartupLogsOverflowed {
|
||||
err := api.Database.UpdateWorkspaceAgentStartupLogOverflowByID(ctx, database.UpdateWorkspaceAgentStartupLogOverflowByIDParams{
|
||||
@ -332,6 +367,17 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
firstLog := logs[0]
|
||||
lastLog := logs[len(logs)-1]
|
||||
|
||||
// Publish by the lowest log ID inserted so the
|
||||
// log stream will fetch everything from that point.
|
||||
api.publishWorkspaceAgentStartupLogsUpdate(ctx, workspaceAgent.ID, agentsdk.StartupLogsNotifyMessage{
|
||||
CreatedAfter: firstLog.ID - 1,
|
||||
EndOfLogs: lastLog.EOF,
|
||||
})
|
||||
|
||||
if workspaceAgent.StartupLogsLength == 0 {
|
||||
// If these are the first logs being appended, we publish a UI update
|
||||
// to notify the UI that logs are now available.
|
||||
@ -356,26 +402,6 @@ func (api *API) patchWorkspaceAgentStartupLogs(rw http.ResponseWriter, r *http.R
|
||||
api.publishWorkspaceUpdate(ctx, build.WorkspaceID)
|
||||
}
|
||||
|
||||
lowestID := logs[0].ID
|
||||
// Publish by the lowest log ID inserted so the
|
||||
// log stream will fetch everything from that point.
|
||||
data, err := json.Marshal(agentsdk.StartupLogsNotifyMessage{
|
||||
CreatedAfter: lowestID - 1,
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to marshal startup logs notify message",
|
||||
Detail: err.Error(),
|
||||
})
|
||||
return
|
||||
}
|
||||
err = api.Pubsub.Publish(agentsdk.StartupLogsNotifyChannel(workspaceAgent.ID), data)
|
||||
if err != nil {
|
||||
// We don't want to return an error to the agent here,
|
||||
// otherwise it might try to reinsert the logs.
|
||||
api.Logger.Warn(ctx, "failed to publish startup logs notify message", slog.Error(err))
|
||||
}
|
||||
|
||||
httpapi.Write(ctx, rw, http.StatusOK, nil)
|
||||
}
|
||||
|
||||
@ -397,7 +423,6 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
|
||||
// This mostly copies how provisioner job logs are streamed!
|
||||
var (
|
||||
ctx = r.Context()
|
||||
actor, _ = dbauthz.ActorFromContext(ctx)
|
||||
workspaceAgent = httpmw.WorkspaceAgentParam(r)
|
||||
logger = api.Logger.With(slog.F("workspace_agent_id", workspaceAgent.ID))
|
||||
follow = r.URL.Query().Has("follow")
|
||||
@ -409,11 +434,11 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
|
||||
if afterRaw != "" {
|
||||
var err error
|
||||
after, err = strconv.ParseInt(afterRaw, 10, 64)
|
||||
if err != nil {
|
||||
if err != nil || after < 0 {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Query param \"after\" must be an integer.",
|
||||
Message: "Query param \"after\" must be an integer greater than or equal to zero.",
|
||||
Validations: []codersdk.ValidationError{
|
||||
{Field: "after", Detail: "Must be an integer"},
|
||||
{Field: "after", Detail: "Must be an integer greater than or equal to zero"},
|
||||
},
|
||||
})
|
||||
return
|
||||
@ -467,66 +492,36 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if workspaceAgent.LifecycleState == database.WorkspaceAgentLifecycleStateReady {
|
||||
// The startup script has finished running, so we can close the connection.
|
||||
|
||||
lastSentLogID := after
|
||||
if len(logs) > 0 {
|
||||
last := logs[len(logs)-1]
|
||||
|
||||
if last.EOF {
|
||||
// The startup script has finished running, so we can close the connection.
|
||||
return
|
||||
}
|
||||
lastSentLogID = last.ID
|
||||
}
|
||||
if !codersdk.WorkspaceAgentLifecycle(workspaceAgent.LifecycleState).Starting() {
|
||||
// Backwards compatibility: Avoid waiting forever in case this agent was
|
||||
// created before the current release.
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
bufferedLogs = make(chan []database.WorkspaceAgentStartupLog, 128)
|
||||
endOfLogs atomic.Bool
|
||||
lastSentLogID atomic.Int64
|
||||
)
|
||||
notifyCh := make(chan struct{}, 1)
|
||||
// Allow us to immediately check if we missed any logs
|
||||
// between initial fetch and subscribe.
|
||||
notifyCh <- struct{}{}
|
||||
|
||||
sendLogs := func(logs []database.WorkspaceAgentStartupLog) {
|
||||
// Subscribe early to prevent missing log events.
|
||||
closeSubscribe, err := api.Pubsub.Subscribe(agentsdk.StartupLogsNotifyChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) {
|
||||
// The message is not important, we're tracking lastSentLogID manually.
|
||||
select {
|
||||
case bufferedLogs <- logs:
|
||||
lastSentLogID.Store(logs[len(logs)-1].ID)
|
||||
case notifyCh <- struct{}{}:
|
||||
default:
|
||||
logger.Warn(ctx, "workspace agent startup log overflowing channel")
|
||||
}
|
||||
}
|
||||
|
||||
closeSubscribe, err := api.Pubsub.Subscribe(
|
||||
agentsdk.StartupLogsNotifyChannel(workspaceAgent.ID),
|
||||
func(ctx context.Context, message []byte) {
|
||||
if endOfLogs.Load() {
|
||||
return
|
||||
}
|
||||
jlMsg := agentsdk.StartupLogsNotifyMessage{}
|
||||
err := json.Unmarshal(message, &jlMsg)
|
||||
if err != nil {
|
||||
logger.Warn(ctx, "invalid startup logs notify message", slog.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if jlMsg.CreatedAfter != 0 {
|
||||
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(dbauthz.As(ctx, actor), database.GetWorkspaceAgentStartupLogsAfterParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAfter: jlMsg.CreatedAfter,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warn(ctx, "failed to get workspace agent startup logs after", slog.Error(err))
|
||||
return
|
||||
}
|
||||
sendLogs(logs)
|
||||
}
|
||||
|
||||
if jlMsg.EndOfLogs {
|
||||
endOfLogs.Store(true)
|
||||
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(dbauthz.As(ctx, actor), database.GetWorkspaceAgentStartupLogsAfterParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAfter: lastSentLogID.Load(),
|
||||
})
|
||||
if err != nil {
|
||||
logger.Warn(ctx, "get workspace agent startup logs after", slog.Error(err))
|
||||
return
|
||||
}
|
||||
sendLogs(logs)
|
||||
bufferedLogs <- nil
|
||||
}
|
||||
},
|
||||
)
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
||||
Message: "Failed to subscribe to startup logs.",
|
||||
@ -536,15 +531,70 @@ func (api *API) workspaceAgentStartupLogs(rw http.ResponseWriter, r *http.Reques
|
||||
}
|
||||
defer closeSubscribe()
|
||||
|
||||
// Buffer size controls the log prefetch capacity.
|
||||
bufferedLogs := make(chan []database.WorkspaceAgentStartupLog, 8)
|
||||
// Check at least once per minute in case we didn't receive a pubsub message.
|
||||
recheckInterval := time.Minute
|
||||
t := time.NewTicker(recheckInterval)
|
||||
defer t.Stop()
|
||||
|
||||
go func() {
|
||||
defer close(bufferedLogs)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
case <-notifyCh:
|
||||
t.Reset(recheckInterval)
|
||||
}
|
||||
|
||||
logs, err := api.Database.GetWorkspaceAgentStartupLogsAfter(ctx, database.GetWorkspaceAgentStartupLogsAfterParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAfter: lastSentLogID,
|
||||
})
|
||||
if err != nil {
|
||||
if xerrors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
logger.Warn(ctx, "failed to get workspace agent startup logs after", slog.Error(err))
|
||||
continue
|
||||
}
|
||||
if len(logs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case bufferedLogs <- logs:
|
||||
lastSentLogID = logs[len(logs)-1].ID
|
||||
}
|
||||
if logs[len(logs)-1].EOF {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
// Ensure that we don't return until the goroutine has exited.
|
||||
//nolint:revive // Consume channel to wait until it's closed.
|
||||
for range bufferedLogs {
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Debug(context.Background(), "job logs context canceled")
|
||||
logger.Debug(ctx, "job logs context canceled")
|
||||
return
|
||||
case logs, ok := <-bufferedLogs:
|
||||
// A nil log is sent when complete!
|
||||
if !ok || logs == nil {
|
||||
logger.Debug(context.Background(), "reached the end of published logs")
|
||||
if !ok {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logger.Debug(ctx, "job logs context canceled")
|
||||
default:
|
||||
logger.Debug(ctx, "reached the end of published logs")
|
||||
}
|
||||
return
|
||||
}
|
||||
err = encoder.Encode(convertWorkspaceAgentStartupLogs(logs))
|
||||
@ -1526,29 +1576,75 @@ func (api *API) workspaceAgentReportLifecycle(rw http.ResponseWriter, r *http.Re
|
||||
return
|
||||
}
|
||||
|
||||
api.Logger.Debug(ctx, "workspace agent state report",
|
||||
logger := api.Logger.With(
|
||||
slog.F("agent", workspaceAgent.ID),
|
||||
slog.F("workspace", workspace.ID),
|
||||
slog.F("payload", req),
|
||||
)
|
||||
logger.Debug(ctx, "workspace agent state report")
|
||||
|
||||
lifecycleState := database.WorkspaceAgentLifecycleState(req.State)
|
||||
if !lifecycleState.Valid() {
|
||||
lifecycleState := req.State
|
||||
dbLifecycleState := database.WorkspaceAgentLifecycleState(lifecycleState)
|
||||
if !dbLifecycleState.Valid() {
|
||||
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
||||
Message: "Invalid lifecycle state.",
|
||||
Detail: fmt.Sprintf("Invalid lifecycle state %q, must be be one of %q.", req.State, database.AllWorkspaceAgentLifecycleStateValues()),
|
||||
Detail: fmt.Sprintf("Invalid lifecycle state %q, must be be one of %q.", lifecycleState, database.AllWorkspaceAgentLifecycleStateValues()),
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
err = api.Database.UpdateWorkspaceAgentLifecycleStateByID(ctx, database.UpdateWorkspaceAgentLifecycleStateByIDParams{
|
||||
ID: workspaceAgent.ID,
|
||||
LifecycleState: lifecycleState,
|
||||
LifecycleState: dbLifecycleState,
|
||||
})
|
||||
if err != nil {
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !lifecycleState.Starting() {
|
||||
var eofLog []database.WorkspaceAgentStartupLog
|
||||
// Ensure the startup logs are marked as complete if the agent
|
||||
// is no longer starting. This should be reported by the agent
|
||||
// itself, but we do it here as a fallback.
|
||||
err = api.Database.InTx(func(db database.Store) error {
|
||||
isEOF, err := db.GetWorkspaceAgentStartupLogsEOF(ctx, workspaceAgent.ID)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("EOF status: %w", err)
|
||||
}
|
||||
if isEOF {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Debug(ctx, "marking startup logs as complete because agent is no longer starting")
|
||||
eofLog, err = db.InsertWorkspaceAgentStartupLogs(ctx, database.InsertWorkspaceAgentStartupLogsParams{
|
||||
AgentID: workspaceAgent.ID,
|
||||
CreatedAt: []time.Time{database.Now()},
|
||||
Output: []string{""},
|
||||
Level: []database.LogLevel{database.LogLevelInfo},
|
||||
EOF: []bool{true},
|
||||
OutputLength: 0,
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("write EOF log entry: %w", err)
|
||||
}
|
||||
return nil
|
||||
}, nil)
|
||||
if err != nil {
|
||||
logger.Warn(ctx, "failed to mark startup logs as complete", slog.Error(err))
|
||||
// If this fails, we want the agent to keep trying so that the
|
||||
// startup log is eventually marked as complete.
|
||||
httpapi.InternalServerError(rw, err)
|
||||
return
|
||||
}
|
||||
if len(eofLog) > 0 {
|
||||
api.publishWorkspaceAgentStartupLogsUpdate(ctx, workspaceAgent.ID, agentsdk.StartupLogsNotifyMessage{
|
||||
CreatedAfter: eofLog[0].ID - 1,
|
||||
EndOfLogs: true,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
api.publishWorkspaceUpdate(ctx, workspace.ID)
|
||||
|
||||
httpapi.Write(ctx, rw, http.StatusNoContent, nil)
|
||||
@ -1984,6 +2080,7 @@ func convertWorkspaceAgentStartupLog(logEntry database.WorkspaceAgentStartupLog)
|
||||
CreatedAt: logEntry.CreatedAt,
|
||||
Output: logEntry.Output,
|
||||
Level: codersdk.LogLevel(logEntry.Level),
|
||||
EOF: logEntry.EOF,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -211,14 +211,20 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
|
||||
agentClient := agentsdk.New(client.URL)
|
||||
agentClient.SetSessionToken(authToken)
|
||||
err := agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
|
||||
Logs: []agentsdk.StartupLog{{
|
||||
CreatedAt: database.Now(),
|
||||
Output: "testing",
|
||||
}},
|
||||
Logs: []agentsdk.StartupLog{
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
Output: "testing",
|
||||
},
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
Output: "testing2",
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, -500)
|
||||
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
_ = closer.Close()
|
||||
@ -229,8 +235,9 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
|
||||
case logChunk = <-logs:
|
||||
}
|
||||
require.NoError(t, ctx.Err())
|
||||
require.Len(t, logChunk, 1)
|
||||
require.Len(t, logChunk, 2) // No EOF.
|
||||
require.Equal(t, "testing", logChunk[0].Output)
|
||||
require.Equal(t, "testing2", logChunk[1].Output)
|
||||
})
|
||||
t.Run("PublishesOnOverflow", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
@ -294,6 +301,251 @@ func TestWorkspaceAgentStartupLogs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
})
|
||||
t.Run("AllowEOFAfterOverflowAndCloseFollowWebsocket", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
client := coderdtest.New(t, &coderdtest.Options{
|
||||
IncludeProvisionerDaemon: true,
|
||||
})
|
||||
user := coderdtest.CreateFirstUser(t, client)
|
||||
authToken := uuid.NewString()
|
||||
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
|
||||
Parse: echo.ParseComplete,
|
||||
ProvisionPlan: echo.ProvisionComplete,
|
||||
ProvisionApply: []*proto.Provision_Response{{
|
||||
Type: &proto.Provision_Response_Complete{
|
||||
Complete: &proto.Provision_Complete{
|
||||
Resources: []*proto.Resource{{
|
||||
Name: "example",
|
||||
Type: "aws_instance",
|
||||
Agents: []*proto.Agent{{
|
||||
Id: uuid.NewString(),
|
||||
Auth: &proto.Agent_Token{
|
||||
Token: authToken,
|
||||
},
|
||||
}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}},
|
||||
})
|
||||
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
|
||||
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
|
||||
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
|
||||
build := coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
|
||||
|
||||
updates, err := client.WatchWorkspace(ctx, workspace.ID)
|
||||
require.NoError(t, err)
|
||||
|
||||
logs, closeLogs, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
|
||||
require.NoError(t, err)
|
||||
defer closeLogs.Close()
|
||||
|
||||
wantLogs := []codersdk.WorkspaceAgentStartupLog{
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
Output: "testing",
|
||||
Level: "info",
|
||||
},
|
||||
{
|
||||
CreatedAt: database.Now().Add(time.Minute),
|
||||
Level: "info",
|
||||
EOF: true,
|
||||
},
|
||||
}
|
||||
|
||||
agentClient := agentsdk.New(client.URL)
|
||||
agentClient.SetSessionToken(authToken)
|
||||
|
||||
var convertedLogs []agentsdk.StartupLog
|
||||
for _, log := range wantLogs {
|
||||
convertedLogs = append(convertedLogs, agentsdk.StartupLog{
|
||||
CreatedAt: log.CreatedAt,
|
||||
Output: log.Output,
|
||||
Level: log.Level,
|
||||
EOF: log.EOF,
|
||||
})
|
||||
}
|
||||
initialLogs := convertedLogs[:len(convertedLogs)-1]
|
||||
eofLog := convertedLogs[len(convertedLogs)-1]
|
||||
err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{Logs: initialLogs})
|
||||
require.NoError(t, err)
|
||||
|
||||
overflowLogs := []agentsdk.StartupLog{
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
Output: strings.Repeat("a", (1<<20)+1),
|
||||
},
|
||||
eofLog, // Include EOF which will be discarded due to overflow.
|
||||
}
|
||||
err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{Logs: overflowLogs})
|
||||
var apiError *codersdk.Error
|
||||
require.ErrorAs(t, err, &apiError)
|
||||
require.Equal(t, http.StatusRequestEntityTooLarge, apiError.StatusCode())
|
||||
|
||||
// It's possible we have multiple updates queued, but that's alright, we just
|
||||
// wait for the one where it overflows.
|
||||
for {
|
||||
var update codersdk.Workspace
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
require.Fail(t, "timed out waiting for overflow")
|
||||
case update = <-updates:
|
||||
}
|
||||
if update.LatestBuild.Resources[0].Agents[0].StartupLogsOverflowed {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Now we should still be able to send the EOF.
|
||||
err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{Logs: []agentsdk.StartupLog{eofLog}})
|
||||
require.NoError(t, err)
|
||||
|
||||
var gotLogs []codersdk.WorkspaceAgentStartupLog
|
||||
logsLoop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
require.Fail(t, "timed out waiting for logs")
|
||||
case l, ok := <-logs:
|
||||
if !ok {
|
||||
break logsLoop
|
||||
}
|
||||
gotLogs = append(gotLogs, l...)
|
||||
}
|
||||
}
|
||||
for i := range gotLogs {
|
||||
gotLogs[i].ID = 0 // Ignore ID for comparison.
|
||||
}
|
||||
require.Equal(t, wantLogs, gotLogs)
|
||||
})
|
||||
t.Run("CloseAfterLifecycleStateIsNotRunning", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
client := coderdtest.New(t, &coderdtest.Options{
|
||||
IncludeProvisionerDaemon: true,
|
||||
})
|
||||
user := coderdtest.CreateFirstUser(t, client)
|
||||
authToken := uuid.NewString()
|
||||
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
|
||||
Parse: echo.ParseComplete,
|
||||
ProvisionPlan: echo.ProvisionComplete,
|
||||
ProvisionApply: []*proto.Provision_Response{{
|
||||
Type: &proto.Provision_Response_Complete{
|
||||
Complete: &proto.Provision_Complete{
|
||||
Resources: []*proto.Resource{{
|
||||
Name: "example",
|
||||
Type: "aws_instance",
|
||||
Agents: []*proto.Agent{{
|
||||
Id: uuid.NewString(),
|
||||
Auth: &proto.Agent_Token{
|
||||
Token: authToken,
|
||||
},
|
||||
}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}},
|
||||
})
|
||||
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
|
||||
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
|
||||
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
|
||||
build := coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
|
||||
|
||||
agentClient := agentsdk.New(client.URL)
|
||||
agentClient.SetSessionToken(authToken)
|
||||
|
||||
logs, closer, err := client.WorkspaceAgentStartupLogsAfter(ctx, build.Resources[0].Agents[0].ID, 0)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
_ = closer.Close()
|
||||
}()
|
||||
|
||||
err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
|
||||
Logs: []agentsdk.StartupLog{
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
Output: "testing",
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = agentClient.PostLifecycle(ctx, agentsdk.PostLifecycleRequest{
|
||||
State: codersdk.WorkspaceAgentLifecycleReady,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
require.Fail(t, "timed out waiting for logs EOF")
|
||||
case l := <-logs:
|
||||
for _, log := range l {
|
||||
if log.EOF {
|
||||
// Success.
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
t.Run("NoLogAfterEOF", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
client := coderdtest.New(t, &coderdtest.Options{
|
||||
IncludeProvisionerDaemon: true,
|
||||
})
|
||||
user := coderdtest.CreateFirstUser(t, client)
|
||||
authToken := uuid.NewString()
|
||||
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
|
||||
Parse: echo.ParseComplete,
|
||||
ProvisionPlan: echo.ProvisionComplete,
|
||||
ProvisionApply: []*proto.Provision_Response{{
|
||||
Type: &proto.Provision_Response_Complete{
|
||||
Complete: &proto.Provision_Complete{
|
||||
Resources: []*proto.Resource{{
|
||||
Name: "example",
|
||||
Type: "aws_instance",
|
||||
Agents: []*proto.Agent{{
|
||||
Id: uuid.NewString(),
|
||||
Auth: &proto.Agent_Token{
|
||||
Token: authToken,
|
||||
},
|
||||
}},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}},
|
||||
})
|
||||
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
|
||||
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
|
||||
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
|
||||
_ = coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
|
||||
|
||||
agentClient := agentsdk.New(client.URL)
|
||||
agentClient.SetSessionToken(authToken)
|
||||
|
||||
err := agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
|
||||
Logs: []agentsdk.StartupLog{
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
EOF: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = agentClient.PatchStartupLogs(ctx, agentsdk.PatchStartupLogs{
|
||||
Logs: []agentsdk.StartupLog{
|
||||
{
|
||||
CreatedAt: database.Now(),
|
||||
Output: "testing",
|
||||
},
|
||||
},
|
||||
})
|
||||
require.Error(t, err, "insert after EOF should not succeed")
|
||||
})
|
||||
}
|
||||
|
||||
func TestWorkspaceAgentListen(t *testing.T) {
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
"github.com/coder/coder/coderd/util/ptr"
|
||||
"github.com/coder/coder/coderd/wsbuilder"
|
||||
"github.com/coder/coder/codersdk"
|
||||
"github.com/coder/coder/codersdk/agentsdk"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -1178,3 +1179,14 @@ func (api *API) publishWorkspaceUpdate(ctx context.Context, workspaceID uuid.UUI
|
||||
slog.F("workspace_id", workspaceID), slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) publishWorkspaceAgentStartupLogsUpdate(ctx context.Context, workspaceAgentID uuid.UUID, m agentsdk.StartupLogsNotifyMessage) {
|
||||
b, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
api.Logger.Warn(ctx, "failed to marshal startup logs notify message", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
|
||||
}
|
||||
err = api.Pubsub.Publish(agentsdk.StartupLogsNotifyChannel(workspaceAgentID), b)
|
||||
if err != nil {
|
||||
api.Logger.Warn(ctx, "failed to publish workspace agent startup logs update", slog.F("workspace_agent_id", workspaceAgentID), slog.Error(err))
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user