fix: check if logs are completed before publishing (#6824)

This commit is contained in:
Kyle Carberry
2023-03-27 15:50:53 -05:00
committed by GitHub
parent 48f9521fcb
commit 5e01e6e448

View File

@ -54,6 +54,47 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
} }
} }
if !follow {
logs, err := api.Database.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
JobID: job.ID,
CreatedAfter: after,
})
if errors.Is(err, sql.ErrNoRows) {
err = nil
}
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner logs.",
Detail: err.Error(),
})
return
}
if logs == nil {
logs = []database.ProvisionerJobLog{}
}
logger.Debug(ctx, "Finished non-follow job logs")
httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobLogs(logs))
return
}
// if we are following logs, start the subscription before we query the database, so that we don't miss any logs
// between the end of our query and the start of the subscription. We might get duplicates, so we'll keep track
// of processed IDs.
var bufferedLogs <-chan *database.ProvisionerJobLog
if follow {
bl, closeFollow, err := api.followProvisionerJobLogs(actor, job.ID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error watching provisioner logs.",
Detail: err.Error(),
})
return
}
defer closeFollow()
bufferedLogs = bl
}
logs, err := api.Database.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{ logs, err := api.Database.GetProvisionerLogsAfterID(ctx, database.GetProvisionerLogsAfterIDParams{
JobID: job.ID, JobID: job.ID,
CreatedAfter: after, CreatedAfter: after,
@ -72,12 +113,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
logs = []database.ProvisionerJobLog{} logs = []database.ProvisionerJobLog{}
} }
if !follow {
logger.Debug(ctx, "Finished non-follow job logs")
httpapi.Write(ctx, rw, http.StatusOK, convertProvisionerJobLogs(logs))
return
}
api.WebsocketWaitMutex.Lock() api.WebsocketWaitMutex.Lock()
api.WebsocketWaitGroup.Add(1) api.WebsocketWaitGroup.Add(1)
api.WebsocketWaitMutex.Unlock() api.WebsocketWaitMutex.Unlock()
@ -106,28 +141,19 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return return
} }
} }
job, err = api.Database.GetProvisionerJobByID(ctx, job.ID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching provisioner job.",
Detail: err.Error(),
})
return
}
if job.CompletedAt.Valid { if job.CompletedAt.Valid {
// job was complete before we queried the database for historical logs // job was complete before we queried the database for historical logs
return return
} }
// if we are following logs, start the subscription before we query the database, so that we don't miss any logs
// between the end of our query and the start of the subscription. We might get duplicates, so we'll keep track
// of processed IDs.
var bufferedLogs <-chan *database.ProvisionerJobLog
if follow {
bl, closeFollow, err := api.followProvisionerJobLogs(actor, job.ID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error watching provisioner logs.",
Detail: err.Error(),
})
return
}
defer closeFollow()
bufferedLogs = bl
}
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():