mirror of
https://github.com/coder/coder.git
synced 2025-06-28 04:33:02 +00:00
- As part of merging support for Template RBAC and user groups a permission check on reading files was relaxed. With the addition of admin roles on individual templates, regular users are now able to push template versions if they have inherited the 'admin' role for a template. In order to do so they need to be able to create and read their own files. Since collisions on hash in the past were ignored, this means that a regular user who pushes a template version with a file hash that collides with an existing hash will not be able to read the file (since it belongs to another user). This commit fixes the underlying problem which was that the files table had a primary key on the 'hash' column. This was not a problem at the time because only template admins and other users with similar elevated roles were able to read all files regardless of ownership. To fix this a new column and primary key 'id' has been introduced to the files table. The unique constraint has been updated to be hash+created_by. Tables (provisioner_jobs) that referenced files.hash have been updated to reference files.id. Relevant API endpoints have also been updated.
967 lines
33 KiB
Go
967 lines
33 KiB
Go
package coderd
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"reflect"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/moby/moby/pkg/namesgenerator"
|
|
"github.com/tabbed/pqtype"
|
|
"golang.org/x/xerrors"
|
|
protobuf "google.golang.org/protobuf/proto"
|
|
"storj.io/drpc/drpcmux"
|
|
"storj.io/drpc/drpcserver"
|
|
|
|
"cdr.dev/slog"
|
|
|
|
"github.com/coder/coder/coderd/database"
|
|
"github.com/coder/coder/coderd/httpapi"
|
|
"github.com/coder/coder/coderd/parameter"
|
|
"github.com/coder/coder/coderd/rbac"
|
|
"github.com/coder/coder/coderd/telemetry"
|
|
"github.com/coder/coder/codersdk"
|
|
"github.com/coder/coder/provisionerd/proto"
|
|
"github.com/coder/coder/provisionersdk"
|
|
sdkproto "github.com/coder/coder/provisionersdk/proto"
|
|
)
|
|
|
|
func (api *API) provisionerDaemons(rw http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
daemons, err := api.Database.GetProvisionerDaemons(ctx)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner daemons.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
if daemons == nil {
|
|
daemons = []database.ProvisionerDaemon{}
|
|
}
|
|
daemons, err = AuthorizeFilter(api.HTTPAuth, r, rbac.ActionRead, daemons)
|
|
if err != nil {
|
|
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
|
|
Message: "Internal error fetching provisioner daemons.",
|
|
Detail: err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
httpapi.Write(ctx, rw, http.StatusOK, daemons)
|
|
}
|
|
|
|
// ListenProvisionerDaemon is an in-memory connection to a provisionerd. Useful when starting coderd and provisionerd
|
|
// in the same process.
|
|
func (api *API) ListenProvisionerDaemon(ctx context.Context) (client proto.DRPCProvisionerDaemonClient, err error) {
|
|
clientSession, serverSession := provisionersdk.TransportPipe()
|
|
defer func() {
|
|
if err != nil {
|
|
_ = clientSession.Close()
|
|
_ = serverSession.Close()
|
|
}
|
|
}()
|
|
|
|
name := namesgenerator.GetRandomName(1)
|
|
daemon, err := api.Database.InsertProvisionerDaemon(ctx, database.InsertProvisionerDaemonParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
Name: name,
|
|
Provisioners: []database.ProvisionerType{database.ProvisionerTypeEcho, database.ProvisionerTypeTerraform},
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert provisioner daemon %q: %w", name, err)
|
|
}
|
|
|
|
mux := drpcmux.New()
|
|
err = proto.DRPCRegisterProvisionerDaemon(mux, &provisionerdServer{
|
|
AccessURL: api.AccessURL,
|
|
ID: daemon.ID,
|
|
Database: api.Database,
|
|
Pubsub: api.Pubsub,
|
|
Provisioners: daemon.Provisioners,
|
|
Telemetry: api.Telemetry,
|
|
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
server := drpcserver.NewWithOptions(mux, drpcserver.Options{
|
|
Log: func(err error) {
|
|
if xerrors.Is(err, io.EOF) {
|
|
return
|
|
}
|
|
api.Logger.Debug(ctx, "drpc server error", slog.Error(err))
|
|
},
|
|
})
|
|
go func() {
|
|
err := server.Serve(ctx, serverSession)
|
|
if err != nil && !xerrors.Is(err, io.EOF) {
|
|
api.Logger.Debug(ctx, "provisioner daemon disconnected", slog.Error(err))
|
|
}
|
|
// close the sessions so we don't leak goroutines serving them.
|
|
_ = clientSession.Close()
|
|
_ = serverSession.Close()
|
|
}()
|
|
|
|
return proto.NewDRPCProvisionerDaemonClient(provisionersdk.Conn(clientSession)), nil
|
|
}
|
|
|
|
// The input for a "workspace_provision" job.
|
|
type workspaceProvisionJob struct {
|
|
WorkspaceBuildID uuid.UUID `json:"workspace_build_id"`
|
|
DryRun bool `json:"dry_run"`
|
|
}
|
|
|
|
// The input for a "template_version_dry_run" job.
|
|
type templateVersionDryRunJob struct {
|
|
TemplateVersionID uuid.UUID `json:"template_version_id"`
|
|
WorkspaceName string `json:"workspace_name"`
|
|
ParameterValues []database.ParameterValue `json:"parameter_values"`
|
|
}
|
|
|
|
// Implementation of the provisioner daemon protobuf server.
|
|
type provisionerdServer struct {
|
|
AccessURL *url.URL
|
|
ID uuid.UUID
|
|
Logger slog.Logger
|
|
Provisioners []database.ProvisionerType
|
|
Database database.Store
|
|
Pubsub database.Pubsub
|
|
Telemetry telemetry.Reporter
|
|
}
|
|
|
|
// AcquireJob queries the database to lock a job.
|
|
func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty) (*proto.AcquiredJob, error) {
|
|
// This marks the job as locked in the database.
|
|
job, err := server.Database.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
|
|
StartedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
WorkerID: uuid.NullUUID{
|
|
UUID: server.ID,
|
|
Valid: true,
|
|
},
|
|
Types: server.Provisioners,
|
|
})
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
// The provisioner daemon assumes no jobs are available if
|
|
// an empty struct is returned.
|
|
return &proto.AcquiredJob{}, nil
|
|
}
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("acquire job: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "locked job from database", slog.F("id", job.ID))
|
|
|
|
// Marks the acquired job as failed with the error message provided.
|
|
failJob := func(errorMessage string) error {
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: job.ID,
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
Error: sql.NullString{
|
|
String: errorMessage,
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
return xerrors.Errorf("request job was invalidated: %s", errorMessage)
|
|
}
|
|
|
|
user, err := server.Database.GetUserByID(ctx, job.InitiatorID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get user: %s", err))
|
|
}
|
|
|
|
protoJob := &proto.AcquiredJob{
|
|
JobId: job.ID.String(),
|
|
CreatedAt: job.CreatedAt.UnixMilli(),
|
|
Provisioner: string(job.Provisioner),
|
|
UserName: user.Username,
|
|
}
|
|
switch job.Type {
|
|
case database.ProvisionerJobTypeWorkspaceBuild:
|
|
var input workspaceProvisionJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
|
|
}
|
|
workspaceBuild, err := server.Database.GetWorkspaceBuildByID(ctx, input.WorkspaceBuildID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get workspace build: %s", err))
|
|
}
|
|
workspace, err := server.Database.GetWorkspaceByID(ctx, workspaceBuild.WorkspaceID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get workspace: %s", err))
|
|
}
|
|
templateVersion, err := server.Database.GetTemplateVersionByID(ctx, workspaceBuild.TemplateVersionID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get template version: %s", err))
|
|
}
|
|
template, err := server.Database.GetTemplateByID(ctx, templateVersion.TemplateID.UUID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get template: %s", err))
|
|
}
|
|
owner, err := server.Database.GetUserByID(ctx, workspace.OwnerID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get owner: %s", err))
|
|
}
|
|
|
|
// Compute parameters for the workspace to consume.
|
|
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
|
|
TemplateImportJobID: templateVersion.JobID,
|
|
TemplateID: uuid.NullUUID{
|
|
UUID: template.ID,
|
|
Valid: true,
|
|
},
|
|
WorkspaceID: uuid.NullUUID{
|
|
UUID: workspace.ID,
|
|
Valid: true,
|
|
},
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("compute parameters: %s", err))
|
|
}
|
|
|
|
// Convert types to their corresponding protobuf types.
|
|
protoParameters, err := convertComputedParameterValues(parameters)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("convert computed parameters to protobuf: %s", err))
|
|
}
|
|
transition, err := convertWorkspaceTransition(workspaceBuild.Transition)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("convert workspace transition: %s", err))
|
|
}
|
|
|
|
protoJob.Type = &proto.AcquiredJob_WorkspaceBuild_{
|
|
WorkspaceBuild: &proto.AcquiredJob_WorkspaceBuild{
|
|
WorkspaceBuildId: workspaceBuild.ID.String(),
|
|
WorkspaceName: workspace.Name,
|
|
State: workspaceBuild.ProvisionerState,
|
|
ParameterValues: protoParameters,
|
|
Metadata: &sdkproto.Provision_Metadata{
|
|
CoderUrl: server.AccessURL.String(),
|
|
WorkspaceTransition: transition,
|
|
WorkspaceName: workspace.Name,
|
|
WorkspaceOwner: owner.Username,
|
|
WorkspaceOwnerEmail: owner.Email,
|
|
WorkspaceId: workspace.ID.String(),
|
|
WorkspaceOwnerId: owner.ID.String(),
|
|
},
|
|
},
|
|
}
|
|
case database.ProvisionerJobTypeTemplateVersionDryRun:
|
|
var input templateVersionDryRunJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("unmarshal job input %q: %s", job.Input, err))
|
|
}
|
|
|
|
templateVersion, err := server.Database.GetTemplateVersionByID(ctx, input.TemplateVersionID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get template version: %s", err))
|
|
}
|
|
|
|
// Compute parameters for the dry-run to consume.
|
|
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
|
|
TemplateImportJobID: templateVersion.JobID,
|
|
TemplateID: templateVersion.TemplateID,
|
|
WorkspaceID: uuid.NullUUID{},
|
|
AdditionalParameterValues: input.ParameterValues,
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("compute parameters: %s", err))
|
|
}
|
|
|
|
// Convert types to their corresponding protobuf types.
|
|
protoParameters, err := convertComputedParameterValues(parameters)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("convert computed parameters to protobuf: %s", err))
|
|
}
|
|
|
|
protoJob.Type = &proto.AcquiredJob_TemplateDryRun_{
|
|
TemplateDryRun: &proto.AcquiredJob_TemplateDryRun{
|
|
ParameterValues: protoParameters,
|
|
Metadata: &sdkproto.Provision_Metadata{
|
|
CoderUrl: server.AccessURL.String(),
|
|
WorkspaceName: input.WorkspaceName,
|
|
},
|
|
},
|
|
}
|
|
case database.ProvisionerJobTypeTemplateVersionImport:
|
|
protoJob.Type = &proto.AcquiredJob_TemplateImport_{
|
|
TemplateImport: &proto.AcquiredJob_TemplateImport{
|
|
Metadata: &sdkproto.Provision_Metadata{
|
|
CoderUrl: server.AccessURL.String(),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
switch job.StorageMethod {
|
|
case database.ProvisionerStorageMethodFile:
|
|
file, err := server.Database.GetFileByID(ctx, job.FileID)
|
|
if err != nil {
|
|
return nil, failJob(fmt.Sprintf("get file by hash: %s", err))
|
|
}
|
|
protoJob.TemplateSourceArchive = file.Data
|
|
default:
|
|
return nil, failJob(fmt.Sprintf("unsupported storage method: %s", job.StorageMethod))
|
|
}
|
|
if protobuf.Size(protoJob) > provisionersdk.MaxMessageSize {
|
|
return nil, failJob(fmt.Sprintf("payload was too big: %d > %d", protobuf.Size(protoJob), provisionersdk.MaxMessageSize))
|
|
}
|
|
|
|
return protoJob, err
|
|
}
|
|
|
|
func (server *provisionerdServer) UpdateJob(ctx context.Context, request *proto.UpdateJobRequest) (*proto.UpdateJobResponse, error) {
|
|
parsedID, err := uuid.Parse(request.JobId)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse job id: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "UpdateJob starting", slog.F("job_id", parsedID))
|
|
job, err := server.Database.GetProvisionerJobByID(ctx, parsedID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get job: %w", err)
|
|
}
|
|
if !job.WorkerID.Valid {
|
|
return nil, xerrors.New("job isn't running yet")
|
|
}
|
|
if job.WorkerID.UUID.String() != server.ID.String() {
|
|
return nil, xerrors.New("you don't own this job")
|
|
}
|
|
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
|
|
ID: parsedID,
|
|
UpdatedAt: database.Now(),
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update job: %w", err)
|
|
}
|
|
|
|
if len(request.Logs) > 0 {
|
|
insertParams := database.InsertProvisionerJobLogsParams{
|
|
JobID: parsedID,
|
|
}
|
|
for _, log := range request.Logs {
|
|
logLevel, err := convertLogLevel(log.Level)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert log level: %w", err)
|
|
}
|
|
logSource, err := convertLogSource(log.Source)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert log source: %w", err)
|
|
}
|
|
insertParams.ID = append(insertParams.ID, uuid.New())
|
|
insertParams.CreatedAt = append(insertParams.CreatedAt, time.UnixMilli(log.CreatedAt))
|
|
insertParams.Level = append(insertParams.Level, logLevel)
|
|
insertParams.Stage = append(insertParams.Stage, log.Stage)
|
|
insertParams.Source = append(insertParams.Source, logSource)
|
|
insertParams.Output = append(insertParams.Output, log.Output)
|
|
server.Logger.Debug(ctx, "job log",
|
|
slog.F("job_id", parsedID),
|
|
slog.F("stage", log.Stage),
|
|
slog.F("output", log.Output))
|
|
}
|
|
logs, err := server.Database.InsertProvisionerJobLogs(context.Background(), insertParams)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to insert job logs", slog.F("job_id", parsedID), slog.Error(err))
|
|
return nil, xerrors.Errorf("insert job logs: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "inserted job logs", slog.F("job_id", parsedID))
|
|
data, err := json.Marshal(provisionerJobLogsMessage{Logs: logs})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("marshal job log: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(provisionerJobLogsChannel(parsedID), data)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to publish job logs", slog.F("job_id", parsedID), slog.Error(err))
|
|
return nil, xerrors.Errorf("publish job log: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "published job logs", slog.F("job_id", parsedID))
|
|
}
|
|
|
|
if len(request.Readme) > 0 {
|
|
err := server.Database.UpdateTemplateVersionDescriptionByJobID(ctx, database.UpdateTemplateVersionDescriptionByJobIDParams{
|
|
JobID: job.ID,
|
|
Readme: string(request.Readme),
|
|
UpdatedAt: database.Now(),
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update template version description: %w", err)
|
|
}
|
|
}
|
|
|
|
if len(request.ParameterSchemas) > 0 {
|
|
for index, protoParameter := range request.ParameterSchemas {
|
|
validationTypeSystem, err := convertValidationTypeSystem(protoParameter.ValidationTypeSystem)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert validation type system for %q: %w", protoParameter.Name, err)
|
|
}
|
|
|
|
parameterSchema := database.InsertParameterSchemaParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
JobID: job.ID,
|
|
Name: protoParameter.Name,
|
|
Description: protoParameter.Description,
|
|
RedisplayValue: protoParameter.RedisplayValue,
|
|
ValidationError: protoParameter.ValidationError,
|
|
ValidationCondition: protoParameter.ValidationCondition,
|
|
ValidationValueType: protoParameter.ValidationValueType,
|
|
ValidationTypeSystem: validationTypeSystem,
|
|
|
|
DefaultSourceScheme: database.ParameterSourceSchemeNone,
|
|
DefaultDestinationScheme: database.ParameterDestinationSchemeNone,
|
|
|
|
AllowOverrideDestination: protoParameter.AllowOverrideDestination,
|
|
AllowOverrideSource: protoParameter.AllowOverrideSource,
|
|
|
|
Index: int32(index),
|
|
}
|
|
|
|
// It's possible a parameter doesn't define a default source!
|
|
if protoParameter.DefaultSource != nil {
|
|
parameterSourceScheme, err := convertParameterSourceScheme(protoParameter.DefaultSource.Scheme)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter source scheme: %w", err)
|
|
}
|
|
parameterSchema.DefaultSourceScheme = parameterSourceScheme
|
|
parameterSchema.DefaultSourceValue = protoParameter.DefaultSource.Value
|
|
}
|
|
|
|
// It's possible a parameter doesn't define a default destination!
|
|
if protoParameter.DefaultDestination != nil {
|
|
parameterDestinationScheme, err := convertParameterDestinationScheme(protoParameter.DefaultDestination.Scheme)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter destination scheme: %w", err)
|
|
}
|
|
parameterSchema.DefaultDestinationScheme = parameterDestinationScheme
|
|
}
|
|
|
|
_, err = server.Database.InsertParameterSchema(ctx, parameterSchema)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert parameter schema: %w", err)
|
|
}
|
|
}
|
|
|
|
var templateID uuid.NullUUID
|
|
if job.Type == database.ProvisionerJobTypeTemplateVersionImport {
|
|
templateVersion, err := server.Database.GetTemplateVersionByJobID(ctx, job.ID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get template version by job id: %w", err)
|
|
}
|
|
templateID = templateVersion.TemplateID
|
|
}
|
|
|
|
parameters, err := parameter.Compute(ctx, server.Database, parameter.ComputeScope{
|
|
TemplateImportJobID: job.ID,
|
|
TemplateID: templateID,
|
|
}, nil)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("compute parameters: %w", err)
|
|
}
|
|
// Convert parameters to the protobuf type.
|
|
protoParameters := make([]*sdkproto.ParameterValue, 0, len(parameters))
|
|
for _, computedParameter := range parameters {
|
|
converted, err := convertComputedParameterValue(computedParameter)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter: %s", err)
|
|
}
|
|
protoParameters = append(protoParameters, converted)
|
|
}
|
|
|
|
return &proto.UpdateJobResponse{
|
|
Canceled: job.CanceledAt.Valid,
|
|
ParameterValues: protoParameters,
|
|
}, nil
|
|
}
|
|
|
|
return &proto.UpdateJobResponse{
|
|
Canceled: job.CanceledAt.Valid,
|
|
}, nil
|
|
}
|
|
|
|
func (server *provisionerdServer) FailJob(ctx context.Context, failJob *proto.FailedJob) (*proto.Empty, error) {
|
|
jobID, err := uuid.Parse(failJob.JobId)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse job id: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "FailJob starting", slog.F("job_id", jobID))
|
|
job, err := server.Database.GetProvisionerJobByID(ctx, jobID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get provisioner job: %w", err)
|
|
}
|
|
if job.CompletedAt.Valid {
|
|
return nil, xerrors.Errorf("job already completed")
|
|
}
|
|
job.CompletedAt = sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
}
|
|
job.Error = sql.NullString{
|
|
String: failJob.Error,
|
|
Valid: failJob.Error != "",
|
|
}
|
|
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
CompletedAt: job.CompletedAt,
|
|
UpdatedAt: database.Now(),
|
|
Error: job.Error,
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
server.Telemetry.Report(&telemetry.Snapshot{
|
|
ProvisionerJobs: []telemetry.ProvisionerJob{telemetry.ConvertProvisionerJob(job)},
|
|
})
|
|
|
|
switch jobType := failJob.Type.(type) {
|
|
case *proto.FailedJob_WorkspaceBuild_:
|
|
if jobType.WorkspaceBuild.State == nil {
|
|
break
|
|
}
|
|
var input workspaceProvisionJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("unmarshal workspace provision input: %w", err)
|
|
}
|
|
err = server.Database.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
|
|
ID: input.WorkspaceBuildID,
|
|
UpdatedAt: database.Now(),
|
|
ProvisionerState: jobType.WorkspaceBuild.State,
|
|
// We are explicitly not updating deadline here.
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update workspace build state: %w", err)
|
|
}
|
|
case *proto.FailedJob_TemplateImport_:
|
|
}
|
|
|
|
data, err := json.Marshal(provisionerJobLogsMessage{EndOfLogs: true})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("marshal job log: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(provisionerJobLogsChannel(jobID), data)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to publish end of job logs", slog.F("job_id", jobID), slog.Error(err))
|
|
return nil, xerrors.Errorf("publish end of job logs: %w", err)
|
|
}
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
// CompleteJob is triggered by a provision daemon to mark a provisioner job as completed.
|
|
func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *proto.CompletedJob) (*proto.Empty, error) {
|
|
jobID, err := uuid.Parse(completed.JobId)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("parse job id: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "CompleteJob starting", slog.F("job_id", jobID))
|
|
job, err := server.Database.GetProvisionerJobByID(ctx, jobID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get job by id: %w", err)
|
|
}
|
|
if job.WorkerID.UUID.String() != server.ID.String() {
|
|
return nil, xerrors.Errorf("you don't have permission to update this job")
|
|
}
|
|
|
|
telemetrySnapshot := &telemetry.Snapshot{}
|
|
// Items are added to this snapshot as they complete!
|
|
defer server.Telemetry.Report(telemetrySnapshot)
|
|
|
|
switch jobType := completed.Type.(type) {
|
|
case *proto.CompletedJob_TemplateImport_:
|
|
for transition, resources := range map[database.WorkspaceTransition][]*sdkproto.Resource{
|
|
database.WorkspaceTransitionStart: jobType.TemplateImport.StartResources,
|
|
database.WorkspaceTransitionStop: jobType.TemplateImport.StopResources,
|
|
} {
|
|
for _, resource := range resources {
|
|
server.Logger.Info(ctx, "inserting template import job resource",
|
|
slog.F("job_id", job.ID.String()),
|
|
slog.F("resource_name", resource.Name),
|
|
slog.F("resource_type", resource.Type),
|
|
slog.F("transition", transition))
|
|
|
|
err = insertWorkspaceResource(ctx, server.Database, jobID, transition, resource, telemetrySnapshot)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert resource: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
UpdatedAt: database.Now(),
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "marked import job as completed", slog.F("job_id", jobID))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("complete job: %w", err)
|
|
}
|
|
case *proto.CompletedJob_WorkspaceBuild_:
|
|
var input workspaceProvisionJob
|
|
err = json.Unmarshal(job.Input, &input)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("unmarshal job data: %w", err)
|
|
}
|
|
|
|
workspaceBuild, err := server.Database.GetWorkspaceBuildByID(ctx, input.WorkspaceBuildID)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("get workspace build: %w", err)
|
|
}
|
|
|
|
err = server.Database.InTx(func(db database.Store) error {
|
|
now := database.Now()
|
|
var workspaceDeadline time.Time
|
|
workspace, err := db.GetWorkspaceByID(ctx, workspaceBuild.WorkspaceID)
|
|
if err == nil {
|
|
if workspace.Ttl.Valid {
|
|
workspaceDeadline = now.Add(time.Duration(workspace.Ttl.Int64))
|
|
}
|
|
} else {
|
|
// Huh? Did the workspace get deleted?
|
|
// In any case, since this is just for the TTL, try and continue anyway.
|
|
server.Logger.Error(ctx, "fetch workspace for build", slog.F("workspace_build_id", workspaceBuild.ID), slog.F("workspace_id", workspaceBuild.WorkspaceID))
|
|
}
|
|
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
UpdatedAt: database.Now(),
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
err = db.UpdateWorkspaceBuildByID(ctx, database.UpdateWorkspaceBuildByIDParams{
|
|
ID: workspaceBuild.ID,
|
|
Deadline: workspaceDeadline,
|
|
ProvisionerState: jobType.WorkspaceBuild.State,
|
|
UpdatedAt: now,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update workspace build: %w", err)
|
|
}
|
|
// This could be a bulk insert to improve performance.
|
|
for _, protoResource := range jobType.WorkspaceBuild.Resources {
|
|
err = insertWorkspaceResource(ctx, db, job.ID, workspaceBuild.Transition, protoResource, telemetrySnapshot)
|
|
if err != nil {
|
|
return xerrors.Errorf("insert provisioner job: %w", err)
|
|
}
|
|
}
|
|
|
|
if workspaceBuild.Transition != database.WorkspaceTransitionDelete {
|
|
// This is for deleting a workspace!
|
|
return nil
|
|
}
|
|
|
|
err = db.UpdateWorkspaceDeletedByID(ctx, database.UpdateWorkspaceDeletedByIDParams{
|
|
ID: workspaceBuild.WorkspaceID,
|
|
Deleted: true,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("update workspace deleted: %w", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("complete job: %w", err)
|
|
}
|
|
case *proto.CompletedJob_TemplateDryRun_:
|
|
for _, resource := range jobType.TemplateDryRun.Resources {
|
|
server.Logger.Info(ctx, "inserting template dry-run job resource",
|
|
slog.F("job_id", job.ID.String()),
|
|
slog.F("resource_name", resource.Name),
|
|
slog.F("resource_type", resource.Type))
|
|
|
|
err = insertWorkspaceResource(ctx, server.Database, jobID, database.WorkspaceTransitionStart, resource, telemetrySnapshot)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("insert resource: %w", err)
|
|
}
|
|
}
|
|
|
|
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
|
|
ID: jobID,
|
|
UpdatedAt: database.Now(),
|
|
CompletedAt: sql.NullTime{
|
|
Time: database.Now(),
|
|
Valid: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("update provisioner job: %w", err)
|
|
}
|
|
server.Logger.Debug(ctx, "marked template dry-run job as completed", slog.F("job_id", jobID))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("complete job: %w", err)
|
|
}
|
|
|
|
default:
|
|
return nil, xerrors.Errorf("unknown job type %q; ensure coderd and provisionerd versions match",
|
|
reflect.TypeOf(completed.Type).String())
|
|
}
|
|
|
|
data, err := json.Marshal(provisionerJobLogsMessage{EndOfLogs: true})
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("marshal job log: %w", err)
|
|
}
|
|
err = server.Pubsub.Publish(provisionerJobLogsChannel(jobID), data)
|
|
if err != nil {
|
|
server.Logger.Error(ctx, "failed to publish end of job logs", slog.F("job_id", jobID), slog.Error(err))
|
|
return nil, xerrors.Errorf("publish end of job logs: %w", err)
|
|
}
|
|
|
|
server.Logger.Debug(ctx, "CompleteJob done", slog.F("job_id", jobID))
|
|
return &proto.Empty{}, nil
|
|
}
|
|
|
|
func insertWorkspaceResource(ctx context.Context, db database.Store, jobID uuid.UUID, transition database.WorkspaceTransition, protoResource *sdkproto.Resource, snapshot *telemetry.Snapshot) error {
|
|
resource, err := db.InsertWorkspaceResource(ctx, database.InsertWorkspaceResourceParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
JobID: jobID,
|
|
Transition: transition,
|
|
Type: protoResource.Type,
|
|
Name: protoResource.Name,
|
|
Hide: protoResource.Hide,
|
|
Icon: protoResource.Icon,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert provisioner job resource %q: %w", protoResource.Name, err)
|
|
}
|
|
snapshot.WorkspaceResources = append(snapshot.WorkspaceResources, telemetry.ConvertWorkspaceResource(resource))
|
|
|
|
for _, prAgent := range protoResource.Agents {
|
|
var instanceID sql.NullString
|
|
if prAgent.GetInstanceId() != "" {
|
|
instanceID = sql.NullString{
|
|
String: prAgent.GetInstanceId(),
|
|
Valid: true,
|
|
}
|
|
}
|
|
var env pqtype.NullRawMessage
|
|
if prAgent.Env != nil {
|
|
data, err := json.Marshal(prAgent.Env)
|
|
if err != nil {
|
|
return xerrors.Errorf("marshal env: %w", err)
|
|
}
|
|
env = pqtype.NullRawMessage{
|
|
RawMessage: data,
|
|
Valid: true,
|
|
}
|
|
}
|
|
authToken := uuid.New()
|
|
if prAgent.GetToken() != "" {
|
|
authToken, err = uuid.Parse(prAgent.GetToken())
|
|
if err != nil {
|
|
return xerrors.Errorf("invalid auth token format; must be uuid: %w", err)
|
|
}
|
|
}
|
|
|
|
agentID := uuid.New()
|
|
dbAgent, err := db.InsertWorkspaceAgent(ctx, database.InsertWorkspaceAgentParams{
|
|
ID: agentID,
|
|
CreatedAt: database.Now(),
|
|
UpdatedAt: database.Now(),
|
|
ResourceID: resource.ID,
|
|
Name: prAgent.Name,
|
|
AuthToken: authToken,
|
|
AuthInstanceID: instanceID,
|
|
Architecture: prAgent.Architecture,
|
|
EnvironmentVariables: env,
|
|
Directory: prAgent.Directory,
|
|
OperatingSystem: prAgent.OperatingSystem,
|
|
StartupScript: sql.NullString{
|
|
String: prAgent.StartupScript,
|
|
Valid: prAgent.StartupScript != "",
|
|
},
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert agent: %w", err)
|
|
}
|
|
snapshot.WorkspaceAgents = append(snapshot.WorkspaceAgents, telemetry.ConvertWorkspaceAgent(dbAgent))
|
|
|
|
for _, app := range prAgent.Apps {
|
|
health := database.WorkspaceAppHealthDisabled
|
|
if app.Healthcheck == nil {
|
|
app.Healthcheck = &sdkproto.Healthcheck{}
|
|
}
|
|
if app.Healthcheck.Url != "" {
|
|
health = database.WorkspaceAppHealthInitializing
|
|
}
|
|
|
|
dbApp, err := db.InsertWorkspaceApp(ctx, database.InsertWorkspaceAppParams{
|
|
ID: uuid.New(),
|
|
CreatedAt: database.Now(),
|
|
AgentID: dbAgent.ID,
|
|
Name: app.Name,
|
|
Icon: app.Icon,
|
|
Command: sql.NullString{
|
|
String: app.Command,
|
|
Valid: app.Command != "",
|
|
},
|
|
Url: sql.NullString{
|
|
String: app.Url,
|
|
Valid: app.Url != "",
|
|
},
|
|
Subdomain: app.Subdomain,
|
|
HealthcheckUrl: app.Healthcheck.Url,
|
|
HealthcheckInterval: app.Healthcheck.Interval,
|
|
HealthcheckThreshold: app.Healthcheck.Threshold,
|
|
Health: health,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert app: %w", err)
|
|
}
|
|
snapshot.WorkspaceApps = append(snapshot.WorkspaceApps, telemetry.ConvertWorkspaceApp(dbApp))
|
|
}
|
|
}
|
|
|
|
for _, metadatum := range protoResource.Metadata {
|
|
var value sql.NullString
|
|
if !metadatum.IsNull {
|
|
value.String = metadatum.Value
|
|
value.Valid = true
|
|
}
|
|
|
|
_, err := db.InsertWorkspaceResourceMetadata(ctx, database.InsertWorkspaceResourceMetadataParams{
|
|
WorkspaceResourceID: resource.ID,
|
|
Key: metadatum.Key,
|
|
Value: value,
|
|
Sensitive: metadatum.Sensitive,
|
|
})
|
|
if err != nil {
|
|
return xerrors.Errorf("insert metadata: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func convertValidationTypeSystem(typeSystem sdkproto.ParameterSchema_TypeSystem) (database.ParameterTypeSystem, error) {
|
|
switch typeSystem {
|
|
case sdkproto.ParameterSchema_None:
|
|
return database.ParameterTypeSystemNone, nil
|
|
case sdkproto.ParameterSchema_HCL:
|
|
return database.ParameterTypeSystemHCL, nil
|
|
default:
|
|
return database.ParameterTypeSystem(""), xerrors.Errorf("unknown type system: %d", typeSystem)
|
|
}
|
|
}
|
|
|
|
func convertParameterSourceScheme(sourceScheme sdkproto.ParameterSource_Scheme) (database.ParameterSourceScheme, error) {
|
|
switch sourceScheme {
|
|
case sdkproto.ParameterSource_DATA:
|
|
return database.ParameterSourceSchemeData, nil
|
|
default:
|
|
return database.ParameterSourceScheme(""), xerrors.Errorf("unknown parameter source scheme: %d", sourceScheme)
|
|
}
|
|
}
|
|
|
|
func convertParameterDestinationScheme(destinationScheme sdkproto.ParameterDestination_Scheme) (database.ParameterDestinationScheme, error) {
|
|
switch destinationScheme {
|
|
case sdkproto.ParameterDestination_ENVIRONMENT_VARIABLE:
|
|
return database.ParameterDestinationSchemeEnvironmentVariable, nil
|
|
case sdkproto.ParameterDestination_PROVISIONER_VARIABLE:
|
|
return database.ParameterDestinationSchemeProvisionerVariable, nil
|
|
default:
|
|
return database.ParameterDestinationScheme(""), xerrors.Errorf("unknown parameter destination scheme: %d", destinationScheme)
|
|
}
|
|
}
|
|
|
|
func convertLogLevel(logLevel sdkproto.LogLevel) (database.LogLevel, error) {
|
|
switch logLevel {
|
|
case sdkproto.LogLevel_TRACE:
|
|
return database.LogLevelTrace, nil
|
|
case sdkproto.LogLevel_DEBUG:
|
|
return database.LogLevelDebug, nil
|
|
case sdkproto.LogLevel_INFO:
|
|
return database.LogLevelInfo, nil
|
|
case sdkproto.LogLevel_WARN:
|
|
return database.LogLevelWarn, nil
|
|
case sdkproto.LogLevel_ERROR:
|
|
return database.LogLevelError, nil
|
|
default:
|
|
return database.LogLevel(""), xerrors.Errorf("unknown log level: %d", logLevel)
|
|
}
|
|
}
|
|
|
|
func convertLogSource(logSource proto.LogSource) (database.LogSource, error) {
|
|
switch logSource {
|
|
case proto.LogSource_PROVISIONER_DAEMON:
|
|
return database.LogSourceProvisionerDaemon, nil
|
|
case proto.LogSource_PROVISIONER:
|
|
return database.LogSourceProvisioner, nil
|
|
default:
|
|
return database.LogSource(""), xerrors.Errorf("unknown log source: %d", logSource)
|
|
}
|
|
}
|
|
|
|
func convertComputedParameterValues(parameters []parameter.ComputedValue) ([]*sdkproto.ParameterValue, error) {
|
|
protoParameters := make([]*sdkproto.ParameterValue, len(parameters))
|
|
for i, computedParameter := range parameters {
|
|
converted, err := convertComputedParameterValue(computedParameter)
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("convert parameter: %w", err)
|
|
}
|
|
protoParameters[i] = converted
|
|
}
|
|
|
|
return protoParameters, nil
|
|
}
|
|
|
|
func convertComputedParameterValue(param parameter.ComputedValue) (*sdkproto.ParameterValue, error) {
|
|
var scheme sdkproto.ParameterDestination_Scheme
|
|
switch param.DestinationScheme {
|
|
case database.ParameterDestinationSchemeEnvironmentVariable:
|
|
scheme = sdkproto.ParameterDestination_ENVIRONMENT_VARIABLE
|
|
case database.ParameterDestinationSchemeProvisionerVariable:
|
|
scheme = sdkproto.ParameterDestination_PROVISIONER_VARIABLE
|
|
default:
|
|
return nil, xerrors.Errorf("unrecognized destination scheme: %q", param.DestinationScheme)
|
|
}
|
|
|
|
return &sdkproto.ParameterValue{
|
|
DestinationScheme: scheme,
|
|
Name: param.Name,
|
|
Value: param.SourceValue,
|
|
}, nil
|
|
}
|
|
|
|
func convertWorkspaceTransition(transition database.WorkspaceTransition) (sdkproto.WorkspaceTransition, error) {
|
|
switch transition {
|
|
case database.WorkspaceTransitionStart:
|
|
return sdkproto.WorkspaceTransition_START, nil
|
|
case database.WorkspaceTransitionStop:
|
|
return sdkproto.WorkspaceTransition_STOP, nil
|
|
case database.WorkspaceTransitionDelete:
|
|
return sdkproto.WorkspaceTransition_DESTROY, nil
|
|
default:
|
|
return 0, xerrors.Errorf("unrecognized transition: %q", transition)
|
|
}
|
|
}
|