From 736c82d8af5a833e851719c8e33ae52c55f05291 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Mon, 23 May 2022 18:30:07 +0800 Subject: [PATCH] - bug --- consts.go | 18 ++++ cron.go | 2 +- go.mod | 7 ++ go.sum | 14 +++ jobs_test.go | 113 ++++++++++++++++++++++++ pb/pubsub.pb.go | 200 +++++++++++++++++++++++++++++++++++++------ pb/pubsub.proto | 18 +++- pb/pubsub.server.go | 19 ++-- pb/pubsub_grpc.pb.go | 34 ++++---- 9 files changed, 370 insertions(+), 55 deletions(-) create mode 100644 consts.go create mode 100644 jobs_test.go diff --git a/consts.go b/consts.go new file mode 100644 index 0000000..d45b03b --- /dev/null +++ b/consts.go @@ -0,0 +1,18 @@ +package gojobs + +import ( + "fmt" + "go.dtapp.net/goip" +) + +var ip string + +func configIp() { + ip = goip.GetOutsideIp() +} + +const prefix = "cron_%s:" + +func prefixSprintf() string { + return fmt.Sprintf(prefix, ip) +} diff --git a/cron.go b/cron.go index 6ffbdb5..c057826 100644 --- a/cron.go +++ b/cron.go @@ -45,7 +45,7 @@ func NewCron(config *CronConfig) *Cron { } // Send 发送 -func (c *Cron) Send(in *pb.String) (*pb.String, error) { +func (c *Cron) Send(in *pb.PublishRequest) (*pb.PublishRequest, error) { stream, err := c.Pub.Publish(context.Background(), in) if err != nil { log.Printf("[定时任务]发送失败:%v\n", err) diff --git a/go.mod b/go.mod index 833dfe8..67f2596 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,13 @@ require ( require ( github.com/golang/protobuf v1.5.2 // indirect + github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda // indirect + github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect + github.com/ulikunitz/xz v0.5.10 // indirect + go.dtapp.net/goip v1.0.16 // indirect + go.dtapp.net/gorequest v1.0.18 // indirect + go.dtapp.net/gostring v1.0.3 // indirect + go.dtapp.net/gotime v1.0.2 // indirect golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index d3efd1a..fd1a1ef 100644 --- a/go.sum +++ b/go.sum @@ -48,9 +48,23 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda h1:h+YpzUB/bGVJcLqW+d5GghcCmE/A25KbzjXvWJQi/+o= +github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda/go.mod h1:MSotTrCv1PwoR8QgU1JurEx+lNNbtr25I+m0zbLyAGw= +github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f h1:1cJITU3JUI8qNS5T0BlXwANsVdyoJQHQ4hvOxbunPCw= +github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f/go.mod h1:LyBTue+RWeyIfN3ZJ4wVxvDuvlGJtDgCLgCb6HCPgps= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8= +github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14= +go.dtapp.net/goip v1.0.16 h1:jJoXeLVc8BmlKEc+4T9mL2BFK63RJFd4B9xTMYhFRqg= +go.dtapp.net/goip v1.0.16/go.mod h1:BY2Xo5clizPZFQ8CYOlgg91fHMZR1Ll54f3P0sNHxbg= +go.dtapp.net/gorequest v1.0.18 h1:NAogmkEbz4Sln4tt6Li8tF99d3WnHMkbPuYFdNz/xTE= +go.dtapp.net/gorequest v1.0.18/go.mod h1:EwOfdfxsWPszOWrphCWHTN4DbYtU6fyQ/fuWQyQwSnk= +go.dtapp.net/gostring v1.0.3 h1:KSOq4D77/g5yZN/bqWfZ0kOOaPr/P1240vg03+XdENI= +go.dtapp.net/gostring v1.0.3/go.mod h1:+ggrOvgQDQturi1QGsXEpyRN/ZPoRDaqhMujIk5lrgQ= +go.dtapp.net/gotime v1.0.2 h1:CFIJHQXC/4t9bsJhk2cLhjHd6rpdPcJXr8BcHKHDuQo= +go.dtapp.net/gotime v1.0.2/go.mod h1:Gq7eNLr2iMLP18UNWONRq4V3Uhf/ADp4bIrS+Tc6ktY= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/jobs_test.go b/jobs_test.go new file mode 100644 index 0000000..d82bd06 --- /dev/null +++ b/jobs_test.go @@ -0,0 +1,113 @@ +package gojobs + +import ( + "context" + "go.dtapp.net/gojobs/pb" + "io" + "log" + "strings" + "sync" + "testing" + "time" +) + +func TestJobs(t *testing.T) { + testServer() +} + +func testServer() { + + server := NewServer(&ServerConfig{ + PublishTimeout: time.Millisecond * 100, + PubBuffer: 10, + Address: "0.0.0.0:8888", + }) + + cronServer := server.Pub.SubscribeTopic(func(v interface{}) bool { + log.Println("SubscribeTopic:", v) + if key, ok := v.(string); ok { + if strings.HasPrefix(key, "cron:") { + return true + } + } + return false + }) + + go func() { + log.Println("cronServer:topic:", <-cronServer) + }() + + err := server.StartUp() + if err != nil { + log.Panicf("创建服务失败:%v\n", err) + return + } + + <-make(chan bool) + +} + +func TestClient(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(2) + go testCron(&wg) + go testWorker(&wg) + wg.Wait() +} + +func testCron(wg *sync.WaitGroup) { + + server := NewCron(&CronConfig{ + Address: "localhost:8888", + }) + defer server.Conn.Close() + + t1 := time.NewTimer(time.Second * 10) + for { + select { + case <-t1.C: + + server.Send(&pb.PublishRequest{ + Value: "cron:" + "wechat.send", + }) + + t1.Reset(time.Second * 10) + } + } + + wg.Done() +} + +func testWorker(wg *sync.WaitGroup) { + + server := NewCron(&CronConfig{ + Address: "localhost:8888", + }) + defer server.Conn.Close() + + // 订阅服务,传入参数是 cron: + // 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 + stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ + Value: "cron:", + Ip: "127.0.0.1", + }) + if err != nil { + log.Printf("[跑业务]发送失败:%v\n", err) + } + + // 阻塞遍历流,输出结果 + for { + reply, err := stream.Recv() + if io.EOF == err { + log.Println("[跑业务]已关闭:", err.Error()) + break + } + if nil != err { + log.Println("[跑业务]异常:", err.Error()) + break + } + log.Println("[跑业务]:", reply) + } + + wg.Done() +} diff --git a/pb/pubsub.pb.go b/pb/pubsub.pb.go index 90412c5..0e478b7 100644 --- a/pb/pubsub.pb.go +++ b/pb/pubsub.pb.go @@ -25,7 +25,7 @@ const ( ) // 消息 -type String struct { +type PublishRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -33,8 +33,8 @@ type String struct { Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` } -func (x *String) Reset() { - *x = String{} +func (x *PublishRequest) Reset() { + *x = PublishRequest{} if protoimpl.UnsafeEnabled { mi := &file_pb_pubsub_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -42,13 +42,13 @@ func (x *String) Reset() { } } -func (x *String) String() string { +func (x *PublishRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*String) ProtoMessage() {} +func (*PublishRequest) ProtoMessage() {} -func (x *String) ProtoReflect() protoreflect.Message { +func (x *PublishRequest) ProtoReflect() protoreflect.Message { mi := &file_pb_pubsub_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -60,31 +60,153 @@ func (x *String) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use String.ProtoReflect.Descriptor instead. -func (*String) Descriptor() ([]byte, []int) { +// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead. +func (*PublishRequest) Descriptor() ([]byte, []int) { return file_pb_pubsub_proto_rawDescGZIP(), []int{0} } -func (x *String) GetValue() string { +func (x *PublishRequest) GetValue() string { if x != nil { return x.Value } return "" } +// 请求消息 +type SubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` +} + +func (x *SubscribeRequest) Reset() { + *x = SubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_pubsub_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeRequest) ProtoMessage() {} + +func (x *SubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_pb_pubsub_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead. +func (*SubscribeRequest) Descriptor() ([]byte, []int) { + return file_pb_pubsub_proto_rawDescGZIP(), []int{1} +} + +func (x *SubscribeRequest) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *SubscribeRequest) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + +// 响应消息 +type SubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"` +} + +func (x *SubscribeResponse) Reset() { + *x = SubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pb_pubsub_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubscribeResponse) ProtoMessage() {} + +func (x *SubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_pb_pubsub_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead. +func (*SubscribeResponse) Descriptor() ([]byte, []int) { + return file_pb_pubsub_proto_rawDescGZIP(), []int{2} +} + +func (x *SubscribeResponse) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *SubscribeResponse) GetIp() string { + if x != nil { + return x.Ip + } + return "" +} + var File_pb_pubsub_proto protoreflect.FileDescriptor var file_pb_pubsub_proto_rawDesc = []byte{ 0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x1e, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, - 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, - 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x52, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, - 0x21, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x0a, 0x2e, 0x70, 0x62, 0x2e, - 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69, - 0x6e, 0x67, 0x12, 0x25, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, - 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62, - 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, - 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x26, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x38, 0x0a, + 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x39, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x70, 0x32, 0x77, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x31, 0x0a, 0x07, + 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, + 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, + 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x70, + 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, + 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -99,15 +221,17 @@ func file_pb_pubsub_proto_rawDescGZIP() []byte { return file_pb_pubsub_proto_rawDescData } -var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_pb_pubsub_proto_goTypes = []interface{}{ - (*String)(nil), // 0: pb.String + (*PublishRequest)(nil), // 0: pb.PublishRequest + (*SubscribeRequest)(nil), // 1: pb.SubscribeRequest + (*SubscribeResponse)(nil), // 2: pb.SubscribeResponse } var file_pb_pubsub_proto_depIdxs = []int32{ - 0, // 0: pb.PubSub.Publish:input_type -> pb.String - 0, // 1: pb.PubSub.Subscribe:input_type -> pb.String - 0, // 2: pb.PubSub.Publish:output_type -> pb.String - 0, // 3: pb.PubSub.Subscribe:output_type -> pb.String + 0, // 0: pb.PubSub.Publish:input_type -> pb.PublishRequest + 1, // 1: pb.PubSub.Subscribe:input_type -> pb.SubscribeRequest + 0, // 2: pb.PubSub.Publish:output_type -> pb.PublishRequest + 2, // 3: pb.PubSub.Subscribe:output_type -> pb.SubscribeResponse 2, // [2:4] is the sub-list for method output_type 0, // [0:2] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -122,7 +246,31 @@ func file_pb_pubsub_proto_init() { } if !protoimpl.UnsafeEnabled { file_pb_pubsub_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*String); i { + switch v := v.(*PublishRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pb_pubsub_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pb_pubsub_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SubscribeResponse); i { case 0: return &v.state case 1: @@ -140,7 +288,7 @@ func file_pb_pubsub_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pb_pubsub_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/pb/pubsub.proto b/pb/pubsub.proto index f5d1aa1..93c99cd 100644 --- a/pb/pubsub.proto +++ b/pb/pubsub.proto @@ -10,12 +10,24 @@ option go_package = "../pb"; // 定义服务 service PubSub { // [发布] 消息 - rpc Publish (String) returns (String); + rpc Publish (PublishRequest) returns (PublishRequest); // [订阅] 消息 - rpc Subscribe (String) returns (stream String); + rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse); } // 消息 -message String { +message PublishRequest { string value = 1; } + +// 请求消息 +message SubscribeRequest { + string value = 1; + string ip = 2; +} + +// 响应消息 +message SubscribeResponse { + string value = 1; + string ip = 2; +} diff --git a/pb/pubsub.server.go b/pb/pubsub.server.go index abde775..38273f8 100644 --- a/pb/pubsub.server.go +++ b/pb/pubsub.server.go @@ -21,26 +21,26 @@ func NewPubSubServerService() *PubSubServerService { } // Publish 实现发布方法 -func (p *PubSubServerService) Publish(ctx context.Context, arg *String) (*String, error) { - log.Printf("[服务中转]:%v\n", arg.GetValue()) +func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishRequest, error) { + log.Printf("[服务中转]:%v\n", req.GetValue()) // 发布消息 - p.pub.Publish(arg.GetValue()) - return &String{Value: arg.GetValue()}, nil + p.pub.Publish(req.GetValue()) + return &PublishRequest{Value: req.GetValue()}, nil } // Subscribe 实现订阅方法 -func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServer) error { +func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_SubscribeServer) error { // SubscribeTopic 增加一个使用函数过滤器的订阅者 // func(v interface{}) 定义函数过滤的规则 // SubscribeTopic 返回一个chan interface{} - log.Printf("[服务中转]收到任务:%v\n", arg.GetValue()) + log.Printf("[服务中转]收到任务:%v\n", req.GetValue()) ch := p.pub.SubscribeTopic(func(v interface{}) bool { // 接收数据是string,并且key是以arg为前缀的 if key, ok := v.(string); ok { - if strings.HasPrefix(key, arg.GetValue()) { + if strings.HasPrefix(key, req.GetValue()) { return true } } @@ -52,7 +52,10 @@ func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServ // 服务器遍历chan,并将其中信息发送给订阅客户端 for v := range ch { - err := stream.Send(&String{Value: v.(string)}) + err := stream.Send(&SubscribeResponse{ + Value: v.(string), + Ip: "", + }) if err != nil { log.Println("[服务中转]任务分配失败:", err.Error()) return err diff --git a/pb/pubsub_grpc.pb.go b/pb/pubsub_grpc.pb.go index 5fcdaec..78d6ddf 100644 --- a/pb/pubsub_grpc.pb.go +++ b/pb/pubsub_grpc.pb.go @@ -23,9 +23,9 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type PubSubClient interface { // [发布] 消息 - Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) + Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishRequest, error) // [订阅] 消息 - Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) + Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) } type pubSubClient struct { @@ -36,8 +36,8 @@ func NewPubSubClient(cc grpc.ClientConnInterface) PubSubClient { return &pubSubClient{cc} } -func (c *pubSubClient) Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) { - out := new(String) +func (c *pubSubClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishRequest, error) { + out := new(PublishRequest) err := c.cc.Invoke(ctx, "/pb.PubSub/Publish", in, out, opts...) if err != nil { return nil, err @@ -45,7 +45,7 @@ func (c *pubSubClient) Publish(ctx context.Context, in *String, opts ...grpc.Cal return out, nil } -func (c *pubSubClient) Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) { +func (c *pubSubClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) { stream, err := c.cc.NewStream(ctx, &PubSub_ServiceDesc.Streams[0], "/pb.PubSub/Subscribe", opts...) if err != nil { return nil, err @@ -61,7 +61,7 @@ func (c *pubSubClient) Subscribe(ctx context.Context, in *String, opts ...grpc.C } type PubSub_SubscribeClient interface { - Recv() (*String, error) + Recv() (*SubscribeResponse, error) grpc.ClientStream } @@ -69,8 +69,8 @@ type pubSubSubscribeClient struct { grpc.ClientStream } -func (x *pubSubSubscribeClient) Recv() (*String, error) { - m := new(String) +func (x *pubSubSubscribeClient) Recv() (*SubscribeResponse, error) { + m := new(SubscribeResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err } @@ -82,9 +82,9 @@ func (x *pubSubSubscribeClient) Recv() (*String, error) { // for forward compatibility type PubSubServer interface { // [发布] 消息 - Publish(context.Context, *String) (*String, error) + Publish(context.Context, *PublishRequest) (*PublishRequest, error) // [订阅] 消息 - Subscribe(*String, PubSub_SubscribeServer) error + Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error mustEmbedUnimplementedPubSubServer() } @@ -92,10 +92,10 @@ type PubSubServer interface { type UnimplementedPubSubServer struct { } -func (UnimplementedPubSubServer) Publish(context.Context, *String) (*String, error) { +func (UnimplementedPubSubServer) Publish(context.Context, *PublishRequest) (*PublishRequest, error) { return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented") } -func (UnimplementedPubSubServer) Subscribe(*String, PubSub_SubscribeServer) error { +func (UnimplementedPubSubServer) Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error { return status.Errorf(codes.Unimplemented, "method Subscribe not implemented") } func (UnimplementedPubSubServer) mustEmbedUnimplementedPubSubServer() {} @@ -112,7 +112,7 @@ func RegisterPubSubServer(s grpc.ServiceRegistrar, srv PubSubServer) { } func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(String) + in := new(PublishRequest) if err := dec(in); err != nil { return nil, err } @@ -124,13 +124,13 @@ func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(inte FullMethod: "/pb.PubSub/Publish", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(PubSubServer).Publish(ctx, req.(*String)) + return srv.(PubSubServer).Publish(ctx, req.(*PublishRequest)) } return interceptor(ctx, in, info, handler) } func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error { - m := new(String) + m := new(SubscribeRequest) if err := stream.RecvMsg(m); err != nil { return err } @@ -138,7 +138,7 @@ func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error } type PubSub_SubscribeServer interface { - Send(*String) error + Send(*SubscribeResponse) error grpc.ServerStream } @@ -146,7 +146,7 @@ type pubSubSubscribeServer struct { grpc.ServerStream } -func (x *pubSubSubscribeServer) Send(m *String) error { +func (x *pubSubSubscribeServer) Send(m *SubscribeResponse) error { return x.ServerStream.SendMsg(m) }