fix(provisionerd): only heartbeat when logs aren't being flushed (#7110)

Colin Adler
2023-04-13 14:02:10 -05:00
committed by GitHub
parent f5a8a27714
commit 085330ad96
4 changed files with 76 additions and 33 deletions
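The gist of the change (see the runner.go diff below): every call to the runner's update RPC now stamps a timestamp, and the periodic heartbeat is skipped when an update, such as a log flush, already went out within the last interval. A minimal standalone sketch of that pattern, assuming Go 1.19+ for atomic.Pointer; the heartbeater type and method names here are illustrative, not part of the commit:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// heartbeater skips its periodic keep-alive when some other send
// (e.g. a log flush) has already updated the server recently.
type heartbeater struct {
	lastUpdate     atomic.Pointer[time.Time] // stamped on every real update
	updateInterval time.Duration
}

// markUpdated records that an update was just sent on another path.
func (h *heartbeater) markUpdated() {
	now := time.Now()
	h.lastUpdate.Store(&now)
}

// shouldSend reports whether the heartbeat tick actually needs to send
// anything, or whether a recent update already acted as a heartbeat.
func (h *heartbeater) shouldSend() bool {
	last := h.lastUpdate.Load()
	return last == nil || time.Since(*last) >= h.updateInterval
}

func main() {
	h := &heartbeater{updateInterval: time.Second}
	h.markUpdated()             // pretend a log flush just happened
	fmt.Println(h.shouldSend()) // false: skip this heartbeat
	time.Sleep(1100 * time.Millisecond)
	fmt.Println(h.shouldSend()) // true: no recent traffic, send one
}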

View File

@@ -1214,7 +1214,7 @@ func newProvisionerDaemon(
 		JobPollInterval: cfg.Provisioner.DaemonPollInterval.Value(),
 		JobPollJitter: cfg.Provisioner.DaemonPollJitter.Value(),
 		JobPollDebounce: debounce,
-		UpdateInterval: 500 * time.Millisecond,
+		UpdateInterval: time.Second,
 		ForceCancelInterval: cfg.Provisioner.ForceCancelInterval.Value(),
 		Provisioners: provisioners,
 		WorkDirectory: tempDir,

View File

@@ -814,7 +814,7 @@ func Run(t *testing.T, factory DeploymentFactory) {
 		// Create workspace.
 		port := appServer(t)
-		workspace, agnt = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
+		workspace, _ = createWorkspaceWithApps(t, client, user.OrganizationIDs[0], user, proxyTestSubdomainRaw, port)
 		// Verify that the apps have the correct sharing levels set.
 		workspaceBuild, err := client.WorkspaceBuild(ctx, workspace.LatestBuild.ID)

View File

@@ -22,6 +22,7 @@ import (
 	"cdr.dev/slog"
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/coderd/util/ptr"
 	"github.com/coder/coder/cryptorand"
 	"github.com/coder/coder/provisionerd/proto"
 	"github.com/coder/coder/provisionerd/runner"
@@ -77,7 +78,7 @@ func New(clientDialer Dialer, opts *Options) *Server {
 		opts.ForceCancelInterval = 10 * time.Minute
 	}
 	if opts.LogBufferInterval == 0 {
-		opts.LogBufferInterval = 50 * time.Millisecond
+		opts.LogBufferInterval = 250 * time.Millisecond
 	}
 	if opts.Filesystem == nil {
 		opts.Filesystem = afero.NewOsFs()
@@ -113,7 +114,7 @@ type Server struct {
 	tracer trace.Tracer

 	clientDialer Dialer
-	clientValue  atomic.Value
+	clientValue  atomic.Pointer[proto.DRPCProvisionerDaemonClient]

 	// Locked when closing the daemon, shutting down, or starting a new job.
 	mutex sync.Mutex
@@ -194,7 +195,7 @@ func (p *Server) connect(ctx context.Context) {
 			p.mutex.Unlock()
 			break
 		}
-		p.clientValue.Store(client)
+		p.clientValue.Store(ptr.Ref(client))
 		p.mutex.Unlock()

 		p.opts.Logger.Debug(context.Background(), "connected")
@@ -260,12 +261,11 @@ func (p *Server) nextInterval() time.Duration {
 }

 func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
-	rawClient := p.clientValue.Load()
-	if rawClient == nil {
+	client := p.clientValue.Load()
+	if client == nil {
 		return nil, false
 	}
-	client, ok := rawClient.(proto.DRPCProvisionerDaemonClient)
-	return client, ok
+	return *client, true
 }

 // isRunningJob returns true if a job is running. Caller must hold the mutex.
@@ -417,14 +417,15 @@ func retryable(err error) bool {
 		xerrors.Is(err, context.Canceled)
 }

-// clientDoWithRetries runs the function f with a client, and retries with backoff until either the error returned
-// is not retryable() or the context expires.
-func (p *Server) clientDoWithRetries(
-	ctx context.Context, f func(context.Context, proto.DRPCProvisionerDaemonClient) (any, error)) (
-	any, error,
-) {
+// clientDoWithRetries runs the function f with a client, and retries with
+// backoff until either the error returned is not retryable() or the context
+// expires.
+func clientDoWithRetries[T any](ctx context.Context,
+	getClient func() (proto.DRPCProvisionerDaemonClient, bool),
+	f func(context.Context, proto.DRPCProvisionerDaemonClient) (T, error),
+) (ret T, _ error) {
 	for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(ctx); {
-		client, ok := p.client()
+		client, ok := getClient()
 		if !ok {
 			continue
 		}
@@ -434,40 +435,38 @@ func (p *Server) clientDoWithRetries(
 		}
 		return resp, err
 	}
-	return nil, ctx.Err()
+	return ret, ctx.Err()
 }

 func (p *Server) CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error) {
-	out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.CommitQuotaResponse, error) {
 		return client.CommitQuota(ctx, in)
 	})
 	if err != nil {
 		return nil, err
 	}
-	// nolint: forcetypeassert
-	return out.(*proto.CommitQuotaResponse), nil
+	return out, nil
 }

 func (p *Server) UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
-	out, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	out, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.UpdateJobResponse, error) {
 		return client.UpdateJob(ctx, in)
 	})
 	if err != nil {
 		return nil, err
 	}
-	// nolint: forcetypeassert
-	return out.(*proto.UpdateJobResponse), nil
+	return out, nil
 }

 func (p *Server) FailJob(ctx context.Context, in *proto.FailedJob) error {
-	_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
 		return client.FailJob(ctx, in)
 	})
 	return err
 }

 func (p *Server) CompleteJob(ctx context.Context, in *proto.CompletedJob) error {
-	_, err := p.clientDoWithRetries(ctx, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (any, error) {
+	_, err := clientDoWithRetries(ctx, p.client, func(ctx context.Context, client proto.DRPCProvisionerDaemonClient) (*proto.Empty, error) {
 		return client.CompleteJob(ctx, in)
 	})
 	return err
@@ -552,7 +551,7 @@ func (p *Server) closeWithError(err error) error {
 	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))

-	if c, ok := p.clientValue.Load().(proto.DRPCProvisionerDaemonClient); ok {
+	if c, ok := p.client(); ok {
 		_ = c.DRPCConn().Close()
 	}
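The refactor above swaps the `any`-returning clientDoWithRetries method (and its `// nolint: forcetypeassert` casts) for a free function with a type parameter, so each RPC wrapper gets its concrete response type back directly. A minimal sketch of the same shape with a stand-in client and a plain ticker instead of coder's retry package; every name below is illustrative:

package main

import (
	"context"
	"fmt"
	"time"
)

// doWithRetries calls f with a freshly fetched client until f succeeds,
// the error is not retryable, or the context expires. Because it is
// generic over T, callers get their concrete response type back without
// a type assertion.
func doWithRetries[C any, T any](
	ctx context.Context,
	getClient func() (C, bool),
	retryable func(error) bool,
	f func(context.Context, C) (T, error),
) (ret T, _ error) {
	ticker := time.NewTicker(25 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ret, ctx.Err()
		case <-ticker.C:
		}
		client, ok := getClient()
		if !ok {
			continue // not connected yet, try again next tick
		}
		resp, err := f(ctx, client)
		if err != nil && retryable(err) {
			continue
		}
		return resp, err
	}
}

type fakeClient struct{}

func (fakeClient) Echo(_ context.Context, s string) (string, error) { return s, nil }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	out, err := doWithRetries(ctx,
		func() (fakeClient, bool) { return fakeClient{}, true },
		func(error) bool { return false },
		func(ctx context.Context, c fakeClient) (string, error) { return c.Echo(ctx, "hello") },
	)
	fmt.Println(out, err) // "hello <nil>"
}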

View File

@@ -13,11 +13,13 @@ import (
 	"reflect"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"

 	"github.com/google/uuid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
 	semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
 	"go.opentelemetry.io/otel/trace"
@@ -25,6 +27,7 @@ import (
 	"cdr.dev/slog"
 	"github.com/coder/coder/coderd/tracing"
+	"github.com/coder/coder/coderd/util/ptr"
 	"github.com/coder/coder/provisionerd/proto"
 	sdkproto "github.com/coder/coder/provisionersdk/proto"
 )
@@ -54,6 +57,7 @@ type Runner struct {
 	filesystem afero.Fs
 	workDirectory string
 	provisioner sdkproto.DRPCProvisionerClient
+	lastUpdate atomic.Pointer[time.Time]
 	updateInterval time.Duration
 	forceCancelInterval time.Duration
 	logBufferInterval time.Duration
@@ -203,7 +207,7 @@ func (r *Runner) Run() {
 	defer r.stop()

 	go r.doCleanFinish(ctx)
-	go r.heartbeat(ctx)
+	go r.heartbeatRoutine(ctx)
 	for r.failedJob == nil && r.completedJob == nil {
 		r.cond.Wait()
 	}
@@ -307,15 +311,51 @@ func (r *Runner) ForceStop() {
 	r.cond.Signal()
 }

+func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
+	ctx, span := r.startTrace(ctx, "updateHeartbeat")
+	defer span.End()
+
+	r.mutex.Lock()
+	if !r.okToSend {
+		r.mutex.Unlock()
+		return nil, errUpdateSkipped
+	}
+	r.mutex.Unlock()
+
+	// Skip sending a heartbeat if we've sent an update recently.
+	if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
+		if time.Since(*lastUpdate) < r.updateInterval {
+			span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
+			return &proto.UpdateJobResponse{}, nil
+		}
+	}
+
+	return r.update(ctx, &proto.UpdateJobRequest{
+		JobId: r.job.JobId,
+	})
+}
+
 func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
 	ctx, span := r.startTrace(ctx, tracing.FuncName())
 	defer span.End()
+	defer func() {
+		r.lastUpdate.Store(ptr.Ref(time.Now()))
+	}()
+
+	span.SetAttributes(
+		attribute.Int64("logs_len", int64(len(u.Logs))),
+		attribute.Int64("parameter_schemas_len", int64(len(u.ParameterSchemas))),
+		attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
+		attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
+		attribute.Int64("readme_len", int64(len(u.Readme))),
+	)
+
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if !r.okToSend {
 		return nil, errUpdateSkipped
 	}
 	return r.sender.UpdateJob(ctx, u)
 }
@@ -480,9 +520,13 @@ func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob)
 		}
 	}
 }

-// heartbeat periodically sends updates on the job, which keeps coder server from assuming the job
-// is stalled, and allows the runner to learn if the job has been canceled by the user.
-func (r *Runner) heartbeat(ctx context.Context) {
+// heartbeatRoutine periodically sends updates on the job, which keeps coder server
+// from assuming the job is stalled, and allows the runner to learn if the job
+// has been canceled by the user.
+func (r *Runner) heartbeatRoutine(ctx context.Context) {
+	ctx, span := r.startTrace(ctx, tracing.FuncName())
+	defer span.End()
+
 	ticker := time.NewTicker(r.updateInterval)
 	defer ticker.Stop()
@@ -493,9 +537,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
 		case <-ticker.C:
 		}

-		resp, err := r.update(ctx, &proto.UpdateJobRequest{
-			JobId: r.job.JobId,
-		})
+		resp, err := r.sendHeartbeat(ctx)
 		if err != nil {
 			err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
 			if err != nil {
@@ -504,6 +546,7 @@ func (r *Runner) heartbeat(ctx context.Context) {
 				return
 			}
 		if !resp.Canceled {
+			ticker.Reset(r.updateInterval)
 			continue
 		}
 		r.logger.Info(ctx, "attempting graceful cancelation")
@@ -1079,6 +1122,7 @@ func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob
 func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
 	return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
 		semconv.ServiceNameKey.String("coderd.provisionerd"),
+		attribute.String("job_id", r.job.JobId),
 	))...)
 }
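One detail worth noting in the hunk above the heartbeat loop: calling ticker.Reset(r.updateInterval) after each non-canceled response makes the next heartbeat fire a full interval after this one was handled, rather than counting from the previous tick while the update was in flight. A small runnable illustration of that time.Ticker behavior; nothing below is coder-specific:

package main

import (
	"fmt"
	"time"
)

func main() {
	interval := 200 * time.Millisecond
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	start := time.Now()
	for i := 0; i < 3; i++ {
		<-ticker.C
		// Simulate a slow update round-trip.
		time.Sleep(150 * time.Millisecond)
		// The ticker kept counting from the previous tick while we were
		// busy, so without this Reset the next tick would arrive only
		// ~50ms from now. Reset starts a fresh full interval from here.
		ticker.Reset(interval)
		fmt.Printf("tick %d at %v\n", i+1, time.Since(start).Round(10*time.Millisecond))
	}
}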