Files
coder/coderd/prebuilds/controller.go
Danny Kopping a59a03d5f4 Add provision job metadata to identify prebuilds
Signed-off-by: Danny Kopping <danny@coder.com>
2025-02-14 12:17:06 +00:00

357 lines
11 KiB
Go

package prebuilds
import (
"bytes"
"context"
"fmt"
"github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/wsbuilder"
"math"
"time"
"cdr.dev/slog"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)
// Controller drives prebuild reconciliation. It bundles the database store and
// pubsub it works against with the channels used to nudge and stop its Loop.
type Controller struct {
// store is the database handle used to open reconciliation transactions.
store database.Store
// pubsub is used to announce newly scheduled provisioner jobs.
pubsub pubsub.Pubsub
// logger is the base logger; methods derive scoped loggers from it.
logger slog.Logger
// nudgeCh carries template IDs sent by ReconcileTemplate; Loop reconciles
// the received template when one arrives.
nudgeCh chan *uuid.UUID
// closeCh is signalled by Stop to make Loop exit.
closeCh chan struct{}
}
// NewController builds a Controller around the given store, pubsub and logger.
// Both internal channels (nudge and close) are created with a buffer of one.
func NewController(store database.Store, pubsub pubsub.Pubsub, logger slog.Logger) *Controller {
	var (
		nudges = make(chan *uuid.UUID, 1)
		closed = make(chan struct{}, 1)
	)

	ctrl := Controller{
		store:   store,
		pubsub:  pubsub,
		logger:  logger,
		nudgeCh: nudges,
		closeCh: closed,
	}
	return &ctrl
}
// Loop runs the prebuilds control loop until ctx is done or Stop is called.
//
// A reconciliation of all templates is started on every tick, and a
// reconciliation of a single template is started whenever a nudge arrives via
// ReconcileTemplate. Each reconciliation runs in its own goroutine; Loop waits
// for all of them to finish before returning.
func (c Controller) Loop(ctx context.Context) {
	ticker := time.NewTicker(time.Second * 5) // TODO: configurable? 1m probably lowest valid value
	defer ticker.Stop()

	// TODO: create new authz role
	ctx = dbauthz.AsSystemRestricted(ctx)

	// TODO: bounded concurrency?
	var eg errgroup.Group
	// Wait for in-flight reconciliations on every exit path. The goroutines
	// below always return nil, so there is no error to inspect.
	defer func() { _ = eg.Wait() }()

	for {
		select {
		// Accept nudges from outside the control loop to trigger a new iteration.
		case template := <-c.nudgeCh:
			eg.Go(func() error {
				c.reconcile(ctx, template)
				return nil
			})
		// Trigger a new iteration on each tick.
		case <-ticker.C:
			eg.Go(func() error {
				c.reconcile(ctx, nil)
				return nil
			})
		case <-c.closeCh:
			c.logger.Info(ctx, "control loop stopped")
			return
		case <-ctx.Done():
			// ctx is already done, so log with a fresh context. The log message
			// is not a format string, so the error is attached as a field.
			c.logger.Error(context.Background(), "control loop exited", slog.Error(ctx.Err()))
			return
		}
	}
}
// ReconcileTemplate asks the control loop to reconcile the given template by
// sending a pointer to its ID on the nudge channel. The send is a plain
// (blocking) channel send.
func (c Controller) ReconcileTemplate(templateID uuid.UUID) {
	// TODO: replace this with pubsub listening
	id := &templateID
	c.nudgeCh <- id
}
// reconcile runs one reconciliation pass. With a nil templateID the template
// query is unfiltered; otherwise it is restricted to the identified template.
//
// The pass runs inside a read-only transaction that first takes the
// LockIDReconcileTemplatePrebuilds lock; if the lock cannot be acquired the
// pass is skipped. Every template found is handed to reconcileTemplate in its
// own goroutine. Failures are logged rather than returned.
func (c Controller) reconcile(ctx context.Context, templateID *uuid.UUID) {
	var logger slog.Logger
	if templateID == nil {
		logger = c.logger.With(slog.F("reconcile_context", "all"))
	} else {
		logger = c.logger.With(slog.F("reconcile_context", "specific"), slog.F("template_id", templateID.String()))
	}

	// Bail out early if the context is already done; log with a fresh context.
	if err := ctx.Err(); err != nil {
		logger.Warn(context.Background(), "reconcile exiting prematurely; context done", slog.Error(err))
		return
	}

	// get all templates or specific one requested
	err := c.store.InTx(func(db database.Store) error {
		err := db.AcquireLock(ctx, database.LockIDReconcileTemplatePrebuilds)
		if err != nil {
			logger.Warn(ctx, "failed to acquire top-level prebuilds lock; likely running on another coderd replica", slog.Error(err))
			return nil
		}

		// The timeout bounds only the template query below.
		innerCtx, cancel := context.WithTimeout(ctx, time.Second*30)
		defer cancel()

		var ids []uuid.UUID
		if templateID != nil {
			ids = append(ids, *templateID)
		}

		templates, err := db.GetTemplatesWithFilter(innerCtx, database.GetTemplatesWithFilterParams{
			IDs: ids,
		})
		if err != nil {
			// Use the scoped logger so the reconcile_context fields are kept.
			logger.Debug(innerCtx, "could not fetch template(s)")
			return xerrors.Errorf("fetch template(s): %w", err)
		}

		if len(templates) == 0 {
			logger.Debug(innerCtx, "no templates found")
			return nil
		}

		// TODO: bounded concurrency? probably not but consider
		var eg errgroup.Group
		for _, template := range templates {
			eg.Go(func() error {
				// Pass outer context.
				// TODO: name these better to avoid the comment.
				return c.reconcileTemplate(ctx, template)
			})
		}

		return eg.Wait()
	}, &database.TxOptions{
		// TODO: isolation
		ReadOnly:     true,
		TxIdentifier: "template_prebuilds",
	})
	if err != nil {
		// NOTE(review): err also covers errors returned by the closure above
		// (e.g. the template fetch), not only failure to open the transaction.
		logger.Error(ctx, "failed to acquire database transaction", slog.Error(err))
	}
}
// reconciliationActions is the result of calculateActions for one template.
type reconciliationActions struct {
// deleteIDs are the IDs of running prebuilds selected for deletion.
deleteIDs []uuid.UUID
// createIDs are freshly generated IDs, one per prebuild to create.
createIDs []uuid.UUID
// meta is the prebuild state the actions were calculated from.
meta database.GetTemplatePrebuildStateRow
}
// calculateActions works out which prebuilds to create and which to delete for
// template, given its current prebuild state.
//
// calculateActions MUST be called within the context of a transaction (TODO: isolation)
// with an advisory lock to prevent TOCTOU races.
//
// The number to create is desired minus (actual + starting), and the number to
// delete is extraneous minus deleting minus stopping, both floored at zero. If
// the template is deleted or deprecated nothing is created and actual +
// extraneous prebuilds are targeted for deletion. IDs to delete are taken, in
// order, from the comma-separated state.RunningPrebuildIds; entries that are
// not valid UUIDs are logged and skipped.
//
// The returned error is currently always nil.
func (c Controller) calculateActions(ctx context.Context, template database.Template, state database.GetTemplatePrebuildStateRow) (*reconciliationActions, error) {
	toCreate := int(math.Max(0, float64(state.Desired-(state.Actual+state.Starting))))
	toDelete := int(math.Max(0, float64(state.Extraneous-state.Deleting-state.Stopping)))

	actions := &reconciliationActions{meta: state}

	// If the template has become deleted or deprecated since the last reconciliation, we need to ensure we
	// scale those prebuilds down to zero.
	if state.TemplateDeleted || state.TemplateDeprecated {
		toCreate = 0
		toDelete = int(state.Actual + state.Extraneous)
	}

	for i := 0; i < toCreate; i++ {
		actions.createIDs = append(actions.createIDs, uuid.New())
	}

	// bytes.Split yields a single empty element for empty input (and empty
	// elements around stray separators). Those are not IDs, so drop them;
	// otherwise an empty set would be counted as one running prebuild.
	var runningIDs [][]byte
	for _, raw := range bytes.Split(state.RunningPrebuildIds, []byte{','}) {
		if len(raw) > 0 {
			runningIDs = append(runningIDs, raw)
		}
	}

	if toDelete > 0 && len(runningIDs) != toDelete {
		c.logger.Warn(ctx, "mismatch between running prebuilds and expected deletion count!",
			slog.F("template_id", template.ID), slog.F("running", len(runningIDs)), slog.F("to_destroy", toDelete))
	}

	// TODO: implement lookup to not perform same action on workspace multiple times in $period
	// 		 i.e. a workspace cannot be deleted for some reason, which continually makes it eligible for deletion
	//
	// Never look past the IDs we actually have; the warning above already
	// reported any shortfall.
	for i := 0; i < toDelete && i < len(runningIDs); i++ {
		running := runningIDs[i]
		id, err := uuid.ParseBytes(running)
		if err != nil {
			c.logger.Warn(ctx, "invalid prebuild ID", slog.F("template_id", template.ID),
				slog.F("id", string(running)), slog.Error(err))
			continue
		}

		actions.deleteIDs = append(actions.deleteIDs, id)
	}

	return actions, nil
}
// reconcileTemplate reconciles the prebuilds of a single template.
//
// Inside one transaction it takes a per-template lock, reads the template's
// prebuild state, calculates the create/delete actions and carries them out.
// If the lock cannot be acquired the template is skipped. Individual create or
// delete failures are logged and do not stop the remaining actions.
//
// All failures are logged here; the returned error is always nil.
func (c Controller) reconcileTemplate(ctx context.Context, template database.Template) error {
	logger := c.logger.With(slog.F("template_id", template.ID.String()))

	// get number of desired vs actual prebuild instances
	err := c.store.InTx(func(db database.Store) error {
		err := db.AcquireLock(ctx, database.GenLockID(fmt.Sprintf("template:%s", template.ID.String())))
		if err != nil {
			logger.Warn(ctx, "failed to acquire template prebuilds lock; likely running on another coderd replica", slog.Error(err))
			return nil
		}

		innerCtx, cancel := context.WithTimeout(ctx, time.Second*30)
		defer cancel()

		// TODO: change to "many" and return 1 or more rows, but only one should be returned
		// 		 more than 1 response is indicative of a query problem
		//
		// Use the time-bounded context so the state query cannot hang indefinitely.
		state, err := db.GetTemplatePrebuildState(innerCtx, template.ID)
		if err != nil {
			return xerrors.Errorf("failed to retrieve template's prebuild states: %w", err)
		}

		actions, err := c.calculateActions(innerCtx, template, state)
		if err != nil {
			logger.Error(ctx, "failed to calculate reconciliation actions", slog.Error(err))
			return nil
		}

		// TODO: authz // Can't use existing profiles (i.e. AsSystemRestricted) because of dbauthz rules
		ownerCtx := dbauthz.As(ctx, rbac.Subject{
			ID:     "owner",
			Roles:  rbac.RoleIdentifiers{rbac.RoleOwner()},
			Groups: []string{},
			Scope:  rbac.ExpandableScope(rbac.ScopeAll),
		})

		levelFn := logger.Debug
		if len(actions.createIDs) > 0 || len(actions.deleteIDs) > 0 {
			// Only log with info level when there's a change that needs to be effected.
			levelFn = logger.Info
		}
		levelFn(innerCtx, "template prebuild state retrieved",
			slog.F("to_create", len(actions.createIDs)), slog.F("to_destroy", len(actions.deleteIDs)),
			slog.F("desired", actions.meta.Desired), slog.F("actual", actions.meta.Actual), slog.F("extraneous", actions.meta.Extraneous),
			slog.F("starting", actions.meta.Starting), slog.F("stopping", actions.meta.Stopping), slog.F("deleting", actions.meta.Deleting))

		// Provision workspaces within the same tx so we don't get any timing issues here.
		// i.e. we hold the advisory lock until all reconciliatory actions have been taken.
		// TODO: max per reconciliation iteration?

		for _, id := range actions.createIDs {
			if err := c.createPrebuild(ownerCtx, db, id, template); err != nil {
				logger.Error(ctx, "failed to create prebuild", slog.Error(err))
			}
		}

		for _, id := range actions.deleteIDs {
			if err := c.deletePrebuild(ownerCtx, db, id, template); err != nil {
				logger.Error(ctx, "failed to delete prebuild", slog.Error(err))
			}
		}

		return nil
	}, &database.TxOptions{
		// TODO: isolation
		TxIdentifier: "template_prebuilds",
	})
	if err != nil {
		// NOTE(review): err also covers errors returned by the closure above
		// (e.g. the state query), not only failure to open the transaction.
		logger.Error(ctx, "failed to acquire database transaction", slog.Error(err))
	}

	return nil
}
// createPrebuild inserts a new workspace for template with ID prebuildID, named
// "prebuild-<prebuildID>" and owned by PrebuildOwnerUUID, then schedules a
// start build for it via provision.
//
// It returns a wrapped error if the insert or the follow-up fetch fails, or
// whatever provision returns.
func (c Controller) createPrebuild(ctx context.Context, db database.Store, prebuildID uuid.UUID, template database.Template) error {
	workspaceName := fmt.Sprintf("prebuild-%s", prebuildID)
	createdAt := dbtime.Now()

	// Workspaces are created without any versions.
	params := database.InsertWorkspaceParams{
		ID:               prebuildID,
		CreatedAt:        createdAt,
		UpdatedAt:        createdAt,
		OwnerID:          PrebuildOwnerUUID,
		OrganizationID:   template.OrganizationID,
		TemplateID:       template.ID,
		Name:             workspaceName,
		LastUsedAt:       dbtime.Now(),
		AutomaticUpdates: database.AutomaticUpdatesNever,
	}
	inserted, insertErr := db.InsertWorkspace(ctx, params)
	if insertErr != nil {
		return xerrors.Errorf("insert workspace: %w", insertErr)
	}

	// The inserted row lacks the joined-in fields, so fetch the workspace again.
	full, fetchErr := db.GetWorkspaceByID(ctx, inserted.ID)
	if fetchErr != nil {
		return xerrors.Errorf("get workspace by ID: %w", fetchErr)
	}

	c.logger.Info(ctx, "attempting to create prebuild",
		slog.F("name", workspaceName),
		slog.F("workspace_id", prebuildID.String()),
	)

	return c.provision(ctx, db, prebuildID, template, database.WorkspaceTransitionStart, full)
}
// deletePrebuild looks up the workspace with ID prebuildID and schedules a
// delete build for it via provision.
//
// It returns a wrapped error if the workspace cannot be fetched, or whatever
// provision returns.
func (c Controller) deletePrebuild(ctx context.Context, db database.Store, prebuildID uuid.UUID, template database.Template) error {
	target, lookupErr := db.GetWorkspaceByID(ctx, prebuildID)
	if lookupErr != nil {
		return xerrors.Errorf("get workspace by ID: %w", lookupErr)
	}

	workspaceID := prebuildID.String()
	c.logger.Info(ctx, "attempting to delete prebuild", slog.F("workspace_id", workspaceID))

	return c.provision(ctx, db, prebuildID, template, database.WorkspaceTransitionDelete, target)
}
// provision builds a workspace build for workspace with the given transition,
// using the template's active version and marking it as a prebuild, then posts
// the resulting provisioner job to pubsub.
//
// A build failure is returned wrapped. A failure to post the job is only
// logged, and the function still returns nil.
func (c Controller) provision(ctx context.Context, db database.Store, prebuildID uuid.UUID, template database.Template, transition database.WorkspaceTransition, workspace database.Workspace) error {
	// Authorization callback handed to the builder; it permits every action.
	allowAll := func(policy.Action, rbac.Objecter) bool {
		return true // TODO: harden?
	}

	builder := wsbuilder.New(workspace, transition).
		Reason(database.BuildReasonInitiator).
		Initiator(PrebuildOwnerUUID).
		ActiveVersion().
		VersionID(template.ActiveVersionID).
		MarkPrebuild()
	// RichParameterValues(req.RichParameterValues) // TODO: fetch preset's params

	_, job, _, buildErr := builder.Build(ctx, db, allowAll, audit.WorkspaceBuildBaggage{})
	if buildErr != nil {
		return xerrors.Errorf("provision workspace: %w", buildErr)
	}

	if postErr := provisionerjobs.PostJob(c.pubsub, *job); postErr != nil {
		// Client probably doesn't care about this error, so just log it.
		c.logger.Error(ctx, "failed to post provisioner job to pubsub", slog.Error(postErr))
	}

	c.logger.Info(ctx, "prebuild job scheduled",
		slog.F("transition", transition),
		slog.F("prebuild_id", prebuildID.String()),
		slog.F("job_id", job.ID),
	)

	return nil
}
// Stop signals the control loop to exit by sending a value on the close
// channel, which Loop receives from.
func (c Controller) Stop() {
	var signal struct{}
	c.closeCh <- signal
}