- update server
continuous-integration/drone/push Build is passing Details

master v1.0.3
李光春 2 years ago
parent e5d9cb9c66
commit 5cf6101d2b

@ -3,6 +3,7 @@ package pb
import (
"context"
"go.dtapp.net/gojobs/pubsub"
"log"
"strings"
"time"
)
@ -21,6 +22,7 @@ func NewPubSubServerService() *PubSubServerService {
// Publish 实现发布方法
func (p *PubSubServerService) Publish(ctx context.Context, arg *String) (*String, error) {
log.Printf("[服务中转]%v\n", arg.GetValue())
// 发布消息
p.pub.Publish(arg.GetValue())
return &String{Value: arg.GetValue()}, nil
@ -28,10 +30,13 @@ func (p *PubSubServerService) Publish(ctx context.Context, arg *String) (*String
// Subscribe 实现订阅方法
func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServer) error {
// SubscribeTopic 增加一个使用函数过滤器的订阅者
// func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{}
log.Printf("[服务中转]收到任务:%v\n", arg.GetValue())
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
// 接收数据是string并且key是以arg为前缀的
if key, ok := v.(string); ok {
@ -42,10 +47,14 @@ func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServ
return false
})
log.Println("[服务中转]工作线:", ch)
log.Println("[服务中转]工作线数量:", p.pub.Len())
// 服务器遍历chan并将其中信息发送给订阅客户端
for v := range ch {
err := stream.Send(&String{Value: v.(string)})
if err != nil {
log.Println("[服务中转]任务分配失败:", err.Error())
return err
}
}

@ -1,3 +1,3 @@
package gojobs
const Version = "1.0.2"
const Version = "1.0.3"

Loading…
Cancel
Save