mirror of
https://github.com/coder/coder.git
synced 2025-07-13 21:36:50 +00:00
feat(agent/agentcontainers): add file watcher and dirty status (#17573)
Fixes coder/internal#479 Fixes coder/internal#480
This commit is contained in:
committed by
GitHub
parent
b6146dfe8a
commit
268a50c193
@ -1481,8 +1481,13 @@ func (a *agent) createTailnet(
|
||||
}()
|
||||
if err = a.trackGoroutine(func() {
|
||||
defer apiListener.Close()
|
||||
apiHandler, closeAPIHAndler := a.apiHandler()
|
||||
defer func() {
|
||||
_ = closeAPIHAndler()
|
||||
}()
|
||||
server := &http.Server{
|
||||
Handler: a.apiHandler(),
|
||||
BaseContext: func(net.Listener) context.Context { return ctx },
|
||||
Handler: apiHandler,
|
||||
ReadTimeout: 20 * time.Second,
|
||||
ReadHeaderTimeout: 20 * time.Second,
|
||||
WriteTimeout: 20 * time.Second,
|
||||
@ -1493,6 +1498,7 @@ func (a *agent) createTailnet(
|
||||
case <-ctx.Done():
|
||||
case <-a.hardCtx.Done():
|
||||
}
|
||||
_ = closeAPIHAndler()
|
||||
_ = server.Close()
|
||||
}()
|
||||
|
||||
|
@ -10,11 +10,13 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"cdr.dev/slog"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers/watcher"
|
||||
"github.com/coder/coder/v2/agent/agentexec"
|
||||
"github.com/coder/coder/v2/coderd/httpapi"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
@ -30,6 +32,12 @@ const (
|
||||
// API is responsible for container-related operations in the agent.
|
||||
// It provides methods to list and manage containers.
|
||||
type API struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
done chan struct{}
|
||||
logger slog.Logger
|
||||
watcher watcher.Watcher
|
||||
|
||||
cacheDuration time.Duration
|
||||
cl Lister
|
||||
dccli DevcontainerCLI
|
||||
@ -42,6 +50,7 @@ type API struct {
|
||||
mtime time.Time
|
||||
devcontainerNames map[string]struct{} // Track devcontainer names to avoid duplicates.
|
||||
knownDevcontainers []codersdk.WorkspaceAgentDevcontainer // Track predefined and runtime-detected devcontainers.
|
||||
configFileModifiedTimes map[string]time.Time // Track when config files were last modified.
|
||||
}
|
||||
|
||||
// Option is a functional option for API.
|
||||
@ -55,6 +64,16 @@ func WithLister(cl Lister) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithClock sets the quartz.Clock implementation to use.
|
||||
// This is primarily used for testing to control time.
|
||||
func WithClock(clock quartz.Clock) Option {
|
||||
return func(api *API) {
|
||||
api.clock = clock
|
||||
}
|
||||
}
|
||||
|
||||
// WithDevcontainerCLI sets the DevcontainerCLI implementation to use.
|
||||
// This can be used in tests to modify @devcontainer/cli behavior.
|
||||
func WithDevcontainerCLI(dccli DevcontainerCLI) Option {
|
||||
return func(api *API) {
|
||||
api.dccli = dccli
|
||||
@ -76,14 +95,29 @@ func WithDevcontainers(devcontainers []codersdk.WorkspaceAgentDevcontainer) Opti
|
||||
}
|
||||
}
|
||||
|
||||
// WithWatcher sets the file watcher implementation to use. By default a
|
||||
// noop watcher is used. This can be used in tests to modify the watcher
|
||||
// behavior or to use an actual file watcher (e.g. fsnotify).
|
||||
func WithWatcher(w watcher.Watcher) Option {
|
||||
return func(api *API) {
|
||||
api.watcher = w
|
||||
}
|
||||
}
|
||||
|
||||
// NewAPI returns a new API with the given options applied.
|
||||
func NewAPI(logger slog.Logger, options ...Option) *API {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
api := &API{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
done: make(chan struct{}),
|
||||
logger: logger,
|
||||
clock: quartz.NewReal(),
|
||||
cacheDuration: defaultGetContainersCacheDuration,
|
||||
lockCh: make(chan struct{}, 1),
|
||||
devcontainerNames: make(map[string]struct{}),
|
||||
knownDevcontainers: []codersdk.WorkspaceAgentDevcontainer{},
|
||||
configFileModifiedTimes: make(map[string]time.Time),
|
||||
}
|
||||
for _, opt := range options {
|
||||
opt(api)
|
||||
@ -92,12 +126,64 @@ func NewAPI(logger slog.Logger, options ...Option) *API {
|
||||
api.cl = &DockerCLILister{}
|
||||
}
|
||||
if api.dccli == nil {
|
||||
api.dccli = NewDevcontainerCLI(logger, agentexec.DefaultExecer)
|
||||
api.dccli = NewDevcontainerCLI(logger.Named("devcontainer-cli"), agentexec.DefaultExecer)
|
||||
}
|
||||
if api.watcher == nil {
|
||||
api.watcher = watcher.NewNoop()
|
||||
}
|
||||
|
||||
// Make sure we watch the devcontainer config files for changes.
|
||||
for _, devcontainer := range api.knownDevcontainers {
|
||||
if devcontainer.ConfigPath != "" {
|
||||
if err := api.watcher.Add(devcontainer.ConfigPath); err != nil {
|
||||
api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", devcontainer.ConfigPath))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go api.start()
|
||||
|
||||
return api
|
||||
}
|
||||
|
||||
func (api *API) start() {
|
||||
defer close(api.done)
|
||||
|
||||
for {
|
||||
event, err := api.watcher.Next(api.ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, watcher.ErrClosed) {
|
||||
api.logger.Debug(api.ctx, "watcher closed")
|
||||
return
|
||||
}
|
||||
if api.ctx.Err() != nil {
|
||||
api.logger.Debug(api.ctx, "api context canceled")
|
||||
return
|
||||
}
|
||||
api.logger.Error(api.ctx, "watcher error waiting for next event", slog.Error(err))
|
||||
continue
|
||||
}
|
||||
if event == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
now := api.clock.Now()
|
||||
switch {
|
||||
case event.Has(fsnotify.Create | fsnotify.Write):
|
||||
api.logger.Debug(api.ctx, "devcontainer config file changed", slog.F("file", event.Name))
|
||||
api.markDevcontainerDirty(event.Name, now)
|
||||
case event.Has(fsnotify.Remove):
|
||||
api.logger.Debug(api.ctx, "devcontainer config file removed", slog.F("file", event.Name))
|
||||
api.markDevcontainerDirty(event.Name, now)
|
||||
case event.Has(fsnotify.Rename):
|
||||
api.logger.Debug(api.ctx, "devcontainer config file renamed", slog.F("file", event.Name))
|
||||
api.markDevcontainerDirty(event.Name, now)
|
||||
default:
|
||||
api.logger.Debug(api.ctx, "devcontainer config file event ignored", slog.F("file", event.Name), slog.F("event", event))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Routes returns the HTTP handler for container-related routes.
|
||||
func (api *API) Routes() http.Handler {
|
||||
r := chi.NewRouter()
|
||||
@ -143,12 +229,12 @@ func copyListContainersResponse(resp codersdk.WorkspaceAgentListContainersRespon
|
||||
|
||||
func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListContainersResponse, error) {
|
||||
select {
|
||||
case <-api.ctx.Done():
|
||||
return codersdk.WorkspaceAgentListContainersResponse{}, api.ctx.Err()
|
||||
case <-ctx.Done():
|
||||
return codersdk.WorkspaceAgentListContainersResponse{}, ctx.Err()
|
||||
case api.lockCh <- struct{}{}:
|
||||
defer func() {
|
||||
<-api.lockCh
|
||||
}()
|
||||
defer func() { <-api.lockCh }()
|
||||
}
|
||||
|
||||
now := api.clock.Now()
|
||||
@ -165,29 +251,57 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC
|
||||
api.containers = updated
|
||||
api.mtime = now
|
||||
|
||||
dirtyStates := make(map[string]bool)
|
||||
// Reset all known devcontainers to not running.
|
||||
for i := range api.knownDevcontainers {
|
||||
api.knownDevcontainers[i].Running = false
|
||||
api.knownDevcontainers[i].Container = nil
|
||||
|
||||
// Preserve the dirty state and store in map for lookup.
|
||||
dirtyStates[api.knownDevcontainers[i].WorkspaceFolder] = api.knownDevcontainers[i].Dirty
|
||||
}
|
||||
|
||||
// Check if the container is running and update the known devcontainers.
|
||||
for _, container := range updated.Containers {
|
||||
workspaceFolder := container.Labels[DevcontainerLocalFolderLabel]
|
||||
if workspaceFolder != "" {
|
||||
configFile := container.Labels[DevcontainerConfigFileLabel]
|
||||
|
||||
if workspaceFolder == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this is already in our known list.
|
||||
if knownIndex := slices.IndexFunc(api.knownDevcontainers, func(dc codersdk.WorkspaceAgentDevcontainer) bool {
|
||||
return dc.WorkspaceFolder == workspaceFolder
|
||||
}); knownIndex != -1 {
|
||||
// Update existing entry with runtime information.
|
||||
if api.knownDevcontainers[knownIndex].ConfigPath == "" {
|
||||
api.knownDevcontainers[knownIndex].ConfigPath = container.Labels[DevcontainerConfigFileLabel]
|
||||
if configFile != "" && api.knownDevcontainers[knownIndex].ConfigPath == "" {
|
||||
api.knownDevcontainers[knownIndex].ConfigPath = configFile
|
||||
if err := api.watcher.Add(configFile); err != nil {
|
||||
api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", configFile))
|
||||
}
|
||||
}
|
||||
api.knownDevcontainers[knownIndex].Running = container.Running
|
||||
api.knownDevcontainers[knownIndex].Container = &container
|
||||
|
||||
// Check if this container was created after the config
|
||||
// file was modified.
|
||||
if configFile != "" && api.knownDevcontainers[knownIndex].Dirty {
|
||||
lastModified, hasModTime := api.configFileModifiedTimes[configFile]
|
||||
if hasModTime && container.CreatedAt.After(lastModified) {
|
||||
api.logger.Info(ctx, "clearing dirty flag for container created after config modification",
|
||||
slog.F("container", container.ID),
|
||||
slog.F("created_at", container.CreatedAt),
|
||||
slog.F("config_modified_at", lastModified),
|
||||
slog.F("file", configFile),
|
||||
)
|
||||
api.knownDevcontainers[knownIndex].Dirty = false
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// NOTE(mafredri): This name impl. may change to accommodate devcontainer agents RFC.
|
||||
// If not in our known list, add as a runtime detected entry.
|
||||
name := path.Base(workspaceFolder)
|
||||
if _, ok := api.devcontainerNames[name]; ok {
|
||||
@ -201,16 +315,36 @@ func (api *API) getContainers(ctx context.Context) (codersdk.WorkspaceAgentListC
|
||||
}
|
||||
}
|
||||
api.devcontainerNames[name] = struct{}{}
|
||||
if configFile != "" {
|
||||
if err := api.watcher.Add(configFile); err != nil {
|
||||
api.logger.Error(ctx, "watch devcontainer config file failed", slog.Error(err), slog.F("file", configFile))
|
||||
}
|
||||
}
|
||||
|
||||
dirty := dirtyStates[workspaceFolder]
|
||||
if dirty {
|
||||
lastModified, hasModTime := api.configFileModifiedTimes[configFile]
|
||||
if hasModTime && container.CreatedAt.After(lastModified) {
|
||||
api.logger.Info(ctx, "new container created after config modification, not marking as dirty",
|
||||
slog.F("container", container.ID),
|
||||
slog.F("created_at", container.CreatedAt),
|
||||
slog.F("config_modified_at", lastModified),
|
||||
slog.F("file", configFile),
|
||||
)
|
||||
dirty = false
|
||||
}
|
||||
}
|
||||
|
||||
api.knownDevcontainers = append(api.knownDevcontainers, codersdk.WorkspaceAgentDevcontainer{
|
||||
ID: uuid.New(),
|
||||
Name: name,
|
||||
WorkspaceFolder: workspaceFolder,
|
||||
ConfigPath: container.Labels[DevcontainerConfigFileLabel],
|
||||
ConfigPath: configFile,
|
||||
Running: container.Running,
|
||||
Dirty: dirty,
|
||||
Container: &container,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return copyListContainersResponse(api.containers), nil
|
||||
}
|
||||
@ -271,6 +405,29 @@ func (api *API) handleRecreate(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO(mafredri): Temporarily handle clearing the dirty state after
|
||||
// recreation, later on this should be handled by a "container watcher".
|
||||
select {
|
||||
case <-api.ctx.Done():
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case api.lockCh <- struct{}{}:
|
||||
defer func() { <-api.lockCh }()
|
||||
}
|
||||
for i := range api.knownDevcontainers {
|
||||
if api.knownDevcontainers[i].WorkspaceFolder == workspaceFolder {
|
||||
if api.knownDevcontainers[i].Dirty {
|
||||
api.logger.Info(ctx, "clearing dirty flag after recreation",
|
||||
slog.F("workspace_folder", workspaceFolder),
|
||||
slog.F("name", api.knownDevcontainers[i].Name),
|
||||
)
|
||||
api.knownDevcontainers[i].Dirty = false
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
@ -289,6 +446,8 @@ func (api *API) handleListDevcontainers(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-api.ctx.Done():
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case api.lockCh <- struct{}{}:
|
||||
@ -309,3 +468,46 @@ func (api *API) handleListDevcontainers(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
httpapi.Write(ctx, w, http.StatusOK, response)
|
||||
}
|
||||
|
||||
// markDevcontainerDirty finds the devcontainer with the given config file path
|
||||
// and marks it as dirty. It acquires the lock before modifying the state.
|
||||
func (api *API) markDevcontainerDirty(configPath string, modifiedAt time.Time) {
|
||||
select {
|
||||
case <-api.ctx.Done():
|
||||
return
|
||||
case api.lockCh <- struct{}{}:
|
||||
defer func() { <-api.lockCh }()
|
||||
}
|
||||
|
||||
// Record the timestamp of when this configuration file was modified.
|
||||
api.configFileModifiedTimes[configPath] = modifiedAt
|
||||
|
||||
for i := range api.knownDevcontainers {
|
||||
if api.knownDevcontainers[i].ConfigPath != configPath {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO(mafredri): Simplistic mark for now, we should check if the
|
||||
// container is running and if the config file was modified after
|
||||
// the container was created.
|
||||
if !api.knownDevcontainers[i].Dirty {
|
||||
api.logger.Info(api.ctx, "marking devcontainer as dirty",
|
||||
slog.F("file", configPath),
|
||||
slog.F("name", api.knownDevcontainers[i].Name),
|
||||
slog.F("workspace_folder", api.knownDevcontainers[i].WorkspaceFolder),
|
||||
slog.F("modified_at", modifiedAt),
|
||||
)
|
||||
api.knownDevcontainers[i].Dirty = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (api *API) Close() error {
|
||||
api.cancel()
|
||||
<-api.done
|
||||
err := api.watcher.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -103,6 +103,8 @@ func TestAPI(t *testing.T) {
|
||||
logger = slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
api = NewAPI(logger, WithLister(mockLister))
|
||||
)
|
||||
defer api.Close()
|
||||
|
||||
api.cacheDuration = tc.cacheDur
|
||||
api.clock = clk
|
||||
api.containers = tc.cacheData
|
||||
|
@ -6,7 +6,9 @@ import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@ -17,6 +19,8 @@ import (
|
||||
"cdr.dev/slog/sloggers/slogtest"
|
||||
"github.com/coder/coder/v2/agent/agentcontainers"
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
"github.com/coder/quartz"
|
||||
)
|
||||
|
||||
// fakeLister implements the agentcontainers.Lister interface for
|
||||
@ -41,6 +45,103 @@ func (f *fakeDevcontainerCLI) Up(_ context.Context, _, _ string, _ ...agentconta
|
||||
return f.id, f.err
|
||||
}
|
||||
|
||||
// fakeWatcher implements the watcher.Watcher interface for testing.
|
||||
// It allows controlling what events are sent and when.
|
||||
type fakeWatcher struct {
|
||||
t testing.TB
|
||||
events chan *fsnotify.Event
|
||||
closeNotify chan struct{}
|
||||
addedPaths []string
|
||||
closed bool
|
||||
nextCalled chan struct{}
|
||||
nextErr error
|
||||
closeErr error
|
||||
}
|
||||
|
||||
func newFakeWatcher(t testing.TB) *fakeWatcher {
|
||||
return &fakeWatcher{
|
||||
t: t,
|
||||
events: make(chan *fsnotify.Event, 10), // Buffered to avoid blocking tests.
|
||||
closeNotify: make(chan struct{}),
|
||||
addedPaths: make([]string, 0),
|
||||
nextCalled: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fakeWatcher) Add(file string) error {
|
||||
w.addedPaths = append(w.addedPaths, file)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *fakeWatcher) Remove(file string) error {
|
||||
for i, path := range w.addedPaths {
|
||||
if path == file {
|
||||
w.addedPaths = append(w.addedPaths[:i], w.addedPaths[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *fakeWatcher) clearNext() {
|
||||
select {
|
||||
case <-w.nextCalled:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fakeWatcher) waitNext(ctx context.Context) bool {
|
||||
select {
|
||||
case <-w.t.Context().Done():
|
||||
return false
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-w.closeNotify:
|
||||
return false
|
||||
case <-w.nextCalled:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fakeWatcher) Next(ctx context.Context) (*fsnotify.Event, error) {
|
||||
select {
|
||||
case w.nextCalled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
if w.nextErr != nil {
|
||||
err := w.nextErr
|
||||
w.nextErr = nil
|
||||
return nil, err
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-w.closeNotify:
|
||||
return nil, xerrors.New("watcher closed")
|
||||
case event := <-w.events:
|
||||
return event, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (w *fakeWatcher) Close() error {
|
||||
if w.closed {
|
||||
return nil
|
||||
}
|
||||
|
||||
w.closed = true
|
||||
close(w.closeNotify)
|
||||
return w.closeErr
|
||||
}
|
||||
|
||||
// sendEvent sends a file system event through the fake watcher.
|
||||
func (w *fakeWatcher) sendEventWaitNextCalled(ctx context.Context, event fsnotify.Event) {
|
||||
w.clearNext()
|
||||
w.events <- &event
|
||||
w.waitNext(ctx)
|
||||
}
|
||||
|
||||
func TestAPI(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@ -153,6 +254,7 @@ func TestAPI(t *testing.T) {
|
||||
agentcontainers.WithLister(tt.lister),
|
||||
agentcontainers.WithDevcontainerCLI(tt.devcontainerCLI),
|
||||
)
|
||||
defer api.Close()
|
||||
r.Mount("/", api.Routes())
|
||||
|
||||
// Simulate HTTP request to the recreate endpoint.
|
||||
@ -463,6 +565,7 @@ func TestAPI(t *testing.T) {
|
||||
}
|
||||
|
||||
api := agentcontainers.NewAPI(logger, apiOptions...)
|
||||
defer api.Close()
|
||||
r.Mount("/", api.Routes())
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil)
|
||||
@ -489,6 +592,109 @@ func TestAPI(t *testing.T) {
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("FileWatcher", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitMedium)
|
||||
|
||||
startTime := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
|
||||
mClock := quartz.NewMock(t)
|
||||
mClock.Set(startTime)
|
||||
fWatcher := newFakeWatcher(t)
|
||||
|
||||
// Create a fake container with a config file.
|
||||
configPath := "/workspace/project/.devcontainer/devcontainer.json"
|
||||
container := codersdk.WorkspaceAgentContainer{
|
||||
ID: "container-id",
|
||||
FriendlyName: "container-name",
|
||||
Running: true,
|
||||
CreatedAt: startTime.Add(-1 * time.Hour), // Created 1 hour before test start.
|
||||
Labels: map[string]string{
|
||||
agentcontainers.DevcontainerLocalFolderLabel: "/workspace/project",
|
||||
agentcontainers.DevcontainerConfigFileLabel: configPath,
|
||||
},
|
||||
}
|
||||
|
||||
fLister := &fakeLister{
|
||||
containers: codersdk.WorkspaceAgentListContainersResponse{
|
||||
Containers: []codersdk.WorkspaceAgentContainer{container},
|
||||
},
|
||||
}
|
||||
|
||||
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
|
||||
api := agentcontainers.NewAPI(
|
||||
logger,
|
||||
agentcontainers.WithLister(fLister),
|
||||
agentcontainers.WithWatcher(fWatcher),
|
||||
agentcontainers.WithClock(mClock),
|
||||
)
|
||||
defer api.Close()
|
||||
|
||||
r := chi.NewRouter()
|
||||
r.Mount("/", api.Routes())
|
||||
|
||||
// Call the list endpoint first to ensure config files are
|
||||
// detected and watched.
|
||||
req := httptest.NewRequest(http.MethodGet, "/devcontainers", nil)
|
||||
rec := httptest.NewRecorder()
|
||||
r.ServeHTTP(rec, req)
|
||||
require.Equal(t, http.StatusOK, rec.Code)
|
||||
|
||||
var response codersdk.WorkspaceAgentDevcontainersResponse
|
||||
err := json.NewDecoder(rec.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, response.Devcontainers, 1)
|
||||
assert.False(t, response.Devcontainers[0].Dirty,
|
||||
"container should not be marked as dirty initially")
|
||||
|
||||
// Verify the watcher is watching the config file.
|
||||
assert.Contains(t, fWatcher.addedPaths, configPath,
|
||||
"watcher should be watching the container's config file")
|
||||
|
||||
// Make sure the start loop has been called.
|
||||
fWatcher.waitNext(ctx)
|
||||
|
||||
// Send a file modification event and check if the container is
|
||||
// marked dirty.
|
||||
fWatcher.sendEventWaitNextCalled(ctx, fsnotify.Event{
|
||||
Name: configPath,
|
||||
Op: fsnotify.Write,
|
||||
})
|
||||
|
||||
mClock.Advance(time.Minute).MustWait(ctx)
|
||||
|
||||
// Check if the container is marked as dirty.
|
||||
req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil)
|
||||
rec = httptest.NewRecorder()
|
||||
r.ServeHTTP(rec, req)
|
||||
require.Equal(t, http.StatusOK, rec.Code)
|
||||
|
||||
err = json.NewDecoder(rec.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, response.Devcontainers, 1)
|
||||
assert.True(t, response.Devcontainers[0].Dirty,
|
||||
"container should be marked as dirty after config file was modified")
|
||||
|
||||
mClock.Advance(time.Minute).MustWait(ctx)
|
||||
|
||||
container.ID = "new-container-id" // Simulate a new container ID after recreation.
|
||||
container.FriendlyName = "new-container-name"
|
||||
container.CreatedAt = mClock.Now() // Update the creation time.
|
||||
fLister.containers.Containers = []codersdk.WorkspaceAgentContainer{container}
|
||||
|
||||
// Check if dirty flag is cleared.
|
||||
req = httptest.NewRequest(http.MethodGet, "/devcontainers", nil)
|
||||
rec = httptest.NewRecorder()
|
||||
r.ServeHTTP(rec, req)
|
||||
require.Equal(t, http.StatusOK, rec.Code)
|
||||
|
||||
err = json.NewDecoder(rec.Body).Decode(&response)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, response.Devcontainers, 1)
|
||||
assert.False(t, response.Devcontainers[0].Dirty,
|
||||
"dirty flag should be cleared after container recreation")
|
||||
})
|
||||
}
|
||||
|
||||
// mustFindDevcontainerByPath returns the devcontainer with the given workspace
|
||||
|
48
agent/agentcontainers/watcher/noop.go
Normal file
48
agent/agentcontainers/watcher/noop.go
Normal file
@ -0,0 +1,48 @@
|
||||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
)
|
||||
|
||||
// NewNoop creates a new watcher that does nothing.
|
||||
func NewNoop() Watcher {
|
||||
return &noopWatcher{done: make(chan struct{})}
|
||||
}
|
||||
|
||||
type noopWatcher struct {
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (*noopWatcher) Add(string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*noopWatcher) Remove(string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next blocks until the context is canceled or the watcher is closed.
|
||||
func (n *noopWatcher) Next(ctx context.Context) (*fsnotify.Event, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-n.done:
|
||||
return nil, ErrClosed
|
||||
}
|
||||
}
|
||||
|
||||
func (n *noopWatcher) Close() error {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
if n.closed {
|
||||
return ErrClosed
|
||||
}
|
||||
n.closed = true
|
||||
close(n.done)
|
||||
return nil
|
||||
}
|
70
agent/agentcontainers/watcher/noop_test.go
Normal file
70
agent/agentcontainers/watcher/noop_test.go
Normal file
@ -0,0 +1,70 @@
|
||||
package watcher_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/coder/v2/agent/agentcontainers/watcher"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestNoopWatcher(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create the noop watcher under test.
|
||||
wut := watcher.NewNoop()
|
||||
|
||||
// Test adding/removing files (should have no effect).
|
||||
err := wut.Add("some-file.txt")
|
||||
assert.NoError(t, err, "noop watcher should not return error on Add")
|
||||
|
||||
err = wut.Remove("some-file.txt")
|
||||
assert.NoError(t, err, "noop watcher should not return error on Remove")
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
defer cancel()
|
||||
|
||||
// Start a goroutine to wait for Next to return.
|
||||
errC := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := wut.Next(ctx)
|
||||
errC <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-errC:
|
||||
require.Fail(t, "want Next to block")
|
||||
default:
|
||||
}
|
||||
|
||||
// Cancel the context and check that Next returns.
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case err := <-errC:
|
||||
assert.Error(t, err, "want Next error when context is canceled")
|
||||
case <-time.After(testutil.WaitShort):
|
||||
t.Fatal("want Next to return after context was canceled")
|
||||
}
|
||||
|
||||
// Test Close.
|
||||
err = wut.Close()
|
||||
assert.NoError(t, err, "want no error on Close")
|
||||
}
|
||||
|
||||
func TestNoopWatcher_CloseBeforeNext(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wut := watcher.NewNoop()
|
||||
|
||||
err := wut.Close()
|
||||
require.NoError(t, err, "close watcher failed")
|
||||
|
||||
ctx := context.Background()
|
||||
_, err = wut.Next(ctx)
|
||||
assert.Error(t, err, "want Next to return error when watcher is closed")
|
||||
}
|
195
agent/agentcontainers/watcher/watcher.go
Normal file
195
agent/agentcontainers/watcher/watcher.go
Normal file
@ -0,0 +1,195 @@
|
||||
// Package watcher provides file system watching capabilities for the
|
||||
// agent. It defines an interface for monitoring file changes and
|
||||
// implementations that can be used to detect when configuration files
|
||||
// are modified. This is primarily used to track changes to devcontainer
|
||||
// configuration files and notify users when containers need to be
|
||||
// recreated to apply the new configuration.
|
||||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var ErrClosed = xerrors.New("watcher closed")
|
||||
|
||||
// Watcher defines an interface for monitoring file system changes.
|
||||
// Implementations track file modifications and provide an event stream
|
||||
// that clients can consume to react to changes.
|
||||
type Watcher interface {
|
||||
// Add starts watching a file for changes.
|
||||
Add(file string) error
|
||||
|
||||
// Remove stops watching a file for changes.
|
||||
Remove(file string) error
|
||||
|
||||
// Next blocks until a file system event occurs or the context is canceled.
|
||||
// It returns the next event or an error if the watcher encountered a problem.
|
||||
Next(context.Context) (*fsnotify.Event, error)
|
||||
|
||||
// Close shuts down the watcher and releases any resources.
|
||||
Close() error
|
||||
}
|
||||
|
||||
type fsnotifyWatcher struct {
|
||||
*fsnotify.Watcher
|
||||
|
||||
mu sync.Mutex // Protects following.
|
||||
watchedFiles map[string]bool // Files being watched (absolute path -> bool).
|
||||
watchedDirs map[string]int // Refcount of directories being watched (absolute path -> count).
|
||||
closed bool // Protects closing of done.
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewFSNotify creates a new file system watcher that watches parent directories
|
||||
// instead of individual files for more reliable event detection.
|
||||
func NewFSNotify() (Watcher, error) {
|
||||
w, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("create fsnotify watcher: %w", err)
|
||||
}
|
||||
return &fsnotifyWatcher{
|
||||
Watcher: w,
|
||||
done: make(chan struct{}),
|
||||
watchedFiles: make(map[string]bool),
|
||||
watchedDirs: make(map[string]int),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *fsnotifyWatcher) Add(file string) error {
|
||||
absPath, err := filepath.Abs(file)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("absolute path: %w", err)
|
||||
}
|
||||
|
||||
dir := filepath.Dir(absPath)
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
// Already watching this file.
|
||||
if f.closed || f.watchedFiles[absPath] {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start watching the parent directory if not already watching.
|
||||
if f.watchedDirs[dir] == 0 {
|
||||
if err := f.Watcher.Add(dir); err != nil {
|
||||
return xerrors.Errorf("add directory to watcher: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Increment the reference count for this directory.
|
||||
f.watchedDirs[dir]++
|
||||
// Mark this file as watched.
|
||||
f.watchedFiles[absPath] = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fsnotifyWatcher) Remove(file string) error {
|
||||
absPath, err := filepath.Abs(file)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("absolute path: %w", err)
|
||||
}
|
||||
|
||||
dir := filepath.Dir(absPath)
|
||||
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
// Not watching this file.
|
||||
if f.closed || !f.watchedFiles[absPath] {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove the file from our watch list.
|
||||
delete(f.watchedFiles, absPath)
|
||||
|
||||
// Decrement the reference count for this directory.
|
||||
f.watchedDirs[dir]--
|
||||
|
||||
// If no more files in this directory are being watched, stop
|
||||
// watching the directory.
|
||||
if f.watchedDirs[dir] <= 0 {
|
||||
f.watchedDirs[dir] = 0 // Ensure non-negative count.
|
||||
if err := f.Watcher.Remove(dir); err != nil {
|
||||
return xerrors.Errorf("remove directory from watcher: %w", err)
|
||||
}
|
||||
delete(f.watchedDirs, dir)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fsnotifyWatcher) Next(ctx context.Context) (event *fsnotify.Event, err error) {
|
||||
defer func() {
|
||||
if ctx.Err() != nil {
|
||||
event = nil
|
||||
err = ctx.Err()
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case evt, ok := <-f.Events:
|
||||
if !ok {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
// Get the absolute path to match against our watched files.
|
||||
absPath, err := filepath.Abs(evt.Name)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
f.mu.Lock()
|
||||
if f.closed {
|
||||
f.mu.Unlock()
|
||||
return nil, ErrClosed
|
||||
}
|
||||
isWatched := f.watchedFiles[absPath]
|
||||
f.mu.Unlock()
|
||||
if !isWatched {
|
||||
continue // Ignore events for files not being watched.
|
||||
}
|
||||
|
||||
return &evt, nil
|
||||
|
||||
case err, ok := <-f.Errors:
|
||||
if !ok {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
return nil, xerrors.Errorf("watcher error: %w", err)
|
||||
case <-f.done:
|
||||
return nil, ErrClosed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fsnotifyWatcher) Close() (err error) {
|
||||
f.mu.Lock()
|
||||
f.watchedFiles = nil
|
||||
f.watchedDirs = nil
|
||||
closed := f.closed
|
||||
f.closed = true
|
||||
f.mu.Unlock()
|
||||
|
||||
if closed {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
close(f.done)
|
||||
|
||||
if err := f.Watcher.Close(); err != nil {
|
||||
return xerrors.Errorf("close watcher: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
128
agent/agentcontainers/watcher/watcher_test.go
Normal file
128
agent/agentcontainers/watcher/watcher_test.go
Normal file
@ -0,0 +1,128 @@
|
||||
package watcher_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/coder/coder/v2/agent/agentcontainers/watcher"
|
||||
"github.com/coder/coder/v2/testutil"
|
||||
)
|
||||
|
||||
func TestFSNotifyWatcher(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Create test files.
|
||||
dir := t.TempDir()
|
||||
testFile := filepath.Join(dir, "test.json")
|
||||
err := os.WriteFile(testFile, []byte(`{"test": "initial"}`), 0o600)
|
||||
require.NoError(t, err, "create test file failed")
|
||||
|
||||
// Create the watcher under test.
|
||||
wut, err := watcher.NewFSNotify()
|
||||
require.NoError(t, err, "create FSNotify watcher failed")
|
||||
defer wut.Close()
|
||||
|
||||
// Add the test file to the watch list.
|
||||
err = wut.Add(testFile)
|
||||
require.NoError(t, err, "add file to watcher failed")
|
||||
|
||||
ctx := testutil.Context(t, testutil.WaitShort)
|
||||
|
||||
// Modify the test file to trigger an event.
|
||||
err = os.WriteFile(testFile, []byte(`{"test": "modified"}`), 0o600)
|
||||
require.NoError(t, err, "modify test file failed")
|
||||
|
||||
// Verify that we receive the event we want.
|
||||
for {
|
||||
event, err := wut.Next(ctx)
|
||||
require.NoError(t, err, "next event failed")
|
||||
|
||||
require.NotNil(t, event, "want non-nil event")
|
||||
if !event.Has(fsnotify.Write) {
|
||||
t.Logf("Ignoring event: %s", event)
|
||||
continue
|
||||
}
|
||||
require.Truef(t, event.Has(fsnotify.Write), "want write event: %s", event.String())
|
||||
require.Equal(t, event.Name, testFile, "want event for test file")
|
||||
break
|
||||
}
|
||||
|
||||
// Rename the test file to trigger a rename event.
|
||||
err = os.Rename(testFile, testFile+".bak")
|
||||
require.NoError(t, err, "rename test file failed")
|
||||
|
||||
// Verify that we receive the event we want.
|
||||
for {
|
||||
event, err := wut.Next(ctx)
|
||||
require.NoError(t, err, "next event failed")
|
||||
require.NotNil(t, event, "want non-nil event")
|
||||
if !event.Has(fsnotify.Rename) {
|
||||
t.Logf("Ignoring event: %s", event)
|
||||
continue
|
||||
}
|
||||
require.Truef(t, event.Has(fsnotify.Rename), "want rename event: %s", event.String())
|
||||
require.Equal(t, event.Name, testFile, "want event for test file")
|
||||
break
|
||||
}
|
||||
|
||||
err = os.WriteFile(testFile, []byte(`{"test": "new"}`), 0o600)
|
||||
require.NoError(t, err, "write new test file failed")
|
||||
|
||||
// Verify that we receive the event we want.
|
||||
for {
|
||||
event, err := wut.Next(ctx)
|
||||
require.NoError(t, err, "next event failed")
|
||||
require.NotNil(t, event, "want non-nil event")
|
||||
if !event.Has(fsnotify.Create) {
|
||||
t.Logf("Ignoring event: %s", event)
|
||||
continue
|
||||
}
|
||||
require.Truef(t, event.Has(fsnotify.Create), "want create event: %s", event.String())
|
||||
require.Equal(t, event.Name, testFile, "want event for test file")
|
||||
break
|
||||
}
|
||||
|
||||
err = os.WriteFile(testFile+".atomic", []byte(`{"test": "atomic"}`), 0o600)
|
||||
require.NoError(t, err, "write new atomic test file failed")
|
||||
|
||||
err = os.Rename(testFile+".atomic", testFile)
|
||||
require.NoError(t, err, "rename atomic test file failed")
|
||||
|
||||
// Verify that we receive the event we want.
|
||||
for {
|
||||
event, err := wut.Next(ctx)
|
||||
require.NoError(t, err, "next event failed")
|
||||
require.NotNil(t, event, "want non-nil event")
|
||||
if !event.Has(fsnotify.Create) {
|
||||
t.Logf("Ignoring event: %s", event)
|
||||
continue
|
||||
}
|
||||
require.Truef(t, event.Has(fsnotify.Create), "want create event: %s", event.String())
|
||||
require.Equal(t, event.Name, testFile, "want event for test file")
|
||||
break
|
||||
}
|
||||
|
||||
// Test removing the file from the watcher.
|
||||
err = wut.Remove(testFile)
|
||||
require.NoError(t, err, "remove file from watcher failed")
|
||||
}
|
||||
|
||||
func TestFSNotifyWatcher_CloseBeforeNext(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
wut, err := watcher.NewFSNotify()
|
||||
require.NoError(t, err, "create FSNotify watcher failed")
|
||||
|
||||
err = wut.Close()
|
||||
require.NoError(t, err, "close watcher failed")
|
||||
|
||||
ctx := context.Background()
|
||||
_, err = wut.Next(ctx)
|
||||
assert.Error(t, err, "want Next to return error when watcher is closed")
|
||||
}
|
@ -12,7 +12,7 @@ import (
|
||||
"github.com/coder/coder/v2/codersdk"
|
||||
)
|
||||
|
||||
func (a *agent) apiHandler() http.Handler {
|
||||
func (a *agent) apiHandler() (http.Handler, func() error) {
|
||||
r := chi.NewRouter()
|
||||
r.Get("/", func(rw http.ResponseWriter, r *http.Request) {
|
||||
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.Response{
|
||||
@ -63,7 +63,9 @@ func (a *agent) apiHandler() http.Handler {
|
||||
r.Get("/debug/manifest", a.HandleHTTPDebugManifest)
|
||||
r.Get("/debug/prometheus", promHandler.ServeHTTP)
|
||||
|
||||
return r
|
||||
return r, func() error {
|
||||
return containerAPI.Close()
|
||||
}
|
||||
}
|
||||
|
||||
type listeningPortsHandler struct {
|
||||
|
@ -408,6 +408,7 @@ type WorkspaceAgentDevcontainer struct {
|
||||
|
||||
// Additional runtime fields.
|
||||
Running bool `json:"running"`
|
||||
Dirty bool `json:"dirty"`
|
||||
Container *WorkspaceAgentContainer `json:"container,omitempty"`
|
||||
}
|
||||
|
||||
|
1
go.mod
1
go.mod
@ -488,6 +488,7 @@ require (
|
||||
|
||||
require (
|
||||
github.com/coder/preview v0.0.1
|
||||
github.com/fsnotify/fsnotify v1.9.0
|
||||
github.com/kylecarbs/aisdk-go v0.0.5
|
||||
github.com/mark3labs/mcp-go v0.23.1
|
||||
)
|
||||
|
1
site/src/api/typesGenerated.ts
generated
1
site/src/api/typesGenerated.ts
generated
@ -3252,6 +3252,7 @@ export interface WorkspaceAgentDevcontainer {
|
||||
readonly workspace_folder: string;
|
||||
readonly config_path?: string;
|
||||
readonly running: boolean;
|
||||
readonly dirty: boolean;
|
||||
readonly container?: WorkspaceAgentContainer;
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user