chore: refactor agent stats streaming (#5112)

This commit is contained in:
Colin Adler
2022-11-18 16:46:53 -06:00
committed by GitHub
parent 13a4cfa670
commit ae38bbeab6
20 changed files with 534 additions and 310 deletions

View File

@ -18,7 +18,6 @@ import (
"github.com/google/uuid"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"tailscale.com/tailcfg"
"cdr.dev/slog"
@ -553,12 +552,46 @@ func (c *Client) WorkspaceAgentListeningPorts(ctx context.Context, agentID uuid.
// Stats records the Agent's network connection statistics for use in
// user-facing metrics and debugging.
// Each member value must be written and read with atomic.
// @typescript-ignore AgentStats
type AgentStats struct {
// ConnsByProto is a count of connections by protocol.
ConnsByProto map[string]int64 `json:"conns_by_proto"`
// NumConns is the number of connections received by an agent.
NumConns int64 `json:"num_comms"`
RxBytes int64 `json:"rx_bytes"`
TxBytes int64 `json:"tx_bytes"`
// RxPackets is the number of received packets.
RxPackets int64 `json:"rx_packets"`
// RxBytes is the number of received bytes.
RxBytes int64 `json:"rx_bytes"`
// TxPackets is the number of transmitted bytes.
TxPackets int64 `json:"tx_packets"`
// TxBytes is the number of transmitted bytes.
TxBytes int64 `json:"tx_bytes"`
}
// @typescript-ignore AgentStatsResponse
type AgentStatsResponse struct {
// ReportInterval is the duration after which the agent should send stats
// again.
ReportInterval time.Duration `json:"report_interval"`
}
func (c *Client) PostAgentStats(ctx context.Context, stats *AgentStats) (AgentStatsResponse, error) {
res, err := c.Request(ctx, http.MethodPost, "/api/v2/workspaceagents/me/report-stats", stats)
if err != nil {
return AgentStatsResponse{}, xerrors.Errorf("send request: %w", err)
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return AgentStatsResponse{}, readBodyAsError(res)
}
var interval AgentStatsResponse
err = json.NewDecoder(res.Body).Decode(&interval)
if err != nil {
return AgentStatsResponse{}, xerrors.Errorf("decode stats response: %w", err)
}
return interval, nil
}
// AgentReportStats begins a stat streaming connection with the Coder server.
@ -566,84 +599,41 @@ type AgentStats struct {
func (c *Client) AgentReportStats(
ctx context.Context,
log slog.Logger,
stats func() *AgentStats,
getStats func() *AgentStats,
) (io.Closer, error) {
serverURL, err := c.URL.Parse("/api/v2/workspaceagents/me/report-stats")
if err != nil {
return nil, xerrors.Errorf("parse url: %w", err)
}
jar, err := cookiejar.New(nil)
if err != nil {
return nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(serverURL, []*http.Cookie{{
Name: SessionTokenKey,
Value: c.SessionToken(),
}})
httpClient := &http.Client{
Jar: jar,
Transport: c.HTTPClient.Transport,
}
doneCh := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
go func() {
defer close(doneCh)
// Immediately trigger a stats push to get the correct interval.
timer := time.NewTimer(time.Nanosecond)
defer timer.Stop()
// If the agent connection succeeds for a while, then fails, then succeeds
// for a while (etc.) the retry may hit the maximum. This is a normal
// case for long-running agents that experience coderd upgrades, so
// we use a short maximum retry limit.
for r := retry.New(time.Second, time.Minute); r.Wait(ctx); {
err = func() error {
conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
// Need to disable compression to avoid a data-race.
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
if res == nil {
return err
}
return readBodyAsError(res)
}
for {
var req AgentStatsReportRequest
err := wsjson.Read(ctx, conn, &req)
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, "")
return err
}
s := stats()
resp := AgentStatsReportResponse{
NumConns: s.NumConns,
RxBytes: s.RxBytes,
TxBytes: s.TxBytes,
}
err = wsjson.Write(ctx, conn, resp)
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, "")
return err
}
}
}()
if err != nil && ctx.Err() == nil {
log.Error(ctx, "report stats", slog.Error(err))
for {
select {
case <-ctx.Done():
return
case <-timer.C:
}
var nextInterval time.Duration
for r := retry.New(100*time.Millisecond, time.Minute); r.Wait(ctx); {
resp, err := c.PostAgentStats(ctx, getStats())
if err != nil {
if !xerrors.Is(err, context.Canceled) {
log.Error(ctx, "report stats", slog.Error(err))
}
continue
}
nextInterval = resp.ReportInterval
break
}
timer.Reset(nextInterval)
}
}()
return closeFunc(func() error {
cancel()
<-doneCh
return nil
}), nil
}