chore: add test for coord rolling restart (#14680)

Closes https://github.com/coder/team-coconut/issues/50

---------

Co-authored-by: Ethan Dickson <ethan@coder.com>
Author: Dean Sheather
Date: 2024-11-20 17:04:33 +10:00
Committed by: GitHub
Parent: 9e63caf593
Commit: fbe2fa66f5
4 changed files with 348 additions and 6 deletions


@@ -5,26 +5,38 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/moby/moby/pkg/namesgenerator"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent"
"github.com/coder/coder/v2/agent/agenttest"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/coderd/rbac/policy"
"github.com/coder/coder/v2/coderd/util/ptr"
"github.com/coder/coder/v2/tailnet/tailnettest"
agplaudit "github.com/coder/coder/v2/coderd/audit"
"github.com/coder/coder/v2/coderd/coderdtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbauthz"
"github.com/coder/coder/v2/coderd/database/dbfake"
"github.com/coder/coder/v2/coderd/database/dbmem"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
@@ -562,3 +574,330 @@ func testDBAuthzRole(ctx context.Context) context.Context {
Scope: rbac.ScopeAll,
})
}
// restartableListener is a TCP listener that can have all of its connections
// severed on demand.
type restartableListener struct {
net.Listener
mu sync.Mutex
conns []net.Conn
}
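
// Accept records each accepted connection so CloseConnections can sever it
// later.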
func (l *restartableListener) Accept() (net.Conn, error) {
conn, err := l.Listener.Accept()
if err != nil {
return nil, err
}
l.mu.Lock()
l.conns = append(l.conns, conn)
l.mu.Unlock()
return conn, nil
}
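
// CloseConnections severs every tracked connection, simulating the abrupt
// connection loss of a coordinator restart.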
func (l *restartableListener) CloseConnections() {
l.mu.Lock()
defer l.mu.Unlock()
for _, conn := range l.conns {
_ = conn.Close()
}
l.conns = nil
}
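
// restartableTestServer wraps a coderd enterprise test server so its API can
// be stopped and started repeatedly while the listener, URL, and database
// stay the same.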
type restartableTestServer struct {
options *coderdenttest.Options
rl *restartableListener
mu sync.Mutex
api *coderd.API
closer io.Closer
}
func newRestartableTestServer(t *testing.T, options *coderdenttest.Options) (*codersdk.Client, codersdk.CreateFirstUserResponse, *restartableTestServer) {
t.Helper()
if options == nil {
options = &coderdenttest.Options{}
}
s := &restartableTestServer{
options: options,
}
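
// Route every request through the current s.api so the API can be swapped
// out underneath a stable URL; while the server is stopped, respond with
// 502 Bad Gateway instead.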
srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
api := s.api
s.mu.Unlock()
if api == nil {
w.WriteHeader(http.StatusBadGateway)
_, _ = w.Write([]byte("server is not started"))
return
}
api.AGPL.RootHandler.ServeHTTP(w, r)
}))
s.rl = &restartableListener{Listener: srv.Listener}
srv.Listener = s.rl
srv.Start()
t.Cleanup(srv.Close)
u, err := url.Parse(srv.URL)
require.NoError(t, err, "failed to parse server URL")
s.options.AccessURL = u
client, firstUser := s.startWithFirstUser(t)
client.URL = u
return client, firstUser, s
}
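
// Stop closes the API and its closer, then severs all open connections so
// clients are forced to reconnect.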
func (s *restartableTestServer) Stop(t *testing.T) {
t.Helper()
s.mu.Lock()
closer := s.closer
s.closer = nil
api := s.api
s.api = nil
s.mu.Unlock()
if closer != nil {
err := closer.Close()
require.NoError(t, err)
}
if api != nil {
err := api.Close()
require.NoError(t, err)
}
s.rl.CloseConnections()
}
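
// Start restarts the API using the same options (and therefore the same
// database) as the previous run.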
func (s *restartableTestServer) Start(t *testing.T) {
t.Helper()
_, _ = s.startWithFirstUser(t)
}
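
// startWithFirstUser (re)creates the coderd API. The first user and license
// are only created on the first call; subsequent restarts reuse them.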
func (s *restartableTestServer) startWithFirstUser(t *testing.T) (client *codersdk.Client, firstUser codersdk.CreateFirstUserResponse) {
t.Helper()
s.mu.Lock()
defer s.mu.Unlock()
if s.closer != nil || s.api != nil {
t.Fatal("server already started, close must be called first")
}
// This creates its own TCP listener unfortunately, but that listener is not
// used in this test.
client, s.closer, s.api, firstUser = coderdenttest.NewWithAPI(t, s.options)
// Never add the first user or license on subsequent restarts.
s.options.DontAddFirstUser = true
s.options.DontAddLicense = true
return client, firstUser
}
// Test_CoordinatorRollingRestart tests that two peers can maintain a connection
// without forgetting about each other when an HA coordinator does a rolling
// restart.
//
// We had a few issues with this in the past:
// 1. We didn't allow clients to maintain their peer ID after a reconnect,
// which resulted in the other peer thinking the client was a new peer.
// (This is fixed and independently tested in AGPL code)
// 2. HA coordinators would delete all peers (via FK constraints) when they
// were closed, which meant tunnels would be deleted and peers would be
// notified that the other peer was permanently gone.
// (This is fixed and independently tested above)
//
// This test uses a real server and real clients.
func TestConn_CoordinatorRollingRestart(t *testing.T) {
t.Parallel()
if !dbtestutil.WillUsePostgres() {
t.Skip("test only with postgres")
}
// Although DERP will have connection issues until the connection is
// reestablished, any open connections should be maintained.
//
// Direct connections should be able to transmit packets throughout the
// restart without issue.
//nolint:paralleltest // Outdated rule
for _, direct := range []bool{true, false} {
name := "DERP"
if direct {
name = "Direct"
}
t.Run(name, func(t *testing.T) {
t.Parallel()
store, ps := dbtestutil.NewDB(t)
dv := coderdtest.DeploymentValues(t, func(dv *codersdk.DeploymentValues) {
dv.DERP.Config.BlockDirect = serpent.Bool(!direct)
})
logger := slogtest.Make(t, &slogtest.Options{IgnoreErrors: true}).Leveled(slog.LevelDebug)
// Create two restartable test servers with the same database.
client1, user, s1 := newRestartableTestServer(t, &coderdenttest.Options{
DontAddFirstUser: false,
DontAddLicense: false,
Options: &coderdtest.Options{
Logger: ptr.Ref(logger.Named("server1")),
Database: store,
Pubsub: ps,
DeploymentValues: dv,
IncludeProvisionerDaemon: true,
},
LicenseOptions: &coderdenttest.LicenseOptions{
Features: license.Features{
codersdk.FeatureHighAvailability: 1,
},
},
})
client2, _, s2 := newRestartableTestServer(t, &coderdenttest.Options{
DontAddFirstUser: true,
DontAddLicense: true,
Options: &coderdtest.Options{
Logger: ptr.Ref(logger.Named("server2")),
Database: store,
Pubsub: ps,
DeploymentValues: dv,
},
})
client2.SetSessionToken(client1.SessionToken())
workspace := dbfake.WorkspaceBuild(t, store, database.WorkspaceTable{
OrganizationID: user.OrganizationID,
OwnerID: user.UserID,
}).WithAgent().Do()
// Agent connects via the first coordinator.
_ = agenttest.New(t, client1.URL, workspace.AgentToken, func(o *agent.Options) {
o.Logger = logger.Named("agent1")
})
resources := coderdtest.NewWorkspaceAgentWaiter(t, client1, workspace.Workspace.ID).Wait()
agentID := uuid.Nil
for _, r := range resources {
for _, a := range r.Agents {
agentID = a.ID
break
}
}
require.NotEqual(t, uuid.Nil, agentID)
// Client connects via the second coordinator.
ctx := testutil.Context(t, testutil.WaitSuperLong)
workspaceClient2 := workspacesdk.New(client2)
conn, err := workspaceClient2.DialAgent(ctx, agentID, &workspacesdk.DialAgentOptions{
Logger: logger.Named("client"),
})
require.NoError(t, err)
defer conn.Close()
require.Eventually(t, func() bool {
_, p2p, _, err := conn.Ping(ctx)
assert.NoError(t, err)
return p2p == direct
}, testutil.WaitShort, testutil.IntervalFast)
// Open a TCP server and connection to it through the tunnel that
// should be maintained throughout the restart.
tcpServerAddr := tcpEchoServer(t)
tcpConn, err := conn.DialContext(ctx, "tcp", tcpServerAddr)
require.NoError(t, err)
defer tcpConn.Close()
writeReadEcho(t, ctx, tcpConn)
// Stop the first server.
logger.Info(ctx, "test: stopping server 1")
s1.Stop(t)
// Pings should fail on DERP but succeed on direct connections.
pingCtx, pingCancel := context.WithTimeout(ctx, 2*time.Second) //nolint:gocritic // it's going to hang and timeout for DERP, so this needs to be short
defer pingCancel()
_, p2p, _, err := conn.Ping(pingCtx)
if direct {
require.NoError(t, err)
require.True(t, p2p, "expected direct connection")
} else {
require.ErrorIs(t, err, context.DeadlineExceeded)
}
// The existing TCP connection should still be working if we're
// using direct connections.
if direct {
writeReadEcho(t, ctx, tcpConn)
}
// Start the first server again.
logger.Info(ctx, "test: starting server 1")
s1.Start(t)
// Restart the second server.
logger.Info(ctx, "test: stopping server 2")
s2.Stop(t)
logger.Info(ctx, "test: starting server 2")
s2.Start(t)
// Pings should eventually succeed on both DERP and direct
// connections.
require.True(t, conn.AwaitReachable(ctx))
_, p2p, _, err = conn.Ping(ctx)
require.NoError(t, err)
require.Equal(t, direct, p2p, "mismatched p2p state")
// The existing TCP connection should still be working.
writeReadEcho(t, ctx, tcpConn)
})
}
}
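
// tcpEchoServer starts a local TCP server that echoes everything it reads
// back to the sender and returns its address. It is closed on test cleanup.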
func tcpEchoServer(t *testing.T) string {
var listenerWg sync.WaitGroup
tcpListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
_ = tcpListener.Close()
listenerWg.Wait()
})
listenerWg.Add(1)
go func() {
defer listenerWg.Done()
for {
conn, err := tcpListener.Accept()
if err != nil {
return
}
listenerWg.Add(1)
go func() {
defer listenerWg.Done()
defer conn.Close()
_, _ = io.Copy(conn, conn)
}()
}
}()
return tcpListener.Addr().String()
}
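
// writeReadEcho writes a random message to conn and asserts that the echo
// server sends the same bytes back.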
// nolint:revive // t takes precedence.
func writeReadEcho(t *testing.T, ctx context.Context, conn net.Conn) {
msg := namesgenerator.GetRandomName(0)
deadline, ok := ctx.Deadline()
if ok {
_ = conn.SetWriteDeadline(deadline)
defer conn.SetWriteDeadline(time.Time{})
_ = conn.SetReadDeadline(deadline)
defer conn.SetReadDeadline(time.Time{})
}
// Write a message
_, err := conn.Write([]byte(msg))
require.NoError(t, err)
// Read the message back
buf := make([]byte, 1024)
n, err := conn.Read(buf)
require.NoError(t, err)
require.Equal(t, msg, string(buf[:n]))
}