feat: Add TURN proxying to enable offline deployments (#1000)

* Add turnconn

* Add option for passing ICE servers

* Log TURN remote address

* Add TURN server to coder start
This commit is contained in:
Kyle Carberry
2022-04-18 17:40:25 -05:00
committed by GitHub
parent e5a1c305d3
commit d202f20fdb
25 changed files with 604 additions and 113 deletions

203
coderd/turnconn/turnconn.go Normal file
View File

@ -0,0 +1,203 @@
package turnconn
import (
"io"
"net"
"sync"
"github.com/pion/logging"
"github.com/pion/turn/v2"
"github.com/pion/webrtc/v3"
"golang.org/x/net/proxy"
"golang.org/x/xerrors"
)
var (
// reservedAddress is a magic address that's used exclusively
// for proxying via Coder. We don't proxy all TURN connections,
// because that'd exclude the possibility of a customer using
// their own TURN server.
reservedAddress = "127.0.0.1:12345"
credential = "coder"
localhost = &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
}
// Proxy is a an ICE Server that uses a special hostname
// to indicate traffic should be proxied.
Proxy = webrtc.ICEServer{
URLs: []string{"turns:" + reservedAddress},
Username: "coder",
Credential: credential,
}
)
// New constructs a new TURN server binding to the relay address provided.
// The relay address is used to broadcast the location of an accepted connection.
func New(relayAddress *turn.RelayAddressGeneratorStatic) (*Server, error) {
if relayAddress == nil {
relayAddress = &turn.RelayAddressGeneratorStatic{
RelayAddress: localhost.IP,
Address: "127.0.0.1",
}
}
logger := logging.NewDefaultLoggerFactory()
logger.DefaultLogLevel = logging.LogLevelDebug
server := &Server{
conns: make(chan net.Conn, 1),
closed: make(chan struct{}),
}
server.listener = &listener{
srv: server,
}
var err error
server.turn, err = turn.NewServer(turn.ServerConfig{
AuthHandler: func(username, realm string, srcAddr net.Addr) (key []byte, ok bool) {
// TURN connections require credentials. It's not important
// for our use-case, because our listener is entirely in-memory.
return turn.GenerateAuthKey(Proxy.Username, "", credential), true
},
ListenerConfigs: []turn.ListenerConfig{{
Listener: server.listener,
RelayAddressGenerator: relayAddress,
}},
LoggerFactory: logger,
})
if err != nil {
return nil, xerrors.Errorf("create server: %w", err)
}
return server, nil
}
// Server accepts and connects TURN allocations.
//
// This is a thin wrapper around pion/turn that pipes
// connections directly to the in-memory handler.
type Server struct {
listener *listener
turn *turn.Server
closeMutex sync.Mutex
closed chan (struct{})
conns chan (net.Conn)
}
// Accept consumes a new connection into the TURN server.
// A unique remote address must exist per-connection.
// pion/turn indexes allocations based on the address.
func (s *Server) Accept(nc net.Conn, remoteAddress, localAddress *net.TCPAddr) *Conn {
if localAddress == nil {
localAddress = localhost
}
conn := &Conn{
Conn: nc,
remoteAddress: remoteAddress,
localAddress: localAddress,
closed: make(chan struct{}),
}
s.conns <- conn
return conn
}
// Close ends the TURN server.
func (s *Server) Close() error {
s.closeMutex.Lock()
defer s.closeMutex.Unlock()
if s.isClosed() {
return nil
}
err := s.turn.Close()
close(s.conns)
close(s.closed)
return err
}
func (s *Server) isClosed() bool {
select {
case <-s.closed:
return true
default:
return false
}
}
// listener implements net.Listener for the TURN
// server to consume.
type listener struct {
srv *Server
}
func (l *listener) Accept() (net.Conn, error) {
conn, ok := <-l.srv.conns
if !ok {
return nil, io.EOF
}
return conn, nil
}
func (*listener) Close() error {
return nil
}
func (*listener) Addr() net.Addr {
return nil
}
type Conn struct {
net.Conn
closed chan struct{}
localAddress *net.TCPAddr
remoteAddress *net.TCPAddr
}
func (c *Conn) LocalAddr() net.Addr {
return c.localAddress
}
func (c *Conn) RemoteAddr() net.Addr {
return c.remoteAddress
}
// Closed returns a channel which is closed when
// the connection is.
func (c *Conn) Closed() <-chan struct{} {
return c.closed
}
func (c *Conn) Close() error {
err := c.Conn.Close()
select {
case <-c.closed:
default:
close(c.closed)
}
return err
}
type dialer func(network, addr string) (c net.Conn, err error)
func (d dialer) Dial(network, addr string) (c net.Conn, err error) {
return d(network, addr)
}
// ProxyDialer accepts a proxy function that's called when the connection
// address matches the reserved host in the "Proxy" ICE server.
//
// This should be passed to WebRTC connections as an ICE dialer.
func ProxyDialer(proxyFunc func() (c net.Conn, err error)) proxy.Dialer {
return dialer(func(network, addr string) (net.Conn, error) {
if addr != reservedAddress {
return proxy.Direct.Dial(network, addr)
}
netConn, err := proxyFunc()
if err != nil {
return nil, err
}
return &Conn{
localAddress: localhost,
closed: make(chan struct{}),
Conn: netConn,
}, nil
})
}

View File

@ -0,0 +1,106 @@
package turnconn_test
import (
"net"
"sync"
"testing"
"github.com/pion/webrtc/v3"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/coderd/turnconn"
"github.com/coder/coder/peer"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestTURNConn(t *testing.T) {
t.Parallel()
turnServer, err := turnconn.New(nil)
require.NoError(t, err)
defer turnServer.Close()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
clientDialer, clientTURN := net.Pipe()
turnServer.Accept(clientTURN, &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 16000,
}, nil)
require.NoError(t, err)
clientSettings := webrtc.SettingEngine{}
clientSettings.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6})
clientSettings.SetRelayAcceptanceMinWait(0)
clientSettings.SetICEProxyDialer(turnconn.ProxyDialer(func() (net.Conn, error) {
return clientDialer, nil
}))
client, err := peer.Client([]webrtc.ICEServer{turnconn.Proxy}, &peer.ConnOptions{
SettingEngine: clientSettings,
Logger: logger.Named("client"),
})
require.NoError(t, err)
serverDialer, serverTURN := net.Pipe()
turnServer.Accept(serverTURN, &net.TCPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 16001,
}, nil)
require.NoError(t, err)
serverSettings := webrtc.SettingEngine{}
serverSettings.SetNetworkTypes([]webrtc.NetworkType{webrtc.NetworkTypeTCP4, webrtc.NetworkTypeTCP6})
serverSettings.SetRelayAcceptanceMinWait(0)
serverSettings.SetICEProxyDialer(turnconn.ProxyDialer(func() (net.Conn, error) {
return serverDialer, nil
}))
server, err := peer.Server([]webrtc.ICEServer{turnconn.Proxy}, &peer.ConnOptions{
SettingEngine: serverSettings,
Logger: logger.Named("server"),
})
require.NoError(t, err)
exchange(t, client, server)
_, err = client.Ping()
require.NoError(t, err)
}
func exchange(t *testing.T, client, server *peer.Conn) {
var wg sync.WaitGroup
wg.Add(2)
t.Cleanup(func() {
_ = client.Close()
_ = server.Close()
wg.Wait()
})
go func() {
defer wg.Done()
for {
select {
case c := <-server.LocalCandidate():
client.AddRemoteCandidate(c)
case c := <-server.LocalSessionDescription():
client.SetRemoteSessionDescription(c)
case <-server.Closed():
return
}
}
}()
go func() {
defer wg.Done()
for {
select {
case c := <-client.LocalCandidate():
server.AddRemoteCandidate(c)
case c := <-client.LocalSessionDescription():
server.SetRemoteSessionDescription(c)
case <-client.Closed():
return
}
}
}()
}