fix: Use WebSockets to stream workspace build logs (#2569)

* fix: Use WebSockets to stream workspace build logs

This was using a streaming HTTP request before, which didn't work
on my version of Chrome. This method seemed less reliable and standard
than a WebSocket, so figured switching would be best.

* Update site/src/xServices/workspaceBuild/workspaceBuildXService.ts

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>

* Update site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.test.tsx

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>

* Update site/src/api/api.ts

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>

* Remove unused prop

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>
This commit is contained in:
Kyle Carberry
2022-06-22 08:23:14 -05:00
committed by GitHub
parent dc7d6def8e
commit 1778db23cb
12 changed files with 130 additions and 108 deletions

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/google/uuid"
"nhooyr.io/websocket"
"cdr.dev/slog"
@ -98,12 +99,28 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return
}
api.websocketWaitMutex.Lock()
api.websocketWaitGroup.Add(1)
api.websocketWaitMutex.Unlock()
defer api.websocketWaitGroup.Done()
conn, err := websocket.Accept(rw, r, nil)
if err != nil {
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
Message: "Failed to accept websocket.",
Detail: err.Error(),
})
return
}
ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.
bufferedLogs := make(chan database.ProvisionerJobLog, 128)
closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) {
var logs []database.ProvisionerJobLog
err := json.Unmarshal(message, &logs)
if err != nil {
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
api.Logger.Warn(ctx, fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
return
}
@ -113,7 +130,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
default:
// If this overflows users could miss logs streaming. This can happen
// if a database request takes a long amount of time, and we get a lot of logs.
api.Logger.Warn(r.Context(), "provisioner job log overflowing channel")
api.Logger.Warn(ctx, "provisioner job log overflowing channel")
}
}
})
@ -126,7 +143,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
}
defer closeSubscribe()
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: job.ID,
CreatedAfter: after,
CreatedBefore: before,
@ -142,17 +159,8 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return
}
// "follow" uses the ndjson format to stream data.
// See: https://canjs.com/doc/can-ndjson-stream.html
rw.Header().Set("Content-Type", "application/stream+json")
rw.WriteHeader(http.StatusOK)
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}
// The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(rw)
encoder := json.NewEncoder(wsNetConn)
for _, provisionerJobLog := range provisionerJobLogs {
err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog))
if err != nil {
@ -171,9 +179,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
if err != nil {
return
}
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}
case <-ticker.C:
job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID)
if err != nil {

View File

@ -6,11 +6,16 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/http/cookiejar"
"net/url"
"strconv"
"time"
"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"github.com/coder/coder/coderd/httpmw"
)
type LogSource string
@ -106,17 +111,30 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
if !after.IsZero() {
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
}
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?follow%s", path, afterQuery), nil)
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
if err != nil {
return nil, err
}
if res.StatusCode != http.StatusOK {
defer res.Body.Close()
jar, err := cookiejar.New(nil)
if err != nil {
return nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(followURL, []*http.Cookie{{
Name: httpmw.SessionTokenKey,
Value: c.SessionToken,
}})
httpClient := &http.Client{
Jar: jar,
}
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
return nil, readBodyAsError(res)
}
logs := make(chan ProvisionerJobLog)
decoder := json.NewDecoder(res.Body)
decoder := json.NewDecoder(websocket.NetConn(ctx, conn, websocket.MessageText))
go func() {
defer close(logs)
var log ProvisionerJobLog

View File

@ -193,7 +193,7 @@ for spec in "${specs[@]}"; do
--os "$spec_os" \
--arch "$spec_arch" \
--output "$spec_output_binary" \
"${build_args[@]}"
"${build_args[@]}" &
log
log
@ -227,3 +227,5 @@ for spec in "${specs[@]}"; do
log
fi
done
wait

View File

@ -1,5 +1,4 @@
import axios, { AxiosRequestHeaders } from "axios"
import ndjsonStream from "can-ndjson-stream"
import * as Types from "./types"
import { WorkspaceBuildTransition } from "./types"
import * as TypesGen from "./typesGenerated"
@ -280,25 +279,13 @@ export const getWorkspaceBuildByNumber = async (
return response.data
}
export const getWorkspaceBuildLogs = async (buildname: string): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs`)
export const getWorkspaceBuildLogs = async (buildname: string, before: Date): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(
`/api/v2/workspacebuilds/${buildname}/logs?before=${before.getTime()}`,
)
return response.data
}
export const streamWorkspaceBuildLogs = async (
buildname: string,
): Promise<ReadableStreamDefaultReader<TypesGen.ProvisionerJobLog>> => {
// Axios does not support HTTP stream in the browser
// https://github.com/axios/axios/issues/1474
// So we are going to use window.fetch and return a "stream" reader
const reader = await window
.fetch(`/api/v2/workspacebuilds/${buildname}/logs?follow=true`)
.then((res) => ndjsonStream<TypesGen.ProvisionerJobLog>(res.body))
.then((stream) => stream.getReader())
return reader
}
export const putWorkspaceExtension = async (
workspaceId: string,
extendWorkspaceRequest: TypesGen.PutExtendWorkspaceRequest,

View File

@ -201,7 +201,7 @@ export interface ParameterSchema {
readonly validation_contains?: string[]
}
// From codersdk/provisionerdaemons.go:33:6
// From codersdk/provisionerdaemons.go:38:6
export interface ProvisionerDaemon {
readonly id: string
readonly created_at: string
@ -210,7 +210,7 @@ export interface ProvisionerDaemon {
readonly provisioners: ProvisionerType[]
}
// From codersdk/provisionerdaemons.go:62:6
// From codersdk/provisionerdaemons.go:67:6
export interface ProvisionerJob {
readonly id: string
readonly created_at: string
@ -222,7 +222,7 @@ export interface ProvisionerJob {
readonly storage_source: string
}
// From codersdk/provisionerdaemons.go:73:6
// From codersdk/provisionerdaemons.go:78:6
export interface ProvisionerJobLog {
readonly id: string
readonly created_at: string
@ -485,10 +485,10 @@ export interface WorkspaceResource {
// From codersdk/workspacebuilds.go:22:6
export type BuildReason = "autostart" | "autostop" | "initiator"
// From codersdk/provisionerdaemons.go:23:6
// From codersdk/provisionerdaemons.go:28:6
export type LogLevel = "debug" | "error" | "info" | "trace" | "warn"
// From codersdk/provisionerdaemons.go:16:6
// From codersdk/provisionerdaemons.go:21:6
export type LogSource = "provisioner" | "provisioner_daemon"
// From codersdk/parameters.go:29:6
@ -503,7 +503,7 @@ export type ParameterSourceScheme = "data" | "none"
// From codersdk/parameters.go:37:6
export type ParameterTypeSystem = "hcl" | "none"
// From codersdk/provisionerdaemons.go:42:6
// From codersdk/provisionerdaemons.go:47:6
export type ProvisionerJobStatus = "canceled" | "canceling" | "failed" | "pending" | "running" | "succeeded"
// From codersdk/organizations.go:14:6

View File

@ -13,9 +13,3 @@ export const Example = Template.bind({})
Example.args = {
logs: MockWorkspaceBuildLogs,
}
export const Loading = Template.bind({})
Loading.args = {
logs: MockWorkspaceBuildLogs,
isWaitingForLogs: true,
}

View File

@ -1,4 +1,3 @@
import CircularProgress from "@material-ui/core/CircularProgress"
import { makeStyles } from "@material-ui/core/styles"
import dayjs from "dayjs"
import { FC } from "react"
@ -40,17 +39,16 @@ const getStageDurationInSeconds = (logs: ProvisionerJobLog[]) => {
export interface WorkspaceBuildLogsProps {
logs: ProvisionerJobLog[]
isWaitingForLogs: boolean
}
export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs, isWaitingForLogs }) => {
export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs }) => {
const groupedLogsByStage = groupLogsByStage(logs)
const stages = Object.keys(groupedLogsByStage)
const styles = useStyles()
return (
<div className={styles.logs}>
{stages.map((stage, stageIndex) => {
{stages.map((stage) => {
const logs = groupedLogsByStage[stage]
const isEmpty = logs.every((log) => log.output === "")
const lines = logs.map((log) => ({
@ -58,15 +56,12 @@ export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs, isWaitin
output: log.output,
}))
const duration = getStageDurationInSeconds(logs)
const isLastStage = stageIndex === stages.length - 1
const shouldDisplaySpinner = isWaitingForLogs && isLastStage
const shouldDisplayDuration = !isWaitingForLogs && duration
const shouldDisplayDuration = duration !== undefined
return (
<div key={stage}>
<div className={styles.header}>
<div>{stage}</div>
{shouldDisplaySpinner && <CircularProgress size={14} className={styles.spinner} />}
{shouldDisplayDuration && (
<div className={styles.duration}>
{duration} {Language.seconds}
@ -109,8 +104,4 @@ const useStyles = makeStyles((theme) => ({
padding: theme.spacing(2),
paddingLeft: theme.spacing(4),
},
spinner: {
marginLeft: "auto",
},
}))

View File

@ -1,26 +1,27 @@
import { screen } from "@testing-library/react"
import * as API from "../../api/api"
import WS from "jest-websocket-mock"
import { MockWorkspace, MockWorkspaceBuild, renderWithAuth } from "../../testHelpers/renderHelpers"
import { WorkspaceBuildPage } from "./WorkspaceBuildPage"
describe("WorkspaceBuildPage", () => {
it("renders the stats and logs", async () => {
jest.spyOn(API, "streamWorkspaceBuildLogs").mockResolvedValueOnce({
read() {
return Promise.resolve({
value: undefined,
done: true,
})
},
releaseLock: jest.fn(),
closed: Promise.resolve(undefined),
cancel: jest.fn(),
})
const server = new WS(`ws://localhost/api/v2/workspacebuilds/${MockWorkspaceBuild.id}/logs`)
renderWithAuth(<WorkspaceBuildPage />, {
route: `/@${MockWorkspace.owner_name}/${MockWorkspace.name}/builds/${MockWorkspace.latest_build.build_number}`,
path: "/@:username/:workspace/builds/:buildNumber",
})
await server.connected
const log = {
id: "70459334-4878-4bda-a546-98eee166c4c6",
created_at: "2022-05-19T16:46:02.283Z",
log_source: "provisioner_daemon",
log_level: "info",
stage: "Another stage",
output: "",
}
server.send(JSON.stringify(log))
await screen.findByText(MockWorkspaceBuild.workspace_name)
await screen.findByText(log.stage)
server.close()
})
})

View File

@ -8,9 +8,10 @@ import { WorkspaceBuildPageView } from "./WorkspaceBuildPageView"
export const WorkspaceBuildPage: FC = () => {
const { username, workspace: workspaceName, buildNumber } = useParams()
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber } })
const [buildState] = useMachine(workspaceBuildMachine, {
context: { username, workspaceName, buildNumber, timeCursor: new Date() },
})
const { logs, build } = buildState.context
const isWaitingForLogs = !buildState.matches("logs.loaded")
return (
<>
@ -18,7 +19,7 @@ export const WorkspaceBuildPage: FC = () => {
<title>{build ? pageTitle(`Build #${build.build_number} · ${build.workspace_name}`) : ""}</title>
</Helmet>
<WorkspaceBuildPageView logs={logs} build={build} isWaitingForLogs={isWaitingForLogs} />
<WorkspaceBuildPageView logs={logs} build={build} />
</>
)
}

View File

@ -14,10 +14,9 @@ const sortLogsByCreatedAt = (logs: ProvisionerJobLog[]) => {
export interface WorkspaceBuildPageViewProps {
logs: ProvisionerJobLog[] | undefined
build: WorkspaceBuild | undefined
isWaitingForLogs: boolean
}
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build, isWaitingForLogs }) => {
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build }) => {
return (
<Margins>
<PageHeader>
@ -27,7 +26,7 @@ export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs,
<Stack>
{build && <WorkspaceBuildStats build={build} />}
{!logs && <Loader />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={isWaitingForLogs} />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} />}
</Stack>
</Margins>
)

View File

@ -124,4 +124,7 @@ export const handlers = [
rest.patch("/api/v2/workspacebuilds/:workspaceBuildId/cancel", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockCancellationMessage))
}),
rest.get("/api/v2/workspacebuilds/:workspaceBuildId/logs", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockWorkspaceBuildLogs))
}),
]

View File

@ -8,20 +8,18 @@ type LogsContext = {
workspaceName: string
buildNumber: string
buildId: string
// Used to reference logs before + after.
timeCursor: Date
build?: WorkspaceBuild
getBuildError?: Error | unknown
// Logs
logs?: ProvisionerJobLog[]
}
type LogsEvent =
| {
type: "ADD_LOG"
log: ProvisionerJobLog
}
| {
type: "NO_MORE_LOGS"
}
type LogsEvent = {
type: "ADD_LOG"
log: ProvisionerJobLog
}
export const workspaceBuildMachine = createMachine(
{
@ -33,6 +31,9 @@ export const workspaceBuildMachine = createMachine(
getWorkspaceBuild: {
data: WorkspaceBuild
}
getLogs: {
data: ProvisionerJobLog[]
}
},
},
tsTypes: {} as import("./workspaceBuildXService.typegen").Typegen0,
@ -54,8 +55,18 @@ export const workspaceBuildMachine = createMachine(
},
idle: {},
logs: {
initial: "watchingLogs",
initial: "gettingExistentLogs",
states: {
gettingExistentLogs: {
invoke: {
id: "getLogs",
src: "getLogs",
onDone: {
actions: ["assignLogs"],
target: "watchingLogs",
},
},
},
watchingLogs: {
id: "watchingLogs",
invoke: {
@ -71,9 +82,6 @@ export const workspaceBuildMachine = createMachine(
ADD_LOG: {
actions: "addLog",
},
NO_MORE_LOGS: {
target: "logs.loaded",
},
},
},
},
@ -94,6 +102,10 @@ export const workspaceBuildMachine = createMachine(
clearGetBuildError: assign({
getBuildError: (_) => undefined,
}),
// Logs
assignLogs: assign({
logs: (_, event) => event.data,
}),
addLog: assign({
logs: (context, event) => {
const previousLogs = context.logs ?? []
@ -103,21 +115,30 @@ export const workspaceBuildMachine = createMachine(
},
services: {
getWorkspaceBuild: (ctx) => API.getWorkspaceBuildByNumber(ctx.username, ctx.workspaceName, ctx.buildNumber),
getLogs: async (ctx) => API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor),
streamWorkspaceBuildLogs: (ctx) => async (callback) => {
const reader = await API.streamWorkspaceBuildLogs(ctx.buildId)
// Watching for the stream
// eslint-disable-next-line no-constant-condition, @typescript-eslint/no-unnecessary-condition
while (true) {
const { value, done } = await reader.read()
if (done) {
callback("NO_MORE_LOGS")
break
}
callback({ type: "ADD_LOG", log: value })
}
return new Promise<void>((resolve, reject) => {
const proto = location.protocol === "https:" ? "wss:" : "ws:"
const socket = new WebSocket(
`${proto}//${location.host}/api/v2/workspacebuilds/${
ctx.buildId
}/logs?follow=true&after=${ctx.timeCursor.getTime()}`,
)
socket.binaryType = "blob"
socket.addEventListener("message", (event) => {
callback({ type: "ADD_LOG", log: JSON.parse(event.data) })
})
socket.addEventListener("error", () => {
reject(new Error("socket errored"))
})
socket.addEventListener("open", () => {
resolve()
})
socket.addEventListener("close", () => {
// When the socket closes, logs have finished streaming!
resolve()
})
})
},
},
},