diff --git a/jobs_test.go b/jobs_test.go index bd71a14..032b2a2 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -29,8 +29,10 @@ func testServer(wg *sync.WaitGroup) { Address: "0.0.0.0:8888", }) + // 启动定时任务 server.StartCron() + // 启动服务 server.StartUp() <-make(chan bool) @@ -52,10 +54,10 @@ func testCron(wg *sync.WaitGroup) { _, _ = c.AddFunc("*/15 * * * * *", func() { server.Send(&pb.PublishRequest{ - Id: gouuid.GetUuId(), - //Value: prefix + "wechat.send" + " 我是定时任务", - Value: prefixSprintf("127.0.0.1") + "wechat.send" + " 我是定时任务", - Ip: "127.0.0.1", + Id: gouuid.GetUuId(), + Value: prefix, + Method: "wechat.1.send", + Ip: "127.0.0.1", }) }) @@ -64,10 +66,10 @@ func testCron(wg *sync.WaitGroup) { _, _ = c.AddFunc("*/30 * * * * *", func() { server.Send(&pb.PublishRequest{ - Id: gouuid.GetUuId(), - //Value: prefix + "wechat.send" + " 我是定时任务", - Value: prefixSprintf("14.155.157.19") + "wechat.send" + " 我是定时任务", - Ip: "14.155.157.19", + Id: gouuid.GetUuId(), + Value: prefix, + Method: "wechat.2.send", + Ip: "14.155.157.19", }) }) @@ -91,10 +93,10 @@ func testWorker1(wg *sync.WaitGroup) { defer server.Conn.Close() // 订阅服务 - server.SubscribeCron() + stream := server.SubscribeCron() // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 - stream := server.StartCron() + //stream := server.StartCron() // 阻塞遍历流,输出结果 for { @@ -107,7 +109,7 @@ func testWorker1(wg *sync.WaitGroup) { log.Println("[跑业务1]异常:", err.Error()) break } - log.Println("[跑业务1]收到:", reply) + log.Printf("[跑业务1]{收到}编号:%s 方法:%s\n", reply.GetId(), reply.GetMethod()) } wg.Done() @@ -122,10 +124,10 @@ func testWorker2(wg *sync.WaitGroup) { defer server.Conn.Close() // 订阅服务 - server.SubscribeCron() + stream := server.SubscribeCron() // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 - stream := server.StartCron() + //stream := server.StartCron() // 阻塞遍历流,输出结果 for { @@ -138,7 +140,7 @@ func testWorker2(wg *sync.WaitGroup) { log.Println("[跑业务2]异常:", err.Error()) break } - log.Println("[跑业务2]收到:", reply) + log.Printf("[跑业务2]{收到}编号:%s 方法:%s\n", reply.GetId(), reply.GetMethod()) } wg.Done() diff --git a/pb/pubsub.pb.go b/pb/pubsub.pb.go index c848cd1..bb7ac7b 100644 --- a/pb/pubsub.pb.go +++ b/pb/pubsub.pb.go @@ -30,9 +30,10 @@ type PublishRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Method string `protobuf:"bytes,3,opt,name=method,proto3" json:"method,omitempty"` + Ip string `protobuf:"bytes,4,opt,name=ip,proto3" json:"ip,omitempty"` } func (x *PublishRequest) Reset() { @@ -81,6 +82,13 @@ func (x *PublishRequest) GetValue() string { return "" } +func (x *PublishRequest) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + func (x *PublishRequest) GetIp() string { if x != nil { return x.Ip @@ -158,9 +166,10 @@ type SubscribeRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Method string `protobuf:"bytes,3,opt,name=method,proto3" json:"method,omitempty"` + Ip string `protobuf:"bytes,4,opt,name=ip,proto3" json:"ip,omitempty"` } func (x *SubscribeRequest) Reset() { @@ -209,6 +218,13 @@ func (x *SubscribeRequest) GetValue() string { return "" } +func (x *SubscribeRequest) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + func (x *SubscribeRequest) GetIp() string { if x != nil { return x.Ip @@ -222,9 +238,9 @@ type SubscribeResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Method string `protobuf:"bytes,3,opt,name=method,proto3" json:"method,omitempty"` } func (x *SubscribeResponse) Reset() { @@ -273,9 +289,9 @@ func (x *SubscribeResponse) GetValue() string { return "" } -func (x *SubscribeResponse) GetIp() string { +func (x *SubscribeResponse) GetMethod() string { if x != nil { - return x.Ip + return x.Method } return "" } @@ -284,34 +300,37 @@ var File_pb_pubsub_proto protoreflect.FileDescriptor var file_pb_pubsub_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x46, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x5e, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, - 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x47, 0x0a, - 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x48, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, - 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, - 0x22, 0x49, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, + 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, + 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x47, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x60, + 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, + 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, + 0x22, 0x51, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, - 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x32, 0x78, 0x0a, 0x06, 0x50, - 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x32, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, - 0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, - 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, - 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, - 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6d, + 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, + 0x68, 0x6f, 0x64, 0x32, 0x78, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x32, 0x0a, + 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, + 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, + 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, + 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, + 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a, + 0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pb/pubsub.proto b/pb/pubsub.proto index 0ad6e5b..9605fb1 100644 --- a/pb/pubsub.proto +++ b/pb/pubsub.proto @@ -19,7 +19,8 @@ service PubSub { message PublishRequest { string id = 1; string value = 2; - string ip = 3; + string method = 3; + string ip = 4; } // 响应消息 @@ -33,12 +34,13 @@ message PublishResponse { message SubscribeRequest { string id = 1; string value = 2; - string ip = 3; + string method = 3; + string ip = 4; } // 响应消息 message SubscribeResponse { string id = 1; string value = 2; - string ip = 3; + string method = 3; } diff --git a/pb/pubsub.server.go b/pb/pubsub.server.go index a443b12..78993d9 100644 --- a/pb/pubsub.server.go +++ b/pb/pubsub.server.go @@ -41,12 +41,9 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub // func(v interface{}) 定义函数过滤的规则 // SubscribeTopic 返回一个chan interface{} - log.Printf("[服务中转]{订阅}编号:%s 类型:%s ip地址:%s\n", req.GetId(), req.GetValue(), req.GetIp()) - ch := p.pub.SubscribeTopic(func(v interface{}) bool { log.Printf("[服务中转]{订阅}主题:%v\n", v) - log.Printf("[服务中转]{订阅}主题:%+v\n", v) // 接收数据是string,并且key是以arg为前缀的 if key, ok := v.(string); ok { @@ -57,6 +54,7 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub return false }) + log.Printf("[服务中转]{订阅}编号:%s 类型:%s 方法:%s ip地址:%s\n", req.GetId(), req.GetValue(), req.GetMethod(), req.GetIp()) log.Println("[服务中转]{订阅}工作线:", ch) log.Println("[服务中转]{订阅}当前工作线数量:", p.pub.Len()) @@ -65,8 +63,9 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub log.Println("[服务中转]{订阅}for ch:", ch) log.Println("[服务中转]{订阅}for v:", v) err := stream.Send(&SubscribeResponse{ - Value: v.(string), - Ip: "", + Id: req.GetId(), + Value: req.GetValue(), + Method: req.GetMethod(), }) if err != nil { log.Println("[服务中转]{订阅}任务分配失败 ", err.Error()) diff --git a/server.go b/server.go index 979cf4a..7c632d6 100644 --- a/server.go +++ b/server.go @@ -65,7 +65,7 @@ func (s *Server) StartCron() { }() } -// StartUp 启动 +// StartUp 启动服务 func (s *Server) StartUp() { // 监听本地端口 diff --git a/worker.go b/worker.go index 24a146c..3364804 100644 --- a/worker.go +++ b/worker.go @@ -50,8 +50,8 @@ func NewWorker(config *WorkerConfig) *Worker { } // SubscribeCron 订阅服务 -func (w *Worker) SubscribeCron() { - _, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ +func (w *Worker) SubscribeCron() pb.PubSub_SubscribeClient { + stream, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ Id: gouuid.GetUuId(), Value: prefix, Ip: w.ClientIp, @@ -59,6 +59,7 @@ func (w *Worker) SubscribeCron() { if err != nil { panic("[工作线]{订阅服务失败}" + err.Error()) } + return stream } // StartCron 启动任务