feat: cancel stuck pending jobs (#17803)

Closes: #16488
This commit is contained in:
Michael Suchacz
2025-05-20 15:22:44 +02:00
committed by GitHub
parent 613117bde2
commit 769c9ee337
23 changed files with 779 additions and 297 deletions

View File

@ -68,6 +68,7 @@ import (
"github.com/coder/coder/v2/coderd/externalauth"
"github.com/coder/coder/v2/coderd/gitsshkey"
"github.com/coder/coder/v2/coderd/httpmw"
"github.com/coder/coder/v2/coderd/jobreaper"
"github.com/coder/coder/v2/coderd/notifications"
"github.com/coder/coder/v2/coderd/notifications/notificationstest"
"github.com/coder/coder/v2/coderd/rbac"
@ -75,7 +76,6 @@ import (
"github.com/coder/coder/v2/coderd/runtimeconfig"
"github.com/coder/coder/v2/coderd/schedule"
"github.com/coder/coder/v2/coderd/telemetry"
"github.com/coder/coder/v2/coderd/unhanger"
"github.com/coder/coder/v2/coderd/updatecheck"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/coderd/webpush"
@ -368,11 +368,11 @@ func NewOptions(t testing.TB, options *Options) (func(http.Handler), context.Can
).WithStatsChannel(options.AutobuildStats)
lifecycleExecutor.Run()
hangDetectorTicker := time.NewTicker(options.DeploymentValues.JobHangDetectorInterval.Value())
defer hangDetectorTicker.Stop()
hangDetector := unhanger.New(ctx, options.Database, options.Pubsub, options.Logger.Named("unhanger.detector"), hangDetectorTicker.C)
hangDetector.Start()
t.Cleanup(hangDetector.Close)
jobReaperTicker := time.NewTicker(options.DeploymentValues.JobReaperDetectorInterval.Value())
defer jobReaperTicker.Stop()
jobReaper := jobreaper.New(ctx, options.Database, options.Pubsub, options.Logger.Named("reaper.detector"), jobReaperTicker.C)
jobReaper.Start()
t.Cleanup(jobReaper.Close)
if options.TelemetryReporter == nil {
options.TelemetryReporter = telemetry.NewNoop()

View File

@ -170,10 +170,10 @@ var (
Identifier: rbac.RoleIdentifier{Name: "provisionerd"},
DisplayName: "Provisioner Daemon",
Site: rbac.Permissions(map[string][]policy.Action{
// TODO: Add ProvisionerJob resource type.
rbac.ResourceFile.Type: {policy.ActionRead},
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
rbac.ResourceTemplate.Type: {policy.ActionRead, policy.ActionUpdate},
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
rbac.ResourceFile.Type: {policy.ActionRead},
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
rbac.ResourceTemplate.Type: {policy.ActionRead, policy.ActionUpdate},
// Unsure why provisionerd needs update and read personal
rbac.ResourceUser.Type: {policy.ActionRead, policy.ActionReadPersonal, policy.ActionUpdatePersonal},
rbac.ResourceWorkspaceDormant.Type: {policy.ActionDelete, policy.ActionRead, policy.ActionUpdate, policy.ActionWorkspaceStop},
@ -219,19 +219,20 @@ var (
Scope: rbac.ScopeAll,
}.WithCachedASTValue()
// See unhanger package.
subjectHangDetector = rbac.Subject{
Type: rbac.SubjectTypeHangDetector,
FriendlyName: "Hang Detector",
// See reaper package.
subjectJobReaper = rbac.Subject{
Type: rbac.SubjectTypeJobReaper,
FriendlyName: "Job Reaper",
ID: uuid.Nil.String(),
Roles: rbac.Roles([]rbac.Role{
{
Identifier: rbac.RoleIdentifier{Name: "hangdetector"},
DisplayName: "Hang Detector Daemon",
Identifier: rbac.RoleIdentifier{Name: "jobreaper"},
DisplayName: "Job Reaper Daemon",
Site: rbac.Permissions(map[string][]policy.Action{
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
rbac.ResourceTemplate.Type: {policy.ActionRead},
rbac.ResourceWorkspace.Type: {policy.ActionRead, policy.ActionUpdate},
rbac.ResourceSystem.Type: {policy.WildcardSymbol},
rbac.ResourceTemplate.Type: {policy.ActionRead},
rbac.ResourceWorkspace.Type: {policy.ActionRead, policy.ActionUpdate},
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate},
}),
Org: map[string][]rbac.Permission{},
User: []rbac.Permission{},
@ -346,6 +347,7 @@ var (
rbac.ResourceNotificationTemplate.Type: {policy.ActionCreate, policy.ActionUpdate, policy.ActionDelete},
rbac.ResourceCryptoKey.Type: {policy.ActionCreate, policy.ActionUpdate, policy.ActionDelete},
rbac.ResourceFile.Type: {policy.ActionCreate, policy.ActionRead},
rbac.ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
}),
Org: map[string][]rbac.Permission{},
User: []rbac.Permission{},
@ -407,10 +409,10 @@ func AsAutostart(ctx context.Context) context.Context {
return As(ctx, subjectAutostart)
}
// AsHangDetector returns a context with an actor that has permissions required
// for unhanger.Detector to function.
func AsHangDetector(ctx context.Context) context.Context {
return As(ctx, subjectHangDetector)
// AsJobReaper returns a context with an actor that has permissions required
// for reaper.Detector to function.
func AsJobReaper(ctx context.Context) context.Context {
return As(ctx, subjectJobReaper)
}
// AsKeyRotator returns a context with an actor that has permissions required for rotating crypto keys.
@ -1085,11 +1087,10 @@ func (q *querier) AcquireNotificationMessages(ctx context.Context, arg database.
return q.db.AcquireNotificationMessages(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) AcquireProvisionerJob(ctx context.Context, arg database.AcquireProvisionerJobParams) (database.ProvisionerJob, error) {
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
// return database.ProvisionerJob{}, err
// }
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
return database.ProvisionerJob{}, err
}
return q.db.AcquireProvisionerJob(ctx, arg)
}
@ -1912,14 +1913,6 @@ func (q *querier) GetHealthSettings(ctx context.Context) (string, error) {
return q.db.GetHealthSettings(ctx)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) GetHungProvisionerJobs(ctx context.Context, hungSince time.Time) ([]database.ProvisionerJob, error) {
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
// return nil, err
// }
return q.db.GetHungProvisionerJobs(ctx, hungSince)
}
func (q *querier) GetInboxNotificationByID(ctx context.Context, id uuid.UUID) (database.InboxNotification, error) {
return fetchWithAction(q.log, q.auth, policy.ActionRead, q.db.GetInboxNotificationByID)(ctx, id)
}
@ -2307,6 +2300,13 @@ func (q *querier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (data
return job, nil
}
func (q *querier) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs); err != nil {
return database.ProvisionerJob{}, err
}
return q.db.GetProvisionerJobByIDForUpdate(ctx, id)
}
func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
_, err := q.GetProvisionerJobByID(ctx, jobID)
if err != nil {
@ -2315,31 +2315,49 @@ func (q *querier) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uui
return q.db.GetProvisionerJobTimingsByJobID(ctx, jobID)
}
// TODO: We have a ProvisionerJobs resource, but it hasn't been checked for this use-case.
func (q *querier) GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]database.ProvisionerJob, error) {
// if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
// return nil, err
// }
return q.db.GetProvisionerJobsByIDs(ctx, ids)
provisionerJobs, err := q.db.GetProvisionerJobsByIDs(ctx, ids)
if err != nil {
return nil, err
}
orgIDs := make(map[uuid.UUID]struct{})
for _, job := range provisionerJobs {
orgIDs[job.OrganizationID] = struct{}{}
}
for orgID := range orgIDs {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs.InOrg(orgID)); err != nil {
return nil, err
}
}
return provisionerJobs, nil
}
// TODO: We have a ProvisionerJobs resource, but it hasn't been checked for this use-case.
func (q *querier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return q.db.GetProvisionerJobsByIDsWithQueuePosition(ctx, ids)
}
func (q *querier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return fetchWithPostFilter(q.auth, policy.ActionRead, q.db.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner)(ctx, arg)
}
// TODO: We have a ProvisionerJobs resource, but it hasn't been checked for this use-case.
func (q *querier) GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]database.ProvisionerJob, error) {
// if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceSystem); err != nil {
// return nil, err
// }
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs); err != nil {
return nil, err
}
return q.db.GetProvisionerJobsCreatedAfter(ctx, createdAt)
}
func (q *querier) GetProvisionerJobsToBeReaped(ctx context.Context, arg database.GetProvisionerJobsToBeReapedParams) ([]database.ProvisionerJob, error) {
if err := q.authorizeContext(ctx, policy.ActionRead, rbac.ResourceProvisionerJobs); err != nil {
return nil, err
}
return q.db.GetProvisionerJobsToBeReaped(ctx, arg)
}
func (q *querier) GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (database.ProvisionerKey, error) {
return fetch(q.log, q.auth, q.db.GetProvisionerKeyByHashedSecret)(ctx, hashedSecret)
}
@ -3533,27 +3551,22 @@ func (q *querier) InsertPresetParameters(ctx context.Context, arg database.Inser
return q.db.InsertPresetParameters(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) InsertProvisionerJob(ctx context.Context, arg database.InsertProvisionerJobParams) (database.ProvisionerJob, error) {
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
// return database.ProvisionerJob{}, err
// }
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return q.db.InsertProvisionerJob(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) InsertProvisionerJobLogs(ctx context.Context, arg database.InsertProvisionerJobLogsParams) ([]database.ProvisionerJobLog, error) {
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
// return nil, err
// }
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
return q.db.InsertProvisionerJobLogs(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) InsertProvisionerJobTimings(ctx context.Context, arg database.InsertProvisionerJobTimingsParams) ([]database.ProvisionerJobTiming, error) {
// if err := q.authorizeContext(ctx, policy.ActionCreate, rbac.ResourceSystem); err != nil {
// return nil, err
// }
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
return nil, err
}
return q.db.InsertProvisionerJobTimings(ctx, arg)
}
@ -4176,15 +4189,17 @@ func (q *querier) UpdateProvisionerDaemonLastSeenAt(ctx context.Context, arg dat
return q.db.UpdateProvisionerDaemonLastSeenAt(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) UpdateProvisionerJobByID(ctx context.Context, arg database.UpdateProvisionerJobByIDParams) error {
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
// return err
// }
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
return err
}
return q.db.UpdateProvisionerJobByID(ctx, arg)
}
func (q *querier) UpdateProvisionerJobWithCancelByID(ctx context.Context, arg database.UpdateProvisionerJobWithCancelByIDParams) error {
// TODO: Remove this once we have a proper rbac check for provisioner jobs.
// Details in https://github.com/coder/coder/issues/16160
job, err := q.db.GetProvisionerJobByID(ctx, arg.ID)
if err != nil {
return err
@ -4251,14 +4266,20 @@ func (q *querier) UpdateProvisionerJobWithCancelByID(ctx context.Context, arg da
return q.db.UpdateProvisionerJobWithCancelByID(ctx, arg)
}
// TODO: We need to create a ProvisionerJob resource type
func (q *querier) UpdateProvisionerJobWithCompleteByID(ctx context.Context, arg database.UpdateProvisionerJobWithCompleteByIDParams) error {
// if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
// return err
// }
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
return err
}
return q.db.UpdateProvisionerJobWithCompleteByID(ctx, arg)
}
func (q *querier) UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx context.Context, arg database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceProvisionerJobs); err != nil {
return err
}
return q.db.UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx, arg)
}
func (q *querier) UpdateReplica(ctx context.Context, arg database.UpdateReplicaParams) (database.Replica, error) {
if err := q.authorizeContext(ctx, policy.ActionUpdate, rbac.ResourceSystem); err != nil {
return database.Replica{}, err

View File

@ -694,9 +694,12 @@ func (s *MethodTestSuite) TestProvisionerJob() {
Asserts(v.RBACObject(tpl), []policy.Action{policy.ActionRead, policy.ActionUpdate}).Returns()
}))
s.Run("GetProvisionerJobsByIDs", s.Subtest(func(db database.Store, check *expects) {
a := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
b := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
check.Args([]uuid.UUID{a.ID, b.ID}).Asserts().Returns(slice.New(a, b))
o := dbgen.Organization(s.T(), db, database.Organization{})
a := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{OrganizationID: o.ID})
b := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{OrganizationID: o.ID})
check.Args([]uuid.UUID{a.ID, b.ID}).
Asserts(rbac.ResourceProvisionerJobs.InOrg(o.ID), policy.ActionRead).
Returns(slice.New(a, b))
}))
s.Run("GetProvisionerLogsAfterID", s.Subtest(func(db database.Store, check *expects) {
u := dbgen.User(s.T(), db, database.User{})
@ -3923,9 +3926,8 @@ func (s *MethodTestSuite) TestSystemFunctions() {
check.Args().Asserts(rbac.ResourceSystem, policy.ActionDelete)
}))
s.Run("GetProvisionerJobsCreatedAfter", s.Subtest(func(db database.Store, check *expects) {
// TODO: add provisioner job resource type
_ = dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{CreatedAt: time.Now().Add(-time.Hour)})
check.Args(time.Now()).Asserts( /*rbac.ResourceSystem, policy.ActionRead*/ )
check.Args(time.Now()).Asserts(rbac.ResourceProvisionerJobs, policy.ActionRead)
}))
s.Run("GetTemplateVersionsByIDs", s.Subtest(func(db database.Store, check *expects) {
dbtestutil.DisableForeignKeysAndTriggers(s.T(), db)
@ -4008,11 +4010,11 @@ func (s *MethodTestSuite) TestSystemFunctions() {
Returns([]database.WorkspaceAgent{agt})
}))
s.Run("GetProvisionerJobsByIDs", s.Subtest(func(db database.Store, check *expects) {
// TODO: add a ProvisionerJob resource type
a := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
b := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
o := dbgen.Organization(s.T(), db, database.Organization{})
a := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{OrganizationID: o.ID})
b := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{OrganizationID: o.ID})
check.Args([]uuid.UUID{a.ID, b.ID}).
Asserts( /*rbac.ResourceSystem, policy.ActionRead*/ ).
Asserts(rbac.ResourceProvisionerJobs.InOrg(o.ID), policy.ActionRead).
Returns(slice.New(a, b))
}))
s.Run("InsertWorkspaceAgent", s.Subtest(func(db database.Store, check *expects) {
@ -4048,7 +4050,6 @@ func (s *MethodTestSuite) TestSystemFunctions() {
}).Asserts(rbac.ResourceSystem, policy.ActionUpdate).Returns()
}))
s.Run("AcquireProvisionerJob", s.Subtest(func(db database.Store, check *expects) {
// TODO: we need to create a ProvisionerJob resource
j := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{
StartedAt: sql.NullTime{Valid: false},
UpdatedAt: time.Now(),
@ -4058,47 +4059,48 @@ func (s *MethodTestSuite) TestSystemFunctions() {
OrganizationID: j.OrganizationID,
Types: []database.ProvisionerType{j.Provisioner},
ProvisionerTags: must(json.Marshal(j.Tags)),
}).Asserts( /*rbac.ResourceSystem, policy.ActionUpdate*/ )
}).Asserts(rbac.ResourceProvisionerJobs, policy.ActionUpdate)
}))
s.Run("UpdateProvisionerJobWithCompleteByID", s.Subtest(func(db database.Store, check *expects) {
// TODO: we need to create a ProvisionerJob resource
j := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
check.Args(database.UpdateProvisionerJobWithCompleteByIDParams{
ID: j.ID,
}).Asserts( /*rbac.ResourceSystem, policy.ActionUpdate*/ )
}).Asserts(rbac.ResourceProvisionerJobs, policy.ActionUpdate)
}))
s.Run("UpdateProvisionerJobWithCompleteWithStartedAtByID", s.Subtest(func(db database.Store, check *expects) {
j := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
check.Args(database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams{
ID: j.ID,
}).Asserts(rbac.ResourceProvisionerJobs, policy.ActionUpdate)
}))
s.Run("UpdateProvisionerJobByID", s.Subtest(func(db database.Store, check *expects) {
// TODO: we need to create a ProvisionerJob resource
j := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
check.Args(database.UpdateProvisionerJobByIDParams{
ID: j.ID,
UpdatedAt: time.Now(),
}).Asserts( /*rbac.ResourceSystem, policy.ActionUpdate*/ )
}).Asserts(rbac.ResourceProvisionerJobs, policy.ActionUpdate)
}))
s.Run("InsertProvisionerJob", s.Subtest(func(db database.Store, check *expects) {
dbtestutil.DisableForeignKeysAndTriggers(s.T(), db)
// TODO: we need to create a ProvisionerJob resource
check.Args(database.InsertProvisionerJobParams{
ID: uuid.New(),
Provisioner: database.ProvisionerTypeEcho,
StorageMethod: database.ProvisionerStorageMethodFile,
Type: database.ProvisionerJobTypeWorkspaceBuild,
Input: json.RawMessage("{}"),
}).Asserts( /*rbac.ResourceSystem, policy.ActionCreate*/ )
}).Asserts( /* rbac.ResourceProvisionerJobs, policy.ActionCreate */ )
}))
s.Run("InsertProvisionerJobLogs", s.Subtest(func(db database.Store, check *expects) {
// TODO: we need to create a ProvisionerJob resource
j := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
check.Args(database.InsertProvisionerJobLogsParams{
JobID: j.ID,
}).Asserts( /*rbac.ResourceSystem, policy.ActionCreate*/ )
}).Asserts( /* rbac.ResourceProvisionerJobs, policy.ActionUpdate */ )
}))
s.Run("InsertProvisionerJobTimings", s.Subtest(func(db database.Store, check *expects) {
// TODO: we need to create a ProvisionerJob resource
j := dbgen.ProvisionerJob(s.T(), db, nil, database.ProvisionerJob{})
check.Args(database.InsertProvisionerJobTimingsParams{
JobID: j.ID,
}).Asserts( /*rbac.ResourceSystem, policy.ActionCreate*/ )
}).Asserts(rbac.ResourceProvisionerJobs, policy.ActionUpdate)
}))
s.Run("UpsertProvisionerDaemon", s.Subtest(func(db database.Store, check *expects) {
dbtestutil.DisableForeignKeysAndTriggers(s.T(), db)
@ -4234,8 +4236,8 @@ func (s *MethodTestSuite) TestSystemFunctions() {
s.Run("GetFileTemplates", s.Subtest(func(db database.Store, check *expects) {
check.Args(uuid.New()).Asserts(rbac.ResourceSystem, policy.ActionRead)
}))
s.Run("GetHungProvisionerJobs", s.Subtest(func(db database.Store, check *expects) {
check.Args(time.Time{}).Asserts()
s.Run("GetProvisionerJobsToBeReaped", s.Subtest(func(db database.Store, check *expects) {
check.Args(database.GetProvisionerJobsToBeReapedParams{}).Asserts(rbac.ResourceProvisionerJobs, policy.ActionRead)
}))
s.Run("UpsertOAuthSigningKey", s.Subtest(func(db database.Store, check *expects) {
check.Args("foo").Asserts(rbac.ResourceSystem, policy.ActionUpdate)
@ -4479,6 +4481,9 @@ func (s *MethodTestSuite) TestSystemFunctions() {
VapidPrivateKey: "test",
}).Asserts(rbac.ResourceDeploymentConfig, policy.ActionUpdate)
}))
s.Run("GetProvisionerJobByIDForUpdate", s.Subtest(func(db database.Store, check *expects) {
check.Args(uuid.New()).Asserts(rbac.ResourceProvisionerJobs, policy.ActionRead).Errors(sql.ErrNoRows)
}))
}
func (s *MethodTestSuite) TestNotifications() {

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
insecurerand "math/rand" //#nosec // this is only used for shuffling an array to pick random jobs to reap
"reflect"
"regexp"
"slices"
@ -3707,23 +3708,6 @@ func (q *FakeQuerier) GetHealthSettings(_ context.Context) (string, error) {
return string(q.healthSettings), nil
}
func (q *FakeQuerier) GetHungProvisionerJobs(_ context.Context, hungSince time.Time) ([]database.ProvisionerJob, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
hungJobs := []database.ProvisionerJob{}
for _, provisionerJob := range q.provisionerJobs {
if provisionerJob.StartedAt.Valid && !provisionerJob.CompletedAt.Valid && provisionerJob.UpdatedAt.Before(hungSince) {
// clone the Tags before appending, since maps are reference types and
// we don't want the caller to be able to mutate the map we have inside
// dbmem!
provisionerJob.Tags = maps.Clone(provisionerJob.Tags)
hungJobs = append(hungJobs, provisionerJob)
}
}
return hungJobs, nil
}
func (q *FakeQuerier) GetInboxNotificationByID(_ context.Context, id uuid.UUID) (database.InboxNotification, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -4642,6 +4626,13 @@ func (q *FakeQuerier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (
return q.getProvisionerJobByIDNoLock(ctx, id)
}
func (q *FakeQuerier) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
return q.getProvisionerJobByIDNoLock(ctx, id)
}
func (q *FakeQuerier) GetProvisionerJobTimingsByJobID(_ context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -4884,6 +4875,33 @@ func (q *FakeQuerier) GetProvisionerJobsCreatedAfter(_ context.Context, after ti
return jobs, nil
}
func (q *FakeQuerier) GetProvisionerJobsToBeReaped(_ context.Context, arg database.GetProvisionerJobsToBeReapedParams) ([]database.ProvisionerJob, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
maxJobs := arg.MaxJobs
hungJobs := []database.ProvisionerJob{}
for _, provisionerJob := range q.provisionerJobs {
if !provisionerJob.CompletedAt.Valid {
if (provisionerJob.StartedAt.Valid && provisionerJob.UpdatedAt.Before(arg.HungSince)) ||
(!provisionerJob.StartedAt.Valid && provisionerJob.UpdatedAt.Before(arg.PendingSince)) {
// clone the Tags before appending, since maps are reference types and
// we don't want the caller to be able to mutate the map we have inside
// dbmem!
provisionerJob.Tags = maps.Clone(provisionerJob.Tags)
hungJobs = append(hungJobs, provisionerJob)
if len(hungJobs) >= int(maxJobs) {
break
}
}
}
}
insecurerand.Shuffle(len(hungJobs), func(i, j int) {
hungJobs[i], hungJobs[j] = hungJobs[j], hungJobs[i]
})
return hungJobs, nil
}
func (q *FakeQuerier) GetProvisionerKeyByHashedSecret(_ context.Context, hashedSecret []byte) (database.ProvisionerKey, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -10958,6 +10976,30 @@ func (q *FakeQuerier) UpdateProvisionerJobWithCompleteByID(_ context.Context, ar
return sql.ErrNoRows
}
func (q *FakeQuerier) UpdateProvisionerJobWithCompleteWithStartedAtByID(_ context.Context, arg database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error {
if err := validateDatabaseType(arg); err != nil {
return err
}
q.mutex.Lock()
defer q.mutex.Unlock()
for index, job := range q.provisionerJobs {
if arg.ID != job.ID {
continue
}
job.UpdatedAt = arg.UpdatedAt
job.CompletedAt = arg.CompletedAt
job.Error = arg.Error
job.ErrorCode = arg.ErrorCode
job.StartedAt = arg.StartedAt
job.JobStatus = provisionerJobStatus(job)
q.provisionerJobs[index] = job
return nil
}
return sql.ErrNoRows
}
func (q *FakeQuerier) UpdateReplica(_ context.Context, arg database.UpdateReplicaParams) (database.Replica, error) {
if err := validateDatabaseType(arg); err != nil {
return database.Replica{}, err

View File

@ -865,13 +865,6 @@ func (m queryMetricsStore) GetHealthSettings(ctx context.Context) (string, error
return r0, r1
}
func (m queryMetricsStore) GetHungProvisionerJobs(ctx context.Context, hungSince time.Time) ([]database.ProvisionerJob, error) {
start := time.Now()
jobs, err := m.s.GetHungProvisionerJobs(ctx, hungSince)
m.queryLatencies.WithLabelValues("GetHungProvisionerJobs").Observe(time.Since(start).Seconds())
return jobs, err
}
func (m queryMetricsStore) GetInboxNotificationByID(ctx context.Context, id uuid.UUID) (database.InboxNotification, error) {
start := time.Now()
r0, r1 := m.s.GetInboxNotificationByID(ctx, id)
@ -1194,6 +1187,13 @@ func (m queryMetricsStore) GetProvisionerJobByID(ctx context.Context, id uuid.UU
return job, err
}
func (m queryMetricsStore) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerJobByIDForUpdate(ctx, id)
m.queryLatencies.WithLabelValues("GetProvisionerJobByIDForUpdate").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m queryMetricsStore) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerJobTimingsByJobID(ctx, jobID)
@ -1229,6 +1229,13 @@ func (m queryMetricsStore) GetProvisionerJobsCreatedAfter(ctx context.Context, c
return jobs, err
}
func (m queryMetricsStore) GetProvisionerJobsToBeReaped(ctx context.Context, arg database.GetProvisionerJobsToBeReapedParams) ([]database.ProvisionerJob, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerJobsToBeReaped(ctx, arg)
m.queryLatencies.WithLabelValues("GetProvisionerJobsToBeReaped").Observe(time.Since(start).Seconds())
return r0, r1
}
func (m queryMetricsStore) GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (database.ProvisionerKey, error) {
start := time.Now()
r0, r1 := m.s.GetProvisionerKeyByHashedSecret(ctx, hashedSecret)
@ -2706,6 +2713,13 @@ func (m queryMetricsStore) UpdateProvisionerJobWithCompleteByID(ctx context.Cont
return err
}
func (m queryMetricsStore) UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx context.Context, arg database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error {
start := time.Now()
r0 := m.s.UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx, arg)
m.queryLatencies.WithLabelValues("UpdateProvisionerJobWithCompleteWithStartedAtByID").Observe(time.Since(start).Seconds())
return r0
}
func (m queryMetricsStore) UpdateReplica(ctx context.Context, arg database.UpdateReplicaParams) (database.Replica, error) {
start := time.Now()
replica, err := m.s.UpdateReplica(ctx, arg)

View File

@ -1743,21 +1743,6 @@ func (mr *MockStoreMockRecorder) GetHealthSettings(ctx any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHealthSettings", reflect.TypeOf((*MockStore)(nil).GetHealthSettings), ctx)
}
// GetHungProvisionerJobs mocks base method.
func (m *MockStore) GetHungProvisionerJobs(ctx context.Context, updatedAt time.Time) ([]database.ProvisionerJob, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetHungProvisionerJobs", ctx, updatedAt)
ret0, _ := ret[0].([]database.ProvisionerJob)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetHungProvisionerJobs indicates an expected call of GetHungProvisionerJobs.
func (mr *MockStoreMockRecorder) GetHungProvisionerJobs(ctx, updatedAt any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHungProvisionerJobs", reflect.TypeOf((*MockStore)(nil).GetHungProvisionerJobs), ctx, updatedAt)
}
// GetInboxNotificationByID mocks base method.
func (m *MockStore) GetInboxNotificationByID(ctx context.Context, id uuid.UUID) (database.InboxNotification, error) {
m.ctrl.T.Helper()
@ -2448,6 +2433,21 @@ func (mr *MockStoreMockRecorder) GetProvisionerJobByID(ctx, id any) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobByID", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobByID), ctx, id)
}
// GetProvisionerJobByIDForUpdate mocks base method.
func (m *MockStore) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (database.ProvisionerJob, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetProvisionerJobByIDForUpdate", ctx, id)
ret0, _ := ret[0].(database.ProvisionerJob)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetProvisionerJobByIDForUpdate indicates an expected call of GetProvisionerJobByIDForUpdate.
func (mr *MockStoreMockRecorder) GetProvisionerJobByIDForUpdate(ctx, id any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobByIDForUpdate", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobByIDForUpdate), ctx, id)
}
// GetProvisionerJobTimingsByJobID mocks base method.
func (m *MockStore) GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]database.ProvisionerJobTiming, error) {
m.ctrl.T.Helper()
@ -2523,6 +2523,21 @@ func (mr *MockStoreMockRecorder) GetProvisionerJobsCreatedAfter(ctx, createdAt a
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobsCreatedAfter", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobsCreatedAfter), ctx, createdAt)
}
// GetProvisionerJobsToBeReaped mocks base method.
func (m *MockStore) GetProvisionerJobsToBeReaped(ctx context.Context, arg database.GetProvisionerJobsToBeReapedParams) ([]database.ProvisionerJob, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetProvisionerJobsToBeReaped", ctx, arg)
ret0, _ := ret[0].([]database.ProvisionerJob)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetProvisionerJobsToBeReaped indicates an expected call of GetProvisionerJobsToBeReaped.
func (mr *MockStoreMockRecorder) GetProvisionerJobsToBeReaped(ctx, arg any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProvisionerJobsToBeReaped", reflect.TypeOf((*MockStore)(nil).GetProvisionerJobsToBeReaped), ctx, arg)
}
// GetProvisionerKeyByHashedSecret mocks base method.
func (m *MockStore) GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (database.ProvisionerKey, error) {
m.ctrl.T.Helper()
@ -5732,6 +5747,20 @@ func (mr *MockStoreMockRecorder) UpdateProvisionerJobWithCompleteByID(ctx, arg a
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateProvisionerJobWithCompleteByID", reflect.TypeOf((*MockStore)(nil).UpdateProvisionerJobWithCompleteByID), ctx, arg)
}
// UpdateProvisionerJobWithCompleteWithStartedAtByID mocks base method.
func (m *MockStore) UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx context.Context, arg database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateProvisionerJobWithCompleteWithStartedAtByID", ctx, arg)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateProvisionerJobWithCompleteWithStartedAtByID indicates an expected call of UpdateProvisionerJobWithCompleteWithStartedAtByID.
func (mr *MockStoreMockRecorder) UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx, arg any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateProvisionerJobWithCompleteWithStartedAtByID", reflect.TypeOf((*MockStore)(nil).UpdateProvisionerJobWithCompleteWithStartedAtByID), ctx, arg)
}
// UpdateReplica mocks base method.
func (m *MockStore) UpdateReplica(ctx context.Context, arg database.UpdateReplicaParams) (database.Replica, error) {
m.ctrl.T.Helper()

View File

@ -196,7 +196,6 @@ type sqlcQuerier interface {
GetGroupMembersCountByGroupID(ctx context.Context, arg GetGroupMembersCountByGroupIDParams) (int64, error)
GetGroups(ctx context.Context, arg GetGroupsParams) ([]GetGroupsRow, error)
GetHealthSettings(ctx context.Context) (string, error)
GetHungProvisionerJobs(ctx context.Context, updatedAt time.Time) ([]ProvisionerJob, error)
GetInboxNotificationByID(ctx context.Context, id uuid.UUID) (InboxNotification, error)
// Fetches inbox notifications for a user filtered by templates and targets
// param user_id: The user ID
@ -265,11 +264,16 @@ type sqlcQuerier interface {
// Previous job information.
GetProvisionerDaemonsWithStatusByOrganization(ctx context.Context, arg GetProvisionerDaemonsWithStatusByOrganizationParams) ([]GetProvisionerDaemonsWithStatusByOrganizationRow, error)
GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (ProvisionerJob, error)
// Gets a single provisioner job by ID for update.
// This is used to securely reap jobs that have been hung/pending for a long time.
GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (ProvisionerJob, error)
GetProvisionerJobTimingsByJobID(ctx context.Context, jobID uuid.UUID) ([]ProvisionerJobTiming, error)
GetProvisionerJobsByIDs(ctx context.Context, ids []uuid.UUID) ([]ProvisionerJob, error)
GetProvisionerJobsByIDsWithQueuePosition(ctx context.Context, ids []uuid.UUID) ([]GetProvisionerJobsByIDsWithQueuePositionRow, error)
GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error)
GetProvisionerJobsCreatedAfter(ctx context.Context, createdAt time.Time) ([]ProvisionerJob, error)
// To avoid repeatedly attempting to reap the same jobs, we randomly order and limit to @max_jobs.
GetProvisionerJobsToBeReaped(ctx context.Context, arg GetProvisionerJobsToBeReapedParams) ([]ProvisionerJob, error)
GetProvisionerKeyByHashedSecret(ctx context.Context, hashedSecret []byte) (ProvisionerKey, error)
GetProvisionerKeyByID(ctx context.Context, id uuid.UUID) (ProvisionerKey, error)
GetProvisionerKeyByName(ctx context.Context, arg GetProvisionerKeyByNameParams) (ProvisionerKey, error)
@ -567,6 +571,7 @@ type sqlcQuerier interface {
UpdateProvisionerJobByID(ctx context.Context, arg UpdateProvisionerJobByIDParams) error
UpdateProvisionerJobWithCancelByID(ctx context.Context, arg UpdateProvisionerJobWithCancelByIDParams) error
UpdateProvisionerJobWithCompleteByID(ctx context.Context, arg UpdateProvisionerJobWithCompleteByIDParams) error
UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx context.Context, arg UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error
UpdateReplica(ctx context.Context, arg UpdateReplicaParams) (Replica, error)
UpdateTailnetPeerStatusByCoordinator(ctx context.Context, arg UpdateTailnetPeerStatusByCoordinatorParams) error
UpdateTemplateACLByID(ctx context.Context, arg UpdateTemplateACLByIDParams) error

View File

@ -7384,60 +7384,6 @@ func (q *sqlQuerier) AcquireProvisionerJob(ctx context.Context, arg AcquireProvi
return i, err
}
const getHungProvisionerJobs = `-- name: GetHungProvisionerJobs :many
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status
FROM
provisioner_jobs
WHERE
updated_at < $1
AND started_at IS NOT NULL
AND completed_at IS NULL
`
func (q *sqlQuerier) GetHungProvisionerJobs(ctx context.Context, updatedAt time.Time) ([]ProvisionerJob, error) {
rows, err := q.db.QueryContext(ctx, getHungProvisionerJobs, updatedAt)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ProvisionerJob
for rows.Next() {
var i ProvisionerJob
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CanceledAt,
&i.CompletedAt,
&i.Error,
&i.OrganizationID,
&i.InitiatorID,
&i.Provisioner,
&i.StorageMethod,
&i.Type,
&i.Input,
&i.WorkerID,
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
&i.JobStatus,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getProvisionerJobByID = `-- name: GetProvisionerJobByID :one
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status
@ -7474,6 +7420,46 @@ func (q *sqlQuerier) GetProvisionerJobByID(ctx context.Context, id uuid.UUID) (P
return i, err
}
const getProvisionerJobByIDForUpdate = `-- name: GetProvisionerJobByIDForUpdate :one
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status
FROM
provisioner_jobs
WHERE
id = $1
FOR UPDATE
SKIP LOCKED
`
// Gets a single provisioner job by ID for update.
// This is used to securely reap jobs that have been hung/pending for a long time.
func (q *sqlQuerier) GetProvisionerJobByIDForUpdate(ctx context.Context, id uuid.UUID) (ProvisionerJob, error) {
row := q.db.QueryRowContext(ctx, getProvisionerJobByIDForUpdate, id)
var i ProvisionerJob
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CanceledAt,
&i.CompletedAt,
&i.Error,
&i.OrganizationID,
&i.InitiatorID,
&i.Provisioner,
&i.StorageMethod,
&i.Type,
&i.Input,
&i.WorkerID,
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
&i.JobStatus,
)
return i, err
}
const getProvisionerJobTimingsByJobID = `-- name: GetProvisionerJobTimingsByJobID :many
SELECT job_id, started_at, ended_at, stage, source, action, resource FROM provisioner_job_timings
WHERE job_id = $1
@ -7913,6 +7899,79 @@ func (q *sqlQuerier) GetProvisionerJobsCreatedAfter(ctx context.Context, created
return items, nil
}
const getProvisionerJobsToBeReaped = `-- name: GetProvisionerJobsToBeReaped :many
SELECT
id, created_at, updated_at, started_at, canceled_at, completed_at, error, organization_id, initiator_id, provisioner, storage_method, type, input, worker_id, file_id, tags, error_code, trace_metadata, job_status
FROM
provisioner_jobs
WHERE
(
-- If the job has not been started before @pending_since, reap it.
updated_at < $1
AND started_at IS NULL
AND completed_at IS NULL
)
OR
(
-- If the job has been started but not completed before @hung_since, reap it.
updated_at < $2
AND started_at IS NOT NULL
AND completed_at IS NULL
)
ORDER BY random()
LIMIT $3
`
type GetProvisionerJobsToBeReapedParams struct {
PendingSince time.Time `db:"pending_since" json:"pending_since"`
HungSince time.Time `db:"hung_since" json:"hung_since"`
MaxJobs int32 `db:"max_jobs" json:"max_jobs"`
}
// To avoid repeatedly attempting to reap the same jobs, we randomly order and limit to @max_jobs.
func (q *sqlQuerier) GetProvisionerJobsToBeReaped(ctx context.Context, arg GetProvisionerJobsToBeReapedParams) ([]ProvisionerJob, error) {
rows, err := q.db.QueryContext(ctx, getProvisionerJobsToBeReaped, arg.PendingSince, arg.HungSince, arg.MaxJobs)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ProvisionerJob
for rows.Next() {
var i ProvisionerJob
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.StartedAt,
&i.CanceledAt,
&i.CompletedAt,
&i.Error,
&i.OrganizationID,
&i.InitiatorID,
&i.Provisioner,
&i.StorageMethod,
&i.Type,
&i.Input,
&i.WorkerID,
&i.FileID,
&i.Tags,
&i.ErrorCode,
&i.TraceMetadata,
&i.JobStatus,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const insertProvisionerJob = `-- name: InsertProvisionerJob :one
INSERT INTO
provisioner_jobs (
@ -8121,6 +8180,40 @@ func (q *sqlQuerier) UpdateProvisionerJobWithCompleteByID(ctx context.Context, a
return err
}
const updateProvisionerJobWithCompleteWithStartedAtByID = `-- name: UpdateProvisionerJobWithCompleteWithStartedAtByID :exec
UPDATE
provisioner_jobs
SET
updated_at = $2,
completed_at = $3,
error = $4,
error_code = $5,
started_at = $6
WHERE
id = $1
`
type UpdateProvisionerJobWithCompleteWithStartedAtByIDParams struct {
ID uuid.UUID `db:"id" json:"id"`
UpdatedAt time.Time `db:"updated_at" json:"updated_at"`
CompletedAt sql.NullTime `db:"completed_at" json:"completed_at"`
Error sql.NullString `db:"error" json:"error"`
ErrorCode sql.NullString `db:"error_code" json:"error_code"`
StartedAt sql.NullTime `db:"started_at" json:"started_at"`
}
func (q *sqlQuerier) UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx context.Context, arg UpdateProvisionerJobWithCompleteWithStartedAtByIDParams) error {
_, err := q.db.ExecContext(ctx, updateProvisionerJobWithCompleteWithStartedAtByID,
arg.ID,
arg.UpdatedAt,
arg.CompletedAt,
arg.Error,
arg.ErrorCode,
arg.StartedAt,
)
return err
}
const deleteProvisionerKey = `-- name: DeleteProvisionerKey :exec
DELETE FROM
provisioner_keys

View File

@ -41,6 +41,18 @@ FROM
WHERE
id = $1;
-- name: GetProvisionerJobByIDForUpdate :one
-- Gets a single provisioner job by ID for update.
-- This is used to securely reap jobs that have been hung/pending for a long time.
SELECT
*
FROM
provisioner_jobs
WHERE
id = $1
FOR UPDATE
SKIP LOCKED;
-- name: GetProvisionerJobsByIDs :many
SELECT
*
@ -262,15 +274,40 @@ SET
WHERE
id = $1;
-- name: GetHungProvisionerJobs :many
-- name: UpdateProvisionerJobWithCompleteWithStartedAtByID :exec
UPDATE
provisioner_jobs
SET
updated_at = $2,
completed_at = $3,
error = $4,
error_code = $5,
started_at = $6
WHERE
id = $1;
-- name: GetProvisionerJobsToBeReaped :many
SELECT
*
FROM
provisioner_jobs
WHERE
updated_at < $1
AND started_at IS NOT NULL
AND completed_at IS NULL;
(
-- If the job has not been started before @pending_since, reap it.
updated_at < @pending_since
AND started_at IS NULL
AND completed_at IS NULL
)
OR
(
-- If the job has been started but not completed before @hung_since, reap it.
updated_at < @hung_since
AND started_at IS NOT NULL
AND completed_at IS NULL
)
-- To avoid repeatedly attempting to reap the same jobs, we randomly order and limit to @max_jobs.
ORDER BY random()
LIMIT @max_jobs;
-- name: InsertProvisionerJobTimings :many
INSERT INTO provisioner_job_timings (job_id, started_at, ended_at, stage, source, action, resource)

View File

@ -132,7 +132,7 @@ var actorLogOrder = []rbac.SubjectType{
rbac.SubjectTypeAutostart,
rbac.SubjectTypeCryptoKeyReader,
rbac.SubjectTypeCryptoKeyRotator,
rbac.SubjectTypeHangDetector,
rbac.SubjectTypeJobReaper,
rbac.SubjectTypeNotifier,
rbac.SubjectTypePrebuildsOrchestrator,
rbac.SubjectTypeProvisionerd,

View File

@ -1,11 +1,10 @@
package unhanger
package jobreaper
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"math/rand" //#nosec // this is only used for shuffling an array to pick random jobs to unhang
"fmt" //#nosec // this is only used for shuffling an array to pick random jobs to unhang
"time"
"golang.org/x/xerrors"
@ -21,10 +20,14 @@ import (
)
const (
// HungJobDuration is the duration of time since the last update to a job
// before it is considered hung.
// HungJobDuration is the duration of time since the last update
// to a RUNNING job before it is considered hung.
HungJobDuration = 5 * time.Minute
// PendingJobDuration is the duration of time since last update
// to a PENDING job before it is considered dead.
PendingJobDuration = 30 * time.Minute
// HungJobExitTimeout is the duration of time that provisioners should allow
// for a graceful exit upon cancellation due to failing to send an update to
// a job.
@ -38,16 +41,30 @@ const (
MaxJobsPerRun = 10
)
// HungJobLogMessages are written to provisioner job logs when a job is hung and
// terminated.
var HungJobLogMessages = []string{
"",
"====================",
"Coder: Build has been detected as hung for 5 minutes and will be terminated.",
"====================",
"",
// jobLogMessages are written to provisioner job logs when a job is reaped
func JobLogMessages(reapType ReapType, threshold time.Duration) []string {
return []string{
"",
"====================",
fmt.Sprintf("Coder: Build has been detected as %s for %.0f minutes and will be terminated.", reapType, threshold.Minutes()),
"====================",
"",
}
}
type jobToReap struct {
ID uuid.UUID
Threshold time.Duration
Type ReapType
}
type ReapType string
const (
Pending ReapType = "pending"
Hung ReapType = "hung"
)
// acquireLockError is returned when the detector fails to acquire a lock and
// cancels the current run.
type acquireLockError struct{}
@ -93,10 +110,10 @@ type Stats struct {
Error error
}
// New returns a new hang detector.
// New returns a new job reaper.
func New(ctx context.Context, db database.Store, pub pubsub.Pubsub, log slog.Logger, tick <-chan time.Time) *Detector {
//nolint:gocritic // Hang detector has a limited set of permissions.
ctx, cancel := context.WithCancel(dbauthz.AsHangDetector(ctx))
//nolint:gocritic // Job reaper has a limited set of permissions.
ctx, cancel := context.WithCancel(dbauthz.AsJobReaper(ctx))
d := &Detector{
ctx: ctx,
cancel: cancel,
@ -172,34 +189,42 @@ func (d *Detector) run(t time.Time) Stats {
Error: nil,
}
// Find all provisioner jobs that are currently running but have not
// received an update in the last 5 minutes.
jobs, err := d.db.GetHungProvisionerJobs(ctx, t.Add(-HungJobDuration))
// Find all provisioner jobs to be reaped
jobs, err := d.db.GetProvisionerJobsToBeReaped(ctx, database.GetProvisionerJobsToBeReapedParams{
PendingSince: t.Add(-PendingJobDuration),
HungSince: t.Add(-HungJobDuration),
MaxJobs: MaxJobsPerRun,
})
if err != nil {
stats.Error = xerrors.Errorf("get hung provisioner jobs: %w", err)
stats.Error = xerrors.Errorf("get provisioner jobs to be reaped: %w", err)
return stats
}
// Limit the number of jobs we'll unhang in a single run to avoid
// timing out.
if len(jobs) > MaxJobsPerRun {
// Pick a random subset of the jobs to unhang.
rand.Shuffle(len(jobs), func(i, j int) {
jobs[i], jobs[j] = jobs[j], jobs[i]
})
jobs = jobs[:MaxJobsPerRun]
jobsToReap := make([]*jobToReap, 0, len(jobs))
for _, job := range jobs {
j := &jobToReap{
ID: job.ID,
}
if job.JobStatus == database.ProvisionerJobStatusPending {
j.Threshold = PendingJobDuration
j.Type = Pending
} else {
j.Threshold = HungJobDuration
j.Type = Hung
}
jobsToReap = append(jobsToReap, j)
}
// Send a message into the build log for each hung job saying that it
// has been detected and will be terminated, then mark the job as
// failed.
for _, job := range jobs {
// Send a message into the build log for each hung or pending job saying that it
// has been detected and will be terminated, then mark the job as failed.
for _, job := range jobsToReap {
log := d.log.With(slog.F("job_id", job.ID))
err := unhangJob(ctx, log, d.db, d.pubsub, job.ID)
err := reapJob(ctx, log, d.db, d.pubsub, job)
if err != nil {
if !(xerrors.As(err, &acquireLockError{}) || xerrors.As(err, &jobIneligibleError{})) {
log.Error(ctx, "error forcefully terminating hung provisioner job", slog.Error(err))
log.Error(ctx, "error forcefully terminating provisioner job", slog.F("type", job.Type), slog.Error(err))
}
continue
}
@ -210,47 +235,34 @@ func (d *Detector) run(t time.Time) Stats {
return stats
}
func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobID uuid.UUID) error {
func reapJob(ctx context.Context, log slog.Logger, db database.Store, pub pubsub.Pubsub, jobToReap *jobToReap) error {
var lowestLogID int64
err := db.InTx(func(db database.Store) error {
locked, err := db.TryAcquireLock(ctx, database.GenLockID(fmt.Sprintf("hang-detector:%s", jobID)))
if err != nil {
return xerrors.Errorf("acquire lock: %w", err)
}
if !locked {
// This error is ignored.
return acquireLockError{}
}
// Refetch the job while we hold the lock.
job, err := db.GetProvisionerJobByID(ctx, jobID)
job, err := db.GetProvisionerJobByIDForUpdate(ctx, jobToReap.ID)
if err != nil {
if xerrors.Is(err, sql.ErrNoRows) {
return acquireLockError{}
}
return xerrors.Errorf("get provisioner job: %w", err)
}
// Check if we should still unhang it.
if !job.StartedAt.Valid {
// This shouldn't be possible to hit because the query only selects
// started and not completed jobs, and a job can't be "un-started".
return jobIneligibleError{
Err: xerrors.New("job is not started"),
}
}
if job.CompletedAt.Valid {
return jobIneligibleError{
Err: xerrors.Errorf("job is completed (status %s)", job.JobStatus),
}
}
if job.UpdatedAt.After(time.Now().Add(-HungJobDuration)) {
if job.UpdatedAt.After(time.Now().Add(-jobToReap.Threshold)) {
return jobIneligibleError{
Err: xerrors.New("job has been updated recently"),
}
}
log.Warn(
ctx, "detected hung provisioner job, forcefully terminating",
"threshold", HungJobDuration,
ctx, "forcefully terminating provisioner job",
"type", jobToReap.Type,
"threshold", jobToReap.Threshold,
)
// First, get the latest logs from the build so we can make sure
@ -260,7 +272,7 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
CreatedAfter: 0,
})
if err != nil {
return xerrors.Errorf("get logs for hung job: %w", err)
return xerrors.Errorf("get logs for %s job: %w", jobToReap.Type, err)
}
logStage := ""
if len(logs) != 0 {
@ -280,7 +292,7 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
Output: nil,
}
now := dbtime.Now()
for i, msg := range HungJobLogMessages {
for i, msg := range JobLogMessages(jobToReap.Type, jobToReap.Threshold) {
// Set the created at in a way that ensures each message has
// a unique timestamp so they will be sorted correctly.
insertParams.CreatedAt = append(insertParams.CreatedAt, now.Add(time.Millisecond*time.Duration(i)))
@ -291,13 +303,22 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
}
newLogs, err := db.InsertProvisionerJobLogs(ctx, insertParams)
if err != nil {
return xerrors.Errorf("insert logs for hung job: %w", err)
return xerrors.Errorf("insert logs for %s job: %w", job.JobStatus, err)
}
lowestLogID = newLogs[0].ID
// Mark the job as failed.
now = dbtime.Now()
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
// If the job was never started (pending), set the StartedAt time to the current
// time so that the build duration is correct.
if job.JobStatus == database.ProvisionerJobStatusPending {
job.StartedAt = sql.NullTime{
Time: now,
Valid: true,
}
}
err = db.UpdateProvisionerJobWithCompleteWithStartedAtByID(ctx, database.UpdateProvisionerJobWithCompleteWithStartedAtByIDParams{
ID: job.ID,
UpdatedAt: now,
CompletedAt: sql.NullTime{
@ -305,12 +326,13 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
Valid: true,
},
Error: sql.NullString{
String: "Coder: Build has been detected as hung for 5 minutes and has been terminated by hang detector.",
String: fmt.Sprintf("Coder: Build has been detected as %s for %.0f minutes and has been terminated by the reaper.", jobToReap.Type, jobToReap.Threshold.Minutes()),
Valid: true,
},
ErrorCode: sql.NullString{
Valid: false,
},
StartedAt: job.StartedAt,
})
if err != nil {
return xerrors.Errorf("mark job as failed: %w", err)
@ -364,7 +386,7 @@ func unhangJob(ctx context.Context, log slog.Logger, db database.Store, pub pubs
if err != nil {
return xerrors.Errorf("marshal log notification: %w", err)
}
err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobID), data)
err = pub.Publish(provisionersdk.ProvisionerJobLogsNotifyChannel(jobToReap.ID), data)
if err != nil {
return xerrors.Errorf("publish log notification: %w", err)
}

View File

@ -1,4 +1,4 @@
package unhanger_test
package jobreaper_test
import (
"context"
@ -20,9 +20,9 @@ import (
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbgen"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/jobreaper"
"github.com/coder/coder/v2/coderd/provisionerdserver"
"github.com/coder/coder/v2/coderd/rbac"
"github.com/coder/coder/v2/coderd/unhanger"
"github.com/coder/coder/v2/provisionersdk"
"github.com/coder/coder/v2/testutil"
)
@ -39,10 +39,10 @@ func TestDetectorNoJobs(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- time.Now()
@ -62,7 +62,7 @@ func TestDetectorNoHungJobs(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
// Insert some jobs that are running and haven't been updated in a while,
@ -89,7 +89,7 @@ func TestDetectorNoHungJobs(t *testing.T) {
})
}
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
@ -109,7 +109,7 @@ func TestDetectorHungWorkspaceBuild(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
var (
@ -195,7 +195,7 @@ func TestDetectorHungWorkspaceBuild(t *testing.T) {
t.Log("previous job ID: ", previousWorkspaceBuildJob.ID)
t.Log("current job ID: ", currentWorkspaceBuildJob.ID)
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
@ -231,7 +231,7 @@ func TestDetectorHungWorkspaceBuildNoOverrideState(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
var (
@ -318,7 +318,7 @@ func TestDetectorHungWorkspaceBuildNoOverrideState(t *testing.T) {
t.Log("previous job ID: ", previousWorkspaceBuildJob.ID)
t.Log("current job ID: ", currentWorkspaceBuildJob.ID)
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
@ -354,7 +354,7 @@ func TestDetectorHungWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
var (
@ -411,7 +411,7 @@ func TestDetectorHungWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T
t.Log("current job ID: ", currentWorkspaceBuildJob.ID)
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
@ -439,6 +439,100 @@ func TestDetectorHungWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T
detector.Wait()
}
func TestDetectorPendingWorkspaceBuildNoOverrideStateIfNoExistingBuild(t *testing.T) {
t.Parallel()
var (
ctx = testutil.Context(t, testutil.WaitLong)
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan jobreaper.Stats)
)
var (
now = time.Now()
thirtyFiveMinAgo = now.Add(-time.Minute * 35)
org = dbgen.Organization(t, db, database.Organization{})
user = dbgen.User(t, db, database.User{})
file = dbgen.File(t, db, database.File{})
template = dbgen.Template(t, db, database.Template{
OrganizationID: org.ID,
CreatedBy: user.ID,
})
templateVersion = dbgen.TemplateVersion(t, db, database.TemplateVersion{
OrganizationID: org.ID,
TemplateID: uuid.NullUUID{
UUID: template.ID,
Valid: true,
},
CreatedBy: user.ID,
})
workspace = dbgen.Workspace(t, db, database.WorkspaceTable{
OwnerID: user.ID,
OrganizationID: org.ID,
TemplateID: template.ID,
})
// First build.
expectedWorkspaceBuildState = []byte(`{"dean":"cool","colin":"also cool"}`)
currentWorkspaceBuildJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
CreatedAt: thirtyFiveMinAgo,
UpdatedAt: thirtyFiveMinAgo,
StartedAt: sql.NullTime{
Time: time.Time{},
Valid: false,
},
OrganizationID: org.ID,
InitiatorID: user.ID,
Provisioner: database.ProvisionerTypeEcho,
StorageMethod: database.ProvisionerStorageMethodFile,
FileID: file.ID,
Type: database.ProvisionerJobTypeWorkspaceBuild,
Input: []byte("{}"),
})
currentWorkspaceBuild = dbgen.WorkspaceBuild(t, db, database.WorkspaceBuild{
WorkspaceID: workspace.ID,
TemplateVersionID: templateVersion.ID,
BuildNumber: 1,
JobID: currentWorkspaceBuildJob.ID,
// Should not be overridden.
ProvisionerState: expectedWorkspaceBuildState,
})
)
t.Log("current job ID: ", currentWorkspaceBuildJob.ID)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
stats := <-statsCh
require.NoError(t, stats.Error)
require.Len(t, stats.TerminatedJobIDs, 1)
require.Equal(t, currentWorkspaceBuildJob.ID, stats.TerminatedJobIDs[0])
// Check that the current provisioner job was updated.
job, err := db.GetProvisionerJobByID(ctx, currentWorkspaceBuildJob.ID)
require.NoError(t, err)
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
require.True(t, job.CompletedAt.Valid)
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
require.True(t, job.StartedAt.Valid)
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
require.True(t, job.Error.Valid)
require.Contains(t, job.Error.String, "Build has been detected as pending")
require.False(t, job.ErrorCode.Valid)
// Check that the provisioner state was NOT updated.
build, err := db.GetWorkspaceBuildByID(ctx, currentWorkspaceBuild.ID)
require.NoError(t, err)
require.Equal(t, expectedWorkspaceBuildState, build.ProvisionerState)
detector.Close()
detector.Wait()
}
func TestDetectorHungOtherJobTypes(t *testing.T) {
t.Parallel()
@ -447,7 +541,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
var (
@ -509,7 +603,7 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
t.Log("template import job ID: ", templateImportJob.ID)
t.Log("template dry-run job ID: ", templateDryRunJob.ID)
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
@ -543,6 +637,113 @@ func TestDetectorHungOtherJobTypes(t *testing.T) {
detector.Wait()
}
func TestDetectorPendingOtherJobTypes(t *testing.T) {
t.Parallel()
var (
ctx = testutil.Context(t, testutil.WaitLong)
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan jobreaper.Stats)
)
var (
now = time.Now()
thirtyFiveMinAgo = now.Add(-time.Minute * 35)
org = dbgen.Organization(t, db, database.Organization{})
user = dbgen.User(t, db, database.User{})
file = dbgen.File(t, db, database.File{})
// Template import job.
templateImportJob = dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
CreatedAt: thirtyFiveMinAgo,
UpdatedAt: thirtyFiveMinAgo,
StartedAt: sql.NullTime{
Time: time.Time{},
Valid: false,
},
OrganizationID: org.ID,
InitiatorID: user.ID,
Provisioner: database.ProvisionerTypeEcho,
StorageMethod: database.ProvisionerStorageMethodFile,
FileID: file.ID,
Type: database.ProvisionerJobTypeTemplateVersionImport,
Input: []byte("{}"),
})
_ = dbgen.TemplateVersion(t, db, database.TemplateVersion{
OrganizationID: org.ID,
JobID: templateImportJob.ID,
CreatedBy: user.ID,
})
)
// Template dry-run job.
dryRunVersion := dbgen.TemplateVersion(t, db, database.TemplateVersion{
OrganizationID: org.ID,
CreatedBy: user.ID,
})
input, err := json.Marshal(provisionerdserver.TemplateVersionDryRunJob{
TemplateVersionID: dryRunVersion.ID,
})
require.NoError(t, err)
templateDryRunJob := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
CreatedAt: thirtyFiveMinAgo,
UpdatedAt: thirtyFiveMinAgo,
StartedAt: sql.NullTime{
Time: time.Time{},
Valid: false,
},
OrganizationID: org.ID,
InitiatorID: user.ID,
Provisioner: database.ProvisionerTypeEcho,
StorageMethod: database.ProvisionerStorageMethodFile,
FileID: file.ID,
Type: database.ProvisionerJobTypeTemplateVersionDryRun,
Input: input,
})
t.Log("template import job ID: ", templateImportJob.ID)
t.Log("template dry-run job ID: ", templateDryRunJob.ID)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
stats := <-statsCh
require.NoError(t, stats.Error)
require.Len(t, stats.TerminatedJobIDs, 2)
require.Contains(t, stats.TerminatedJobIDs, templateImportJob.ID)
require.Contains(t, stats.TerminatedJobIDs, templateDryRunJob.ID)
// Check that the template import job was updated.
job, err := db.GetProvisionerJobByID(ctx, templateImportJob.ID)
require.NoError(t, err)
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
require.True(t, job.CompletedAt.Valid)
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
require.True(t, job.StartedAt.Valid)
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
require.True(t, job.Error.Valid)
require.Contains(t, job.Error.String, "Build has been detected as pending")
require.False(t, job.ErrorCode.Valid)
// Check that the template dry-run job was updated.
job, err = db.GetProvisionerJobByID(ctx, templateDryRunJob.ID)
require.NoError(t, err)
require.WithinDuration(t, now, job.UpdatedAt, 30*time.Second)
require.True(t, job.CompletedAt.Valid)
require.WithinDuration(t, now, job.CompletedAt.Time, 30*time.Second)
require.True(t, job.StartedAt.Valid)
require.WithinDuration(t, now, job.StartedAt.Time, 30*time.Second)
require.True(t, job.Error.Valid)
require.Contains(t, job.Error.String, "Build has been detected as pending")
require.False(t, job.ErrorCode.Valid)
detector.Close()
detector.Wait()
}
func TestDetectorHungCanceledJob(t *testing.T) {
t.Parallel()
@ -551,7 +752,7 @@ func TestDetectorHungCanceledJob(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
var (
@ -591,7 +792,7 @@ func TestDetectorHungCanceledJob(t *testing.T) {
t.Log("template import job ID: ", templateImportJob.ID)
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
@ -653,7 +854,7 @@ func TestDetectorPushesLogs(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
)
var (
@ -706,7 +907,7 @@ func TestDetectorPushesLogs(t *testing.T) {
require.Len(t, logs, 10)
}
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
// Create pubsub subscription to listen for new log events.
@ -741,12 +942,19 @@ func TestDetectorPushesLogs(t *testing.T) {
CreatedAfter: after,
})
require.NoError(t, err)
require.Len(t, logs, len(unhanger.HungJobLogMessages))
threshold := jobreaper.HungJobDuration
jobType := jobreaper.Hung
if templateImportJob.JobStatus == database.ProvisionerJobStatusPending {
threshold = jobreaper.PendingJobDuration
jobType = jobreaper.Pending
}
expectedLogs := jobreaper.JobLogMessages(jobType, threshold)
require.Len(t, logs, len(expectedLogs))
for i, log := range logs {
assert.Equal(t, database.LogLevelError, log.Level)
assert.Equal(t, c.expectStage, log.Stage)
assert.Equal(t, database.LogSourceProvisionerDaemon, log.Source)
assert.Equal(t, unhanger.HungJobLogMessages[i], log.Output)
assert.Equal(t, expectedLogs[i], log.Output)
}
// Double check the full log count.
@ -755,7 +963,7 @@ func TestDetectorPushesLogs(t *testing.T) {
CreatedAfter: 0,
})
require.NoError(t, err)
require.Len(t, logs, c.preLogCount+len(unhanger.HungJobLogMessages))
require.Len(t, logs, c.preLogCount+len(expectedLogs))
detector.Close()
detector.Wait()
@ -771,15 +979,15 @@ func TestDetectorMaxJobsPerRun(t *testing.T) {
db, pubsub = dbtestutil.NewDB(t)
log = testutil.Logger(t)
tickCh = make(chan time.Time)
statsCh = make(chan unhanger.Stats)
statsCh = make(chan jobreaper.Stats)
org = dbgen.Organization(t, db, database.Organization{})
user = dbgen.User(t, db, database.User{})
file = dbgen.File(t, db, database.File{})
)
// Create unhanger.MaxJobsPerRun + 1 hung jobs.
// Create MaxJobsPerRun + 1 hung jobs.
now := time.Now()
for i := 0; i < unhanger.MaxJobsPerRun+1; i++ {
for i := 0; i < jobreaper.MaxJobsPerRun+1; i++ {
pj := dbgen.ProvisionerJob(t, db, pubsub, database.ProvisionerJob{
CreatedAt: now.Add(-time.Hour),
UpdatedAt: now.Add(-time.Hour),
@ -802,14 +1010,14 @@ func TestDetectorMaxJobsPerRun(t *testing.T) {
})
}
detector := unhanger.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector := jobreaper.New(ctx, wrapDBAuthz(db, log), pubsub, log, tickCh).WithStatsChannel(statsCh)
detector.Start()
tickCh <- now
// Make sure that only unhanger.MaxJobsPerRun jobs are terminated.
// Make sure that only MaxJobsPerRun jobs are terminated.
stats := <-statsCh
require.NoError(t, stats.Error)
require.Len(t, stats.TerminatedJobIDs, unhanger.MaxJobsPerRun)
require.Len(t, stats.TerminatedJobIDs, jobreaper.MaxJobsPerRun)
// Run the detector again and make sure that only the remaining job is
// terminated.
@ -823,7 +1031,7 @@ func TestDetectorMaxJobsPerRun(t *testing.T) {
}
// wrapDBAuthz adds our Authorization/RBAC around the given database store, to
// ensure the unhanger has the right permissions to do its work.
// ensure the reaper has the right permissions to do its work.
func wrapDBAuthz(db database.Store, logger slog.Logger) database.Store {
return dbauthz.New(
db,

View File

@ -65,7 +65,7 @@ const (
SubjectTypeUser SubjectType = "user"
SubjectTypeProvisionerd SubjectType = "provisionerd"
SubjectTypeAutostart SubjectType = "autostart"
SubjectTypeHangDetector SubjectType = "hang_detector"
SubjectTypeJobReaper SubjectType = "job_reaper"
SubjectTypeResourceMonitor SubjectType = "resource_monitor"
SubjectTypeCryptoKeyRotator SubjectType = "crypto_key_rotator"
SubjectTypeCryptoKeyReader SubjectType = "crypto_key_reader"

View File

@ -234,7 +234,9 @@ var (
// ResourceProvisionerJobs
// Valid Actions
// - "ActionCreate" :: create provisioner jobs
// - "ActionRead" :: read provisioner jobs
// - "ActionUpdate" :: update provisioner jobs
ResourceProvisionerJobs = Object{
Type: "provisioner_jobs",
}

View File

@ -182,7 +182,9 @@ var RBACPermissions = map[string]PermissionDefinition{
},
"provisioner_jobs": {
Actions: map[Action]ActionDefinition{
ActionRead: actDef("read provisioner jobs"),
ActionRead: actDef("read provisioner jobs"),
ActionUpdate: actDef("update provisioner jobs"),
ActionCreate: actDef("create provisioner jobs"),
},
},
"organization": {

View File

@ -503,7 +503,7 @@ func ReloadBuiltinRoles(opts *RoleOptions) {
// the ability to create templates and provisioners has
// a lot of overlap.
ResourceProvisionerDaemon.Type: {policy.ActionCreate, policy.ActionRead, policy.ActionUpdate, policy.ActionDelete},
ResourceProvisionerJobs.Type: {policy.ActionRead},
ResourceProvisionerJobs.Type: {policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
}),
},
User: []Permission{},

View File

@ -580,7 +580,7 @@ func TestRolePermissions(t *testing.T) {
},
{
Name: "ProvisionerJobs",
Actions: []policy.Action{policy.ActionRead},
Actions: []policy.Action{policy.ActionRead, policy.ActionUpdate, policy.ActionCreate},
Resource: rbac.ResourceProvisionerJobs.InOrg(orgID),
AuthorizeMap: map[bool][]hasAuthSubjects{
true: {owner, orgTemplateAdmin, orgAdmin},