feat(coderd): add support for sending batched agent metadata (#10223)

Part of #9782
This commit is contained in:
Mathias Fredriksson
2023-10-13 16:37:55 +03:00
committed by GitHub
parent 1b1ab97c24
commit 7eeba15d16
18 changed files with 472 additions and 146 deletions

View File

@ -16,11 +16,9 @@ import (
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"github.com/sqlc-dev/pqtype"
"golang.org/x/exp/maps"
@ -181,7 +179,10 @@ func (api *API) workspaceAgentManifest(rw http.ResponseWriter, r *http.Request)
return err
})
eg.Go(func() (err error) {
metadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
metadata, err = api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
WorkspaceAgentID: workspaceAgent.ID,
Keys: nil,
})
return err
})
eg.Go(func() (err error) {
@ -1723,10 +1724,9 @@ func ellipse(v string, n int) string {
// @Security CoderSessionToken
// @Accept json
// @Tags Agents
// @Param request body agentsdk.PostMetadataRequest true "Workspace agent metadata request"
// @Param key path string true "metadata key" format(string)
// @Param request body []agentsdk.PostMetadataRequest true "Workspace agent metadata request"
// @Success 204 "Success"
// @Router /workspaceagents/me/metadata/{key} [post]
// @Router /workspaceagents/me/metadata [post]
// @x-apidocgen {"skip": true}
func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
@ -1738,17 +1738,18 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
workspaceAgent := httpmw.WorkspaceAgent(r)
workspace, err := api.Database.GetWorkspaceByAgentID(ctx, workspaceAgent.ID)
// Split into function to allow call by deprecated handler.
err := api.workspaceAgentUpdateMetadata(ctx, workspaceAgent, req)
if err != nil {
httpapi.Write(ctx, rw, http.StatusBadRequest, codersdk.Response{
Message: "Failed to get workspace.",
Detail: err.Error(),
})
api.Logger.Error(ctx, "failed to handle metadata request", slog.Error(err))
httpapi.InternalServerError(rw, err)
return
}
key := chi.URLParam(r, "key")
httpapi.Write(ctx, rw, http.StatusNoContent, nil)
}
func (api *API) workspaceAgentUpdateMetadata(ctx context.Context, workspaceAgent database.WorkspaceAgent, req agentsdk.PostMetadataRequest) error {
const (
// maxValueLen is set to 2048 to stay under the 8000 byte Postgres
// NOTIFY limit. Since both value and error can be set, the real
@ -1758,58 +1759,67 @@ func (api *API) workspaceAgentPostMetadata(rw http.ResponseWriter, r *http.Reque
maxErrorLen = maxValueLen
)
metadataError := req.Error
// We overwrite the error if the provided payload is too long.
if len(req.Value) > maxValueLen {
metadataError = fmt.Sprintf("value of %d bytes exceeded %d bytes", len(req.Value), maxValueLen)
req.Value = req.Value[:maxValueLen]
}
if len(req.Error) > maxErrorLen {
metadataError = fmt.Sprintf("error of %d bytes exceeded %d bytes", len(req.Error), maxErrorLen)
req.Error = req.Error[:maxErrorLen]
}
collectedAt := time.Now()
datum := database.UpdateWorkspaceAgentMetadataParams{
WorkspaceAgentID: workspaceAgent.ID,
Key: []string{},
Value: []string{},
Error: []string{},
CollectedAt: []time.Time{},
}
for _, md := range req.Metadata {
metadataError := md.Error
// We overwrite the error if the provided payload is too long.
if len(md.Value) > maxValueLen {
metadataError = fmt.Sprintf("value of %d bytes exceeded %d bytes", len(md.Value), maxValueLen)
md.Value = md.Value[:maxValueLen]
}
if len(md.Error) > maxErrorLen {
metadataError = fmt.Sprintf("error of %d bytes exceeded %d bytes", len(md.Error), maxErrorLen)
md.Error = md.Error[:maxErrorLen]
}
// We don't want a misconfigured agent to fill the database.
Key: key,
Value: req.Value,
Error: metadataError,
datum.Key = append(datum.Key, md.Key)
datum.Value = append(datum.Value, md.Value)
datum.Error = append(datum.Error, metadataError)
// We ignore the CollectedAt from the agent to avoid bugs caused by
// clock skew.
CollectedAt: time.Now(),
datum.CollectedAt = append(datum.CollectedAt, collectedAt)
api.Logger.Debug(
ctx, "accepted metadata report",
slog.F("workspace_agent_id", workspaceAgent.ID),
slog.F("collected_at", collectedAt),
slog.F("original_collected_at", md.CollectedAt),
slog.F("key", md.Key),
slog.F("value", ellipse(md.Value, 16)),
)
}
payload, err := json.Marshal(workspaceAgentMetadataChannelPayload{
CollectedAt: collectedAt,
Keys: datum.Key,
})
if err != nil {
return err
}
err = api.Database.UpdateWorkspaceAgentMetadata(ctx, datum)
if err != nil {
httpapi.InternalServerError(rw, err)
return
return err
}
api.Logger.Debug(
ctx, "accepted metadata report",
slog.F("workspace_agent_id", workspaceAgent.ID),
slog.F("workspace_id", workspace.ID),
slog.F("collected_at", datum.CollectedAt),
slog.F("key", datum.Key),
slog.F("value", ellipse(datum.Value, 16)),
)
datumJSON, err := json.Marshal(datum)
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), payload)
if err != nil {
httpapi.InternalServerError(rw, err)
return
return err
}
err = api.Pubsub.Publish(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), datumJSON)
if err != nil {
httpapi.InternalServerError(rw, err)
return
}
httpapi.Write(ctx, rw, http.StatusNoContent, nil)
return nil
}
// @Summary Watch for workspace agent metadata updates
@ -1829,34 +1839,37 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
)
)
// We avoid channel-based synchronization here to avoid backpressure problems.
var (
metadataMapMu sync.Mutex
metadataMap = make(map[string]database.WorkspaceAgentMetadatum)
// pendingChanges must only be mutated when metadataMapMu is held.
pendingChanges atomic.Bool
)
// Send metadata on updates, we must ensure subscription before sending
// initial metadata to guarantee that events in-between are not missed.
update := make(chan workspaceAgentMetadataChannelPayload, 1)
cancelSub, err := api.Pubsub.Subscribe(watchWorkspaceAgentMetadataChannel(workspaceAgent.ID), func(_ context.Context, byt []byte) {
var update database.UpdateWorkspaceAgentMetadataParams
err := json.Unmarshal(byt, &update)
var payload workspaceAgentMetadataChannelPayload
err := json.Unmarshal(byt, &payload)
if err != nil {
api.Logger.Error(ctx, "failed to unmarshal pubsub message", slog.Error(err))
return
}
log.Debug(ctx, "received metadata update", "key", update.Key)
log.Debug(ctx, "received metadata update", "payload", payload)
metadataMapMu.Lock()
defer metadataMapMu.Unlock()
md := metadataMap[update.Key]
md.Value = update.Value
md.Error = update.Error
md.CollectedAt = update.CollectedAt
metadataMap[update.Key] = md
pendingChanges.Store(true)
select {
case prev := <-update:
// This update wasn't consumed yet, merge the keys.
newKeysSet := make(map[string]struct{})
for _, key := range payload.Keys {
newKeysSet[key] = struct{}{}
}
keys := prev.Keys
for _, key := range prev.Keys {
if _, ok := newKeysSet[key]; !ok {
keys = append(keys, key)
}
}
payload.Keys = keys
default:
}
// This can never block since we pop and merge beforehand.
update <- payload
})
if err != nil {
httpapi.InternalServerError(rw, err)
@ -1877,14 +1890,12 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
<-sseSenderClosed
}()
// We send updates exactly every second.
const sendInterval = time.Second * 1
sendTicker := time.NewTicker(sendInterval)
defer sendTicker.Stop()
// We always use the original Request context because it contains
// the RBAC actor.
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, workspaceAgent.ID)
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
WorkspaceAgentID: workspaceAgent.ID,
Keys: nil,
})
if err != nil {
// If we can't successfully pull the initial metadata, pubsub
// updates will be no-op so we may as well terminate the
@ -1893,42 +1904,84 @@ func (api *API) watchWorkspaceAgentMetadata(rw http.ResponseWriter, r *http.Requ
return
}
metadataMapMu.Lock()
metadataMap := make(map[string]database.WorkspaceAgentMetadatum)
for _, datum := range md {
metadataMap[datum.Key] = datum
}
metadataMapMu.Unlock()
// Send initial metadata.
var lastSend time.Time
sendMetadata := func() {
metadataMapMu.Lock()
values := maps.Values(metadataMap)
pendingChanges.Store(false)
metadataMapMu.Unlock()
lastSend = time.Now()
values := maps.Values(metadataMap)
_ = sseSendEvent(ctx, codersdk.ServerSentEvent{
Type: codersdk.ServerSentEventTypeData,
Data: convertWorkspaceAgentMetadata(values),
})
}
// We send updates exactly every second.
const sendInterval = time.Second * 1
sendTicker := time.NewTicker(sendInterval)
defer sendTicker.Stop()
// Send initial metadata.
sendMetadata()
// Fetch updated metadata keys as they come in.
fetchedMetadata := make(chan []database.WorkspaceAgentMetadatum)
go func() {
defer close(fetchedMetadata)
for {
select {
case <-sseSenderClosed:
return
case payload := <-update:
md, err := api.Database.GetWorkspaceAgentMetadata(ctx, database.GetWorkspaceAgentMetadataParams{
WorkspaceAgentID: workspaceAgent.ID,
Keys: payload.Keys,
})
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Error(ctx, "failed to get metadata", slog.Error(err))
}
return
}
select {
case <-sseSenderClosed:
return
// We want to block here to avoid constantly pinging the
// database when the metadata isn't being processed.
case fetchedMetadata <- md:
}
}
}
}()
pendingChanges := true
for {
select {
case <-sseSenderClosed:
return
case md, ok := <-fetchedMetadata:
if !ok {
return
}
for _, datum := range md {
metadataMap[datum.Key] = datum
}
pendingChanges = true
continue
case <-sendTicker.C:
// We send an update even if there's no change every 5 seconds
// to ensure that the frontend always has an accurate "Result.Age".
if !pendingChanges.Load() && time.Since(lastSend) < time.Second*5 {
if !pendingChanges && time.Since(lastSend) < 5*time.Second {
continue
}
sendMetadata()
case <-sseSenderClosed:
return
pendingChanges = false
}
sendMetadata()
}
}
@ -1959,6 +2012,11 @@ func convertWorkspaceAgentMetadata(db []database.WorkspaceAgentMetadatum) []code
return result
}
type workspaceAgentMetadataChannelPayload struct {
CollectedAt time.Time `json:"collected_at"`
Keys []string `json:"keys"`
}
func watchWorkspaceAgentMetadataChannel(id uuid.UUID) string {
return "workspace_agent_metadata:" + id.String()
}