feat(agent/agentcontainers): update containers periodically (#17972)

This change introduces a significant refactor of the agentcontainers API
and enables periodic updates of Docker containers rather than updating them
on demand. Consequently, this change also allows us to move away from using
a locking channel and replace it with a mutex, which simplifies usage.
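
A minimal sketch of the resulting shape, assuming simplified stand-in types
(the real field and option names are in the diff below): shared state guarded
by a sync.RWMutex and refreshed by a loop driven by quartz's TickerFunc
instead of per-request listing.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/coder/quartz"
)

// cache is a stand-in for the API's mutex-protected container state.
type cache struct {
	mu         sync.RWMutex
	containers []string
	err        error
}

// update lists containers with a timeout and stores the result under the lock.
func (c *cache) update(ctx context.Context, list func(context.Context) ([]string, error)) error {
	listCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
	defer cancel()
	updated, err := list(listCtx)

	c.mu.Lock()
	defer c.mu.Unlock()
	if err != nil {
		c.err = err
		return err
	}
	c.containers, c.err = updated, nil
	return nil
}

// get returns a copy of the last result so callers never hold the lock long.
func (c *cache) get() ([]string, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	if c.err != nil {
		return nil, c.err
	}
	return append([]string(nil), c.containers...), nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	c := &cache{}
	list := func(context.Context) ([]string, error) { return []string{"my-devcontainer"}, nil }

	// Periodic updates replace on-demand listing; the ticker runs until the
	// context is canceled (TickerFunc/Wait mirror the quartz usage below).
	clock := quartz.NewReal()
	ticker := clock.TickerFunc(ctx, time.Second, func() error {
		return c.update(ctx, list)
	}, "updaterLoop")

	_ = c.update(ctx, list) // initial update before serving reads
	fmt.Println(c.get())

	_ = ticker.Wait("updaterLoop") // blocks until ctx is done
}

The real API also keeps a manual-refresh path (refreshContainers) that sends
a result channel over updateTrigger, so the updater loop remains the single
writer of the cached state.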

Additionally, a previous oversight was fixed, and tests were added, to clear
the devcontainer running/dirty status when the underlying container has been
removed.
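
A hedged sketch of that cleanup with simplified stand-in types (the real
logic lives in processUpdatedContainersLocked below): any known devcontainer
whose workspace folder was not seen in the latest container list has its
running/dirty state and container reference cleared.

package main

import "fmt"

// devcontainer is a simplified stand-in for codersdk.WorkspaceAgentDevcontainer.
type devcontainer struct {
	WorkspaceFolder string
	Running         bool
	Dirty           bool
	Container       *string // stand-in for the attached container, if any
}

// resetRemoved clears state for devcontainers not seen in the latest update.
func resetRemoved(known []devcontainer, seen map[string]bool) {
	for i := range known {
		dc := &known[i]
		if seen[dc.WorkspaceFolder] {
			continue // still backed by a container
		}
		if !dc.Running && !dc.Dirty && dc.Container == nil {
			continue // already marked as not running
		}
		dc.Running = false
		dc.Dirty = false
		dc.Container = nil
	}
}

func main() {
	id := "abc123"
	known := []devcontainer{
		{WorkspaceFolder: "/work/app", Running: true, Dirty: true, Container: &id},
	}
	// The container backing /work/app was removed, so it is absent from the
	// latest list and its state is cleared.
	resetRemoved(known, map[string]bool{})
	fmt.Printf("%+v\n", known[0])
}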

Updates coder/coder#16424
Updates coder/internal#621
Author: Mathias Fredriksson (committed by GitHub)
Date: 2025-05-22 19:44:33 +03:00
Parent: 13b41c200c
Commit: d6c14f3d8a
6 changed files with 530 additions and 339 deletions


@@ -8,6 +8,7 @@ import (
"path"
"slices"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
@@ -25,35 +26,35 @@ import (
)
const (
defaultGetContainersCacheDuration = 10 * time.Second
dockerCreatedAtTimeFormat = "2006-01-02 15:04:05 -0700 MST"
getContainersTimeout = 5 * time.Second
defaultUpdateInterval = 10 * time.Second
listContainersTimeout = 15 * time.Second
)
// API is responsible for container-related operations in the agent.
// It provides methods to list and manage containers.
type API struct {
ctx context.Context
cancel context.CancelFunc
done chan struct{}
logger slog.Logger
watcher watcher.Watcher
ctx context.Context
cancel context.CancelFunc
watcherDone chan struct{}
updaterDone chan struct{}
initialUpdateDone chan struct{} // Closed after first update in updaterLoop.
updateTrigger chan chan error // Channel to trigger manual refresh.
updateInterval time.Duration // Interval for periodic container updates.
logger slog.Logger
watcher watcher.Watcher
execer agentexec.Execer
cl Lister
dccli DevcontainerCLI
clock quartz.Clock
scriptLogger func(logSourceID uuid.UUID) ScriptLogger
cacheDuration time.Duration
execer agentexec.Execer
cl Lister
dccli DevcontainerCLI
clock quartz.Clock
scriptLogger func(logSourceID uuid.UUID) ScriptLogger
// lockCh protects the below fields. We use a channel instead of a
// mutex so we can handle cancellation properly.
lockCh chan struct{}
containers codersdk.WorkspaceAgentListContainersResponse
mtime time.Time
devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates.
knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers.
configFileModifiedTimes map[string]time.Time // Track when config files were last modified.
mu sync.RWMutex
closed bool
containers codersdk.WorkspaceAgentListContainersResponse // Output from the last list operation.
containersErr error // Error from the last list operation.
devcontainerNames map[string]struct{}
knownDevcontainers []codersdk.WorkspaceAgentDevcontainer
configFileModifiedTimes map[string]time.Time
devcontainerLogSourceIDs map[string]uuid.UUID // Track devcontainer log source IDs.
}
@@ -69,15 +70,6 @@ func WithClock(clock quartz.Clock) Option {
}
}
// WithCacheDuration sets the cache duration for the API.
// This is used to control how often the API refreshes the list of
// containers. The default is 10 seconds.
func WithCacheDuration(d time.Duration) Option {
return func(api *API) {
api.cacheDuration = d
}
}
// WithExecer sets the agentexec.Execer implementation to use.
func WithExecer(execer agentexec.Execer) Option {
return func(api *API) {
@@ -169,12 +161,14 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
api := &API{
ctx: ctx,
cancel: cancel,
done: make(chan struct{}),
watcherDone: make(chan struct{}),
updaterDone: make(chan struct{}),
initialUpdateDone: make(chan struct{}),
updateTrigger: make(chan chan error),
updateInterval: defaultUpdateInterval,
logger: logger,
clock: quartz.NewReal(),
execer: agentexec.DefaultExecer,
cacheDuration: defaultGetContainersCacheDuration,
lockCh: make(chan struct{}, 1),
devcontainerNames: make(map[string]struct{}),
knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{},
configFileModifiedTimes: make(map[string]time.Time),
@@ -200,33 +194,16 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
}
}
go api.loop()
go api.watcherLoop()
go api.updaterLoop()
return api
}
// SignalReady signals the API that we are ready to begin watching for
// file changes. This is used to prime the cache with the current list
// of containers and to start watching the devcontainer config files for
// changes. It should be called after the agent is ready.
func (api *API) SignalReady() {
// Prime the cache with the current list of containers.
_, _ = api.cl.List(api.ctx)
// Make sure we watch the devcontainer config files for changes.
for _, devcontainer := range api.knownDevcontainers {
if devcontainer.ConfigPath == "" {
continue
}
if err := api.watcher.Add(devcontainer.ConfigPath); err != nil {
api.logger.Error(api.ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", devcontainer.ConfigPath))
}
}
}
func (api *API) loop() {
defer close(api.done)
func (api *API) watcherLoop() {
defer close(api.watcherDone)
defer api.logger.Debug(api.ctx, "watcher loop stopped")
api.logger.Debug(api.ctx, "watcher loop started")
for {
event, err := api.watcher.Next(api.ctx)
@@ -263,10 +240,94 @@ func (api *API) loop() {
}
}
// updaterLoop is responsible for periodically updating the container
// list and handling manual refresh requests.
func (api *API) updaterLoop() {
defer close(api.updaterDone)
defer api.logger.Debug(api.ctx, "updater loop stopped")
api.logger.Debug(api.ctx, "updater loop started")
// Perform an initial update to populate the container list; this
// gives us a guarantee that the API has loaded the initial state
// before returning any responses. This is useful for both tests
// and anyone looking to interact with the API.
api.logger.Debug(api.ctx, "performing initial containers update")
if err := api.updateContainers(api.ctx); err != nil {
api.logger.Error(api.ctx, "initial containers update failed", slog.Error(err))
} else {
api.logger.Debug(api.ctx, "initial containers update complete")
}
// Signal that the initial update attempt (successful or not) is done.
// Other services can wait on this if they need the first data to be available.
close(api.initialUpdateDone)
// We utilize a TickerFunc here instead of a regular Ticker so that
// we can guarantee execution of the updateContainers method after
// advancing the clock.
ticker := api.clock.TickerFunc(api.ctx, api.updateInterval, func() error {
done := make(chan error, 1)
defer close(done)
select {
case <-api.ctx.Done():
return api.ctx.Err()
case api.updateTrigger <- done:
err := <-done
if err != nil {
api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err))
}
default:
api.logger.Debug(api.ctx, "updater loop ticker skipped, update in progress")
}
return nil // Always nil to keep the ticker going.
}, "updaterLoop")
defer func() {
if err := ticker.Wait("updaterLoop"); err != nil && !errors.Is(err, context.Canceled) {
api.logger.Error(api.ctx, "updater loop ticker failed", slog.Error(err))
}
}()
for {
select {
case <-api.ctx.Done():
return
case done := <-api.updateTrigger:
// Note that although we pass api.ctx here, updateContainers
// has an internal timeout to prevent long blocking calls.
done <- api.updateContainers(api.ctx)
}
}
}
// Routes returns the HTTP handler for container-related routes.
func (api *API) Routes() http.Handler {
r := chi.NewRouter()
ensureInitialUpdateDoneMW := func(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
select {
case <-api.ctx.Done():
httpapi.Write(r.Context(), rw, http.StatusServiceUnavailable, codersdk.Response{
Message: "API closed",
Detail: "The API is closed and cannot process requests.",
})
return
case <-r.Context().Done():
return
case <-api.initialUpdateDone:
// Initial update is done, we can start processing
// requests.
}
next.ServeHTTP(rw, r)
})
}
// For now, all endpoints require the initial update to be done.
// If we want to allow some endpoints to be available before
// the initial update, we can enable this per-route.
r.Use(ensureInitialUpdateDoneMW)
r.Get("/", api.handleList)
r.Route("/devcontainers", func(r chi.Router) {
r.Get("/", api.handleDevcontainersList)
@@ -278,62 +339,53 @@ func (api *API) Routes() http.Handler {
// handleList handles the HTTP request to list containers.
func (api *API) handleList(rw http.ResponseWriter, r *http.Request) {
select {
case <-r.Context().Done():
// Client went away.
ct, err := api.getContainers()
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Could not get containers",
Detail: err.Error(),
})
return
default:
ct, err := api.getContainers(r.Context())
if err != nil {
if errors.Is(err, context.Canceled) {
httpapi.Write(r.Context(), rw, http.StatusRequestTimeout, codersdk.Response{
Message: "Could not get containers.",
Detail: "Took too long to list containers.",
})
return
}
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Could not get containers.",
Detail: err.Error(),
})
return
}
httpapi.Write(r.Context(), rw, http.StatusOK, ct)
}
// updateContainers fetches the latest container list, processes it, and
// updates the cache. It performs locking for updating shared API state.
func (api *API) updateContainers(ctx context.Context) error {
listCtx, listCancel := context.WithTimeout(ctx, listContainersTimeout)
defer listCancel()
updated, err := api.cl.List(listCtx)
if err != nil {
// If the context was canceled, we hold off on clearing the
// containers cache. This is to avoid clearing the cache if
// the update was canceled due to a timeout. Hopefully this
// will clear up on the next update.
if !errors.Is(err, context.Canceled) {
api.mu.Lock()
api.containers = codersdk.WorkspaceAgentListContainersResponse{}
api.containersErr = err
api.mu.Unlock()
}
httpapi.Write(r.Context(), rw, http.StatusOK, ct)
return xerrors.Errorf("list containers failed: %w", err)
}
api.mu.Lock()
defer api.mu.Unlock()
api.processUpdatedContainersLocked(ctx, updated)
api.logger.Debug(ctx, "containers updated successfully", slog.F("container_count", len(api.containers.Containers)), slog.F("warning_count", len(api.containers.Warnings)), slog.F("devcontainer_count", len(api.knownDevcontainers)))
return nil
}
func copyListContainersResponse(resp codersdk.WorkspaceAgentListContainersResponse) codersdk.WorkspaceAgentListContainersResponse {
return codersdk.WorkspaceAgentListContainersResponse{
Containers: slices.Clone(resp.Containers),
Warnings: slices.Clone(resp.Warnings),
}
}
func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListContainersResponse, error) {
select {
case <-api.ctx.Done():
return codersdk.WorkspaceAgentListContainersResponse{}, api.ctx.Err()
case <-ctx.Done():
return codersdk.WorkspaceAgentListContainersResponse{}, ctx.Err()
case api.lockCh <- struct{}{}:
defer func() { <-api.lockCh }()
}
now := api.clock.Now()
if now.Sub(api.mtime) < api.cacheDuration {
return copyListContainersResponse(api.containers), nil
}
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, getContainersTimeout)
defer timeoutCancel()
updated, err := api.cl.List(timeoutCtx)
if err != nil {
return codersdk.WorkspaceAgentListContainersResponse{}, xerrors.Errorf("get containers: %w", err)
}
api.containers = updated
api.mtime = now
// processUpdatedContainersLocked updates the devcontainer state based
// on the latest list of containers. This method assumes that api.mu is
// held.
func (api *API) processUpdatedContainersLocked(ctx context.Context, updated codersdk.WorkspaceAgentListContainersResponse) {
dirtyStates := make(map[string]bool)
// Reset all known devcontainers to not running.
for i := range api.knownDevcontainers {
@@ -345,6 +397,7 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC
}
// Check if the container is running and update the known devcontainers.
updatedDevcontainers := make(map[string]bool)
for i := range updated.Containers {
container := &updated.Containers[i]
workspaceFolder := container.Labels[DevcontainerLocalFolderLabel]
@@ -354,18 +407,16 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC
continue
}
container.DevcontainerDirty = dirtyStates[workspaceFolder]
if container.DevcontainerDirty {
lastModified, hasModTime := api.configFileModifiedTimes[configFile]
if hasModTime && container.CreatedAt.After(lastModified) {
api.logger.Info(ctx, "new container created after config modification, not marking as dirty",
slog.F("container", container.ID),
slog.F("created_at", container.CreatedAt),
slog.F("config_modified_at", lastModified),
slog.F("file", configFile),
)
container.DevcontainerDirty = false
}
if lastModified, hasModTime := api.configFileModifiedTimes[configFile]; !hasModTime || container.CreatedAt.Before(lastModified) {
api.logger.Debug(ctx, "container created before config modification, setting dirty state from devcontainer",
slog.F("container", container.ID),
slog.F("created_at", container.CreatedAt),
slog.F("config_modified_at", lastModified),
slog.F("file", configFile),
slog.F("workspace_folder", workspaceFolder),
slog.F("dirty", dirtyStates[workspaceFolder]),
)
container.DevcontainerDirty = dirtyStates[workspaceFolder]
}
// Check if this is already in our known list.
@@ -373,29 +424,17 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC
return dc.WorkspaceFolder == workspaceFolder
}); knownIndex != -1 {
// Update existing entry with runtime information.
if configFile != "" && api.knownDevcontainers[knownIndex].ConfigPath == "" {
api.knownDevcontainers[knownIndex].ConfigPath = configFile
dc := &api.knownDevcontainers[knownIndex]
if configFile != "" && dc.ConfigPath == "" {
dc.ConfigPath = configFile
if err := api.watcher.Add(configFile); err != nil {
api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", configFile))
}
}
api.knownDevcontainers[knownIndex].Running = container.Running
api.knownDevcontainers[knownIndex].Container = container
// Check if this container was created after the config
// file was modified.
if configFile != "" && api.knownDevcontainers[knownIndex].Dirty {
lastModified, hasModTime := api.configFileModifiedTimes[configFile]
if hasModTime && container.CreatedAt.After(lastModified) {
api.logger.Info(ctx, "clearing dirty flag for container created after config modification",
slog.F("container", container.ID),
slog.F("created_at", container.CreatedAt),
slog.F("config_modified_at", lastModified),
slog.F("file", configFile),
)
api.knownDevcontainers[knownIndex].Dirty = false
}
}
dc.Running = container.Running
dc.Container = container
dc.Dirty = container.DevcontainerDirty
updatedDevcontainers[workspaceFolder] = true
continue
}
@@ -428,9 +467,73 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC
Dirty: container.DevcontainerDirty,
Container: container,
})
updatedDevcontainers[workspaceFolder] = true
}
return copyListContainersResponse(api.containers), nil
for i := range api.knownDevcontainers {
if _, ok := updatedDevcontainers[api.knownDevcontainers[i].WorkspaceFolder]; ok {
continue
}
dc := &api.knownDevcontainers[i]
if !dc.Running && !dc.Dirty && dc.Container == nil {
// Already marked as not running, skip.
continue
}
api.logger.Debug(ctx, "devcontainer is not running anymore, marking as not running",
slog.F("workspace_folder", dc.WorkspaceFolder),
slog.F("config_path", dc.ConfigPath),
slog.F("name", dc.Name),
)
dc.Running = false
dc.Dirty = false
dc.Container = nil
}
api.containers = updated
api.containersErr = nil
}
// refreshContainers triggers an immediate update of the container list
// and waits for it to complete.
func (api *API) refreshContainers(ctx context.Context) (err error) {
defer func() {
if err != nil {
err = xerrors.Errorf("refresh containers failed: %w", err)
}
}()
done := make(chan error, 1)
select {
case <-api.ctx.Done():
return xerrors.Errorf("API closed: %w", api.ctx.Err())
case <-ctx.Done():
return ctx.Err()
case api.updateTrigger <- done:
select {
case <-api.ctx.Done():
return xerrors.Errorf("API closed: %w", api.ctx.Err())
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
}
}
}
func (api *API) getContainers() (codersdk.WorkspaceAgentListContainersResponse, error) {
api.mu.RLock()
defer api.mu.RUnlock()
if api.containersErr != nil {
return codersdk.WorkspaceAgentListContainersResponse{}, api.containersErr
}
return codersdk.WorkspaceAgentListContainersResponse{
Containers: slices.Clone(api.containers.Containers),
Warnings: slices.Clone(api.containers.Warnings),
}, nil
}
// handleDevcontainerRecreate handles the HTTP request to recreate a
@@ -447,7 +550,7 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques
return
}
containers, err := api.getContainers(ctx)
containers, err := api.getContainers()
if err != nil {
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
Message: "Could not list containers",
@@ -509,30 +612,9 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques
return
}
// TODO(mafredri): Temporarily handle clearing the dirty state after
// recreation; later on this should be handled by a "container watcher".
if !api.doLockedHandler(w, r, func() {
for i := range api.knownDevcontainers {
if api.knownDevcontainers[i].WorkspaceFolder == workspaceFolder {
if api.knownDevcontainers[i].Dirty {
api.logger.Info(ctx, "clearing dirty flag after recreation",
slog.F("workspace_folder", workspaceFolder),
slog.F("name", api.knownDevcontainers[i].Name),
)
api.knownDevcontainers[i].Dirty = false
// TODO(mafredri): This should be handled by a service that
// updates the devcontainer state periodically and on-demand.
api.knownDevcontainers[i].Container = nil
// Set the modified time to the zero value to indicate that
// the containers list must be refreshed. This will see to
// it that the new container is re-assigned.
api.mtime = time.Time{}
}
return
}
}
}) {
return
// NOTE(mafredri): This won't be needed once recreation is done async.
if err := api.refreshContainers(r.Context()); err != nil {
api.logger.Error(ctx, "failed to trigger immediate refresh after devcontainer recreation", slog.Error(err))
}
w.WriteHeader(http.StatusNoContent)
@@ -542,8 +624,10 @@ func (api *API) handleDevcontainerRecreate(w http.ResponseWriter, r *http.Reques
func (api *API) handleDevcontainersList(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Run getContainers to detect the latest devcontainers and their state.
_, err := api.getContainers(ctx)
api.mu.RLock()
err := api.containersErr
devcontainers := slices.Clone(api.knownDevcontainers)
api.mu.RUnlock()
if err != nil {
httpapi.Write(ctx, w, http.StatusInternalServerError, codersdk.Response{
Message: "Could not list containers",
@@ -552,13 +636,6 @@ func (api *API) handleDevcontainersList(w http.ResponseWriter, r *http.Request)
return
}
var devcontainers []codersdk.WorkspaceAgentDevcontainer
if !api.doLockedHandler(w, r, func() {
devcontainers = slices.Clone(api.knownDevcontainers)
}) {
return
}
slices.SortFunc(devcontainers, func(a, b codersdk.WorkspaceAgentDevcontainer) int {
if cmp := strings.Compare(a.WorkspaceFolder, b.WorkspaceFolder); cmp != 0 {
return cmp
@@ -576,75 +653,56 @@ func (api *API) handleDevcontainersList(w http.ResponseWriter, r *http.Request)
// markDevcontainerDirty finds the devcontainer with the given config file path
// and marks it as dirty. It acquires the lock before modifying the state.
func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) {
ok := api.doLocked(func() {
// Record the timestamp of when this configuration file was modified.
api.configFileModifiedTimes[configPath] = modifiedAt
api.mu.Lock()
defer api.mu.Unlock()
for i := range api.knownDevcontainers {
if api.knownDevcontainers[i].ConfigPath != configPath {
continue
}
// Record the timestamp of when this configuration file was modified.
api.configFileModifiedTimes[configPath] = modifiedAt
// TODO(mafredri): Simplistic mark for now, we should check if the
// container is running and if the config file was modified after
// the container was created.
if !api.knownDevcontainers[i].Dirty {
api.logger.Info(api.ctx, "marking devcontainer as dirty",
slog.F("file", configPath),
slog.F("name", api.knownDevcontainers[i].Name),
slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder),
slog.F("modified_at", modifiedAt),
)
api.knownDevcontainers[i].Dirty = true
if api.knownDevcontainers[i].Container != nil {
api.knownDevcontainers[i].Container.DevcontainerDirty = true
}
}
for i := range api.knownDevcontainers {
dc := &api.knownDevcontainers[i]
if dc.ConfigPath != configPath {
continue
}
})
if !ok {
api.logger.Debug(api.ctx, "mark devcontainer dirty failed", slog.F("file", configPath))
}
}
func (api *API) doLockedHandler(w http.ResponseWriter, r *http.Request, f func()) bool {
select {
case <-r.Context().Done():
httpapi.Write(r.Context(), w, http.StatusRequestTimeout, codersdk.Response{
Message: "Request canceled",
Detail: "Request was canceled before we could process it.",
})
return false
case <-api.ctx.Done():
httpapi.Write(r.Context(), w, http.StatusServiceUnavailable, codersdk.Response{
Message: "API closed",
Detail: "The API is closed and cannot process requests.",
})
return false
case api.lockCh <- struct{}{}:
defer func() { <-api.lockCh }()
}
f()
return true
}
logger := api.logger.With(
slog.F("file", configPath),
slog.F("name", dc.Name),
slog.F("workspace_folder", dc.WorkspaceFolder),
slog.F("modified_at", modifiedAt),
)
func (api *API) doLocked(f func()) bool {
select {
case <-api.ctx.Done():
return false
case api.lockCh <- struct{}{}:
defer func() { <-api.lockCh }()
// TODO(mafredri): Simplistic mark for now, we should check if the
// container is running and if the config file was modified after
// the container was created.
if !dc.Dirty {
logger.Info(api.ctx, "marking devcontainer as dirty")
dc.Dirty = true
}
if dc.Container != nil && !dc.Container.DevcontainerDirty {
logger.Info(api.ctx, "marking devcontainer container as dirty")
dc.Container.DevcontainerDirty = true
}
}
f()
return true
}
func (api *API) Close() error {
api.cancel()
<-api.done
err := api.watcher.Close()
if err != nil {
return err
api.mu.Lock()
if api.closed {
api.mu.Unlock()
return nil
}
return nil
api.closed = true
api.logger.Debug(api.ctx, "closing API")
defer api.logger.Debug(api.ctx, "closed API")
api.cancel()
err := api.watcher.Close()
api.mu.Unlock()
<-api.watcherDone
<-api.updaterDone
return err
}