feat: add logging to pgPubsub (#11953)

Should be helpful for #11950

Adds a logger to pgPubsub and logs various events, most especially connection and disconnection from postgres.
This commit is contained in:
Spike Curtis
2024-01-31 15:49:16 +04:00
committed by GitHub
parent 1c8b803785
commit a34cada09a
4 changed files with 61 additions and 14 deletions

View File

@ -10,6 +10,8 @@ import (
"github.com/google/uuid"
"github.com/lib/pq"
"golang.org/x/xerrors"
"cdr.dev/slog"
)
// Listener represents a pubsub handler.
@ -164,6 +166,7 @@ func (q *msgQueue) dropped() {
type pgPubsub struct {
ctx context.Context
cancel context.CancelFunc
logger slog.Logger
listenDone chan struct{}
pgListener *pq.Listener
db *sql.DB
@ -198,6 +201,9 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
}()
err = p.pgListener.Listen(event)
if err == nil {
p.logger.Debug(p.ctx, "started listening to event channel", slog.F("event", event))
}
if errors.Is(err, pq.ErrChannelAlreadyOpen) {
// It's ok if it's already open!
err = nil
@ -223,12 +229,18 @@ func (p *pgPubsub) subscribeQueue(event string, newQ *msgQueue) (cancel func(),
delete(listeners, id)
if len(listeners) == 0 {
_ = p.pgListener.Unlisten(event)
uErr := p.pgListener.Unlisten(event)
if uErr != nil && !p.closedListener {
p.logger.Warn(p.ctx, "failed to unlisten", slog.Error(uErr), slog.F("event", event))
} else {
p.logger.Debug(p.ctx, "stopped listening to event channel", slog.F("event", event))
}
}
}, nil
}
func (p *pgPubsub) Publish(event string, message []byte) error {
p.logger.Debug(p.ctx, "publish", slog.F("event", event), slog.F("message_len", len(message)))
// This is safe because we are calling pq.QuoteLiteral. pg_notify doesn't
// support the first parameter being a prepared statement.
//nolint:gosec
@ -241,9 +253,11 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
// Close closes the pubsub instance.
func (p *pgPubsub) Close() error {
p.logger.Info(p.ctx, "pubsub is closing")
p.cancel()
err := p.closeListener()
<-p.listenDone
p.logger.Debug(p.ctx, "pubsub closed")
return err
}
@ -262,7 +276,11 @@ func (p *pgPubsub) closeListener() error {
// listen begins receiving messages on the pq listener.
func (p *pgPubsub) listen() {
defer func() {
_ = p.closeListener()
p.logger.Info(p.ctx, "pubsub listen stopped receiving notify")
cErr := p.closeListener()
if cErr != nil {
p.logger.Error(p.ctx, "failed to close listener")
}
close(p.listenDone)
}()
@ -281,6 +299,7 @@ func (p *pgPubsub) listen() {
}
// A nil notification can be dispatched on reconnect.
if notif == nil {
p.logger.Debug(p.ctx, "notifying subscribers of a reconnection")
p.recordReconnect()
continue
}
@ -312,10 +331,20 @@ func (p *pgPubsub) recordReconnect() {
}
// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
func New(ctx context.Context, logger slog.Logger, database *sql.DB, connectURL string) (Pubsub, error) {
// Creates a new listener using pq.
errCh := make(chan error)
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(_ pq.ListenerEventType, err error) {
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
switch t {
case pq.ListenerEventConnected:
logger.Info(ctx, "pubsub connected to postgres")
case pq.ListenerEventDisconnected:
logger.Error(ctx, "pubsub disconnected from postgres", slog.Error(err))
case pq.ListenerEventReconnected:
logger.Info(ctx, "pubsub reconnected to postgres")
case pq.ListenerEventConnectionAttemptFailed:
logger.Error(ctx, "pubsub failed to connect to postgres", slog.Error(err))
}
// This callback gets events whenever the connection state changes.
// Don't send if the errChannel has already been closed.
select {
@ -342,12 +371,13 @@ func New(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, erro
pgPubsub := &pgPubsub{
ctx: ctx,
cancel: cancel,
logger: logger,
listenDone: make(chan struct{}),
db: database,
pgListener: listener,
queues: make(map[string]map[uuid.UUID]*msgQueue),
}
go pgPubsub.listen()
logger.Info(ctx, "pubsub has started")
return pgPubsub, nil
}