package runner

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	semconv "go.opentelemetry.io/otel/semconv/v1.14.0"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/xerrors"

	"cdr.dev/slog"

	"github.com/coder/coder/v2/coderd/tracing"
	"github.com/coder/coder/v2/coderd/util/ptr"
	"github.com/coder/coder/v2/provisionerd/proto"
	sdkproto "github.com/coder/coder/v2/provisionersdk/proto"
)

const (
	MissingParameterErrorCode          = "MISSING_TEMPLATE_PARAMETER"
	missingParameterErrorText          = "missing parameter"
	RequiredTemplateVariablesErrorCode = "REQUIRED_TEMPLATE_VARIABLES"
	requiredTemplateVariablesErrorText = "required template variables"
)

var errorCodes = map[string]string{
	MissingParameterErrorCode:          missingParameterErrorText,
	RequiredTemplateVariablesErrorCode: requiredTemplateVariablesErrorText,
}

var errUpdateSkipped = xerrors.New("update skipped; job complete or failed")

type Runner struct {
	tracer              trace.Tracer
	metrics             Metrics
	job                 *proto.AcquiredJob
	sender              JobUpdater
	quotaCommitter      QuotaCommitter
	logger              slog.Logger
	provisioner         sdkproto.DRPCProvisionerClient
	lastUpdate          atomic.Pointer[time.Time]
	updateInterval      time.Duration
	forceCancelInterval time.Duration
	logBufferInterval   time.Duration

	// session is the provisioning session with the (possibly remote) provisioner
	session sdkproto.DRPCProvisioner_SessionClient

	// closed when the Runner is finished sending any updates/failed/complete.
	done chan struct{}
	// active as long as we are not canceled
	notCanceled context.Context
	cancel      context.CancelFunc
	// active as long as we haven't been force stopped
	notStopped context.Context
	stop       context.CancelFunc

	// mutex controls access to all the following variables.
	mutex *sync.Mutex
	// used to wait for the failedJob or completedJob to be populated
	cond           *sync.Cond
	flushLogsTimer *time.Timer
	queuedLogs     []*proto.Log
	failedJob      *proto.FailedJob
	completedJob   *proto.CompletedJob
	// setting this false signals that no more messages about this job should be sent. Usually this
	// means that a terminal message like FailedJob or CompletedJob has been sent, even in the case
	// of a Cancel(). However, when someone calls Fail() or ForceStop(), we might not send the
	// terminal message, but okToSend is set to false regardless.
	okToSend bool
}

type Metrics struct {
	ConcurrentJobs *prometheus.GaugeVec
	NumDaemons     prometheus.Gauge
	// JobTimings also counts the total amount of jobs.
	JobTimings *prometheus.HistogramVec
	// WorkspaceBuilds counts workspace build successes and failures.
	WorkspaceBuilds       *prometheus.CounterVec
	WorkspaceBuildTimings *prometheus.HistogramVec
}

type JobUpdater interface {
	UpdateJob(ctx context.Context, in *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error)
	FailJob(ctx context.Context, in *proto.FailedJob) error
	CompleteJob(ctx context.Context, in *proto.CompletedJob) error
}

type QuotaCommitter interface {
	CommitQuota(ctx context.Context, in *proto.CommitQuotaRequest) (*proto.CommitQuotaResponse, error)
}

type Options struct {
	Updater             JobUpdater
	QuotaCommitter      QuotaCommitter
	Logger              slog.Logger
	Provisioner         sdkproto.DRPCProvisionerClient
	UpdateInterval      time.Duration
	ForceCancelInterval time.Duration
	LogDebounceInterval time.Duration
	Tracer              trace.Tracer
	Metrics             Metrics
}

func New(
	ctx context.Context,
	job *proto.AcquiredJob,
	opts Options,
) *Runner {
	m := new(sync.Mutex)

	// we need to create our contexts here in case a call to Cancel() comes immediately.
	forceStopContext, forceStopFunc := context.WithCancel(ctx)
	gracefulContext, cancelFunc := context.WithCancel(forceStopContext)

	logger := opts.Logger.With(slog.F("job_id", job.JobId))
	if build := job.GetWorkspaceBuild(); build != nil {
		logger = logger.With(
			slog.F("template_name", build.Metadata.TemplateName),
			slog.F("template_version", build.Metadata.TemplateVersion),
			slog.F("workspace_build_id", build.WorkspaceBuildId),
			slog.F("workspace_id", build.Metadata.WorkspaceId),
			slog.F("workspace_name", build.Metadata.WorkspaceName),
			slog.F("workspace_owner", build.Metadata.WorkspaceOwner),
			slog.F("workspace_transition", strings.ToLower(build.Metadata.WorkspaceTransition.String())),
		)
	}

	return &Runner{
		tracer:              opts.Tracer,
		metrics:             opts.Metrics,
		job:                 job,
		sender:              opts.Updater,
		quotaCommitter:      opts.QuotaCommitter,
		logger:              logger,
		provisioner:         opts.Provisioner,
		updateInterval:      opts.UpdateInterval,
		forceCancelInterval: opts.ForceCancelInterval,
		logBufferInterval:   opts.LogDebounceInterval,
		queuedLogs:          make([]*proto.Log, 0),
		mutex:               m,
		cond:                sync.NewCond(m),
		done:                make(chan struct{}),
		okToSend:            true,
		notStopped:          forceStopContext,
		stop:                forceStopFunc,
		notCanceled:         gracefulContext,
		cancel:              cancelFunc,
	}
}

// Run executes the job.
//
// the idea here is to run two goroutines to work on the job: doCleanFinish and heartbeat, then use
// the `r.cond` to wait until the job is either complete or failed. This function then sends the
// complete or failed message --- the exception to this is if something calls Fail() on the Runner;
// either something external, like the server getting closed, or the heartbeat goroutine timing out
// after attempting to gracefully cancel. If something calls Fail(), then the failure is sent on
// that goroutine on the context passed into Fail(), and it marks okToSend false to signal us here
// that this function should not also send a terminal message.
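//
// An illustrative sketch (not prescriptive) of how a daemon might drive a Runner;
// the surrounding setup (jobUpdater, quotaCommitter, provisionerClient, tracer,
// metrics, and the interval values) is assumed here, not defined by this package:
//
//	r := runner.New(ctx, acquiredJob, runner.Options{
//		Updater:             jobUpdater,        // hypothetical JobUpdater implementation
//		QuotaCommitter:      quotaCommitter,    // hypothetical QuotaCommitter implementation
//		Logger:              logger,
//		Provisioner:         provisionerClient,
//		UpdateInterval:      5 * time.Second, // arbitrary example values
//		ForceCancelInterval: 10 * time.Minute,
//		LogDebounceInterval: 250 * time.Millisecond,
//		Tracer:              tracer,
//		Metrics:             metrics,
//	})
//	go r.Run()
//	<-r.Done() // closed once the terminal CompletedJob/FailedJob has been sent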
func (r *Runner) Run() {
	start := time.Now()
	ctx, span := r.startTrace(r.notStopped, tracing.FuncName())
	defer span.End()

	concurrentGauge := r.metrics.ConcurrentJobs.WithLabelValues(r.job.Provisioner)
	concurrentGauge.Inc()
	defer func() {
		status := "success"
		if r.failedJob != nil {
			status = "failed"
		}

		concurrentGauge.Dec()
		if build := r.job.GetWorkspaceBuild(); build != nil {
			r.metrics.WorkspaceBuilds.WithLabelValues(
				build.Metadata.WorkspaceOwner,
				build.Metadata.WorkspaceName,
				build.Metadata.TemplateName,
				build.Metadata.TemplateVersion,
				build.Metadata.WorkspaceTransition.String(),
				status,
			).Inc()
			r.metrics.WorkspaceBuildTimings.WithLabelValues(
				build.Metadata.TemplateName,
				build.Metadata.TemplateVersion,
				build.Metadata.WorkspaceTransition.String(),
				status,
			).Observe(time.Since(start).Seconds())
		}
		r.metrics.JobTimings.WithLabelValues(r.job.Provisioner, status).Observe(time.Since(start).Seconds())
	}()

	r.mutex.Lock()
	defer r.mutex.Unlock()
	defer r.stop()

	go r.doCleanFinish(ctx)
	go r.heartbeatRoutine(ctx)
	for r.failedJob == nil && r.completedJob == nil {
		r.cond.Wait()
	}
	if !r.okToSend {
		// nothing else to do.
		return
	}
	if r.failedJob != nil {
		span.RecordError(xerrors.New(r.failedJob.Error))
		span.SetStatus(codes.Error, r.failedJob.Error)

		r.logger.Debug(ctx, "sending FailedJob")
		err := r.sender.FailJob(ctx, r.failedJob)
		if err != nil {
			r.logger.Error(ctx, "sending FailJob failed", slog.Error(err))
		} else {
			r.logger.Debug(ctx, "sent FailedJob")
		}
	} else {
		r.logger.Debug(ctx, "sending CompletedJob")
		err := r.sender.CompleteJob(ctx, r.completedJob)
		if err != nil {
			r.logger.Error(ctx, "sending CompletedJob failed", slog.Error(err))
			err = r.sender.FailJob(ctx, r.failedJobf("internal provisionerserver error: %s", err))
			if err != nil {
				r.logger.Error(ctx, "sending FailJob failed (while CompletedJob)", slog.Error(err))
			}
		} else {
			r.logger.Debug(ctx, "sent CompletedJob")
		}
	}
	close(r.done)
	r.okToSend = false
}

// Cancel initiates a Cancel on the job, but allows it to keep running to do so gracefully. Read from Done() to
// be notified when the job completes.
func (r *Runner) Cancel() {
	r.cancel()
}

func (r *Runner) Done() <-chan struct{} {
	return r.done
}

// Fail immediately halts updates and, if the job is not complete, sends FailJob to the coder server. Running goroutines
// are canceled but complete asynchronously (although they are prevented from further updating the job to the coder
// server). The provided context sets how long to keep trying to send the FailJob.
func (r *Runner) Fail(ctx context.Context, f *proto.FailedJob) error {
	f.JobId = r.job.JobId
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if !r.okToSend {
		return nil // already done
	}
	r.Cancel()
	if r.failedJob == nil {
		r.failedJob = f
		r.cond.Signal()
	}
	// here we keep the original failed reason if there already was one, but we hadn't yet sent it. It is likely more
	// informative of the job failing due to some problem running it, whereas this function is used to externally
	// force a Fail.
	err := r.sender.FailJob(ctx, r.failedJob)
	r.okToSend = false
	r.stop()
	close(r.done)
	return err
}

// setComplete is an internal function to set the job to completed. This does not send the completedJob.
func (r *Runner) setComplete(c *proto.CompletedJob) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if r.completedJob == nil {
		r.completedJob = c
		r.cond.Signal()
	}
}

// setFail is an internal function to set the job to failed. This does not send the failedJob.
func (r *Runner) setFail(f *proto.FailedJob) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if r.failedJob == nil {
		f.JobId = r.job.JobId
		r.failedJob = f
		r.cond.Signal()
	}
}

// ForceStop signals all goroutines to stop and prevents any further API calls back to coder server for this job
func (r *Runner) ForceStop() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.stop()
	if !r.okToSend {
		return
	}
	r.okToSend = false
	close(r.done)
	// doesn't matter what we put here, since it won't get sent! Just need something to satisfy the condition in
	// Run().
	r.failedJob = &proto.FailedJob{}
	r.cond.Signal()
}

func (r *Runner) sendHeartbeat(ctx context.Context) (*proto.UpdateJobResponse, error) {
	ctx, span := r.startTrace(ctx, "updateHeartbeat")
	defer span.End()

	r.mutex.Lock()
	if !r.okToSend {
		r.mutex.Unlock()
		return nil, errUpdateSkipped
	}
	r.mutex.Unlock()

	// Skip sending a heartbeat if we've sent an update recently.
	if lastUpdate := r.lastUpdate.Load(); lastUpdate != nil {
		if time.Since(*lastUpdate) < r.updateInterval {
			span.SetAttributes(attribute.Bool("heartbeat_skipped", true))
			return &proto.UpdateJobResponse{}, nil
		}
	}

	return r.update(ctx, &proto.UpdateJobRequest{
		JobId: r.job.JobId,
	})
}

func (r *Runner) update(ctx context.Context, u *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	ctx, span := r.startTrace(ctx, tracing.FuncName())
	defer span.End()
	defer func() {
		r.lastUpdate.Store(ptr.Ref(time.Now()))
	}()

	span.SetAttributes(
		attribute.Int64("logs_len", int64(len(u.Logs))),
		attribute.Int64("template_variables_len", int64(len(u.TemplateVariables))),
		attribute.Int64("user_variable_values_len", int64(len(u.UserVariableValues))),
		attribute.Int64("readme_len", int64(len(u.Readme))),
	)

	r.mutex.Lock()
	defer r.mutex.Unlock()
	if !r.okToSend {
		return nil, errUpdateSkipped
	}

	return r.sender.UpdateJob(ctx, u)
}

// doCleanFinish wraps a call to do() with cleaning up the job and setting the terminal messages
func (r *Runner) doCleanFinish(ctx context.Context) {
	var (
		failedJob    *proto.FailedJob
		completedJob *proto.CompletedJob
	)

	// push the fail/succeed write onto the defer stack before the cleanup, so
	// that cleanup happens before this.
	defer func() {
		_, span := r.startTrace(ctx, tracing.FuncName())
		defer span.End()

		if failedJob != nil {
			r.setFail(failedJob)
			return
		}
		r.setComplete(completedJob)
	}()

	var err error
	r.session, err = r.provisioner.Session(ctx)
	if err != nil {
		failedJob = r.failedJobf("open session: %s", err)
		return
	}
	defer r.session.Close()

	defer func() {
		ctx, span := r.startTrace(ctx, tracing.FuncName())
		defer span.End()

		r.queueLog(ctx, &proto.Log{
			Source:    proto.LogSource_PROVISIONER_DAEMON,
			Level:     sdkproto.LogLevel_INFO,
			Stage:     "Cleaning Up",
			CreatedAt: time.Now().UnixMilli(),
		})

		r.flushQueuedLogs(ctx)
	}()

	completedJob, failedJob = r.do(ctx)
}

// do actually does the work of running the job
func (r *Runner) do(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
	ctx, span := r.startTrace(ctx, tracing.FuncName())
	defer span.End()

	r.queueLog(ctx, &proto.Log{
		Source:    proto.LogSource_PROVISIONER_DAEMON,
		Level:     sdkproto.LogLevel_INFO,
		Stage:     "Setting up",
		CreatedAt: time.Now().UnixMilli(),
	})

	switch jobType := r.job.Type.(type) {
	case *proto.AcquiredJob_TemplateImport_:
		r.logger.Debug(context.Background(), "acquired job is template import",
			slog.F("user_variable_values", redactVariableValues(jobType.TemplateImport.UserVariableValues)),
		)
		return r.runTemplateImport(ctx)
	case *proto.AcquiredJob_TemplateDryRun_:
		r.logger.Debug(context.Background(), "acquired job is template dry-run",
			slog.F("workspace_name", jobType.TemplateDryRun.Metadata.WorkspaceName),
			slog.F("rich_parameter_values", jobType.TemplateDryRun.RichParameterValues),
			slog.F("variable_values", redactVariableValues(jobType.TemplateDryRun.VariableValues)),
		)
		return r.runTemplateDryRun(ctx)
	case *proto.AcquiredJob_WorkspaceBuild_:
		r.logger.Debug(context.Background(), "acquired job is workspace provision",
			slog.F("workspace_name", jobType.WorkspaceBuild.WorkspaceName),
			slog.F("state_length", len(jobType.WorkspaceBuild.State)),
			slog.F("rich_parameter_values", jobType.WorkspaceBuild.RichParameterValues),
			slog.F("variable_values", redactVariableValues(jobType.WorkspaceBuild.VariableValues)),
		)
		return r.runWorkspaceBuild(ctx)
	default:
		return nil, r.failedJobf("unknown job type %q; ensure your provisioner daemon is up-to-date",
			reflect.TypeOf(r.job.Type).String())
	}
}

func (r *Runner) configure(config *sdkproto.Config) *proto.FailedJob {
	err := r.session.Send(&sdkproto.Request{Type: &sdkproto.Request_Config{Config: config}})
	if err != nil {
		return r.failedJobf("send config: %s", err)
	}
	return nil
}

// heartbeatRoutine periodically sends updates on the job, which keeps coder server
// from assuming the job is stalled, and allows the runner to learn if the job
// has been canceled by the user.
func (r *Runner) heartbeatRoutine(ctx context.Context) {
	ctx, span := r.startTrace(ctx, tracing.FuncName())
	defer span.End()

	ticker := time.NewTicker(r.updateInterval)
	defer ticker.Stop()
	for {
		select {
		case <-r.notCanceled.Done():
			return
		case <-ticker.C:
		}

		resp, err := r.sendHeartbeat(ctx)
		if err != nil {
			// Calling Fail starts cancellation so the process will exit.
			err = r.Fail(ctx, r.failedJobf("send periodic update: %s", err))
			if err != nil {
				r.logger.Error(ctx, "failed to call FailJob", slog.Error(err))
			}
			return
		}
		if !resp.Canceled {
			ticker.Reset(r.updateInterval)
			continue
		}
		r.logger.Info(ctx, "attempting graceful cancellation")
		r.Cancel()
		// Mark the job as failed if the provisioner does not finish within the
		// force cancel interval after graceful cancellation begins.
		timer := time.NewTimer(r.forceCancelInterval)
		select {
		case <-timer.C:
			r.logger.Debug(ctx, "cancel timed out")
			err := r.Fail(ctx, r.failedJobf("Cancel timed out"))
			if err != nil {
				r.logger.Warn(ctx, "failed to call FailJob", slog.Error(err))
			}
			return
		case <-r.Done():
			timer.Stop()
			return
		case <-r.notStopped.Done():
			timer.Stop()
			return
		}
	}
}

func (r *Runner) runTemplateImport(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) {
	ctx, span := r.startTrace(ctx, tracing.FuncName())
	defer span.End()

	failedJob := r.configure(&sdkproto.Config{
		TemplateSourceArchive: r.job.GetTemplateSourceArchive(),
	})
	if failedJob != nil {
		return nil, failedJob
	}

	// Parse parameters and update the job with the parameter specs
	r.queueLog(ctx, &proto.Log{
		Source:    proto.LogSource_PROVISIONER_DAEMON,
		Level:     sdkproto.LogLevel_INFO,
		Stage:     "Parsing template parameters",
		CreatedAt: time.Now().UnixMilli(),
	})
	workspaceTags, templateVariables, readme, err := r.runTemplateImportParse(ctx) // TODO workspace_tags
	if err != nil {
		return nil, r.failedJobf("run parse: %s", err)
	}

	// Once Terraform template variables are parsed, the runner can pass variables
	// to store in database and filter valid ones.
	updateResponse, err := r.update(ctx, &proto.UpdateJobRequest{
		JobId:              r.job.JobId,
		TemplateVariables:  templateVariables,
		UserVariableValues: r.job.GetTemplateImport().GetUserVariableValues(),
		Readme:             readme,
		WorkspaceTags:      workspaceTags,
	})
	if err != nil {
		return nil, r.failedJobf("update job: %s", err)
	}

	// Determine persistent resources
	r.queueLog(ctx, &proto.Log{
		Source:    proto.LogSource_PROVISIONER_DAEMON,
		Level:     sdkproto.LogLevel_INFO,
		Stage:     "Detecting persistent resources",
		CreatedAt: time.Now().UnixMilli(),
	})
	startProvision, err := r.runTemplateImportProvision(ctx, updateResponse.VariableValues, &sdkproto.Metadata{
		CoderUrl:             r.job.GetTemplateImport().Metadata.CoderUrl,
		WorkspaceOwnerGroups: r.job.GetTemplateImport().Metadata.WorkspaceOwnerGroups,
		WorkspaceTransition:  sdkproto.WorkspaceTransition_START,
	}, false)
	if err != nil {
		return nil, r.failedJobf("template import provision for start: %s", err)
	}

	// Determine ephemeral resources.
	r.queueLog(ctx, &proto.Log{
		Source:    proto.LogSource_PROVISIONER_DAEMON,
		Level:     sdkproto.LogLevel_INFO,
		Stage:     "Detecting ephemeral resources",
		CreatedAt: time.Now().UnixMilli(),
	})
	stopProvision, err := r.runTemplateImportProvision(ctx, updateResponse.VariableValues, &sdkproto.Metadata{
		CoderUrl:             r.job.GetTemplateImport().Metadata.CoderUrl,
		WorkspaceOwnerGroups: r.job.GetTemplateImport().Metadata.WorkspaceOwnerGroups,
		WorkspaceTransition:  sdkproto.WorkspaceTransition_STOP,
	},
		true, // Modules downloaded on the start provision
	)
	if err != nil {
		return nil, r.failedJobf("template import provision for stop: %s", err)
	}

	// For backwards compatibility with older versions of coderd
	externalAuthProviderNames := make([]string, 0, len(startProvision.ExternalAuthProviders))
	for _, it := range startProvision.ExternalAuthProviders {
		externalAuthProviderNames = append(externalAuthProviderNames, it.Id)
	}

	return &proto.CompletedJob{
		JobId: r.job.JobId,
		Type: &proto.CompletedJob_TemplateImport_{
			TemplateImport: &proto.CompletedJob_TemplateImport{
				StartResources:             startProvision.Resources,
				StopResources:              stopProvision.Resources,
				RichParameters:             startProvision.Parameters,
				ExternalAuthProvidersNames: externalAuthProviderNames,
				ExternalAuthProviders:      startProvision.ExternalAuthProviders,
				StartModules:               startProvision.Modules,
				StopModules:                stopProvision.Modules,
				Presets:                    startProvision.Presets,
				Plan:                       startProvision.Plan,
				// ModuleFiles are not on the stopProvision. So grab from the startProvision.
				ModuleFiles: startProvision.ModuleFiles,
				// ModuleFileHash will be populated if the file is uploaded async
				ModuleFilesHash: []byte{},
				HasAiTasks:      startProvision.HasAITasks,
			},
		},
	}, nil
}

// Parses template variables and README from source.
func (r *Runner) runTemplateImportParse(ctx context.Context) (
	workspaceTags map[string]string, vars []*sdkproto.TemplateVariable, readme []byte, err error,
) {
	ctx, span := r.startTrace(ctx, tracing.FuncName())
	defer span.End()

	err = r.session.Send(&sdkproto.Request{Type: &sdkproto.Request_Parse{Parse: &sdkproto.ParseRequest{}}})
	if err != nil {
		return nil, nil, nil, xerrors.Errorf("parse source: %w", err)
	}
	for {
		msg, err := r.session.Recv()
		if err != nil {
			return nil, nil, nil, xerrors.Errorf("recv parse source: %w", err)
		}
		switch msgType := msg.Type.(type) {
		case *sdkproto.Response_Log:
			r.logProvisionerJobLog(context.Background(), msgType.Log.Level, "parse job logged",
				slog.F("level", msgType.Log.Level),
				slog.F("output", msgType.Log.Output),
			)

			r.queueLog(ctx, &proto.Log{
				Source:    proto.LogSource_PROVISIONER,
				Level:     msgType.Log.Level,
				CreatedAt: time.Now().UnixMilli(),
				Output:    msgType.Log.Output,
				Stage:     "Parse parameters",
			})
		case *sdkproto.Response_Parse:
			pc := msgType.Parse
			r.logger.Debug(context.Background(), "parse complete",
				slog.F("workspace_tags", pc.WorkspaceTags),
				slog.F("template_variables", pc.TemplateVariables),
				slog.F("readme_len", len(pc.Readme)),
				slog.F("error", pc.Error),
			)
			if pc.Error != "" {
				return nil, nil, nil, xerrors.Errorf("parse error: %s", pc.Error)
			}

			return msgType.Parse.WorkspaceTags, msgType.Parse.TemplateVariables, msgType.Parse.Readme, nil
		default:
			return nil, nil, nil, xerrors.Errorf("invalid message type %q received from provisioner",
				reflect.TypeOf(msg.Type).String())
		}
	}
}

type templateImportProvision struct {
	Resources             []*sdkproto.Resource
	Parameters            []*sdkproto.RichParameter
	ExternalAuthProviders []*sdkproto.ExternalAuthProviderResource
	Modules               []*sdkproto.Module
	Presets               []*sdkproto.Preset
	Plan                  json.RawMessage
	ModuleFiles           []byte
	HasAITasks            bool
}

// Performs a dry-run provision when importing a template.
// This is used to detect resources that would be provisioned for a workspace in various states.
// It doesn't define values for rich parameters as they're unknown during template import.
func (r *Runner) runTemplateImportProvision(ctx context.Context, variableValues []*sdkproto.VariableValue, metadata *sdkproto.Metadata, omitModules bool) (*templateImportProvision, error) {
	return r.runTemplateImportProvisionWithRichParameters(ctx, variableValues, nil, metadata, omitModules)
}

// Performs a dry-run provision with provided rich parameters.
// This is used to detect resources that would be provisioned for a workspace in various states.
func (r *Runner) runTemplateImportProvisionWithRichParameters(
	ctx context.Context,
	variableValues []*sdkproto.VariableValue,
	richParameterValues []*sdkproto.RichParameterValue,
	metadata *sdkproto.Metadata,
	omitModules bool,
) (*templateImportProvision, error) {
	ctx, span := r.startTrace(ctx, tracing.FuncName())
	defer span.End()

	var stage string
	switch metadata.WorkspaceTransition {
	case sdkproto.WorkspaceTransition_START:
		stage = "Detecting persistent resources"
	case sdkproto.WorkspaceTransition_STOP:
		stage = "Detecting ephemeral resources"
	}

	// use the notStopped so that if we attempt to gracefully cancel, the stream will still be available for us
	// to send the cancel to the provisioner
	err := r.session.Send(&sdkproto.Request{Type: &sdkproto.Request_Plan{Plan: &sdkproto.PlanRequest{
		Metadata:            metadata,
		RichParameterValues: richParameterValues,
		// Template import has no previous values
		PreviousParameterValues: make([]*sdkproto.RichParameterValue, 0),
		VariableValues:          variableValues,
		OmitModuleFiles:         omitModules,
	}}})
	if err != nil {
		return nil, xerrors.Errorf("start provision: %w", err)
	}

	nevermind := make(chan struct{})
	defer close(nevermind)
	go func() {
		select {
		case <-nevermind:
			return
		case <-r.notStopped.Done():
			return
		case <-r.notCanceled.Done():
			_ = r.session.Send(&sdkproto.Request{
				Type: &sdkproto.Request_Cancel{
					Cancel: &sdkproto.CancelRequest{},
				},
			})
		}
	}()

	var moduleFilesUpload *sdkproto.DataBuilder
	for {
		msg, err := r.session.Recv()
		if err != nil {
			return nil, xerrors.Errorf("recv import provision: %w", err)
		}
		switch msgType := msg.Type.(type) {
		case *sdkproto.Response_Log:
			r.logProvisionerJobLog(context.Background(), msgType.Log.Level, "template import provision job logged",
				slog.F("level", msgType.Log.Level),
				slog.F("output", msgType.Log.Output),
			)
			r.queueLog(ctx, &proto.Log{
				Source:    proto.LogSource_PROVISIONER,
				Level:     msgType.Log.Level,
				CreatedAt: time.Now().UnixMilli(),
				Output:    msgType.Log.Output,
				Stage:     stage,
			})
		case *sdkproto.Response_DataUpload:
			c := msgType.DataUpload
			if c.UploadType != sdkproto.DataUploadType_UPLOAD_TYPE_MODULE_FILES {
				return nil, xerrors.Errorf("invalid data upload type: %q", c.UploadType)
			}
			if moduleFilesUpload != nil {
				return nil, xerrors.New("multiple module data uploads received, only expect 1")
			}
			moduleFilesUpload, err = sdkproto.NewDataBuilder(c)
			if err != nil {
				return nil, xerrors.Errorf("create data builder: %w", err)
			}
		case *sdkproto.Response_ChunkPiece:
			c := msgType.ChunkPiece
			if moduleFilesUpload == nil {
				return nil, xerrors.New("received chunk piece before module files data upload")
			}
			_, err := moduleFilesUpload.Add(c)
			if err != nil {
				return nil, xerrors.Errorf("module files, add chunk piece: %w", err)
			}
		case *sdkproto.Response_Plan:
			c := msgType.Plan
			if c.Error != "" {
				r.logger.Info(context.Background(), "dry-run provision failure",
slog.F("error", c.Error), ) return nil, xerrors.New(c.Error) } if moduleFilesUpload != nil && len(c.ModuleFiles) > 0 { return nil, xerrors.New("module files were uploaded and module files were returned in the plan response. Only one of these should be set") } r.logger.Info(context.Background(), "parse dry-run provision successful", slog.F("resource_count", len(c.Resources)), slog.F("resources", resourceNames(c.Resources)), ) moduleFilesData := c.ModuleFiles if moduleFilesUpload != nil { uploadData, err := moduleFilesUpload.Complete() if err != nil { return nil, xerrors.Errorf("module files, complete upload: %w", err) } moduleFilesData = uploadData if !bytes.Equal(c.ModuleFilesHash, moduleFilesUpload.Hash) { return nil, xerrors.Errorf("module files hash mismatch, uploaded: %x, expected: %x", moduleFilesUpload.Hash, c.ModuleFilesHash) } } return &templateImportProvision{ Resources: c.Resources, Parameters: c.Parameters, ExternalAuthProviders: c.ExternalAuthProviders, Modules: c.Modules, Presets: c.Presets, Plan: c.Plan, ModuleFiles: moduleFilesData, HasAITasks: c.HasAiTasks, }, nil default: return nil, xerrors.Errorf("invalid message type %q received from provisioner", reflect.TypeOf(msg.Type).String()) } } } func (r *Runner) runTemplateDryRun(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) { ctx, span := r.startTrace(ctx, tracing.FuncName()) defer span.End() // Ensure all metadata fields are set as they are all optional for dry-run. metadata := r.job.GetTemplateDryRun().GetMetadata() metadata.WorkspaceTransition = sdkproto.WorkspaceTransition_START if metadata.CoderUrl == "" { metadata.CoderUrl = "http://localhost:3000" } if metadata.WorkspaceName == "" { metadata.WorkspaceName = "dryrun" } metadata.WorkspaceOwner = r.job.UserName if metadata.WorkspaceOwner == "" { metadata.WorkspaceOwner = "dryrunner" } if metadata.WorkspaceId == "" { id, err := uuid.NewRandom() if err != nil { return nil, r.failedJobf("generate random ID: %s", err) } metadata.WorkspaceId = id.String() } if metadata.WorkspaceOwnerId == "" { id, err := uuid.NewRandom() if err != nil { return nil, r.failedJobf("generate random ID: %s", err) } metadata.WorkspaceOwnerId = id.String() } failedJob := r.configure(&sdkproto.Config{ TemplateSourceArchive: r.job.GetTemplateSourceArchive(), }) if failedJob != nil { return nil, failedJob } // Run the template import provision task since it's already a dry run. 
	provision, err := r.runTemplateImportProvisionWithRichParameters(ctx,
		r.job.GetTemplateDryRun().GetVariableValues(),
		r.job.GetTemplateDryRun().GetRichParameterValues(),
		metadata,
		false,
	)
	if err != nil {
		return nil, r.failedJobf("run dry-run provision job: %s", err)
	}

	return &proto.CompletedJob{
		JobId: r.job.JobId,
		Type: &proto.CompletedJob_TemplateDryRun_{
			TemplateDryRun: &proto.CompletedJob_TemplateDryRun{
				Resources: provision.Resources,
				Modules:   provision.Modules,
			},
		},
	}, nil
}

func (r *Runner) buildWorkspace(ctx context.Context, stage string, req *sdkproto.Request) (
	*sdkproto.Response, *proto.FailedJob,
) {
	// use the notStopped so that if we attempt to gracefully cancel, the stream
	// will still be available for us to send the cancel to the provisioner
	err := r.session.Send(req)
	if err != nil {
		return nil, r.failedWorkspaceBuildf("start provision: %s", err)
	}

	nevermind := make(chan struct{})
	defer close(nevermind)
	go func() {
		select {
		case <-nevermind:
			return
		case <-r.notStopped.Done():
			return
		case <-r.notCanceled.Done():
			_ = r.session.Send(&sdkproto.Request{
				Type: &sdkproto.Request_Cancel{
					Cancel: &sdkproto.CancelRequest{},
				},
			})
		}
	}()

	for {
		msg, err := r.session.Recv()
		if err != nil {
			return nil, r.failedWorkspaceBuildf("recv workspace provision: %s", err)
		}
		switch msgType := msg.Type.(type) {
		case *sdkproto.Response_Log:
			r.logProvisionerJobLog(context.Background(), msgType.Log.Level, "workspace provisioner job logged",
				slog.F("level", msgType.Log.Level),
				slog.F("output", msgType.Log.Output),
				slog.F("workspace_build_id", r.job.GetWorkspaceBuild().WorkspaceBuildId),
			)

			r.queueLog(ctx, &proto.Log{
				Source:    proto.LogSource_PROVISIONER,
				Level:     msgType.Log.Level,
				CreatedAt: time.Now().UnixMilli(),
				Output:    msgType.Log.Output,
				Stage:     stage,
			})
		case *sdkproto.Response_DataUpload:
			continue // Only for template imports
		case *sdkproto.Response_ChunkPiece:
			continue // Only for template imports
		default:
			// Stop looping!
			return msg, nil
		}
	}
}

func (r *Runner) commitQuota(ctx context.Context, resources []*sdkproto.Resource) *proto.FailedJob {
	cost := sumDailyCost(resources)
	r.logger.Debug(ctx, "committing quota",
		slog.F("resources", resourceNames(resources)),
		slog.F("cost", cost),
	)
	if cost == 0 {
		return nil
	}

	const stage = "Commit quota"

	resp, err := r.quotaCommitter.CommitQuota(ctx, &proto.CommitQuotaRequest{
		JobId: r.job.JobId,
		// #nosec G115 - Safe conversion as cost is expected to be within int32 range for provisioning costs
		DailyCost: int32(cost),
	})
	if err != nil {
		r.queueLog(ctx, &proto.Log{
			Source:    proto.LogSource_PROVISIONER,
			Level:     sdkproto.LogLevel_ERROR,
			CreatedAt: time.Now().UnixMilli(),
			Output:    fmt.Sprintf("Failed to commit quota: %+v", err),
			Stage:     stage,
		})
		return r.failedJobf("commit quota: %+v", err)
	}
	for _, line := range []string{
		fmt.Sprintf("Build cost — %v", cost),
		fmt.Sprintf("Budget — %v", resp.Budget),
		fmt.Sprintf("Credits consumed — %v", resp.CreditsConsumed),
	} {
		r.queueLog(ctx, &proto.Log{
			Source:    proto.LogSource_PROVISIONER,
			Level:     sdkproto.LogLevel_INFO,
			CreatedAt: time.Now().UnixMilli(),
			Output:    line,
			Stage:     stage,
		})
	}

	if !resp.Ok {
		r.queueLog(ctx, &proto.Log{
			Source:    proto.LogSource_PROVISIONER,
			Level:     sdkproto.LogLevel_WARN,
			CreatedAt: time.Now().UnixMilli(),
Failing.", Stage: stage, }) return r.failedWorkspaceBuildf("insufficient quota") } return nil } func (r *Runner) runWorkspaceBuild(ctx context.Context) (*proto.CompletedJob, *proto.FailedJob) { ctx, span := r.startTrace(ctx, tracing.FuncName()) defer span.End() var ( applyStage string commitQuota bool ) switch r.job.GetWorkspaceBuild().Metadata.WorkspaceTransition { case sdkproto.WorkspaceTransition_START: applyStage = "Starting workspace" commitQuota = true case sdkproto.WorkspaceTransition_STOP: applyStage = "Stopping workspace" commitQuota = true case sdkproto.WorkspaceTransition_DESTROY: applyStage = "Destroying workspace" } failedJob := r.configure(&sdkproto.Config{ TemplateSourceArchive: r.job.GetTemplateSourceArchive(), State: r.job.GetWorkspaceBuild().State, ProvisionerLogLevel: r.job.GetWorkspaceBuild().LogLevel, }) if failedJob != nil { return nil, failedJob } resp, failed := r.buildWorkspace(ctx, "Planning infrastructure", &sdkproto.Request{ Type: &sdkproto.Request_Plan{ Plan: &sdkproto.PlanRequest{ OmitModuleFiles: true, // Only useful for template imports Metadata: r.job.GetWorkspaceBuild().Metadata, RichParameterValues: r.job.GetWorkspaceBuild().RichParameterValues, PreviousParameterValues: r.job.GetWorkspaceBuild().PreviousParameterValues, VariableValues: r.job.GetWorkspaceBuild().VariableValues, ExternalAuthProviders: r.job.GetWorkspaceBuild().ExternalAuthProviders, }, }, }) if failed != nil { return nil, failed } planComplete := resp.GetPlan() if planComplete == nil { return nil, r.failedWorkspaceBuildf("invalid message type %T received from provisioner", resp.Type) } if planComplete.Error != "" { r.logger.Warn(context.Background(), "plan request failed", slog.F("error", planComplete.Error), ) return nil, &proto.FailedJob{ JobId: r.job.JobId, Error: planComplete.Error, Type: &proto.FailedJob_WorkspaceBuild_{ WorkspaceBuild: &proto.FailedJob_WorkspaceBuild{}, }, } } if len(planComplete.AiTasks) > 1 { return nil, r.failedWorkspaceBuildf("only one 'coder_ai_task' resource can be provisioned per template") } r.logger.Info(context.Background(), "plan request successful", slog.F("resource_count", len(planComplete.Resources)), slog.F("resources", resourceNames(planComplete.Resources)), ) r.flushQueuedLogs(ctx) if commitQuota { failed = r.commitQuota(ctx, planComplete.Resources) r.flushQueuedLogs(ctx) if failed != nil { return nil, failed } } r.queueLog(ctx, &proto.Log{ Source: proto.LogSource_PROVISIONER_DAEMON, Level: sdkproto.LogLevel_INFO, Stage: applyStage, CreatedAt: time.Now().UnixMilli(), }) resp, failed = r.buildWorkspace(ctx, applyStage, &sdkproto.Request{ Type: &sdkproto.Request_Apply{ Apply: &sdkproto.ApplyRequest{ Metadata: r.job.GetWorkspaceBuild().Metadata, }, }, }) if failed != nil { return nil, failed } applyComplete := resp.GetApply() if applyComplete == nil { return nil, r.failedWorkspaceBuildf("invalid message type %T received from provisioner", resp.Type) } // Prepend the plan timings (since they occurred first). applyComplete.Timings = append(planComplete.Timings, applyComplete.Timings...) 
	if applyComplete.Error != "" {
		r.logger.Warn(context.Background(), "apply failed; updating state",
			slog.F("error", applyComplete.Error),
			slog.F("state_len", len(applyComplete.State)),
		)

		return nil, &proto.FailedJob{
			JobId: r.job.JobId,
			Error: applyComplete.Error,
			Type: &proto.FailedJob_WorkspaceBuild_{
				WorkspaceBuild: &proto.FailedJob_WorkspaceBuild{
					State:   applyComplete.State,
					Timings: applyComplete.Timings,
				},
			},
		}
	}

	r.logger.Info(context.Background(), "apply successful",
		slog.F("resource_count", len(applyComplete.Resources)),
		slog.F("resources", resourceNames(applyComplete.Resources)),
		slog.F("state_len", len(applyComplete.State)),
	)
	r.flushQueuedLogs(ctx)

	return &proto.CompletedJob{
		JobId: r.job.JobId,
		Type: &proto.CompletedJob_WorkspaceBuild_{
			WorkspaceBuild: &proto.CompletedJob_WorkspaceBuild{
				State:     applyComplete.State,
				Resources: applyComplete.Resources,
				Timings:   applyComplete.Timings,
				// Modules are created on disk by `terraform init`, and that is only
				// called by `plan`. `apply` does not modify them, so we can use the
				// modules from the plan response.
				Modules: planComplete.Modules,
				// Resource replacements are discovered at plan time, only.
				ResourceReplacements: planComplete.ResourceReplacements,
				AiTasks:              applyComplete.AiTasks,
			},
		},
	}, nil
}

func resourceNames(rs []*sdkproto.Resource) []string {
	var sb strings.Builder
	names := make([]string, 0, len(rs))
	for _, r := range rs {
		_, _ = sb.WriteString(r.Type)
		_, _ = sb.WriteString(".")
		_, _ = sb.WriteString(r.Name)
		names = append(names, sb.String())
		sb.Reset()
	}
	return names
}

func (r *Runner) failedWorkspaceBuildf(format string, args ...interface{}) *proto.FailedJob {
	failedJob := r.failedJobf(format, args...)
	failedJob.Type = &proto.FailedJob_WorkspaceBuild_{}
	return failedJob
}

func (r *Runner) failedJobf(format string, args ...interface{}) *proto.FailedJob {
	message := fmt.Sprintf(format, args...)
	var code string

	for c, m := range errorCodes {
		if strings.Contains(message, m) {
			code = c
			break
		}
	}
	return &proto.FailedJob{
		JobId:     r.job.JobId,
		Error:     message,
		ErrorCode: code,
	}
}

func (r *Runner) startTrace(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
	return r.tracer.Start(ctx, name, append(opts, trace.WithAttributes(
		semconv.ServiceNameKey.String("coderd.provisionerd"),
		attribute.String("job_id", r.job.JobId),
	))...)
}

// queueLog adds a log to the buffer and debounces a timer
// if one exists to flush the logs. It stores a maximum of
// 100 log lines before flushing as a safe-guard mechanism.
func (r *Runner) queueLog(ctx context.Context, log *proto.Log) {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.queuedLogs = append(r.queuedLogs, log)
	if r.flushLogsTimer != nil {
		r.flushLogsTimer.Reset(r.logBufferInterval)
		return
	}
	// This can be configurable if there are a ton of logs.
	if len(r.queuedLogs) > 100 {
		// Flushing logs requires a lock, so this can happen async.
		go r.flushQueuedLogs(ctx)
		return
	}
	r.flushLogsTimer = time.AfterFunc(r.logBufferInterval, func() {
		r.flushQueuedLogs(ctx)
	})
}

func (r *Runner) flushQueuedLogs(ctx context.Context) {
	r.mutex.Lock()
	if r.flushLogsTimer != nil {
		r.flushLogsTimer.Stop()
	}
	logs := r.queuedLogs
	r.queuedLogs = make([]*proto.Log, 0)
	r.mutex.Unlock()
	_, err := r.update(ctx, &proto.UpdateJobRequest{
		JobId: r.job.JobId,
		Logs:  logs,
	})
	if err != nil {
		if errors.Is(err, errUpdateSkipped) {
			return
		}
		r.logger.Error(ctx, "flush queued logs", slog.Error(err))
	}
}

func redactVariableValues(variableValues []*sdkproto.VariableValue) []*sdkproto.VariableValue {
	var redacted []*sdkproto.VariableValue
	for _, v := range variableValues {
		if v.Sensitive {
			redacted = append(redacted, &sdkproto.VariableValue{
				Name:      v.Name,
				Value:     "*redacted*",
				Sensitive: true,
			})
			continue
		}
		redacted = append(redacted, v)
	}
	return redacted
}

// logProvisionerJobLog logs a message from the provisioner daemon at the appropriate level.
func (r *Runner) logProvisionerJobLog(ctx context.Context, logLevel sdkproto.LogLevel, msg string, fields ...any) {
	switch logLevel {
	case sdkproto.LogLevel_TRACE:
		r.logger.Debug(ctx, msg, fields...) // There's no trace, so we'll just use debug.
	case sdkproto.LogLevel_DEBUG:
		r.logger.Debug(ctx, msg, fields...)
	case sdkproto.LogLevel_INFO:
		r.logger.Info(ctx, msg, fields...)
	case sdkproto.LogLevel_WARN:
		r.logger.Warn(ctx, msg, fields...)
	case sdkproto.LogLevel_ERROR:
		r.logger.Error(ctx, msg, fields...)
	default:
		// should never happen, but we should not explode either.
		r.logger.Info(ctx, msg, fields...)
	}
}
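
// An illustrative timeline (not taken from real output) of the queueLog /
// flushQueuedLogs debounce above, assuming a logBufferInterval of 250ms and
// fewer than 100 queued lines:
//
//	t=0ms    queueLog(A)  -> flushLogsTimer armed to fire at t=250ms
//	t=100ms  queueLog(B)  -> timer reset; now fires at t=350ms
//	t=350ms  timer fires  -> flushQueuedLogs sends [A, B] in a single UpdateJobRequest
//
// Queueing more than 100 lines flushes immediately in a separate goroutine
// instead of waiting for the timer.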