Enable reconciliator on entitlements change

Signed-off-by: Danny Kopping <dannykopping@gmail.com>
This commit is contained in:
Danny Kopping
2025-03-04 09:55:39 +00:00
parent df743e6961
commit c0f81d03d4
7 changed files with 155 additions and 95 deletions

View File

@ -3,24 +3,18 @@ package prebuilds
import ( import (
"context" "context"
"github.com/coder/coder/v2/coderd/database"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/coder/coder/v2/coderd/database"
) )
type Reconciler interface {
RunLoop(ctx context.Context)
Stop(ctx context.Context, cause error)
ReconcileAll(ctx context.Context) error
}
type Claimer interface { type Claimer interface {
Claim(ctx context.Context, store database.Store, userID uuid.UUID, name string, presetID uuid.UUID) (*uuid.UUID, error) Claim(ctx context.Context, store database.Store, userID uuid.UUID, name string, presetID uuid.UUID) (*uuid.UUID, error)
Initiator() uuid.UUID Initiator() uuid.UUID
} }
type AGPLPrebuildClaimer struct{}
func (c AGPLPrebuildClaimer) Claim(context.Context, database.Store, uuid.UUID, string, uuid.UUID) (*uuid.UUID, error) {
// Not entitled to claim prebuilds in AGPL version.
return nil, nil
}
func (c AGPLPrebuildClaimer) Initiator() uuid.UUID {
return uuid.Nil
}
var DefaultClaimer Claimer = AGPLPrebuildClaimer{}

22
coderd/prebuilds/claim.go Normal file
View File

@ -0,0 +1,22 @@
package prebuilds
import (
"context"
"github.com/google/uuid"
"github.com/coder/coder/v2/coderd/database"
)
type AGPLPrebuildClaimer struct{}
func (c AGPLPrebuildClaimer) Claim(context.Context, database.Store, uuid.UUID, string, uuid.UUID) (*uuid.UUID, error) {
// Not entitled to claim prebuilds in AGPL version.
return nil, nil
}
func (c AGPLPrebuildClaimer) Initiator() uuid.UUID {
return uuid.Nil
}
var DefaultClaimer Claimer = AGPLPrebuildClaimer{}

View File

@ -0,0 +1,18 @@
package prebuilds
import (
"context"
)
type noopReconciler struct{}
func NewNoopReconciler() Reconciler {
return &noopReconciler{}
}
func (noopReconciler) RunLoop(context.Context) {}
func (noopReconciler) Stop(context.Context, error) {}
func (noopReconciler) ReconcileAll(context.Context) error { return nil }
func (noopReconciler) ReconcileTemplate() error { return nil }
var _ Reconciler = noopReconciler{}

View File

@ -3,6 +3,7 @@ package coderd
import ( import (
"context" "context"
"crypto/ed25519" "crypto/ed25519"
"errors"
"fmt" "fmt"
"math" "math"
"net/http" "net/http"
@ -583,23 +584,6 @@ func New(ctx context.Context, options *Options) (_ *API, err error) {
} }
go api.runEntitlementsLoop(ctx) go api.runEntitlementsLoop(ctx)
if api.AGPL.Experiments.Enabled(codersdk.ExperimentWorkspacePrebuilds) {
// TODO: future enhancement, start this up without restarting coderd when entitlement is updated.
if !api.Entitlements.Enabled(codersdk.FeatureWorkspacePrebuilds) {
options.Logger.Warn(ctx, "prebuilds experiment enabled but not entitled to use")
} else {
api.prebuildsController = prebuilds.NewController(options.Database, options.Pubsub, options.DeploymentValues.Prebuilds, options.Logger.Named("prebuilds.controller"))
go api.prebuildsController.Loop(ctx)
prebuildMetricsCollector := prebuilds.NewMetricsCollector(options.Database, options.Logger)
// should this be api.prebuild...
err = api.PrometheusRegistry.Register(prebuildMetricsCollector)
if err != nil {
return nil, xerrors.Errorf("unable to register prebuilds metrics collector: %w", err)
}
}
}
return api, nil return api, nil
} }
@ -654,7 +638,8 @@ type API struct {
licenseMetricsCollector *license.MetricsCollector licenseMetricsCollector *license.MetricsCollector
tailnetService *tailnet.ClientService tailnetService *tailnet.ClientService
prebuildsController *prebuilds.Controller prebuildsReconciler agplprebuilds.Reconciler
prebuildsMetricsCollector *prebuilds.MetricsCollector
} }
// writeEntitlementWarningsHeader writes the entitlement warnings to the response header // writeEntitlementWarningsHeader writes the entitlement warnings to the response header
@ -686,8 +671,10 @@ func (api *API) Close() error {
api.Options.CheckInactiveUsersCancelFunc() api.Options.CheckInactiveUsersCancelFunc()
} }
if api.prebuildsController != nil { if api.prebuildsReconciler != nil {
api.prebuildsController.Close(xerrors.New("api closed")) // TODO: determine root cause (requires changes up the stack, though). ctx, giveUp := context.WithTimeoutCause(context.Background(), time.Second*30, errors.New("gave up waiting for reconciler to stop"))
defer giveUp()
api.prebuildsReconciler.Stop(ctx, xerrors.New("api closed")) // TODO: determine root cause (requires changes up the stack, though).
} }
return api.AGPL.Close() return api.AGPL.Close()
@ -893,11 +880,22 @@ func (api *API) updateEntitlements(ctx context.Context) error {
} }
if initial, changed, enabled := featureChanged(codersdk.FeatureWorkspacePrebuilds); shouldUpdate(initial, changed, enabled) { if initial, changed, enabled := featureChanged(codersdk.FeatureWorkspacePrebuilds); shouldUpdate(initial, changed, enabled) {
c := agplprebuilds.DefaultClaimer reconciler, claimer, metrics := api.setupPrebuilds(enabled)
if enabled { if api.prebuildsReconciler != nil {
c = prebuilds.EnterpriseClaimer{} stopCtx, giveUp := context.WithTimeoutCause(context.Background(), time.Second*30, errors.New("gave up waiting for reconciler to stop"))
defer giveUp()
api.prebuildsReconciler.Stop(stopCtx, errors.New("entitlements change"))
} }
api.AGPL.PrebuildsClaimer.Store(&c)
// Only register metrics once.
if api.prebuildsMetricsCollector != nil {
api.prebuildsMetricsCollector = metrics
}
api.prebuildsReconciler = reconciler
go reconciler.RunLoop(context.Background())
api.AGPL.PrebuildsClaimer.Store(&claimer)
} }
// External token encryption is soft-enforced // External token encryption is soft-enforced
@ -1168,3 +1166,25 @@ func (api *API) runEntitlementsLoop(ctx context.Context) {
func (api *API) Authorize(r *http.Request, action policy.Action, object rbac.Objecter) bool { func (api *API) Authorize(r *http.Request, action policy.Action, object rbac.Objecter) bool {
return api.AGPL.HTTPAuth.Authorize(r, action, object) return api.AGPL.HTTPAuth.Authorize(r, action, object)
} }
func (api *API) setupPrebuilds(entitled bool) (agplprebuilds.Reconciler, agplprebuilds.Claimer, *prebuilds.MetricsCollector) {
enabled := api.AGPL.Experiments.Enabled(codersdk.ExperimentWorkspacePrebuilds)
if !enabled || !entitled {
api.Logger.Debug(context.Background(), "prebuilds not enabled",
slog.F("experiment_enabled", enabled), slog.F("entitled", entitled))
return agplprebuilds.NewNoopReconciler(), agplprebuilds.DefaultClaimer, nil
}
logger := api.Logger.Named("prebuilds.metrics")
collector := prebuilds.NewMetricsCollector(api.Database, logger)
err := api.PrometheusRegistry.Register(collector)
if err != nil {
logger.Error(context.Background(), "failed to register prebuilds metrics collector", slog.F("error", err))
collector = nil
}
return prebuilds.NewStoreReconciler(api.Database, api.Pubsub, api.DeploymentValues.Prebuilds, api.Logger.Named("prebuilds")),
prebuilds.EnterpriseClaimer{},
collector
}

View File

@ -85,7 +85,7 @@ func TestClaimPrebuild(t *testing.T) {
}, },
}) })
controller := prebuilds.NewController(spy, pubsub, codersdk.PrebuildsConfig{}, testutil.Logger(t)) controller := prebuilds.NewStoreReconciler(spy, pubsub, codersdk.PrebuildsConfig{}, testutil.Logger(t))
const ( const (
desiredInstances = 1 desiredInstances = 1

View File

@ -33,7 +33,7 @@ func TestNoReconciliationActionsIfNoPresets(t *testing.T) {
ReconciliationInterval: serpent.Duration(testutil.WaitLong), ReconciliationInterval: serpent.Duration(testutil.WaitLong),
} }
logger := testutil.Logger(t) logger := testutil.Logger(t)
controller := prebuilds.NewController(db, pubsub, cfg, logger) controller := prebuilds.NewStoreReconciler(db, pubsub, cfg, logger)
// given a template version with no presets // given a template version with no presets
org := dbgen.Organization(t, db, database.Organization{}) org := dbgen.Organization(t, db, database.Organization{})
@ -77,7 +77,7 @@ func TestNoReconciliationActionsIfNoPrebuilds(t *testing.T) {
ReconciliationInterval: serpent.Duration(testutil.WaitLong), ReconciliationInterval: serpent.Duration(testutil.WaitLong),
} }
logger := testutil.Logger(t) logger := testutil.Logger(t)
controller := prebuilds.NewController(db, pubsub, cfg, logger) controller := prebuilds.NewStoreReconciler(db, pubsub, cfg, logger)
// given there are presets, but no prebuilds // given there are presets, but no prebuilds
org := dbgen.Organization(t, db, database.Organization{}) org := dbgen.Organization(t, db, database.Organization{})
@ -302,7 +302,7 @@ func TestActiveTemplateVersionPrebuilds(t *testing.T) {
db, pubsub := dbtestutil.NewDB(t) db, pubsub := dbtestutil.NewDB(t)
cfg := codersdk.PrebuildsConfig{} cfg := codersdk.PrebuildsConfig{}
logger := testutil.Logger(t) logger := testutil.Logger(t)
controller := prebuilds.NewController(db, pubsub, cfg, logger) controller := prebuilds.NewStoreReconciler(db, pubsub, cfg, logger)
orgID, userID, templateID := setupTestDBTemplate(t, db) orgID, userID, templateID := setupTestDBTemplate(t, db)
_, _, prebuildID := setupTestDBPrebuild( _, _, prebuildID := setupTestDBPrebuild(
@ -346,7 +346,7 @@ func TestInactiveTemplateVersionPrebuilds(t *testing.T) {
db, pubsub := dbtestutil.NewDB(t) db, pubsub := dbtestutil.NewDB(t)
cfg := codersdk.PrebuildsConfig{} cfg := codersdk.PrebuildsConfig{}
logger := testutil.Logger(t) logger := testutil.Logger(t)
controller := prebuilds.NewController(db, pubsub, cfg, logger) controller := prebuilds.NewStoreReconciler(db, pubsub, cfg, logger)
// when does a prebuild get deleted? // when does a prebuild get deleted?
// * when it is in some way permanently ineligible to be claimed // * when it is in some way permanently ineligible to be claimed

View File

@ -18,6 +18,7 @@ import (
"github.com/coder/coder/v2/coderd/database/dbtime" "github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/provisionerjobs" "github.com/coder/coder/v2/coderd/database/provisionerjobs"
"github.com/coder/coder/v2/coderd/database/pubsub" "github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/prebuilds"
"github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy" "github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/wsbuilder" "github.com/coder/coder/v2/coderd/wsbuilder"
@ -30,34 +31,40 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
) )
type Controller struct { type storeReconciler struct {
store database.Store store database.Store
cfg codersdk.PrebuildsConfig cfg codersdk.PrebuildsConfig
pubsub pubsub.Pubsub pubsub pubsub.Pubsub
logger slog.Logger logger slog.Logger
nudgeCh chan *uuid.UUID
cancelFn context.CancelCauseFunc cancelFn context.CancelCauseFunc
closed atomic.Bool stopped atomic.Bool
done chan struct{}
} }
func NewController(store database.Store, pubsub pubsub.Pubsub, cfg codersdk.PrebuildsConfig, logger slog.Logger) *Controller { func NewStoreReconciler(store database.Store, pubsub pubsub.Pubsub, cfg codersdk.PrebuildsConfig, logger slog.Logger) prebuilds.Reconciler {
return &Controller{ return &storeReconciler{
store: store, store: store,
pubsub: pubsub, pubsub: pubsub,
logger: logger, logger: logger,
cfg: cfg, cfg: cfg,
nudgeCh: make(chan *uuid.UUID, 1), done: make(chan struct{}, 1),
} }
} }
func (c *Controller) Loop(ctx context.Context) error { func (c *storeReconciler) RunLoop(ctx context.Context) {
reconciliationInterval := c.cfg.ReconciliationInterval.Value() reconciliationInterval := c.cfg.ReconciliationInterval.Value()
if reconciliationInterval <= 0 { // avoids a panic if reconciliationInterval <= 0 { // avoids a panic
reconciliationInterval = 5 * time.Minute reconciliationInterval = 5 * time.Minute
} }
c.logger.Info(ctx, "starting reconciler", slog.F("interval", reconciliationInterval))
ticker := time.NewTicker(reconciliationInterval) ticker := time.NewTicker(reconciliationInterval)
defer ticker.Stop() defer ticker.Stop()
defer func() {
c.done <- struct{}{}
}()
// TODO: create new authz role // TODO: create new authz role
ctx, cancel := context.WithCancelCause(dbauthz.AsSystemRestricted(ctx)) ctx, cancel := context.WithCancelCause(dbauthz.AsSystemRestricted(ctx))
@ -65,39 +72,47 @@ func (c *Controller) Loop(ctx context.Context) error {
for { for {
select { select {
// Accept nudges from outside the control loop to trigger a new iteration. // TODO: implement pubsub listener to allow reconciling a specific template imperatively once it has been changed,
case template := <-c.nudgeCh: // instead of waiting for the next reconciliation interval
c.Reconcile(ctx, template)
// Trigger a new iteration on each tick.
case <-ticker.C: case <-ticker.C:
c.Reconcile(ctx, nil) // Trigger a new iteration on each tick.
err := c.ReconcileAll(ctx)
if err != nil {
c.logger.Error(context.Background(), "reconciliation failed", slog.Error(err))
}
case <-ctx.Done(): case <-ctx.Done():
c.logger.Error(context.Background(), "prebuilds reconciliation loop exited", slog.Error(ctx.Err()), slog.F("cause", context.Cause(ctx))) c.logger.Warn(context.Background(), "reconciliation loop exited", slog.Error(ctx.Err()), slog.F("cause", context.Cause(ctx)))
return ctx.Err() return
} }
} }
} }
func (c *Controller) Close(cause error) { func (c *storeReconciler) Stop(ctx context.Context, cause error) {
if c.isClosed() { c.logger.Warn(context.Background(), "stopping reconciler", slog.F("cause", cause))
if c.isStopped() {
return return
} }
c.closed.Store(true) c.stopped.Store(true)
if c.cancelFn != nil { if c.cancelFn != nil {
c.cancelFn(cause) c.cancelFn(cause)
} }
select {
// Give up waiting for control loop to exit.
case <-ctx.Done():
c.logger.Error(context.Background(), "reconciler stop exited prematurely", slog.Error(ctx.Err()), slog.F("cause", context.Cause(ctx)))
// Wait for the control loop to exit.
case <-c.done:
c.logger.Info(context.Background(), "reconciler stopped")
}
} }
func (c *Controller) isClosed() bool { func (c *storeReconciler) isStopped() bool {
return c.closed.Load() return c.stopped.Load()
} }
func (c *Controller) ReconcileTemplate(templateID *uuid.UUID) { // ReconcileAll will attempt to resolve the desired vs actual state of all templates which have presets with prebuilds configured.
// TODO: replace this with pubsub listening
c.nudgeCh <- templateID
}
// Reconcile will attempt to resolve the desired vs actual state of all templates which have presets with prebuilds configured.
// //
// NOTE: // NOTE:
// //
@ -113,18 +128,13 @@ func (c *Controller) ReconcileTemplate(templateID *uuid.UUID) {
// be reconciled again, leading to another workspace being provisioned. Two workspace builds will be occurring // be reconciled again, leading to another workspace being provisioned. Two workspace builds will be occurring
// simultaneously for the same preset, but once both jobs have completed the reconciliation loop will notice the // simultaneously for the same preset, but once both jobs have completed the reconciliation loop will notice the
// extraneous instance and delete it. // extraneous instance and delete it.
func (c *Controller) Reconcile(ctx context.Context, templateID *uuid.UUID) { func (c *storeReconciler) ReconcileAll(ctx context.Context) error {
var logger slog.Logger logger := c.logger.With(slog.F("reconcile_context", "all"))
if templateID == nil {
logger = c.logger.With(slog.F("reconcile_context", "all"))
} else {
logger = c.logger.With(slog.F("reconcile_context", "specific"), slog.F("template_id", templateID.String()))
}
select { select {
case <-ctx.Done(): case <-ctx.Done():
logger.Warn(context.Background(), "reconcile exiting prematurely; context done", slog.Error(ctx.Err())) logger.Warn(context.Background(), "reconcile exiting prematurely; context done", slog.Error(ctx.Err()))
return return nil
default: default:
} }
@ -144,13 +154,13 @@ func (c *Controller) Reconcile(ctx context.Context, templateID *uuid.UUID) {
// TODO: use TryAcquireLock here and bail out early. // TODO: use TryAcquireLock here and bail out early.
err := db.AcquireLock(ctx, database.LockIDReconcileTemplatePrebuilds) err := db.AcquireLock(ctx, database.LockIDReconcileTemplatePrebuilds)
if err != nil { if err != nil {
logger.Warn(ctx, "failed to acquire top-level prebuilds reconciliation lock; likely running on another coderd replica", slog.Error(err)) logger.Warn(ctx, "failed to acquire top-level reconciliation lock; likely running on another coderd replica", slog.Error(err))
return nil return nil
} }
logger.Debug(ctx, "acquired top-level prebuilds reconciliation lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds()))) logger.Debug(ctx, "acquired top-level reconciliation lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds())))
state, err := c.determineState(ctx, db, templateID) state, err := c.determineState(ctx, db)
if err != nil { if err != nil {
return xerrors.Errorf("determine current state: %w", err) return xerrors.Errorf("determine current state: %w", err)
} }
@ -194,11 +204,13 @@ func (c *Controller) Reconcile(ctx context.Context, templateID *uuid.UUID) {
if err != nil { if err != nil {
logger.Error(ctx, "failed to reconcile", slog.Error(err)) logger.Error(ctx, "failed to reconcile", slog.Error(err))
} }
return err
} }
// determineState determines the current state of prebuilds & the presets which define them. // determineState determines the current state of prebuilds & the presets which define them.
// An application-level lock is used // An application-level lock is used
func (c *Controller) determineState(ctx context.Context, store database.Store, templateId *uuid.UUID) (*reconciliationState, error) { func (c *storeReconciler) determineState(ctx context.Context, store database.Store) (*reconciliationState, error) {
if err := ctx.Err(); err != nil { if err := ctx.Err(); err != nil {
return nil, err return nil, err
} }
@ -216,13 +228,7 @@ func (c *Controller) determineState(ctx context.Context, store database.Store, t
c.logger.Debug(ctx, "acquired state determination lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds()))) c.logger.Debug(ctx, "acquired state determination lock", slog.F("acquire_wait_secs", fmt.Sprintf("%.4f", time.Since(start).Seconds())))
var dbTemplateID uuid.NullUUID presetsWithPrebuilds, err := db.GetTemplatePresetsWithPrebuilds(ctx, uuid.NullUUID{}) // TODO: implement template-specific reconciliations later
if templateId != nil {
dbTemplateID.UUID = *templateId
dbTemplateID.Valid = true
}
presetsWithPrebuilds, err := db.GetTemplatePresetsWithPrebuilds(ctx, dbTemplateID)
if err != nil { if err != nil {
return xerrors.Errorf("failed to get template presets with prebuilds: %w", err) return xerrors.Errorf("failed to get template presets with prebuilds: %w", err)
} }
@ -251,7 +257,7 @@ func (c *Controller) determineState(ctx context.Context, store database.Store, t
return &state, err return &state, err
} }
func (c *Controller) reconcilePrebuildsForPreset(ctx context.Context, ps *presetState) error { func (c *storeReconciler) reconcilePrebuildsForPreset(ctx context.Context, ps *presetState) error {
if ps == nil { if ps == nil {
return xerrors.Errorf("unexpected nil preset state") return xerrors.Errorf("unexpected nil preset state")
} }
@ -311,7 +317,7 @@ func (c *Controller) reconcilePrebuildsForPreset(ctx context.Context, ps *preset
return lastErr.ErrorOrNil() return lastErr.ErrorOrNil()
} }
func (c *Controller) createPrebuild(ctx context.Context, prebuildID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error { func (c *storeReconciler) createPrebuild(ctx context.Context, prebuildID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
name, err := generateName() name, err := generateName()
if err != nil { if err != nil {
return xerrors.Errorf("failed to generate unique prebuild ID: %w", err) return xerrors.Errorf("failed to generate unique prebuild ID: %w", err)
@ -351,7 +357,7 @@ func (c *Controller) createPrebuild(ctx context.Context, prebuildID uuid.UUID, t
return c.provision(ctx, prebuildID, template, presetID, database.WorkspaceTransitionStart, workspace) return c.provision(ctx, prebuildID, template, presetID, database.WorkspaceTransitionStart, workspace)
} }
func (c *Controller) deletePrebuild(ctx context.Context, prebuildID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error { func (c *storeReconciler) deletePrebuild(ctx context.Context, prebuildID uuid.UUID, templateID uuid.UUID, presetID uuid.UUID) error {
workspace, err := c.store.GetWorkspaceByID(ctx, prebuildID) workspace, err := c.store.GetWorkspaceByID(ctx, prebuildID)
if err != nil { if err != nil {
return xerrors.Errorf("get workspace by ID: %w", err) return xerrors.Errorf("get workspace by ID: %w", err)
@ -368,7 +374,7 @@ func (c *Controller) deletePrebuild(ctx context.Context, prebuildID uuid.UUID, t
return c.provision(ctx, prebuildID, template, presetID, database.WorkspaceTransitionDelete, workspace) return c.provision(ctx, prebuildID, template, presetID, database.WorkspaceTransitionDelete, workspace)
} }
func (c *Controller) provision(ctx context.Context, prebuildID uuid.UUID, template database.Template, presetID uuid.UUID, transition database.WorkspaceTransition, workspace database.Workspace) error { func (c *storeReconciler) provision(ctx context.Context, prebuildID uuid.UUID, template database.Template, presetID uuid.UUID, transition database.WorkspaceTransition, workspace database.Workspace) error {
tvp, err := c.store.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID) tvp, err := c.store.GetPresetParametersByTemplateVersionID(ctx, template.ActiveVersionID)
if err != nil { if err != nil {
return xerrors.Errorf("fetch preset details: %w", err) return xerrors.Errorf("fetch preset details: %w", err)