Merge branch 'main' of github.com:/coder/coder into dk/prebuilds

Signed-off-by: Danny Kopping <dannykopping@gmail.com>
This commit is contained in:
Danny Kopping
2025-03-04 10:06:58 +00:00
210 changed files with 5932 additions and 1846 deletions

View File

@ -10,6 +10,7 @@ import (
"math"
"reflect"
"regexp"
"slices"
"sort"
"strings"
"sync"
@ -19,7 +20,6 @@ import (
"github.com/lib/pq"
"golang.org/x/exp/constraints"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/xerrors"
"github.com/coder/coder/v2/coderd/notifications/types"
@ -67,6 +67,7 @@ func New() database.Store {
gitSSHKey: make([]database.GitSSHKey, 0),
notificationMessages: make([]database.NotificationMessage, 0),
notificationPreferences: make([]database.NotificationPreference, 0),
InboxNotification: make([]database.InboxNotification, 0),
parameterSchemas: make([]database.ParameterSchema, 0),
provisionerDaemons: make([]database.ProvisionerDaemon, 0),
provisionerKeys: make([]database.ProvisionerKey, 0),
@ -206,6 +207,7 @@ type data struct {
notificationMessages []database.NotificationMessage
notificationPreferences []database.NotificationPreference
notificationReportGeneratorLogs []database.NotificationReportGeneratorLog
InboxNotification []database.InboxNotification
oauth2ProviderApps []database.OAuth2ProviderApp
oauth2ProviderAppSecrets []database.OAuth2ProviderAppSecret
oauth2ProviderAppCodes []database.OAuth2ProviderAppCode
@ -269,7 +271,7 @@ type data struct {
presetParameters []database.TemplateVersionPresetParameter
}
func tryPercentile(fs []float64, p float64) float64 {
func tryPercentileCont(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
@ -282,6 +284,14 @@ func tryPercentile(fs []float64, p float64) float64 {
return fs[lower] + (fs[upper]-fs[lower])*(pos-float64(lower))
}
func tryPercentileDisc(fs []float64, p float64) float64 {
if len(fs) == 0 {
return -1
}
sort.Float64s(fs)
return fs[max(int(math.Ceil(float64(len(fs))*p/100-1)), 0)]
}
func validateDatabaseTypeWithValid(v reflect.Value) (handled bool, err error) {
if v.Kind() == reflect.Struct {
return false, nil
@ -1139,7 +1149,119 @@ func getOwnerFromTags(tags map[string]string) string {
return ""
}
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLocked(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// provisionerTagsetContains checks if daemonTags contain all key-value pairs from jobTags
func provisionerTagsetContains(daemonTags, jobTags map[string]string) bool {
for jobKey, jobValue := range jobTags {
if daemonValue, exists := daemonTags[jobKey]; !exists || daemonValue != jobValue {
return false
}
}
return true
}
// GetProvisionerJobsByIDsWithQueuePosition mimics the SQL logic in pure Go
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(_ context.Context, jobIDs []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// Step 1: Filter provisionerJobs based on jobIDs
filteredJobs := make(map[uuid.UUID]database.ProvisionerJob)
for _, job := range q.provisionerJobs {
for _, id := range jobIDs {
if job.ID == id {
filteredJobs[job.ID] = job
}
}
}
// Step 2: Identify pending jobs
pendingJobs := make(map[uuid.UUID]database.ProvisionerJob)
for _, job := range q.provisionerJobs {
if job.JobStatus == "pending" {
pendingJobs[job.ID] = job
}
}
// Step 3: Identify pending jobs that have a matching provisioner
matchedJobs := make(map[uuid.UUID]struct{})
for _, job := range pendingJobs {
for _, daemon := range q.provisionerDaemons {
if provisionerTagsetContains(daemon.Tags, job.Tags) {
matchedJobs[job.ID] = struct{}{}
break
}
}
}
// Step 4: Rank pending jobs per provisioner
jobRanks := make(map[uuid.UUID][]database.ProvisionerJob)
for _, job := range pendingJobs {
for _, daemon := range q.provisionerDaemons {
if provisionerTagsetContains(daemon.Tags, job.Tags) {
jobRanks[daemon.ID] = append(jobRanks[daemon.ID], job)
}
}
}
// Sort jobs per provisioner by CreatedAt
for daemonID := range jobRanks {
sort.Slice(jobRanks[daemonID], func(i, j int) bool {
return jobRanks[daemonID][i].CreatedAt.Before(jobRanks[daemonID][j].CreatedAt)
})
}
// Step 5: Compute queue position & max queue size across all provisioners
jobQueueStats := make(map[uuid.UUID]database.GetProvisionerJobsByIDsWithQueuePositionRow)
for _, jobs := range jobRanks {
queueSize := int64(len(jobs)) // Queue size per provisioner
for i, job := range jobs {
queuePosition := int64(i + 1)
// If the job already exists, update only if this queuePosition is better
if existing, exists := jobQueueStats[job.ID]; exists {
jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{
ID: job.ID,
CreatedAt: job.CreatedAt,
ProvisionerJob: job,
QueuePosition: min(existing.QueuePosition, queuePosition),
QueueSize: max(existing.QueueSize, queueSize), // Take the maximum queue size across provisioners
}
} else {
jobQueueStats[job.ID] = database.GetProvisionerJobsByIDsWithQueuePositionRow{
ID: job.ID,
CreatedAt: job.CreatedAt,
ProvisionerJob: job,
QueuePosition: queuePosition,
QueueSize: queueSize,
}
}
}
}
// Step 6: Compute the final results with minimal checks
var results []database.GetProvisionerJobsByIDsWithQueuePositionRow
for _, job := range filteredJobs {
// If the job has a computed rank, use it
if rank, found := jobQueueStats[job.ID]; found {
results = append(results, rank)
} else {
// Otherwise, return (0,0) for non-pending jobs and unranked pending jobs
results = append(results, database.GetProvisionerJobsByIDsWithQueuePositionRow{
ID: job.ID,
CreatedAt: job.CreatedAt,
ProvisionerJob: job,
QueuePosition: 0,
QueueSize: 0,
})
}
}
// Step 7: Sort results by CreatedAt
sort.Slice(results, func(i, j int) bool {
return results[i].CreatedAt.Before(results[j].CreatedAt)
})
return results, nil
}
func (q *FakeQuerier) getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(_ context.Context, ids []uuid.UUID) ([]database.GetProvisionerJobsByIDsWithQueuePositionRow, error) {
// WITH pending_jobs AS (
// SELECT
// id, created_at
@ -1602,6 +1724,26 @@ func (*FakeQuerier) CleanTailnetTunnels(context.Context) error {
return ErrUnimplemented
}
func (q *FakeQuerier) CountUnreadInboxNotificationsByUserID(_ context.Context, userID uuid.UUID) (int64, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
var count int64
for _, notification := range q.InboxNotification {
if notification.UserID != userID {
continue
}
if notification.ReadAt.Valid {
continue
}
count++
}
return count, nil
}
func (q *FakeQuerier) CustomRoles(_ context.Context, arg database.CustomRolesParams) ([]database.CustomRole, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
@ -2365,6 +2507,19 @@ func (q *FakeQuerier) FetchMemoryResourceMonitorsByAgentID(_ context.Context, ag
return database.WorkspaceAgentMemoryResourceMonitor{}, sql.ErrNoRows
}
func (q *FakeQuerier) FetchMemoryResourceMonitorsUpdatedAfter(_ context.Context, updatedAt time.Time) ([]database.WorkspaceAgentMemoryResourceMonitor, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
monitors := []database.WorkspaceAgentMemoryResourceMonitor{}
for _, monitor := range q.workspaceAgentMemoryResourceMonitors {
if monitor.UpdatedAt.After(updatedAt) {
monitors = append(monitors, monitor)
}
}
return monitors, nil
}
func (q *FakeQuerier) FetchNewMessageMetadata(_ context.Context, arg database.FetchNewMessageMetadataParams) (database.FetchNewMessageMetadataRow, error) {
err := validateDatabaseType(arg)
if err != nil {
@ -2409,6 +2564,19 @@ func (q *FakeQuerier) FetchVolumesResourceMonitorsByAgentID(_ context.Context, a
return monitors, nil
}
func (q *FakeQuerier) FetchVolumesResourceMonitorsUpdatedAfter(_ context.Context, updatedAt time.Time) ([]database.WorkspaceAgentVolumeResourceMonitor, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
monitors := []database.WorkspaceAgentVolumeResourceMonitor{}
for _, monitor := range q.workspaceAgentVolumeResourceMonitors {
if monitor.UpdatedAt.After(updatedAt) {
monitors = append(monitors, monitor)
}
}
return monitors, nil
}
func (q *FakeQuerier) GetAPIKeyByID(_ context.Context, id string) (database.APIKey, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -2794,8 +2962,8 @@ func (q *FakeQuerier) GetDeploymentWorkspaceAgentStats(_ context.Context, create
latencies = append(latencies, agentStat.ConnectionMedianLatencyMS)
}
stat.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
stat.WorkspaceConnectionLatency50 = tryPercentileCont(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentileCont(latencies, 95)
return stat, nil
}
@ -2843,8 +3011,8 @@ func (q *FakeQuerier) GetDeploymentWorkspaceAgentUsageStats(_ context.Context, c
stat.WorkspaceTxBytes += agentStat.TxBytes
latencies = append(latencies, agentStat.ConnectionMedianLatencyMS)
}
stat.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
stat.WorkspaceConnectionLatency50 = tryPercentileCont(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentileCont(latencies, 95)
for _, agentStat := range sessions {
stat.SessionCountVSCode += agentStat.SessionCountVSCode
@ -3126,6 +3294,45 @@ func (q *FakeQuerier) GetFileTemplates(_ context.Context, id uuid.UUID) ([]datab
return rows, nil
}
func (q *FakeQuerier) GetFilteredInboxNotificationsByUserID(_ context.Context, arg database.GetFilteredInboxNotificationsByUserIDParams) ([]database.InboxNotification, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
notifications := make([]database.InboxNotification, 0)
for _, notification := range q.InboxNotification {
if notification.UserID == arg.UserID {
for _, template := range arg.Templates {
templateFound := false
if notification.TemplateID == template {
templateFound = true
}
if !templateFound {
continue
}
}
for _, target := range arg.Targets {
isFound := false
for _, insertedTarget := range notification.Targets {
if insertedTarget == target {
isFound = true
break
}
}
if !isFound {
continue
}
notifications = append(notifications, notification)
}
}
}
return notifications, nil
}
func (q *FakeQuerier) GetGitSSHKey(_ context.Context, userID uuid.UUID) (database.GitSSHKey, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
@ -3324,6 +3531,33 @@ func (q *FakeQuerier) GetHungProvisionerJobs(_ context.Context, hungSince time.T
return hungJobs, nil
}
func (q *FakeQuerier) GetInboxNotificationByID(_ context.Context, id uuid.UUID) (database.InboxNotification, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
for _, notification := range q.InboxNotification {
if notification.ID == id {
return notification, nil
}
}
return database.InboxNotification{}, sql.ErrNoRows
}
func (q *FakeQuerier) GetInboxNotificationsByUserID(_ context.Context, params database.GetInboxNotificationsByUserIDParams) ([]database.InboxNotification, error) {
q.mutex.RLock()
defer q.mutex.RUnlock()
notifications := make([]database.InboxNotification, 0)
for _, notification := range q.InboxNotification {
if notification.UserID == params.UserID {
notifications = append(notifications, notification)
}
}
return notifications, nil
}
func (q *FakeQuerier) GetJFrogXrayScanByWorkspaceAndAgentID(_ context.Context, arg database.GetJFrogXrayScanByWorkspaceAndAgentIDParams) (database.JfrogXrayScan, error) {
err := validateDatabaseType(arg)
if err != nil {
@ -4085,7 +4319,7 @@ func (q *FakeQuerier) GetProvisionerDaemonsWithStatusByOrganization(ctx context.
}
slices.SortFunc(rows, func(a, b database.GetProvisionerDaemonsWithStatusByOrganizationRow) int {
return a.ProvisionerDaemon.CreatedAt.Compare(b.ProvisionerDaemon.CreatedAt)
return b.ProvisionerDaemon.CreatedAt.Compare(a.ProvisionerDaemon.CreatedAt)
})
if arg.Limit.Valid && arg.Limit.Int32 > 0 && len(rows) > int(arg.Limit.Int32) {
@ -4153,7 +4387,7 @@ func (q *FakeQuerier) GetProvisionerJobsByIDsWithQueuePosition(ctx context.Conte
if ids == nil {
ids = []uuid.UUID{}
}
return q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, ids)
return q.getProvisionerJobsByIDsWithQueuePositionLockedTagBasedQueue(ctx, ids)
}
func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner(ctx context.Context, arg database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerParams) ([]database.GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisionerRow, error) {
@ -4222,7 +4456,7 @@ func (q *FakeQuerier) GetProvisionerJobsByOrganizationAndStatusWithQueuePosition
LIMIT
sqlc.narg('limit')::int;
*/
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLocked(ctx, nil)
rowsWithQueuePosition, err := q.getProvisionerJobsByIDsWithQueuePositionLockedGlobalQueue(ctx, nil)
if err != nil {
return nil, err
}
@ -5003,9 +5237,9 @@ func (q *FakeQuerier) GetTemplateAverageBuildTime(ctx context.Context, arg datab
}
var row database.GetTemplateAverageBuildTimeRow
row.Delete50, row.Delete95 = tryPercentile(deleteTimes, 50), tryPercentile(deleteTimes, 95)
row.Stop50, row.Stop95 = tryPercentile(stopTimes, 50), tryPercentile(stopTimes, 95)
row.Start50, row.Start95 = tryPercentile(startTimes, 50), tryPercentile(startTimes, 95)
row.Delete50, row.Delete95 = tryPercentileDisc(deleteTimes, 50), tryPercentileDisc(deleteTimes, 95)
row.Stop50, row.Stop95 = tryPercentileDisc(stopTimes, 50), tryPercentileDisc(stopTimes, 95)
row.Start50, row.Start95 = tryPercentileDisc(startTimes, 50), tryPercentileDisc(startTimes, 95)
return row, nil
}
@ -6044,8 +6278,8 @@ func (q *FakeQuerier) GetUserLatencyInsights(_ context.Context, arg database.Get
Username: user.Username,
AvatarURL: user.AvatarURL,
TemplateIDs: seenTemplatesByUserID[userID],
WorkspaceConnectionLatency50: tryPercentile(latencies, 50),
WorkspaceConnectionLatency95: tryPercentile(latencies, 95),
WorkspaceConnectionLatency50: tryPercentileCont(latencies, 50),
WorkspaceConnectionLatency95: tryPercentileCont(latencies, 95),
}
rows = append(rows, row)
}
@ -6689,8 +6923,8 @@ func (q *FakeQuerier) GetWorkspaceAgentStats(_ context.Context, createdAfter tim
if !ok {
continue
}
stat.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
stat.WorkspaceConnectionLatency50 = tryPercentileCont(latencies, 50)
stat.WorkspaceConnectionLatency95 = tryPercentileCont(latencies, 95)
statByAgent[stat.AgentID] = stat
}
@ -6827,8 +7061,8 @@ func (q *FakeQuerier) GetWorkspaceAgentUsageStats(_ context.Context, createdAt t
for key, latencies := range latestAgentLatencies {
val, ok := latestAgentStats[key]
if ok {
val.WorkspaceConnectionLatency50 = tryPercentile(latencies, 50)
val.WorkspaceConnectionLatency95 = tryPercentile(latencies, 95)
val.WorkspaceConnectionLatency50 = tryPercentileCont(latencies, 50)
val.WorkspaceConnectionLatency95 = tryPercentileCont(latencies, 95)
}
latestAgentStats[key] = val
}
@ -6938,7 +7172,7 @@ func (q *FakeQuerier) GetWorkspaceAgentUsageStatsAndLabels(_ context.Context, cr
}
// WHERE usage = true AND created_at > now() - '1 minute'::interval
// GROUP BY user_id, agent_id, workspace_id
if agentStat.Usage && agentStat.CreatedAt.After(time.Now().Add(-time.Minute)) {
if agentStat.Usage && agentStat.CreatedAt.After(dbtime.Now().Add(-time.Minute)) {
val, ok := latestAgentStats[key]
if !ok {
latestAgentStats[key] = agentStat
@ -7977,6 +8211,30 @@ func (q *FakeQuerier) InsertGroupMember(_ context.Context, arg database.InsertGr
return nil
}
func (q *FakeQuerier) InsertInboxNotification(_ context.Context, arg database.InsertInboxNotificationParams) (database.InboxNotification, error) {
if err := validateDatabaseType(arg); err != nil {
return database.InboxNotification{}, err
}
q.mutex.Lock()
defer q.mutex.Unlock()
notification := database.InboxNotification{
ID: arg.ID,
UserID: arg.UserID,
TemplateID: arg.TemplateID,
Targets: arg.Targets,
Title: arg.Title,
Content: arg.Content,
Icon: arg.Icon,
Actions: arg.Actions,
CreatedAt: time.Now(),
}
q.InboxNotification = append(q.InboxNotification, notification)
return notification, nil
}
func (q *FakeQuerier) InsertLicense(
_ context.Context, arg database.InsertLicenseParams,
) (database.License, error) {
@ -9700,6 +9958,24 @@ func (q *FakeQuerier) UpdateInactiveUsersToDormant(_ context.Context, params dat
return updated, nil
}
func (q *FakeQuerier) UpdateInboxNotificationReadStatus(_ context.Context, arg database.UpdateInboxNotificationReadStatusParams) error {
err := validateDatabaseType(arg)
if err != nil {
return err
}
q.mutex.Lock()
defer q.mutex.Unlock()
for i := range q.InboxNotification {
if q.InboxNotification[i].ID == arg.ID {
q.InboxNotification[i].ReadAt = arg.ReadAt
}
}
return nil
}
func (q *FakeQuerier) UpdateMemberRoles(_ context.Context, arg database.UpdateMemberRolesParams) (database.OrganizationMember, error) {
if err := validateDatabaseType(arg); err != nil {
return database.OrganizationMember{}, err