feat: peer wireguard (#2445)

This commit is contained in:
Colin Adler
2022-06-24 10:25:01 -05:00
committed by GitHub
parent d21ab2115d
commit 05b67ab1cf
34 changed files with 1935 additions and 236 deletions

View File

@ -28,7 +28,7 @@ type pgPubsub struct {
pgListener *pq.Listener
db *sql.DB
mut sync.Mutex
listeners map[string]map[string]Listener
listeners map[string]map[uuid.UUID]Listener
}
// Subscribe calls the listener when an event matching the name is received.
@ -45,20 +45,22 @@ func (p *pgPubsub) Subscribe(event string, listener Listener) (cancel func(), er
return nil, xerrors.Errorf("listen: %w", err)
}
var listeners map[string]Listener
var eventListeners map[uuid.UUID]Listener
var ok bool
if listeners, ok = p.listeners[event]; !ok {
listeners = map[string]Listener{}
p.listeners[event] = listeners
if eventListeners, ok = p.listeners[event]; !ok {
eventListeners = map[uuid.UUID]Listener{}
p.listeners[event] = eventListeners
}
var id string
var id uuid.UUID
for {
id = uuid.New().String()
if _, ok = listeners[id]; !ok {
id = uuid.New()
if _, ok = eventListeners[id]; !ok {
break
}
}
listeners[id] = listener
eventListeners[id] = listener
return func() {
p.mut.Lock()
defer p.mut.Unlock()
@ -77,7 +79,7 @@ func (p *pgPubsub) Publish(event string, message []byte) error {
//nolint:gosec
_, err := p.db.ExecContext(context.Background(), `select pg_notify(`+pq.QuoteLiteral(event)+`, $1)`, message)
if err != nil {
return xerrors.Errorf("exec: %w", err)
return xerrors.Errorf("exec pg_notify: %w", err)
}
return nil
}
@ -128,7 +130,7 @@ func (p *pgPubsub) listenReceive(ctx context.Context, notif *pq.Notification) {
func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub, error) {
// Creates a new listener using pq.
errCh := make(chan error)
listener := pq.NewListener(connectURL, time.Second*10, time.Minute, func(event pq.ListenerEventType, err error) {
listener := pq.NewListener(connectURL, time.Second, time.Minute, func(event pq.ListenerEventType, err error) {
// This callback gets events whenever the connection state changes.
// Don't send if the errChannel has already been closed.
select {
@ -150,7 +152,7 @@ func NewPubsub(ctx context.Context, database *sql.DB, connectURL string) (Pubsub
pgPubsub := &pgPubsub{
db: database,
pgListener: listener,
listeners: make(map[string]map[string]Listener),
listeners: make(map[string]map[uuid.UUID]Listener),
}
go pgPubsub.listen(ctx)