mirror of
https://github.com/coder/coder.git
synced 2025-03-14 10:09:57 +00:00
This PR provides a convenience function for parsing a `map[string]string` from a query parameter. Context: https://github.com/coder/coder/pull/16558#discussion_r1956190615
648 lines
21 KiB
Go
648 lines
21 KiB
Go
package coderd
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"sort"
|
|
"strconv"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"cdr.dev/slog"
|
|
"github.com/coder/coder/v2/coderd/database"
|
|
"github.com/coder/coder/v2/coderd/database/db2sdk"
|
|
"github.com/coder/coder/v2/coderd/database/dbauthz"
|
|
"github.com/coder/coder/v2/coderd/database/pubsub"
|
|
"github.com/coder/coder/v2/coderd/httpapi"
|
|
"github.com/coder/coder/v2/coderd/httpmw"
|
|
"github.com/coder/coder/v2/coderd/rbac"
|
|
"github.com/coder/coder/v2/coderd/rbac/policy"
|
|
"github.com/coder/coder/v2/coderd/util/slice"
|
|
"github.com/coder/coder/v2/codersdk"
|
|
"github.com/coder/coder/v2/codersdk/wsjson"
|
|
"github.com/coder/coder/v2/provisionersdk"
|
|
"github.com/coder/websocket"
|
|
)
|
|
|
|
// @Summary Get provisioner job
|
|
// @ID get-provisioner-job
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Organizations
|
|
// @Param organization path string true "Organization ID" format(uuid)
|
|
// @Param job path string true "Job ID" format(uuid)
|
|
// @Success 200 {object} codersdk.ProvisionerJob
|
|
// @Router /organizations/{organization}/provisionerjobs/{job} [get]
|
|
func (api *API) provisionerJob(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
jobID, ok := httpmw.ParseUUIDParam(rw, r, "job")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
jobs, ok := api.handleAuthAndFetchProvisionerJobs(rw, r, []uuid.UUID{jobID})
|
|
if !ok {
|
|
return
|
|
}
|
|
if len(jobs) == 0 {
|
|
httpapi.ResourceNotFound(rw)
|
|
return
|
|
}
|
|
if len(jobs) > 1 || jobs[0].ProvisionerJob.ID != jobID {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner job.",
|
|
Detail: "Database returned an unexpected job.",
|
|
})
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobWithQueuePosition(jobs[0]))
|
|
}
|
|
|
|
// @Summary Get provisioner jobs
|
|
// @ID get-provisioner-jobs
|
|
// @Security CoderSessionToken
|
|
// @Produce json
|
|
// @Tags Organizations
|
|
// @Param organization path string true "Organization ID" format(uuid)
|
|
// @Param limit query int false "Page limit"
|
|
// @Param ids query []string false "Filter results by job IDs" format(uuid)
|
|
// @Param status query codersdk.ProvisionerJobStatus false "Filter results by status" enums(pending,running,succeeded,canceling,canceled,failed)
|
|
// @Param tags query object false "Provisioner tags to filter by (JSON of the form {'tag1':'value1','tag2':'value2'})"
|
|
// @Success 200 {array} codersdk.ProvisionerJob
|
|
// @Router /organizations/{organization}/provisionerjobs [get]
|
|
func (api *API) provisionerJobs(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
jobs, ok := api.handleAuthAndFetchProvisionerJobs(rw, r, nil)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, db2sdk.List(jobs, convertProvisionerJobWithQueuePosition))
|
|
}
|
|
|
|
// handleAuthAndFetchProvisionerJobs is an internal method shared by
|
|
// provisionerJob and provisionerJobs. If ok is false the caller should
|
|
// return immediately because the response has already been written.
|
|
func (api *API) handleAuthAndFetchProvisionerJobs(rw http.ResponseWriter, r *http.Request, ids []uuid.UUID) (_ []database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, ok bool) {
|
|
ctx := r.Context()
|
|
org := httpmw.OrganizationParam(r)
|
|
|
|
// For now, only owners and template admins can access provisioner jobs.
|
|
if !api.Authorize(r, policy.ActionRead, rbac.ResourceProvisionerJobs.InOrg(org.ID)) {
|
|
httpapi.ResourceNotFound(rw)
|
|
return nil, false
|
|
}
|
|
|
|
qp := r.URL.Query()
|
|
p := httpapi.NewQueryParamParser()
|
|
limit := p.PositiveInt32(qp, 50, "limit")
|
|
status := p.Strings(qp, nil, "status")
|
|
if ids == nil {
|
|
ids = p.UUIDs(qp, nil, "ids")
|
|
}
|
|
tags := p.JSONStringMap(qp, database.StringMap{}, "tags")
|
|
p.ErrorExcessParams(qp)
|
|
if len(p.Errors) > 0 {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Invalid query parameters.",
|
|
Validations: p.Errors,
|
|
})
|
|
return nil, false
|
|
}
|
|
|
|
jobs, err := api.Database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx, database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams{
|
|
OrganizationID: org.ID,
|
|
Status: slice.StringEnums[database.ProvisionerJobStatus](status),
|
|
Limit: sql.NullInt32{Int32: limit, Valid: limit > 0},
|
|
IDs: ids,
|
|
Tags: tags,
|
|
})
|
|
if err != nil {
|
|
if httpapi.Is404Error(err) {
|
|
httpapi.ResourceNotFound(rw)
|
|
return nil, false
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner jobs.",
|
|
Detail: err.Error(),
|
|
})
|
|
return nil, false
|
|
}
|
|
|
|
return jobs, true
|
|
}
|
|
|
|
// Returns provisioner logs based on query parameters.
|
|
// The intended usage for a client to stream all logs (with JS API):
|
|
// GET /logs
|
|
// GET /logs?after=<id>&follow
|
|
// The combination of these responses should provide all current logs
|
|
// to the consumer, and future logs are streamed in the follow request.
|
|
func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
|
|
var (
|
|
ctx = r.Context()
|
|
logger = api.Logger.With(slog.F("job_id", job.ID))
|
|
follow = r.URL.Query().Has("follow")
|
|
afterRaw = r.URL.Query().Get("after")
|
|
)
|
|
|
|
var after int64
|
|
// Only fetch logs created after the time provided.
|
|
if afterRaw != "" {
|
|
var err error
|
|
after, err = strconv.ParseInt(afterRaw, 10, 64)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Query param \"after\" must be an integer.",
|
|
Validations: []codersdk.ValidationError{
|
|
{Field: "after", Detail: "Must be an integer"},
|
|
},
|
|
})
|
|
return
|
|
}
|
|
}
|
|
|
|
if !follow {
|
|
fetchAndWriteLogs(ctx, api.Database, job.ID, after, rw)
|
|
return
|
|
}
|
|
|
|
follower := newLogFollower(ctx, logger, api.Database, api.Pubsub, rw, r, job, after)
|
|
api.WebsocketWaitMutex.Lock()
|
|
api.WebsocketWaitGroup.Add(1)
|
|
api.WebsocketWaitMutex.Unlock()
|
|
defer api.WebsocketWaitGroup.Done()
|
|
follower.follow()
|
|
}
|
|
|
|
func (api *API) provisionerJobResources(rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob) {
|
|
ctx := r.Context()
|
|
if !job.CompletedAt.Valid {
|
|
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Job hasn't completed!",
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceResourcesByJobID is a system function.
|
|
resources, err := api.Database.GetWorkspaceResourcesByJobID(dbauthz.AsSystemRestricted(ctx), job.ID)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching job resources.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
resourceIDs := make([]uuid.UUID, 0)
|
|
for _, resource := range resources {
|
|
resourceIDs = append(resourceIDs, resource.ID)
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAgentsByResourceIDs is a system function.
|
|
resourceAgents, err := api.Database.GetWorkspaceAgentsByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
resourceAgentIDs := make([]uuid.UUID, 0)
|
|
for _, agent := range resourceAgents {
|
|
resourceAgentIDs = append(resourceAgentIDs, agent.ID)
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAppsByAgentIDs is a system function.
|
|
apps, err := api.Database.GetWorkspaceAppsByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace applications.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAgentScriptsByAgentIDs is a system function.
|
|
scripts, err := api.Database.GetWorkspaceAgentScriptsByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent scripts.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceAgentLogSourcesByAgentIDs is a system function.
|
|
logSources, err := api.Database.GetWorkspaceAgentLogSourcesByAgentIDs(dbauthz.AsSystemRestricted(ctx), resourceAgentIDs)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace agent log sources.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
// nolint:gocritic // GetWorkspaceResourceMetadataByResourceIDs is a system function.
|
|
resourceMetadata, err := api.Database.GetWorkspaceResourceMetadataByResourceIDs(dbauthz.AsSystemRestricted(ctx), resourceIDs)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching workspace metadata.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
apiResources := make([]codersdk.WorkspaceResource, 0)
|
|
for _, resource := range resources {
|
|
agents := make([]codersdk.WorkspaceAgent, 0)
|
|
for _, agent := range resourceAgents {
|
|
if agent.ResourceID != resource.ID {
|
|
continue
|
|
}
|
|
dbApps := make([]database.WorkspaceApp, 0)
|
|
for _, app := range apps {
|
|
if app.AgentID == agent.ID {
|
|
dbApps = append(dbApps, app)
|
|
}
|
|
}
|
|
dbScripts := make([]database.WorkspaceAgentScript, 0)
|
|
for _, script := range scripts {
|
|
if script.WorkspaceAgentID == agent.ID {
|
|
dbScripts = append(dbScripts, script)
|
|
}
|
|
}
|
|
dbLogSources := make([]database.WorkspaceAgentLogSource, 0)
|
|
for _, logSource := range logSources {
|
|
if logSource.WorkspaceAgentID == agent.ID {
|
|
dbLogSources = append(dbLogSources, logSource)
|
|
}
|
|
}
|
|
|
|
apiAgent, err := db2sdk.WorkspaceAgent(
|
|
api.DERPMap(), *api.TailnetCoordinator.Load(), agent, convertProvisionedApps(dbApps), convertScripts(dbScripts), convertLogSources(dbLogSources), api.AgentInactiveDisconnectTimeout,
|
|
api.DeploymentValues.AgentFallbackTroubleshootingURL.String(),
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error reading job agent.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
agents = append(agents, apiAgent)
|
|
}
|
|
metadata := make([]database.WorkspaceResourceMetadatum, 0)
|
|
for _, field := range resourceMetadata {
|
|
if field.WorkspaceResourceID == resource.ID {
|
|
metadata = append(metadata, field)
|
|
}
|
|
}
|
|
apiResources = append(apiResources, convertWorkspaceResource(resource, agents, metadata))
|
|
}
|
|
sort.Slice(apiResources, func(i, j int) bool {
|
|
return apiResources[i].Name < apiResources[j].Name
|
|
})
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, apiResources)
|
|
}
|
|
|
|
func convertProvisionerJobLogs(provisionerJobLogs []database.ProvisionerJobLog) []codersdk.ProvisionerJobLog {
|
|
sdk := make([]codersdk.ProvisionerJobLog, 0, len(provisionerJobLogs))
|
|
for _, log := range provisionerJobLogs {
|
|
sdk = append(sdk, convertProvisionerJobLog(log))
|
|
}
|
|
return sdk
|
|
}
|
|
|
|
func convertProvisionerJobLog(provisionerJobLog database.ProvisionerJobLog) codersdk.ProvisionerJobLog {
|
|
return codersdk.ProvisionerJobLog{
|
|
ID: provisionerJobLog.ID,
|
|
CreatedAt: provisionerJobLog.CreatedAt,
|
|
Source: codersdk.LogSource(provisionerJobLog.Source),
|
|
Level: codersdk.LogLevel(provisionerJobLog.Level),
|
|
Stage: provisionerJobLog.Stage,
|
|
Output: provisionerJobLog.Output,
|
|
}
|
|
}
|
|
|
|
func convertProvisionerJob(pj database.GetProvisionerJobsByIDsWithQueuePositionRow) codersdk.ProvisionerJob {
|
|
provisionerJob := pj.ProvisionerJob
|
|
job := codersdk.ProvisionerJob{
|
|
ID: provisionerJob.ID,
|
|
OrganizationID: provisionerJob.OrganizationID,
|
|
CreatedAt: provisionerJob.CreatedAt,
|
|
Type: codersdk.ProvisionerJobType(provisionerJob.Type),
|
|
Error: provisionerJob.Error.String,
|
|
ErrorCode: codersdk.JobErrorCode(provisionerJob.ErrorCode.String),
|
|
FileID: provisionerJob.FileID,
|
|
Tags: provisionerJob.Tags,
|
|
QueuePosition: int(pj.QueuePosition),
|
|
QueueSize: int(pj.QueueSize),
|
|
}
|
|
// Applying values optional to the struct.
|
|
if provisionerJob.StartedAt.Valid {
|
|
job.StartedAt = &provisionerJob.StartedAt.Time
|
|
}
|
|
if provisionerJob.CompletedAt.Valid {
|
|
job.CompletedAt = &provisionerJob.CompletedAt.Time
|
|
}
|
|
if provisionerJob.CanceledAt.Valid {
|
|
job.CanceledAt = &provisionerJob.CanceledAt.Time
|
|
}
|
|
if provisionerJob.WorkerID.Valid {
|
|
job.WorkerID = &provisionerJob.WorkerID.UUID
|
|
}
|
|
job.Status = codersdk.ProvisionerJobStatus(pj.ProvisionerJob.JobStatus)
|
|
|
|
// Only unmarshal input if it exists, this should only be zero in testing.
|
|
if len(provisionerJob.Input) > 0 {
|
|
if err := json.Unmarshal(provisionerJob.Input, &job.Input); err != nil {
|
|
job.Input.Error = xerrors.Errorf("decode input %s: %w", provisionerJob.Input, err).Error()
|
|
}
|
|
}
|
|
|
|
return job
|
|
}
|
|
|
|
func convertProvisionerJobWithQueuePosition(pj database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow) codersdk.ProvisionerJob {
|
|
job := convertProvisionerJob(database.GetProvisionerJobsByIDsWithQueuePositionRow{
|
|
ProvisionerJob: pj.ProvisionerJob,
|
|
QueuePosition: pj.QueuePosition,
|
|
QueueSize: pj.QueueSize,
|
|
})
|
|
job.AvailableWorkers = pj.AvailableWorkers
|
|
job.Metadata = codersdk.ProvisionerJobMetadata{
|
|
TemplateVersionName: pj.TemplateVersionName,
|
|
TemplateID: pj.TemplateID.UUID,
|
|
TemplateName: pj.TemplateName,
|
|
TemplateDisplayName: pj.TemplateDisplayName,
|
|
TemplateIcon: pj.TemplateIcon,
|
|
WorkspaceName: pj.WorkspaceName,
|
|
}
|
|
if pj.WorkspaceID.Valid {
|
|
job.Metadata.WorkspaceID = &pj.WorkspaceID.UUID
|
|
}
|
|
return job
|
|
}
|
|
|
|
func fetchAndWriteLogs(ctx context.Context, db database.Store, jobID uuid.UUID, after int64, rw http.ResponseWriter) {
|
|
logs, err := db.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
|
|
JobID: jobID,
|
|
CreatedAfter: after,
|
|
})
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner logs.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if logs == nil {
|
|
logs = []database.ProvisionerJobLog{}
|
|
}
|
|
httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobLogs(logs))
|
|
}
|
|
|
|
func jobIsComplete(logger slog.Logger, job database.ProvisionerJob) bool {
|
|
status := codersdk.ProvisionerJobStatus(job.JobStatus)
|
|
switch status {
|
|
case codersdk.ProvisionerJobCanceled:
|
|
return true
|
|
case codersdk.ProvisionerJobFailed:
|
|
return true
|
|
case codersdk.ProvisionerJobSucceeded:
|
|
return true
|
|
case codersdk.ProvisionerJobPending:
|
|
return false
|
|
case codersdk.ProvisionerJobCanceling:
|
|
return false
|
|
case codersdk.ProvisionerJobRunning:
|
|
return false
|
|
default:
|
|
logger.Error(context.Background(),
|
|
"can't convert the provisioner job status",
|
|
slog.F("job_id", job.ID), slog.F("status", status))
|
|
return false
|
|
}
|
|
}
|
|
|
|
type logFollower struct {
|
|
ctx context.Context
|
|
logger slog.Logger
|
|
db database.Store
|
|
pubsub pubsub.Pubsub
|
|
r *http.Request
|
|
rw http.ResponseWriter
|
|
conn *websocket.Conn
|
|
enc *wsjson.Encoder[codersdk.ProvisionerJobLog]
|
|
|
|
jobID uuid.UUID
|
|
after int64
|
|
complete bool
|
|
notifications chan provisionersdk.ProvisionerJobLogsNotifyMessage
|
|
errors chan error
|
|
}
|
|
|
|
func newLogFollower(
|
|
ctx context.Context, logger slog.Logger, db database.Store, ps pubsub.Pubsub,
|
|
rw http.ResponseWriter, r *http.Request, job database.ProvisionerJob, after int64,
|
|
) *logFollower {
|
|
return &logFollower{
|
|
ctx: ctx,
|
|
logger: logger,
|
|
db: db,
|
|
pubsub: ps,
|
|
r: r,
|
|
rw: rw,
|
|
jobID: job.ID,
|
|
after: after,
|
|
complete: jobIsComplete(logger, job),
|
|
notifications: make(chan provisionersdk.ProvisionerJobLogsNotifyMessage),
|
|
errors: make(chan error),
|
|
}
|
|
}
|
|
|
|
func (f *logFollower) follow() {
|
|
var cancel context.CancelFunc
|
|
f.ctx, cancel = context.WithCancel(f.ctx)
|
|
defer cancel()
|
|
// note that we only need to subscribe to updates if the job is not yet
|
|
// complete.
|
|
if !f.complete {
|
|
subCancel, err := f.pubsub.SubscribeWithErr(
|
|
provisionersdk.ProvisionerJobLogsNotifyChannel(f.jobID),
|
|
f.listener,
|
|
)
|
|
if err != nil {
|
|
httpapi.Write(f.ctx, f.rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "failed to subscribe to job updates",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer subCancel()
|
|
// Move cancel up the stack so it happens before unsubscribing,
|
|
// otherwise we can end up in a deadlock due to how the
|
|
// in-memory pubsub does mutex locking on send/unsubscribe.
|
|
defer cancel()
|
|
|
|
// we were provided `complete` prior to starting this subscription, so
|
|
// we also need to check whether the job is now complete, in case the
|
|
// job completed between the last time we queried the job and the start
|
|
// of the subscription. If the job completes after this, we will get
|
|
// a notification on the subscription.
|
|
job, err := f.db.GetProvisionerJobByID(f.ctx, f.jobID)
|
|
if err != nil {
|
|
httpapi.Write(f.ctx, f.rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "failed to query job",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
f.complete = jobIsComplete(f.logger, job)
|
|
f.logger.Debug(f.ctx, "queried job after subscribe", slog.F("complete", f.complete))
|
|
}
|
|
|
|
var err error
|
|
f.conn, err = websocket.Accept(f.rw, f.r, nil)
|
|
if err != nil {
|
|
httpapi.Write(f.ctx, f.rw, http.StatusBadRequest, codersdk.Response{
|
|
Message: "Failed to accept websocket.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
defer f.conn.Close(websocket.StatusNormalClosure, "done")
|
|
go httpapi.Heartbeat(f.ctx, f.conn)
|
|
f.enc = wsjson.NewEncoder[codersdk.ProvisionerJobLog](f.conn, websocket.MessageText)
|
|
|
|
// query for logs once right away, so we can get historical data from before
|
|
// subscription
|
|
if err := f.query(); err != nil {
|
|
if f.ctx.Err() == nil && !xerrors.Is(err, io.EOF) {
|
|
// neither context expiry, nor EOF, close and log
|
|
f.logger.Error(f.ctx, "failed to query logs", slog.Error(err))
|
|
err = f.conn.Close(websocket.StatusInternalError, err.Error())
|
|
if err != nil {
|
|
f.logger.Warn(f.ctx, "failed to close webscoket", slog.Error(err))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// no need to wait if the job is done
|
|
if f.complete {
|
|
return
|
|
}
|
|
for {
|
|
select {
|
|
case err := <-f.errors:
|
|
// we've dropped at least one notification. This can happen if we
|
|
// lose database connectivity. We don't know whether the job is
|
|
// now complete since we could have missed the end of logs message.
|
|
// We could soldier on and retry, but loss of database connectivity
|
|
// is fairly serious, so instead just 500 and bail out. Client
|
|
// can retry and hopefully find a healthier node.
|
|
f.logger.Error(f.ctx, "dropped or corrupted notification", slog.Error(err))
|
|
err = f.conn.Close(websocket.StatusInternalError, err.Error())
|
|
if err != nil {
|
|
f.logger.Warn(f.ctx, "failed to close webscoket", slog.Error(err))
|
|
}
|
|
return
|
|
case <-f.ctx.Done():
|
|
// client disconnect
|
|
return
|
|
case n := <-f.notifications:
|
|
if n.EndOfLogs {
|
|
// safe to return here because we started the subscription,
|
|
// and then queried at least once, so we will have already
|
|
// gotten all logs prior to the start of our subscription.
|
|
return
|
|
}
|
|
err = f.query()
|
|
if err != nil {
|
|
if f.ctx.Err() == nil && !xerrors.Is(err, io.EOF) {
|
|
// neither context expiry, nor EOF, close and log
|
|
f.logger.Error(f.ctx, "failed to query logs", slog.Error(err))
|
|
err = f.conn.Close(websocket.StatusInternalError, httpapi.WebsocketCloseSprintf("%s", err.Error()))
|
|
if err != nil {
|
|
f.logger.Warn(f.ctx, "failed to close webscoket", slog.Error(err))
|
|
}
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *logFollower) listener(_ context.Context, message []byte, err error) {
|
|
// in this function we always pair writes to channels with a select on the context
|
|
// otherwise we could block a goroutine if the follow() method exits.
|
|
if err != nil {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.errors <- err:
|
|
}
|
|
return
|
|
}
|
|
var n provisionersdk.ProvisionerJobLogsNotifyMessage
|
|
err = json.Unmarshal(message, &n)
|
|
if err != nil {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.errors <- err:
|
|
}
|
|
return
|
|
}
|
|
select {
|
|
case <-f.ctx.Done():
|
|
case f.notifications <- n:
|
|
}
|
|
}
|
|
|
|
// query fetches the latest job logs from the database and writes them to the
|
|
// connection.
|
|
func (f *logFollower) query() error {
|
|
f.logger.Debug(f.ctx, "querying logs", slog.F("after", f.after))
|
|
logs, err := f.db.GetProvisionerLogsAfterID(f.ctx, database.GetProvisionerLogsAfterIDParams{
|
|
JobID: f.jobID,
|
|
CreatedAfter: f.after,
|
|
})
|
|
if err != nil && !errors.Is(err, sql.ErrNoRows) {
|
|
return xerrors.Errorf("error fetching logs: %w", err)
|
|
}
|
|
for _, log := range logs {
|
|
err := f.enc.Encode(convertProvisionerJobLog(log))
|
|
if err != nil {
|
|
return xerrors.Errorf("error writing to websocket: %w", err)
|
|
}
|
|
f.after = log.ID
|
|
f.logger.Debug(f.ctx, "wrote log to websocket", slog.F("id", log.ID))
|
|
}
|
|
return nil
|
|
}
|