feat: add queue_position and queue_size to provisioner jobs (#8074)

This commit is contained in:
Kyle Carberry
2023-06-20 15:07:18 -05:00
committed by GitHub
parent bbb0fab1de
commit 69f911dfd5
26 changed files with 417 additions and 75 deletions

View File

@ -5,6 +5,8 @@ package database_test
import (
"context"
"database/sql"
"encoding/json"
"sort"
"testing"
"time"
@ -312,3 +314,76 @@ func TestDefaultProxy(t *testing.T) {
require.NoError(t, err, "get deployment id")
require.Equal(t, depID, found)
}
func TestQueuePosition(t *testing.T) {
t.Parallel()
if testing.Short() {
t.SkipNow()
}
sqlDB := testSQLDB(t)
err := migrations.Up(sqlDB)
require.NoError(t, err)
db := database.New(sqlDB)
ctx := testutil.Context(t, testutil.WaitLong)
org := dbgen.Organization(t, db, database.Organization{})
jobCount := 10
jobs := []database.ProvisionerJob{}
jobIDs := []uuid.UUID{}
for i := 0; i < jobCount; i++ {
job := dbgen.ProvisionerJob(t, db, database.ProvisionerJob{
OrganizationID: org.ID,
Tags: database.StringMap{},
})
jobs = append(jobs, job)
jobIDs = append(jobIDs, job.ID)
// We need a slight amount of time between each insertion to ensure that
// the queue position is correct... it's sorted by `created_at`.
time.Sleep(time.Millisecond)
}
queued, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
require.NoError(t, err)
require.Len(t, queued, jobCount)
sort.Slice(queued, func(i, j int) bool {
return queued[i].QueuePosition < queued[j].QueuePosition
})
// Ensure that the queue positions are correct based on insertion ID!
for index, job := range queued {
require.Equal(t, job.QueuePosition, int64(index+1))
require.Equal(t, job.ProvisionerJob.ID, jobs[index].ID)
}
job, err := db.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
StartedAt: sql.NullTime{
Time: database.Now(),
Valid: true,
},
Types: database.AllProvisionerTypeValues(),
WorkerID: uuid.NullUUID{
UUID: uuid.New(),
Valid: true,
},
Tags: json.RawMessage("{}"),
})
require.NoError(t, err)
require.Equal(t, jobs[0].ID, job.ID)
queued, err = db.GetProvisionerJobsByIDsWithQueuePosition(ctx, jobIDs)
require.NoError(t, err)
require.Len(t, queued, jobCount)
sort.Slice(queued, func(i, j int) bool {
return queued[i].QueuePosition < queued[j].QueuePosition
})
// Ensure that queue positions are updated now that the first job has been acquired!
for index, job := range queued {
if index == 0 {
require.Equal(t, job.QueuePosition, int64(0))
continue
}
require.Equal(t, job.QueuePosition, int64(index))
require.Equal(t, job.ProvisionerJob.ID, jobs[index].ID)
}
}