Files
coder/coderd/database/pubsub/pubsub_memory.go
Kyle Carberry e4b6f5695b chore: separate pubsub into a new package (#8017)
* chore: rename store to dbmock for consistency

* chore: remove redundant dbtype package

This wasn't necessary and forked how we do DB types.

* chore: separate pubsub into a new package

This didn't need to be in database and was bloating it.
2023-06-14 15:34:54 +00:00

95 lines
1.9 KiB
Go

package pubsub
import (
"context"
"sync"
"github.com/google/uuid"
)
// genericListener is either a Listener or ListenerWithErr
type genericListener struct {
l Listener
le ListenerWithErr
}
func (g genericListener) send(ctx context.Context, message []byte) {
if g.l != nil {
g.l(ctx, message)
}
if g.le != nil {
g.le(ctx, message, nil)
}
}
// memoryPubsub is an in-memory Pubsub implementation.
type memoryPubsub struct {
mut sync.RWMutex
listeners map[string]map[uuid.UUID]genericListener
}
func (m *memoryPubsub) Subscribe(event string, listener Listener) (cancel func(), err error) {
return m.subscribeGeneric(event, genericListener{l: listener})
}
func (m *memoryPubsub) SubscribeWithErr(event string, listener ListenerWithErr) (cancel func(), err error) {
return m.subscribeGeneric(event, genericListener{le: listener})
}
func (m *memoryPubsub) subscribeGeneric(event string, listener genericListener) (cancel func(), err error) {
m.mut.Lock()
defer m.mut.Unlock()
var listeners map[uuid.UUID]genericListener
var ok bool
if listeners, ok = m.listeners[event]; !ok {
listeners = map[uuid.UUID]genericListener{}
m.listeners[event] = listeners
}
var id uuid.UUID
for {
id = uuid.New()
if _, ok = listeners[id]; !ok {
break
}
}
listeners[id] = listener
return func() {
m.mut.Lock()
defer m.mut.Unlock()
listeners := m.listeners[event]
delete(listeners, id)
}, nil
}
func (m *memoryPubsub) Publish(event string, message []byte) error {
m.mut.RLock()
defer m.mut.RUnlock()
listeners, ok := m.listeners[event]
if !ok {
return nil
}
var wg sync.WaitGroup
for _, listener := range listeners {
wg.Add(1)
listener := listener
go func() {
defer wg.Done()
listener.send(context.Background(), message)
}()
}
wg.Wait()
return nil
}
func (*memoryPubsub) Close() error {
return nil
}
func NewInMemory() Pubsub {
return &memoryPubsub{
listeners: make(map[string]map[uuid.UUID]genericListener),
}
}