package pubsub import ( "sync" "time" ) // 等待组放在共享内存池中,减少GC var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }} // NewPublisher // 第一个参数控制发布时最大阻塞时间 // 第二个参数是缓冲区大小,控制每个订阅者的chan缓冲区大小 func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, subscribers: make(map[subscriber]topicFunc), } } type subscriber chan interface{} type topicFunc func(v interface{}) bool type Publisher struct { m sync.RWMutex // 控制订阅者map并发读写安全 buffer int // 每个订阅者chan缓冲区大小 timeout time.Duration // 发布阻塞超时时间 subscribers map[subscriber]topicFunc } // Len 返回订阅者数量 func (p *Publisher) Len() int { p.m.RLock() i := len(p.subscribers) p.m.RUnlock() return i } // Subscribe 无Topic订阅 func (p *Publisher) Subscribe() chan interface{} { return p.SubscribeTopic(nil) } // SubscribeTopic 通过Topic订阅 func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch } // SubscribeTopicWithBuffer 通过自定义chan缓冲区大小定义新的订阅者 func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} { ch := make(chan interface{}, buffer) p.m.Lock() p.subscribers[ch] = topic p.m.Unlock() return ch } // Evict 移除某个订阅者 func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() _, exists := p.subscribers[sub] if exists { delete(p.subscribers, sub) close(sub) } p.m.Unlock() } // Publish 发布消息 func (p *Publisher) Publish(v interface{}) { p.m.RLock() if len(p.subscribers) == 0 { p.m.RUnlock() return } wg := wgPool.Get().(*sync.WaitGroup) for sub, topic := range p.subscribers { wg.Add(1) go p.sendTopic(sub, topic, v, wg) } wg.Wait() wgPool.Put(wg) p.m.RUnlock() } // Close 关闭服务 func (p *Publisher) Close() { p.m.Lock() for sub := range p.subscribers { delete(p.subscribers, sub) close(sub) } p.m.Unlock() } // 真正发布消息的逻辑,通过Timer,根据传入的timeout控制每次发布消息最大阻塞时长 func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { defer wg.Done() if topic != nil && !topic(v) { return } // 如果接收器不可用,请在选择“不阻止”下发送 if p.timeout > 0 { timeout := time.NewTimer(p.timeout) defer timeout.Stop() select { case sub <- v: case <-timeout.C: } return } select { case sub <- v: default: } }