mirror of
https://github.com/coder/coder.git
synced 2025-07-12 00:14:10 +00:00
fix: fix race condition in pubsub startup (#17088)
fixes https://github.com/coder/internal/issues/525 If the context is canceled, the goroutine that is supposed to read from the `errCh` could exit prematurely, leading to a goroutine leak. Refactors this code so it cannot block.
This commit is contained in:
@ -492,7 +492,6 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
|
|||||||
p.connected.Set(0)
|
p.connected.Set(0)
|
||||||
// Creates a new listener using pq.
|
// Creates a new listener using pq.
|
||||||
var (
|
var (
|
||||||
errCh = make(chan error)
|
|
||||||
dialer = logDialer{
|
dialer = logDialer{
|
||||||
logger: p.logger,
|
logger: p.logger,
|
||||||
// pq.defaultDialer uses a zero net.Dialer as well.
|
// pq.defaultDialer uses a zero net.Dialer as well.
|
||||||
@ -525,6 +524,10 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
|
|||||||
dc.Dialer(dialer)
|
dc.Dialer(dialer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
errCh = make(chan error, 1)
|
||||||
|
sentErrCh = false
|
||||||
|
)
|
||||||
p.pgListener = pqListenerShim{
|
p.pgListener = pqListenerShim{
|
||||||
Listener: pq.NewConnectorListener(connector, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
|
Listener: pq.NewConnectorListener(connector, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
|
||||||
switch t {
|
switch t {
|
||||||
@ -541,18 +544,16 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
|
|||||||
p.logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
|
p.logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
|
||||||
}
|
}
|
||||||
// This callback gets events whenever the connection state changes.
|
// This callback gets events whenever the connection state changes.
|
||||||
// Don't send if the errChannel has already been closed.
|
// Only send the first error.
|
||||||
select {
|
if sentErrCh {
|
||||||
case <-errCh:
|
|
||||||
return
|
return
|
||||||
default:
|
|
||||||
errCh <- err
|
|
||||||
close(errCh)
|
|
||||||
}
|
}
|
||||||
|
errCh <- err // won't block because we are buffered.
|
||||||
|
sentErrCh = true
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case err := <-errCh:
|
case err = <-errCh:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = p.pgListener.Close()
|
_ = p.pgListener.Close()
|
||||||
return xerrors.Errorf("create pq listener: %w", err)
|
return xerrors.Errorf("create pq listener: %w", err)
|
||||||
|
Reference in New Issue
Block a user