feat: integrate Acquirer for provisioner jobs (#9717)

* chore: add Acquirer to provisionerdserver pkg

Signed-off-by: Spike Curtis <spike@coder.com>

* code review improvements & fixes

Signed-off-by: Spike Curtis <spike@coder.com>

* feat: integrate Acquirer for provisioner jobs

Signed-off-by: Spike Curtis <spike@coder.com>

* Fix imports, whitespace

Signed-off-by: Spike Curtis <spike@coder.com>

* provisionerdserver always closes; remove poll interval from playwright

Signed-off-by: Spike Curtis <spike@coder.com>

* post jobs outside transactions

Signed-off-by: Spike Curtis <spike@coder.com>

* graceful shutdown in test

Signed-off-by: Spike Curtis <spike@coder.com>

* Mark AcquireJob deprecated

Signed-off-by: Spike Curtis <spike@coder.com>

* Graceful shutdown on all provisionerd tests

Signed-off-by: Spike Curtis <spike@coder.com>

* Deprecate, not remove CLI flags

Signed-off-by: Spike Curtis <spike@coder.com>

---------

Signed-off-by: Spike Curtis <spike@coder.com>
This commit is contained in:
Spike Curtis
2023-09-19 10:25:57 +04:00
committed by GitHub
parent 6cf531bfef
commit 375c70d141
41 changed files with 1497 additions and 1096 deletions

View File

@ -11,7 +11,6 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
@ -44,16 +43,18 @@ import (
sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
)
var (
lastAcquire time.Time
lastAcquireMutex sync.RWMutex
)
// DefaultAcquireJobLongPollDur is the time the (deprecated) AcquireJob rpc waits to try to obtain a job before
// canceling and returning an empty job.
const DefaultAcquireJobLongPollDur = time.Second * 5
type Options struct {
OIDCConfig httpmw.OAuth2Config
GitAuthConfigs []*gitauth.Config
// TimeNowFn is only used in tests
TimeNowFn func() time.Time
// AcquireJobLongPollDur is used in tests
AcquireJobLongPollDur time.Duration
}
type server struct {
@ -62,9 +63,10 @@ type server struct {
Logger slog.Logger
Provisioners []database.ProvisionerType
GitAuthConfigs []*gitauth.Config
Tags json.RawMessage
Tags Tags
Database database.Store
Pubsub pubsub.Pubsub
Acquirer *Acquirer
Telemetry telemetry.Reporter
Tracer trace.Tracer
QuotaCommitter *atomic.Pointer[proto.QuotaCommitter]
@ -73,10 +75,11 @@ type server struct {
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
DeploymentValues *codersdk.DeploymentValues
AcquireJobDebounce time.Duration
OIDCConfig httpmw.OAuth2Config
OIDCConfig httpmw.OAuth2Config
TimeNowFn func() time.Time
acquireJobLongPollDur time.Duration
}
// We use the null byte (0x00) in generating a canonical map key for tags, so
@ -108,9 +111,10 @@ func NewServer(
id uuid.UUID,
logger slog.Logger,
provisioners []database.ProvisionerType,
tags json.RawMessage,
tags Tags,
db database.Store,
ps pubsub.Pubsub,
acquirer *Acquirer,
tel telemetry.Reporter,
tracer trace.Tracer,
quotaCommitter *atomic.Pointer[proto.QuotaCommitter],
@ -118,7 +122,6 @@ func NewServer(
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore],
userQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore],
deploymentValues *codersdk.DeploymentValues,
acquireJobDebounce time.Duration,
options Options,
) (proto.DRPCProvisionerDaemonServer, error) {
// Panic early if pointers are nil
@ -137,6 +140,18 @@ func NewServer(
if deploymentValues == nil {
return nil, xerrors.New("deploymentValues is nil")
}
if acquirer == nil {
return nil, xerrors.New("acquirer is nil")
}
if tags == nil {
return nil, xerrors.Errorf("tags is nil")
}
if err := tags.Valid(); err != nil {
return nil, xerrors.Errorf("invalid tags: %w", err)
}
if options.AcquireJobLongPollDur == 0 {
options.AcquireJobLongPollDur = DefaultAcquireJobLongPollDur
}
return &server{
AccessURL: accessURL,
ID: id,
@ -146,6 +161,7 @@ func NewServer(
Tags: tags,
Database: db,
Pubsub: ps,
Acquirer: acquirer,
Telemetry: tel,
Tracer: tracer,
QuotaCommitter: quotaCommitter,
@ -153,9 +169,9 @@ func NewServer(
TemplateScheduleStore: templateScheduleStore,
UserQuietHoursScheduleStore: userQuietHoursScheduleStore,
DeploymentValues: deploymentValues,
AcquireJobDebounce: acquireJobDebounce,
OIDCConfig: options.OIDCConfig,
TimeNowFn: options.TimeNowFn,
acquireJobLongPollDur: options.AcquireJobLongPollDur,
}, nil
}
@ -169,50 +185,119 @@ func (s *server) timeNow() time.Time {
}
// AcquireJob queries the database to lock a job.
//
// Deprecated: This method is only available for back-level provisioner daemons.
func (s *server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
//nolint:gocritic // Provisionerd has specific authz rules.
ctx = dbauthz.AsProvisionerd(ctx)
// This prevents loads of provisioner daemons from consistently
// querying the database when no jobs are available.
//
// The debounce only occurs when no job is returned, so if loads of
// jobs are added at once, they will start after at most this duration.
lastAcquireMutex.RLock()
if !lastAcquire.IsZero() && time.Since(lastAcquire) < s.AcquireJobDebounce {
s.Logger.Debug(ctx, "debounce acquire job", slog.F("debounce", s.AcquireJobDebounce), slog.F("last_acquire", lastAcquire))
lastAcquireMutex.RUnlock()
return &proto.AcquiredJob{}, nil
}
lastAcquireMutex.RUnlock()
// This marks the job as locked in the database.
job, err := s.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
StartedAt: sql.NullTime{
Time: dbtime.Now(),
Valid: true,
},
WorkerID: uuid.NullUUID{
UUID: s.ID,
Valid: true,
},
Types: s.Provisioners,
Tags: s.Tags,
})
if errors.Is(err, sql.ErrNoRows) {
// The provisioner daemon assumes no jobs are available if
// an empty struct is returned.
lastAcquireMutex.Lock()
lastAcquire = dbtime.Now()
lastAcquireMutex.Unlock()
// Since AcquireJob blocks until a job is available, we set a long (5s by default) timeout. This allows back-level
// provisioner daemons to gracefully shut down within a few seconds, but keeps them from rapidly polling the
// database.
acqCtx, acqCancel := context.WithTimeout(ctx, s.acquireJobLongPollDur)
defer acqCancel()
job, err := s.Acquirer.AcquireJob(acqCtx, s.ID, s.Provisioners, s.Tags)
if xerrors.Is(err, context.DeadlineExceeded) {
s.Logger.Debug(ctx, "successful cancel")
return &proto.AcquiredJob{}, nil
}
if err != nil {
return nil, xerrors.Errorf("acquire job: %w", err)
}
s.Logger.Debug(ctx, "locked job from database", slog.F("job_id", job.ID))
return s.acquireProtoJob(ctx, job)
}
type jobAndErr struct {
job database.ProvisionerJob
err error
}
// AcquireJobWithCancel queries the database to lock a job.
func (s *server) AcquireJobWithCancel(stream proto.DRPCProvisionerDaemon_AcquireJobWithCancelStream) (retErr error) {
//nolint:gocritic // Provisionerd has specific authz rules.
streamCtx := dbauthz.AsProvisionerd(stream.Context())
defer func() {
closeErr := stream.Close()
s.Logger.Debug(streamCtx, "closed stream", slog.Error(closeErr))
if retErr == nil {
retErr = closeErr
}
}()
acqCtx, acqCancel := context.WithCancel(streamCtx)
defer acqCancel()
recvCh := make(chan error, 1)
go func() {
_, err := stream.Recv() // cancel is the only message
recvCh <- err
}()
jec := make(chan jobAndErr, 1)
go func() {
job, err := s.Acquirer.AcquireJob(acqCtx, s.ID, s.Provisioners, s.Tags)
jec <- jobAndErr{job: job, err: err}
}()
var recvErr error
var je jobAndErr
select {
case recvErr = <-recvCh:
acqCancel()
je = <-jec
case je = <-jec:
}
if xerrors.Is(je.err, context.Canceled) {
s.Logger.Debug(streamCtx, "successful cancel")
err := stream.Send(&proto.AcquiredJob{})
if err != nil {
// often this is just because the other side hangs up and doesn't wait for the cancel, so log at INFO
s.Logger.Info(streamCtx, "failed to send empty job", slog.Error(err))
return err
}
return nil
}
if je.err != nil {
return xerrors.Errorf("acquire job: %w", je.err)
}
logger := s.Logger.With(slog.F("job_id", je.job.ID))
logger.Debug(streamCtx, "locked job from database")
if recvErr != nil {
logger.Error(streamCtx, "recv error and failed to cancel acquire job", slog.Error(recvErr))
// Well, this is awkward. We hit an error receiving from the stream, but didn't cancel before we locked a job
// in the database. We need to mark this job as failed so the end user can retry if they want to.
err := s.Database.UpdateProvisionerJobWithCompleteByID(
context.Background(),
database.UpdateProvisionerJobWithCompleteByIDParams{
ID: je.job.ID,
CompletedAt: sql.NullTime{
Time: dbtime.Now(),
Valid: true,
},
Error: sql.NullString{
String: "connection to provisioner daemon broken",
Valid: true,
},
})
if err != nil {
logger.Error(streamCtx, "error updating failed job", slog.Error(err))
}
return recvErr
}
pj, err := s.acquireProtoJob(streamCtx, je.job)
if err != nil {
return err
}
err = stream.Send(pj)
if err != nil {
s.Logger.Error(streamCtx, "failed to send job", slog.Error(err))
return err
}
return nil
}
func (s *server) acquireProtoJob(ctx context.Context, job database.ProvisionerJob) (*proto.AcquiredJob, error) {
// Marks the acquired job as failed with the error message provided.
failJob := func(errorMessage string) error {
err = s.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
err := s.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
CompletedAt: sql.NullTime{
Time: dbtime.Now(),