diff --git a/agent/agent.go b/agent/agent.go index 0ed89b6045..ef8e54e009 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -87,7 +87,7 @@ type Options struct { } type Client interface { - ConnectRPC23(ctx context.Context) ( + ConnectRPC24(ctx context.Context) ( proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, ) RewriteDERPMap(derpMap *tailcfg.DERPMap) @@ -742,7 +742,7 @@ func (a *agent) run() (retErr error) { a.sessionToken.Store(&sessionToken) // ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs - aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx) + aAPI, tAPI, err := a.client.ConnectRPC24(a.hardCtx) if err != nil { return err } @@ -915,7 +915,6 @@ func (a *agent) handleManifestStream(manifestOK *checkpoint) func(ctx context.Co } } -// TODO: change signature to just take in all inputs instead of returning closure; return error func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentClient24, manifestOK *checkpoint, mp *proto.Manifest) error { var ( sentResult bool @@ -927,6 +926,8 @@ func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentCl } }() + a.metrics.manifestsReceived.Inc() + manifest, err := agentsdk.ManifestFromProto(mp) if err != nil { a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err)) @@ -964,9 +965,6 @@ func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentCl manifestOK.complete(nil) sentResult = true - // TODO: remove - a.logger.Info(ctx, "NOW OWNED BY", slog.F("owner", manifest.OwnerName)) - // TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same, // or add prebuild metadata to manifest? // The startup script should only execute on the first run! diff --git a/agent/metrics.go b/agent/metrics.go index 6c89827d2c..94995469e9 100644 --- a/agent/metrics.go +++ b/agent/metrics.go @@ -20,6 +20,7 @@ type agentMetrics struct { // took to run. This is reported once per agent. startupScriptSeconds *prometheus.GaugeVec currentConnections *prometheus.GaugeVec + manifestsReceived prometheus.Counter } func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics { @@ -54,11 +55,20 @@ func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics { }, []string{"connection_type"}) registerer.MustRegister(currentConnections) + manifestsReceived := prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "coderd", + Subsystem: "agentstats", + Name: "manifests_received", + Help: "The number of manifests this agent has received from the control plane.", + }) + registerer.MustRegister(manifestsReceived) + return &agentMetrics{ connectionsTotal: connectionsTotal, reconnectingPTYErrors: reconnectingPTYErrors, startupScriptSeconds: startupScriptSeconds, currentConnections: currentConnections, + manifestsReceived: manifestsReceived, } } diff --git a/coderd/agentapi/api.go b/coderd/agentapi/api.go index 1b9ac82a9f..c48fd17f8a 100644 --- a/coderd/agentapi/api.go +++ b/coderd/agentapi/api.go @@ -97,6 +97,7 @@ func New(opts Options) *API { DerpMapFn: opts.DerpMapFn, WorkspaceID: opts.WorkspaceID, Log: opts.Log.Named("manifests"), + Pubsub: opts.Pubsub, } api.AnnouncementBannerAPI = &AnnouncementBannerAPI{ diff --git a/coderd/agentapi/manifest.go b/coderd/agentapi/manifest.go index f0ac6ebe34..e07b24094b 100644 --- a/coderd/agentapi/manifest.go +++ b/coderd/agentapi/manifest.go @@ -4,6 +4,8 @@ import ( "cdr.dev/slog" "context" "database/sql" + "fmt" + "github.com/coder/coder/v2/coderd/database/pubsub" "net/url" "strings" "time" @@ -35,6 +37,7 @@ type ManifestAPI struct { AgentFn func(context.Context) (database.WorkspaceAgent, error) Database database.Store DerpMapFn func() *tailcfg.DERPMap + Pubsub pubsub.Pubsub Log slog.Logger } @@ -47,21 +50,52 @@ func (a *ManifestAPI) StreamManifests(in *agentproto.GetManifestRequest, stream } }() + updates := make(chan struct{}, 1) + + unsub, err := a.Pubsub.Subscribe(ManifestUpdateChannel(a.WorkspaceID), func(ctx context.Context, _ []byte) { + a.Log.Info(ctx, "received 'prebuild claimed' event for workspace, pushing down new manifest", slog.F("workspace_id", a.WorkspaceID.String())) + select { + case <-streamCtx.Done(): + return + case <-ctx.Done(): + return + case updates <- struct{}{}: + } + }) + if err != nil { + return xerrors.Errorf("subscribe to 'prebuild claimed' event: %w", err) + } + defer unsub() + for { - resp, err := a.GetManifest(streamCtx, in) + manifest, err := a.GetManifest(streamCtx, in) if err != nil { return xerrors.Errorf("receive manifest: %w", err) } - err = stream.Send(resp) + a.Log.Debug(streamCtx, "pushing manifest to workspace", slog.F("workspace_id", a.WorkspaceID)) + + // Send first retrieved manifest. + err = stream.Send(manifest) if err != nil { return xerrors.Errorf("send manifest: %w", err) } - time.Sleep(time.Second * 5) + // ...then wait until triggered by prebuild claim completion. + // At this stage, a prebuild will have been claimed by a user and the agent will need to be reconfigured. + select { + case <-updates: + a.Log.Info(streamCtx, "received manifest update request", slog.F("workspace_id", a.WorkspaceID)) + case <-streamCtx.Done(): + return xerrors.Errorf("stream close: %w", streamCtx.Err()) + } } } +func ManifestUpdateChannel(id uuid.UUID) string { + return fmt.Sprintf("prebuild_claimed_%s", id) +} + func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) { workspaceAgent, err := a.AgentFn(ctx) if err != nil { diff --git a/coderd/provisionerdserver/provisionerdserver.go b/coderd/provisionerdserver/provisionerdserver.go index e3db484a71..633ca30d35 100644 --- a/coderd/provisionerdserver/provisionerdserver.go +++ b/coderd/provisionerdserver/provisionerdserver.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/coder/coder/v2/coderd/agentapi" "net/http" "net/url" "reflect" @@ -1707,6 +1708,17 @@ func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) if err != nil { return nil, xerrors.Errorf("update workspace: %w", err) } + + // If this job was initiated by the prebuilds user and the job is not a prebuild, then it MUST be the claim run. + // TODO: maybe add some specific metadata to indicate this rather than imputing it. + if input.IsPrebuildClaimByUser != uuid.Nil { + s.Logger.Info(ctx, "workspace prebuild successfully claimed by user", + slog.F("user", input.IsPrebuildClaimByUser.String()), + slog.F("workspace_id", workspace.ID)) + if err := s.Pubsub.Publish(agentapi.ManifestUpdateChannel(workspace.ID), nil); err != nil { + s.Logger.Error(ctx, "failed to publish message to workspace agent to pull new manifest", slog.Error(err)) + } + } case *proto.CompletedJob_TemplateDryRun_: for _, resource := range jobType.TemplateDryRun.Resources { s.Logger.Info(ctx, "inserting template dry-run job resource", @@ -2356,9 +2368,10 @@ type TemplateVersionImportJob struct { // WorkspaceProvisionJob is the payload for the "workspace_provision" job type. type WorkspaceProvisionJob struct { - WorkspaceBuildID uuid.UUID `json:"workspace_build_id"` - DryRun bool `json:"dry_run"` - IsPrebuild bool `json:"is_prebuild,omitempty"` + WorkspaceBuildID uuid.UUID `json:"workspace_build_id"` + DryRun bool `json:"dry_run"` + IsPrebuild bool `json:"is_prebuild,omitempty"` + IsPrebuildClaimByUser uuid.UUID `json:"is_prebuild_claim_by,omitempty"` // RunningWorkspaceAgentID is *only* used for prebuilds. We pass it down when we want to rebuild a prebuilt workspace // but not generate a new agent token. The provisionerdserver will retrieve this token and push it down to // the provisioner (and ultimately to the `coder_agent` resource in the Terraform provider) where it will be diff --git a/coderd/workspaces.go b/coderd/workspaces.go index af8a8e8223..c0455925b1 100644 --- a/coderd/workspaces.go +++ b/coderd/workspaces.go @@ -685,6 +685,7 @@ func createWorkspace( if claimedWorkspace != nil { workspaceID = claimedWorkspace.ID + initiatorID = prebuilds.PrebuildOwnerUUID } else { // Workspaces are created without any versions. minimumWorkspace, err := db.InsertWorkspace(ctx, database.InsertWorkspaceParams{ @@ -727,6 +728,10 @@ func createWorkspace( builder = builder.VersionID(req.TemplateVersionID) } + if claimedWorkspace != nil { + builder = builder.MarkPrebuildClaimBy(owner.ID) + } + workspaceBuild, provisionerJob, provisionerDaemons, err = builder.Build( ctx, db, diff --git a/coderd/wsbuilder/wsbuilder.go b/coderd/wsbuilder/wsbuilder.go index 9ad87f1e04..cf8cb6c362 100644 --- a/coderd/wsbuilder/wsbuilder.go +++ b/coderd/wsbuilder/wsbuilder.go @@ -72,8 +72,10 @@ type Builder struct { lastBuildJob *database.ProvisionerJob parameterNames *[]string parameterValues *[]string - prebuild bool - runningWorkspaceAgentID uuid.UUID + + prebuild bool + prebuildClaimBy uuid.UUID + runningWorkspaceAgentID uuid.UUID verifyNoLegacyParametersOnce bool } @@ -176,6 +178,12 @@ func (b Builder) MarkPrebuild() Builder { return b } +func (b Builder) MarkPrebuildClaimBy(userID uuid.UUID) Builder { + // nolint: revive + b.prebuildClaimBy = userID + return b +} + // RunningWorkspaceAgentID is only used for prebuilds; see the associated field in `provisionerdserver.WorkspaceProvisionJob`. func (b Builder) RunningWorkspaceAgentID(id uuid.UUID) Builder { // nolint: revive @@ -311,6 +319,7 @@ func (b *Builder) buildTx(authFunc func(action policy.Action, object rbac.Object WorkspaceBuildID: workspaceBuildID, LogLevel: b.logLevel, IsPrebuild: b.prebuild, + IsPrebuildClaimByUser: b.prebuildClaimBy, RunningWorkspaceAgentID: b.runningWorkspaceAgentID, }) if err != nil { diff --git a/codersdk/agentsdk/agentsdk.go b/codersdk/agentsdk/agentsdk.go index f942ab199a..0ff9f04867 100644 --- a/codersdk/agentsdk/agentsdk.go +++ b/codersdk/agentsdk/agentsdk.go @@ -220,7 +220,7 @@ func (c *Client) ConnectRPC22(ctx context.Context) ( // ConnectRPC23 returns a dRPC client to the Agent API v2.3. It is useful when you want to be // maximally compatible with Coderd Release Versions from 2.18+ func (c *Client) ConnectRPC23(ctx context.Context) ( - proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, + proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error, ) { conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 3)) if err != nil { @@ -229,6 +229,18 @@ func (c *Client) ConnectRPC23(ctx context.Context) ( return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil } +// ConnectRPC24 returns a dRPC client to the Agent API v2.4. It is useful when you want to be +// maximally compatible with Coderd Release Versions from 2.18+ // TODO update release +func (c *Client) ConnectRPC24(ctx context.Context) ( + proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, +) { + conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 4)) + if err != nil { + return nil, nil, err + } + return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil +} + // ConnectRPC connects to the workspace agent API and tailnet API func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) { return c.connectRPCVersion(ctx, proto.CurrentVersion) diff --git a/tailnet/proto/version.go b/tailnet/proto/version.go index 8d8bd5343d..f86711171d 100644 --- a/tailnet/proto/version.go +++ b/tailnet/proto/version.go @@ -40,7 +40,7 @@ import ( // ScriptCompleted, but be prepared to process "unsupported" errors.) const ( CurrentMajor = 2 - CurrentMinor = 3 + CurrentMinor = 4 ) var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor)