mirror of
https://github.com/coder/coder.git
synced 2025-07-03 16:13:58 +00:00
We currently send empty payloads to pubsub channels of the form `workspace:<workspace_id>` to notify listeners of updates to workspaces (such as for refreshing the workspace dashboard).

To support https://github.com/coder/coder/issues/14716, we'll instead send `WorkspaceEvent` payloads to pubsub channels of the form `workspace_owner:<owner_id>`. This enables a listener to receive events for all workspaces owned by a user. This PR replaces the usage of the old channels without modifying any existing behaviors.

```
type WorkspaceEvent struct {
	Kind        WorkspaceEventKind `json:"kind"`
	WorkspaceID uuid.UUID          `json:"workspace_id" format:"uuid"`
	// AgentID is only set for WorkspaceEventKindAgent* events
	// (excluding AgentTimeout)
	AgentID *uuid.UUID `json:"agent_id,omitempty" format:"uuid"`
}
```

We've defined `WorkspaceEventKind`s based on how the old channel was used, but it's not yet necessary to inspect the types of any of the events, as the existing listeners are designed to fire off any of them.

```
WorkspaceEventKindStateChange           WorkspaceEventKind = "state_change"
WorkspaceEventKindStatsUpdate           WorkspaceEventKind = "stats_update"
WorkspaceEventKindMetadataUpdate        WorkspaceEventKind = "mtd_update"
WorkspaceEventKindAppHealthUpdate       WorkspaceEventKind = "app_health"
WorkspaceEventKindAgentLifecycleUpdate  WorkspaceEventKind = "agt_lifecycle_update"
WorkspaceEventKindAgentLogsUpdate       WorkspaceEventKind = "agt_logs_update"
WorkspaceEventKindAgentConnectionUpdate WorkspaceEventKind = "agt_connection_update"
WorkspaceEventKindAgentLogsOverflow     WorkspaceEventKind = "agt_logs_overflow"
WorkspaceEventKindAgentTimeout          WorkspaceEventKind = "agt_timeout"
```
107 lines
3.1 KiB
Go
107 lines
3.1 KiB
Go
package agentapi
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"cdr.dev/slog"
|
|
agentproto "github.com/coder/coder/v2/agent/proto"
|
|
"github.com/coder/coder/v2/coderd/database"
|
|
"github.com/coder/coder/v2/coderd/wspubsub"
|
|
)
|
|
|
|
type AppsAPI struct {
|
|
AgentFn func(context.Context) (database.WorkspaceAgent, error)
|
|
Database database.Store
|
|
Log slog.Logger
|
|
PublishWorkspaceUpdateFn func(context.Context, *database.WorkspaceAgent, wspubsub.WorkspaceEventKind) error
|
|
}
|
|
|
|
func (a *AppsAPI) BatchUpdateAppHealths(ctx context.Context, req *agentproto.BatchUpdateAppHealthRequest) (*agentproto.BatchUpdateAppHealthResponse, error) {
|
|
workspaceAgent, err := a.AgentFn(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
a.Log.Debug(ctx, "got batch app health update",
|
|
slog.F("agent_id", workspaceAgent.ID.String()),
|
|
slog.F("updates", req.Updates),
|
|
)
|
|
|
|
if len(req.Updates) == 0 {
|
|
return &agentproto.BatchUpdateAppHealthResponse{}, nil
|
|
}
|
|
|
|
apps, err := a.Database.GetWorkspaceAppsByAgentID(ctx, workspaceAgent.ID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get workspace apps by agent ID %q: %w", workspaceAgent.ID, err)
|
|
}
|
|
|
|
var newApps []database.WorkspaceApp
|
|
for _, update := range req.Updates {
|
|
updateID, err := uuid.FromBytes(update.Id)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse workspace app ID %q: %w", update.Id, err)
|
|
}
|
|
|
|
old := func() *database.WorkspaceApp {
|
|
for _, app := range apps {
|
|
if app.ID == updateID {
|
|
return &app
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}()
|
|
if old == nil {
|
|
return nil, xerrors.Errorf("workspace app ID %q not found", updateID)
|
|
}
|
|
|
|
if old.HealthcheckUrl == "" {
|
|
return nil, xerrors.Errorf("workspace app %q (%q) does not have healthchecks enabled", updateID, old.Slug)
|
|
}
|
|
|
|
var newHealth database.WorkspaceAppHealth
|
|
switch update.Health {
|
|
case agentproto.AppHealth_DISABLED:
|
|
newHealth = database.WorkspaceAppHealthDisabled
|
|
case agentproto.AppHealth_INITIALIZING:
|
|
newHealth = database.WorkspaceAppHealthInitializing
|
|
case agentproto.AppHealth_HEALTHY:
|
|
newHealth = database.WorkspaceAppHealthHealthy
|
|
case agentproto.AppHealth_UNHEALTHY:
|
|
newHealth = database.WorkspaceAppHealthUnhealthy
|
|
default:
|
|
return nil, xerrors.Errorf("unknown health status %q for app %q (%q)", update.Health, updateID, old.Slug)
|
|
}
|
|
|
|
// Don't bother updating if the value hasn't changed.
|
|
if old.Health == newHealth {
|
|
continue
|
|
}
|
|
old.Health = newHealth
|
|
|
|
newApps = append(newApps, *old)
|
|
}
|
|
|
|
for _, app := range newApps {
|
|
err = a.Database.UpdateWorkspaceAppHealthByID(ctx, database.UpdateWorkspaceAppHealthByIDParams{
|
|
ID: app.ID,
|
|
Health: app.Health,
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update workspace app health for app %q (%q): %w", err, app.ID, app.Slug)
|
|
}
|
|
}
|
|
|
|
if a.PublishWorkspaceUpdateFn != nil && len(newApps) > 0 {
|
|
err = a.PublishWorkspaceUpdateFn(ctx, &workspaceAgent, wspubsub.WorkspaceEventKindAppHealthUpdate)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("publish workspace update: %w", err)
|
|
}
|
|
}
|
|
return &agentproto.BatchUpdateAppHealthResponse{}, nil
|
|
}
|