|
|
package pb
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"go.dtapp.net/gojobs/pubsub"
|
|
|
"log"
|
|
|
"strings"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
type PubSubServerService struct {
|
|
|
pub *pubsub.Publisher
|
|
|
UnimplementedPubSubServer
|
|
|
}
|
|
|
|
|
|
func NewPubSubServerService() *PubSubServerService {
|
|
|
return &PubSubServerService{
|
|
|
// 新建一个Publisher对象
|
|
|
pub: pubsub.NewPublisher(time.Millisecond*100, 10),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Publish 实现发布方法
|
|
|
func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error) {
|
|
|
|
|
|
log.Printf("[服务中转]{发布}编号:%s 类型:%s ip地址:%s\n", req.GetId(), req.GetValue(), req.GetIp())
|
|
|
|
|
|
// 发布消息
|
|
|
p.pub.Publish(req.GetValue())
|
|
|
return &PublishResponse{
|
|
|
Id: req.GetId(),
|
|
|
Value: req.GetValue(),
|
|
|
Ip: req.GetIp(),
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
// Subscribe 实现订阅方法
|
|
|
func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_SubscribeServer) error {
|
|
|
|
|
|
// SubscribeTopic 增加一个使用函数过滤器的订阅者
|
|
|
// func(v interface{}) 定义函数过滤的规则
|
|
|
// SubscribeTopic 返回一个chan interface{}
|
|
|
|
|
|
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
|
|
|
|
|
|
log.Printf("[服务中转]{订阅}主题:%v\n", v)
|
|
|
|
|
|
// 接收数据是string,并且key是以arg为前缀的
|
|
|
if key, ok := v.(string); ok {
|
|
|
if strings.HasPrefix(key, req.GetValue()) {
|
|
|
return true
|
|
|
}
|
|
|
}
|
|
|
return false
|
|
|
})
|
|
|
|
|
|
log.Printf("[服务中转]{订阅}编号:%s 类型:%s 方法:%s ip地址:%s\n", req.GetId(), req.GetValue(), req.GetMethod(), req.GetIp())
|
|
|
log.Println("[服务中转]{订阅}工作线:", ch)
|
|
|
log.Println("[服务中转]{订阅}当前工作线数量:", p.pub.Len())
|
|
|
|
|
|
// 服务器遍历chan,并将其中信息发送给订阅客户端
|
|
|
for v := range ch {
|
|
|
log.Println("[服务中转]{订阅}for ch:", ch)
|
|
|
log.Println("[服务中转]{订阅}for v:", v)
|
|
|
err := stream.Send(&SubscribeResponse{
|
|
|
Id: req.GetId(),
|
|
|
Value: req.GetValue(),
|
|
|
Method: req.GetMethod(),
|
|
|
})
|
|
|
if err != nil {
|
|
|
log.Println("[服务中转]{订阅}任务分配失败 ", err.Error())
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
}
|