fix(agent/agentcontainers): update sub agent client on reconnect (#18399)

Fixes coder/internal#697
This commit is contained in:
Mathias Fredriksson
2025-06-17 16:58:09 +03:00
committed by GitHub
parent 97474bb28b
commit d6df1f23a9
2 changed files with 41 additions and 12 deletions

View File

@ -1188,6 +1188,14 @@ func (a *agent) handleManifest(manifestOK *checkpoint) func(ctx context.Context,
} }
a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur) a.metrics.startupScriptSeconds.WithLabelValues(label).Set(dur)
a.scriptRunner.StartCron() a.scriptRunner.StartCron()
// If the container API is enabled, trigger an immediate refresh
// for quick sub agent injection.
if cAPI := a.containerAPI.Load(); cAPI != nil {
if err := cAPI.RefreshContainers(ctx); err != nil {
a.logger.Error(ctx, "failed to refresh containers", slog.Error(err))
}
}
}) })
if err != nil { if err != nil {
return xerrors.Errorf("track conn goroutine: %w", err) return xerrors.Errorf("track conn goroutine: %w", err)
@ -1253,6 +1261,12 @@ func (a *agent) createOrUpdateNetwork(manifestOK, networkOK *checkpoint) func(co
network.SetDERPMap(manifest.DERPMap) network.SetDERPMap(manifest.DERPMap)
network.SetDERPForceWebSockets(manifest.DERPForceWebSockets) network.SetDERPForceWebSockets(manifest.DERPForceWebSockets)
network.SetBlockEndpoints(manifest.DisableDirectConnections) network.SetBlockEndpoints(manifest.DisableDirectConnections)
// Update the subagent client if the container API is available.
if cAPI := a.containerAPI.Load(); cAPI != nil {
client := agentcontainers.NewSubAgentClientFromAPI(a.logger, aAPI)
cAPI.UpdateSubAgentClient(client)
}
} }
return nil return nil
} }

View File

@ -14,6 +14,7 @@ import (
"slices" "slices"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
@ -59,7 +60,7 @@ type API struct {
dccli DevcontainerCLI dccli DevcontainerCLI
clock quartz.Clock clock quartz.Clock
scriptLogger func(logSourceID uuid.UUID) ScriptLogger scriptLogger func(logSourceID uuid.UUID) ScriptLogger
subAgentClient SubAgentClient subAgentClient atomic.Pointer[SubAgentClient]
subAgentURL string subAgentURL string
subAgentEnv []string subAgentEnv []string
@ -133,7 +134,7 @@ func WithDevcontainerCLI(dccli DevcontainerCLI) Option {
// This is used to list, create, and delete devcontainer agents. // This is used to list, create, and delete devcontainer agents.
func WithSubAgentClient(client SubAgentClient) Option { func WithSubAgentClient(client SubAgentClient) Option {
return func(api *API) { return func(api *API) {
api.subAgentClient = client api.subAgentClient.Store(&client)
} }
} }
@ -230,7 +231,6 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
logger: logger, logger: logger,
clock: quartz.NewReal(), clock: quartz.NewReal(),
execer: agentexec.DefaultExecer, execer: agentexec.DefaultExecer,
subAgentClient: noopSubAgentClient{},
containerLabelIncludeFilter: make(map[string]string), containerLabelIncludeFilter: make(map[string]string),
devcontainerNames: make(map[string]bool), devcontainerNames: make(map[string]bool),
knownDevcontainers: make(map[string]codersdk.WorkspaceAgentDevcontainer), knownDevcontainers: make(map[string]codersdk.WorkspaceAgentDevcontainer),
@ -259,6 +259,10 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
api.watcher = watcher.NewNoop() api.watcher = watcher.NewNoop()
} }
} }
if api.subAgentClient.Load() == nil {
var c SubAgentClient = noopSubAgentClient{}
api.subAgentClient.Store(&c)
}
go api.watcherLoop() go api.watcherLoop()
go api.updaterLoop() go api.updaterLoop()
@ -375,6 +379,11 @@ func (api *API) updaterLoop() {
} }
} }
// UpdateSubAgentClient replaces the SubAgentClient used by the API.
//
// The client is held in an atomic.Pointer, so the swap is atomic with
// respect to methods that load it (for example when listing, creating
// or deleting sub agents): a method that has already loaded the old
// client keeps using it for that call, while later loads observe the
// new one.
//
// NOTE(review): readers dereference the loaded pointer and call methods
// on the result, so passing a nil client would presumably panic at the
// next use — confirm callers never pass nil.
func (api *API) UpdateSubAgentClient(client SubAgentClient) {
api.subAgentClient.Store(&client)
}
// Routes returns the HTTP handler for container-related routes. // Routes returns the HTTP handler for container-related routes.
func (api *API) Routes() http.Handler { func (api *API) Routes() http.Handler {
r := chi.NewRouter() r := chi.NewRouter()
@ -623,9 +632,9 @@ func safeFriendlyName(name string) string {
return name return name
} }
// refreshContainers triggers an immediate update of the container list // RefreshContainers triggers an immediate update of the container list
// and waits for it to complete. // and waits for it to complete.
func (api *API) refreshContainers(ctx context.Context) (err error) { func (api *API) RefreshContainers(ctx context.Context) (err error) {
defer func() { defer func() {
if err != nil { if err != nil {
err = xerrors.Errorf("refresh containers failed: %w", err) err = xerrors.Errorf("refresh containers failed: %w", err)
@ -860,7 +869,7 @@ func (api *API) recreateDevcontainer(dc codersdk.WorkspaceAgentDevcontainer, con
// Ensure an immediate refresh to accurately reflect the // Ensure an immediate refresh to accurately reflect the
// devcontainer state after recreation. // devcontainer state after recreation.
if err := api.refreshContainers(ctx); err != nil { if err := api.RefreshContainers(ctx); err != nil {
logger.Error(ctx, "failed to trigger immediate refresh after devcontainer recreation", slog.Error(err)) logger.Error(ctx, "failed to trigger immediate refresh after devcontainer recreation", slog.Error(err))
} }
} }
@ -904,7 +913,8 @@ func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) {
// slate. This method has an internal timeout to prevent blocking // slate. This method has an internal timeout to prevent blocking
// indefinitely if something goes wrong with the subagent deletion. // indefinitely if something goes wrong with the subagent deletion.
func (api *API) cleanupSubAgents(ctx context.Context) error { func (api *API) cleanupSubAgents(ctx context.Context) error {
agents, err := api.subAgentClient.List(ctx) client := *api.subAgentClient.Load()
agents, err := client.List(ctx)
if err != nil { if err != nil {
return xerrors.Errorf("list agents: %w", err) return xerrors.Errorf("list agents: %w", err)
} }
@ -927,7 +937,8 @@ func (api *API) cleanupSubAgents(ctx context.Context) error {
if injected[agent.ID] { if injected[agent.ID] {
continue continue
} }
err := api.subAgentClient.Delete(ctx, agent.ID) client := *api.subAgentClient.Load()
err := client.Delete(ctx, agent.ID)
if err != nil { if err != nil {
api.logger.Error(ctx, "failed to delete agent", api.logger.Error(ctx, "failed to delete agent",
slog.Error(err), slog.Error(err),
@ -1101,7 +1112,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c
if proc.agent.ID != uuid.Nil && recreateSubAgent { if proc.agent.ID != uuid.Nil && recreateSubAgent {
logger.Debug(ctx, "deleting existing subagent for recreation", slog.F("agent_id", proc.agent.ID)) logger.Debug(ctx, "deleting existing subagent for recreation", slog.F("agent_id", proc.agent.ID))
err = api.subAgentClient.Delete(ctx, proc.agent.ID) client := *api.subAgentClient.Load()
err = client.Delete(ctx, proc.agent.ID)
if err != nil { if err != nil {
return xerrors.Errorf("delete existing subagent failed: %w", err) return xerrors.Errorf("delete existing subagent failed: %w", err)
} }
@ -1144,7 +1156,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c
) )
// Create new subagent record in the database to receive the auth token. // Create new subagent record in the database to receive the auth token.
proc.agent, err = api.subAgentClient.Create(ctx, SubAgent{ client := *api.subAgentClient.Load()
proc.agent, err = client.Create(ctx, SubAgent{
Name: dc.Name, Name: dc.Name,
Directory: directory, Directory: directory,
OperatingSystem: "linux", // Assuming Linux for devcontainers. OperatingSystem: "linux", // Assuming Linux for devcontainers.
@ -1163,7 +1176,8 @@ func (api *API) maybeInjectSubAgentIntoContainerLocked(ctx context.Context, dc c
if api.closed { if api.closed {
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout) deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
defer deleteCancel() defer deleteCancel()
err := api.subAgentClient.Delete(deleteCtx, proc.agent.ID) client := *api.subAgentClient.Load()
err := client.Delete(deleteCtx, proc.agent.ID)
if err != nil { if err != nil {
return xerrors.Errorf("delete existing subagent failed after API closed: %w", err) return xerrors.Errorf("delete existing subagent failed after API closed: %w", err)
} }
@ -1249,8 +1263,9 @@ func (api *API) Close() error {
// Note: We can't use api.ctx here because it's canceled. // Note: We can't use api.ctx here because it's canceled.
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout) deleteCtx, deleteCancel := context.WithTimeout(context.Background(), defaultOperationTimeout)
defer deleteCancel() defer deleteCancel()
client := *api.subAgentClient.Load()
for _, id := range subAgentIDs { for _, id := range subAgentIDs {
err := api.subAgentClient.Delete(deleteCtx, id) err := client.Delete(deleteCtx, id)
if err != nil { if err != nil {
api.logger.Error(api.ctx, "delete subagent record during shutdown failed", api.logger.Error(api.ctx, "delete subagent record during shutdown failed",
slog.Error(err), slog.Error(err),