Files
coder/peer/channel.go
Kyle Carberry 8accb815e1 chore: Add peer package for networking (#6)
This package was pulled straight from github.com/coder/m. Nothing has been changed.

It will be used for networking clients<->workspaces, and coderd<->provisionerd.
2022-01-05 11:18:29 -06:00

309 lines
7.7 KiB
Go

package peer
import (
"context"
"io"
"net"
"sync"
"time"
"github.com/pion/datachannel"
"github.com/pion/webrtc/v3"
"golang.org/x/xerrors"
"cdr.dev/slog"
)
// Buffering and message-size limits shared by every Channel.
const (
// bufferedAmountLowThreshold is the value handed to the DataChannel's
// SetBufferedAmountLowThreshold in init; the OnBufferedAmountLow
// callback registered there signals blocked writers.
bufferedAmountLowThreshold uint64 = 512 * 1024 // 512 KB
// maxBufferedAmount is the limit checked by Write: when the currently
// buffered amount plus the outgoing message reaches this value, Write
// blocks until it is signaled that more data can be sent.
maxBufferedAmount uint64 = 1024 * 1024 // 1 MB
// maxMessageLength is the largest payload Write accepts.
// Larger messages do not work, for reasons not yet understood.
// This shouldn't be a huge deal for real-world usage.
// See: https://github.com/pion/datachannel/issues/59
maxMessageLength = 32 * 1024 // 32 KB
)
// newChannel wraps dc in a Channel owned by conn and initializes it.
//
// Initialization replaces the DataChannel's event handlers and detaches
// the channel once it opens, so callers must not mutate dc after handing
// it to this function.
func newChannel(conn *Conn, dc *webrtc.DataChannel, opts *ChannelOpts) *Channel {
	channel := new(Channel)
	channel.opts = opts
	channel.conn = conn
	channel.dc = dc

	// Both lifecycle channels are signaled by being closed.
	channel.opened = make(chan struct{})
	channel.closed = make(chan struct{})

	// One slot of buffering lets a "send more" signal be recorded even
	// when no writer is currently waiting for it.
	channel.sendMore = make(chan struct{}, 1)

	channel.init()
	return channel
}
// ChannelOpts holds the options used when creating a Channel.
type ChannelOpts struct {
// ID is the channel ID that should be used when `Negotiated`
// is true.
ID uint16
// Negotiated reports whether the data channel will already
// be active on the other end. Defaults to false.
Negotiated bool
// Protocol is an arbitrary string that can be parsed on `Accept`.
Protocol string
// Unordered controls message ordering. Defaults to false.
// NOTE(review): the previous comment documented an "Ordered" option that
// makes the channel act like a TCP connection; presumably leaving
// Unordered false keeps that ordered behavior. Confirm where this
// option is consumed.
Unordered bool
// OpenOnDisconnect is whether the channel will be left open on
// disconnect or not. If true, data will be buffered on either end to
// be sent once reconnected. Defaults to false.
OpenOnDisconnect bool
}
// Channel represents a WebRTC DataChannel.
//
// This struct wraps webrtc.DataChannel to add concurrent-safe usage,
// data buffering, and standardized errors for connection state.
//
// It modifies the default behavior of a DataChannel by closing on
// WebRTC PeerConnection failure. This is done to emulate TCP connections.
// This option can be changed in the options when creating a Channel.
type Channel struct {
// opts are the options the Channel was created with.
opts *ChannelOpts
// conn is the parent connection; its close, disconnect, and failure
// signals are watched by the goroutine started in init.
conn *Conn
// dc is the wrapped DataChannel.
dc *webrtc.DataChannel
// This field can be nil. It becomes set after the DataChannel
// has been opened and is detached.
rwc datachannel.ReadWriteCloser
// closed is closed by closeWithError once the Channel has shut down.
closed chan struct{}
// closeMutex is held by closeWithError and by the OnOpen handler.
closeMutex sync.Mutex
// closeError is the error recorded by closeWithError; it is assigned
// before closed is closed.
closeError error
// opened is closed by the OnOpen handler after a successful detach.
opened chan struct{}
// sendMore is used to block Write operations on a full buffer.
// It's signaled when the buffer can accept more data.
sendMore chan struct{}
// writeMutex is held for the duration of each Write call.
writeMutex sync.Mutex
}
// init attaches listeners to the DataChannel to detect opening,
// closing, and when the channel is ready to transmit data. It also
// starts a goroutine that closes the Channel when the parent Conn
// closes, fails, or (unless OpenOnDisconnect is set) disconnects.
//
// This should only be called once on creation.
func (c *Channel) init() {
	// WebRTC connections maintain an internal buffer that can fill when:
	//  1. Data is being sent faster than it can flush.
	//  2. The connection is disconnected, but data is still being sent.
	//
	// This applies a maximum in-memory buffer for data, and will cause
	// write operations to block once the threshold is reached.
	c.dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
	c.dc.OnBufferedAmountLow(func() {
		if c.isClosed() {
			return
		}
		// NOTE(review): the closed check above and the send below are not
		// atomic, and closeWithError closes sendMore. A close landing in
		// between would make this send panic; confirm whether that
		// interleaving is reachable.
		//
		// The send never blocks: if a signal is already pending there is
		// nothing more to record.
		select {
		case c.sendMore <- struct{}{}:
		default:
		}
	})
	c.dc.OnClose(func() {
		c.conn.opts.Logger.Debug(context.Background(), "datachannel closing from OnClose", slog.F("id", c.dc.ID()), slog.F("label", c.dc.Label()))
		_ = c.closeWithError(ErrClosed)
	})
	c.dc.OnOpen(func() {
		c.closeMutex.Lock()
		c.conn.opts.Logger.Debug(context.Background(), "datachannel opening", slog.F("id", c.dc.ID()), slog.F("label", c.dc.Label()))
		var err error
		c.rwc, err = c.dc.Detach()
		if err != nil {
			// closeWithError acquires closeMutex itself and sync.Mutex is
			// not reentrant, so the lock must be released first or this
			// callback would deadlock.
			c.closeMutex.Unlock()
			_ = c.closeWithError(xerrors.Errorf("detach: %w", err))
			return
		}
		close(c.opened)
		c.closeMutex.Unlock()
	})

	// Balanced by the matching Sub(1) calls in closeWithError.
	c.conn.dcDisconnectListeners.Add(1)
	c.conn.dcFailedListeners.Add(1)
	go func() {
		var err error
		// A DataChannel can disconnect multiple times, so this needs to loop.
		for {
			select {
			case <-c.closed:
				// If this channel was closed, there's no need to close again.
				return
			case <-c.conn.Closed():
				// If the RTC connection closed with an error, this channel
				// should end with the same one.
				err = c.conn.closeError
				if err == nil {
					// Without an error the loop would immediately re-select
					// on the already-closed channel and spin; fall back to
					// the generic close error instead.
					err = ErrClosed
				}
			case <-c.conn.dcDisconnectChannel:
				// If the RTC connection is disconnected, we need to check if
				// the DataChannel is supposed to end on disconnect.
				if c.opts.OpenOnDisconnect {
					continue
				}
				err = xerrors.Errorf("rtc disconnected. closing: %w", ErrClosed)
			case <-c.conn.dcFailedChannel:
				// If the RTC connection failed, close the Channel.
				err = ErrFailed
			}
			if err != nil {
				break
			}
		}
		_ = c.closeWithError(err)
	}()
}
// Read fills b with data received on the channel, blocking until some
// is available.
//
// If the underlying DataChannel has not opened yet, Read first waits for
// it to open. Once the Channel is closed, Read reports the recorded close
// error.
func (c *Channel) Read(b []byte) (n int, err error) {
	if c.isClosed() {
		return 0, c.closeError
	}
	if !c.isOpened() {
		if openErr := c.waitOpened(); openErr != nil {
			return 0, openErr
		}
	}

	n, err = c.rwc.Read(b)
	if err == nil {
		return n, nil
	}

	// A close that raced with the read takes precedence over whatever
	// error the read itself produced.
	if c.isClosed() {
		return 0, c.closeError
	}

	// An EOF always occurs when the connection is closed. Alternative
	// close errors will have been recorded first if the close was
	// unexpected, so reaching this point means a plain close.
	if xerrors.Is(err, io.EOF) {
		err = c.closeWithError(ErrClosed)
	}
	return n, err
}
// Write sends data to the underlying DataChannel.
//
// Payloads longer than maxMessageLength are rejected with an error and
// nothing is sent. If the DataChannel has not opened yet, Write waits for
// it to open first.
//
// This function will block if too much data is being sent.
// Data will buffer if the connection is temporarily disconnected,
// and will be flushed upon reconnection.
//
// If the Channel is setup to close on disconnect, any buffered
// data will be lost. If the Channel is closed, including while Write
// is blocked waiting for buffer space, the recorded close error is
// returned.
func (c *Channel) Write(b []byte) (n int, err error) {
	if len(b) > maxMessageLength {
		return 0, xerrors.Errorf("outbound packet larger than maximum message size: %d", maxMessageLength)
	}

	// Only one Write runs at a time.
	c.writeMutex.Lock()
	defer c.writeMutex.Unlock()

	if c.isClosed() {
		// On an already-closed Channel closeWithError only reports the
		// recorded close error.
		return 0, c.closeWithError(nil)
	}
	if !c.isOpened() {
		if err := c.waitOpened(); err != nil {
			return 0, err
		}
	}

	if c.dc.BufferedAmount()+uint64(len(b)) >= maxBufferedAmount {
		// Wait for buffer space, but also wake up if the Channel closes
		// so a blocked writer is never stranded.
		select {
		case <-c.sendMore:
		case <-c.closed:
		}
		// Closing also releases waiters on sendMore, so check explicitly
		// and report the close error rather than writing to a closed
		// transport.
		if c.isClosed() {
			return 0, c.closeError
		}
	}

	// TODO (@kyle): There's an obvious race-condition here.
	// This is an edge-case, as most-frequently data won't
	// be pooled so synchronously, but is definitely possible.
	//
	// See: https://github.com/pion/sctp/issues/181
	time.Sleep(time.Microsecond)
	return c.rwc.Write(b)
}
// Close shuts the DataChannel down gracefully, i.e. without supplying
// an error of its own.
func (c *Channel) Close() error {
	closeErr := c.closeWithError(nil)
	return closeErr
}
// Label reports the label carried by the underlying DataChannel.
func (c *Channel) Label() string {
	label := c.dc.Label()
	return label
}
// NetConn exposes the Channel through a value satisfying net.Conn.
// The *Channel itself remains usable for Read, Write, and Close.
func (c *Channel) NetConn() net.Conn {
	wrapped := &fakeNetConn{c: c, addr: &peerAddr{}}
	return wrapped
}
// closeWithError shuts the Channel down, recording err as the reason.
//
// A nil err denotes a graceful close: ErrClosed is recorded as the close
// error, but nil is returned to the caller. If the Channel is already
// closed, nothing is done and the previously recorded close error is
// returned instead.
func (c *Channel) closeWithError(err error) error {
	c.closeMutex.Lock()
	defer c.closeMutex.Unlock()

	if c.isClosed() {
		return c.closeError
	}

	c.conn.opts.Logger.Debug(context.Background(), "datachannel closing with error",
		slog.F("id", c.dc.ID()),
		slog.F("label", c.dc.Label()),
		slog.Error(err),
	)

	// Record the reason before signaling closure so that anyone who
	// observes the closed channel sees a populated closeError.
	reason := err
	if reason == nil {
		reason = ErrClosed
	}
	c.closeError = reason

	close(c.closed)
	// Closing sendMore releases any writer waiting for buffer space.
	close(c.sendMore)

	// Undo the listener registrations made in init.
	c.conn.dcDisconnectListeners.Sub(1)
	c.conn.dcFailedListeners.Sub(1)

	// rwc is only set once the DataChannel has opened and been detached.
	if rwc := c.rwc; rwc != nil {
		_ = rwc.Close()
	}
	_ = c.dc.Close()

	return err
}
// isClosed reports, without blocking, whether the closed channel has
// been closed.
func (c *Channel) isClosed() bool {
	select {
	case <-c.closed:
	default:
		return false
	}
	return true
}
// isOpened reports, without blocking, whether the opened channel has
// been closed, which marks the DataChannel as open.
func (c *Channel) isOpened() bool {
	select {
	case <-c.opened:
	default:
		return false
	}
	return true
}
// waitOpened blocks until the Channel has either opened or closed.
// It returns nil once opened, or the recorded close error if the
// Channel closed.
func (c *Channel) waitOpened() error {
	select {
	case <-c.closed:
		return c.closeError
	case <-c.opened:
	}
	return nil
}