You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-library/utils/gojobs/pubsub/pubsub.go

126 lines
2.7 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package pubsub
import (
"sync"
"time"
)
// 等待组放在共享内存池中减少GC
var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
// NewPublisher
// 第一个参数控制发布时最大阻塞时间
// 第二个参数是缓冲区大小控制每个订阅者的chan缓冲区大小
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
type subscriber chan interface{}
type topicFunc func(v interface{}) bool
type Publisher struct {
m sync.RWMutex // 控制订阅者map并发读写安全
buffer int // 每个订阅者chan缓冲区大小
timeout time.Duration // 发布阻塞超时时间
subscribers map[subscriber]topicFunc
}
// Len 返回订阅者数量
func (p *Publisher) Len() int {
p.m.RLock()
i := len(p.subscribers)
p.m.RUnlock()
return i
}
// Subscribe 无Topic订阅
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// SubscribeTopic 通过Topic订阅
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// SubscribeTopicWithBuffer 通过自定义chan缓冲区大小定义新的订阅者
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
ch := make(chan interface{}, buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// Evict 移除某个订阅者
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
_, exists := p.subscribers[sub]
if exists {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// Publish 发布消息
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
if len(p.subscribers) == 0 {
p.m.RUnlock()
return
}
wg := wgPool.Get().(*sync.WaitGroup)
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, wg)
}
wg.Wait()
wgPool.Put(wg)
p.m.RUnlock()
}
// Close 关闭服务
func (p *Publisher) Close() {
p.m.Lock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// 真正发布消息的逻辑通过Timer根据传入的timeout控制每次发布消息最大阻塞时长
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
// 如果接收器不可用,请在选择“不阻止”下发送
if p.timeout > 0 {
timeout := time.NewTimer(p.timeout)
defer timeout.Stop()
select {
case sub <- v:
case <-timeout.C:
}
return
}
select {
case sub <- v:
default:
}
}