- repair bug

master
李光春 2 years ago
parent ec93d383b3
commit cdc7ff7fbf

@ -24,13 +24,15 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 消息
// 请求消息
type PublishRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *PublishRequest) Reset() {
@ -65,6 +67,13 @@ func (*PublishRequest) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{0}
}
func (x *PublishRequest) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *PublishRequest) GetValue() string {
if x != nil {
return x.Value
@ -72,20 +81,92 @@ func (x *PublishRequest) GetValue() string {
return ""
}
func (x *PublishRequest) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
// 响应消息
type PublishResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *PublishResponse) Reset() {
*x = PublishResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PublishResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PublishResponse) ProtoMessage() {}
func (x *PublishResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead.
func (*PublishResponse) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{1}
}
func (x *PublishResponse) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *PublishResponse) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *PublishResponse) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
// 请求消息
type SubscribeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *SubscribeRequest) Reset() {
*x = SubscribeRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[1]
mi := &file_pb_pubsub_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -98,7 +179,7 @@ func (x *SubscribeRequest) String() string {
func (*SubscribeRequest) ProtoMessage() {}
func (x *SubscribeRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[1]
mi := &file_pb_pubsub_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -111,7 +192,14 @@ func (x *SubscribeRequest) ProtoReflect() protoreflect.Message {
// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead.
func (*SubscribeRequest) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{1}
return file_pb_pubsub_proto_rawDescGZIP(), []int{2}
}
func (x *SubscribeRequest) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *SubscribeRequest) GetValue() string {
@ -134,14 +222,15 @@ type SubscribeResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *SubscribeResponse) Reset() {
*x = SubscribeResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[2]
mi := &file_pb_pubsub_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -154,7 +243,7 @@ func (x *SubscribeResponse) String() string {
func (*SubscribeResponse) ProtoMessage() {}
func (x *SubscribeResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[2]
mi := &file_pb_pubsub_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -167,7 +256,14 @@ func (x *SubscribeResponse) ProtoReflect() protoreflect.Message {
// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead.
func (*SubscribeResponse) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{2}
return file_pb_pubsub_proto_rawDescGZIP(), []int{3}
}
func (x *SubscribeResponse) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *SubscribeResponse) GetValue() string {
@ -188,25 +284,34 @@ var File_pb_pubsub_proto protoreflect.FileDescriptor
var file_pb_pubsub_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x26, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x38, 0x0a,
0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x39, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63,
0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x69, 0x70, 0x32, 0x77, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x31, 0x0a, 0x07,
0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62,
0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62,
0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x70,
0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e,
0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x46, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a,
0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x47, 0x0a,
0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64,
0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01,
0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x48, 0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61,
0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70,
0x22, 0x49, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69,
0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x32, 0x78, 0x0a, 0x06, 0x50,
0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x32, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73,
0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73,
0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70,
0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -221,17 +326,18 @@ func file_pb_pubsub_proto_rawDescGZIP() []byte {
return file_pb_pubsub_proto_rawDescData
}
var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_pb_pubsub_proto_goTypes = []interface{}{
(*PublishRequest)(nil), // 0: pb.PublishRequest
(*SubscribeRequest)(nil), // 1: pb.SubscribeRequest
(*SubscribeResponse)(nil), // 2: pb.SubscribeResponse
(*PublishResponse)(nil), // 1: pb.PublishResponse
(*SubscribeRequest)(nil), // 2: pb.SubscribeRequest
(*SubscribeResponse)(nil), // 3: pb.SubscribeResponse
}
var file_pb_pubsub_proto_depIdxs = []int32{
0, // 0: pb.PubSub.Publish:input_type -> pb.PublishRequest
1, // 1: pb.PubSub.Subscribe:input_type -> pb.SubscribeRequest
0, // 2: pb.PubSub.Publish:output_type -> pb.PublishRequest
2, // 3: pb.PubSub.Subscribe:output_type -> pb.SubscribeResponse
2, // 1: pb.PubSub.Subscribe:input_type -> pb.SubscribeRequest
1, // 2: pb.PubSub.Publish:output_type -> pb.PublishResponse
3, // 3: pb.PubSub.Subscribe:output_type -> pb.SubscribeResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
@ -258,7 +364,7 @@ func file_pb_pubsub_proto_init() {
}
}
file_pb_pubsub_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest); i {
switch v := v.(*PublishResponse); i {
case 0:
return &v.state
case 1:
@ -270,6 +376,18 @@ func file_pb_pubsub_proto_init() {
}
}
file_pb_pubsub_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pubsub_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeResponse); i {
case 0:
return &v.state
@ -288,7 +406,7 @@ func file_pb_pubsub_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_pubsub_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},

@ -21,11 +21,17 @@ func NewPubSubServerService() *PubSubServerService {
}
// Publish 实现发布方法
func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishRequest, error) {
log.Printf("[服务中转]%v\n", req.GetValue())
func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error) {
log.Printf("[服务中转]{发布} 编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp())
// 发布消息
p.pub.Publish(req.GetValue())
return &PublishRequest{Value: req.GetValue()}, nil
return &PublishResponse{
Id: req.GetId(),
Value: req.GetValue(),
Ip: req.GetIp(),
}, nil
}
// Subscribe 实现订阅方法
@ -35,9 +41,13 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub
// func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{}
log.Printf("[服务中转]收到任务:%v\n", req.GetValue())
log.Printf("[服务中转]{订阅} 编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp())
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
log.Printf("[服务中转]{订阅} 主题:%v\n", v)
log.Printf("[服务中转]{订阅} 主题:%+v\n", v)
// 接收数据是string并且key是以arg为前缀的
if key, ok := v.(string); ok {
if strings.HasPrefix(key, req.GetValue()) {
@ -47,17 +57,19 @@ func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_Sub
return false
})
log.Println("[服务中转]工作线:", ch)
log.Println("[服务中转]工作线数量:", p.pub.Len())
log.Println("[服务中转]{订阅} 工作线:", ch)
log.Println("[服务中转]{订阅} 当前工作线数量:", p.pub.Len())
// 服务器遍历chan并将其中信息发送给订阅客户端
for v := range ch {
log.Println("[服务中转] for ch", ch)
log.Println("[服务中转] for v", v)
err := stream.Send(&SubscribeResponse{
Value: v.(string),
Ip: "",
})
if err != nil {
log.Println("[服务中转]任务分配失败:", err.Error())
log.Println("[服务中转]{订阅} 任务分配失败 ", err.Error())
return err
}
}

@ -23,7 +23,7 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type PubSubClient interface {
// [发布] 消息
Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishRequest, error)
Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error)
// [订阅] 消息
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error)
}
@ -36,8 +36,8 @@ func NewPubSubClient(cc grpc.ClientConnInterface) PubSubClient {
return &pubSubClient{cc}
}
func (c *pubSubClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishRequest, error) {
out := new(PublishRequest)
func (c *pubSubClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error) {
out := new(PublishResponse)
err := c.cc.Invoke(ctx, "/pb.PubSub/Publish", in, out, opts...)
if err != nil {
return nil, err
@ -82,7 +82,7 @@ func (x *pubSubSubscribeClient) Recv() (*SubscribeResponse, error) {
// for forward compatibility
type PubSubServer interface {
// [发布] 消息
Publish(context.Context, *PublishRequest) (*PublishRequest, error)
Publish(context.Context, *PublishRequest) (*PublishResponse, error)
// [订阅] 消息
Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error
mustEmbedUnimplementedPubSubServer()
@ -92,7 +92,7 @@ type PubSubServer interface {
type UnimplementedPubSubServer struct {
}
func (UnimplementedPubSubServer) Publish(context.Context, *PublishRequest) (*PublishRequest, error) {
func (UnimplementedPubSubServer) Publish(context.Context, *PublishRequest) (*PublishResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented")
}
func (UnimplementedPubSubServer) Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error {

Loading…
Cancel
Save