From 3a9f9c845372ec8a97d7ad80a31a6175901cabeb Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 7 Feb 2025 14:06:39 +0200 Subject: [PATCH] Moving from streaming manifest approach to SSE + explicit full agent reinitialization Signed-off-by: Danny Kopping # Conflicts: # agent/agent.go # Conflicts: # cli/agent.go --- agent/agent.go | 270 +++++++----------- agent/agenttest/client.go | 3 +- agent/proto/agent.pb.go | 171 ++++++----- agent/proto/agent.proto | 1 - agent/proto/agent_drpc.pb.go | 86 +----- agent/proto/agent_drpc_old.go | 8 - agent/reaper/reaper_unix.go | 5 + cli/agent.go | 118 +++++--- coderd/agentapi/manifest.go | 62 +--- coderd/coderd.go | 1 + .../provisionerdserver/provisionerdserver.go | 20 +- coderd/workspaceagents.go | 102 ++++++- coderd/wsbuilder/wsbuilder.go | 2 +- codersdk/agentsdk/agentsdk.go | 74 ++++- 14 files changed, 482 insertions(+), 441 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ef8e54e009..b05648c9c8 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -33,6 +33,9 @@ import ( "tailscale.com/util/clientmetric" "cdr.dev/slog" + + "github.com/coder/retry" + "github.com/coder/coder/v2/agent/agentcontainers" "github.com/coder/coder/v2/agent/agentexec" "github.com/coder/coder/v2/agent/agentscripts" @@ -47,7 +50,6 @@ import ( "github.com/coder/coder/v2/codersdk/workspacesdk" "github.com/coder/coder/v2/tailnet" tailnetproto "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/retry" ) const ( @@ -87,8 +89,8 @@ type Options struct { } type Client interface { - ConnectRPC24(ctx context.Context) ( - proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, + ConnectRPC23(ctx context.Context) ( + proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error, ) RewriteDERPMap(derpMap *tailcfg.DERPMap) } @@ -314,11 +316,11 @@ func (a *agent) runLoop() { if ctx.Err() != nil { // Context canceled errors may come from websocket pings, so we // don't want to use `errors.Is(err, context.Canceled)` here. - a.logger.Warn(ctx, "exiting", slog.Error(ctx.Err())) + a.logger.Warn(ctx, "runLoop exited with error", slog.Error(ctx.Err())) return } if a.isClosed() { - a.logger.Debug(ctx, "closed") + a.logger.Warn(ctx, "runLoop exited because agent is closed") return } if errors.Is(err, io.EOF) { @@ -408,7 +410,7 @@ func (t *trySingleflight) Do(key string, fn func()) { fn() } -func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient24) error { +func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient23) error { tickerDone := make(chan struct{}) collectDone := make(chan struct{}) ctx, cancel := context.WithCancel(ctx) @@ -624,7 +626,7 @@ func (a *agent) reportMetadata(ctx context.Context, aAPI proto.DRPCAgentClient24 // reportLifecycle reports the current lifecycle state once. All state // changes are reported in order. -func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient24) error { +func (a *agent) reportLifecycle(ctx context.Context, aAPI proto.DRPCAgentClient23) error { for { select { case <-a.lifecycleUpdate: @@ -706,7 +708,7 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) { // fetchServiceBannerLoop fetches the service banner on an interval. It will // not be fetched immediately; the expectation is that it is primed elsewhere // (and must be done before the session actually starts). -func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient24) error { +func (a *agent) fetchServiceBannerLoop(ctx context.Context, aAPI proto.DRPCAgentClient23) error { ticker := time.NewTicker(a.announcementBannersRefreshInterval) defer ticker.Stop() for { @@ -742,7 +744,7 @@ func (a *agent) run() (retErr error) { a.sessionToken.Store(&sessionToken) // ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs - aAPI, tAPI, err := a.client.ConnectRPC24(a.hardCtx) + aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx) if err != nil { return err } @@ -759,7 +761,7 @@ func (a *agent) run() (retErr error) { connMan := newAPIConnRoutineManager(a.gracefulCtx, a.hardCtx, a.logger, aAPI, tAPI) connMan.startAgentAPI("init notification banners", gracefulShutdownBehaviorStop, - func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { + func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { bannersProto, err := aAPI.GetAnnouncementBanners(ctx, &proto.GetAnnouncementBannersRequest{}) if err != nil { return xerrors.Errorf("fetch service banner: %w", err) @@ -776,7 +778,7 @@ func (a *agent) run() (retErr error) { // sending logs gets gracefulShutdownBehaviorRemain because we want to send logs generated by // shutdown scripts. connMan.startAgentAPI("send logs", gracefulShutdownBehaviorRemain, - func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { + func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { err := a.logSender.SendLoop(ctx, aAPI) if xerrors.Is(err, agentsdk.LogLimitExceededError) { // we don't want this error to tear down the API connection and propagate to the @@ -813,11 +815,10 @@ func (a *agent) run() (retErr error) { networkOK := newCheckpoint(a.logger) manifestOK := newCheckpoint(a.logger) - //connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK)) - connMan.startAgentAPI("handle manifest stream", gracefulShutdownBehaviorStop, a.handleManifestStream(manifestOK)) + connMan.startAgentAPI("handle manifest", gracefulShutdownBehaviorStop, a.handleManifest(manifestOK)) connMan.startAgentAPI("app health reporter", gracefulShutdownBehaviorStop, - func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { + func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { if err := manifestOK.wait(ctx); err != nil { return xerrors.Errorf("no manifest: %w", err) } @@ -850,182 +851,131 @@ func (a *agent) run() (retErr error) { connMan.startAgentAPI("fetch service banner loop", gracefulShutdownBehaviorStop, a.fetchServiceBannerLoop) - connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { + connMan.startAgentAPI("stats report loop", gracefulShutdownBehaviorStop, func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { if err := networkOK.wait(ctx); err != nil { return xerrors.Errorf("no network: %w", err) } return a.statsReporter.reportLoop(ctx, aAPI) }) - return connMan.wait() + err = connMan.wait() + a.logger.Error(context.Background(), "connection manager errored", slog.Error(err)) + return err } // handleManifest returns a function that fetches and processes the manifest -func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { - return func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { - var err error +func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { + return func(ctx context.Context, aAPI proto.DRPCAgentClient23) error { + var ( + sentResult = false + err error + ) defer func() { - if err != nil { + if !sentResult { manifestOK.complete(err) } }() - mp, err := aAPI.GetManifest(ctx, &proto.GetManifestRequest{}) if err != nil { - return xerrors.Errorf("fetch manifest: %w", err) + return xerrors.Errorf("fetch metadata: %w", err) } a.logger.Info(ctx, "fetched manifest", slog.F("manifest", mp)) - return a.handleSingleManifest(ctx, aAPI, manifestOK, mp) - } -} - -func (a *agent) handleManifestStream(manifestOK *checkpoint) func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { - return func(ctx context.Context, aAPI proto.DRPCAgentClient24) error { - var err error - defer func() { - if err != nil { - manifestOK.complete(err) - } - }() - - client, err := aAPI.StreamManifests(ctx, &proto.GetManifestRequest{}) + manifest, err := agentsdk.ManifestFromProto(mp) if err != nil { - if err == io.EOF { - a.logger.Info(ctx, "stream manifest received EOF") - return nil - } - return xerrors.Errorf("stream manifests: %w", err) + a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err)) + return xerrors.Errorf("convert manifest: %w", err) } - - for { - a.logger.Debug(ctx, "waiting on new streamed manifest") - - manifest, err := client.Recv() - if err != nil { - return xerrors.Errorf("recv manifest: %w", err) - } - - a.logger.Info(ctx, "received new streamed manifest", slog.F("manifest", manifest)) - - err = a.handleSingleManifest(ctx, aAPI, manifestOK, manifest) - if err != nil { - return xerrors.Errorf("handle streamed manifest: %w", err) - } + if manifest.AgentID == uuid.Nil { + return xerrors.New("nil agentID returned by manifest") } - } -} + a.client.RewriteDERPMap(manifest.DERPMap) -func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentClient24, manifestOK *checkpoint, mp *proto.Manifest) error { - var ( - sentResult bool - err error - ) - defer func() { - if !sentResult { - manifestOK.complete(err) - } - }() - - a.metrics.manifestsReceived.Inc() - - manifest, err := agentsdk.ManifestFromProto(mp) - if err != nil { - a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err)) - return xerrors.Errorf("convert manifest: %w", err) - } - if manifest.AgentID == uuid.Nil { - return xerrors.New("nil agentID returned by manifest") - } - a.client.RewriteDERPMap(manifest.DERPMap) - - // Expand the directory and send it back to coderd so external - // applications that rely on the directory can use it. - // - // An example is VS Code Remote, which must know the directory - // before initializing a connection. - manifest.Directory, err = expandDirectory(manifest.Directory) - if err != nil { - return xerrors.Errorf("expand directory: %w", err) - } - subsys, err := agentsdk.ProtoFromSubsystems(a.subsystems) - if err != nil { - a.logger.Critical(ctx, "failed to convert subsystems", slog.Error(err)) - return xerrors.Errorf("failed to convert subsystems: %w", err) - } - _, err = aAPI.UpdateStartup(ctx, &proto.UpdateStartupRequest{Startup: &proto.Startup{ - Version: buildinfo.Version(), - ExpandedDirectory: manifest.Directory, - Subsystems: subsys, - }}) - if err != nil { - return xerrors.Errorf("update workspace agent startup: %w", err) - } - - oldManifest := a.manifest.Swap(&manifest) - manifestOK.complete(nil) - sentResult = true - - // TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same, - // or add prebuild metadata to manifest? - // The startup script should only execute on the first run! - if oldManifest == nil { - a.setLifecycle(codersdk.WorkspaceAgentLifecycleStarting) - - // Perform overrides early so that Git auth can work even if users - // connect to a workspace that is not yet ready. We don't run this - // concurrently with the startup script to avoid conflicts between - // them. - if manifest.GitAuthConfigs > 0 { - // If this fails, we should consider surfacing the error in the - // startup log and setting the lifecycle state to be "start_error" - // (after startup script completion), but for now we'll just log it. - err := gitauth.OverrideVSCodeConfigs(a.filesystem) - if err != nil { - a.logger.Warn(ctx, "failed to override vscode git auth configs", slog.Error(err)) - } - } - - err = a.scriptRunner.Init(manifest.Scripts, aAPI.ScriptCompleted) + // Expand the directory and send it back to coderd so external + // applications that rely on the directory can use it. + // + // An example is VS Code Remote, which must know the directory + // before initializing a connection. + manifest.Directory, err = expandDirectory(manifest.Directory) if err != nil { - return xerrors.Errorf("init script runner: %w", err) + return xerrors.Errorf("expand directory: %w", err) } - err = a.trackGoroutine(func() { - start := time.Now() - // here we use the graceful context because the script runner is not directly tied - // to the agent API. - err := a.scriptRunner.Execute(a.gracefulCtx, agentscripts.ExecuteStartScripts) - // Measure the time immediately after the script has finished - dur := time.Since(start).Seconds() - if err != nil { - a.logger.Warn(ctx, "startup script(s) failed", slog.Error(err)) - if errors.Is(err, agentscripts.ErrTimeout) { - a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartTimeout) - } else { - a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartError) + subsys, err := agentsdk.ProtoFromSubsystems(a.subsystems) + if err != nil { + a.logger.Critical(ctx, "failed to convert subsystems", slog.Error(err)) + return xerrors.Errorf("failed to convert subsystems: %w", err) + } + _, err = aAPI.UpdateStartup(ctx, &proto.UpdateStartupRequest{Startup: &proto.Startup{ + Version: buildinfo.Version(), + ExpandedDirectory: manifest.Directory, + Subsystems: subsys, + }}) + if err != nil { + return xerrors.Errorf("update workspace agent startup: %w", err) + } + + oldManifest := a.manifest.Swap(&manifest) + manifestOK.complete(nil) + sentResult = true + + // The startup script should only execute on the first run! + if oldManifest == nil { + a.setLifecycle(codersdk.WorkspaceAgentLifecycleStarting) + + // Perform overrides early so that Git auth can work even if users + // connect to a workspace that is not yet ready. We don't run this + // concurrently with the startup script to avoid conflicts between + // them. + if manifest.GitAuthConfigs > 0 { + // If this fails, we should consider surfacing the error in the + // startup log and setting the lifecycle state to be "start_error" + // (after startup script completion), but for now we'll just log it. + err := gitauth.OverrideVSCodeConfigs(a.filesystem) + if err != nil { + a.logger.Warn(ctx, "failed to override vscode git auth configs", slog.Error(err)) } - } else { - a.setLifecycle(codersdk.WorkspaceAgentLifecycleReady) } - label := "false" - if err == nil { - label = "true" + err = a.scriptRunner.Init(manifest.Scripts, aAPI.ScriptCompleted) + if err != nil { + return xerrors.Errorf("init script runner: %w", err) + } + err = a.trackGoroutine(func() { + start := time.Now() + // here we use the graceful context because the script runner is not directly tied + // to the agent API. + err := a.scriptRunner.Execute(a.gracefulCtx, agentscripts.ExecuteStartScripts) + // Measure the time immediately after the script has finished + dur := time.Since(start).Seconds() + if err != nil { + a.logger.Warn(ctx, "startup script(s) failed", slog.Error(err)) + if errors.Is(err, agentscripts.ErrTimeout) { + a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartTimeout) + } else { + a.setLifecycle(codersdk.WorkspaceAgentLifecycleStartError) + } + } else { + a.setLifecycle(codersdk.WorkspaceAgentLifecycleReady) + } + + label := "false" + if err == nil { + label = "true" + } + a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur) + a.scriptRunner.StartCron() + }) + if err != nil { + return xerrors.Errorf("track conn goroutine: %w", err) } - a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur) - a.scriptRunner.StartCron() - }) - if err != nil { - return xerrors.Errorf("track conn goroutine: %w", err) } + return nil } - - return nil } // createOrUpdateNetwork waits for the manifest to be set using manifestOK, then creates or updates // the tailnet using the information in the manifest -func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient24) error { - return func(ctx context.Context, _ proto.DRPCAgentClient24) (retErr error) { +func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(context.Context, proto.DRPCAgentClient23) error { + return func(ctx context.Context, _ proto.DRPCAgentClient23) (retErr error) { if err := manifestOK.wait(ctx); err != nil { return xerrors.Errorf("no manifest: %w", err) } @@ -1746,7 +1696,7 @@ const ( type apiConnRoutineManager struct { logger slog.Logger - aAPI proto.DRPCAgentClient24 + aAPI proto.DRPCAgentClient23 tAPI tailnetproto.DRPCTailnetClient23 eg *errgroup.Group stopCtx context.Context @@ -1755,7 +1705,7 @@ type apiConnRoutineManager struct { func newAPIConnRoutineManager( gracefulCtx, hardCtx context.Context, logger slog.Logger, - aAPI proto.DRPCAgentClient24, tAPI tailnetproto.DRPCTailnetClient23, + aAPI proto.DRPCAgentClient23, tAPI tailnetproto.DRPCTailnetClient23, ) *apiConnRoutineManager { // routines that remain in operation during graceful shutdown use the remainCtx. They'll still // exit if the errgroup hits an error, which usually means a problem with the conn. @@ -1788,7 +1738,7 @@ func newAPIConnRoutineManager( // but for Tailnet. func (a *apiConnRoutineManager) startAgentAPI( name string, behavior gracefulShutdownBehavior, - f func(context.Context, proto.DRPCAgentClient24) error, + f func(context.Context, proto.DRPCAgentClient23) error, ) { logger := a.logger.With(slog.F("name", name)) var ctx context.Context diff --git a/agent/agenttest/client.go b/agent/agenttest/client.go index 1926f5fa2e..fbd1ce7060 100644 --- a/agent/agenttest/client.go +++ b/agent/agenttest/client.go @@ -20,6 +20,7 @@ import ( "tailscale.com/tailcfg" "cdr.dev/slog" + agentproto "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" @@ -97,7 +98,7 @@ func (c *Client) Close() { } func (c *Client) ConnectRPC23(ctx context.Context) ( - agentproto.DRPCAgentClient24, proto.DRPCTailnetClient23, error, + agentproto.DRPCAgentClient23, proto.DRPCTailnetClient23, error, ) { conn, lis := drpcsdk.MemTransportPipe() c.LastWorkspaceAgent = func() { diff --git a/agent/proto/agent.pb.go b/agent/proto/agent.pb.go index a6a7433180..4b90e0cf60 100644 --- a/agent/proto/agent.pb.go +++ b/agent/proto/agent.pb.go @@ -3098,78 +3098,73 @@ var file_agent_proto_agent_proto_rawDesc = []byte{ 0x44, 0x49, 0x53, 0x41, 0x42, 0x4c, 0x45, 0x44, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x4e, 0x49, 0x54, 0x49, 0x41, 0x4c, 0x49, 0x5a, 0x49, 0x4e, 0x47, 0x10, 0x02, 0x12, 0x0b, 0x0a, 0x07, 0x48, 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x03, 0x12, 0x0d, 0x0a, 0x09, 0x55, 0x4e, 0x48, - 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x04, 0x32, 0xc2, 0x08, 0x0a, 0x05, 0x41, 0x67, 0x65, + 0x45, 0x41, 0x4c, 0x54, 0x48, 0x59, 0x10, 0x04, 0x32, 0xef, 0x07, 0x0a, 0x05, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x12, 0x4b, 0x0a, 0x0b, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x12, - 0x51, 0x0a, 0x0f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, - 0x74, 0x73, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x4d, 0x61, 0x6e, 0x69, 0x66, 0x65, 0x73, 0x74, - 0x30, 0x01, 0x12, 0x5a, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x42, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x42, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1d, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, - 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x12, 0x56, - 0x0a, 0x0b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x22, 0x2e, - 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x55, - 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x23, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, - 0x76, 0x32, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x54, 0x0a, 0x0f, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, - 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, - 0x76, 0x32, 0x2e, 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x12, 0x72, 0x0a, 0x15, - 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x41, 0x70, 0x70, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x73, 0x12, 0x2b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, - 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x41, 0x70, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x2c, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x41, - 0x70, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x4e, 0x0a, 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x75, - 0x70, 0x12, 0x24, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, - 0x76, 0x32, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x75, 0x70, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, - 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x75, 0x70, - 0x12, 0x6e, 0x0a, 0x13, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x2a, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, - 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x62, 0x0a, 0x0f, 0x42, 0x61, 0x74, 0x63, 0x68, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4c, - 0x6f, 0x67, 0x73, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, - 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, - 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x63, 0x6f, - 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, - 0x63, 0x68, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x77, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x41, 0x6e, 0x6e, 0x6f, 0x75, - 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x73, 0x12, 0x2d, + 0x5a, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x61, 0x6e, + 0x6e, 0x65, 0x72, 0x12, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, + 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, + 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x63, + 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x12, 0x56, 0x0a, 0x0b, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x22, 0x2e, 0x63, 0x6f, 0x64, + 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, - 0x47, 0x65, 0x74, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x42, - 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, - 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x47, - 0x65, 0x74, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x61, - 0x6e, 0x6e, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x7e, 0x0a, - 0x0f, 0x53, 0x63, 0x72, 0x69, 0x70, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, - 0x12, 0x34, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, - 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x41, 0x67, 0x65, 0x6e, 0x74, - 0x53, 0x63, 0x72, 0x69, 0x70, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x35, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, - 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, - 0x65, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x72, 0x69, 0x70, 0x74, 0x43, 0x6f, 0x6d, 0x70, - 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x27, 0x5a, - 0x25, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, - 0x72, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x54, 0x0a, 0x0f, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x66, + 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, + 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4c, 0x69, + 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, + 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, + 0x4c, 0x69, 0x66, 0x65, 0x63, 0x79, 0x63, 0x6c, 0x65, 0x12, 0x72, 0x0a, 0x15, 0x42, 0x61, 0x74, + 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x41, 0x70, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x73, 0x12, 0x2b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, + 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x41, + 0x70, 0x70, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x2c, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, + 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x41, 0x70, 0x70, 0x48, + 0x65, 0x61, 0x6c, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a, + 0x0d, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x75, 0x70, 0x12, 0x24, + 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x53, 0x74, 0x61, 0x72, 0x74, 0x75, 0x70, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, + 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x53, 0x74, 0x61, 0x72, 0x74, 0x75, 0x70, 0x12, 0x6e, 0x0a, + 0x13, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d, 0x65, 0x74, 0x61, + 0x64, 0x61, 0x74, 0x61, 0x12, 0x2a, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, + 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, + 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x2b, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, + 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x4d, 0x65, 0x74, + 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x62, 0x0a, + 0x0f, 0x42, 0x61, 0x74, 0x63, 0x68, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4c, 0x6f, 0x67, 0x73, + 0x12, 0x26, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, + 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4c, 0x6f, 0x67, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, + 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x77, 0x0a, 0x16, 0x47, 0x65, 0x74, 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, + 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x61, 0x6e, 0x6e, 0x65, 0x72, 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, + 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, + 0x41, 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x61, 0x6e, 0x6e, + 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2e, 0x2e, 0x63, 0x6f, 0x64, + 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x47, 0x65, 0x74, 0x41, + 0x6e, 0x6e, 0x6f, 0x75, 0x6e, 0x63, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x42, 0x61, 0x6e, 0x6e, 0x65, + 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x7e, 0x0a, 0x0f, 0x53, 0x63, + 0x72, 0x69, 0x70, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x12, 0x34, 0x2e, + 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x57, + 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x53, 0x63, 0x72, + 0x69, 0x70, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x35, 0x2e, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2e, 0x61, 0x67, 0x65, 0x6e, + 0x74, 0x2e, 0x76, 0x32, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x73, 0x70, 0x61, 0x63, 0x65, 0x41, 0x67, + 0x65, 0x6e, 0x74, 0x53, 0x63, 0x72, 0x69, 0x70, 0x74, 0x43, 0x6f, 0x6d, 0x70, 0x6c, 0x65, 0x74, + 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x27, 0x5a, 0x25, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x63, + 0x6f, 0x64, 0x65, 0x72, 0x2f, 0x76, 0x32, 0x2f, 0x61, 0x67, 0x65, 0x6e, 0x74, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -3277,29 +3272,27 @@ var file_agent_proto_agent_proto_depIdxs = []int32{ 43, // 37: coder.agent.v2.Stats.Metric.labels:type_name -> coder.agent.v2.Stats.Metric.Label 0, // 38: coder.agent.v2.BatchUpdateAppHealthRequest.HealthUpdate.health:type_name -> coder.agent.v2.AppHealth 13, // 39: coder.agent.v2.Agent.GetManifest:input_type -> coder.agent.v2.GetManifestRequest - 13, // 40: coder.agent.v2.Agent.StreamManifests:input_type -> coder.agent.v2.GetManifestRequest - 15, // 41: coder.agent.v2.Agent.GetServiceBanner:input_type -> coder.agent.v2.GetServiceBannerRequest - 17, // 42: coder.agent.v2.Agent.UpdateStats:input_type -> coder.agent.v2.UpdateStatsRequest - 20, // 43: coder.agent.v2.Agent.UpdateLifecycle:input_type -> coder.agent.v2.UpdateLifecycleRequest - 21, // 44: coder.agent.v2.Agent.BatchUpdateAppHealths:input_type -> coder.agent.v2.BatchUpdateAppHealthRequest - 24, // 45: coder.agent.v2.Agent.UpdateStartup:input_type -> coder.agent.v2.UpdateStartupRequest - 26, // 46: coder.agent.v2.Agent.BatchUpdateMetadata:input_type -> coder.agent.v2.BatchUpdateMetadataRequest - 29, // 47: coder.agent.v2.Agent.BatchCreateLogs:input_type -> coder.agent.v2.BatchCreateLogsRequest - 31, // 48: coder.agent.v2.Agent.GetAnnouncementBanners:input_type -> coder.agent.v2.GetAnnouncementBannersRequest - 34, // 49: coder.agent.v2.Agent.ScriptCompleted:input_type -> coder.agent.v2.WorkspaceAgentScriptCompletedRequest - 12, // 50: coder.agent.v2.Agent.GetManifest:output_type -> coder.agent.v2.Manifest - 12, // 51: coder.agent.v2.Agent.StreamManifests:output_type -> coder.agent.v2.Manifest - 14, // 52: coder.agent.v2.Agent.GetServiceBanner:output_type -> coder.agent.v2.ServiceBanner - 18, // 53: coder.agent.v2.Agent.UpdateStats:output_type -> coder.agent.v2.UpdateStatsResponse - 19, // 54: coder.agent.v2.Agent.UpdateLifecycle:output_type -> coder.agent.v2.Lifecycle - 22, // 55: coder.agent.v2.Agent.BatchUpdateAppHealths:output_type -> coder.agent.v2.BatchUpdateAppHealthResponse - 23, // 56: coder.agent.v2.Agent.UpdateStartup:output_type -> coder.agent.v2.Startup - 27, // 57: coder.agent.v2.Agent.BatchUpdateMetadata:output_type -> coder.agent.v2.BatchUpdateMetadataResponse - 30, // 58: coder.agent.v2.Agent.BatchCreateLogs:output_type -> coder.agent.v2.BatchCreateLogsResponse - 32, // 59: coder.agent.v2.Agent.GetAnnouncementBanners:output_type -> coder.agent.v2.GetAnnouncementBannersResponse - 35, // 60: coder.agent.v2.Agent.ScriptCompleted:output_type -> coder.agent.v2.WorkspaceAgentScriptCompletedResponse - 50, // [50:61] is the sub-list for method output_type - 39, // [39:50] is the sub-list for method input_type + 15, // 40: coder.agent.v2.Agent.GetServiceBanner:input_type -> coder.agent.v2.GetServiceBannerRequest + 17, // 41: coder.agent.v2.Agent.UpdateStats:input_type -> coder.agent.v2.UpdateStatsRequest + 20, // 42: coder.agent.v2.Agent.UpdateLifecycle:input_type -> coder.agent.v2.UpdateLifecycleRequest + 21, // 43: coder.agent.v2.Agent.BatchUpdateAppHealths:input_type -> coder.agent.v2.BatchUpdateAppHealthRequest + 24, // 44: coder.agent.v2.Agent.UpdateStartup:input_type -> coder.agent.v2.UpdateStartupRequest + 26, // 45: coder.agent.v2.Agent.BatchUpdateMetadata:input_type -> coder.agent.v2.BatchUpdateMetadataRequest + 29, // 46: coder.agent.v2.Agent.BatchCreateLogs:input_type -> coder.agent.v2.BatchCreateLogsRequest + 31, // 47: coder.agent.v2.Agent.GetAnnouncementBanners:input_type -> coder.agent.v2.GetAnnouncementBannersRequest + 34, // 48: coder.agent.v2.Agent.ScriptCompleted:input_type -> coder.agent.v2.WorkspaceAgentScriptCompletedRequest + 12, // 49: coder.agent.v2.Agent.GetManifest:output_type -> coder.agent.v2.Manifest + 14, // 50: coder.agent.v2.Agent.GetServiceBanner:output_type -> coder.agent.v2.ServiceBanner + 18, // 51: coder.agent.v2.Agent.UpdateStats:output_type -> coder.agent.v2.UpdateStatsResponse + 19, // 52: coder.agent.v2.Agent.UpdateLifecycle:output_type -> coder.agent.v2.Lifecycle + 22, // 53: coder.agent.v2.Agent.BatchUpdateAppHealths:output_type -> coder.agent.v2.BatchUpdateAppHealthResponse + 23, // 54: coder.agent.v2.Agent.UpdateStartup:output_type -> coder.agent.v2.Startup + 27, // 55: coder.agent.v2.Agent.BatchUpdateMetadata:output_type -> coder.agent.v2.BatchUpdateMetadataResponse + 30, // 56: coder.agent.v2.Agent.BatchCreateLogs:output_type -> coder.agent.v2.BatchCreateLogsResponse + 32, // 57: coder.agent.v2.Agent.GetAnnouncementBanners:output_type -> coder.agent.v2.GetAnnouncementBannersResponse + 35, // 58: coder.agent.v2.Agent.ScriptCompleted:output_type -> coder.agent.v2.WorkspaceAgentScriptCompletedResponse + 49, // [49:59] is the sub-list for method output_type + 39, // [39:49] is the sub-list for method input_type 39, // [39:39] is the sub-list for extension type_name 39, // [39:39] is the sub-list for extension extendee 0, // [0:39] is the sub-list for field type_name diff --git a/agent/proto/agent.proto b/agent/proto/agent.proto index 7c5327949d..f307066fcb 100644 --- a/agent/proto/agent.proto +++ b/agent/proto/agent.proto @@ -297,7 +297,6 @@ message Timing { service Agent { rpc GetManifest(GetManifestRequest) returns (Manifest); - rpc StreamManifests(GetManifestRequest) returns (stream Manifest); rpc GetServiceBanner(GetServiceBannerRequest) returns (ServiceBanner); rpc UpdateStats(UpdateStatsRequest) returns (UpdateStatsResponse); rpc UpdateLifecycle(UpdateLifecycleRequest) returns (Lifecycle); diff --git a/agent/proto/agent_drpc.pb.go b/agent/proto/agent_drpc.pb.go index 7c5c325305..9f7e21c962 100644 --- a/agent/proto/agent_drpc.pb.go +++ b/agent/proto/agent_drpc.pb.go @@ -39,7 +39,6 @@ type DRPCAgentClient interface { DRPCConn() drpc.Conn GetManifest(ctx context.Context, in *GetManifestRequest) (*Manifest, error) - StreamManifests(ctx context.Context, in *GetManifestRequest) (DRPCAgent_StreamManifestsClient, error) GetServiceBanner(ctx context.Context, in *GetServiceBannerRequest) (*ServiceBanner, error) UpdateStats(ctx context.Context, in *UpdateStatsRequest) (*UpdateStatsResponse, error) UpdateLifecycle(ctx context.Context, in *UpdateLifecycleRequest) (*Lifecycle, error) @@ -70,46 +69,6 @@ func (c *drpcAgentClient) GetManifest(ctx context.Context, in *GetManifestReques return out, nil } -func (c *drpcAgentClient) StreamManifests(ctx context.Context, in *GetManifestRequest) (DRPCAgent_StreamManifestsClient, error) { - stream, err := c.cc.NewStream(ctx, "/coder.agent.v2.Agent/StreamManifests", drpcEncoding_File_agent_proto_agent_proto{}) - if err != nil { - return nil, err - } - x := &drpcAgent_StreamManifestsClient{stream} - if err := x.MsgSend(in, drpcEncoding_File_agent_proto_agent_proto{}); err != nil { - return nil, err - } - if err := x.CloseSend(); err != nil { - return nil, err - } - return x, nil -} - -type DRPCAgent_StreamManifestsClient interface { - drpc.Stream - Recv() (*Manifest, error) -} - -type drpcAgent_StreamManifestsClient struct { - drpc.Stream -} - -func (x *drpcAgent_StreamManifestsClient) GetStream() drpc.Stream { - return x.Stream -} - -func (x *drpcAgent_StreamManifestsClient) Recv() (*Manifest, error) { - m := new(Manifest) - if err := x.MsgRecv(m, drpcEncoding_File_agent_proto_agent_proto{}); err != nil { - return nil, err - } - return m, nil -} - -func (x *drpcAgent_StreamManifestsClient) RecvMsg(m *Manifest) error { - return x.MsgRecv(m, drpcEncoding_File_agent_proto_agent_proto{}) -} - func (c *drpcAgentClient) GetServiceBanner(ctx context.Context, in *GetServiceBannerRequest) (*ServiceBanner, error) { out := new(ServiceBanner) err := c.cc.Invoke(ctx, "/coder.agent.v2.Agent/GetServiceBanner", drpcEncoding_File_agent_proto_agent_proto{}, in, out) @@ -193,7 +152,6 @@ func (c *drpcAgentClient) ScriptCompleted(ctx context.Context, in *WorkspaceAgen type DRPCAgentServer interface { GetManifest(context.Context, *GetManifestRequest) (*Manifest, error) - StreamManifests(*GetManifestRequest, DRPCAgent_StreamManifestsStream) error GetServiceBanner(context.Context, *GetServiceBannerRequest) (*ServiceBanner, error) UpdateStats(context.Context, *UpdateStatsRequest) (*UpdateStatsResponse, error) UpdateLifecycle(context.Context, *UpdateLifecycleRequest) (*Lifecycle, error) @@ -211,10 +169,6 @@ func (s *DRPCAgentUnimplementedServer) GetManifest(context.Context, *GetManifest return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } -func (s *DRPCAgentUnimplementedServer) StreamManifests(*GetManifestRequest, DRPCAgent_StreamManifestsStream) error { - return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) -} - func (s *DRPCAgentUnimplementedServer) GetServiceBanner(context.Context, *GetServiceBannerRequest) (*ServiceBanner, error) { return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } @@ -253,7 +207,7 @@ func (s *DRPCAgentUnimplementedServer) ScriptCompleted(context.Context, *Workspa type DRPCAgentDescription struct{} -func (DRPCAgentDescription) NumMethods() int { return 11 } +func (DRPCAgentDescription) NumMethods() int { return 10 } func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) { switch n { @@ -267,15 +221,6 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, ) }, DRPCAgentServer.GetManifest, true case 1: - return "/coder.agent.v2.Agent/StreamManifests", drpcEncoding_File_agent_proto_agent_proto{}, - func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { - return nil, srv.(DRPCAgentServer). - StreamManifests( - in1.(*GetManifestRequest), - &drpcAgent_StreamManifestsStream{in2.(drpc.Stream)}, - ) - }, DRPCAgentServer.StreamManifests, true - case 2: return "/coder.agent.v2.Agent/GetServiceBanner", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -284,7 +229,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*GetServiceBannerRequest), ) }, DRPCAgentServer.GetServiceBanner, true - case 3: + case 2: return "/coder.agent.v2.Agent/UpdateStats", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -293,7 +238,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*UpdateStatsRequest), ) }, DRPCAgentServer.UpdateStats, true - case 4: + case 3: return "/coder.agent.v2.Agent/UpdateLifecycle", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -302,7 +247,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*UpdateLifecycleRequest), ) }, DRPCAgentServer.UpdateLifecycle, true - case 5: + case 4: return "/coder.agent.v2.Agent/BatchUpdateAppHealths", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -311,7 +256,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*BatchUpdateAppHealthRequest), ) }, DRPCAgentServer.BatchUpdateAppHealths, true - case 6: + case 5: return "/coder.agent.v2.Agent/UpdateStartup", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -320,7 +265,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*UpdateStartupRequest), ) }, DRPCAgentServer.UpdateStartup, true - case 7: + case 6: return "/coder.agent.v2.Agent/BatchUpdateMetadata", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -329,7 +274,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*BatchUpdateMetadataRequest), ) }, DRPCAgentServer.BatchUpdateMetadata, true - case 8: + case 7: return "/coder.agent.v2.Agent/BatchCreateLogs", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -338,7 +283,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*BatchCreateLogsRequest), ) }, DRPCAgentServer.BatchCreateLogs, true - case 9: + case 8: return "/coder.agent.v2.Agent/GetAnnouncementBanners", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -347,7 +292,7 @@ func (DRPCAgentDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, in1.(*GetAnnouncementBannersRequest), ) }, DRPCAgentServer.GetAnnouncementBanners, true - case 10: + case 9: return "/coder.agent.v2.Agent/ScriptCompleted", drpcEncoding_File_agent_proto_agent_proto{}, func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return srv.(DRPCAgentServer). @@ -381,19 +326,6 @@ func (x *drpcAgent_GetManifestStream) SendAndClose(m *Manifest) error { return x.CloseSend() } -type DRPCAgent_StreamManifestsStream interface { - drpc.Stream - Send(*Manifest) error -} - -type drpcAgent_StreamManifestsStream struct { - drpc.Stream -} - -func (x *drpcAgent_StreamManifestsStream) Send(m *Manifest) error { - return x.MsgSend(m, drpcEncoding_File_agent_proto_agent_proto{}) -} - type DRPCAgent_GetServiceBannerStream interface { drpc.Stream SendAndClose(*ServiceBanner) error diff --git a/agent/proto/agent_drpc_old.go b/agent/proto/agent_drpc_old.go index 33d56c71ac..f46afaba42 100644 --- a/agent/proto/agent_drpc_old.go +++ b/agent/proto/agent_drpc_old.go @@ -40,11 +40,3 @@ type DRPCAgentClient23 interface { DRPCAgentClient22 ScriptCompleted(ctx context.Context, in *WorkspaceAgentScriptCompletedRequest) (*WorkspaceAgentScriptCompletedResponse, error) } - -// DRPCAgentClient24 is the Agent API at v2.4. It adds the StreamManifests RPC. Compatible with -// Coder v2.20+ -// -type DRPCAgentClient24 interface { - DRPCAgentClient23 - StreamManifests(ctx context.Context, in *GetManifestRequest) (DRPCAgent_StreamManifestsClient, error) -} diff --git a/agent/reaper/reaper_unix.go b/agent/reaper/reaper_unix.go index 35ce9bfaa1..5a7c7d2f51 100644 --- a/agent/reaper/reaper_unix.go +++ b/agent/reaper/reaper_unix.go @@ -3,6 +3,7 @@ package reaper import ( + "fmt" "os" "os/signal" "syscall" @@ -29,6 +30,10 @@ func catchSignals(pid int, sigs []os.Signal) { s := <-sc sig, ok := s.(syscall.Signal) if ok { + // TODO: + // Tried using a logger here but the I/O streams are already closed at this point... + // Why is os.Stderr still working then? + _, _ = fmt.Fprintf(os.Stderr, "reaper caught %q signal, killing process %v\n", sig.String(), pid) _ = syscall.Kill(pid, sig) } } diff --git a/cli/agent.go b/cli/agent.go index e8a46a84e0..acdcd2c9d3 100644 --- a/cli/agent.go +++ b/cli/agent.go @@ -2,6 +2,7 @@ package cli import ( "context" + "errors" "fmt" "io" "net/http" @@ -15,6 +16,7 @@ import ( "time" "cloud.google.com/go/compute/metadata" + "github.com/coder/retry" "golang.org/x/xerrors" "gopkg.in/natefinch/lumberjack.v2" @@ -24,6 +26,8 @@ import ( "cdr.dev/slog/sloggers/sloghuman" "cdr.dev/slog/sloggers/slogjson" "cdr.dev/slog/sloggers/slogstackdriver" + "github.com/coder/serpent" + "github.com/coder/coder/v2/agent" "github.com/coder/coder/v2/agent/agentcontainers" "github.com/coder/coder/v2/agent/agentexec" @@ -33,7 +37,6 @@ import ( "github.com/coder/coder/v2/cli/clilog" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" - "github.com/coder/serpent" ) func (r *RootCmd) workspaceAgent() *serpent.Command { @@ -61,8 +64,11 @@ func (r *RootCmd) workspaceAgent() *serpent.Command { // This command isn't useful to manually execute. Hidden: true, Handler: func(inv *serpent.Invocation) error { - ctx, cancel := context.WithCancel(inv.Context()) - defer cancel() + ctx, cancel := context.WithCancelCause(inv.Context()) + defer func() { + fmt.Printf(">>>>>CANCELING CONTEXT") + cancel(errors.New("defer")) + }() var ( ignorePorts = map[int]string{} @@ -278,7 +284,6 @@ func (r *RootCmd) workspaceAgent() *serpent.Command { return xerrors.Errorf("add executable to $PATH: %w", err) } - prometheusRegistry := prometheus.NewRegistry() subsystemsRaw := inv.Environ.Get(agent.EnvAgentSubsystem) subsystems := []codersdk.AgentSubsystem{} for _, s := range strings.Split(subsystemsRaw, ",") { @@ -325,43 +330,88 @@ func (r *RootCmd) workspaceAgent() *serpent.Command { containerLister = agentcontainers.NewDocker(execer) } - agnt := agent.New(agent.Options{ - Client: client, - Logger: logger, - LogDir: logDir, - ScriptDataDir: scriptDataDir, - TailnetListenPort: uint16(tailnetListenPort), - ExchangeToken: func(ctx context.Context) (string, error) { - if exchangeToken == nil { - return client.SDK.SessionToken(), nil + // TODO: timeout ok? + reinitCtx, reinitCancel := context.WithTimeout(context.Background(), time.Hour*24) + defer reinitCancel() + reinitEvents := make(chan agentsdk.ReinitializationResponse) + + go func() { + // Retry to wait for reinit, main context cancels the retrier. + for retrier := retry.New(100*time.Millisecond, 10*time.Second); retrier.Wait(ctx); { + select { + case <-reinitCtx.Done(): + return + default: } - resp, err := exchangeToken(ctx) + + err := client.WaitForReinit(reinitCtx, reinitEvents) if err != nil { - return "", err + logger.Error(ctx, "failed to wait for reinit instructions, will retry", slog.Error(err)) } - client.SetSessionToken(resp.SessionToken) - return resp.SessionToken, nil - }, - EnvironmentVariables: environmentVariables, - IgnorePorts: ignorePorts, - SSHMaxTimeout: sshMaxTimeout, - Subsystems: subsystems, + } + }() - PrometheusRegistry: prometheusRegistry, - BlockFileTransfer: blockFileTransfer, - Execer: execer, - ContainerLister: containerLister, - }) + var ( + lastErr error + mustExit bool + ) + for { + prometheusRegistry := prometheus.NewRegistry() - promHandler := agent.PrometheusMetricsHandler(prometheusRegistry, logger) - prometheusSrvClose := ServeHandler(ctx, logger, promHandler, prometheusAddress, "prometheus") - defer prometheusSrvClose() + agnt := agent.New(agent.Options{ + Client: client, + Logger: logger, + LogDir: logDir, + ScriptDataDir: scriptDataDir, + TailnetListenPort: uint16(tailnetListenPort), + ExchangeToken: func(ctx context.Context) (string, error) { + if exchangeToken == nil { + return client.SDK.SessionToken(), nil + } + resp, err := exchangeToken(ctx) + if err != nil { + return "", err + } + client.SetSessionToken(resp.SessionToken) + return resp.SessionToken, nil + }, + EnvironmentVariables: environmentVariables, + IgnorePorts: ignorePorts, + SSHMaxTimeout: sshMaxTimeout, + Subsystems: subsystems, - debugSrvClose := ServeHandler(ctx, logger, agnt.HTTPDebug(), debugAddress, "debug") - defer debugSrvClose() + PrometheusRegistry: prometheusRegistry, + BlockFileTransfer: blockFileTransfer, + Execer: execer, + ContainerLister: containerLister, + }) - <-ctx.Done() - return agnt.Close() + promHandler := agent.PrometheusMetricsHandler(prometheusRegistry, logger) + prometheusSrvClose := ServeHandler(ctx, logger, promHandler, prometheusAddress, "prometheus") + + debugSrvClose := ServeHandler(ctx, logger, agnt.HTTPDebug(), debugAddress, "debug") + + select { + case <-ctx.Done(): + logger.Warn(ctx, "agent shutting down", slog.Error(ctx.Err()), slog.F("cause", context.Cause(ctx))) + mustExit = true + case event := <-reinitEvents: + logger.Warn(ctx, "agent received instruction to reinitialize", + slog.F("message", event.Message), slog.F("reason", event.Reason)) + } + + lastErr = agnt.Close() + debugSrvClose() + prometheusSrvClose() + + if mustExit { + reinitCancel() + break + } + + logger.Info(ctx, "reinitializing...") + } + return lastErr }, } diff --git a/coderd/agentapi/manifest.go b/coderd/agentapi/manifest.go index e07b24094b..dc3fafafe3 100644 --- a/coderd/agentapi/manifest.go +++ b/coderd/agentapi/manifest.go @@ -1,15 +1,16 @@ package agentapi import ( - "cdr.dev/slog" "context" "database/sql" - "fmt" - "github.com/coder/coder/v2/coderd/database/pubsub" "net/url" "strings" "time" + "cdr.dev/slog" + + "github.com/coder/coder/v2/coderd/database/pubsub" + "github.com/google/uuid" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" @@ -41,61 +42,6 @@ type ManifestAPI struct { Log slog.Logger } -func (a *ManifestAPI) StreamManifests(in *agentproto.GetManifestRequest, stream agentproto.DRPCAgent_StreamManifestsStream) error { - streamCtx := dbauthz.AsSystemRestricted(stream.Context()) // TODO: - - defer func() { - if err := stream.CloseSend(); err != nil { - a.Log.Error(streamCtx, "error closing stream: %v", err) - } - }() - - updates := make(chan struct{}, 1) - - unsub, err := a.Pubsub.Subscribe(ManifestUpdateChannel(a.WorkspaceID), func(ctx context.Context, _ []byte) { - a.Log.Info(ctx, "received 'prebuild claimed' event for workspace, pushing down new manifest", slog.F("workspace_id", a.WorkspaceID.String())) - select { - case <-streamCtx.Done(): - return - case <-ctx.Done(): - return - case updates <- struct{}{}: - } - }) - if err != nil { - return xerrors.Errorf("subscribe to 'prebuild claimed' event: %w", err) - } - defer unsub() - - for { - manifest, err := a.GetManifest(streamCtx, in) - if err != nil { - return xerrors.Errorf("receive manifest: %w", err) - } - - a.Log.Debug(streamCtx, "pushing manifest to workspace", slog.F("workspace_id", a.WorkspaceID)) - - // Send first retrieved manifest. - err = stream.Send(manifest) - if err != nil { - return xerrors.Errorf("send manifest: %w", err) - } - - // ...then wait until triggered by prebuild claim completion. - // At this stage, a prebuild will have been claimed by a user and the agent will need to be reconfigured. - select { - case <-updates: - a.Log.Info(streamCtx, "received manifest update request", slog.F("workspace_id", a.WorkspaceID)) - case <-streamCtx.Done(): - return xerrors.Errorf("stream close: %w", streamCtx.Err()) - } - } -} - -func ManifestUpdateChannel(id uuid.UUID) string { - return fmt.Sprintf("prebuild_claimed_%s", id) -} - func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) { workspaceAgent, err := a.AgentFn(ctx) if err != nil { diff --git a/coderd/coderd.go b/coderd/coderd.go index 2b62d96b56..8de5c48181 100644 --- a/coderd/coderd.go +++ b/coderd/coderd.go @@ -1193,6 +1193,7 @@ func New(options *Options) *API { r.Get("/external-auth", api.workspaceAgentsExternalAuth) r.Get("/gitsshkey", api.agentGitSSHKey) r.Post("/log-source", api.workspaceAgentPostLogSource) + r.Get("/reinit", api.workspaceAgentReinit) }) r.Route("/{workspaceagent}", func(r chi.Router) { r.Use( diff --git a/coderd/provisionerdserver/provisionerdserver.go b/coderd/provisionerdserver/provisionerdserver.go index 2ae5fd4946..a8203ecb4d 100644 --- a/coderd/provisionerdserver/provisionerdserver.go +++ b/coderd/provisionerdserver/provisionerdserver.go @@ -15,7 +15,7 @@ import ( "sync/atomic" "time" - "github.com/coder/coder/v2/coderd/agentapi" + "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/google/uuid" "github.com/sqlc-dev/pqtype" @@ -1713,11 +1713,13 @@ func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) // If this job was initiated by the prebuilds user and the job is not a prebuild, then it MUST be the claim run. // TODO: maybe add some specific metadata to indicate this rather than imputing it. - if input.IsPrebuildClaimByUser != uuid.Nil { + if input.PrebuildClaimByUser != uuid.Nil { + channel := agentsdk.PrebuildClaimedChannel(workspace.ID) s.Logger.Info(ctx, "workspace prebuild successfully claimed by user", - slog.F("user", input.IsPrebuildClaimByUser.String()), - slog.F("workspace_id", workspace.ID)) - if err := s.Pubsub.Publish(agentapi.ManifestUpdateChannel(workspace.ID), nil); err != nil { + slog.F("user", input.PrebuildClaimByUser.String()), + slog.F("workspace_id", workspace.ID), + slog.F("channel", channel)) + if err := s.Pubsub.Publish(channel, []byte(input.PrebuildClaimByUser.String())); err != nil { s.Logger.Error(ctx, "failed to publish message to workspace agent to pull new manifest", slog.Error(err)) } } @@ -2381,10 +2383,10 @@ type TemplateVersionImportJob struct { // WorkspaceProvisionJob is the payload for the "workspace_provision" job type. type WorkspaceProvisionJob struct { - WorkspaceBuildID uuid.UUID `json:"workspace_build_id"` - DryRun bool `json:"dry_run"` - IsPrebuild bool `json:"is_prebuild,omitempty"` - IsPrebuildClaimByUser uuid.UUID `json:"is_prebuild_claim_by,omitempty"` + WorkspaceBuildID uuid.UUID `json:"workspace_build_id"` + DryRun bool `json:"dry_run"` + IsPrebuild bool `json:"is_prebuild,omitempty"` + PrebuildClaimByUser uuid.UUID `json:"prebuild_claim_by,omitempty"` // RunningWorkspaceAgentID is *only* used for prebuilds. We pass it down when we want to rebuild a prebuilt workspace // but not generate a new agent token. The provisionerdserver will retrieve this token and push it down to // the provisioner (and ultimately to the `coder_agent` resource in the Terraform provider) where it will be diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 8132da9bd7..2bae75b02a 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -23,6 +23,8 @@ import ( "tailscale.com/tailcfg" "cdr.dev/slog" + "github.com/coder/websocket" + "github.com/coder/coder/v2/coderd/agentapi" "github.com/coder/coder/v2/coderd/database" "github.com/coder/coder/v2/coderd/database/db2sdk" @@ -42,7 +44,6 @@ import ( "github.com/coder/coder/v2/codersdk/wsjson" "github.com/coder/coder/v2/tailnet" "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/websocket" ) // @Summary Get workspace agent by ID @@ -1046,6 +1047,105 @@ func (api *API) workspaceAgentPostLogSource(rw http.ResponseWriter, r *http.Requ httpapi.Write(ctx, rw, http.StatusCreated, apiSource) } +// TODO @Summary Post workspace agent log source +// TODO @ID post-workspace-agent-log-source +// TODO @Security CoderSessionToken +// TODO @Accept json +// TODO @Produce json +// TODO @Tags Agents +// TODO @Param request body agentsdk.PostLogSourceRequest true "Log source request" +// TODO @Success 200 {object} codersdk.WorkspaceAgentLogSource +// TODO @Router /workspaceagents/me/log-source [post] +func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) { + // Allow us to interrupt watch via cancel. + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + r = r.WithContext(ctx) // Rewire context for SSE cancellation. + + workspaceAgent := httpmw.WorkspaceAgent(r) + log := api.Logger.Named("workspace_agent_reinit_watcher").With( + slog.F("workspace_agent_id", workspaceAgent.ID), + ) + + workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID) + if err != nil { + log.Error(ctx, "failed to retrieve workspace from agent token", slog.Error(err)) + httpapi.InternalServerError(rw, errors.New("failed to determine workspace from agent token")) + } + + log.Info(ctx, "agent waiting for reinit instruction") + + prebuildClaims := make(chan uuid.UUID, 1) + cancelSub, err := api.Pubsub.Subscribe(agentsdk.PrebuildClaimedChannel(workspace.ID), func(inner context.Context, id []byte) { + select { + case <-ctx.Done(): + return + case <-inner.Done(): + return + default: + } + + parsed, err := uuid.ParseBytes(id) + if err != nil { + log.Error(ctx, "invalid prebuild claimed channel payload", slog.F("input", string(id))) + return + } + prebuildClaims <- parsed + }) + if err != nil { + log.Error(ctx, "failed to subscribe to prebuild claimed channel", slog.Error(err)) + httpapi.InternalServerError(rw, errors.New("failed to subscribe to prebuild claimed channel")) + return + } + defer cancelSub() + + sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r) + if err != nil { + httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ + Message: "Internal error setting up server-sent events.", + Detail: err.Error(), + }) + return + } + // Prevent handler from returning until the sender is closed. + defer func() { + cancel() + <-sseSenderClosed + }() + // Synchronize cancellation from SSE -> context, this lets us simplify the + // cancellation logic. + go func() { + select { + case <-ctx.Done(): + case <-sseSenderClosed: + cancel() + } + }() + + // An initial ping signals to the request that the server is now ready + // and the client can begin servicing a channel with data. + _ = sseSendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypePing, + }) + + // Expand with future use-cases for agent reinitialization. + for { + select { + case <-ctx.Done(): + return + case user := <-prebuildClaims: + err = sseSendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeData, + Data: agentsdk.ReinitializationResponse{ + Message: fmt.Sprintf("prebuild claimed by user: %s", user), + Reason: agentsdk.ReinitializeReasonPrebuildClaimed, + }, + }) + log.Warn(ctx, "failed to send SSE response to trigger reinit", slog.Error(err)) + } + } +} + // convertProvisionedApps converts applications that are in the middle of provisioning process. // It means that they may not have an agent or workspace assigned (dry-run job). func convertProvisionedApps(dbApps []database.WorkspaceApp) []codersdk.WorkspaceApp { diff --git a/coderd/wsbuilder/wsbuilder.go b/coderd/wsbuilder/wsbuilder.go index cf8cb6c362..9671f2cbed 100644 --- a/coderd/wsbuilder/wsbuilder.go +++ b/coderd/wsbuilder/wsbuilder.go @@ -319,7 +319,7 @@ func (b *Builder) buildTx(authFunc func(action policy.Action, object rbac.Object WorkspaceBuildID: workspaceBuildID, LogLevel: b.logLevel, IsPrebuild: b.prebuild, - IsPrebuildClaimByUser: b.prebuildClaimBy, + PrebuildClaimByUser: b.prebuildClaimBy, RunningWorkspaceAgentID: b.runningWorkspaceAgentID, }) if err != nil { diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index 0ff9f04867..4b2c98b24c 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -19,12 +19,13 @@ import ( "tailscale.com/tailcfg" "cdr.dev/slog" + "github.com/coder/websocket" + "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/apiversion" "github.com/coder/coder/v2/codersdk" drpcsdk "github.com/coder/coder/v2/codersdk/drpc" tailnetproto "github.com/coder/coder/v2/tailnet/proto" - "github.com/coder/websocket" ) // ExternalLogSourceID is the statically-defined ID of a log-source that @@ -232,7 +233,7 @@ func (c *Client) ConnectRPC23(ctx context.Context) ( // ConnectRPC24 returns a dRPC client to the Agent API v2.4. It is useful when you want to be // maximally compatible with Coderd Release Versions from 2.18+ // TODO update release func (c *Client) ConnectRPC24(ctx context.Context) ( - proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, + proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error, ) { conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 4)) if err != nil { @@ -649,3 +650,72 @@ func LogsNotifyChannel(agentID uuid.UUID) string { type LogsNotifyMessage struct { CreatedAfter int64 `json:"created_after"` } + +type ReinitializationReason string + +const ( + ReinitializeReasonPrebuildClaimed ReinitializationReason = "prebuild_claimed" +) + +type ReinitializationResponse struct { + Message string `json:"message"` + Reason ReinitializationReason `json:"reason"` +} + +// TODO: maybe place this somewhere else? +func PrebuildClaimedChannel(id uuid.UUID) string { + return fmt.Sprintf("prebuild_claimed_%s", id) +} + +// WaitForReinit polls a SSE endpoint, and receives an event back under the following conditions: +// - ping: ignored, keepalive +// - prebuild claimed: a prebuilt workspace is claimed, so the agent must reinitialize. +// NOTE: the caller is responsible for closing the events chan. +func (c *Client) WaitForReinit(ctx context.Context, events chan<- ReinitializationResponse) error { + // TODO: allow configuring httpclient + c.SDK.HTTPClient.Timeout = time.Hour * 24 + + res, err := c.SDK.Request(ctx, http.MethodGet, "/api/v2/workspaceagents/me/reinit", nil) + if err != nil { + return xerrors.Errorf("execute request: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return codersdk.ReadBodyAsError(res) + } + + nextEvent := codersdk.ServerSentEventReader(ctx, res.Body) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + sse, err := nextEvent() + if err != nil { + return xerrors.Errorf("failed to read server-sent event: %w", err) + } + // TODO: remove + fmt.Printf("RECEIVED SSE EVENT: %s\n", sse.Type) + if sse.Type != codersdk.ServerSentEventTypeData { + continue + } + var reinitResp ReinitializationResponse + b, ok := sse.Data.([]byte) + if !ok { + return xerrors.Errorf("expected data as []byte, got %T", sse.Data) + } + err = json.Unmarshal(b, &reinitResp) + if err != nil { + return xerrors.Errorf("unmarshal reinit response: %w", err) + } + select { + case <-ctx.Done(): + return ctx.Err() + case events <- reinitResp: + } + } +}