- repair bug

master
李光春 2 years ago
parent b903776947
commit f1da238556

@ -31,7 +31,7 @@ func NewClient(config *ClientConfig) *Client {
// 建立连接 获取client // 建立连接 获取client
c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure()) c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure())
if err != nil { if err != nil {
panic("[客户端]{连接失败} " + err.Error()) panic("[客户端]{连接失败}" + err.Error())
} }
return c return c

@ -35,7 +35,7 @@ func NewCron(config *CronConfig) *Cron {
// 建立连接 获取client // 建立连接 获取client
c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure()) c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure())
if err != nil { if err != nil {
panic("[定时任务]{连接失败} " + err.Error()) panic("[定时任务]{连接失败}" + err.Error())
} }
// 新建一个客户端 // 新建一个客户端
@ -46,11 +46,11 @@ func NewCron(config *CronConfig) *Cron {
// Send 发送 // Send 发送
func (c *Cron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error) { func (c *Cron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error) {
log.Printf("[定时任务]{广播开始} 编号:%s 类型:%s ip%s\n", in.GetId(), in.GetValue(), in.GetIp()) log.Printf("[定时任务]{广播开始}编号:%s 类型:%s ip%s\n", in.GetId(), in.GetValue(), in.GetIp())
stream, err := c.Pub.Publish(context.Background(), in) stream, err := c.Pub.Publish(context.Background(), in)
if err != nil { if err != nil {
log.Printf("[定时任务]{广播失败} 编号:%s %v\n", in.GetId(), err) log.Printf("[定时任务]{广播失败}编号:%s %v\n", in.GetId(), err)
} }
log.Printf("[定时任务]{广播成功} 编号:%s 类型:%s ip%s\n", in.GetId(), in.GetValue(), in.GetIp()) log.Printf("[定时任务]{广播成功}编号:%s 类型:%s ip%s\n", in.GetId(), in.GetValue(), in.GetIp())
return stream, err return stream, err
} }

@ -23,7 +23,7 @@ func NewPubSubServerService() *PubSubServerService {
// Publish 实现发布方法 // Publish 实现发布方法
func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error) { func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error) {
log.Printf("[服务中转]{发布} 编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp()) log.Printf("[服务中转]{发布}编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp())
// 发布消息 // 发布消息
p.pub.Publish(req.GetValue()) p.pub.Publish(req.GetValue())
@ -41,12 +41,12 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub
// func(v interface{}) 定义函数过滤的规则 // func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{} // SubscribeTopic 返回一个chan interface{}
log.Printf("[服务中转]{订阅} 编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp()) log.Printf("[服务中转]{订阅}编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp())
ch := p.pub.SubscribeTopic(func(v interface{}) bool { ch := p.pub.SubscribeTopic(func(v interface{}) bool {
log.Printf("[服务中转]{订阅} 主题:%v\n", v) log.Printf("[服务中转]{订阅}主题:%v\n", v)
log.Printf("[服务中转]{订阅} 主题:%+v\n", v) log.Printf("[服务中转]{订阅}主题:%+v\n", v)
// 接收数据是string并且key是以arg为前缀的 // 接收数据是string并且key是以arg为前缀的
if key, ok := v.(string); ok { if key, ok := v.(string); ok {
@ -57,19 +57,19 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub
return false return false
}) })
log.Println("[服务中转]{订阅} 工作线:", ch) log.Println("[服务中转]{订阅}工作线:", ch)
log.Println("[服务中转]{订阅} 当前工作线数量:", p.pub.Len()) log.Println("[服务中转]{订阅}当前工作线数量:", p.pub.Len())
// 服务器遍历chan并将其中信息发送给订阅客户端 // 服务器遍历chan并将其中信息发送给订阅客户端
for v := range ch { for v := range ch {
log.Println("[服务中转] for ch", ch) log.Println("[服务中转]{订阅}for ch", ch)
log.Println("[服务中转] for v", v) log.Println("[服务中转]{订阅}for v", v)
err := stream.Send(&SubscribeResponse{ err := stream.Send(&SubscribeResponse{
Value: v.(string), Value: v.(string),
Ip: "", Ip: "",
}) })
if err != nil { if err != nil {
log.Println("[服务中转]{订阅} 任务分配失败 ", err.Error()) log.Println("[服务中转]{订阅}任务分配失败 ", err.Error())
return err return err
} }
} }

@ -49,20 +49,19 @@ func NewServer(config *ServerConfig) *Server {
} }
// StartUp 启动 // StartUp 启动
func (s *Server) StartUp() error { func (s *Server) StartUp() {
// 监听本地端口 // 监听本地端口
lis, err := net.Listen("tcp", s.Address) lis, err := net.Listen("tcp", s.Address)
if err != nil { if err != nil {
return errors.New("[服务中转]{创建监听失败} " + err.Error()) panic(errors.New("[服务中转]{创建监听失败}" + err.Error()))
} }
log.Println("[服务中转]{监听] ", lis.Addr()) log.Println("[服务中转]{监听}", lis.Addr())
// 启动grpc // 启动grpc
err = s.Conn.Serve(lis) err = s.Conn.Serve(lis)
if err != nil { if err != nil {
return errors.New("[服务中转]{创建服务失败} " + err.Error()) panic(errors.New("[服务中转]{创建服务失败}" + err.Error()))
} }
return nil
} }

@ -33,7 +33,7 @@ func NewWorker(config *WorkerConfig) *Worker {
// 建立连接 获取client // 建立连接 获取client
w.Conn, err = grpc.Dial(w.Address, grpc.WithInsecure()) w.Conn, err = grpc.Dial(w.Address, grpc.WithInsecure())
if err != nil { if err != nil {
panic("[工作线]{连接失败} " + err.Error()) panic("[工作线]{连接失败}" + err.Error())
} }
// 新建一个客户端 // 新建一个客户端

Loading…
Cancel
Save