From 630ec55c48acf18af80d9aef3e7686960ffda08e Mon Sep 17 00:00:00 2001 From: Ammar Bandukwala Date: Thu, 24 Aug 2023 15:18:42 -0500 Subject: [PATCH] fix(coderd): remove rate limits from agent metadata (#9308) Include the full update message in the PubSub notification so that we don't have to refresh metadata from the DB and can avoid rate limiting. --- coderd/workspaceagents.go | 178 +++++++++++++++++---------------- coderd/workspaceagents_test.go | 2 +- 2 files changed, 95 insertions(+), 85 deletions(-) diff --git a/coderd/workspaceagents.go b/coderd/workspaceagents.go index 3f90abb3a4..c127b2342d 100644 --- a/coderd/workspaceagents.go +++ b/coderd/workspaceagents.go @@ -6,7 +6,6 @@ import ( "database/sql" "encoding/json" "errors" - "flag" "fmt" "io" "net" @@ -23,10 +22,9 @@ import ( "github.com/go-chi/chi/v5" "github.com/google/uuid" - "golang.org/x/exp/slices" + "golang.org/x/exp/maps" "golang.org/x/mod/semver" "golang.org/x/sync/errgroup" - "golang.org/x/time/rate" "golang.org/x/xerrors" "nhooyr.io/websocket" "tailscale.com/tailcfg" @@ -39,7 +37,6 @@ import ( "github.com/coder/coder/v2/coderd/httpmw" "github.com/coder/coder/v2/coderd/rbac" "github.com/coder/coder/v2/coderd/util/ptr" - "github.com/coder/coder/v2/coderd/util/slice" "github.com/coder/coder/v2/codersdk" "github.com/coder/coder/v2/codersdk/agentsdk" "github.com/coder/coder/v2/tailnet" @@ -1528,7 +1525,11 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque key := chi.URLParam(r, "key") const ( - maxValueLen = 32 << 10 + // maxValueLen is set to 2048 to stay under the 8000 byte Postgres + // NOTIFY limit. Since both value and error can be set, the real + // payload limit is 2 * 2048 * 4/3 = 5461 bytes + a few hundred bytes for JSON + // syntax, key names, and metadata. + maxValueLen = 2048 maxErrorLen = maxValueLen ) @@ -1571,7 +1572,13 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque slog.F("value", ellipse(datum.Value, 16)), ) - err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), []byte(datum.Key)) + datumJSON, err := json.Marshal(datum) + if err != nil { + httpapi.InternalServerError(rw, err) + return + } + + err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON) if err != nil { httpapi.InternalServerError(rw, err) return @@ -1597,7 +1604,42 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ ) ) - sendEvent, senderClosed, err := httpapi.ServerSentEventSender(rw, r) + // We avoid channel-based synchronization here to avoid backpressure problems. + var ( + metadataMapMu sync.Mutex + metadataMap = make(map[string]database.WorkspaceAgentMetadatum) + // pendingChanges must only be mutated when metadataMapMu is held. + pendingChanges atomic.Bool + ) + + // Send metadata on updates, we must ensure subscription before sending + // initial metadata to guarantee that events in-between are not missed. + cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) { + var update database.UpdateWorkspaceAgentMetadataParams + err := json.Unmarshal(byt, &update) + if err != nil { + api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err)) + return + } + + log.Debug(ctx, "received metadata update", "key", update.Key) + + metadataMapMu.Lock() + defer metadataMapMu.Unlock() + md := metadataMap[update.Key] + md.Value = update.Value + md.Error = update.Error + md.CollectedAt = update.CollectedAt + metadataMap[update.Key] = md + pendingChanges.Store(true) + }) + if err != nil { + httpapi.InternalServerError(rw, err) + return + } + defer cancelSub() + + sseSendEvent, sseSenderClosed, err := httpapi.ServerSentEventSender(rw, r) if err != nil { httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{ Message: "Internal error setting up server-sent events.", @@ -1607,97 +1649,61 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ } // Prevent handler from returning until the sender is closed. defer func() { - <-senderClosed + <-sseSenderClosed }() - const refreshInterval = time.Second * 5 - refreshTicker := time.NewTicker(refreshInterval) - defer refreshTicker.Stop() + // We send updates exactly every second. + const sendInterval = time.Second * 1 + sendTicker := time.NewTicker(sendInterval) + defer sendTicker.Stop() - var ( - lastDBMetaMu sync.Mutex - lastDBMeta []database.WorkspaceAgentMetadatum - ) - - sendMetadata := func(pull bool) { - log.Debug(ctx, "sending metadata update", "pull", pull) - lastDBMetaMu.Lock() - defer lastDBMetaMu.Unlock() - - var err error - if pull { - // We always use the original Request context because it contains - // the RBAC actor. - lastDBMeta, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID) - if err != nil { - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeError, - Data: codersdk.Response{ - Message: "Internal error getting metadata.", - Detail: err.Error(), - }, - }) - return - } - slices.SortFunc(lastDBMeta, func(a, b database.WorkspaceAgentMetadatum) int { - return slice.Ascending(a.Key, b.Key) - }) - - // Avoid sending refresh if the client is about to get a - // fresh update. - refreshTicker.Reset(refreshInterval) - } - - _ = sendEvent(ctx, codersdk.ServerSentEvent{ - Type: codersdk.ServerSentEventTypeData, - Data: convertWorkspaceAgentMetadata(lastDBMeta), - }) - } - - // Note: we previously used a debounce here, but when the rate of metadata updates was too - // high the debounce would never fire. - // - // The rate-limit has its own caveat. If the agent sends a burst of metadata - // but then goes quiet, we will never pull the new metadata and the frontend - // will go stale until refresh. This would only happen if the agent was - // under extreme load. Under normal operations, the interval between metadata - // updates is constant so there is no burst phenomenon. - pubsubRatelimit := rate.NewLimiter(rate.Every(time.Second), 2) - if flag.Lookup("test.v") != nil { - // We essentially disable the rate-limit in tests for determinism. - pubsubRatelimit = rate.NewLimiter(rate.Every(time.Second*100), 100) - } - - // Send metadata on updates, we must ensure subscription before sending - // initial metadata to guarantee that events in-between are not missed. - cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, _ []byte) { - allow := pubsubRatelimit.Allow() - log.Debug(ctx, "received metadata update", "allow", allow) - if allow { - sendMetadata(true) - } - }) + // We always use the original Request context because it contains + // the RBAC actor. + md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID) if err != nil { + // If we can't successfully pull the initial metadata, pubsub + // updates will be no-op so we may as well terminate the + // connection early. httpapi.InternalServerError(rw, err) return } - defer cancelSub() + + metadataMapMu.Lock() + for _, datum := range md { + metadataMap[datum.Key] = datum + } + metadataMapMu.Unlock() // Send initial metadata. - sendMetadata(true) + + var lastSend time.Time + sendMetadata := func() { + metadataMapMu.Lock() + values := maps.Values(metadataMap) + pendingChanges.Store(false) + metadataMapMu.Unlock() + + lastSend = time.Now() + _ = sseSendEvent(ctx, codersdk.ServerSentEvent{ + Type: codersdk.ServerSentEventTypeData, + Data: convertWorkspaceAgentMetadata(values), + }) + } + + sendMetadata() for { select { - case <-senderClosed: + case <-sendTicker.C: + // We send an update even if there's no change every 5 seconds + // to ensure that the frontend always has an accurate "Result.Age". + if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 { + continue + } + sendMetadata() + case <-sseSenderClosed: return - case <-refreshTicker.C: } - - // Avoid spamming the DB with reads we know there are no updates. We want - // to continue sending updates to the frontend so that "Result.Age" - // is always accurate. This way, the frontend doesn't need to - // sync its own clock with the backend. - sendMetadata(false) } } @@ -1721,6 +1727,10 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code }, }) } + // Sorting prevents the metadata from jumping around in the frontend. + sort.Slice(result, func(i, j int) bool { + return result[i].Description.Key < result[j].Description.Key + }) return result } diff --git a/coderd/workspaceagents_test.go b/coderd/workspaceagents_test.go index d1b379d3d7..48a399cee3 100644 --- a/coderd/workspaceagents_test.go +++ b/coderd/workspaceagents_test.go @@ -1153,7 +1153,7 @@ func TestWorkspaceAgent_Metadata(t *testing.T) { require.Len(t, update, 3) check(wantMetadata1, update[0], true) - const maxValueLen = 32 << 10 + const maxValueLen = 2048 tooLongValueMetadata := wantMetadata1 tooLongValueMetadata.Value = strings.Repeat("a", maxValueLen*2) tooLongValueMetadata.Error = ""