chore: refactor apphealth and tests to use clock testing library (#13576)

Refactors the apphealth subsystem and unit tests to use `clock.Clock`.

Also slightly simplifies the implementation, which wrapped a function that never returned an error in a `retry.Retry`.  The retry is entirely superfluous in that case, so removed.

UTs used to take a few seconds to run, and now run in milliseconds or better.  No sleeps, `Eventually`, or polling.

Dropped the "no spamming" test since we can directly assert the number of handler calls on the mainline test case.
This commit is contained in:
Spike Curtis
2024-06-14 15:06:33 +04:00
committed by GitHub
parent fe240add86
commit c01d6fdf46
2 changed files with 206 additions and 172 deletions

View File

@ -10,14 +10,11 @@ import (
"golang.org/x/xerrors"
"cdr.dev/slog"
"github.com/coder/coder/v2/clock"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/retry"
)
// WorkspaceAgentApps fetches the workspace apps.
type WorkspaceAgentApps func(context.Context) ([]codersdk.WorkspaceApp, error)
// PostWorkspaceAgentAppHealth updates the workspace app health.
type PostWorkspaceAgentAppHealth func(context.Context, agentsdk.PostAppHealthsRequest) error
@ -26,15 +23,26 @@ type WorkspaceAppHealthReporter func(ctx context.Context)
// NewWorkspaceAppHealthReporter creates a WorkspaceAppHealthReporter that reports app health to coderd.
func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.WorkspaceApp, postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth) WorkspaceAppHealthReporter {
return NewAppHealthReporterWithClock(logger, apps, postWorkspaceAgentAppHealth, clock.NewReal())
}
// NewAppHealthReporterWithClock is only called directly by test code. Product code should call
// NewAppHealthReporter.
func NewAppHealthReporterWithClock(
logger slog.Logger,
apps []codersdk.WorkspaceApp,
postWorkspaceAgentAppHealth PostWorkspaceAgentAppHealth,
clk clock.Clock,
) WorkspaceAppHealthReporter {
logger = logger.Named("apphealth")
runHealthcheckLoop := func(ctx context.Context) error {
return func(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// no need to run this loop if no apps for this workspace.
if len(apps) == 0 {
return nil
return
}
hasHealthchecksEnabled := false
@ -49,7 +57,7 @@ func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.Workspace
// no need to run this loop if no health checks are configured.
if !hasHealthchecksEnabled {
return nil
return
}
// run a ticker for each app health check.
@ -61,25 +69,29 @@ func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.Workspace
}
app := nextApp
go func() {
t := time.NewTicker(time.Duration(app.Healthcheck.Interval) * time.Second)
defer t.Stop()
_ = clk.TickerFunc(ctx, time.Duration(app.Healthcheck.Interval)*time.Second, func() error {
// We time out at the healthcheck interval to prevent getting too backed up, but
// set it 1ms early so that it's not simultaneous with the next tick in testing,
// which makes the test easier to understand.
//
// It would be idiomatic to use the http.Client.Timeout or a context.WithTimeout,
// but we are passing this off to the native http library, which is not aware
// of the clock library we are using. That means in testing, with a mock clock
// it will compare mocked times with real times, and we will get strange results.
// So, we just implement the timeout as a context we cancel with an AfterFunc
reqCtx, reqCancel := context.WithCancel(ctx)
timeout := clk.AfterFunc(
time.Duration(app.Healthcheck.Interval)*time.Second-time.Millisecond,
reqCancel,
"timeout", app.Slug)
defer timeout.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
// we set the http timeout to the healthcheck interval to prevent getting too backed up.
client := &http.Client{
Timeout: time.Duration(app.Healthcheck.Interval) * time.Second,
}
err := func() error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, app.Healthcheck.URL, nil)
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, app.Healthcheck.URL, nil)
if err != nil {
return err
}
res, err := client.Do(req)
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
@ -118,54 +130,36 @@ func NewWorkspaceAppHealthReporter(logger slog.Logger, apps []codersdk.Workspace
mu.Unlock()
logger.Debug(ctx, "workspace app healthy", slog.F("id", app.ID.String()), slog.F("slug", app.Slug))
}
t.Reset(time.Duration(app.Healthcheck.Interval) * time.Second)
}
return nil
}, "healthcheck", app.Slug)
}()
}
mu.Lock()
lastHealth := copyHealth(health)
mu.Unlock()
reportTicker := time.NewTicker(time.Second)
defer reportTicker.Stop()
// every second we check if the health values of the apps have changed
// and if there is a change we will report the new values.
for {
select {
case <-ctx.Done():
reportTicker := clk.TickerFunc(ctx, time.Second, func() error {
mu.RLock()
changed := healthChanged(lastHealth, health)
mu.RUnlock()
if !changed {
return nil
case <-reportTicker.C:
mu.RLock()
changed := healthChanged(lastHealth, health)
mu.RUnlock()
if !changed {
continue
}
mu.Lock()
lastHealth = copyHealth(health)
mu.Unlock()
err := postWorkspaceAgentAppHealth(ctx, agentsdk.PostAppHealthsRequest{
Healths: lastHealth,
})
if err != nil {
logger.Error(ctx, "failed to report workspace app health", slog.Error(err))
} else {
logger.Debug(ctx, "sent workspace app health", slog.F("health", lastHealth))
}
}
}
}
return func(ctx context.Context) {
for r := retry.New(time.Second, 30*time.Second); r.Wait(ctx); {
err := runHealthcheckLoop(ctx)
if err == nil || xerrors.Is(err, context.Canceled) || xerrors.Is(err, context.DeadlineExceeded) {
return
mu.Lock()
lastHealth = copyHealth(health)
mu.Unlock()
err := postWorkspaceAgentAppHealth(ctx, agentsdk.PostAppHealthsRequest{
Healths: lastHealth,
})
if err != nil {
logger.Error(ctx, "failed to report workspace app health", slog.Error(err))
} else {
logger.Debug(ctx, "sent workspace app health", slog.F("health", lastHealth))
}
logger.Error(ctx, "failed running workspace app reporter", slog.Error(err))
}
return nil
}, "report")
_ = reportTicker.Wait() // only possible error is context done
}
}

View File

@ -4,14 +4,12 @@ import (
"context"
"net/http"
"net/http/httptest"
"slices"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"cdr.dev/slog"
@ -19,6 +17,7 @@ import (
"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/agenttest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/clock"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
@ -27,15 +26,17 @@ import (
func TestAppHealth_Healthy(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
apps := []codersdk.WorkspaceApp{
{
ID: uuid.UUID{1},
Slug: "app1",
Healthcheck: codersdk.Healthcheck{},
Health: codersdk.WorkspaceAppHealthDisabled,
},
{
ID: uuid.UUID{2},
Slug: "app2",
Healthcheck: codersdk.Healthcheck{
// URL: We don't set the URL for this test because the setup will
@ -46,6 +47,7 @@ func TestAppHealth_Healthy(t *testing.T) {
Health: codersdk.WorkspaceAppHealthInitializing,
},
{
ID: uuid.UUID{3},
Slug: "app3",
Healthcheck: codersdk.Healthcheck{
Interval: 2,
@ -54,36 +56,70 @@ func TestAppHealth_Healthy(t *testing.T) {
Health: codersdk.WorkspaceAppHealthInitializing,
},
}
checks := make(map[string]int)
handlers := []http.Handler{
nil,
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
checks["app2"]++
httpapi.Write(r.Context(), w, http.StatusOK, nil)
}),
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
checks["app3"]++
httpapi.Write(r.Context(), w, http.StatusOK, nil)
}),
}
getApps, closeFn := setupAppReporter(ctx, t, apps, handlers)
defer closeFn()
apps, err := getApps(ctx)
require.NoError(t, err)
require.EqualValues(t, codersdk.WorkspaceAppHealthDisabled, apps[0].Health)
require.Eventually(t, func() bool {
apps, err := getApps(ctx)
if err != nil {
return false
}
mClock := clock.NewMock(t)
healthcheckTrap := mClock.Trap().TickerFunc("healthcheck")
defer healthcheckTrap.Close()
reportTrap := mClock.Trap().TickerFunc("report")
defer reportTrap.Close()
return apps[1].Health == codersdk.WorkspaceAppHealthHealthy && apps[2].Health == codersdk.WorkspaceAppHealthHealthy
}, testutil.WaitLong, testutil.IntervalSlow)
fakeAPI, closeFn := setupAppReporter(ctx, t, slices.Clone(apps), handlers, mClock)
defer closeFn()
healthchecksStarted := make([]string, 2)
for i := 0; i < 2; i++ {
c := healthcheckTrap.MustWait(ctx)
c.Release()
healthchecksStarted[i] = c.Tags[1]
}
slices.Sort(healthchecksStarted)
require.Equal(t, []string{"app2", "app3"}, healthchecksStarted)
// advance the clock 1ms before the report ticker starts, so that it's not
// simultaneous with the checks.
mClock.Advance(time.Millisecond).MustWait(ctx)
reportTrap.MustWait(ctx).Release()
mClock.Advance(999 * time.Millisecond).MustWait(ctx) // app2 is now healthy
mClock.Advance(time.Millisecond).MustWait(ctx) // report gets triggered
update := testutil.RequireRecvCtx(ctx, t, fakeAPI.AppHealthCh())
require.Len(t, update.GetUpdates(), 2)
applyUpdate(t, apps, update)
require.Equal(t, codersdk.WorkspaceAppHealthHealthy, apps[1].Health)
require.Equal(t, codersdk.WorkspaceAppHealthInitializing, apps[2].Health)
mClock.Advance(999 * time.Millisecond).MustWait(ctx) // app3 is now healthy
mClock.Advance(time.Millisecond).MustWait(ctx) // report gets triggered
update = testutil.RequireRecvCtx(ctx, t, fakeAPI.AppHealthCh())
require.Len(t, update.GetUpdates(), 2)
applyUpdate(t, apps, update)
require.Equal(t, codersdk.WorkspaceAppHealthHealthy, apps[1].Health)
require.Equal(t, codersdk.WorkspaceAppHealthHealthy, apps[2].Health)
// ensure we aren't spamming
require.Equal(t, 2, checks["app2"])
require.Equal(t, 1, checks["app3"])
}
func TestAppHealth_500(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
apps := []codersdk.WorkspaceApp{
{
ID: uuid.UUID{2},
Slug: "app2",
Healthcheck: codersdk.Healthcheck{
// URL: We don't set the URL for this test because the setup will
@ -99,59 +135,40 @@ func TestAppHealth_500(t *testing.T) {
httpapi.Write(r.Context(), w, http.StatusInternalServerError, nil)
}),
}
getApps, closeFn := setupAppReporter(ctx, t, apps, handlers)
defer closeFn()
require.Eventually(t, func() bool {
apps, err := getApps(ctx)
if err != nil {
return false
}
return apps[0].Health == codersdk.WorkspaceAppHealthUnhealthy
}, testutil.WaitLong, testutil.IntervalSlow)
mClock := clock.NewMock(t)
healthcheckTrap := mClock.Trap().TickerFunc("healthcheck")
defer healthcheckTrap.Close()
reportTrap := mClock.Trap().TickerFunc("report")
defer reportTrap.Close()
fakeAPI, closeFn := setupAppReporter(ctx, t, slices.Clone(apps), handlers, mClock)
defer closeFn()
healthcheckTrap.MustWait(ctx).Release()
// advance the clock 1ms before the report ticker starts, so that it's not
// simultaneous with the checks.
mClock.Advance(time.Millisecond).MustWait(ctx)
reportTrap.MustWait(ctx).Release()
mClock.Advance(999 * time.Millisecond).MustWait(ctx) // check gets triggered
mClock.Advance(time.Millisecond).MustWait(ctx) // report gets triggered, but unsent since we are at the threshold
mClock.Advance(999 * time.Millisecond).MustWait(ctx) // 2nd check, crosses threshold
mClock.Advance(time.Millisecond).MustWait(ctx) // 2nd report, sends update
update := testutil.RequireRecvCtx(ctx, t, fakeAPI.AppHealthCh())
require.Len(t, update.GetUpdates(), 1)
applyUpdate(t, apps, update)
require.Equal(t, codersdk.WorkspaceAppHealthUnhealthy, apps[0].Health)
}
func TestAppHealth_Timeout(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()
apps := []codersdk.WorkspaceApp{
{
Slug: "app2",
Healthcheck: codersdk.Healthcheck{
// URL: We don't set the URL for this test because the setup will
// create a httptest server for us and set it for us.
Interval: 1,
Threshold: 1,
},
Health: codersdk.WorkspaceAppHealthInitializing,
},
}
handlers := []http.Handler{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// sleep longer than the interval to cause the health check to time out
time.Sleep(2 * time.Second)
httpapi.Write(r.Context(), w, http.StatusOK, nil)
}),
}
getApps, closeFn := setupAppReporter(ctx, t, apps, handlers)
defer closeFn()
require.Eventually(t, func() bool {
apps, err := getApps(ctx)
if err != nil {
return false
}
return apps[0].Health == codersdk.WorkspaceAppHealthUnhealthy
}, testutil.WaitLong, testutil.IntervalSlow)
}
func TestAppHealth_NotSpamming(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
apps := []codersdk.WorkspaceApp{
{
ID: uuid.UUID{2},
Slug: "app2",
Healthcheck: codersdk.Healthcheck{
// URL: We don't set the URL for this test because the setup will
@ -163,27 +180,65 @@ func TestAppHealth_NotSpamming(t *testing.T) {
},
}
counter := new(int32)
handlers := []http.Handler{
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddInt32(counter, 1)
http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) {
// allow the request to time out
<-r.Context().Done()
}),
}
_, closeFn := setupAppReporter(ctx, t, apps, handlers)
mClock := clock.NewMock(t)
start := mClock.Now()
// for this test, it's easier to think in the number of milliseconds elapsed
// since start.
ms := func(n int) time.Time {
return start.Add(time.Duration(n) * time.Millisecond)
}
healthcheckTrap := mClock.Trap().TickerFunc("healthcheck")
defer healthcheckTrap.Close()
reportTrap := mClock.Trap().TickerFunc("report")
defer reportTrap.Close()
timeoutTrap := mClock.Trap().AfterFunc("timeout")
defer timeoutTrap.Close()
fakeAPI, closeFn := setupAppReporter(ctx, t, apps, handlers, mClock)
defer closeFn()
// Ensure we haven't made more than 2 (expected 1 + 1 for buffer) requests in the last second.
// if there is a bug where we are spamming the healthcheck route this will catch it.
time.Sleep(time.Second)
require.LessOrEqual(t, atomic.LoadInt32(counter), int32(2))
healthcheckTrap.MustWait(ctx).Release()
// advance the clock 1ms before the report ticker starts, so that it's not
// simultaneous with the checks.
mClock.Set(ms(1)).MustWait(ctx)
reportTrap.MustWait(ctx).Release()
w := mClock.Set(ms(1000)) // 1st check starts
timeoutTrap.MustWait(ctx).Release()
mClock.Set(ms(1001)).MustWait(ctx) // report tick, no change
mClock.Set(ms(1999)) // timeout pops
w.MustWait(ctx) // 1st check finished
w = mClock.Set(ms(2000)) // 2nd check starts
timeoutTrap.MustWait(ctx).Release()
mClock.Set(ms(2001)).MustWait(ctx) // report tick, no change
mClock.Set(ms(2999)) // timeout pops
w.MustWait(ctx) // 2nd check finished
// app is now unhealthy after 2 timeouts
mClock.Set(ms(3000)) // 3rd check starts
timeoutTrap.MustWait(ctx).Release()
mClock.Set(ms(3001)).MustWait(ctx) // report tick, sends changes
update := testutil.RequireRecvCtx(ctx, t, fakeAPI.AppHealthCh())
require.Len(t, update.GetUpdates(), 1)
applyUpdate(t, apps, update)
require.Equal(t, codersdk.WorkspaceAppHealthUnhealthy, apps[0].Health)
}
func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.WorkspaceApp, handlers []http.Handler) (agent.WorkspaceAgentApps, func()) {
func setupAppReporter(
ctx context.Context, t *testing.T,
apps []codersdk.WorkspaceApp,
handlers []http.Handler,
clk clock.Clock,
) (*agenttest.FakeAgentAPI, func()) {
closers := []func(){}
for i, app := range apps {
if app.ID == uuid.Nil {
app.ID = uuid.New()
apps[i] = app
}
for _, app := range apps {
require.NotEqual(t, uuid.Nil, app.ID, "all apps must have ID set")
}
for i, handler := range handlers {
if handler == nil {
@ -196,14 +251,6 @@ func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.Workspa
closers = append(closers, ts.Close)
}
var mu sync.Mutex
workspaceAgentApps := func(context.Context) ([]codersdk.WorkspaceApp, error) {
mu.Lock()
defer mu.Unlock()
var newApps []codersdk.WorkspaceApp
return append(newApps, apps...), nil
}
// We don't care about manifest or stats in this test since it's not using
// a full agent and these RPCs won't get called.
//
@ -212,38 +259,31 @@ func setupAppReporter(ctx context.Context, t *testing.T, apps []codersdk.Workspa
// post function.
fakeAAPI := agenttest.NewFakeAgentAPI(t, slogtest.Make(t, nil), nil, nil)
// Process events from the channel and update the health of the apps.
go func() {
appHealthCh := fakeAAPI.AppHealthCh()
for {
select {
case <-ctx.Done():
return
case req := <-appHealthCh:
mu.Lock()
for _, update := range req.Updates {
updateID, err := uuid.FromBytes(update.Id)
assert.NoError(t, err)
updateHealth := codersdk.WorkspaceAppHealth(strings.ToLower(proto.AppHealth_name[int32(update.Health)]))
go agent.NewAppHealthReporterWithClock(
slogtest.Make(t, nil).Leveled(slog.LevelDebug),
apps, agentsdk.AppHealthPoster(fakeAAPI), clk,
)(ctx)
for i, app := range apps {
if app.ID != updateID {
continue
}
app.Health = updateHealth
apps[i] = app
}
}
mu.Unlock()
}
}
}()
go agent.NewWorkspaceAppHealthReporter(slogtest.Make(t, nil).Leveled(slog.LevelDebug), apps, agentsdk.AppHealthPoster(fakeAAPI))(ctx)
return workspaceAgentApps, func() {
return fakeAAPI, func() {
for _, closeFn := range closers {
closeFn()
}
}
}
func applyUpdate(t *testing.T, apps []codersdk.WorkspaceApp, req *proto.BatchUpdateAppHealthRequest) {
t.Helper()
for _, update := range req.Updates {
updateID, err := uuid.FromBytes(update.Id)
require.NoError(t, err)
updateHealth := codersdk.WorkspaceAppHealth(strings.ToLower(proto.AppHealth_name[int32(update.Health)]))
for i, app := range apps {
if app.ID != updateID {
continue
}
app.Health = updateHealth
apps[i] = app
}
}
}