mirror of
https://github.com/coder/coder.git
synced 2025-07-21 01:28:49 +00:00
fix: Close peer negotiate mutex if we haven't negotiated (#1774)
Closes #1706 and #1644.
This commit is contained in:
@ -6,6 +6,7 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/datachannel"
|
||||
"github.com/pion/webrtc/v3"
|
||||
@ -244,6 +245,14 @@ func (c *Channel) Write(bytes []byte) (n int, err error) {
|
||||
if c.dc.BufferedAmount()+uint64(len(bytes)) >= maxBufferedAmount {
|
||||
<-c.sendMore
|
||||
}
|
||||
|
||||
// There's an obvious race-condition here. This is an edge-case, as
|
||||
// most-frequently data won't be pooled so synchronously, but is
|
||||
// definitely possible.
|
||||
//
|
||||
// See: https://github.com/pion/sctp/issues/181
|
||||
time.Sleep(time.Microsecond)
|
||||
|
||||
return c.rwc.Write(bytes)
|
||||
}
|
||||
|
||||
|
25
peer/conn.go
25
peer/conn.go
@ -73,6 +73,7 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp
|
||||
dcFailedChannel: make(chan struct{}),
|
||||
localCandidateChannel: make(chan webrtc.ICECandidateInit),
|
||||
localSessionDescriptionChannel: make(chan webrtc.SessionDescription, 1),
|
||||
negotiated: make(chan struct{}),
|
||||
remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription, 1),
|
||||
settingEngine: opts.SettingEngine,
|
||||
}
|
||||
@ -124,8 +125,7 @@ type Conn struct {
|
||||
localSessionDescriptionChannel chan webrtc.SessionDescription
|
||||
remoteSessionDescriptionChannel chan webrtc.SessionDescription
|
||||
|
||||
negotiateMutex sync.Mutex
|
||||
hasNegotiated bool
|
||||
negotiated chan struct{}
|
||||
|
||||
loggerValue atomic.Value
|
||||
settingEngine webrtc.SettingEngine
|
||||
@ -152,9 +152,6 @@ func (c *Conn) logger() slog.Logger {
|
||||
}
|
||||
|
||||
func (c *Conn) init() error {
|
||||
// The negotiation needed callback can take a little bit to execute!
|
||||
c.negotiateMutex.Lock()
|
||||
|
||||
c.rtc.OnNegotiationNeeded(c.negotiate)
|
||||
c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) {
|
||||
c.closedICEMutex.Lock()
|
||||
@ -290,11 +287,13 @@ func (c *Conn) negotiate() {
|
||||
c.logger().Debug(context.Background(), "negotiating")
|
||||
// ICE candidates cannot be added until SessionDescriptions have been
|
||||
// exchanged between peers.
|
||||
if c.hasNegotiated {
|
||||
c.negotiateMutex.Lock()
|
||||
}
|
||||
c.hasNegotiated = true
|
||||
defer c.negotiateMutex.Unlock()
|
||||
defer func() {
|
||||
select {
|
||||
case <-c.negotiated:
|
||||
default:
|
||||
close(c.negotiated)
|
||||
}
|
||||
}()
|
||||
|
||||
if c.offerer {
|
||||
offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{})
|
||||
@ -368,8 +367,10 @@ func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) {
|
||||
// This must occur in a goroutine to allow the SessionDescriptions
|
||||
// to be exchanged first.
|
||||
go func() {
|
||||
c.negotiateMutex.Lock()
|
||||
defer c.negotiateMutex.Unlock()
|
||||
select {
|
||||
case <-c.closed:
|
||||
case <-c.negotiated:
|
||||
}
|
||||
if c.isClosed() {
|
||||
return
|
||||
}
|
||||
|
@ -59,9 +59,7 @@ func TestMain(m *testing.M) {
|
||||
}
|
||||
|
||||
func TestConn(t *testing.T) {
|
||||
t.Skip("known flake -- https://github.com/coder/coder/issues/1644")
|
||||
t.Parallel()
|
||||
|
||||
t.Run("Ping", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
client, server, _ := createPair(t)
|
||||
|
Reference in New Issue
Block a user