WIP: claim triggering manifest push to agent

Signed-off-by: Danny Kopping <danny@coder.com>
This commit is contained in:
Danny Kopping
2025-02-04 13:50:54 +00:00
parent 903f89694c
commit 0ba8f89df1
9 changed files with 98 additions and 16 deletions

View File

@ -87,7 +87,7 @@ type Options struct {
} }
type Client interface { type Client interface {
ConnectRPC23(ctx context.Context) ( ConnectRPC24(ctx context.Context) (
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error,
) )
RewriteDERPMap(derpMap *tailcfg.DERPMap) RewriteDERPMap(derpMap *tailcfg.DERPMap)
@ -742,7 +742,7 @@ func (a *agent) run() (retErr error) {
a.sessionToken.Store(&sessionToken) a.sessionToken.Store(&sessionToken)
// ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs // ConnectRPC returns the dRPC connection we use for the Agent and Tailnet v2+ APIs
aAPI, tAPI, err := a.client.ConnectRPC23(a.hardCtx) aAPI, tAPI, err := a.client.ConnectRPC24(a.hardCtx)
if err != nil { if err != nil {
return err return err
} }
@ -915,7 +915,6 @@ func (a *agent) handleManifestStream(manifestOK *checkpoint) func(ctx context.Co
} }
} }
// TODO: change signature to just take in all inputs instead of returning closure; return error
func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentClient24, manifestOK *checkpoint, mp *proto.Manifest) error { func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentClient24, manifestOK *checkpoint, mp *proto.Manifest) error {
var ( var (
sentResult bool sentResult bool
@ -927,6 +926,8 @@ func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentCl
} }
}() }()
a.metrics.manifestsReceived.Inc()
manifest, err := agentsdk.ManifestFromProto(mp) manifest, err := agentsdk.ManifestFromProto(mp)
if err != nil { if err != nil {
a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err)) a.logger.Critical(ctx, "failed to convert manifest", slog.F("manifest", mp), slog.Error(err))
@ -964,9 +965,6 @@ func (a *agent) handleSingleManifest(ctx context.Context, aAPI proto.DRPCAgentCl
manifestOK.complete(nil) manifestOK.complete(nil)
sentResult = true sentResult = true
// TODO: remove
a.logger.Info(ctx, "NOW OWNED BY", slog.F("owner", manifest.OwnerName))
// TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same, // TODO: this will probably have to change in the case of prebuilds; maybe check if owner is the same,
// or add prebuild metadata to manifest? // or add prebuild metadata to manifest?
// The startup script should only execute on the first run! // The startup script should only execute on the first run!

View File

@ -20,6 +20,7 @@ type agentMetrics struct {
// took to run. This is reported once per agent. // took to run. This is reported once per agent.
startupScriptSeconds *prometheus.GaugeVec startupScriptSeconds *prometheus.GaugeVec
currentConnections *prometheus.GaugeVec currentConnections *prometheus.GaugeVec
manifestsReceived prometheus.Counter
} }
func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics { func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics {
@ -54,11 +55,20 @@ func newAgentMetrics(registerer prometheus.Registerer) *agentMetrics {
}, []string{"connection_type"}) }, []string{"connection_type"})
registerer.MustRegister(currentConnections) registerer.MustRegister(currentConnections)
manifestsReceived := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "coderd",
Subsystem: "agentstats",
Name: "manifests_received",
Help: "The number of manifests this agent has received from the control plane.",
})
registerer.MustRegister(manifestsReceived)
return &agentMetrics{ return &agentMetrics{
connectionsTotal: connectionsTotal, connectionsTotal: connectionsTotal,
reconnectingPTYErrors: reconnectingPTYErrors, reconnectingPTYErrors: reconnectingPTYErrors,
startupScriptSeconds: startupScriptSeconds, startupScriptSeconds: startupScriptSeconds,
currentConnections: currentConnections, currentConnections: currentConnections,
manifestsReceived: manifestsReceived,
} }
} }

View File

@ -97,6 +97,7 @@ func New(opts Options) *API {
DerpMapFn: opts.DerpMapFn, DerpMapFn: opts.DerpMapFn,
WorkspaceID: opts.WorkspaceID, WorkspaceID: opts.WorkspaceID,
Log: opts.Log.Named("manifests"), Log: opts.Log.Named("manifests"),
Pubsub: opts.Pubsub,
} }
api.AnnouncementBannerAPI = &AnnouncementBannerAPI{ api.AnnouncementBannerAPI = &AnnouncementBannerAPI{

View File

@ -4,6 +4,8 @@ import (
"cdr.dev/slog" "cdr.dev/slog"
"context" "context"
"database/sql" "database/sql"
"fmt"
"github.com/coder/coder/v2/coderd/database/pubsub"
"net/url" "net/url"
"strings" "strings"
"time" "time"
@ -35,6 +37,7 @@ type ManifestAPI struct {
AgentFn func(context.Context) (database.WorkspaceAgent, error) AgentFn func(context.Context) (database.WorkspaceAgent, error)
Database database.Store Database database.Store
DerpMapFn func() *tailcfg.DERPMap DerpMapFn func() *tailcfg.DERPMap
Pubsub pubsub.Pubsub
Log slog.Logger Log slog.Logger
} }
@ -47,19 +50,50 @@ func (a *ManifestAPI) StreamManifests(in *agentproto.GetManifestRequest, stream
} }
}() }()
updates := make(chan struct{}, 1)
unsub, err := a.Pubsub.Subscribe(ManifestUpdateChannel(a.WorkspaceID), func(ctx context.Context, _ []byte) {
a.Log.Info(ctx, "received 'prebuild claimed' event for workspace, pushing down new manifest", slog.F("workspace_id", a.WorkspaceID.String()))
select {
case <-streamCtx.Done():
return
case <-ctx.Done():
return
case updates <- struct{}{}:
}
})
if err != nil {
return xerrors.Errorf("subscribe to 'prebuild claimed' event: %w", err)
}
defer unsub()
for { for {
resp, err := a.GetManifest(streamCtx, in) manifest, err := a.GetManifest(streamCtx, in)
if err != nil { if err != nil {
return xerrors.Errorf("receive manifest: %w", err) return xerrors.Errorf("receive manifest: %w", err)
} }
err = stream.Send(resp) a.Log.Debug(streamCtx, "pushing manifest to workspace", slog.F("workspace_id", a.WorkspaceID))
// Send first retrieved manifest.
err = stream.Send(manifest)
if err != nil { if err != nil {
return xerrors.Errorf("send manifest: %w", err) return xerrors.Errorf("send manifest: %w", err)
} }
time.Sleep(time.Second * 5) // ...then wait until triggered by prebuild claim completion.
// At this stage, a prebuild will have been claimed by a user and the agent will need to be reconfigured.
select {
case <-updates:
a.Log.Info(streamCtx, "received manifest update request", slog.F("workspace_id", a.WorkspaceID))
case <-streamCtx.Done():
return xerrors.Errorf("stream close: %w", streamCtx.Err())
} }
}
}
func ManifestUpdateChannel(id uuid.UUID) string {
return fmt.Sprintf("prebuild_claimed_%s", id)
} }
func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) { func (a *ManifestAPI) GetManifest(ctx context.Context, _ *agentproto.GetManifestRequest) (*agentproto.Manifest, error) {

View File

@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/coder/coder/v2/coderd/agentapi"
"net/http" "net/http"
"net/url" "net/url"
"reflect" "reflect"
@ -1707,6 +1708,17 @@ func (s *server) CompleteJob(ctx context.Context, completed *proto.CompletedJob)
if err != nil { if err != nil {
return nil, xerrors.Errorf("update workspace: %w", err) return nil, xerrors.Errorf("update workspace: %w", err)
} }
// If this job was initiated by the prebuilds user and the job is not a prebuild, then it MUST be the claim run.
// TODO: maybe add some specific metadata to indicate this rather than imputing it.
if input.IsPrebuildClaimByUser != uuid.Nil {
s.Logger.Info(ctx, "workspace prebuild successfully claimed by user",
slog.F("user", input.IsPrebuildClaimByUser.String()),
slog.F("workspace_id", workspace.ID))
if err := s.Pubsub.Publish(agentapi.ManifestUpdateChannel(workspace.ID), nil); err != nil {
s.Logger.Error(ctx, "failed to publish message to workspace agent to pull new manifest", slog.Error(err))
}
}
case *proto.CompletedJob_TemplateDryRun_: case *proto.CompletedJob_TemplateDryRun_:
for _, resource := range jobType.TemplateDryRun.Resources { for _, resource := range jobType.TemplateDryRun.Resources {
s.Logger.Info(ctx, "inserting template dry-run job resource", s.Logger.Info(ctx, "inserting template dry-run job resource",
@ -2359,6 +2371,7 @@ type WorkspaceProvisionJob struct {
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"` WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
DryRun bool `json:"dry_run"` DryRun bool `json:"dry_run"`
IsPrebuild bool `json:"is_prebuild,omitempty"` IsPrebuild bool `json:"is_prebuild,omitempty"`
IsPrebuildClaimByUser uuid.UUID `json:"is_prebuild_claim_by,omitempty"`
// RunningWorkspaceAgentID is *only* used for prebuilds. We pass it down when we want to rebuild a prebuilt workspace // RunningWorkspaceAgentID is *only* used for prebuilds. We pass it down when we want to rebuild a prebuilt workspace
// but not generate a new agent token. The provisionerdserver will retrieve this token and push it down to // but not generate a new agent token. The provisionerdserver will retrieve this token and push it down to
// the provisioner (and ultimately to the `coder_agent` resource in the Terraform provider) where it will be // the provisioner (and ultimately to the `coder_agent` resource in the Terraform provider) where it will be

View File

@ -685,6 +685,7 @@ func createWorkspace(
if claimedWorkspace != nil { if claimedWorkspace != nil {
workspaceID = claimedWorkspace.ID workspaceID = claimedWorkspace.ID
initiatorID = prebuilds.PrebuildOwnerUUID
} else { } else {
// Workspaces are created without any versions. // Workspaces are created without any versions.
minimumWorkspace, err := db.InsertWorkspace(ctx, database.InsertWorkspaceParams{ minimumWorkspace, err := db.InsertWorkspace(ctx, database.InsertWorkspaceParams{
@ -727,6 +728,10 @@ func createWorkspace(
builder = builder.VersionID(req.TemplateVersionID) builder = builder.VersionID(req.TemplateVersionID)
} }
if claimedWorkspace != nil {
builder = builder.MarkPrebuildClaimBy(owner.ID)
}
workspaceBuild, provisionerJob, provisionerDaemons, err = builder.Build( workspaceBuild, provisionerJob, provisionerDaemons, err = builder.Build(
ctx, ctx,
db, db,

View File

@ -72,7 +72,9 @@ type Builder struct {
lastBuildJob *database.ProvisionerJob lastBuildJob *database.ProvisionerJob
parameterNames *[]string parameterNames *[]string
parameterValues *[]string parameterValues *[]string
prebuild bool prebuild bool
prebuildClaimBy uuid.UUID
runningWorkspaceAgentID uuid.UUID runningWorkspaceAgentID uuid.UUID
verifyNoLegacyParametersOnce bool verifyNoLegacyParametersOnce bool
@ -176,6 +178,12 @@ func (b Builder) MarkPrebuild() Builder {
return b return b
} }
func (b Builder) MarkPrebuildClaimBy(userID uuid.UUID) Builder {
// nolint: revive
b.prebuildClaimBy = userID
return b
}
// RunningWorkspaceAgentID is only used for prebuilds; see the associated field in `provisionerdserver.WorkspaceProvisionJob`. // RunningWorkspaceAgentID is only used for prebuilds; see the associated field in `provisionerdserver.WorkspaceProvisionJob`.
func (b Builder) RunningWorkspaceAgentID(id uuid.UUID) Builder { func (b Builder) RunningWorkspaceAgentID(id uuid.UUID) Builder {
// nolint: revive // nolint: revive
@ -311,6 +319,7 @@ func (b *Builder) buildTx(authFunc func(action policy.Action, object rbac.Object
WorkspaceBuildID: workspaceBuildID, WorkspaceBuildID: workspaceBuildID,
LogLevel: b.logLevel, LogLevel: b.logLevel,
IsPrebuild: b.prebuild, IsPrebuild: b.prebuild,
IsPrebuildClaimByUser: b.prebuildClaimBy,
RunningWorkspaceAgentID: b.runningWorkspaceAgentID, RunningWorkspaceAgentID: b.runningWorkspaceAgentID,
}) })
if err != nil { if err != nil {

View File

@ -220,7 +220,7 @@ func (c *Client) ConnectRPC22(ctx context.Context) (
// ConnectRPC23 returns a dRPC client to the Agent API v2.3. It is useful when you want to be // ConnectRPC23 returns a dRPC client to the Agent API v2.3. It is useful when you want to be
// maximally compatible with Coderd Release Versions from 2.18+ // maximally compatible with Coderd Release Versions from 2.18+
func (c *Client) ConnectRPC23(ctx context.Context) ( func (c *Client) ConnectRPC23(ctx context.Context) (
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error, proto.DRPCAgentClient23, tailnetproto.DRPCTailnetClient23, error,
) { ) {
conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 3)) conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 3))
if err != nil { if err != nil {
@ -229,6 +229,18 @@ func (c *Client) ConnectRPC23(ctx context.Context) (
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
} }
// ConnectRPC24 returns a dRPC client to the Agent API v2.4. It is useful when you want to be
// maximally compatible with Coderd Release Versions from 2.18+ // TODO update release
func (c *Client) ConnectRPC24(ctx context.Context) (
proto.DRPCAgentClient24, tailnetproto.DRPCTailnetClient23, error,
) {
conn, err := c.connectRPCVersion(ctx, apiversion.New(2, 4))
if err != nil {
return nil, nil, err
}
return proto.NewDRPCAgentClient(conn), tailnetproto.NewDRPCTailnetClient(conn), nil
}
// ConnectRPC connects to the workspace agent API and tailnet API // ConnectRPC connects to the workspace agent API and tailnet API
func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) { func (c *Client) ConnectRPC(ctx context.Context) (drpc.Conn, error) {
return c.connectRPCVersion(ctx, proto.CurrentVersion) return c.connectRPCVersion(ctx, proto.CurrentVersion)

View File

@ -40,7 +40,7 @@ import (
// ScriptCompleted, but be prepared to process "unsupported" errors.) // ScriptCompleted, but be prepared to process "unsupported" errors.)
const ( const (
CurrentMajor = 2 CurrentMajor = 2
CurrentMinor = 3 CurrentMinor = 4
) )
var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor) var CurrentVersion = apiversion.New(CurrentMajor, CurrentMinor)