fix: Simplify provisionerd job acquire (#158)

This uses a simple channel to detect whether a
job is running or not, and moves all cancels
to be in goroutines.
This commit is contained in:
Kyle Carberry
2022-02-03 19:13:22 -06:00
committed by GitHub
parent 7884b43c78
commit c65850b654
11 changed files with 186 additions and 131 deletions

View File

@ -23,6 +23,7 @@ type Options struct {
func New(options *Options) http.Handler {
api := &api{
Database: options.Database,
Logger: options.Logger,
Pubsub: options.Pubsub,
}
@ -110,5 +111,6 @@ func New(options *Options) http.Handler {
// be added to this struct for code clarity.
type api struct {
Database database.Store
Logger slog.Logger
Pubsub database.Pubsub
}

View File

@ -125,7 +125,7 @@ func New(t *testing.T) Server {
}
handler := coderd.New(&coderd.Options{
Logger: slogtest.Make(t, nil),
Logger: slogtest.Make(t, nil).Leveled(slog.LevelDebug),
Database: db,
Pubsub: pubsub,
})

View File

@ -19,6 +19,8 @@ import (
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"cdr.dev/slog"
"github.com/coder/coder/coderd/projectparameter"
"github.com/coder/coder/database"
"github.com/coder/coder/httpapi"
@ -84,6 +86,7 @@ func (api *api) provisionerDaemonsServe(rw http.ResponseWriter, r *http.Request)
Database: api.Database,
Pubsub: api.Pubsub,
Provisioners: daemon.Provisioners,
Logger: api.Logger.Named(fmt.Sprintf("provisionerd-%s", daemon.Name)),
})
if err != nil {
_ = conn.Close(websocket.StatusInternalError, fmt.Sprintf("drpc register provisioner daemon: %s", err))
@ -109,6 +112,7 @@ type projectImportJob struct {
// Implementation of the provisioner daemon protobuf server.
type provisionerdServer struct {
ID uuid.UUID
Logger slog.Logger
Provisioners []database.ProvisionerType
Database database.Store
Pubsub database.Pubsub
@ -136,9 +140,11 @@ func (server *provisionerdServer) AcquireJob(ctx context.Context, _ *proto.Empty
if err != nil {
return nil, xerrors.Errorf("acquire job: %w", err)
}
server.Logger.Debug(ctx, "locked job from database", slog.F("id", job.ID))
// Marks the acquired job as failed with the error message provided.
failJob := func(errorMessage string) error {
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: job.ID,
CompletedAt: sql.NullTime{
Time: database.Now(),
@ -381,8 +387,12 @@ func (server *provisionerdServer) CancelJob(ctx context.Context, cancelJob *prot
if err != nil {
return nil, xerrors.Errorf("parse job id: %w", err)
}
err = server.Database.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = server.Database.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: jobID,
CompletedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
CancelledAt: sql.NullTime{
Time: database.Now(),
Valid: true,
@ -476,7 +486,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
// This must occur in a transaction in case of failure.
err = server.Database.InTx(func(db database.Store) error {
err = db.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: jobID,
UpdatedAt: database.Now(),
CompletedAt: sql.NullTime{
@ -495,6 +505,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
return xerrors.Errorf("insert project parameter %q: %w", projectParameter.Name, err)
}
}
server.Logger.Debug(ctx, "marked import job as completed", slog.F("job_id", jobID))
return nil
})
if err != nil {
@ -513,7 +524,7 @@ func (server *provisionerdServer) CompleteJob(ctx context.Context, completed *pr
}
err = server.Database.InTx(func(db database.Store) error {
err = db.UpdateProvisionerJobByID(ctx, database.UpdateProvisionerJobByIDParams{
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: jobID,
UpdatedAt: database.Now(),
CompletedAt: sql.NullTime{

View File

@ -12,7 +12,7 @@ type ProvisionerJobStatus string
// Completed returns whether the job is still processing.
func (p ProvisionerJobStatus) Completed() bool {
return p == ProvisionerJobStatusSucceeded || p == ProvisionerJobStatusFailed
return p == ProvisionerJobStatusSucceeded || p == ProvisionerJobStatusFailed || p == ProvisionerJobStatusCancelled
}
const (

View File

@ -82,6 +82,10 @@ func (api *api) postWorkspaceHistoryByUser(rw http.ResponseWriter, r *http.Reque
Message: fmt.Sprintf("The provided project history %q has failed to import. You cannot create workspaces using it!", projectHistory.Name),
})
return
case ProvisionerJobStatusCancelled:
httpapi.Write(rw, http.StatusPreconditionFailed, httpapi.Response{
Message: "The provided project history was canceled during import. You cannot create workspaces using it!",
})
}
project, err := api.Database.GetProjectByID(r.Context(), projectHistory.ProjectID)

View File

@ -56,6 +56,7 @@ func TestWorkspaceHistory(t *testing.T) {
require.Eventually(t, func() bool {
hist, err := client.ProjectHistory(context.Background(), user.Organization, project.Name, projectHistory.Name)
require.NoError(t, err)
t.Logf("Import status: %s\n", hist.Import.Status)
return hist.Import.Status.Completed()
}, 15*time.Second, 50*time.Millisecond)
return projectHistory