mirror of
https://github.com/coder/coder.git
synced 2025-07-18 14:17:22 +00:00
chore: fix wsproxy flakes (#18522)
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
package wsproxy_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -497,8 +498,8 @@ func TestDERPMesh(t *testing.T) {
|
||||
proxyURL, err := url.Parse("https://proxy.test.coder.com")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create 6 proxy replicas.
|
||||
const count = 6
|
||||
// Create 3 proxy replicas.
|
||||
const count = 3
|
||||
var (
|
||||
sessionToken = ""
|
||||
proxies = [count]coderdenttest.WorkspaceProxy{}
|
||||
@ -838,27 +839,33 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create 1 real proxy replica.
|
||||
replicaPingErr := make(chan string, 4)
|
||||
replicaPingRes := make(chan replicaPingCallback, 4)
|
||||
proxy := coderdenttest.NewWorkspaceProxyReplica(t, api, client, &coderdenttest.ProxyOptions{
|
||||
Name: "proxy-2",
|
||||
ProxyURL: proxyURL,
|
||||
ReplicaPingCallback: func(_ []codersdk.Replica, err string) {
|
||||
replicaPingErr <- err
|
||||
ReplicaPingCallback: func(replicas []codersdk.Replica, err string) {
|
||||
t.Logf("got wsproxy ping callback: replica count: %v, ping error: %s", len(replicas), err)
|
||||
replicaPingRes <- replicaPingCallback{
|
||||
replicas: replicas,
|
||||
err: err,
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
// Create a second proxy replica that isn't working.
|
||||
ctx := testutil.Context(t, testutil.WaitLong)
|
||||
otherReplicaID := registerBrokenProxy(ctx, t, api.AccessURL, proxyURL.String(), proxy.Options.ProxySessionToken)
|
||||
|
||||
// Force the proxy to re-register immediately.
|
||||
err = proxy.RegisterNow()
|
||||
require.NoError(t, err, "failed to force proxy to re-register")
|
||||
|
||||
// Wait for the ping to fail.
|
||||
// Force the proxy to re-register and wait for the ping to fail.
|
||||
for {
|
||||
replicaErr := testutil.TryReceive(ctx, t, replicaPingErr)
|
||||
t.Log("replica ping error:", replicaErr)
|
||||
if replicaErr != "" {
|
||||
err = proxy.RegisterNow()
|
||||
require.NoError(t, err, "failed to force proxy to re-register")
|
||||
|
||||
pingRes := testutil.TryReceive(ctx, t, replicaPingRes)
|
||||
// We want to ensure that we know about the other replica, and the
|
||||
// ping failed.
|
||||
if len(pingRes.replicas) == 1 && pingRes.err != "" {
|
||||
t.Log("got failed ping callback for other replica, continuing")
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -884,17 +891,17 @@ func TestWorkspaceProxyDERPMeshProbe(t *testing.T) {
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Force the proxy to re-register immediately.
|
||||
err = proxy.RegisterNow()
|
||||
require.NoError(t, err, "failed to force proxy to re-register")
|
||||
|
||||
// Wait for the ping to be skipped.
|
||||
// Force the proxy to re-register and wait for the ping to be skipped
|
||||
// because there are no more siblings.
|
||||
for {
|
||||
replicaErr := testutil.TryReceive(ctx, t, replicaPingErr)
|
||||
t.Log("replica ping error:", replicaErr)
|
||||
err = proxy.RegisterNow()
|
||||
require.NoError(t, err, "failed to force proxy to re-register")
|
||||
|
||||
replicaErr := testutil.TryReceive(ctx, t, replicaPingRes)
|
||||
// Should be empty because there are no more peers. This was where
|
||||
// the regression was.
|
||||
if replicaErr == "" {
|
||||
if len(replicaErr.replicas) == 0 && replicaErr.err == "" {
|
||||
t.Log("got empty ping callback with no sibling replicas, continuing")
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -993,6 +1000,11 @@ func TestWorkspaceProxyWorkspaceApps(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
type replicaPingCallback struct {
|
||||
replicas []codersdk.Replica
|
||||
err string
|
||||
}
|
||||
|
||||
func TestWorkspaceProxyWorkspaceApps_BlockDirect(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@ -1118,7 +1130,7 @@ func createDERPClient(t *testing.T, ctx context.Context, name string, derpURL st
|
||||
// received on dstCh.
|
||||
//
|
||||
// If the packet doesn't arrive within 500ms, it will try to send it again until
|
||||
// testutil.WaitLong is reached.
|
||||
// the context expires.
|
||||
//
|
||||
//nolint:revive
|
||||
func testDERPSend(t *testing.T, ctx context.Context, dstKey key.NodePublic, dstCh <-chan derp.ReceivedPacket, src *derphttp.Client) {
|
||||
@ -1139,11 +1151,17 @@ func testDERPSend(t *testing.T, ctx context.Context, dstKey key.NodePublic, dstC
|
||||
for {
|
||||
select {
|
||||
case pkt := <-dstCh:
|
||||
require.Equal(t, src.SelfPublicKey(), pkt.Source, "packet came from wrong source")
|
||||
require.Equal(t, msg, pkt.Data, "packet data is wrong")
|
||||
if pkt.Source != src.SelfPublicKey() {
|
||||
t.Logf("packet came from wrong source: %s", pkt.Source)
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(pkt.Data, msg) {
|
||||
t.Logf("packet data is wrong: %s", pkt.Data)
|
||||
continue
|
||||
}
|
||||
return
|
||||
case <-ctx.Done():
|
||||
t.Fatal("timed out waiting for packet")
|
||||
t.Fatal("timed out waiting for valid packet")
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
|
Reference in New Issue
Block a user