package provisionerd

import (
	"archive/tar"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"sync"
	"time"

	"github.com/hashicorp/yamux"
	"go.uber.org/atomic"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/provisionerd/proto"
	sdkproto "github.com/coder/coder/provisionersdk/proto"
	"github.com/coder/retry"
)

const (
	missingParameterErrorText = "missing parameter"
)

// IsMissingParameterError returns whether the error message provided
// is a missing parameter error. This can indicate to consumers that
// they should check parameters.
func IsMissingParameterError(err string) bool {
	return strings.Contains(err, missingParameterErrorText)
}

// Dialer represents the function to create a daemon client connection.
type Dialer func(ctx context.Context) (proto.DRPCProvisionerDaemonClient, error)

// Provisioners maps provisioner ID to implementation.
type Provisioners map[string]sdkproto.DRPCProvisionerClient

// Options provides customizations to the behavior of a provisioner daemon.
type Options struct {
	Logger slog.Logger

	ForceCancelInterval time.Duration
	UpdateInterval      time.Duration
	PollInterval        time.Duration
	Provisioners        Provisioners
	WorkDirectory       string
}

// New creates and starts a provisioner daemon.
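// The daemon polls coderd for jobs and runs them with the matching
// provisioner. A minimal sketch of construction (the dialer, logger, work
// directory, and "terraform" provisioner client are assumptions supplied by
// the caller):
//
//	srv := provisionerd.New(dialer, &provisionerd.Options{
//		Logger:        logger,
//		Provisioners:  provisionerd.Provisioners{"terraform": terraformClient},
//		WorkDirectory: workDir,
//	})
//	defer srv.Close()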
func New(clientDialer Dialer, opts *Options) *Server {
	if opts.PollInterval == 0 {
		opts.PollInterval = 5 * time.Second
	}
	if opts.UpdateInterval == 0 {
		opts.UpdateInterval = 5 * time.Second
	}
	if opts.ForceCancelInterval == 0 {
		opts.ForceCancelInterval = time.Minute
	}
	ctx, ctxCancel := context.WithCancel(context.Background())
	daemon := &Server{
		clientDialer: clientDialer,
		opts:         opts,

		closeContext: ctx,
		closeCancel:  ctxCancel,

		shutdown: make(chan struct{}),

		jobRunning: make(chan struct{}),
		jobFailed:  *atomic.NewBool(true),
	}
	// Start with a closed channel so isRunningJob()
	// reports false before any job has been acquired.
	close(daemon.jobRunning)
	go daemon.connect(ctx)
	return daemon
}
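
// Server is a provisioner daemon: it dials coderd, acquires jobs, and
// executes them with the registered provisioners.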
type Server struct {
	opts *Options

	clientDialer Dialer
	clientValue  atomic.Value

	// Locked when closing the daemon.
	closeMutex   sync.Mutex
	closeContext context.Context
	closeCancel  context.CancelFunc
	closeError   error

	shutdownMutex sync.Mutex
	shutdown      chan struct{}

	// Locked when acquiring or failing a job.
	jobMutex        sync.Mutex
	jobID           string
	jobRunningMutex sync.Mutex
	jobRunning      chan struct{}
	jobFailed       atomic.Bool
	jobCancel       context.CancelFunc
}

// connect establishes a connection to coderd.
func (p *Server) connect(ctx context.Context) {
	// An exponential back-off occurs when the connection is failing to dial.
	// This is to prevent server spam in case of a coderd outage.
	for retrier := retry.New(50*time.Millisecond, 10*time.Second); retrier.Wait(ctx); {
		client, err := p.clientDialer(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return
			}
			if p.isClosed() {
				return
			}
			p.opts.Logger.Warn(context.Background(), "failed to dial", slog.Error(err))
			continue
		}
		p.clientValue.Store(client)
		p.opts.Logger.Debug(context.Background(), "connected")
		break
	}
	select {
	case <-ctx.Done():
		return
	default:
}
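
	// Watch for the client connection to close so the daemon can reconnect.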
	go func() {
		if p.isClosed() {
			return
		}
		client, ok := p.client()
		if !ok {
			return
		}
		select {
		case <-p.closeContext.Done():
			return
		case <-client.DRPCConn().Closed():
			// We use the update stream to detect when the connection
			// has been interrupted. This works well, because logs need
			// to buffer if a job is running in the background.
			p.opts.Logger.Debug(context.Background(), "client stream ended")
			p.connect(ctx)
		}
}()
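
	// Poll for jobs on an interval until the daemon or the connection closes.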
	go func() {
		if p.isClosed() {
			return
		}
		ticker := time.NewTicker(p.opts.PollInterval)
		defer ticker.Stop()
		for {
			client, ok := p.client()
			if !ok {
				return
			}
			select {
			case <-p.closeContext.Done():
				return
			case <-client.DRPCConn().Closed():
				return
			case <-ticker.C:
				p.acquireJob(ctx)
			}
		}
	}()
}
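
// client returns the currently connected daemon client, if any.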
func (p *Server) client() (proto.DRPCProvisionerDaemonClient, bool) {
	rawClient := p.clientValue.Load()
	if rawClient == nil {
		return nil, false
	}
	client, ok := rawClient.(proto.DRPCProvisionerDaemonClient)
	return client, ok
}
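
// isRunningJob reports whether a job is currently being processed.
// The jobRunning channel is closed whenever no job is active.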
func (p *Server) isRunningJob() bool {
	select {
	case <-p.jobRunning:
		return false
	default:
		return true
	}
}

// acquireJob locks a job in the database and runs it.
func (p *Server) acquireJob(ctx context.Context) {
	p.jobMutex.Lock()
	defer p.jobMutex.Unlock()
	if p.isClosed() {
		return
	}
	if p.isRunningJob() {
		p.opts.Logger.Debug(context.Background(), "skipping acquire; job is already running")
		return
	}
	if p.isShutdown() {
		p.opts.Logger.Debug(context.Background(), "skipping acquire; provisionerd is shutting down...")
		return
	}
	var err error
	client, ok := p.client()
	if !ok {
		return
	}
	job, err := client.AcquireJob(ctx, &proto.Empty{})
	if err != nil {
		if errors.Is(err, context.Canceled) {
			return
		}
		if errors.Is(err, yamux.ErrSessionShutdown) {
			return
		}
		p.opts.Logger.Warn(context.Background(), "acquire job", slog.Error(err))
		return
	}
	if job.JobId == "" {
		return
	}
	ctx, p.jobCancel = context.WithCancel(ctx)
	p.jobRunningMutex.Lock()
	p.jobRunning = make(chan struct{})
	p.jobRunningMutex.Unlock()
	p.jobFailed.Store(false)
	p.jobID = job.JobId

	p.opts.Logger.Info(context.Background(), "acquired job",
		slog.F("initiator_username", job.UserName),
		slog.F("provisioner", job.Provisioner),
		slog.F("id", job.JobId),
	)

	go p.runJob(ctx, job)
}
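
// runJob executes an acquired job: it unpacks the template source into the
// work directory, dispatches to the template import or workspace build
// handler, and reports progress back to coderd.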
func (p *Server) runJob(ctx context.Context, job *proto.AcquiredJob) {
	shutdown, shutdownCancel := context.WithCancel(ctx)
	defer shutdownCancel()

	complete, completeCancel := context.WithCancel(ctx)
defer completeCancel()
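	// Send periodic updates while the job runs and watch for cancelation,
	// whether requested by coderd or triggered by a daemon shutdown.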
	go func() {
		ticker := time.NewTicker(p.opts.UpdateInterval)
		defer ticker.Stop()
		for {
			select {
			case <-p.closeContext.Done():
				return
			case <-ctx.Done():
				return
			case <-complete.Done():
				return
			case <-p.shutdown:
				p.opts.Logger.Info(ctx, "attempting graceful cancelation")
				shutdownCancel()
				return
			case <-ticker.C:
			}
			client, ok := p.client()
			if !ok {
				continue
			}
			resp, err := client.UpdateJob(ctx, &proto.UpdateJobRequest{
				JobId: job.JobId,
			})
			if errors.Is(err, yamux.ErrSessionShutdown) || errors.Is(err, io.EOF) {
				continue
			}
			if err != nil {
				p.failActiveJobf("send periodic update: %s", err)
				return
			}
			if !resp.Canceled {
				continue
			}
			p.opts.Logger.Info(ctx, "attempting graceful cancelation")
			shutdownCancel()
			// Hard-cancel the job once the force cancel interval elapses.
			timer := time.NewTimer(p.opts.ForceCancelInterval)
			select {
			case <-timer.C:
				p.failActiveJobf("cancelation timed out")
				return
			case <-ctx.Done():
				timer.Stop()
				return
			}
		}
	}()
	defer func() {
		// Clean up the work directory after execution.
		for attempt := 0; attempt < 5; attempt++ {
			err := os.RemoveAll(p.opts.WorkDirectory)
			if err != nil {
				// On Windows, open files cannot be removed.
				// When the provisioner daemon is shutting down,
				// it may take a few milliseconds for processes to exit.
				// See: https://github.com/golang/go/issues/50510
				p.opts.Logger.Debug(ctx, "failed to clean work directory; trying again", slog.Error(err))
				time.Sleep(250 * time.Millisecond)
				continue
			}
			p.opts.Logger.Debug(ctx, "cleaned up work directory", slog.Error(err))
			break
		}

		close(p.jobRunning)
	}()
	// The provisioner type comes directly from coderd, so it's safe to use it
	// as the lookup key.
	provisioner, hasProvisioner := p.opts.Provisioners[job.Provisioner]
	if !hasProvisioner {
		p.failActiveJobf("provisioner %q not registered", job.Provisioner)
		return
	}

	err := os.MkdirAll(p.opts.WorkDirectory, 0700)
	if err != nil {
		p.failActiveJobf("create work directory %q: %s", p.opts.WorkDirectory, err)
		return
	}

	client, ok := p.client()
	if !ok {
		p.failActiveJobf("client disconnected")
		return
	}
	_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
		JobId: job.GetJobId(),
		Logs: []*proto.Log{{
			Source:    proto.LogSource_PROVISIONER_DAEMON,
			Level:     sdkproto.LogLevel_INFO,
			Stage:     "Setting up",
			CreatedAt: time.Now().UTC().UnixMilli(),
		}},
	})
	if err != nil {
		p.failActiveJobf("write log: %s", err)
		return
	}

	p.opts.Logger.Info(ctx, "unpacking template source archive", slog.F("size_bytes", len(job.TemplateSourceArchive)))
	reader := tar.NewReader(bytes.NewBuffer(job.TemplateSourceArchive))
	for {
		header, err := reader.Next()
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			p.failActiveJobf("read template source archive: %s", err)
			return
		}
		// #nosec
		path := filepath.Join(p.opts.WorkDirectory, header.Name)
		if !strings.HasPrefix(path, filepath.Clean(p.opts.WorkDirectory)) {
			p.failActiveJobf("tar attempts to target relative upper directory")
			return
		}
		mode := header.FileInfo().Mode()
		if mode == 0 {
			mode = 0600
		}
		switch header.Typeflag {
		case tar.TypeDir:
			err = os.MkdirAll(path, mode)
			if err != nil {
				p.failActiveJobf("mkdir %q: %s", path, err)
				return
			}
			p.opts.Logger.Debug(context.Background(), "extracted directory", slog.F("path", path))
		case tar.TypeReg:
			file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, mode)
			if err != nil {
				p.failActiveJobf("create file %q (mode %s): %s", path, mode, err)
				return
			}
			// Max file size of 10MB.
			size, err := io.CopyN(file, reader, (1<<20)*10)
			if errors.Is(err, io.EOF) {
				err = nil
			}
			if err != nil {
				_ = file.Close()
				p.failActiveJobf("copy file %q: %s", path, err)
				return
			}
			err = file.Close()
			if err != nil {
				p.failActiveJobf("close file %q: %s", path, err)
				return
			}
			p.opts.Logger.Debug(context.Background(), "extracted file",
				slog.F("size_bytes", size),
				slog.F("path", path),
				slog.F("mode", mode),
			)
		}
	}

	switch jobType := job.Type.(type) {
	case *proto.AcquiredJob_TemplateImport_:
		p.opts.Logger.Debug(context.Background(), "acquired job is template import")

		p.runTemplateImport(ctx, shutdown, provisioner, job)
	case *proto.AcquiredJob_WorkspaceBuild_:
		p.opts.Logger.Debug(context.Background(), "acquired job is workspace provision",
			slog.F("workspace_name", jobType.WorkspaceBuild.WorkspaceName),
			slog.F("state_length", len(jobType.WorkspaceBuild.State)),
			slog.F("parameters", jobType.WorkspaceBuild.ParameterValues),
		)

		p.runWorkspaceBuild(ctx, shutdown, provisioner, job)
	default:
		p.failActiveJobf("unknown job type %q; ensure your provisioner daemon is up-to-date", reflect.TypeOf(job.Type).String())
		return
	}

	client, ok = p.client()
	if !ok {
		return
	}
	// Only log the final stage if the job is still running;
	// it's possible the job has already failed.
	if p.isRunningJob() {
		_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
			JobId: job.GetJobId(),
			Logs: []*proto.Log{{
				Source:    proto.LogSource_PROVISIONER_DAEMON,
				Level:     sdkproto.LogLevel_INFO,
				Stage:     "Cleaning Up",
				CreatedAt: time.Now().UTC().UnixMilli(),
			}},
		})
		if err != nil {
			p.failActiveJobf("write log: %s", err)
			return
		}

		p.opts.Logger.Info(context.Background(), "completed job", slog.F("id", job.JobId))
	}
}
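
// runTemplateImport parses the template's parameter schemas and performs
// dry-run provisions for both the start and stop transitions so coderd can
// record which resources each transition would create.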
func (p *Server) runTemplateImport(ctx, shutdown context.Context, provisioner sdkproto.DRPCProvisionerClient, job *proto.AcquiredJob) {
	client, ok := p.client()
	if !ok {
		p.failActiveJobf("client disconnected")
		return
	}
	_, err := client.UpdateJob(ctx, &proto.UpdateJobRequest{
		JobId: job.GetJobId(),
		Logs: []*proto.Log{{
			Source:    proto.LogSource_PROVISIONER_DAEMON,
			Level:     sdkproto.LogLevel_INFO,
			Stage:     "Parse parameters",
			CreatedAt: time.Now().UTC().UnixMilli(),
		}},
	})
	if err != nil {
		p.failActiveJobf("write log: %s", err)
		return
	}

	parameterSchemas, err := p.runTemplateImportParse(ctx, provisioner, job)
	if err != nil {
		p.failActiveJobf("run parse: %s", err)
		return
	}

	updateResponse, err := client.UpdateJob(ctx, &proto.UpdateJobRequest{
		JobId:            job.JobId,
		ParameterSchemas: parameterSchemas,
	})
	if err != nil {
		p.failActiveJobf("update job: %s", err)
		return
	}

	valueByName := map[string]*sdkproto.ParameterValue{}
	for _, parameterValue := range updateResponse.ParameterValues {
		valueByName[parameterValue.Name] = parameterValue
	}
	for _, parameterSchema := range parameterSchemas {
		_, ok := valueByName[parameterSchema.Name]
		if !ok {
			p.failActiveJobf("%s: %s", missingParameterErrorText, parameterSchema.Name)
			return
		}
	}

	_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
		JobId: job.GetJobId(),
		Logs: []*proto.Log{{
			Source:    proto.LogSource_PROVISIONER_DAEMON,
			Level:     sdkproto.LogLevel_INFO,
			Stage:     "Detecting persistent resources",
			CreatedAt: time.Now().UTC().UnixMilli(),
		}},
	})
	if err != nil {
		p.failActiveJobf("write log: %s", err)
		return
	}
	startResources, err := p.runTemplateImportProvision(ctx, shutdown, provisioner, job, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
		CoderUrl:            job.GetTemplateImport().Metadata.CoderUrl,
		WorkspaceTransition: sdkproto.WorkspaceTransition_START,
	})
	if err != nil {
		p.failActiveJobf("template import provision for start: %s", err)
		return
	}
	_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
		JobId: job.GetJobId(),
		Logs: []*proto.Log{{
			Source:    proto.LogSource_PROVISIONER_DAEMON,
			Level:     sdkproto.LogLevel_INFO,
			Stage:     "Detecting ephemeral resources",
			CreatedAt: time.Now().UTC().UnixMilli(),
		}},
	})
	if err != nil {
		p.failActiveJobf("write log: %s", err)
		return
	}
	stopResources, err := p.runTemplateImportProvision(ctx, shutdown, provisioner, job, updateResponse.ParameterValues, &sdkproto.Provision_Metadata{
		CoderUrl:            job.GetTemplateImport().Metadata.CoderUrl,
		WorkspaceTransition: sdkproto.WorkspaceTransition_STOP,
	})
	if err != nil {
p.failActiveJobf("template import provision for start: %s", err)
|
|
		return
	}

	p.completeJob(&proto.CompletedJob{
		JobId: job.JobId,
		Type: &proto.CompletedJob_TemplateImport_{
			TemplateImport: &proto.CompletedJob_TemplateImport{
				StartResources: startResources,
				StopResources:  stopResources,
			},
		},
	})
}

// runTemplateImportParse parses parameter schemas from source.
func (p *Server) runTemplateImportParse(ctx context.Context, provisioner sdkproto.DRPCProvisionerClient, job *proto.AcquiredJob) ([]*sdkproto.ParameterSchema, error) {
	client, ok := p.client()
	if !ok {
		return nil, xerrors.New("client disconnected")
	}
	stream, err := provisioner.Parse(ctx, &sdkproto.Parse_Request{
		Directory: p.opts.WorkDirectory,
	})
	if err != nil {
		return nil, xerrors.Errorf("parse source: %w", err)
	}
	defer stream.Close()
	for {
		msg, err := stream.Recv()
		if err != nil {
			return nil, xerrors.Errorf("recv parse source: %w", err)
		}
		switch msgType := msg.Type.(type) {
		case *sdkproto.Parse_Response_Log:
			p.opts.Logger.Debug(context.Background(), "parse job logged",
				slog.F("level", msgType.Log.Level),
				slog.F("output", msgType.Log.Output),
			)

			_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
				JobId: job.JobId,
				Logs: []*proto.Log{{
					Source:    proto.LogSource_PROVISIONER,
					Level:     msgType.Log.Level,
					CreatedAt: time.Now().UTC().UnixMilli(),
					Output:    msgType.Log.Output,
				}},
			})
			if err != nil {
				return nil, xerrors.Errorf("update job: %w", err)
			}
		case *sdkproto.Parse_Response_Complete:
			p.opts.Logger.Info(context.Background(), "parse complete",
				slog.F("parameter_schemas", msgType.Complete.ParameterSchemas))

			return msgType.Complete.ParameterSchemas, nil
		default:
			return nil, xerrors.Errorf("invalid message type %q received from provisioner",
				reflect.TypeOf(msg.Type).String())
		}
	}
}

// runTemplateImportProvision performs a dry-run provision when importing a
// template. This is used to detect resources that would be provisioned for a
// workspace in various states.
func (p *Server) runTemplateImportProvision(ctx, shutdown context.Context, provisioner sdkproto.DRPCProvisionerClient, job *proto.AcquiredJob, values []*sdkproto.ParameterValue, metadata *sdkproto.Provision_Metadata) ([]*sdkproto.Resource, error) {
	stream, err := provisioner.Provision(ctx)
	if err != nil {
		return nil, xerrors.Errorf("provision: %w", err)
	}
	defer stream.Close()
	go func() {
		select {
		case <-ctx.Done():
			return
		case <-shutdown.Done():
			_ = stream.Send(&sdkproto.Provision_Request{
				Type: &sdkproto.Provision_Request_Cancel{
					Cancel: &sdkproto.Provision_Cancel{},
				},
			})
		}
	}()
	err = stream.Send(&sdkproto.Provision_Request{
		Type: &sdkproto.Provision_Request_Start{
			Start: &sdkproto.Provision_Start{
				Directory:       p.opts.WorkDirectory,
				ParameterValues: values,
				DryRun:          true,
				Metadata:        metadata,
			},
		},
	})
	if err != nil {
		return nil, xerrors.Errorf("start provision: %w", err)
	}

	for {
		msg, err := stream.Recv()
		if err != nil {
			return nil, xerrors.Errorf("recv import provision: %w", err)
		}
		switch msgType := msg.Type.(type) {
		case *sdkproto.Provision_Response_Log:
			p.opts.Logger.Debug(context.Background(), "template import provision job logged",
				slog.F("level", msgType.Log.Level),
				slog.F("output", msgType.Log.Output),
			)
			client, ok := p.client()
			if !ok {
				continue
			}
			_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
				JobId: job.JobId,
				Logs: []*proto.Log{{
					Source:    proto.LogSource_PROVISIONER,
					Level:     msgType.Log.Level,
					CreatedAt: time.Now().UTC().UnixMilli(),
					Output:    msgType.Log.Output,
				}},
			})
			if err != nil {
				return nil, xerrors.Errorf("send job update: %w", err)
			}
		case *sdkproto.Provision_Response_Complete:
p.opts.Logger.Info(context.Background(), "parse dry-run provision successful",
|
|
slog.F("resource_count", len(msgType.Complete.Resources)),
|
|
slog.F("resources", msgType.Complete.Resources),
|
|
slog.F("state_length", len(msgType.Complete.State)),
|
|
)
|
|
|
|
return msgType.Complete.Resources, nil
|
|
default:
|
|
return nil, xerrors.Errorf("invalid message type %q received from provisioner",
|
|
reflect.TypeOf(msg.Type).String())
|
|
}
|
|
}
|
|
}
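
// runWorkspaceBuild provisions (or deprovisions) a workspace build, streaming
// provisioner logs to coderd and reporting the resulting state on completion
// or failure.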
func (p *Server) runWorkspaceBuild(ctx, shutdown context.Context, provisioner sdkproto.DRPCProvisionerClient, job *proto.AcquiredJob) {
	var stage string
	switch job.GetWorkspaceBuild().Metadata.WorkspaceTransition {
	case sdkproto.WorkspaceTransition_START:
		stage = "Starting workspace"
	case sdkproto.WorkspaceTransition_STOP:
		stage = "Stopping workspace"
	case sdkproto.WorkspaceTransition_DESTROY:
		stage = "Destroying workspace"
	}

	client, ok := p.client()
	if !ok {
		p.failActiveJobf("client disconnected")
		return
	}
	_, err := client.UpdateJob(ctx, &proto.UpdateJobRequest{
		JobId: job.GetJobId(),
		Logs: []*proto.Log{{
			Source:    proto.LogSource_PROVISIONER_DAEMON,
			Level:     sdkproto.LogLevel_INFO,
			Stage:     stage,
			CreatedAt: time.Now().UTC().UnixMilli(),
		}},
	})
	if err != nil {
		p.failActiveJobf("write log: %s", err)
		return
	}

	stream, err := provisioner.Provision(ctx)
	if err != nil {
		p.failActiveJobf("provision: %s", err)
		return
	}
	defer stream.Close()
	go func() {
		select {
		case <-ctx.Done():
			return
		case <-shutdown.Done():
			_ = stream.Send(&sdkproto.Provision_Request{
				Type: &sdkproto.Provision_Request_Cancel{
					Cancel: &sdkproto.Provision_Cancel{},
				},
			})
		}
	}()
	err = stream.Send(&sdkproto.Provision_Request{
		Type: &sdkproto.Provision_Request_Start{
			Start: &sdkproto.Provision_Start{
				Directory:       p.opts.WorkDirectory,
				ParameterValues: job.GetWorkspaceBuild().ParameterValues,
				Metadata:        job.GetWorkspaceBuild().Metadata,
				State:           job.GetWorkspaceBuild().State,
			},
		},
	})
	if err != nil {
		p.failActiveJobf("start provision: %s", err)
		return
	}

	for {
		msg, err := stream.Recv()
		if err != nil {
			p.failActiveJobf("recv workspace provision: %s", err)
			return
		}
		switch msgType := msg.Type.(type) {
		case *sdkproto.Provision_Response_Log:
			p.opts.Logger.Debug(context.Background(), "workspace provision job logged",
				slog.F("level", msgType.Log.Level),
				slog.F("output", msgType.Log.Output),
				slog.F("workspace_build_id", job.GetWorkspaceBuild().WorkspaceBuildId),
			)

			_, err = client.UpdateJob(ctx, &proto.UpdateJobRequest{
				JobId: job.JobId,
				Logs: []*proto.Log{{
					Source:    proto.LogSource_PROVISIONER,
					Level:     msgType.Log.Level,
					CreatedAt: time.Now().UTC().UnixMilli(),
					Output:    msgType.Log.Output,
				}},
			})
			if err != nil {
				p.failActiveJobf("send job update: %s", err)
				return
			}
		case *sdkproto.Provision_Response_Complete:
			if msgType.Complete.Error != "" {
				p.opts.Logger.Info(context.Background(), "provision failed; updating state",
					slog.F("state_length", len(msgType.Complete.State)),
				)

				p.failActiveJob(&proto.FailedJob{
					Error: msgType.Complete.Error,
					Type: &proto.FailedJob_WorkspaceBuild_{
						WorkspaceBuild: &proto.FailedJob_WorkspaceBuild{
							State: msgType.Complete.State,
						},
					},
				})
				return
			}

			p.completeJob(&proto.CompletedJob{
				JobId: job.JobId,
				Type: &proto.CompletedJob_WorkspaceBuild_{
					WorkspaceBuild: &proto.CompletedJob_WorkspaceBuild{
						State:     msgType.Complete.State,
						Resources: msgType.Complete.Resources,
					},
				},
			})
			p.opts.Logger.Info(context.Background(), "provision successful; marked job as complete",
				slog.F("resource_count", len(msgType.Complete.Resources)),
				slog.F("resources", msgType.Complete.Resources),
				slog.F("state_length", len(msgType.Complete.State)),
			)
			// Stop looping!
			return
		default:
			p.failActiveJobf("invalid message type %T received from provisioner", msg.Type)
			return
		}
	}
}
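
// completeJob reports a completed job to coderd, retrying until the daemon is
// closed if the connection drops mid-report.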
func (p *Server) completeJob(job *proto.CompletedJob) {
	for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(p.closeContext); {
		client, ok := p.client()
		if !ok {
			continue
		}
		// Complete job may need to be async if we disconnected...
		// When we reconnect we can flush any of these cached values.
		_, err := client.CompleteJob(p.closeContext, job)
		if xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) {
			continue
		}
		if err != nil {
			p.opts.Logger.Warn(p.closeContext, "failed to complete job", slog.Error(err))
			p.failActiveJobf(err.Error())
			return
		}
		break
	}
}
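
// failActiveJobf fails the running job with a formatted error message.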
func (p *Server) failActiveJobf(format string, args ...interface{}) {
	p.failActiveJob(&proto.FailedJob{
		Error: fmt.Sprintf(format, args...),
	})
}
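
// failActiveJob cancels the running job and reports the failure to coderd,
// retrying until the daemon is closed if the connection drops.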
func (p *Server) failActiveJob(failedJob *proto.FailedJob) {
	p.jobMutex.Lock()
	defer p.jobMutex.Unlock()
	if !p.isRunningJob() {
		if p.isClosed() {
			return
		}
		p.opts.Logger.Info(context.Background(), "skipping job fail; none running", slog.F("error_message", failedJob.Error))
		return
	}
	if p.jobFailed.Load() {
p.opts.Logger.Debug(context.Background(), "job has already been marked as failed", slog.F("error_messsage", failedJob.Error))
|
|
		return
	}
	p.jobFailed.Store(true)
	p.jobCancel()
	p.opts.Logger.Info(context.Background(), "failing running job",
		slog.F("error_message", failedJob.Error),
		slog.F("job_id", p.jobID),
	)
	failedJob.JobId = p.jobID
	for retrier := retry.New(25*time.Millisecond, 5*time.Second); retrier.Wait(p.closeContext); {
		client, ok := p.client()
		if !ok {
			continue
		}
		_, err := client.FailJob(p.closeContext, failedJob)
		if xerrors.Is(err, yamux.ErrSessionShutdown) || xerrors.Is(err, io.EOF) {
			continue
		}
		if err != nil {
			if p.isClosed() {
				return
			}
			p.opts.Logger.Warn(context.Background(), "failed to notify of error; job is no longer running", slog.Error(err))
			return
		}
		p.opts.Logger.Debug(context.Background(), "marked running job as failed")
		return
	}
}

// isClosed returns whether the provisioner daemon has been closed.
func (p *Server) isClosed() bool {
	select {
	case <-p.closeContext.Done():
		return true
	default:
		return false
	}
}

// isShutdown returns whether the provisioner daemon is shutting down.
func (p *Server) isShutdown() bool {
	select {
	case <-p.shutdown:
		return true
	default:
		return false
	}
}

// Shutdown triggers a graceful exit of each registered provisioner.
// It returns when the active job stops.
func (p *Server) Shutdown(ctx context.Context) error {
	p.shutdownMutex.Lock()
	defer p.shutdownMutex.Unlock()
	if !p.isRunningJob() {
		return nil
	}
	p.opts.Logger.Info(ctx, "attempting graceful shutdown")
	close(p.shutdown)
	select {
	case <-ctx.Done():
		p.opts.Logger.Warn(ctx, "graceful shutdown failed", slog.Error(ctx.Err()))
		return ctx.Err()
	case <-p.jobRunning:
		p.opts.Logger.Info(ctx, "gracefully shut down")
		return nil
	}
}

// Close ends the provisioner. It will mark any running jobs as failed.
func (p *Server) Close() error {
	return p.closeWithError(nil)
}

// closeWithError closes the provisioner; subsequent reads/writes will return the error err.
func (p *Server) closeWithError(err error) error {
	p.closeMutex.Lock()
	defer p.closeMutex.Unlock()
	if p.isClosed() {
		return p.closeError
	}
	p.closeError = err

	errMsg := "provisioner daemon was shut down gracefully"
	if err != nil {
		errMsg = err.Error()
	}
	p.failActiveJobf(errMsg)
	p.jobRunningMutex.Lock()
	<-p.jobRunning
	p.jobRunningMutex.Unlock()
	p.closeCancel()

	p.opts.Logger.Debug(context.Background(), "closing server with error", slog.Error(err))

	return err
}