refactor: Improve handshake resiliency of peer (#95)
* fix: Synchronize peer logging with a channel. We were depending on the close mutex to properly report connection state. This ensures the RTC connection is properly closed before returning.
* Disable pion logging
* Remove buffer
* Try ICE servers
* Remove flushed
* Add diagram explaining handshake
* Fix candidate accept ordering
* Add debug logging to peerbroker
* Fix send ordering
* Lock adding ICE candidate
* Add test for negotiating out of order
* Reduce connection to a single negotiation channel
* Improve test times by pre-installing Terraform
* Lock remote session description being applied
* Organize conn
* Revert to multi-channel setup
* Properly close ICE gatherer
* Improve comments
* Try removing buffered candidates
* Buffer local and remote messages
* Log dTLS transport state
* Add pion logging
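The thread running through these bullets is ordering: session descriptions are exchanged first, ICE candidates second, with one mutex serializing both. For orientation before the diff, here is a minimal sketch of how two peers from this package get wired together, modeled on the exchange() helper added to peer/conn_test.go below (the name pump is hypothetical; the Conn methods are the ones this commit settles on):

    // pump forwards signaling messages from a to b until a closes.
    func pump(a, b *peer.Conn) {
        go func() {
            for {
                select {
                case c := <-a.LocalCandidate():
                    b.AddRemoteCandidate(c)
                case sdp := <-a.LocalSessionDescription():
                    b.SetRemoteSessionDescription(sdp)
                case <-a.Closed():
                    return
                }
            }
        }()
    }

Calling pump(client, server) and pump(server, client) gives the full bidirectional exchange; Conn itself now guarantees that candidates are not applied before the remote description.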
.github/workflows/coder.yaml (8 changes, vendored)

@@ -148,11 +148,17 @@ jobs:
       - run: go install gotest.tools/gotestsum@latest
 
+      - uses: hashicorp/setup-terraform@v1
+        if: runner.os == 'Linux'
+        with:
+          terraform_version: 1.1.2
+          terraform_wrapper: false
+
       - name: Test with Mock Database
         run:
           gotestsum --jsonfile="gotests.json" --packages="./..." --
           -covermode=atomic -coverprofile="gotests.coverage" -timeout=3m
-          -count=3 -race -parallel=2
+          -count=3 -race -short -parallel=2
 
       - name: Test with PostgreSQL Database
         if: runner.os == 'Linux'
@@ -1,3 +1,7 @@
+// This test runs slowly on MacOS instance, and really
+// only needs to run on Linux anyways.
+//go:build linux
+
 package userpassword_test
 
 import (
@@ -20,6 +20,11 @@ func TestMain(m *testing.M) {
 func TestMigrate(t *testing.T) {
     t.Parallel()
 
+    if testing.Short() {
+        t.Skip()
+        return
+    }
+
     t.Run("Once", func(t *testing.T) {
         t.Parallel()
         connection, closeFn, err := postgres.Open()
@@ -21,6 +21,11 @@ func TestMain(m *testing.M) {
 func TestPostgres(t *testing.T) {
     t.Parallel()
 
+    if testing.Short() {
+        t.Skip()
+        return
+    }
+
     connect, close, err := postgres.Open()
     require.NoError(t, err)
     defer close()
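These testing.Short() guards pair with the workflow change above: the mock-database job now passes -short, so the PostgreSQL-backed tests skip themselves there and only run in the job that provisions a real database. Locally the same split is:

    go test -short ./...   # skips the PostgreSQL-backed tests
    go test ./...          # exercises the real database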
go.mod (2 changes)

@@ -13,7 +13,6 @@ require (
     github.com/golang-migrate/migrate/v4 v4.15.1
     github.com/google/uuid v1.3.0
     github.com/hashicorp/go-version v1.4.0
-    github.com/hashicorp/hc-install v0.3.1
     github.com/hashicorp/terraform-config-inspect v0.0.0-20211115214459-90acf1ca460f
     github.com/hashicorp/terraform-exec v0.15.0
     github.com/justinas/nosurf v1.1.1
@@ -64,7 +63,6 @@ require (
     github.com/google/go-cmp v0.5.7 // indirect
     github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
     github.com/hashicorp/errwrap v1.1.0 // indirect
-    github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
     github.com/hashicorp/go-multierror v1.1.1 // indirect
     github.com/hashicorp/hcl v1.0.0 // indirect
     github.com/hashicorp/hcl/v2 v2.11.1 // indirect
go.sum (3 changes)

@@ -693,8 +693,6 @@ github.com/hashicorp/go-version v1.4.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
-github.com/hashicorp/hc-install v0.3.1 h1:VIjllE6KyAI1A244G8kTaHXy+TL5/XYzvrtFi8po/Yk=
-github.com/hashicorp/hc-install v0.3.1/go.mod h1:3LCdWcCDS1gaHC9mhHCGbkYfoY6vdsKohGjugbZdZak=
 github.com/hashicorp/hcl v0.0.0-20170504190234-a4b07c25de5f/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
@@ -1309,7 +1307,6 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh
 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
-golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@@ -141,9 +141,9 @@ func (c *Channel) init() {
         // A DataChannel can disconnect multiple times, so this needs to loop.
         for {
             select {
-            case <-c.closed:
+            case <-c.conn.closedRTC:
                 // If this channel was closed, there's no need to close again.
-                return
+                err = c.conn.closeError
             case <-c.conn.Closed():
                 // If the RTC connection closed with an error, this channel
                 // should end with the same one.
peer/conn.go (433 changes)

@@ -3,8 +3,8 @@ package peer
 import (
     "bytes"
     "context"
     "crypto/rand"
-    "crypto/sha256"
     "io"
     "sync"
     "time"
@@ -49,12 +49,10 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
         opts = &ConnOptions{}
     }
 
-    // Enables preference to STUN.
-    opts.SettingEngine.SetSrflxAcceptanceMinWait(0)
     opts.SettingEngine.DetachDataChannels()
-    lf := logging.NewDefaultLoggerFactory()
-    lf.DefaultLogLevel = logging.LogLevelDisabled
-    opts.SettingEngine.LoggerFactory = lf
+    factory := logging.NewDefaultLoggerFactory()
+    factory.DefaultLogLevel = logging.LogLevelDisabled
+    opts.SettingEngine.LoggerFactory = factory
     api := webrtc.NewAPI(webrtc.WithSettingEngine(opts.SettingEngine))
     rtc, err := api.NewPeerConnection(webrtc.Configuration{
         ICEServers: servers,
@@ -63,21 +61,20 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
         return nil, xerrors.Errorf("create peer connection: %w", err)
     }
     conn := &Conn{
         pingChannelID:       1,
         pingEchoChannelID:   2,
         opts:                opts,
         rtc:                 rtc,
         offerrer:            client,
         closed:              make(chan struct{}),
+        closedRTC:           make(chan struct{}),
+        closedICE:           make(chan struct{}),
         dcOpenChannel:       make(chan *webrtc.DataChannel),
         dcDisconnectChannel: make(chan struct{}),
         dcFailedChannel:     make(chan struct{}),
-        // This channel needs to be bufferred otherwise slow consumers
-        // of this will cause a connection failure.
-        localCandidateChannel:           make(chan webrtc.ICECandidateInit, 16),
-        pendingRemoteCandidates:         make([]webrtc.ICECandidateInit, 0),
-        localSessionDescriptionChannel:  make(chan webrtc.SessionDescription),
-        remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription),
+        localCandidateChannel:           make(chan webrtc.ICECandidateInit),
+        localSessionDescriptionChannel:  make(chan webrtc.SessionDescription, 1),
+        remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription, 1),
     }
     if client {
         // If we're the client, we want to flip the echo and
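Note the channel capacities in the struct literal above: the candidate channel loses its 16-slot buffer (candidates are now handed off from per-candidate goroutines), while each session-description channel gains a capacity of one, so a peer can park its single offer or answer without blocking on a remote that has not started reading yet. A standalone illustration of why capacity one is enough for that handoff (standard library only; the channel contents are stand-ins):

    package main

    import "fmt"

    func main() {
        // Unbuffered, a send blocks until a receiver is ready, so two
        // peers that each send a description before reading would
        // deadlock. With capacity 1, the one-off handoff always succeeds.
        sdp := make(chan string, 1)
        sdp <- "offer" // returns immediately
        fmt.Println(<-sdp)
    }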
@@ -108,9 +105,13 @@ type Conn struct {
     // Determines whether this connection will send the offer or the answer.
     offerrer bool
 
     closed         chan struct{}
-    closeMutex     sync.Mutex
-    closeError     error
+    closedRTC      chan struct{}
+    closedRTCMutex sync.Mutex
+    closedICE      chan struct{}
+    closedICEMutex sync.Mutex
+    closeMutex     sync.Mutex
+    closeError     error
 
     dcOpenChannel       chan *webrtc.DataChannel
     dcDisconnectChannel chan struct{}
@@ -123,9 +124,8 @@ type Conn struct {
     localSessionDescriptionChannel  chan webrtc.SessionDescription
     remoteSessionDescriptionChannel chan webrtc.SessionDescription
 
-    pendingRemoteCandidates  []webrtc.ICECandidateInit
-    pendingCandidatesMutex   sync.Mutex
-    pendingCandidatesFlushed bool
+    negotiateMutex sync.Mutex
+    hasNegotiated  bool
 
     pingChannelID     uint16
     pingEchoChannelID uint16
@@ -140,67 +140,53 @@ type Conn struct {
 }
 
 func (c *Conn) init() error {
+    // The negotiation needed callback can take a little bit to execute!
+    c.negotiateMutex.Lock()
+
     c.rtc.OnNegotiationNeeded(c.negotiate)
     c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
-        // Close must be locked here otherwise log output can appear
-        // after the connection has been closed.
-        c.closeMutex.Lock()
-        defer c.closeMutex.Unlock()
-        if c.isClosed() {
-            return
-        }
-
         c.opts.Logger.Debug(context.Background(), "ice connection state updated",
             slog.F("state", iceConnectionState))
+
+        if iceConnectionState == webrtc.ICEConnectionStateClosed {
+            // pion/webrtc can update this state multiple times.
+            // A connection can never become un-closed, so we
+            // close the channel if it isn't already.
+            c.closedICEMutex.Lock()
+            defer c.closedICEMutex.Unlock()
+            select {
+            case <-c.closedICE:
+            default:
+                close(c.closedICE)
+            }
+        }
     })
     c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) {
-        // Close can't be locked here, because this is triggered
-        // when close is called. It doesn't appear this get's
-        // executed after close though, so it shouldn't cause
-        // problems.
-        if c.isClosed() {
-            return
-        }
-
         c.opts.Logger.Debug(context.Background(), "ice gathering state updated",
             slog.F("state", iceGatherState))
-    })
-    c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
-        if iceCandidate == nil {
-            return
-        }
-        json := iceCandidate.ToJSON()
-        c.opts.Logger.Debug(context.Background(), "writing candidate to channel",
-            slog.F("hash", sha256.Sum224([]byte(json.Candidate))),
-            slog.F("length", len(json.Candidate)),
-        )
-        select {
-        case <-c.closed:
-            break
-        case c.localCandidateChannel <- json:
+
+        if iceGatherState == webrtc.ICEGathererStateClosed {
+            // pion/webrtc can update this state multiple times.
+            // A connection can never become un-closed, so we
+            // close the channel if it isn't already.
+            c.closedICEMutex.Lock()
+            defer c.closedICEMutex.Unlock()
+            select {
+            case <-c.closedICE:
+            default:
+                close(c.closedICE)
+            }
         }
     })
-    c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
-        select {
-        case <-c.closed:
-            return
-        case c.dcOpenChannel <- dc:
-        default:
-        }
-    })
-    c.rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) {
-        // Close must be locked here otherwise log output can appear
-        // after the connection has been closed.
-        c.closeMutex.Lock()
-        defer c.closeMutex.Unlock()
+    c.rtc.OnConnectionStateChange(func(peerConnectionState webrtc.PeerConnectionState) {
         if c.isClosed() {
+            // Make sure we don't log after Close() has been called.
             return
         }
 
         c.opts.Logger.Debug(context.Background(), "rtc connection updated",
-            slog.F("state", pcs))
+            slog.F("state", peerConnectionState))
 
-        switch pcs {
+        switch peerConnectionState {
         case webrtc.PeerConnectionStateDisconnected:
             for i := 0; i < int(c.dcDisconnectListeners.Load()); i++ {
                 select {
@@ -215,6 +201,52 @@ func (c *Conn) init() error {
                 default:
                 }
             }
+        case webrtc.PeerConnectionStateClosed:
+            // pion/webrtc can update this state multiple times.
+            // A connection can never become un-closed, so we
+            // close the channel if it isn't already.
+            c.closedRTCMutex.Lock()
+            defer c.closedRTCMutex.Unlock()
+            select {
+            case <-c.closedRTC:
+            default:
+                close(c.closedRTC)
+            }
+        }
+    })
+    c.rtc.OnSignalingStateChange(func(signalState webrtc.SignalingState) {
+        c.opts.Logger.Debug(context.Background(), "signaling state updated",
+            slog.F("state", signalState))
+    })
+    c.rtc.SCTP().Transport().OnStateChange(func(dtlsTransportState webrtc.DTLSTransportState) {
+        c.opts.Logger.Debug(context.Background(), "dtls transport state updated",
+            slog.F("state", dtlsTransportState))
+    })
+    c.rtc.SCTP().Transport().ICETransport().OnSelectedCandidatePairChange(func(candidatePair *webrtc.ICECandidatePair) {
+        c.opts.Logger.Debug(context.Background(), "selected candidate pair changed",
+            slog.F("local", candidatePair.Local), slog.F("remote", candidatePair.Remote))
+    })
+    c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) {
+        if iceCandidate == nil {
+            return
+        }
+        // Run this in a goroutine so we don't block pion/webrtc
+        // from continuing.
+        go func() {
+            c.opts.Logger.Debug(context.Background(), "sending local candidate", slog.F("candidate", iceCandidate.ToJSON().Candidate))
+            select {
+            case <-c.closed:
+                break
+            case c.localCandidateChannel <- iceCandidate.ToJSON():
+            }
+        }()
+    })
+    c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) {
+        select {
+        case <-c.closed:
+            return
+        case c.dcOpenChannel <- dc:
+        default:
         }
     })
     _, err := c.pingChannel()
@@ -229,6 +261,123 @@ func (c *Conn) init() error {
     return nil
 }
 
+// negotiate is triggered when a connection is ready to be established.
+// See trickle ICE for the expected exchange: https://webrtchacks.com/trickle-ice/
+func (c *Conn) negotiate() {
+    c.opts.Logger.Debug(context.Background(), "negotiating")
+    // ICE candidates cannot be added until SessionDescriptions have been
+    // exchanged between peers.
+    if c.hasNegotiated {
+        c.negotiateMutex.Lock()
+    }
+    c.hasNegotiated = true
+    defer c.negotiateMutex.Unlock()
+
+    if c.offerrer {
+        offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
+        if err != nil {
+            _ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
+            return
+        }
+        // pion/webrtc will panic if Close is called while this
+        // function is being executed.
+        c.closeMutex.Lock()
+        err = c.rtc.SetLocalDescription(offer)
+        c.closeMutex.Unlock()
+        if err != nil {
+            _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
+            return
+        }
+        c.opts.Logger.Debug(context.Background(), "sending offer")
+        select {
+        case <-c.closed:
+            return
+        case c.localSessionDescriptionChannel <- offer:
+        }
+        c.opts.Logger.Debug(context.Background(), "sent offer")
+    }
+
+    var sessionDescription webrtc.SessionDescription
+    c.opts.Logger.Debug(context.Background(), "awaiting remote description...")
+    select {
+    case <-c.closed:
+        return
+    case sessionDescription = <-c.remoteSessionDescriptionChannel:
+    }
+    c.opts.Logger.Debug(context.Background(), "setting remote description")
+
+    err := c.rtc.SetRemoteDescription(sessionDescription)
+    if err != nil {
+        _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
+        return
+    }
+
+    if !c.offerrer {
+        answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
+        if err != nil {
+            _ = c.CloseWithError(xerrors.Errorf("create answer: %w", err))
+            return
+        }
+        // pion/webrtc will panic if Close is called while this
+        // function is being executed.
+        c.closeMutex.Lock()
+        err = c.rtc.SetLocalDescription(answer)
+        c.closeMutex.Unlock()
+        if err != nil {
+            _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
+            return
+        }
+        c.opts.Logger.Debug(context.Background(), "sending answer")
+        select {
+        case <-c.closed:
+            return
+        case c.localSessionDescriptionChannel <- answer:
+        }
+        c.opts.Logger.Debug(context.Background(), "sent answer")
+    }
+}
+
+// AddRemoteCandidate adds a remote candidate to the RTC connection.
+func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) {
+    if c.isClosed() {
+        return
+    }
+    // This must occur in a goroutine to allow the SessionDescriptions
+    // to be exchanged first.
+    go func() {
+        c.negotiateMutex.Lock()
+        defer c.negotiateMutex.Unlock()
+        c.opts.Logger.Debug(context.Background(), "accepting candidate", slog.F("candidate", i.Candidate))
+        err := c.rtc.AddICECandidate(i)
+        if err != nil {
+            if c.rtc.ConnectionState() == webrtc.PeerConnectionStateClosed {
+                return
+            }
+            _ = c.CloseWithError(xerrors.Errorf("accept candidate: %w", err))
+        }
+    }()
+}
+
+// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
+func (c *Conn) SetRemoteSessionDescription(sessionDescription webrtc.SessionDescription) {
+    select {
+    case <-c.closed:
+    case c.remoteSessionDescriptionChannel <- sessionDescription:
+    }
+}
+
+// LocalSessionDescription returns a channel that emits a session description
+// when one is required to be exchanged.
+func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription {
+    return c.localSessionDescriptionChannel
+}
+
+// LocalCandidate returns a channel that emits when a local candidate
+// needs to be exchanged with a remote connection.
+func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
+    return c.localCandidateChannel
+}
+
 func (c *Conn) pingChannel() (*Channel, error) {
     c.pingOnce.Do(func() {
         c.pingChan, c.pingError = c.dialChannel(context.Background(), "ping", &ChannelOptions{
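The ordering contract above is easy to miss: init() takes negotiateMutex before any signaling callback can fire, negotiate() holds it across the offer/answer exchange, and AddRemoteCandidate() queues a goroutine behind the same mutex, so a candidate that arrives early is still applied only after SetRemoteDescription has run. A reduced, runnable sketch of that serialization (type and function names here are illustrative, not the real API):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type conn struct {
        negotiateMu sync.Mutex
    }

    // negotiate assumes the mutex was locked before any callback could
    // fire (as init() does in this commit) and releases it once the
    // remote description has been applied.
    func (c *conn) negotiate() {
        defer c.negotiateMu.Unlock()
        time.Sleep(10 * time.Millisecond) // stand-in for the offer/answer exchange
        fmt.Println("remote description set")
    }

    // addRemoteCandidate serializes behind negotiate, so a candidate
    // that arrives first still waits for the description exchange.
    func (c *conn) addRemoteCandidate(candidate string) {
        go func() {
            c.negotiateMu.Lock()
            defer c.negotiateMu.Unlock()
            fmt.Println("candidate applied:", candidate)
        }()
    }

    func main() {
        c := &conn{}
        c.negotiateMu.Lock()      // locked before callbacks can run
        c.addRemoteCandidate("a") // arrives "too early"
        go c.negotiate()
        time.Sleep(50 * time.Millisecond)
    }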
@@ -275,136 +424,12 @@ func (c *Conn) pingEchoChannel() (*Channel, error) {
     return c.pingEchoChan, c.pingEchoError
 }
 
-func (c *Conn) negotiate() {
-    c.opts.Logger.Debug(context.Background(), "negotiating")
-
-    if c.offerrer {
-        offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
-        if err != nil {
-            _ = c.CloseWithError(xerrors.Errorf("create offer: %w", err))
-            return
-        }
-        c.opts.Logger.Debug(context.Background(), "setting local description")
-        err = c.rtc.SetLocalDescription(offer)
-        if err != nil {
-            _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
-            return
-        }
-        select {
-        case <-c.closed:
-            return
-        case c.localSessionDescriptionChannel <- offer:
-        }
-    }
-
-    var remoteDescription webrtc.SessionDescription
-    select {
-    case <-c.closed:
-        return
-    case remoteDescription = <-c.remoteSessionDescriptionChannel:
-    }
-
-    c.opts.Logger.Debug(context.Background(), "setting remote description")
-    err := c.rtc.SetRemoteDescription(remoteDescription)
-    if err != nil {
-        _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err))
-        return
-    }
-
-    if !c.offerrer {
-        answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{})
-        if err != nil {
-            _ = c.CloseWithError(xerrors.Errorf("create answer: %w", err))
-            return
-        }
-        c.opts.Logger.Debug(context.Background(), "setting local description")
-        err = c.rtc.SetLocalDescription(answer)
-        if err != nil {
-            _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err))
-            return
-        }
-        if c.isClosed() {
-            return
-        }
-        select {
-        case <-c.closed:
-            return
-        case c.localSessionDescriptionChannel <- answer:
-        }
-    }
-
-    // The ICE transport resets when the remote description is updated.
-    // Adding ICE candidates before this point causes a failed connection,
-    // because the candidate would be lost.
-    c.pendingCandidatesMutex.Lock()
-    defer c.pendingCandidatesMutex.Unlock()
-    for _, pendingCandidate := range c.pendingRemoteCandidates {
-        hash := sha256.Sum224([]byte(pendingCandidate.Candidate))
-        c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate",
-            slog.F("hash", hash),
-            slog.F("length", len(pendingCandidate.Candidate)),
-        )
-        err := c.rtc.AddICECandidate(pendingCandidate)
-        if err != nil {
-            _ = c.CloseWithError(xerrors.Errorf("flush pending remote candidate: %w", err))
-            return
-        }
-    }
-    c.opts.Logger.Debug(context.Background(), "flushed buffered remote candidates",
-        slog.F("count", len(c.pendingRemoteCandidates)),
-    )
-    c.pendingCandidatesFlushed = true
-    c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0)
-}
-
-// LocalCandidate returns a channel that emits when a local candidate
-// needs to be exchanged with a remote connection.
-func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit {
-    return c.localCandidateChannel
-}
-
-// AddRemoteCandidate adds a remote candidate to the RTC connection.
-func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error {
-    c.pendingCandidatesMutex.Lock()
-    defer c.pendingCandidatesMutex.Unlock()
-    fields := []slog.Field{
-        slog.F("hash", sha256.Sum224([]byte(i.Candidate))),
-        slog.F("length", len(i.Candidate)),
-    }
-    if !c.pendingCandidatesFlushed {
-        c.opts.Logger.Debug(context.Background(), "bufferring remote candidate", fields...)
-        c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i)
-        return nil
-    }
-    c.opts.Logger.Debug(context.Background(), "adding remote candidate", fields...)
-    return c.rtc.AddICECandidate(i)
-}
-
-// LocalSessionDescription returns a channel that emits a session description
-// when one is required to be exchanged.
-func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription {
-    return c.localSessionDescriptionChannel
-}
-
 // SetConfiguration applies options to the WebRTC connection.
 // Generally used for updating transport options, like ICE servers.
 func (c *Conn) SetConfiguration(configuration webrtc.Configuration) error {
     return c.rtc.SetConfiguration(configuration)
 }
 
-// SetRemoteSessionDescription sets the remote description for the WebRTC connection.
-func (c *Conn) SetRemoteSessionDescription(sessionDescription webrtc.SessionDescription) {
-    if c.isClosed() {
-        return
-    }
-    c.closeMutex.Lock()
-    defer c.closeMutex.Unlock()
-    select {
-    case <-c.closed:
-    case c.remoteSessionDescriptionChannel <- sessionDescription:
-    }
-}
-
 // Accept blocks waiting for a channel to be opened.
 func (c *Conn) Accept(ctx context.Context) (*Channel, error) {
     var dataChannel *webrtc.DataChannel
@@ -528,7 +553,6 @@ func (c *Conn) CloseWithError(err error) error {
     } else {
         c.closeError = err
     }
-    close(c.closed)
 
     if ch, _ := c.pingChannel(); ch != nil {
         _ = ch.closeWithError(c.closeError)
@@ -538,9 +562,22 @@ func (c *Conn) CloseWithError(err error) error {
     // closing an already closed connection isn't an issue for us.
     _ = c.rtc.Close()
 
+    // Waiting for pion/webrtc to report closed state on both of these
+    // ensures no goroutine leaks.
+    if c.rtc.ConnectionState() != webrtc.PeerConnectionStateNew {
+        c.opts.Logger.Debug(context.Background(), "waiting for rtc connection close...")
+        <-c.closedRTC
+    }
+    if c.rtc.ICEConnectionState() != webrtc.ICEConnectionStateNew {
+        c.opts.Logger.Debug(context.Background(), "waiting for ice connection close...")
+        <-c.closedICE
+    }
+
     // Waits for all DataChannels to exit before officially labeling as closed.
     // All logging, goroutines, and async functionality is cleaned up after this.
     c.dcClosedWaitGroup.Wait()
 
+    close(c.closed)
+    c.opts.Logger.Debug(context.Background(), "closed")
     return err
 }
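CloseWithError can block on <-c.closedRTC and <-c.closedICE above only because the state-change callbacks close those channels at most once, guarded by a mutex. A standalone version of that close-once pattern (standard library only):

    package main

    import (
        "fmt"
        "sync"
    )

    type watcher struct {
        mu     sync.Mutex
        closed chan struct{}
    }

    // markClosed may be called any number of times; pion/webrtc can
    // report the closed state repeatedly, but the channel is closed
    // exactly once.
    func (w *watcher) markClosed() {
        w.mu.Lock()
        defer w.mu.Unlock()
        select {
        case <-w.closed:
        default:
            close(w.closed)
        }
    }

    func main() {
        w := &watcher{closed: make(chan struct{})}
        w.markClosed()
        w.markClosed() // safe: a no-op the second time
        <-w.closed     // Close() can block here without leaking goroutines
        fmt.Println("closed exactly once")
    }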
@@ -35,7 +35,7 @@ var (
     // In CI resources are frequently contended, so increasing this value
     // results in less flakes.
     if os.Getenv("CI") == "true" {
-        return 3 * time.Second
+        return time.Second
     }
     return 100 * time.Millisecond
 }()
@@ -48,14 +48,7 @@ var (
 )
 
 func TestMain(m *testing.M) {
-    // pion/ice doesn't properly close immediately. The solution for this isn't yet known. See:
-    // https://github.com/pion/ice/pull/413
-    goleak.VerifyTestMain(m,
-        goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func1"),
-        goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func2"),
-        goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).taskLoop"),
-        goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
-    )
+    goleak.VerifyTestMain(m)
 }
 
 func TestConn(t *testing.T) {
@@ -64,6 +57,7 @@ func TestConn(t *testing.T) {
     t.Run("Ping", func(t *testing.T) {
         t.Parallel()
         client, server, _ := createPair(t)
+        exchange(client, server)
         _, err := client.Ping()
         require.NoError(t, err)
         _, err = server.Ping()
@@ -72,7 +66,8 @@ func TestConn(t *testing.T) {
 
     t.Run("PingNetworkOffline", func(t *testing.T) {
         t.Parallel()
-        _, server, wan := createPair(t)
+        client, server, wan := createPair(t)
+        exchange(client, server)
         _, err := server.Ping()
         require.NoError(t, err)
         err = wan.Stop()
@@ -83,7 +78,8 @@ func TestConn(t *testing.T) {
 
     t.Run("PingReconnect", func(t *testing.T) {
         t.Parallel()
-        _, server, wan := createPair(t)
+        client, server, wan := createPair(t)
+        exchange(client, server)
         _, err := server.Ping()
         require.NoError(t, err)
         // Create a channel that closes on disconnect.
@@ -104,6 +100,7 @@ func TestConn(t *testing.T) {
     t.Run("Accept", func(t *testing.T) {
         t.Parallel()
         client, server, _ := createPair(t)
+        exchange(client, server)
         cch, err := client.Dial(context.Background(), "hello", &peer.ChannelOptions{})
         require.NoError(t, err)
 
@@ -119,6 +116,7 @@ func TestConn(t *testing.T) {
     t.Run("AcceptNetworkOffline", func(t *testing.T) {
         t.Parallel()
         client, server, wan := createPair(t)
+        exchange(client, server)
         cch, err := client.Dial(context.Background(), "hello", &peer.ChannelOptions{})
         require.NoError(t, err)
         sch, err := server.Accept(context.Background())
@@ -135,20 +133,23 @@ func TestConn(t *testing.T) {
     t.Run("Buffering", func(t *testing.T) {
         t.Parallel()
         client, server, _ := createPair(t)
+        exchange(client, server)
        cch, err := client.Dial(context.Background(), "hello", &peer.ChannelOptions{})
         require.NoError(t, err)
         sch, err := server.Accept(context.Background())
         require.NoError(t, err)
         defer sch.Close()
         go func() {
+            bytes := make([]byte, 4096)
             for i := 0; i < 1024; i++ {
-                _, err := cch.Write(make([]byte, 4096))
+                _, err := cch.Write(bytes)
                 require.NoError(t, err)
             }
             _ = cch.Close()
         }()
+        bytes := make([]byte, 4096)
         for {
-            _, err = sch.Read(make([]byte, 4096))
+            _, err = sch.Read(bytes)
             if err != nil {
                 require.ErrorIs(t, err, peer.ErrClosed)
                 break
@@ -159,6 +160,7 @@ func TestConn(t *testing.T) {
     t.Run("NetConn", func(t *testing.T) {
         t.Parallel()
         client, server, _ := createPair(t)
+        exchange(client, server)
         srv, err := net.Listen("tcp", "127.0.0.1:0")
         require.NoError(t, err)
         defer srv.Close()
@@ -211,6 +213,7 @@ func TestConn(t *testing.T) {
     t.Run("CloseBeforeNegotiate", func(t *testing.T) {
         t.Parallel()
         client, server, _ := createPair(t)
+        exchange(client, server)
         err := client.Close()
         require.NoError(t, err)
         err = server.Close()
@@ -230,6 +233,7 @@ func TestConn(t *testing.T) {
     t.Run("PingConcurrent", func(t *testing.T) {
         t.Parallel()
         client, server, _ := createPair(t)
+        exchange(client, server)
         var wg sync.WaitGroup
         wg.Add(2)
         go func() {
@@ -244,6 +248,18 @@ func TestConn(t *testing.T) {
         }()
         wg.Wait()
     })
+
+    t.Run("CandidateBeforeSessionDescription", func(t *testing.T) {
+        t.Parallel()
+        client, server, _ := createPair(t)
+        server.SetRemoteSessionDescription(<-client.LocalSessionDescription())
+        sdp := <-server.LocalSessionDescription()
+        client.AddRemoteCandidate(<-server.LocalCandidate())
+        client.SetRemoteSessionDescription(sdp)
+        server.AddRemoteCandidate(<-client.LocalCandidate())
+        _, err := client.Ping()
+        require.NoError(t, err)
+    })
 }
 
 func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.Router) {
@@ -271,7 +287,7 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R
     c1SettingEngine.SetVNet(c1Net)
     c1SettingEngine.SetPrflxAcceptanceMinWait(0)
     c1SettingEngine.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval)
-    channel1, err := peer.Client([]webrtc.ICEServer{}, &peer.ConnOptions{
+    channel1, err := peer.Client([]webrtc.ICEServer{{}}, &peer.ConnOptions{
         SettingEngine: c1SettingEngine,
         Logger:        slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
     })
@@ -283,7 +299,7 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R
     c2SettingEngine.SetVNet(c2Net)
     c2SettingEngine.SetPrflxAcceptanceMinWait(0)
     c2SettingEngine.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval)
-    channel2, err := peer.Server([]webrtc.ICEServer{}, &peer.ConnOptions{
+    channel2, err := peer.Server([]webrtc.ICEServer{{}}, &peer.ConnOptions{
         SettingEngine: c2SettingEngine,
         Logger:        slogtest.Make(t, nil).Named("server").Leveled(slog.LevelDebug),
     })
@@ -298,31 +314,33 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R
         _ = wan.Stop()
     })
 
-    go func() {
-        for {
-            select {
-            case c := <-channel2.LocalCandidate():
-                _ = channel1.AddRemoteCandidate(c)
-            case c := <-channel2.LocalSessionDescription():
-                channel1.SetRemoteSessionDescription(c)
-            case <-channel2.Closed():
-                return
-            }
-        }
-    }()
-
-    go func() {
-        for {
-            select {
-            case c := <-channel1.LocalCandidate():
-                _ = channel2.AddRemoteCandidate(c)
-            case c := <-channel1.LocalSessionDescription():
-                channel2.SetRemoteSessionDescription(c)
-            case <-channel1.Closed():
-                return
-            }
-        }
-    }()
-
     return channel1, channel2, wan
 }
+
+func exchange(client, server *peer.Conn) {
+    go func() {
+        for {
+            select {
+            case c := <-server.LocalCandidate():
+                client.AddRemoteCandidate(c)
+            case c := <-server.LocalSessionDescription():
+                client.SetRemoteSessionDescription(c)
+            case <-server.Closed():
+                return
+            }
+        }
+    }()
+
+    go func() {
+        for {
+            select {
+            case c := <-client.LocalCandidate():
+                server.AddRemoteCandidate(c)
+            case c := <-client.LocalSessionDescription():
+                server.SetRemoteSessionDescription(c)
+            case <-client.Closed():
+                return
+            }
+        }
+    }()
}
@@ -95,13 +95,9 @@ func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []we
                 SDP: serverToClientMessage.GetAnswer().Sdp,
             })
         case serverToClientMessage.GetIceCandidate() != "":
-            err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
+            peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
                 Candidate: serverToClientMessage.GetIceCandidate(),
             })
-            if err != nil {
-                _ = peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err))
-                return
-            }
         default:
             _ = peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(serverToClientMessage).String()))
             return
@@ -9,6 +9,10 @@ import (
     "go.uber.org/goleak"
     "storj.io/drpc/drpcconn"
 
+    "cdr.dev/slog"
+    "cdr.dev/slog/sloggers/slogtest"
+
+    "github.com/coder/coder/peer"
     "github.com/coder/coder/peerbroker"
     "github.com/coder/coder/peerbroker/proto"
     "github.com/coder/coder/provisionersdk"
@@ -28,7 +32,9 @@ func TestDial(t *testing.T) {
         defer client.Close()
         defer server.Close()
 
-        listener, err := peerbroker.Listen(server, nil)
+        listener, err := peerbroker.Listen(server, &peer.ConnOptions{
+            Logger: slogtest.Make(t, nil).Named("server").Leveled(slog.LevelDebug),
+        })
         require.NoError(t, err)
 
         api := proto.NewDRPCPeerBrokerClient(drpcconn.New(client))
@@ -36,7 +42,9 @@ func TestDial(t *testing.T) {
         require.NoError(t, err)
         clientConn, err := peerbroker.Dial(stream, []webrtc.ICEServer{{
             URLs: []string{"stun:stun.l.google.com:19302"},
-        }}, nil)
+        }}, &peer.ConnOptions{
+            Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug),
+        })
         require.NoError(t, err)
         defer clientConn.Close()
@@ -179,12 +179,9 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego
                 return peerConn.CloseWithError(xerrors.Errorf("set ice configuration: %w", err))
             }
         case clientToServerMessage.GetIceCandidate() != "":
-            err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
+            peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{
                 Candidate: clientToServerMessage.GetIceCandidate(),
             })
-            if err != nil {
-                return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err))
-            }
         default:
             return peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(clientToServerMessage).String()))
         }
@@ -45,4 +45,4 @@ service PeerBroker {
   //
   // See: https://davekilian.com/webrtc-the-hard-way.html
   rpc NegotiateConnection(stream NegotiateConnection.ClientToServer) returns (stream NegotiateConnection.ServerToClient);
 }
@@ -9,27 +9,16 @@ import (
     "path/filepath"
     "testing"
 
-    "github.com/hashicorp/go-version"
     "github.com/stretchr/testify/require"
     "storj.io/drpc/drpcconn"
 
     "github.com/coder/coder/provisionersdk"
     "github.com/coder/coder/provisionersdk/proto"
-
-    "github.com/hashicorp/hc-install/product"
-    "github.com/hashicorp/hc-install/releases"
 )
 
 func TestProvision(t *testing.T) {
     t.Parallel()
 
-    installer := &releases.ExactVersion{
-        Product: product.Terraform,
-        Version: version.Must(version.NewVersion("1.1.2")),
-    }
-    execPath, err := installer.Install(context.Background())
-    require.NoError(t, err)
-
     client, server := provisionersdk.TransportPipe()
     ctx, cancelFunc := context.WithCancel(context.Background())
     t.Cleanup(func() {
@@ -42,7 +31,6 @@ func TestProvision(t *testing.T) {
         ServeOptions: &provisionersdk.ServeOptions{
             Transport: server,
         },
-        BinaryPath: execPath,
     })
     require.NoError(t, err)
 }()