From d43699306bf4977de24b84e65d63d0ae854ffceb Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Thu, 17 Feb 2022 08:45:14 -0600 Subject: [PATCH] fix: Use buffered reader in peer to fix ShortBuffer (#303) This prevents a io.ErrShortBuffer from occurring when the byte slice being read is smaller than the chunks sent from the opposite pipe. This makes sense for unordered connections, where transmission is not guaranteed, but does not make sense for TCP-like connections. We use a bufio.Reader when ordered to ensure data isn't lost. --- .vscode/settings.json | 1 + peer/channel.go | 21 +++++++++++++++++++-- peer/conn_test.go | 21 +++++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 34ed9fbae2..d9b2b88f17 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -57,6 +57,7 @@ "tfexec", "tfstate", "unconvert", + "webrtc", "xerrors", "yamux" ] diff --git a/peer/channel.go b/peer/channel.go index b01154bcfa..d1f4930fe3 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -1,6 +1,7 @@ package peer import ( + "bufio" "context" "io" "net" @@ -78,7 +79,8 @@ type Channel struct { dc *webrtc.DataChannel // This field can be nil. It becomes set after the DataChannel // has been opened and is detached. - rwc datachannel.ReadWriteCloser + rwc datachannel.ReadWriteCloser + reader io.Reader closed chan struct{} closeMutex sync.Mutex @@ -130,6 +132,21 @@ func (c *Channel) init() { _ = c.closeWithError(xerrors.Errorf("detach: %w", err)) return } + // pion/webrtc will return an io.ErrShortBuffer when a read + // is triggerred with a buffer size less than the chunks written. + // + // This makes sense when considering UDP connections, because + // bufferring of data that has no transmit guarantees is likely + // to cause unexpected behavior. + // + // When ordered, this adds a bufio.Reader. This ensures additional + // data on TCP-like connections can be read in parts, while still + // being bufferred. + if c.opts.Unordered { + c.reader = c.rwc + } else { + c.reader = bufio.NewReader(c.rwc) + } close(c.opened) }) @@ -181,7 +198,7 @@ func (c *Channel) Read(bytes []byte) (int, error) { } } - bytesRead, err := c.rwc.Read(bytes) + bytesRead, err := c.reader.Read(bytes) if err != nil { if c.isClosed() { return 0, c.closeError diff --git a/peer/conn_test.go b/peer/conn_test.go index 519e5f3b74..644390ba2e 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -267,6 +267,27 @@ func TestConn(t *testing.T) { _, err := client.Ping() require.NoError(t, err) }) + + t.Run("ShortBuffer", func(t *testing.T) { + t.Parallel() + client, server, _ := createPair(t) + exchange(client, server) + go func() { + channel, err := client.Dial(context.Background(), "test", nil) + require.NoError(t, err) + _, err = channel.Write([]byte{1, 2}) + require.NoError(t, err) + }() + channel, err := server.Accept(context.Background()) + require.NoError(t, err) + data := make([]byte, 1) + _, err = channel.Read(data) + require.NoError(t, err) + require.Equal(t, uint8(0x1), data[0]) + _, err = channel.Read(data) + require.NoError(t, err) + require.Equal(t, uint8(0x2), data[0]) + }) } func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.Router) {