1
0
mirror of https://github.com/coder/coder.git synced 2025-03-15 10:17:09 +00:00
Spike Curtis 2c7f8ac65f chore: migrate to coder/websocket 1.8.12 ()
Migrates us to `coder/websocket` v1.8.12 rather than `nhooyr/websocket` on an older version.

Works around https://github.com/coder/websocket/issues/504 by adding an explicit test for `xerrors.Is(err, io.EOF)` where we were previously getting `io.EOF` from the netConn.
2024-12-19 00:51:30 +04:00

189 lines
5.0 KiB
Go

package vpn_test
import (
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"tailscale.com/net/dns"
"tailscale.com/tailcfg"
"github.com/coder/coder/v2/coderd/httpapi"
"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/workspacesdk"
"github.com/coder/coder/v2/tailnet"
"github.com/coder/coder/v2/tailnet/proto"
"github.com/coder/coder/v2/tailnet/tailnettest"
"github.com/coder/coder/v2/testutil"
"github.com/coder/coder/v2/vpn"
"github.com/coder/websocket"
)
func TestClient_WorkspaceUpdates(t *testing.T) {
t.Parallel()
ctx := testutil.Context(t, testutil.WaitShort)
logger := testutil.Logger(t)
userID := uuid.UUID{1}
wsID := uuid.UUID{2}
peerID := uuid.UUID{3}
fCoord := tailnettest.NewFakeCoordinator()
var coord tailnet.Coordinator = fCoord
coordPtr := atomic.Pointer[tailnet.Coordinator]{}
coordPtr.Store(&coord)
ctrl := gomock.NewController(t)
mProvider := tailnettest.NewMockWorkspaceUpdatesProvider(ctrl)
mSub := tailnettest.NewMockSubscription(ctrl)
outUpdateCh := make(chan *proto.WorkspaceUpdate, 1)
inUpdateCh := make(chan tailnet.WorkspaceUpdate, 1)
mProvider.EXPECT().Subscribe(gomock.Any(), userID).Times(1).Return(mSub, nil)
mSub.EXPECT().Updates().MinTimes(1).Return(outUpdateCh)
mSub.EXPECT().Close().Times(1).Return(nil)
svc, err := tailnet.NewClientService(tailnet.ClientServiceOptions{
Logger: logger,
CoordPtr: &coordPtr,
DERPMapUpdateFrequency: time.Hour,
DERPMapFn: func() *tailcfg.DERPMap { return &tailcfg.DERPMap{} },
WorkspaceUpdatesProvider: mProvider,
ResumeTokenProvider: tailnet.NewInsecureTestResumeTokenProvider(),
})
require.NoError(t, err)
user := make(chan struct{})
connInfo := make(chan struct{})
serveErrCh := make(chan error)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/v2/users/me":
httpapi.Write(ctx, w, http.StatusOK, codersdk.User{
ReducedUser: codersdk.ReducedUser{
MinimalUser: codersdk.MinimalUser{
ID: userID,
},
},
})
user <- struct{}{}
case "/api/v2/workspaceagents/connection":
httpapi.Write(ctx, w, http.StatusOK, workspacesdk.AgentConnectionInfo{
DisableDirectConnections: false,
})
connInfo <- struct{}{}
case "/api/v2/tailnet":
// need 2.3 for WorkspaceUpdates RPC
cVer := r.URL.Query().Get("version")
assert.Equal(t, "2.3", cVer)
sws, err := websocket.Accept(w, r, nil)
if !assert.NoError(t, err) {
return
}
wsCtx, nc := codersdk.WebsocketNetConn(ctx, sws, websocket.MessageBinary)
serveErrCh <- svc.ServeConnV2(wsCtx, nc, tailnet.StreamID{
Name: "client",
ID: peerID,
// Auth can be nil as we use a mock update provider
Auth: tailnet.ClientUserCoordinateeAuth{
Auth: nil,
},
})
default:
http.NotFound(w, r)
}
}))
t.Cleanup(server.Close)
svrURL, err := url.Parse(server.URL)
require.NoError(t, err)
connErrCh := make(chan error)
connCh := make(chan vpn.Conn)
go func() {
conn, err := vpn.NewClient().NewConn(ctx, svrURL, "fakeToken", &vpn.Options{
UpdateHandler: updateHandler(func(wu tailnet.WorkspaceUpdate) error {
inUpdateCh <- wu
return nil
}),
DNSConfigurator: &noopConfigurator{},
})
connErrCh <- err
connCh <- conn
}()
testutil.RequireRecvCtx(ctx, t, user)
testutil.RequireRecvCtx(ctx, t, connInfo)
err = testutil.RequireRecvCtx(ctx, t, connErrCh)
require.NoError(t, err)
conn := testutil.RequireRecvCtx(ctx, t, connCh)
// Send a workspace update
update := &proto.WorkspaceUpdate{
UpsertedWorkspaces: []*proto.Workspace{
{
Id: wsID[:],
},
},
}
testutil.RequireSendCtx(ctx, t, outUpdateCh, update)
// It'll be received by the update handler
recvUpdate := testutil.RequireRecvCtx(ctx, t, inUpdateCh)
require.Len(t, recvUpdate.UpsertedWorkspaces, 1)
require.Equal(t, wsID, recvUpdate.UpsertedWorkspaces[0].ID)
// And be reflected on the Conn's state
state, err := conn.CurrentWorkspaceState()
require.NoError(t, err)
require.Equal(t, tailnet.WorkspaceUpdate{
UpsertedWorkspaces: []*tailnet.Workspace{
{
ID: wsID,
},
},
UpsertedAgents: []*tailnet.Agent{},
DeletedWorkspaces: []*tailnet.Workspace{},
DeletedAgents: []*tailnet.Agent{},
}, state)
// Close the conn
conn.Close()
err = testutil.RequireRecvCtx(ctx, t, serveErrCh)
require.NoError(t, err)
}
type updateHandler func(tailnet.WorkspaceUpdate) error
func (h updateHandler) Update(u tailnet.WorkspaceUpdate) error {
return h(u)
}
type noopConfigurator struct{}
func (*noopConfigurator) Close() error {
return nil
}
func (*noopConfigurator) GetBaseConfig() (dns.OSConfig, error) {
return dns.OSConfig{}, nil
}
func (*noopConfigurator) SetDNS(dns.OSConfig) error {
return nil
}
func (*noopConfigurator) SupportsSplitDNS() bool {
return true
}
var _ dns.OSConfigurator = (*noopConfigurator)(nil)