Moving from streaming manifest approach to SSE + explicit full agent reinitialization

Signed-off-by: Danny Kopping <danny@coder.com>

# Conflicts:
#	agent/agent.go

# Conflicts:
#	cli/agent.go
This commit is contained in:
Danny Kopping
2025-02-07 14:06:39 +02:00
parent b60f2f66c6
commit 3a9f9c8453
14 changed files with 482 additions and 441 deletions

View File

@ -1,15 +1,16 @@
package agentapi
import (
"cdr.dev/slog"
"context"
"database/sql"
"fmt"
"github.com/coder/coder/v2/coderd/database/pubsub"
"net/url"
"strings"
"time"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
@ -41,61 +42,6 @@ type ManifestAPI struct {
Log slog.Logger
}
func (a *ManifestAPI) StreamManifests(in *agentproto.GetManifestRequest, stream agentproto.DRPCAgent_StreamManifestsStream) error {
streamCtx := dbauthz.AsSystemRestricted(stream.Context()) // TODO:
defer func() {
if err := stream.CloseSend(); err != nil {
a.Log.Error(streamCtx, "error closing stream: %v", err)
}
}()
updates := make(chan struct{}, 1)
unsub, err := a.Pubsub.Subscribe(ManifestUpdateChannel(a.WorkspaceID), func(ctx context.Context, _ []byte) {
a.Log.Info(ctx, "received 'prebuild claimed' event for workspace, pushing down new manifest", slog.F("workspace_id", a.WorkspaceID.String()))
select {
case <-streamCtx.Done():
return
case <-ctx.Done():
return
case updates <- struct{}{}:
}
})
if err != nil {
return xerrors.Errorf("subscribe to 'prebuild claimed' event: %w", err)
}
defer unsub()
for {
manifest, err := a.GetManifest(streamCtx, in)
if err != nil {
return xerrors.Errorf("receive manifest: %w", err)
}
a.Log.Debug(streamCtx, "pushing manifest to workspace", slog.F("workspace_id", a.WorkspaceID))
// Send first retrieved manifest.
err = stream.Send(manifest)
if err != nil {
return xerrors.Errorf("send manifest: %w", err)
}
// ...then wait until triggered by prebuild claim completion.
// At this stage, a prebuild will have been claimed by a user and the agent will need to be reconfigured.
select {
case <-updates:
a.Log.Info(streamCtx, "received manifest update request", slog.F("workspace_id", a.WorkspaceID))
case <-streamCtx.Done():
return xerrors.Errorf("stream close: %w", streamCtx.Err())
}
}
}
func ManifestUpdateChannel(id uuid.UUID) string {
return fmt.Sprintf("prebuild_claimed_%s", id)
}
func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {
workspaceAgent, err := a.AgentFn(ctx)
if err != nil {

View File

@ -1193,6 +1193,7 @@ func New(options *Options) *API {
r.Get("/external-auth", api.workspaceAgentsExternalAuth)
r.Get("/gitsshkey", api.agentGitSSHKey)
r.Post("/log-source", api.workspaceAgentPostLogSource)
r.Get("/reinit", api.workspaceAgentReinit)
})
r.Route("/{workspaceagent}", func(r chi.Router) {
r.Use(

View File

@ -15,7 +15,7 @@ import (
"sync/atomic"
"time"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/google/uuid"
"github.com/sqlc-dev/pqtype"
@ -1713,11 +1713,13 @@ func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob)
// If this job was initiated by the prebuilds user and the job is not a prebuild, then it MUST be the claim run.
// TODO: maybe add some specific metadata to indicate this rather than imputing it.
if input.IsPrebuildClaimByUser != uuid.Nil {
if input.PrebuildClaimByUser != uuid.Nil {
channel := agentsdk.PrebuildClaimedChannel(workspace.ID)
s.Logger.Info(ctx, "workspace prebuild successfully claimed by user",
slog.F("user", input.IsPrebuildClaimByUser.String()),
slog.F("workspace_id", workspace.ID))
if err := s.Pubsub.Publish(agentapi.ManifestUpdateChannel(workspace.ID), nil); err != nil {
slog.F("user", input.PrebuildClaimByUser.String()),
slog.F("workspace_id", workspace.ID),
slog.F("channel", channel))
if err := s.Pubsub.Publish(channel, []byte(input.PrebuildClaimByUser.String())); err != nil {
s.Logger.Error(ctx, "failed to publish message to workspace agent to pull new manifest", slog.Error(err))
}
}
@ -2381,10 +2383,10 @@ type TemplateVersionImportJob struct {
// WorkspaceProvisionJob is the payload for the "workspace_provision" job type.
type WorkspaceProvisionJob struct {
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
DryRun bool `json:"dry_run"`
IsPrebuild bool `json:"is_prebuild,omitempty"`
IsPrebuildClaimByUser uuid.UUID `json:"is_prebuild_claim_by,omitempty"`
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
DryRun bool `json:"dry_run"`
IsPrebuild bool `json:"is_prebuild,omitempty"`
PrebuildClaimByUser uuid.UUID `json:"prebuild_claim_by,omitempty"`
// RunningWorkspaceAgentID is *only* used for prebuilds. We pass it down when we want to rebuild a prebuilt workspace
// but not generate a new agent token. The provisionerdserver will retrieve this token and push it down to
// the provisioner (and ultimately to the `coder_agent` resource in the Terraform provider) where it will be

View File

@ -23,6 +23,8 @@ import (
"tailscale.com/tailcfg"
"cdr.dev/slog"
"github.com/coder/websocket"
"github.com/coder/coder/v2/coderd/agentapi"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/db2sdk"
@ -42,7 +44,6 @@ import (
"github.com/coder/coder/v2/codersdk/wsjson"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/websocket"
)
// @Summary Get workspace agent by ID
@ -1046,6 +1047,105 @@ func (api *API) workspaceAgentPostLogSource(rw http.ResponseWriter, r *http.Requ
httpapi.Write(ctx, rw, http.StatusCreated, apiSource)
}
// TODO @Summary Post workspace agent log source
// TODO @ID post-workspace-agent-log-source
// TODO @Security CoderSessionToken
// TODO @Accept json
// TODO @Produce json
// TODO @Tags Agents
// TODO @Param request body agentsdk.PostLogSourceRequest true "Log source request"
// TODO @Success 200 {object} codersdk.WorkspaceAgentLogSource
// TODO @Router /workspaceagents/me/log-source [post]
func (api *API) workspaceAgentReinit(rw http.ResponseWriter, r *http.Request) {
// Allow us to interrupt watch via cancel.
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
r = r.WithContext(ctx) // Rewire context for SSE cancellation.
workspaceAgent := httpmw.WorkspaceAgent(r)
log := api.Logger.Named("workspace_agent_reinit_watcher").With(
slog.F("workspace_agent_id", workspaceAgent.ID),
)
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
if err != nil {
log.Error(ctx, "failed to retrieve workspace from agent token", slog.Error(err))
httpapi.InternalServerError(rw, errors.New("failed to determine workspace from agent token"))
}
log.Info(ctx, "agent waiting for reinit instruction")
prebuildClaims := make(chan uuid.UUID, 1)
cancelSub, err := api.Pubsub.Subscribe(agentsdk.PrebuildClaimedChannel(workspace.ID), func(inner context.Context, id []byte) {
select {
case <-ctx.Done():
return
case <-inner.Done():
return
default:
}
parsed, err := uuid.ParseBytes(id)
if err != nil {
log.Error(ctx, "invalid prebuild claimed channel payload", slog.F("input", string(id)))
return
}
prebuildClaims <- parsed
})
if err != nil {
log.Error(ctx, "failed to subscribe to prebuild claimed channel", slog.Error(err))
httpapi.InternalServerError(rw, errors.New("failed to subscribe to prebuild claimed channel"))
return
}
defer cancelSub()
sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error setting up server-sent events.",
Detail: err.Error(),
})
return
}
// Prevent handler from returning until the sender is closed.
defer func() {
cancel()
<-sseSenderClosed
}()
// Synchronize cancellation from SSE -> context, this lets us simplify the
// cancellation logic.
go func() {
select {
case <-ctx.Done():
case <-sseSenderClosed:
cancel()
}
}()
// An initial ping signals to the request that the server is now ready
// and the client can begin servicing a channel with data.
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypePing,
})
// Expand with future use-cases for agent reinitialization.
for {
select {
case <-ctx.Done():
return
case user := <-prebuildClaims:
err = sseSendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: agentsdk.ReinitializationResponse{
Message: fmt.Sprintf("prebuild claimed by user: %s", user),
Reason: agentsdk.ReinitializeReasonPrebuildClaimed,
},
})
log.Warn(ctx, "failed to send SSE response to trigger reinit", slog.Error(err))
}
}
}
// convertProvisionedApps converts applications that are in the middle of provisioning process.
// It means that they may not have an agent or workspace assigned (dry-run job).
func convertProvisionedApps(dbApps []database.WorkspaceApp) []codersdk.WorkspaceApp {

View File

@ -319,7 +319,7 @@ func (b *Builder) buildTx(authFunc func(action policy.Action, object rbac.Object
WorkspaceBuildID: workspaceBuildID,
LogLevel: b.logLevel,
IsPrebuild: b.prebuild,
IsPrebuildClaimByUser: b.prebuildClaimBy,
PrebuildClaimByUser: b.prebuildClaimBy,
RunningWorkspaceAgentID: b.runningWorkspaceAgentID,
})
if err != nil {