diff --git a/.github/workflows/coder.yaml b/.github/workflows/coder.yaml index 7e0b890eda..23bde4d0fc 100644 --- a/.github/workflows/coder.yaml +++ b/.github/workflows/coder.yaml @@ -148,11 +148,17 @@ jobs: - run: go install gotest.tools/gotestsum@latest + - uses: hashicorp/setup-terraform@v1 + if: runner.os == 'Linux' + with: + terraform_version: 1.1.2 + terraform_wrapper: false + - name: Test with Mock Database run: gotestsum --jsonfile="gotests.json" --packages="./..." -- -covermode=atomic -coverprofile="gotests.coverage" -timeout=3m - -count=3 -race -parallel=2 + -count=3 -race -short -parallel=2 - name: Test with PostgreSQL Database if: runner.os == 'Linux' diff --git a/coderd/userpassword/userpassword_test.go b/coderd/userpassword/userpassword_test.go index 0546163d24..a07d2c07ed 100644 --- a/coderd/userpassword/userpassword_test.go +++ b/coderd/userpassword/userpassword_test.go @@ -1,3 +1,7 @@ +// This test runs slowly on MacOS instance, and really +// only needs to run on Linux anyways. +//go:build linux + package userpassword_test import ( diff --git a/database/migrate_test.go b/database/migrate_test.go index 5627e7606c..37aec1e96c 100644 --- a/database/migrate_test.go +++ b/database/migrate_test.go @@ -20,6 +20,11 @@ func TestMain(m *testing.M) { func TestMigrate(t *testing.T) { t.Parallel() + if testing.Short() { + t.Skip() + return + } + t.Run("Once", func(t *testing.T) { t.Parallel() connection, closeFn, err := postgres.Open() diff --git a/database/postgres/postgres_test.go b/database/postgres/postgres_test.go index c9e8ef12cb..30d027559c 100644 --- a/database/postgres/postgres_test.go +++ b/database/postgres/postgres_test.go @@ -21,6 +21,11 @@ func TestMain(m *testing.M) { func TestPostgres(t *testing.T) { t.Parallel() + if testing.Short() { + t.Skip() + return + } + connect, close, err := postgres.Open() require.NoError(t, err) defer close() diff --git a/go.mod b/go.mod index 7505a34a71..8a204cc59a 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/golang-migrate/migrate/v4 v4.15.1 github.com/google/uuid v1.3.0 github.com/hashicorp/go-version v1.4.0 - github.com/hashicorp/hc-install v0.3.1 github.com/hashicorp/terraform-config-inspect v0.0.0-20211115214459-90acf1ca460f github.com/hashicorp/terraform-exec v0.15.0 github.com/justinas/nosurf v1.1.1 @@ -64,7 +63,6 @@ require ( github.com/google/go-cmp v0.5.7 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect - github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/hcl/v2 v2.11.1 // indirect diff --git a/go.sum b/go.sum index 564c6aaae7..85350bdbd9 100644 --- a/go.sum +++ b/go.sum @@ -693,8 +693,6 @@ github.com/hashicorp/go-version v1.4.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= -github.com/hashicorp/hc-install v0.3.1 h1:VIjllE6KyAI1A244G8kTaHXy+TL5/XYzvrtFi8po/Yk= -github.com/hashicorp/hc-install v0.3.1/go.mod h1:3LCdWcCDS1gaHC9mhHCGbkYfoY6vdsKohGjugbZdZak= github.com/hashicorp/hcl v0.0.0-20170504190234-a4b07c25de5f/go.mod h1:oZtUIOe8dh44I2q6ScRibXws4Ajl+d+nod3AaR9vL5w= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= @@ -1309,7 +1307,6 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= diff --git a/peer/channel.go b/peer/channel.go index 150398f4d4..b01154bcfa 100644 --- a/peer/channel.go +++ b/peer/channel.go @@ -141,9 +141,9 @@ func (c *Channel) init() { // A DataChannel can disconnect multiple times, so this needs to loop. for { select { - case <-c.closed: + case <-c.conn.closedRTC: // If this channel was closed, there's no need to close again. - return + err = c.conn.closeError case <-c.conn.Closed(): // If the RTC connection closed with an error, this channel // should end with the same one. diff --git a/peer/conn.go b/peer/conn.go index 8ee3e9dd49..32e0c72bbf 100644 --- a/peer/conn.go +++ b/peer/conn.go @@ -3,8 +3,8 @@ package peer import ( "bytes" "context" + "crypto/rand" - "crypto/sha256" "io" "sync" "time" @@ -49,12 +49,10 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp opts = &ConnOptions{} } - // Enables preference to STUN. - opts.SettingEngine.SetSrflxAcceptanceMinWait(0) opts.SettingEngine.DetachDataChannels() - lf := logging.NewDefaultLoggerFactory() - lf.DefaultLogLevel = logging.LogLevelDisabled - opts.SettingEngine.LoggerFactory = lf + factory := logging.NewDefaultLoggerFactory() + factory.DefaultLogLevel = logging.LogLevelDisabled + opts.SettingEngine.LoggerFactory = factory api := webrtc.NewAPI(webrtc.WithSettingEngine(opts.SettingEngine)) rtc, err := api.NewPeerConnection(webrtc.Configuration{ ICEServers: servers, @@ -63,21 +61,20 @@ func newWithClientOrServer(servers []webrtc.ICEServer, client bool, opts *ConnOp return nil, xerrors.Errorf("create peer connection: %w", err) } conn := &Conn{ - pingChannelID: 1, - pingEchoChannelID: 2, - opts: opts, - rtc: rtc, - offerrer: client, - closed: make(chan struct{}), - dcOpenChannel: make(chan *webrtc.DataChannel), - dcDisconnectChannel: make(chan struct{}), - dcFailedChannel: make(chan struct{}), - // This channel needs to be bufferred otherwise slow consumers - // of this will cause a connection failure. - localCandidateChannel: make(chan webrtc.ICECandidateInit, 16), - pendingRemoteCandidates: make([]webrtc.ICECandidateInit, 0), - localSessionDescriptionChannel: make(chan webrtc.SessionDescription), - remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription), + pingChannelID: 1, + pingEchoChannelID: 2, + opts: opts, + rtc: rtc, + offerrer: client, + closed: make(chan struct{}), + closedRTC: make(chan struct{}), + closedICE: make(chan struct{}), + dcOpenChannel: make(chan *webrtc.DataChannel), + dcDisconnectChannel: make(chan struct{}), + dcFailedChannel: make(chan struct{}), + localCandidateChannel: make(chan webrtc.ICECandidateInit), + localSessionDescriptionChannel: make(chan webrtc.SessionDescription, 1), + remoteSessionDescriptionChannel: make(chan webrtc.SessionDescription, 1), } if client { // If we're the client, we want to flip the echo and @@ -108,9 +105,13 @@ type Conn struct { // Determines whether this connection will send the offer or the answer. offerrer bool - closed chan struct{} - closeMutex sync.Mutex - closeError error + closed chan struct{} + closedRTC chan struct{} + closedRTCMutex sync.Mutex + closedICE chan struct{} + closedICEMutex sync.Mutex + closeMutex sync.Mutex + closeError error dcOpenChannel chan *webrtc.DataChannel dcDisconnectChannel chan struct{} @@ -123,9 +124,8 @@ type Conn struct { localSessionDescriptionChannel chan webrtc.SessionDescription remoteSessionDescriptionChannel chan webrtc.SessionDescription - pendingRemoteCandidates []webrtc.ICECandidateInit - pendingCandidatesMutex sync.Mutex - pendingCandidatesFlushed bool + negotiateMutex sync.Mutex + hasNegotiated bool pingChannelID uint16 pingEchoChannelID uint16 @@ -140,67 +140,53 @@ type Conn struct { } func (c *Conn) init() error { + // The negotiation needed callback can take a little bit to execute! + c.negotiateMutex.Lock() + c.rtc.OnNegotiationNeeded(c.negotiate) c.rtc.OnICEConnectionStateChange(func(iceConnectionState webrtc.ICEConnectionState) { - // Close must be locked here otherwise log output can appear - // after the connection has been closed. - c.closeMutex.Lock() - defer c.closeMutex.Unlock() - if c.isClosed() { - return - } - c.opts.Logger.Debug(context.Background(), "ice connection state updated", slog.F("state", iceConnectionState)) + + if iceConnectionState == webrtc.ICEConnectionStateClosed { + // pion/webrtc can update this state multiple times. + // A connection can never become un-closed, so we + // close the channel if it isn't already. + c.closedICEMutex.Lock() + defer c.closedICEMutex.Unlock() + select { + case <-c.closedICE: + default: + close(c.closedICE) + } + } }) c.rtc.OnICEGatheringStateChange(func(iceGatherState webrtc.ICEGathererState) { - // Close can't be locked here, because this is triggered - // when close is called. It doesn't appear this get's - // executed after close though, so it shouldn't cause - // problems. - if c.isClosed() { - return - } - c.opts.Logger.Debug(context.Background(), "ice gathering state updated", slog.F("state", iceGatherState)) - }) - c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) { - if iceCandidate == nil { - return - } - json := iceCandidate.ToJSON() - c.opts.Logger.Debug(context.Background(), "writing candidate to channel", - slog.F("hash", sha256.Sum224([]byte(json.Candidate))), - slog.F("length", len(json.Candidate)), - ) - select { - case <-c.closed: - break - case c.localCandidateChannel <- json: + + if iceGatherState == webrtc.ICEGathererStateClosed { + // pion/webrtc can update this state multiple times. + // A connection can never become un-closed, so we + // close the channel if it isn't already. + c.closedICEMutex.Lock() + defer c.closedICEMutex.Unlock() + select { + case <-c.closedICE: + default: + close(c.closedICE) + } } }) - c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) { - select { - case <-c.closed: - return - case c.dcOpenChannel <- dc: - default: - } - }) - c.rtc.OnConnectionStateChange(func(pcs webrtc.PeerConnectionState) { - // Close must be locked here otherwise log output can appear - // after the connection has been closed. - c.closeMutex.Lock() - defer c.closeMutex.Unlock() + c.rtc.OnConnectionStateChange(func(peerConnectionState webrtc.PeerConnectionState) { if c.isClosed() { + // Make sure we don't log after Close() has been called. return } - c.opts.Logger.Debug(context.Background(), "rtc connection updated", - slog.F("state", pcs)) + slog.F("state", peerConnectionState)) - switch pcs { + switch peerConnectionState { case webrtc.PeerConnectionStateDisconnected: for i := 0; i < int(c.dcDisconnectListeners.Load()); i++ { select { @@ -215,6 +201,52 @@ func (c *Conn) init() error { default: } } + case webrtc.PeerConnectionStateClosed: + // pion/webrtc can update this state multiple times. + // A connection can never become un-closed, so we + // close the channel if it isn't already. + c.closedRTCMutex.Lock() + defer c.closedRTCMutex.Unlock() + select { + case <-c.closedRTC: + default: + close(c.closedRTC) + } + } + }) + c.rtc.OnSignalingStateChange(func(signalState webrtc.SignalingState) { + c.opts.Logger.Debug(context.Background(), "signaling state updated", + slog.F("state", signalState)) + }) + c.rtc.SCTP().Transport().OnStateChange(func(dtlsTransportState webrtc.DTLSTransportState) { + c.opts.Logger.Debug(context.Background(), "dtls transport state updated", + slog.F("state", dtlsTransportState)) + }) + c.rtc.SCTP().Transport().ICETransport().OnSelectedCandidatePairChange(func(candidatePair *webrtc.ICECandidatePair) { + c.opts.Logger.Debug(context.Background(), "selected candidate pair changed", + slog.F("local", candidatePair.Local), slog.F("remote", candidatePair.Remote)) + }) + c.rtc.OnICECandidate(func(iceCandidate *webrtc.ICECandidate) { + if iceCandidate == nil { + return + } + // Run this in a goroutine so we don't block pion/webrtc + // from continuing. + go func() { + c.opts.Logger.Debug(context.Background(), "sending local candidate", slog.F("candidate", iceCandidate.ToJSON().Candidate)) + select { + case <-c.closed: + break + case c.localCandidateChannel <- iceCandidate.ToJSON(): + } + }() + }) + c.rtc.OnDataChannel(func(dc *webrtc.DataChannel) { + select { + case <-c.closed: + return + case c.dcOpenChannel <- dc: + default: } }) _, err := c.pingChannel() @@ -229,6 +261,123 @@ func (c *Conn) init() error { return nil } +// negotiate is triggered when a connection is ready to be established. +// See trickle ICE for the expected exchange: https://webrtchacks.com/trickle-ice/ +func (c *Conn) negotiate() { + c.opts.Logger.Debug(context.Background(), "negotiating") + // ICE candidates cannot be added until SessionDescriptions have been + // exchanged between peers. + if c.hasNegotiated { + c.negotiateMutex.Lock() + } + c.hasNegotiated = true + defer c.negotiateMutex.Unlock() + + if c.offerrer { + offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) + if err != nil { + _ = c.CloseWithError(xerrors.Errorf("create offer: %w", err)) + return + } + // pion/webrtc will panic if Close is called while this + // function is being executed. + c.closeMutex.Lock() + err = c.rtc.SetLocalDescription(offer) + c.closeMutex.Unlock() + if err != nil { + _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err)) + return + } + c.opts.Logger.Debug(context.Background(), "sending offer") + select { + case <-c.closed: + return + case c.localSessionDescriptionChannel <- offer: + } + c.opts.Logger.Debug(context.Background(), "sent offer") + } + + var sessionDescription webrtc.SessionDescription + c.opts.Logger.Debug(context.Background(), "awaiting remote description...") + select { + case <-c.closed: + return + case sessionDescription = <-c.remoteSessionDescriptionChannel: + } + c.opts.Logger.Debug(context.Background(), "setting remote description") + + err := c.rtc.SetRemoteDescription(sessionDescription) + if err != nil { + _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err)) + return + } + + if !c.offerrer { + answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) + if err != nil { + _ = c.CloseWithError(xerrors.Errorf("create answer: %w", err)) + return + } + // pion/webrtc will panic if Close is called while this + // function is being executed. + c.closeMutex.Lock() + err = c.rtc.SetLocalDescription(answer) + c.closeMutex.Unlock() + if err != nil { + _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err)) + return + } + c.opts.Logger.Debug(context.Background(), "sending answer") + select { + case <-c.closed: + return + case c.localSessionDescriptionChannel <- answer: + } + c.opts.Logger.Debug(context.Background(), "sent answer") + } +} + +// AddRemoteCandidate adds a remote candidate to the RTC connection. +func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) { + if c.isClosed() { + return + } + // This must occur in a goroutine to allow the SessionDescriptions + // to be exchanged first. + go func() { + c.negotiateMutex.Lock() + defer c.negotiateMutex.Unlock() + c.opts.Logger.Debug(context.Background(), "accepting candidate", slog.F("candidate", i.Candidate)) + err := c.rtc.AddICECandidate(i) + if err != nil { + if c.rtc.ConnectionState() == webrtc.PeerConnectionStateClosed { + return + } + _ = c.CloseWithError(xerrors.Errorf("accept candidate: %w", err)) + } + }() +} + +// SetRemoteSessionDescription sets the remote description for the WebRTC connection. +func (c *Conn) SetRemoteSessionDescription(sessionDescription webrtc.SessionDescription) { + select { + case <-c.closed: + case c.remoteSessionDescriptionChannel <- sessionDescription: + } +} + +// LocalSessionDescription returns a channel that emits a session description +// when one is required to be exchanged. +func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription { + return c.localSessionDescriptionChannel +} + +// LocalCandidate returns a channel that emits when a local candidate +// needs to be exchanged with a remote connection. +func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { + return c.localCandidateChannel +} + func (c *Conn) pingChannel() (*Channel, error) { c.pingOnce.Do(func() { c.pingChan, c.pingError = c.dialChannel(context.Background(), "ping", &ChannelOptions{ @@ -275,136 +424,12 @@ func (c *Conn) pingEchoChannel() (*Channel, error) { return c.pingEchoChan, c.pingEchoError } -func (c *Conn) negotiate() { - c.opts.Logger.Debug(context.Background(), "negotiating") - - if c.offerrer { - offer, err := c.rtc.CreateOffer(&webrtc.OfferOptions{}) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("create offer: %w", err)) - return - } - c.opts.Logger.Debug(context.Background(), "setting local description") - err = c.rtc.SetLocalDescription(offer) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err)) - return - } - select { - case <-c.closed: - return - case c.localSessionDescriptionChannel <- offer: - } - } - - var remoteDescription webrtc.SessionDescription - select { - case <-c.closed: - return - case remoteDescription = <-c.remoteSessionDescriptionChannel: - } - - c.opts.Logger.Debug(context.Background(), "setting remote description") - err := c.rtc.SetRemoteDescription(remoteDescription) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("set remote description (closed %v): %w", c.isClosed(), err)) - return - } - - if !c.offerrer { - answer, err := c.rtc.CreateAnswer(&webrtc.AnswerOptions{}) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("create answer: %w", err)) - return - } - c.opts.Logger.Debug(context.Background(), "setting local description") - err = c.rtc.SetLocalDescription(answer) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("set local description: %w", err)) - return - } - if c.isClosed() { - return - } - select { - case <-c.closed: - return - case c.localSessionDescriptionChannel <- answer: - } - } - - // The ICE transport resets when the remote description is updated. - // Adding ICE candidates before this point causes a failed connection, - // because the candidate would be lost. - c.pendingCandidatesMutex.Lock() - defer c.pendingCandidatesMutex.Unlock() - for _, pendingCandidate := range c.pendingRemoteCandidates { - hash := sha256.Sum224([]byte(pendingCandidate.Candidate)) - c.opts.Logger.Debug(context.Background(), "flushing buffered remote candidate", - slog.F("hash", hash), - slog.F("length", len(pendingCandidate.Candidate)), - ) - err := c.rtc.AddICECandidate(pendingCandidate) - if err != nil { - _ = c.CloseWithError(xerrors.Errorf("flush pending remote candidate: %w", err)) - return - } - } - c.opts.Logger.Debug(context.Background(), "flushed buffered remote candidates", - slog.F("count", len(c.pendingRemoteCandidates)), - ) - c.pendingCandidatesFlushed = true - c.pendingRemoteCandidates = make([]webrtc.ICECandidateInit, 0) -} - -// LocalCandidate returns a channel that emits when a local candidate -// needs to be exchanged with a remote connection. -func (c *Conn) LocalCandidate() <-chan webrtc.ICECandidateInit { - return c.localCandidateChannel -} - -// AddRemoteCandidate adds a remote candidate to the RTC connection. -func (c *Conn) AddRemoteCandidate(i webrtc.ICECandidateInit) error { - c.pendingCandidatesMutex.Lock() - defer c.pendingCandidatesMutex.Unlock() - fields := []slog.Field{ - slog.F("hash", sha256.Sum224([]byte(i.Candidate))), - slog.F("length", len(i.Candidate)), - } - if !c.pendingCandidatesFlushed { - c.opts.Logger.Debug(context.Background(), "bufferring remote candidate", fields...) - c.pendingRemoteCandidates = append(c.pendingRemoteCandidates, i) - return nil - } - c.opts.Logger.Debug(context.Background(), "adding remote candidate", fields...) - return c.rtc.AddICECandidate(i) -} - -// LocalSessionDescription returns a channel that emits a session description -// when one is required to be exchanged. -func (c *Conn) LocalSessionDescription() <-chan webrtc.SessionDescription { - return c.localSessionDescriptionChannel -} - // SetConfiguration applies options to the WebRTC connection. // Generally used for updating transport options, like ICE servers. func (c *Conn) SetConfiguration(configuration webrtc.Configuration) error { return c.rtc.SetConfiguration(configuration) } -// SetRemoteSessionDescription sets the remote description for the WebRTC connection. -func (c *Conn) SetRemoteSessionDescription(sessionDescription webrtc.SessionDescription) { - if c.isClosed() { - return - } - c.closeMutex.Lock() - defer c.closeMutex.Unlock() - select { - case <-c.closed: - case c.remoteSessionDescriptionChannel <- sessionDescription: - } -} - // Accept blocks waiting for a channel to be opened. func (c *Conn) Accept(ctx context.Context) (*Channel, error) { var dataChannel *webrtc.DataChannel @@ -528,7 +553,6 @@ func (c *Conn) CloseWithError(err error) error { } else { c.closeError = err } - close(c.closed) if ch, _ := c.pingChannel(); ch != nil { _ = ch.closeWithError(c.closeError) @@ -538,9 +562,22 @@ func (c *Conn) CloseWithError(err error) error { // closing an already closed connection isn't an issue for us. _ = c.rtc.Close() + // Waiting for pion/webrtc to report closed state on both of these + // ensures no goroutine leaks. + if c.rtc.ConnectionState() != webrtc.PeerConnectionStateNew { + c.opts.Logger.Debug(context.Background(), "waiting for rtc connection close...") + <-c.closedRTC + } + if c.rtc.ICEConnectionState() != webrtc.ICEConnectionStateNew { + c.opts.Logger.Debug(context.Background(), "waiting for ice connection close...") + <-c.closedICE + } + // Waits for all DataChannels to exit before officially labeling as closed. // All logging, goroutines, and async functionality is cleaned up after this. c.dcClosedWaitGroup.Wait() + close(c.closed) + c.opts.Logger.Debug(context.Background(), "closed") return err } diff --git a/peer/conn_test.go b/peer/conn_test.go index cc59e0cfbc..67d22483fe 100644 --- a/peer/conn_test.go +++ b/peer/conn_test.go @@ -35,7 +35,7 @@ var ( // In CI resources are frequently contended, so increasing this value // results in less flakes. if os.Getenv("CI") == "true" { - return 3 * time.Second + return time.Second } return 100 * time.Millisecond }() @@ -48,14 +48,7 @@ var ( ) func TestMain(m *testing.M) { - // pion/ice doesn't properly close immediately. The solution for this isn't yet known. See: - // https://github.com/pion/ice/pull/413 - goleak.VerifyTestMain(m, - goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func1"), - goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).startOnConnectionStateChangeRoutine.func2"), - goleak.IgnoreTopFunction("github.com/pion/ice/v2.(*Agent).taskLoop"), - goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"), - ) + goleak.VerifyTestMain(m) } func TestConn(t *testing.T) { @@ -64,6 +57,7 @@ func TestConn(t *testing.T) { t.Run("Ping", func(t *testing.T) { t.Parallel() client, server, _ := createPair(t) + exchange(client, server) _, err := client.Ping() require.NoError(t, err) _, err = server.Ping() @@ -72,7 +66,8 @@ func TestConn(t *testing.T) { t.Run("PingNetworkOffline", func(t *testing.T) { t.Parallel() - _, server, wan := createPair(t) + client, server, wan := createPair(t) + exchange(client, server) _, err := server.Ping() require.NoError(t, err) err = wan.Stop() @@ -83,7 +78,8 @@ func TestConn(t *testing.T) { t.Run("PingReconnect", func(t *testing.T) { t.Parallel() - _, server, wan := createPair(t) + client, server, wan := createPair(t) + exchange(client, server) _, err := server.Ping() require.NoError(t, err) // Create a channel that closes on disconnect. @@ -104,6 +100,7 @@ func TestConn(t *testing.T) { t.Run("Accept", func(t *testing.T) { t.Parallel() client, server, _ := createPair(t) + exchange(client, server) cch, err := client.Dial(context.Background(), "hello", &peer.ChannelOptions{}) require.NoError(t, err) @@ -119,6 +116,7 @@ func TestConn(t *testing.T) { t.Run("AcceptNetworkOffline", func(t *testing.T) { t.Parallel() client, server, wan := createPair(t) + exchange(client, server) cch, err := client.Dial(context.Background(), "hello", &peer.ChannelOptions{}) require.NoError(t, err) sch, err := server.Accept(context.Background()) @@ -135,20 +133,23 @@ func TestConn(t *testing.T) { t.Run("Buffering", func(t *testing.T) { t.Parallel() client, server, _ := createPair(t) + exchange(client, server) cch, err := client.Dial(context.Background(), "hello", &peer.ChannelOptions{}) require.NoError(t, err) sch, err := server.Accept(context.Background()) require.NoError(t, err) defer sch.Close() go func() { + bytes := make([]byte, 4096) for i := 0; i < 1024; i++ { - _, err := cch.Write(make([]byte, 4096)) + _, err := cch.Write(bytes) require.NoError(t, err) } _ = cch.Close() }() + bytes := make([]byte, 4096) for { - _, err = sch.Read(make([]byte, 4096)) + _, err = sch.Read(bytes) if err != nil { require.ErrorIs(t, err, peer.ErrClosed) break @@ -159,6 +160,7 @@ func TestConn(t *testing.T) { t.Run("NetConn", func(t *testing.T) { t.Parallel() client, server, _ := createPair(t) + exchange(client, server) srv, err := net.Listen("tcp", "127.0.0.1:0") require.NoError(t, err) defer srv.Close() @@ -211,6 +213,7 @@ func TestConn(t *testing.T) { t.Run("CloseBeforeNegotiate", func(t *testing.T) { t.Parallel() client, server, _ := createPair(t) + exchange(client, server) err := client.Close() require.NoError(t, err) err = server.Close() @@ -230,6 +233,7 @@ func TestConn(t *testing.T) { t.Run("PingConcurrent", func(t *testing.T) { t.Parallel() client, server, _ := createPair(t) + exchange(client, server) var wg sync.WaitGroup wg.Add(2) go func() { @@ -244,6 +248,18 @@ func TestConn(t *testing.T) { }() wg.Wait() }) + + t.Run("CandidateBeforeSessionDescription", func(t *testing.T) { + t.Parallel() + client, server, _ := createPair(t) + server.SetRemoteSessionDescription(<-client.LocalSessionDescription()) + sdp := <-server.LocalSessionDescription() + client.AddRemoteCandidate(<-server.LocalCandidate()) + client.SetRemoteSessionDescription(sdp) + server.AddRemoteCandidate(<-client.LocalCandidate()) + _, err := client.Ping() + require.NoError(t, err) + }) } func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.Router) { @@ -271,7 +287,7 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R c1SettingEngine.SetVNet(c1Net) c1SettingEngine.SetPrflxAcceptanceMinWait(0) c1SettingEngine.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) - channel1, err := peer.Client([]webrtc.ICEServer{}, &peer.ConnOptions{ + channel1, err := peer.Client([]webrtc.ICEServer{{}}, &peer.ConnOptions{ SettingEngine: c1SettingEngine, Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug), }) @@ -283,7 +299,7 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R c2SettingEngine.SetVNet(c2Net) c2SettingEngine.SetPrflxAcceptanceMinWait(0) c2SettingEngine.SetICETimeouts(disconnectedTimeout, failedTimeout, keepAliveInterval) - channel2, err := peer.Server([]webrtc.ICEServer{}, &peer.ConnOptions{ + channel2, err := peer.Server([]webrtc.ICEServer{{}}, &peer.ConnOptions{ SettingEngine: c2SettingEngine, Logger: slogtest.Make(t, nil).Named("server").Leveled(slog.LevelDebug), }) @@ -298,31 +314,33 @@ func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.R _ = wan.Stop() }) - go func() { - for { - select { - case c := <-channel2.LocalCandidate(): - _ = channel1.AddRemoteCandidate(c) - case c := <-channel2.LocalSessionDescription(): - channel1.SetRemoteSessionDescription(c) - case <-channel2.Closed(): - return - } - } - }() - - go func() { - for { - select { - case c := <-channel1.LocalCandidate(): - _ = channel2.AddRemoteCandidate(c) - case c := <-channel1.LocalSessionDescription(): - channel2.SetRemoteSessionDescription(c) - case <-channel1.Closed(): - return - } - } - }() - return channel1, channel2, wan } + +func exchange(client, server *peer.Conn) { + go func() { + for { + select { + case c := <-server.LocalCandidate(): + client.AddRemoteCandidate(c) + case c := <-server.LocalSessionDescription(): + client.SetRemoteSessionDescription(c) + case <-server.Closed(): + return + } + } + }() + + go func() { + for { + select { + case c := <-client.LocalCandidate(): + server.AddRemoteCandidate(c) + case c := <-client.LocalSessionDescription(): + server.SetRemoteSessionDescription(c) + case <-client.Closed(): + return + } + } + }() +} diff --git a/peerbroker/dial.go b/peerbroker/dial.go index 9af9c663be..7253e44f0f 100644 --- a/peerbroker/dial.go +++ b/peerbroker/dial.go @@ -95,13 +95,9 @@ func Dial(stream proto.DRPCPeerBroker_NegotiateConnectionClient, iceServers []we SDP: serverToClientMessage.GetAnswer().Sdp, }) case serverToClientMessage.GetIceCandidate() != "": - err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{ + peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{ Candidate: serverToClientMessage.GetIceCandidate(), }) - if err != nil { - _ = peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) - return - } default: _ = peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(serverToClientMessage).String())) return diff --git a/peerbroker/dial_test.go b/peerbroker/dial_test.go index 3d6fe807e0..30066b8d82 100644 --- a/peerbroker/dial_test.go +++ b/peerbroker/dial_test.go @@ -9,6 +9,10 @@ import ( "go.uber.org/goleak" "storj.io/drpc/drpcconn" + "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogtest" + + "github.com/coder/coder/peer" "github.com/coder/coder/peerbroker" "github.com/coder/coder/peerbroker/proto" "github.com/coder/coder/provisionersdk" @@ -28,7 +32,9 @@ func TestDial(t *testing.T) { defer client.Close() defer server.Close() - listener, err := peerbroker.Listen(server, nil) + listener, err := peerbroker.Listen(server, &peer.ConnOptions{ + Logger: slogtest.Make(t, nil).Named("server").Leveled(slog.LevelDebug), + }) require.NoError(t, err) api := proto.NewDRPCPeerBrokerClient(drpcconn.New(client)) @@ -36,7 +42,9 @@ func TestDial(t *testing.T) { require.NoError(t, err) clientConn, err := peerbroker.Dial(stream, []webrtc.ICEServer{{ URLs: []string{"stun:stun.l.google.com:19302"}, - }}, nil) + }}, &peer.ConnOptions{ + Logger: slogtest.Make(t, nil).Named("client").Leveled(slog.LevelDebug), + }) require.NoError(t, err) defer clientConn.Close() diff --git a/peerbroker/listen.go b/peerbroker/listen.go index db218a209f..fa68023689 100644 --- a/peerbroker/listen.go +++ b/peerbroker/listen.go @@ -179,12 +179,9 @@ func (b *peerBrokerService) NegotiateConnection(stream proto.DRPCPeerBroker_Nego return peerConn.CloseWithError(xerrors.Errorf("set ice configuration: %w", err)) } case clientToServerMessage.GetIceCandidate() != "": - err = peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{ + peerConn.AddRemoteCandidate(webrtc.ICECandidateInit{ Candidate: clientToServerMessage.GetIceCandidate(), }) - if err != nil { - return peerConn.CloseWithError(xerrors.Errorf("add remote candidate: %w", err)) - } default: return peerConn.CloseWithError(xerrors.Errorf("unhandled message: %s", reflect.TypeOf(clientToServerMessage).String())) } diff --git a/peerbroker/proto/peerbroker.proto b/peerbroker/proto/peerbroker.proto index c4f660231c..7afec197a5 100644 --- a/peerbroker/proto/peerbroker.proto +++ b/peerbroker/proto/peerbroker.proto @@ -45,4 +45,4 @@ service PeerBroker { // // See: https://davekilian.com/webrtc-the-hard-way.html rpc NegotiateConnection(stream NegotiateConnection.ClientToServer) returns (stream NegotiateConnection.ServerToClient); -} +} \ No newline at end of file diff --git a/provisioner/terraform/provision_test.go b/provisioner/terraform/provision_test.go index b596c85d0b..71b7864bbe 100644 --- a/provisioner/terraform/provision_test.go +++ b/provisioner/terraform/provision_test.go @@ -9,27 +9,16 @@ import ( "path/filepath" "testing" - "github.com/hashicorp/go-version" "github.com/stretchr/testify/require" "storj.io/drpc/drpcconn" "github.com/coder/coder/provisionersdk" "github.com/coder/coder/provisionersdk/proto" - - "github.com/hashicorp/hc-install/product" - "github.com/hashicorp/hc-install/releases" ) func TestProvision(t *testing.T) { t.Parallel() - installer := &releases.ExactVersion{ - Product: product.Terraform, - Version: version.Must(version.NewVersion("1.1.2")), - } - execPath, err := installer.Install(context.Background()) - require.NoError(t, err) - client, server := provisionersdk.TransportPipe() ctx, cancelFunc := context.WithCancel(context.Background()) t.Cleanup(func() { @@ -42,7 +31,6 @@ func TestProvision(t *testing.T) { ServeOptions: &provisionersdk.ServeOptions{ Transport: server, }, - BinaryPath: execPath, }) require.NoError(t, err) }()