fix: Use WebSockets to stream workspace build logs (#2569)

* fix: Use WebSockets to stream workspace build logs

This was using a streaming HTTP request before, which didn't work
on my version of Chrome. This method seemed less reliable and standard
than a WebSocket, so figured switching would be best.

* Update site/src/xServices/workspaceBuild/workspaceBuildXService.ts

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>

* Update site/src/pages/WorkspaceBuildPage/WorkspaceBuildPage.test.tsx

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>

* Update site/src/api/api.ts

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>

* Remove unused prop

Co-authored-by: Abhineet Jain <AbhineetJain@users.noreply.github.com>
This commit is contained in:
Kyle Carberry
2022-06-22 08:23:14 -05:00
committed by GitHub
parent dc7d6def8e
commit 1778db23cb
12 changed files with 130 additions and 108 deletions

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"nhooyr.io/websocket"
"cdr.dev/slog" "cdr.dev/slog"
@ -98,12 +99,28 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return return
} }
api.websocketWaitMutex.Lock()
api.websocketWaitGroup.Add(1)
api.websocketWaitMutex.Unlock()
defer api.websocketWaitGroup.Done()
conn, err := websocket.Accept(rw, r, nil)
if err != nil {
httpapi.Write(rw, http.StatusBadRequest, httpapi.Response{
Message: "Failed to accept websocket.",
Detail: err.Error(),
})
return
}
ctx, wsNetConn := websocketNetConn(r.Context(), conn, websocket.MessageText)
defer wsNetConn.Close() // Also closes conn.
bufferedLogs := make(chan database.ProvisionerJobLog, 128) bufferedLogs := make(chan database.ProvisionerJobLog, 128)
closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) { closeSubscribe, err := api.Pubsub.Subscribe(provisionerJobLogsChannel(job.ID), func(ctx context.Context, message []byte) {
var logs []database.ProvisionerJobLog var logs []database.ProvisionerJobLog
err := json.Unmarshal(message, &logs) err := json.Unmarshal(message, &logs)
if err != nil { if err != nil {
api.Logger.Warn(r.Context(), fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error())) api.Logger.Warn(ctx, fmt.Sprintf("invalid provisioner job log on channel %q: %s", provisionerJobLogsChannel(job.ID), err.Error()))
return return
} }
@ -113,7 +130,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
default: default:
// If this overflows users could miss logs streaming. This can happen // If this overflows users could miss logs streaming. This can happen
// if a database request takes a long amount of time, and we get a lot of logs. // if a database request takes a long amount of time, and we get a lot of logs.
api.Logger.Warn(r.Context(), "provisioner job log overflowing channel") api.Logger.Warn(ctx, "provisioner job log overflowing channel")
} }
} }
}) })
@ -126,7 +143,7 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
} }
defer closeSubscribe() defer closeSubscribe()
provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(r.Context(), database.GetProvisionerLogsByIDBetweenParams{ provisionerJobLogs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: job.ID, JobID: job.ID,
CreatedAfter: after, CreatedAfter: after,
CreatedBefore: before, CreatedBefore: before,
@ -142,17 +159,8 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
return return
} }
// "follow" uses the ndjson format to stream data.
// See: https://canjs.com/doc/can-ndjson-stream.html
rw.Header().Set("Content-Type", "application/stream+json")
rw.WriteHeader(http.StatusOK)
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}
// The Go stdlib JSON encoder appends a newline character after message write. // The Go stdlib JSON encoder appends a newline character after message write.
encoder := json.NewEncoder(rw) encoder := json.NewEncoder(wsNetConn)
for _, provisionerJobLog := range provisionerJobLogs { for _, provisionerJobLog := range provisionerJobLogs {
err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog)) err = encoder.Encode(convertProvisionerJobLog(provisionerJobLog))
if err != nil { if err != nil {
@ -171,9 +179,6 @@ func (api *API) provisionerJobLogs(rw http.ResponseWriter, r *http.Request, job
if err != nil { if err != nil {
return return
} }
if flusher, ok := rw.(http.Flusher); ok {
flusher.Flush()
}
case <-ticker.C: case <-ticker.C:
job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID) job, err := api.Database.GetProvisionerJobByID(r.Context(), job.ID)
if err != nil { if err != nil {

View File

@ -6,11 +6,16 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"net/http/cookiejar"
"net/url" "net/url"
"strconv" "strconv"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"github.com/coder/coder/coderd/httpmw"
) )
type LogSource string type LogSource string
@ -106,17 +111,30 @@ func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after
if !after.IsZero() { if !after.IsZero() {
afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli()) afterQuery = fmt.Sprintf("&after=%d", after.UTC().UnixMilli())
} }
res, err := c.Request(ctx, http.MethodGet, fmt.Sprintf("%s?follow%s", path, afterQuery), nil) followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
if err != nil { if err != nil {
return nil, err return nil, err
} }
if res.StatusCode != http.StatusOK { jar, err := cookiejar.New(nil)
defer res.Body.Close() if err != nil {
return nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(followURL, []*http.Cookie{{
Name: httpmw.SessionTokenKey,
Value: c.SessionToken,
}})
httpClient := &http.Client{
Jar: jar,
}
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
return nil, readBodyAsError(res) return nil, readBodyAsError(res)
} }
logs := make(chan ProvisionerJobLog) logs := make(chan ProvisionerJobLog)
decoder := json.NewDecoder(res.Body) decoder := json.NewDecoder(websocket.NetConn(ctx, conn, websocket.MessageText))
go func() { go func() {
defer close(logs) defer close(logs)
var log ProvisionerJobLog var log ProvisionerJobLog

View File

@ -193,7 +193,7 @@ for spec in "${specs[@]}"; do
--os "$spec_os" \ --os "$spec_os" \
--arch "$spec_arch" \ --arch "$spec_arch" \
--output "$spec_output_binary" \ --output "$spec_output_binary" \
"${build_args[@]}" "${build_args[@]}" &
log log
log log
@ -227,3 +227,5 @@ for spec in "${specs[@]}"; do
log log
fi fi
done done
wait

View File

@ -1,5 +1,4 @@
import axios, { AxiosRequestHeaders } from "axios" import axios, { AxiosRequestHeaders } from "axios"
import ndjsonStream from "can-ndjson-stream"
import * as Types from "./types" import * as Types from "./types"
import { WorkspaceBuildTransition } from "./types" import { WorkspaceBuildTransition } from "./types"
import * as TypesGen from "./typesGenerated" import * as TypesGen from "./typesGenerated"
@ -280,25 +279,13 @@ export const getWorkspaceBuildByNumber = async (
return response.data return response.data
} }
export const getWorkspaceBuildLogs = async (buildname: string): Promise<TypesGen.ProvisionerJobLog[]> => { export const getWorkspaceBuildLogs = async (buildname: string, before: Date): Promise<TypesGen.ProvisionerJobLog[]> => {
const response = await axios.get<TypesGen.ProvisionerJobLog[]>(`/api/v2/workspacebuilds/${buildname}/logs`) const response = await axios.get<TypesGen.ProvisionerJobLog[]>(
`/api/v2/workspacebuilds/${buildname}/logs?before=${before.getTime()}`,
)
return response.data return response.data
} }
export const streamWorkspaceBuildLogs = async (
buildname: string,
): Promise<ReadableStreamDefaultReader<TypesGen.ProvisionerJobLog>> => {
// Axios does not support HTTP stream in the browser
// https://github.com/axios/axios/issues/1474
// So we are going to use window.fetch and return a "stream" reader
const reader = await window
.fetch(`/api/v2/workspacebuilds/${buildname}/logs?follow=true`)
.then((res) => ndjsonStream<TypesGen.ProvisionerJobLog>(res.body))
.then((stream) => stream.getReader())
return reader
}
export const putWorkspaceExtension = async ( export const putWorkspaceExtension = async (
workspaceId: string, workspaceId: string,
extendWorkspaceRequest: TypesGen.PutExtendWorkspaceRequest, extendWorkspaceRequest: TypesGen.PutExtendWorkspaceRequest,

View File

@ -201,7 +201,7 @@ export interface ParameterSchema {
readonly validation_contains?: string[] readonly validation_contains?: string[]
} }
// From codersdk/provisionerdaemons.go:33:6 // From codersdk/provisionerdaemons.go:38:6
export interface ProvisionerDaemon { export interface ProvisionerDaemon {
readonly id: string readonly id: string
readonly created_at: string readonly created_at: string
@ -210,7 +210,7 @@ export interface ProvisionerDaemon {
readonly provisioners: ProvisionerType[] readonly provisioners: ProvisionerType[]
} }
// From codersdk/provisionerdaemons.go:62:6 // From codersdk/provisionerdaemons.go:67:6
export interface ProvisionerJob { export interface ProvisionerJob {
readonly id: string readonly id: string
readonly created_at: string readonly created_at: string
@ -222,7 +222,7 @@ export interface ProvisionerJob {
readonly storage_source: string readonly storage_source: string
} }
// From codersdk/provisionerdaemons.go:73:6 // From codersdk/provisionerdaemons.go:78:6
export interface ProvisionerJobLog { export interface ProvisionerJobLog {
readonly id: string readonly id: string
readonly created_at: string readonly created_at: string
@ -485,10 +485,10 @@ export interface WorkspaceResource {
// From codersdk/workspacebuilds.go:22:6 // From codersdk/workspacebuilds.go:22:6
export type BuildReason = "autostart" | "autostop" | "initiator" export type BuildReason = "autostart" | "autostop" | "initiator"
// From codersdk/provisionerdaemons.go:23:6 // From codersdk/provisionerdaemons.go:28:6
export type LogLevel = "debug" | "error" | "info" | "trace" | "warn" export type LogLevel = "debug" | "error" | "info" | "trace" | "warn"
// From codersdk/provisionerdaemons.go:16:6 // From codersdk/provisionerdaemons.go:21:6
export type LogSource = "provisioner" | "provisioner_daemon" export type LogSource = "provisioner" | "provisioner_daemon"
// From codersdk/parameters.go:29:6 // From codersdk/parameters.go:29:6
@ -503,7 +503,7 @@ export type ParameterSourceScheme = "data" | "none"
// From codersdk/parameters.go:37:6 // From codersdk/parameters.go:37:6
export type ParameterTypeSystem = "hcl" | "none" export type ParameterTypeSystem = "hcl" | "none"
// From codersdk/provisionerdaemons.go:42:6 // From codersdk/provisionerdaemons.go:47:6
export type ProvisionerJobStatus = "canceled" | "canceling" | "failed" | "pending" | "running" | "succeeded" export type ProvisionerJobStatus = "canceled" | "canceling" | "failed" | "pending" | "running" | "succeeded"
// From codersdk/organizations.go:14:6 // From codersdk/organizations.go:14:6

View File

@ -13,9 +13,3 @@ export const Example = Template.bind({})
Example.args = { Example.args = {
logs: MockWorkspaceBuildLogs, logs: MockWorkspaceBuildLogs,
} }
export const Loading = Template.bind({})
Loading.args = {
logs: MockWorkspaceBuildLogs,
isWaitingForLogs: true,
}

View File

@ -1,4 +1,3 @@
import CircularProgress from "@material-ui/core/CircularProgress"
import { makeStyles } from "@material-ui/core/styles" import { makeStyles } from "@material-ui/core/styles"
import dayjs from "dayjs" import dayjs from "dayjs"
import { FC } from "react" import { FC } from "react"
@ -40,17 +39,16 @@ const getStageDurationInSeconds = (logs: ProvisionerJobLog[]) => {
export interface WorkspaceBuildLogsProps { export interface WorkspaceBuildLogsProps {
logs: ProvisionerJobLog[] logs: ProvisionerJobLog[]
isWaitingForLogs: boolean
} }
export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs, isWaitingForLogs }) => { export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs }) => {
const groupedLogsByStage = groupLogsByStage(logs) const groupedLogsByStage = groupLogsByStage(logs)
const stages = Object.keys(groupedLogsByStage) const stages = Object.keys(groupedLogsByStage)
const styles = useStyles() const styles = useStyles()
return ( return (
<div className={styles.logs}> <div className={styles.logs}>
{stages.map((stage, stageIndex) => { {stages.map((stage) => {
const logs = groupedLogsByStage[stage] const logs = groupedLogsByStage[stage]
const isEmpty = logs.every((log) => log.output === "") const isEmpty = logs.every((log) => log.output === "")
const lines = logs.map((log) => ({ const lines = logs.map((log) => ({
@ -58,15 +56,12 @@ export const WorkspaceBuildLogs: FC<WorkspaceBuildLogsProps> = ({ logs, isWaitin
output: log.output, output: log.output,
})) }))
const duration = getStageDurationInSeconds(logs) const duration = getStageDurationInSeconds(logs)
const isLastStage = stageIndex === stages.length - 1 const shouldDisplayDuration = duration !== undefined
const shouldDisplaySpinner = isWaitingForLogs && isLastStage
const shouldDisplayDuration = !isWaitingForLogs && duration
return ( return (
<div key={stage}> <div key={stage}>
<div className={styles.header}> <div className={styles.header}>
<div>{stage}</div> <div>{stage}</div>
{shouldDisplaySpinner && <CircularProgress size={14} className={styles.spinner} />}
{shouldDisplayDuration && ( {shouldDisplayDuration && (
<div className={styles.duration}> <div className={styles.duration}>
{duration} {Language.seconds} {duration} {Language.seconds}
@ -109,8 +104,4 @@ const useStyles = makeStyles((theme) => ({
padding: theme.spacing(2), padding: theme.spacing(2),
paddingLeft: theme.spacing(4), paddingLeft: theme.spacing(4),
}, },
spinner: {
marginLeft: "auto",
},
})) }))

View File

@ -1,26 +1,27 @@
import { screen } from "@testing-library/react" import { screen } from "@testing-library/react"
import * as API from "../../api/api" import WS from "jest-websocket-mock"
import { MockWorkspace, MockWorkspaceBuild, renderWithAuth } from "../../testHelpers/renderHelpers" import { MockWorkspace, MockWorkspaceBuild, renderWithAuth } from "../../testHelpers/renderHelpers"
import { WorkspaceBuildPage } from "./WorkspaceBuildPage" import { WorkspaceBuildPage } from "./WorkspaceBuildPage"
describe("WorkspaceBuildPage", () => { describe("WorkspaceBuildPage", () => {
it("renders the stats and logs", async () => { it("renders the stats and logs", async () => {
jest.spyOn(API, "streamWorkspaceBuildLogs").mockResolvedValueOnce({ const server = new WS(`ws://localhost/api/v2/workspacebuilds/${MockWorkspaceBuild.id}/logs`)
read() {
return Promise.resolve({
value: undefined,
done: true,
})
},
releaseLock: jest.fn(),
closed: Promise.resolve(undefined),
cancel: jest.fn(),
})
renderWithAuth(<WorkspaceBuildPage />, { renderWithAuth(<WorkspaceBuildPage />, {
route: `/@${MockWorkspace.owner_name}/${MockWorkspace.name}/builds/${MockWorkspace.latest_build.build_number}`, route: `/@${MockWorkspace.owner_name}/${MockWorkspace.name}/builds/${MockWorkspace.latest_build.build_number}`,
path: "/@:username/:workspace/builds/:buildNumber", path: "/@:username/:workspace/builds/:buildNumber",
}) })
await server.connected
const log = {
id: "70459334-4878-4bda-a546-98eee166c4c6",
created_at: "2022-05-19T16:46:02.283Z",
log_source: "provisioner_daemon",
log_level: "info",
stage: "Another stage",
output: "",
}
server.send(JSON.stringify(log))
await screen.findByText(MockWorkspaceBuild.workspace_name) await screen.findByText(MockWorkspaceBuild.workspace_name)
await screen.findByText(log.stage)
server.close()
}) })
}) })

View File

@ -8,9 +8,10 @@ import { WorkspaceBuildPageView } from "./WorkspaceBuildPageView"
export const WorkspaceBuildPage: FC = () => { export const WorkspaceBuildPage: FC = () => {
const { username, workspace: workspaceName, buildNumber } = useParams() const { username, workspace: workspaceName, buildNumber } = useParams()
const [buildState] = useMachine(workspaceBuildMachine, { context: { username, workspaceName, buildNumber } }) const [buildState] = useMachine(workspaceBuildMachine, {
context: { username, workspaceName, buildNumber, timeCursor: new Date() },
})
const { logs, build } = buildState.context const { logs, build } = buildState.context
const isWaitingForLogs = !buildState.matches("logs.loaded")
return ( return (
<> <>
@ -18,7 +19,7 @@ export const WorkspaceBuildPage: FC = () => {
<title>{build ? pageTitle(`Build #${build.build_number} · ${build.workspace_name}`) : ""}</title> <title>{build ? pageTitle(`Build #${build.build_number} · ${build.workspace_name}`) : ""}</title>
</Helmet> </Helmet>
<WorkspaceBuildPageView logs={logs} build={build} isWaitingForLogs={isWaitingForLogs} /> <WorkspaceBuildPageView logs={logs} build={build} />
</> </>
) )
} }

View File

@ -14,10 +14,9 @@ const sortLogsByCreatedAt = (logs: ProvisionerJobLog[]) => {
export interface WorkspaceBuildPageViewProps { export interface WorkspaceBuildPageViewProps {
logs: ProvisionerJobLog[] | undefined logs: ProvisionerJobLog[] | undefined
build: WorkspaceBuild | undefined build: WorkspaceBuild | undefined
isWaitingForLogs: boolean
} }
export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build, isWaitingForLogs }) => { export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs, build }) => {
return ( return (
<Margins> <Margins>
<PageHeader> <PageHeader>
@ -27,7 +26,7 @@ export const WorkspaceBuildPageView: FC<WorkspaceBuildPageViewProps> = ({ logs,
<Stack> <Stack>
{build && <WorkspaceBuildStats build={build} />} {build && <WorkspaceBuildStats build={build} />}
{!logs && <Loader />} {!logs && <Loader />}
{logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} isWaitingForLogs={isWaitingForLogs} />} {logs && <WorkspaceBuildLogs logs={sortLogsByCreatedAt(logs)} />}
</Stack> </Stack>
</Margins> </Margins>
) )

View File

@ -124,4 +124,7 @@ export const handlers = [
rest.patch("/api/v2/workspacebuilds/:workspaceBuildId/cancel", (req, res, ctx) => { rest.patch("/api/v2/workspacebuilds/:workspaceBuildId/cancel", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockCancellationMessage)) return res(ctx.status(200), ctx.json(M.MockCancellationMessage))
}), }),
rest.get("/api/v2/workspacebuilds/:workspaceBuildId/logs", (req, res, ctx) => {
return res(ctx.status(200), ctx.json(M.MockWorkspaceBuildLogs))
}),
] ]

View File

@ -8,20 +8,18 @@ type LogsContext = {
workspaceName: string workspaceName: string
buildNumber: string buildNumber: string
buildId: string buildId: string
// Used to reference logs before + after.
timeCursor: Date
build?: WorkspaceBuild build?: WorkspaceBuild
getBuildError?: Error | unknown getBuildError?: Error | unknown
// Logs // Logs
logs?: ProvisionerJobLog[] logs?: ProvisionerJobLog[]
} }
type LogsEvent = type LogsEvent = {
| { type: "ADD_LOG"
type: "ADD_LOG" log: ProvisionerJobLog
log: ProvisionerJobLog }
}
| {
type: "NO_MORE_LOGS"
}
export const workspaceBuildMachine = createMachine( export const workspaceBuildMachine = createMachine(
{ {
@ -33,6 +31,9 @@ export const workspaceBuildMachine = createMachine(
getWorkspaceBuild: { getWorkspaceBuild: {
data: WorkspaceBuild data: WorkspaceBuild
} }
getLogs: {
data: ProvisionerJobLog[]
}
}, },
}, },
tsTypes: {} as import("./workspaceBuildXService.typegen").Typegen0, tsTypes: {} as import("./workspaceBuildXService.typegen").Typegen0,
@ -54,8 +55,18 @@ export const workspaceBuildMachine = createMachine(
}, },
idle: {}, idle: {},
logs: { logs: {
initial: "watchingLogs", initial: "gettingExistentLogs",
states: { states: {
gettingExistentLogs: {
invoke: {
id: "getLogs",
src: "getLogs",
onDone: {
actions: ["assignLogs"],
target: "watchingLogs",
},
},
},
watchingLogs: { watchingLogs: {
id: "watchingLogs", id: "watchingLogs",
invoke: { invoke: {
@ -71,9 +82,6 @@ export const workspaceBuildMachine = createMachine(
ADD_LOG: { ADD_LOG: {
actions: "addLog", actions: "addLog",
}, },
NO_MORE_LOGS: {
target: "logs.loaded",
},
}, },
}, },
}, },
@ -94,6 +102,10 @@ export const workspaceBuildMachine = createMachine(
clearGetBuildError: assign({ clearGetBuildError: assign({
getBuildError: (_) => undefined, getBuildError: (_) => undefined,
}), }),
// Logs
assignLogs: assign({
logs: (_, event) => event.data,
}),
addLog: assign({ addLog: assign({
logs: (context, event) => { logs: (context, event) => {
const previousLogs = context.logs ?? [] const previousLogs = context.logs ?? []
@ -103,21 +115,30 @@ export const workspaceBuildMachine = createMachine(
}, },
services: { services: {
getWorkspaceBuild: (ctx) => API.getWorkspaceBuildByNumber(ctx.username, ctx.workspaceName, ctx.buildNumber), getWorkspaceBuild: (ctx) => API.getWorkspaceBuildByNumber(ctx.username, ctx.workspaceName, ctx.buildNumber),
getLogs: async (ctx) => API.getWorkspaceBuildLogs(ctx.buildId, ctx.timeCursor),
streamWorkspaceBuildLogs: (ctx) => async (callback) => { streamWorkspaceBuildLogs: (ctx) => async (callback) => {
const reader = await API.streamWorkspaceBuildLogs(ctx.buildId) return new Promise<void>((resolve, reject) => {
const proto = location.protocol === "https:" ? "wss:" : "ws:"
// Watching for the stream const socket = new WebSocket(
// eslint-disable-next-line no-constant-condition, @typescript-eslint/no-unnecessary-condition `${proto}//${location.host}/api/v2/workspacebuilds/${
while (true) { ctx.buildId
const { value, done } = await reader.read() }/logs?follow=true&after=${ctx.timeCursor.getTime()}`,
)
if (done) { socket.binaryType = "blob"
callback("NO_MORE_LOGS") socket.addEventListener("message", (event) => {
break callback({ type: "ADD_LOG", log: JSON.parse(event.data) })
} })
socket.addEventListener("error", () => {
callback({ type: "ADD_LOG", log: value }) reject(new Error("socket errored"))
} })
socket.addEventListener("open", () => {
resolve()
})
socket.addEventListener("close", () => {
// When the socket closes, logs have finished streaming!
resolve()
})
})
}, },
}, },
}, },