- bug
continuous-integration/drone/push Build was killed Details

master
李光春 2 years ago
parent 5cf6101d2b
commit 736c82d8af

@ -0,0 +1,18 @@
package gojobs
import (
"fmt"
"go.dtapp.net/goip"
)
var ip string
func configIp() {
ip = goip.GetOutsideIp()
}
const prefix = "cron_%s:"
func prefixSprintf() string {
return fmt.Sprintf(prefix, ip)
}

@ -45,7 +45,7 @@ func NewCron(config *CronConfig) *Cron {
}
// Send 发送
func (c *Cron) Send(in *pb.String) (*pb.String, error) {
func (c *Cron) Send(in *pb.PublishRequest) (*pb.PublishRequest, error) {
stream, err := c.Pub.Publish(context.Background(), in)
if err != nil {
log.Printf("[定时任务]发送失败:%v\n", err)

@ -9,6 +9,13 @@ require (
require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda // indirect
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
go.dtapp.net/goip v1.0.16 // indirect
go.dtapp.net/gorequest v1.0.18 // indirect
go.dtapp.net/gostring v1.0.3 // indirect
go.dtapp.net/gotime v1.0.2 // indirect
golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/text v0.3.7 // indirect

@ -48,9 +48,23 @@ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda h1:h+YpzUB/bGVJcLqW+d5GghcCmE/A25KbzjXvWJQi/+o=
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda/go.mod h1:MSotTrCv1PwoR8QgU1JurEx+lNNbtr25I+m0zbLyAGw=
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f h1:1cJITU3JUI8qNS5T0BlXwANsVdyoJQHQ4hvOxbunPCw=
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f/go.mod h1:LyBTue+RWeyIfN3ZJ4wVxvDuvlGJtDgCLgCb6HCPgps=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/ulikunitz/xz v0.5.10 h1:t92gobL9l3HE202wg3rlk19F6X+JOxl9BBrCCMYEYd8=
github.com/ulikunitz/xz v0.5.10/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
go.dtapp.net/goip v1.0.16 h1:jJoXeLVc8BmlKEc+4T9mL2BFK63RJFd4B9xTMYhFRqg=
go.dtapp.net/goip v1.0.16/go.mod h1:BY2Xo5clizPZFQ8CYOlgg91fHMZR1Ll54f3P0sNHxbg=
go.dtapp.net/gorequest v1.0.18 h1:NAogmkEbz4Sln4tt6Li8tF99d3WnHMkbPuYFdNz/xTE=
go.dtapp.net/gorequest v1.0.18/go.mod h1:EwOfdfxsWPszOWrphCWHTN4DbYtU6fyQ/fuWQyQwSnk=
go.dtapp.net/gostring v1.0.3 h1:KSOq4D77/g5yZN/bqWfZ0kOOaPr/P1240vg03+XdENI=
go.dtapp.net/gostring v1.0.3/go.mod h1:+ggrOvgQDQturi1QGsXEpyRN/ZPoRDaqhMujIk5lrgQ=
go.dtapp.net/gotime v1.0.2 h1:CFIJHQXC/4t9bsJhk2cLhjHd6rpdPcJXr8BcHKHDuQo=
go.dtapp.net/gotime v1.0.2/go.mod h1:Gq7eNLr2iMLP18UNWONRq4V3Uhf/ADp4bIrS+Tc6ktY=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

@ -0,0 +1,113 @@
package gojobs
import (
"context"
"go.dtapp.net/gojobs/pb"
"io"
"log"
"strings"
"sync"
"testing"
"time"
)
func TestJobs(t *testing.T) {
testServer()
}
func testServer() {
server := NewServer(&ServerConfig{
PublishTimeout: time.Millisecond * 100,
PubBuffer: 10,
Address: "0.0.0.0:8888",
})
cronServer := server.Pub.SubscribeTopic(func(v interface{}) bool {
log.Println("SubscribeTopic:", v)
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "cron:") {
return true
}
}
return false
})
go func() {
log.Println("cronServertopic:", <-cronServer)
}()
err := server.StartUp()
if err != nil {
log.Panicf("创建服务失败:%v\n", err)
return
}
<-make(chan bool)
}
func TestClient(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(2)
go testCron(&wg)
go testWorker(&wg)
wg.Wait()
}
func testCron(wg *sync.WaitGroup) {
server := NewCron(&CronConfig{
Address: "localhost:8888",
})
defer server.Conn.Close()
t1 := time.NewTimer(time.Second * 10)
for {
select {
case <-t1.C:
server.Send(&pb.PublishRequest{
Value: "cron:" + "wechat.send",
})
t1.Reset(time.Second * 10)
}
}
wg.Done()
}
func testWorker(wg *sync.WaitGroup) {
server := NewCron(&CronConfig{
Address: "localhost:8888",
})
defer server.Conn.Close()
// 订阅服务,传入参数是 cron:
// 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Value: "cron:",
Ip: "127.0.0.1",
})
if err != nil {
log.Printf("[跑业务]发送失败:%v\n", err)
}
// 阻塞遍历流,输出结果
for {
reply, err := stream.Recv()
if io.EOF == err {
log.Println("[跑业务]已关闭:", err.Error())
break
}
if nil != err {
log.Println("[跑业务]异常:", err.Error())
break
}
log.Println("[跑业务]:", reply)
}
wg.Done()
}

@ -25,7 +25,7 @@ const (
)
// 消息
type String struct {
type PublishRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
@ -33,8 +33,8 @@ type String struct {
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
}
func (x *String) Reset() {
*x = String{}
func (x *PublishRequest) Reset() {
*x = PublishRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -42,13 +42,13 @@ func (x *String) Reset() {
}
}
func (x *String) String() string {
func (x *PublishRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*String) ProtoMessage() {}
func (*PublishRequest) ProtoMessage() {}
func (x *String) ProtoReflect() protoreflect.Message {
func (x *PublishRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -60,31 +60,153 @@ func (x *String) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use String.ProtoReflect.Descriptor instead.
func (*String) Descriptor() ([]byte, []int) {
// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead.
func (*PublishRequest) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{0}
}
func (x *String) GetValue() string {
func (x *PublishRequest) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
// 请求消息
type SubscribeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *SubscribeRequest) Reset() {
*x = SubscribeRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SubscribeRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SubscribeRequest) ProtoMessage() {}
func (x *SubscribeRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead.
func (*SubscribeRequest) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{1}
}
func (x *SubscribeRequest) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *SubscribeRequest) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
// 响应消息
type SubscribeResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *SubscribeResponse) Reset() {
*x = SubscribeResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SubscribeResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SubscribeResponse) ProtoMessage() {}
func (x *SubscribeResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead.
func (*SubscribeResponse) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{2}
}
func (x *SubscribeResponse) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *SubscribeResponse) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
var File_pb_pubsub_proto protoreflect.FileDescriptor
var file_pb_pubsub_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x1e, 0x0a, 0x06, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12,
0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x52, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12,
0x21, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x0a, 0x2e, 0x70, 0x62, 0x2e,
0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69,
0x6e, 0x67, 0x12, 0x25, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12,
0x0a, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x1a, 0x0a, 0x2e, 0x70, 0x62,
0x2e, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f,
0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x26, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x38, 0x0a,
0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x39, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63,
0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c,
0x75, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x69, 0x70, 0x32, 0x77, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x31, 0x0a, 0x07,
0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x62,
0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62,
0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x2e, 0x70,
0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e,
0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -99,15 +221,17 @@ func file_pb_pubsub_proto_rawDescGZIP() []byte {
return file_pb_pubsub_proto_rawDescData
}
var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_pb_pubsub_proto_goTypes = []interface{}{
(*String)(nil), // 0: pb.String
(*PublishRequest)(nil), // 0: pb.PublishRequest
(*SubscribeRequest)(nil), // 1: pb.SubscribeRequest
(*SubscribeResponse)(nil), // 2: pb.SubscribeResponse
}
var file_pb_pubsub_proto_depIdxs = []int32{
0, // 0: pb.PubSub.Publish:input_type -> pb.String
0, // 1: pb.PubSub.Subscribe:input_type -> pb.String
0, // 2: pb.PubSub.Publish:output_type -> pb.String
0, // 3: pb.PubSub.Subscribe:output_type -> pb.String
0, // 0: pb.PubSub.Publish:input_type -> pb.PublishRequest
1, // 1: pb.PubSub.Subscribe:input_type -> pb.SubscribeRequest
0, // 2: pb.PubSub.Publish:output_type -> pb.PublishRequest
2, // 3: pb.PubSub.Subscribe:output_type -> pb.SubscribeResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
@ -122,7 +246,31 @@ func file_pb_pubsub_proto_init() {
}
if !protoimpl.UnsafeEnabled {
file_pb_pubsub_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*String); i {
switch v := v.(*PublishRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pubsub_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pubsub_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeResponse); i {
case 0:
return &v.state
case 1:
@ -140,7 +288,7 @@ func file_pb_pubsub_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_pubsub_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},

@ -10,12 +10,24 @@ option go_package = "../pb";
//
service PubSub {
// []
rpc Publish (String) returns (String);
rpc Publish (PublishRequest) returns (PublishRequest);
// []
rpc Subscribe (String) returns (stream String);
rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);
}
//
message String {
message PublishRequest {
string value = 1;
}
//
message SubscribeRequest {
string value = 1;
string ip = 2;
}
//
message SubscribeResponse {
string value = 1;
string ip = 2;
}

@ -21,26 +21,26 @@ func NewPubSubServerService() *PubSubServerService {
}
// Publish 实现发布方法
func (p *PubSubServerService) Publish(ctx context.Context, arg *String) (*String, error) {
log.Printf("[服务中转]%v\n", arg.GetValue())
func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishRequest, error) {
log.Printf("[服务中转]%v\n", req.GetValue())
// 发布消息
p.pub.Publish(arg.GetValue())
return &String{Value: arg.GetValue()}, nil
p.pub.Publish(req.GetValue())
return &PublishRequest{Value: req.GetValue()}, nil
}
// Subscribe 实现订阅方法
func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServer) error {
func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_SubscribeServer) error {
// SubscribeTopic 增加一个使用函数过滤器的订阅者
// func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{}
log.Printf("[服务中转]收到任务:%v\n", arg.GetValue())
log.Printf("[服务中转]收到任务:%v\n", req.GetValue())
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
// 接收数据是string并且key是以arg为前缀的
if key, ok := v.(string); ok {
if strings.HasPrefix(key, arg.GetValue()) {
if strings.HasPrefix(key, req.GetValue()) {
return true
}
}
@ -52,7 +52,10 @@ func (p *PubSubServerService) Subscribe(arg *String, stream PubSub_SubscribeServ
// 服务器遍历chan并将其中信息发送给订阅客户端
for v := range ch {
err := stream.Send(&String{Value: v.(string)})
err := stream.Send(&SubscribeResponse{
Value: v.(string),
Ip: "",
})
if err != nil {
log.Println("[服务中转]任务分配失败:", err.Error())
return err

@ -23,9 +23,9 @@ const _ = grpc.SupportPackageIsVersion7
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type PubSubClient interface {
// [发布] 消息
Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishRequest, error)
// [订阅] 消息
Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error)
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error)
}
type pubSubClient struct {
@ -36,8 +36,8 @@ func NewPubSubClient(cc grpc.ClientConnInterface) PubSubClient {
return &pubSubClient{cc}
}
func (c *pubSubClient) Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) {
out := new(String)
func (c *pubSubClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishRequest, error) {
out := new(PublishRequest)
err := c.cc.Invoke(ctx, "/pb.PubSub/Publish", in, out, opts...)
if err != nil {
return nil, err
@ -45,7 +45,7 @@ func (c *pubSubClient) Publish(ctx context.Context, in *String, opts ...grpc.Cal
return out, nil
}
func (c *pubSubClient) Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) {
func (c *pubSubClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) {
stream, err := c.cc.NewStream(ctx, &PubSub_ServiceDesc.Streams[0], "/pb.PubSub/Subscribe", opts...)
if err != nil {
return nil, err
@ -61,7 +61,7 @@ func (c *pubSubClient) Subscribe(ctx context.Context, in *String, opts ...grpc.C
}
type PubSub_SubscribeClient interface {
Recv() (*String, error)
Recv() (*SubscribeResponse, error)
grpc.ClientStream
}
@ -69,8 +69,8 @@ type pubSubSubscribeClient struct {
grpc.ClientStream
}
func (x *pubSubSubscribeClient) Recv() (*String, error) {
m := new(String)
func (x *pubSubSubscribeClient) Recv() (*SubscribeResponse, error) {
m := new(SubscribeResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
@ -82,9 +82,9 @@ func (x *pubSubSubscribeClient) Recv() (*String, error) {
// for forward compatibility
type PubSubServer interface {
// [发布] 消息
Publish(context.Context, *String) (*String, error)
Publish(context.Context, *PublishRequest) (*PublishRequest, error)
// [订阅] 消息
Subscribe(*String, PubSub_SubscribeServer) error
Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error
mustEmbedUnimplementedPubSubServer()
}
@ -92,10 +92,10 @@ type PubSubServer interface {
type UnimplementedPubSubServer struct {
}
func (UnimplementedPubSubServer) Publish(context.Context, *String) (*String, error) {
func (UnimplementedPubSubServer) Publish(context.Context, *PublishRequest) (*PublishRequest, error) {
return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented")
}
func (UnimplementedPubSubServer) Subscribe(*String, PubSub_SubscribeServer) error {
func (UnimplementedPubSubServer) Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error {
return status.Errorf(codes.Unimplemented, "method Subscribe not implemented")
}
func (UnimplementedPubSubServer) mustEmbedUnimplementedPubSubServer() {}
@ -112,7 +112,7 @@ func RegisterPubSubServer(s grpc.ServiceRegistrar, srv PubSubServer) {
}
func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(String)
in := new(PublishRequest)
if err := dec(in); err != nil {
return nil, err
}
@ -124,13 +124,13 @@ func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(inte
FullMethod: "/pb.PubSub/Publish",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PubSubServer).Publish(ctx, req.(*String))
return srv.(PubSubServer).Publish(ctx, req.(*PublishRequest))
}
return interceptor(ctx, in, info, handler)
}
func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(String)
m := new(SubscribeRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
@ -138,7 +138,7 @@ func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error
}
type PubSub_SubscribeServer interface {
Send(*String) error
Send(*SubscribeResponse) error
grpc.ServerStream
}
@ -146,7 +146,7 @@ type pubSubSubscribeServer struct {
grpc.ServerStream
}
func (x *pubSubSubscribeServer) Send(m *String) error {
func (x *pubSubSubscribeServer) Send(m *SubscribeResponse) error {
return x.ServerStream.SendMsg(m)
}

Loading…
Cancel
Save