mirror of
https://github.com/coder/coder.git
synced 2025-07-06 15:41:45 +00:00
* feat: Add connection_timeout and troubleshooting_url to agent This commit adds the connection timeout and troubleshooting url fields to coder agents. If an initial connection cannot be established within connection timeout seconds, then the agent status will be marked as `"timeout"`. The troubleshooting URL will be present, if configured in the Terraform template, it can be presented to the user when the agent state is either `"timeout"` or `"disconnected"`. Fixes #4678
941 lines
32 KiB
Go
941 lines
32 KiB
Go
package provisionerdserver
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/tabbed/pqtype"
|
|
"golang.org/x/xerrors"
|
|
protobuf "google.golang.org/protobuf/proto"
|
|
|
|
"cdr.dev/slog"
|
|
|
|
"github.com/coder/coder/coderd/database"
|
|
"github.com/coder/coder/coderd/parameter"
|
|
"github.com/coder/coder/coderd/telemetry"
|
|
"github.com/coder/coder/codersdk"
|
|
"github.com/coder/coder/provisioner"
|
|
"github.com/coder/coder/provisionerd/proto"
|
|
"github.com/coder/coder/provisionersdk"
|
|
sdkproto "github.com/coder/coder/provisionersdk/proto"
|
|
)
|
|
|
|
type Server struct {
|
|
AccessURL *url.URL
|
|
ID uuid.UUID
|
|
Logger slog.Logger
|
|
Provisioners []database.ProvisionerType
|
|
Database database.Store
|
|
Pubsub database.Pubsub
|
|
Telemetry telemetry.Reporter
|
|
}
|
|
|
|
// AcquireJob queries the database to lock a job.
|
|
func (server *Server) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
|
|
// This marks the job as locked in the database.
|
|
job, err := server.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
|
|
StartedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
WorkerID: uuid.NullUUID{
|
|
UUID: server.ID,
|
|
Valid: true,
|
|
},
|
|
Types: server.Provisioners,
|
|
})
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
// The provisioner daemon assumes no jobs are available if
|
|
// an empty struct is returned.
|
|
return &proto.AcquiredJob{}, nil
|
|
}
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("acquire job: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "locked job from database", slog.F("id", job.ID))
|
|
|
|
// Marks the acquired job as failed with the error message provided.
|
|
failJob := func(errorMessage string) error {
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: job.ID,
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
Error: sql.NullString{
|
|
String: errorMessage,
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
return xerrors.Errorf("request job was invalidated: %s", errorMessage)
|
|
}
|
|
|
|
user, err := server.Database.GetUserByID(ctx, job.InitiatorID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get user: %s", err))
|
|
}
|
|
|
|
protoJob := &proto.AcquiredJob{
|
|
JobId: job.ID.String(),
|
|
CreatedAt: job.CreatedAt.UnixMilli(),
|
|
Provisioner: string(job.Provisioner),
|
|
UserName: user.Username,
|
|
}
|
|
switch job.Type {
|
|
case database.ProvisionerJobTypeWorkspaceBuild:
|
|
var input WorkspaceProvisionJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
|
|
}
|
|
workspaceBuild, err := server.Database.GetWorkspaceBuildByID(ctx, input.WorkspaceBuildID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get workspace build: %s", err))
|
|
}
|
|
workspace, err := server.Database.GetWorkspaceByID(ctx, workspaceBuild.WorkspaceID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get workspace: %s", err))
|
|
}
|
|
templateVersion, err := server.Database.GetTemplateVersionByID(ctx, workspaceBuild.TemplateVersionID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get template version: %s", err))
|
|
}
|
|
template, err := server.Database.GetTemplateByID(ctx, templateVersion.TemplateID.UUID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get template: %s", err))
|
|
}
|
|
owner, err := server.Database.GetUserByID(ctx, workspace.OwnerID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get owner: %s", err))
|
|
}
|
|
err = server.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspace.ID), []byte{})
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("publish workspace update: %s", err))
|
|
}
|
|
|
|
// Compute parameters for the workspace to consume.
|
|
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
|
|
TemplateImportJobID: templateVersion.JobID,
|
|
TemplateID: uuid.NullUUID{
|
|
UUID: template.ID,
|
|
Valid: true,
|
|
},
|
|
WorkspaceID: uuid.NullUUID{
|
|
UUID: workspace.ID,
|
|
Valid: true,
|
|
},
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("compute parameters: %s", err))
|
|
}
|
|
|
|
// Convert types to their corresponding protobuf types.
|
|
protoParameters, err := convertComputedParameterValues(parameters)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("convert computed parameters to protobuf: %s", err))
|
|
}
|
|
transition, err := convertWorkspaceTransition(workspaceBuild.Transition)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("convert workspace transition: %s", err))
|
|
}
|
|
|
|
protoJob.Type = &proto.AcquiredJob_WorkspaceBuild_{
|
|
WorkspaceBuild: &proto.AcquiredJob_WorkspaceBuild{
|
|
WorkspaceBuildId: workspaceBuild.ID.String(),
|
|
WorkspaceName: workspace.Name,
|
|
State: workspaceBuild.ProvisionerState,
|
|
ParameterValues: protoParameters,
|
|
Metadata: &sdkproto.Provision_Metadata{
|
|
CoderUrl: server.AccessURL.String(),
|
|
WorkspaceTransition: transition,
|
|
WorkspaceName: workspace.Name,
|
|
WorkspaceOwner: owner.Username,
|
|
WorkspaceOwnerEmail: owner.Email,
|
|
WorkspaceId: workspace.ID.String(),
|
|
WorkspaceOwnerId: owner.ID.String(),
|
|
},
|
|
},
|
|
}
|
|
case database.ProvisionerJobTypeTemplateVersionDryRun:
|
|
var input TemplateVersionDryRunJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
|
|
}
|
|
|
|
templateVersion, err := server.Database.GetTemplateVersionByID(ctx, input.TemplateVersionID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get template version: %s", err))
|
|
}
|
|
|
|
// Compute parameters for the dry-run to consume.
|
|
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
|
|
TemplateImportJobID: templateVersion.JobID,
|
|
TemplateID: templateVersion.TemplateID,
|
|
WorkspaceID: uuid.NullUUID{},
|
|
AdditionalParameterValues: input.ParameterValues,
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("compute parameters: %s", err))
|
|
}
|
|
|
|
// Convert types to their corresponding protobuf types.
|
|
protoParameters, err := convertComputedParameterValues(parameters)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("convert computed parameters to protobuf: %s", err))
|
|
}
|
|
|
|
protoJob.Type = &proto.AcquiredJob_TemplateDryRun_{
|
|
TemplateDryRun: &proto.AcquiredJob_TemplateDryRun{
|
|
ParameterValues: protoParameters,
|
|
Metadata: &sdkproto.Provision_Metadata{
|
|
CoderUrl: server.AccessURL.String(),
|
|
WorkspaceName: input.WorkspaceName,
|
|
},
|
|
},
|
|
}
|
|
case database.ProvisionerJobTypeTemplateVersionImport:
|
|
protoJob.Type = &proto.AcquiredJob_TemplateImport_{
|
|
TemplateImport: &proto.AcquiredJob_TemplateImport{
|
|
Metadata: &sdkproto.Provision_Metadata{
|
|
CoderUrl: server.AccessURL.String(),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
switch job.StorageMethod {
|
|
case database.ProvisionerStorageMethodFile:
|
|
file, err := server.Database.GetFileByID(ctx, job.FileID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get file by hash: %s", err))
|
|
}
|
|
protoJob.TemplateSourceArchive = file.Data
|
|
default:
|
|
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
|
|
}
|
|
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
|
|
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
|
|
}
|
|
|
|
return protoJob, err
|
|
}
|
|
|
|
func (server *Server) UpdateJob(ctx context.Context, request *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
|
|
parsedID, err := uuid.Parse(request.JobId)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse job id: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "UpdateJob starting", slog.F("job_id", parsedID))
|
|
job, err := server.Database.GetProvisionerJobByID(ctx, parsedID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get job: %w", err)
|
|
}
|
|
if !job.WorkerID.Valid {
|
|
return nil, xerrors.New("job isn't running yet")
|
|
}
|
|
if job.WorkerID.UUID.String() != server.ID.String() {
|
|
return nil, xerrors.New("you don't own this job")
|
|
}
|
|
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
|
|
ID: parsedID,
|
|
UpdatedAt: database.Now(),
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update job: %w", err)
|
|
}
|
|
|
|
if len(request.Logs) > 0 {
|
|
insertParams := database.InsertProvisionerJobLogsParams{
|
|
JobID: parsedID,
|
|
}
|
|
for _, log := range request.Logs {
|
|
logLevel, err := convertLogLevel(log.Level)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert log level: %w", err)
|
|
}
|
|
logSource, err := convertLogSource(log.Source)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert log source: %w", err)
|
|
}
|
|
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
|
|
insertParams.Level = append(insertParams.Level, logLevel)
|
|
insertParams.Stage = append(insertParams.Stage, log.Stage)
|
|
insertParams.Source = append(insertParams.Source, logSource)
|
|
insertParams.Output = append(insertParams.Output, log.Output)
|
|
server.Logger.Debug(ctx, "job log",
|
|
slog.F("job_id", parsedID),
|
|
slog.F("stage", log.Stage),
|
|
slog.F("output", log.Output))
|
|
}
|
|
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
|
|
return nil, xerrors.Errorf("insert job logs: %w", err)
|
|
}
|
|
// Publish by the lowest log ID inserted so the
|
|
// log stream will fetch everything from that point.
|
|
lowestID := logs[0].ID
|
|
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
|
|
data, err := json.Marshal(ProvisionerJobLogsNotifyMessage{
|
|
CreatedAfter: lowestID - 1,
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("marshal: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(ProvisionerJobLogsNotifyChannel(parsedID), data)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
|
|
return nil, xerrors.Errorf("publish job log: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "published job logs", slog.F("job_id", parsedID))
|
|
}
|
|
|
|
if len(request.Readme) > 0 {
|
|
err := server.Database.UpdateTemplateVersionDescriptionByJobID(ctx, database.UpdateTemplateVersionDescriptionByJobIDParams{
|
|
JobID: job.ID,
|
|
Readme: string(request.Readme),
|
|
UpdatedAt: database.Now(),
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update template version description: %w", err)
|
|
}
|
|
}
|
|
|
|
if len(request.ParameterSchemas) > 0 {
|
|
for index, protoParameter := range request.ParameterSchemas {
|
|
validationTypeSystem, err := convertValidationTypeSystem(protoParameter.ValidationTypeSystem)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert validation type system for %q: %w", protoParameter.Name, err)
|
|
}
|
|
|
|
parameterSchema := database.InsertParameterSchemaParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
JobID: job.ID,
|
|
Name: protoParameter.Name,
|
|
Description: protoParameter.Description,
|
|
RedisplayValue: protoParameter.RedisplayValue,
|
|
ValidationError: protoParameter.ValidationError,
|
|
ValidationCondition: protoParameter.ValidationCondition,
|
|
ValidationValueType: protoParameter.ValidationValueType,
|
|
ValidationTypeSystem: validationTypeSystem,
|
|
|
|
DefaultSourceScheme: database.ParameterSourceSchemeNone,
|
|
DefaultDestinationScheme: database.ParameterDestinationSchemeNone,
|
|
|
|
AllowOverrideDestination: protoParameter.AllowOverrideDestination,
|
|
AllowOverrideSource: protoParameter.AllowOverrideSource,
|
|
|
|
Index: int32(index),
|
|
}
|
|
|
|
// It's possible a parameter doesn't define a default source!
|
|
if protoParameter.DefaultSource != nil {
|
|
parameterSourceScheme, err := convertParameterSourceScheme(protoParameter.DefaultSource.Scheme)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter source scheme: %w", err)
|
|
}
|
|
parameterSchema.DefaultSourceScheme = parameterSourceScheme
|
|
parameterSchema.DefaultSourceValue = protoParameter.DefaultSource.Value
|
|
}
|
|
|
|
// It's possible a parameter doesn't define a default destination!
|
|
if protoParameter.DefaultDestination != nil {
|
|
parameterDestinationScheme, err := convertParameterDestinationScheme(protoParameter.DefaultDestination.Scheme)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter destination scheme: %w", err)
|
|
}
|
|
parameterSchema.DefaultDestinationScheme = parameterDestinationScheme
|
|
}
|
|
|
|
_, err = server.Database.InsertParameterSchema(ctx, parameterSchema)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert parameter schema: %w", err)
|
|
}
|
|
}
|
|
|
|
var templateID uuid.NullUUID
|
|
if job.Type == database.ProvisionerJobTypeTemplateVersionImport {
|
|
templateVersion, err := server.Database.GetTemplateVersionByJobID(ctx, job.ID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get template version by job id: %w", err)
|
|
}
|
|
templateID = templateVersion.TemplateID
|
|
}
|
|
|
|
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
|
|
TemplateImportJobID: job.ID,
|
|
TemplateID: templateID,
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("compute parameters: %w", err)
|
|
}
|
|
// Convert parameters to the protobuf type.
|
|
protoParameters := make([]*sdkproto.ParameterValue, 0, len(parameters))
|
|
for _, computedParameter := range parameters {
|
|
converted, err := convertComputedParameterValue(computedParameter)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter: %s", err)
|
|
}
|
|
protoParameters = append(protoParameters, converted)
|
|
}
|
|
|
|
return &proto.UpdateJobResponse{
|
|
Canceled: job.CanceledAt.Valid,
|
|
ParameterValues: protoParameters,
|
|
}, nil
|
|
}
|
|
|
|
return &proto.UpdateJobResponse{
|
|
Canceled: job.CanceledAt.Valid,
|
|
}, nil
|
|
}
|
|
|
|
func (server *Server) FailJob(ctx context.Context, failJob *proto.FailedJob) (*proto.Empty, error) {
|
|
jobID, err := uuid.Parse(failJob.JobId)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse job id: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "FailJob starting", slog.F("job_id", jobID))
|
|
job, err := server.Database.GetProvisionerJobByID(ctx, jobID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get provisioner job: %w", err)
|
|
}
|
|
if job.WorkerID.UUID.String() != server.ID.String() {
|
|
return nil, xerrors.New("you don't own this job")
|
|
}
|
|
if job.CompletedAt.Valid {
|
|
return nil, xerrors.Errorf("job already completed")
|
|
}
|
|
job.CompletedAt = sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
}
|
|
job.Error = sql.NullString{
|
|
String: failJob.Error,
|
|
Valid: failJob.Error != "",
|
|
}
|
|
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
CompletedAt: job.CompletedAt,
|
|
UpdatedAt: database.Now(),
|
|
Error: job.Error,
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
server.Telemetry.Report(&telemetry.Snapshot{
|
|
ProvisionerJobs: []telemetry.ProvisionerJob{telemetry.ConvertProvisionerJob(job)},
|
|
})
|
|
|
|
switch jobType := failJob.Type.(type) {
|
|
case *proto.FailedJob_WorkspaceBuild_:
|
|
if jobType.WorkspaceBuild.State == nil {
|
|
break
|
|
}
|
|
var input WorkspaceProvisionJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
|
|
}
|
|
build, err := server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
|
|
ID: input.WorkspaceBuildID,
|
|
UpdatedAt: database.Now(),
|
|
ProvisionerState: jobType.WorkspaceBuild.State,
|
|
// We are explicitly not updating deadline here.
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update workspace build state: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(build.WorkspaceID), []byte{})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update workspace: %w", err)
|
|
}
|
|
case *proto.FailedJob_TemplateImport_:
|
|
}
|
|
|
|
data, err := json.Marshal(ProvisionerJobLogsNotifyMessage{EndOfLogs: true})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("marshal job log: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(ProvisionerJobLogsNotifyChannel(jobID), data)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to publish end of job logs", slog.F("job_id", jobID), slog.Error(err))
|
|
return nil, xerrors.Errorf("publish end of job logs: %w", err)
|
|
}
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
|
|
func (server *Server) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
|
|
jobID, err := uuid.Parse(completed.JobId)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse job id: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "CompleteJob starting", slog.F("job_id", jobID))
|
|
job, err := server.Database.GetProvisionerJobByID(ctx, jobID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get job by id: %w", err)
|
|
}
|
|
if job.WorkerID.UUID.String() != server.ID.String() {
|
|
return nil, xerrors.Errorf("you don't own this job")
|
|
}
|
|
|
|
telemetrySnapshot := &telemetry.Snapshot{}
|
|
// Items are added to this snapshot as they complete!
|
|
defer server.Telemetry.Report(telemetrySnapshot)
|
|
|
|
switch jobType := completed.Type.(type) {
|
|
case *proto.CompletedJob_TemplateImport_:
|
|
for transition, resources := range map[database.WorkspaceTransition][]*sdkproto.Resource{
|
|
database.WorkspaceTransitionStart: jobType.TemplateImport.StartResources,
|
|
database.WorkspaceTransitionStop: jobType.TemplateImport.StopResources,
|
|
} {
|
|
for _, resource := range resources {
|
|
server.Logger.Info(ctx, "inserting template import job resource",
|
|
slog.F("job_id", job.ID.String()),
|
|
slog.F("resource_name", resource.Name),
|
|
slog.F("resource_type", resource.Type),
|
|
slog.F("transition", transition))
|
|
|
|
err = InsertWorkspaceResource(ctx, server.Database, jobID, transition, resource, telemetrySnapshot)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert resource: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
UpdatedAt: database.Now(),
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "marked import job as completed", slog.F("job_id", jobID))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("complete job: %w", err)
|
|
}
|
|
case *proto.CompletedJob_WorkspaceBuild_:
|
|
var input WorkspaceProvisionJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("unmarshal job data: %w", err)
|
|
}
|
|
|
|
workspaceBuild, err := server.Database.GetWorkspaceBuildByID(ctx, input.WorkspaceBuildID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get workspace build: %w", err)
|
|
}
|
|
|
|
err = server.Database.InTx(func(db database.Store) error {
|
|
now := database.Now()
|
|
var workspaceDeadline time.Time
|
|
workspace, err := db.GetWorkspaceByID(ctx, workspaceBuild.WorkspaceID)
|
|
if err == nil {
|
|
if workspace.Ttl.Valid {
|
|
workspaceDeadline = now.Add(time.Duration(workspace.Ttl.Int64))
|
|
}
|
|
} else {
|
|
// Huh? Did the workspace get deleted?
|
|
// In any case, since this is just for the TTL, try and continue anyway.
|
|
server.Logger.Error(ctx, "fetch workspace for build", slog.F("workspace_build_id", workspaceBuild.ID), slog.F("workspace_id", workspaceBuild.WorkspaceID))
|
|
}
|
|
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
UpdatedAt: database.Now(),
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
_, err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
|
|
ID: workspaceBuild.ID,
|
|
Deadline: workspaceDeadline,
|
|
ProvisionerState: jobType.WorkspaceBuild.State,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update workspace build: %w", err)
|
|
}
|
|
// This could be a bulk insert to improve performance.
|
|
for _, protoResource := range jobType.WorkspaceBuild.Resources {
|
|
err = InsertWorkspaceResource(ctx, db, job.ID, workspaceBuild.Transition, protoResource, telemetrySnapshot)
|
|
if err != nil {
|
|
return xerrors.Errorf("insert provisioner job: %w", err)
|
|
}
|
|
}
|
|
|
|
if workspaceBuild.Transition != database.WorkspaceTransitionDelete {
|
|
// This is for deleting a workspace!
|
|
return nil
|
|
}
|
|
|
|
err = db.UpdateWorkspaceDeletedByID(ctx, database.UpdateWorkspaceDeletedByIDParams{
|
|
ID: workspaceBuild.WorkspaceID,
|
|
Deleted: true,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update workspace deleted: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("complete job: %w", err)
|
|
}
|
|
|
|
err = server.Pubsub.Publish(codersdk.WorkspaceNotifyChannel(workspaceBuild.WorkspaceID), []byte{})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update workspace: %w", err)
|
|
}
|
|
case *proto.CompletedJob_TemplateDryRun_:
|
|
for _, resource := range jobType.TemplateDryRun.Resources {
|
|
server.Logger.Info(ctx, "inserting template dry-run job resource",
|
|
slog.F("job_id", job.ID.String()),
|
|
slog.F("resource_name", resource.Name),
|
|
slog.F("resource_type", resource.Type))
|
|
|
|
err = InsertWorkspaceResource(ctx, server.Database, jobID, database.WorkspaceTransitionStart, resource, telemetrySnapshot)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert resource: %w", err)
|
|
}
|
|
}
|
|
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
UpdatedAt: database.Now(),
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "marked template dry-run job as completed", slog.F("job_id", jobID))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("complete job: %w", err)
|
|
}
|
|
|
|
default:
|
|
if completed.Type == nil {
|
|
return nil, xerrors.Errorf("type payload must be provided")
|
|
}
|
|
return nil, xerrors.Errorf("unknown job type %q; ensure coderd and provisionerd versions match",
|
|
reflect.TypeOf(completed.Type).String())
|
|
}
|
|
|
|
data, err := json.Marshal(ProvisionerJobLogsNotifyMessage{EndOfLogs: true})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("marshal job log: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(ProvisionerJobLogsNotifyChannel(jobID), data)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to publish end of job logs", slog.F("job_id", jobID), slog.Error(err))
|
|
return nil, xerrors.Errorf("publish end of job logs: %w", err)
|
|
}
|
|
|
|
server.Logger.Debug(ctx, "CompleteJob done", slog.F("job_id", jobID))
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
func InsertWorkspaceResource(ctx context.Context, db database.Store, jobID uuid.UUID, transition database.WorkspaceTransition, protoResource *sdkproto.Resource, snapshot *telemetry.Snapshot) error {
|
|
resource, err := db.InsertWorkspaceResource(ctx, database.InsertWorkspaceResourceParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
JobID: jobID,
|
|
Transition: transition,
|
|
Type: protoResource.Type,
|
|
Name: protoResource.Name,
|
|
Hide: protoResource.Hide,
|
|
Icon: protoResource.Icon,
|
|
InstanceType: sql.NullString{
|
|
String: protoResource.InstanceType,
|
|
Valid: protoResource.InstanceType != "",
|
|
},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert provisioner job resource %q: %w", protoResource.Name, err)
|
|
}
|
|
snapshot.WorkspaceResources = append(snapshot.WorkspaceResources, telemetry.ConvertWorkspaceResource(resource))
|
|
|
|
appSlugs := make(map[string]struct{})
|
|
for _, prAgent := range protoResource.Agents {
|
|
var instanceID sql.NullString
|
|
if prAgent.GetInstanceId() != "" {
|
|
instanceID = sql.NullString{
|
|
String: prAgent.GetInstanceId(),
|
|
Valid: true,
|
|
}
|
|
}
|
|
var env pqtype.NullRawMessage
|
|
if prAgent.Env != nil {
|
|
data, err := json.Marshal(prAgent.Env)
|
|
if err != nil {
|
|
return xerrors.Errorf("marshal env: %w", err)
|
|
}
|
|
env = pqtype.NullRawMessage{
|
|
RawMessage: data,
|
|
Valid: true,
|
|
}
|
|
}
|
|
authToken := uuid.New()
|
|
if prAgent.GetToken() != "" {
|
|
authToken, err = uuid.Parse(prAgent.GetToken())
|
|
if err != nil {
|
|
return xerrors.Errorf("invalid auth token format; must be uuid: %w", err)
|
|
}
|
|
}
|
|
|
|
agentID := uuid.New()
|
|
dbAgent, err := db.InsertWorkspaceAgent(ctx, database.InsertWorkspaceAgentParams{
|
|
ID: agentID,
|
|
CreatedAt: database.Now(),
|
|
UpdatedAt: database.Now(),
|
|
ResourceID: resource.ID,
|
|
Name: prAgent.Name,
|
|
AuthToken: authToken,
|
|
AuthInstanceID: instanceID,
|
|
Architecture: prAgent.Architecture,
|
|
EnvironmentVariables: env,
|
|
Directory: prAgent.Directory,
|
|
OperatingSystem: prAgent.OperatingSystem,
|
|
StartupScript: sql.NullString{
|
|
String: prAgent.StartupScript,
|
|
Valid: prAgent.StartupScript != "",
|
|
},
|
|
ConnectionTimeoutSeconds: prAgent.GetConnectionTimeoutSeconds(),
|
|
TroubleshootingURL: prAgent.GetTroubleshootingUrl(),
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert agent: %w", err)
|
|
}
|
|
snapshot.WorkspaceAgents = append(snapshot.WorkspaceAgents, telemetry.ConvertWorkspaceAgent(dbAgent))
|
|
|
|
for _, app := range prAgent.Apps {
|
|
slug := app.Slug
|
|
if slug == "" {
|
|
return xerrors.Errorf("app must have a slug or name set")
|
|
}
|
|
if !provisioner.AppSlugRegex.MatchString(slug) {
|
|
return xerrors.Errorf("app slug %q does not match regex %q", slug, provisioner.AppSlugRegex.String())
|
|
}
|
|
if _, exists := appSlugs[slug]; exists {
|
|
return xerrors.Errorf("duplicate app slug, must be unique per template: %q", slug)
|
|
}
|
|
appSlugs[slug] = struct{}{}
|
|
|
|
health := database.WorkspaceAppHealthDisabled
|
|
if app.Healthcheck == nil {
|
|
app.Healthcheck = &sdkproto.Healthcheck{}
|
|
}
|
|
if app.Healthcheck.Url != "" {
|
|
health = database.WorkspaceAppHealthInitializing
|
|
}
|
|
|
|
sharingLevel := database.AppSharingLevelOwner
|
|
switch app.SharingLevel {
|
|
case sdkproto.AppSharingLevel_AUTHENTICATED:
|
|
sharingLevel = database.AppSharingLevelAuthenticated
|
|
case sdkproto.AppSharingLevel_PUBLIC:
|
|
sharingLevel = database.AppSharingLevelPublic
|
|
}
|
|
|
|
dbApp, err := db.InsertWorkspaceApp(ctx, database.InsertWorkspaceAppParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
AgentID: dbAgent.ID,
|
|
Slug: slug,
|
|
DisplayName: app.DisplayName,
|
|
Icon: app.Icon,
|
|
Command: sql.NullString{
|
|
String: app.Command,
|
|
Valid: app.Command != "",
|
|
},
|
|
Url: sql.NullString{
|
|
String: app.Url,
|
|
Valid: app.Url != "",
|
|
},
|
|
Subdomain: app.Subdomain,
|
|
SharingLevel: sharingLevel,
|
|
HealthcheckUrl: app.Healthcheck.Url,
|
|
HealthcheckInterval: app.Healthcheck.Interval,
|
|
HealthcheckThreshold: app.Healthcheck.Threshold,
|
|
Health: health,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert app: %w", err)
|
|
}
|
|
snapshot.WorkspaceApps = append(snapshot.WorkspaceApps, telemetry.ConvertWorkspaceApp(dbApp))
|
|
}
|
|
}
|
|
|
|
for _, metadatum := range protoResource.Metadata {
|
|
var value sql.NullString
|
|
if !metadatum.IsNull {
|
|
value.String = metadatum.Value
|
|
value.Valid = true
|
|
}
|
|
|
|
_, err := db.InsertWorkspaceResourceMetadata(ctx, database.InsertWorkspaceResourceMetadataParams{
|
|
WorkspaceResourceID: resource.ID,
|
|
Key: metadatum.Key,
|
|
Value: value,
|
|
Sensitive: metadatum.Sensitive,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert metadata: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func convertValidationTypeSystem(typeSystem sdkproto.ParameterSchema_TypeSystem) (database.ParameterTypeSystem, error) {
|
|
switch typeSystem {
|
|
case sdkproto.ParameterSchema_None:
|
|
return database.ParameterTypeSystemNone, nil
|
|
case sdkproto.ParameterSchema_HCL:
|
|
return database.ParameterTypeSystemHCL, nil
|
|
default:
|
|
return database.ParameterTypeSystem(""), xerrors.Errorf("unknown type system: %d", typeSystem)
|
|
}
|
|
}
|
|
|
|
func convertParameterSourceScheme(sourceScheme sdkproto.ParameterSource_Scheme) (database.ParameterSourceScheme, error) {
|
|
switch sourceScheme {
|
|
case sdkproto.ParameterSource_DATA:
|
|
return database.ParameterSourceSchemeData, nil
|
|
default:
|
|
return database.ParameterSourceScheme(""), xerrors.Errorf("unknown parameter source scheme: %d", sourceScheme)
|
|
}
|
|
}
|
|
|
|
func convertParameterDestinationScheme(destinationScheme sdkproto.ParameterDestination_Scheme) (database.ParameterDestinationScheme, error) {
|
|
switch destinationScheme {
|
|
case sdkproto.ParameterDestination_ENVIRONMENT_VARIABLE:
|
|
return database.ParameterDestinationSchemeEnvironmentVariable, nil
|
|
case sdkproto.ParameterDestination_PROVISIONER_VARIABLE:
|
|
return database.ParameterDestinationSchemeProvisionerVariable, nil
|
|
default:
|
|
return database.ParameterDestinationScheme(""), xerrors.Errorf("unknown parameter destination scheme: %d", destinationScheme)
|
|
}
|
|
}
|
|
|
|
func convertLogLevel(logLevel sdkproto.LogLevel) (database.LogLevel, error) {
|
|
switch logLevel {
|
|
case sdkproto.LogLevel_TRACE:
|
|
return database.LogLevelTrace, nil
|
|
case sdkproto.LogLevel_DEBUG:
|
|
return database.LogLevelDebug, nil
|
|
case sdkproto.LogLevel_INFO:
|
|
return database.LogLevelInfo, nil
|
|
case sdkproto.LogLevel_WARN:
|
|
return database.LogLevelWarn, nil
|
|
case sdkproto.LogLevel_ERROR:
|
|
return database.LogLevelError, nil
|
|
default:
|
|
return database.LogLevel(""), xerrors.Errorf("unknown log level: %d", logLevel)
|
|
}
|
|
}
|
|
|
|
func convertLogSource(logSource proto.LogSource) (database.LogSource, error) {
|
|
switch logSource {
|
|
case proto.LogSource_PROVISIONER_DAEMON:
|
|
return database.LogSourceProvisionerDaemon, nil
|
|
case proto.LogSource_PROVISIONER:
|
|
return database.LogSourceProvisioner, nil
|
|
default:
|
|
return database.LogSource(""), xerrors.Errorf("unknown log source: %d", logSource)
|
|
}
|
|
}
|
|
|
|
func convertComputedParameterValues(parameters []parameter.ComputedValue) ([]*sdkproto.ParameterValue, error) {
|
|
protoParameters := make([]*sdkproto.ParameterValue, len(parameters))
|
|
for i, computedParameter := range parameters {
|
|
converted, err := convertComputedParameterValue(computedParameter)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter: %w", err)
|
|
}
|
|
protoParameters[i] = converted
|
|
}
|
|
|
|
return protoParameters, nil
|
|
}
|
|
|
|
func convertComputedParameterValue(param parameter.ComputedValue) (*sdkproto.ParameterValue, error) {
|
|
var scheme sdkproto.ParameterDestination_Scheme
|
|
switch param.DestinationScheme {
|
|
case database.ParameterDestinationSchemeEnvironmentVariable:
|
|
scheme = sdkproto.ParameterDestination_ENVIRONMENT_VARIABLE
|
|
case database.ParameterDestinationSchemeProvisionerVariable:
|
|
scheme = sdkproto.ParameterDestination_PROVISIONER_VARIABLE
|
|
default:
|
|
return nil, xerrors.Errorf("unrecognized destination scheme: %q", param.DestinationScheme)
|
|
}
|
|
|
|
return &sdkproto.ParameterValue{
|
|
DestinationScheme: scheme,
|
|
Name: param.Name,
|
|
Value: param.SourceValue,
|
|
}, nil
|
|
}
|
|
|
|
func convertWorkspaceTransition(transition database.WorkspaceTransition) (sdkproto.WorkspaceTransition, error) {
|
|
switch transition {
|
|
case database.WorkspaceTransitionStart:
|
|
return sdkproto.WorkspaceTransition_START, nil
|
|
case database.WorkspaceTransitionStop:
|
|
return sdkproto.WorkspaceTransition_STOP, nil
|
|
case database.WorkspaceTransitionDelete:
|
|
return sdkproto.WorkspaceTransition_DESTROY, nil
|
|
default:
|
|
return 0, xerrors.Errorf("unrecognized transition: %q", transition)
|
|
}
|
|
}
|
|
|
|
// WorkspaceProvisionJob is the payload for the "workspace_provision" job type.
|
|
type WorkspaceProvisionJob struct {
|
|
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
|
|
DryRun bool `json:"dry_run"`
|
|
}
|
|
|
|
// TemplateVersionDryRunJob is the payload for the "template_version_dry_run" job type.
|
|
type TemplateVersionDryRunJob struct {
|
|
TemplateVersionID uuid.UUID `json:"template_version_id"`
|
|
WorkspaceName string `json:"workspace_name"`
|
|
ParameterValues []database.ParameterValue `json:"parameter_values"`
|
|
}
|
|
|
|
// ProvisionerJobLogsNotifyMessage is the payload published on
|
|
// the provisioner job logs notify channel.
|
|
type ProvisionerJobLogsNotifyMessage struct {
|
|
CreatedAfter int64 `json:"created_after"`
|
|
EndOfLogs bool `json:"end_of_logs,omitempty"`
|
|
}
|
|
|
|
// ProvisionerJobLogsNotifyChannel is the PostgreSQL NOTIFY channel
|
|
// to publish updates to job logs on.
|
|
func ProvisionerJobLogsNotifyChannel(jobID uuid.UUID) string {
|
|
return fmt.Sprintf("provisioner-log-logs:%s", jobID)
|
|
}
|