- service:优化

- utils:优化
master
李光春 2 years ago
parent 1e97205c34
commit 8f98d88385

@ -0,0 +1,38 @@
package gojobs
import (
"google.golang.org/grpc"
)
// ClientConfig 客户端配置
type ClientConfig struct {
Address string // 服务端口 127.0.0.1:8888
}
// Client 定时任务
type Client struct {
ClientConfig // 配置
Conn *grpc.ClientConn // 链接信息
}
// NewClient 创建客户端
func NewClient(config *ClientConfig) *Client {
if config.Address == "" {
panic("[客户端]请填写服务端口")
}
c := &Client{}
c.Address = config.Address
var err error
// 建立连接 获取client
c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure())
if err != nil {
panic("[客户端]{连接失败}" + err.Error())
}
return c
}

@ -0,0 +1,20 @@
package gojobs
import (
"fmt"
"go.dtapp.net/library/utils/goip"
)
var ip string
func configIp() {
ip = goip.GetOutsideIp()
}
const prefix = "cron:"
const prefixIp = "cron_%s:"
func prefixSprintf(str string) string {
return fmt.Sprintf(prefixIp, str)
}

@ -0,0 +1,56 @@
package gojobs
import (
"context"
"go.dtapp.net/library/utils/gojobs/pb"
"google.golang.org/grpc"
"log"
)
// CronConfig 定时任务配置
type CronConfig struct {
Address string // 服务端口 127.0.0.1:8888
}
// Cron 定时任务
type Cron struct {
CronConfig // 配置
Pub pb.PubSubClient // 订阅
Conn *grpc.ClientConn // 链接信息
}
// NewCron 创建定时任务
func NewCron(config *CronConfig) *Cron {
if config.Address == "" {
panic("[定时任务]请填写服务端口")
}
c := &Cron{}
c.Address = config.Address
var err error
// 建立连接 获取client
c.Conn, err = grpc.Dial(c.Address, grpc.WithInsecure())
if err != nil {
panic("[定时任务]{连接失败}" + err.Error())
}
// 新建一个客户端
c.Pub = pb.NewPubSubClient(c.Conn)
return c
}
// Send 发送
func (c *Cron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error) {
log.Printf("[定时任务]{广播开始}编号:%s 类型:%s ip%s\n", in.GetId(), in.GetValue(), in.GetIp())
stream, err := c.Pub.Publish(context.Background(), in)
if err != nil {
log.Printf("[定时任务]{广播失败}编号:%s %v\n", in.GetId(), err)
}
log.Printf("[定时任务]{广播成功}编号:%s 类型:%s ip%s\n", in.GetId(), in.GetValue(), in.GetIp())
return stream, err
}

@ -0,0 +1,44 @@
package gojobs
import (
"go.etcd.io/etcd/client/v3"
"time"
)
// EtcdConfig etcd配置
type EtcdConfig struct {
Endpoints []string // 接口 []string{"http://127.0.0.1:2379"}
DialTimeout time.Duration // time.Second * 5
LocalIP string // 本机IP
}
// Etcd etcd
type Etcd struct {
EtcdConfig // 配置
Client *clientv3.Client // 驱动
Kv clientv3.KV // kv API子集
Lease clientv3.Lease // lease租约对象
leaseId clientv3.LeaseID // 租约编号
}
// Close 关闭
func (e Etcd) Close() {
e.Client.Close()
}
const (
// JobSaveDir 定时任务任务保存目录
JobSaveDir = "/cron/jobs/"
// JobWorkerDir 服务注册目录
JobWorkerDir = "/cron/workers/"
)
// GetWatchKey 监听的key
func (e Etcd) GetWatchKey() string {
return JobSaveDir + e.LocalIP
}
// IssueWatchKey 下发的key
func (e Etcd) IssueWatchKey(ip string) string {
return JobSaveDir + ip
}

@ -0,0 +1,37 @@
package gojobs
import (
"context"
"go.etcd.io/etcd/client/v3"
"log"
)
// Watch 监听
func (e Etcd) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
log.Println("监听:", key)
return e.Client.Watch(ctx, key, opts...)
}
// Create 创建
func (e Etcd) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
log.Println("创建:", key, val)
return e.Client.Put(ctx, key, val, opts...)
}
// Get 获取
func (e Etcd) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
log.Println("获取:", key)
return e.Client.Get(ctx, key, opts...)
}
// Update 更新
func (e Etcd) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
log.Println("更新:", key, val)
return e.Client.Put(ctx, key, val, opts...)
}
// Delete 删除
func (e Etcd) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
log.Println("删除:", key)
return e.Client.Delete(ctx, key, opts...)
}

@ -0,0 +1,68 @@
package gojobs
import (
"context"
"errors"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
"strings"
)
// NewEtcdServer 创建 etcd server
func NewEtcdServer(config *EtcdConfig) (*Etcd, error) {
var (
e = &Etcd{}
err error
)
e.Endpoints = config.Endpoints
e.DialTimeout = config.DialTimeout
e.LocalIP = config.LocalIP
e.Client, err = clientv3.New(clientv3.Config{
Endpoints: e.Endpoints,
DialTimeout: e.DialTimeout,
})
if err != nil {
return nil, errors.New("连接失败:" + err.Error())
}
// kv API子集
e.Kv = clientv3.NewKV(e.Client)
// 创建一个lease租约对象
e.Lease = clientv3.NewLease(e.Client)
return e, nil
}
// ListWorkers 获取在线worker列表
func (e Etcd) ListWorkers() (workerArr []string, err error) {
var (
getResp *clientv3.GetResponse
kv *mvccpb.KeyValue
workerIP string
)
// 初始化数组
workerArr = make([]string, 0)
// 获取目录下所有Kv
if getResp, err = e.Kv.Get(context.TODO(), JobWorkerDir, clientv3.WithPrefix()); err != nil {
return
}
// 解析每个节点的IP
for _, kv = range getResp.Kvs {
// kv.Key : /cron/workers/192.168.2.1
workerIP = ExtractWorkerIP(string(kv.Key))
workerArr = append(workerArr, workerIP)
}
return
}
// ExtractWorkerIP 提取worker的IP
func ExtractWorkerIP(regKey string) string {
return strings.TrimPrefix(regKey, JobWorkerDir)
}

@ -0,0 +1,110 @@
package gojobs
import (
"context"
"errors"
"fmt"
"go.dtapp.net/library/utils/goip"
"go.etcd.io/etcd/client/v3"
"log"
"time"
)
// NewEtcdWorker 创建 etcd Worker
func NewEtcdWorker(config *EtcdConfig) (*Etcd, error) {
var (
e = &Etcd{}
err error
)
e.Endpoints = config.Endpoints
e.DialTimeout = config.DialTimeout
if config.LocalIP == "" {
config.LocalIP = goip.GetOutsideIp()
}
e.LocalIP = config.LocalIP
e.Client, err = clientv3.New(clientv3.Config{
Endpoints: e.Endpoints,
DialTimeout: e.DialTimeout,
})
if err != nil {
return nil, errors.New("连接失败:" + err.Error())
}
// 获得kv API子集
e.Kv = clientv3.NewKV(e.Client)
// 创建一个lease租约对象
e.Lease = clientv3.NewLease(e.Client)
// 注册服务
go e.RegisterWorker()
return e, nil
}
// RegisterWorker 注册worker
func (e Etcd) RegisterWorker() {
var (
regKey string
leaseGrantResp *clientv3.LeaseGrantResponse
err error
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
keepAliveResp *clientv3.LeaseKeepAliveResponse
cancelCtx context.Context
cancelFunc context.CancelFunc
)
for {
// 注册路径
regKey = JobWorkerDir + e.LocalIP
log.Println("租约:", regKey)
cancelFunc = nil
// 申请一个10秒的租约
leaseGrantResp, err = e.Lease.Grant(context.TODO(), 10)
if err != nil {
log.Println("申请一个10秒的租约失败", err)
goto RETRY
}
// 自动永久续租
keepAliveChan, err = e.Lease.KeepAlive(context.TODO(), leaseGrantResp.ID)
if err != nil {
log.Println("自动永久续租失败", err)
goto RETRY
}
cancelCtx, cancelFunc = context.WithCancel(context.TODO())
// 注册到etcd
_, err = e.Kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID))
if err != nil {
log.Println(fmt.Sprintf(" %s 服务注册失败:%s", regKey, err))
goto RETRY
}
// 处理续约应答的协程
for {
select {
case keepAliveResp = <-keepAliveChan:
if keepAliveResp == nil {
log.Println("续租失败")
goto RETRY
} else {
log.Println("收到自动续租应答:", leaseGrantResp.ID)
}
}
}
RETRY:
log.Println("异常 RETRY ", regKey)
time.Sleep(1 * time.Second)
if cancelFunc != nil {
cancelFunc()
}
}
}

@ -0,0 +1,5 @@
#protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out . --grpc-gateway_opt paths=source_relative ./pb/basics.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/basics.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/task.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative ./pb/pubsub.proto

@ -0,0 +1,221 @@
// 版本
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: pb/basics.proto
// 包名
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 请求消息
type BasicsRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *BasicsRequest) Reset() {
*x = BasicsRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_basics_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BasicsRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BasicsRequest) ProtoMessage() {}
func (x *BasicsRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_basics_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BasicsRequest.ProtoReflect.Descriptor instead.
func (*BasicsRequest) Descriptor() ([]byte, []int) {
return file_pb_basics_proto_rawDescGZIP(), []int{0}
}
func (x *BasicsRequest) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
// 响应消息
type BasicsResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *BasicsResponse) Reset() {
*x = BasicsResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_basics_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *BasicsResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BasicsResponse) ProtoMessage() {}
func (x *BasicsResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_basics_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use BasicsResponse.ProtoReflect.Descriptor instead.
func (*BasicsResponse) Descriptor() ([]byte, []int) {
return file_pb_basics_proto_rawDescGZIP(), []int{1}
}
func (x *BasicsResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
var File_pb_basics_proto protoreflect.FileDescriptor
var file_pb_basics_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x62, 0x61, 0x73, 0x69, 0x63, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x29, 0x0a, 0x0d, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x22, 0x2a, 0x0a, 0x0e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x6a, 0x0a, 0x06,
0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x11,
0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x2f, 0x0a, 0x04, 0x50, 0x6f, 0x6e, 0x67, 0x12,
0x11, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x42, 0x61, 0x73, 0x69, 0x63, 0x73, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70,
0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_basics_proto_rawDescOnce sync.Once
file_pb_basics_proto_rawDescData = file_pb_basics_proto_rawDesc
)
func file_pb_basics_proto_rawDescGZIP() []byte {
file_pb_basics_proto_rawDescOnce.Do(func() {
file_pb_basics_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_basics_proto_rawDescData)
})
return file_pb_basics_proto_rawDescData
}
var file_pb_basics_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_basics_proto_goTypes = []interface{}{
(*BasicsRequest)(nil), // 0: pb.BasicsRequest
(*BasicsResponse)(nil), // 1: pb.BasicsResponse
}
var file_pb_basics_proto_depIdxs = []int32{
0, // 0: pb.Basics.Ping:input_type -> pb.BasicsRequest
0, // 1: pb.Basics.Pong:input_type -> pb.BasicsRequest
1, // 2: pb.Basics.Ping:output_type -> pb.BasicsResponse
1, // 3: pb.Basics.Pong:output_type -> pb.BasicsResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pb_basics_proto_init() }
func file_pb_basics_proto_init() {
if File_pb_basics_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_basics_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BasicsRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_basics_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BasicsResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_basics_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pb_basics_proto_goTypes,
DependencyIndexes: file_pb_basics_proto_depIdxs,
MessageInfos: file_pb_basics_proto_msgTypes,
}.Build()
File_pb_basics_proto = out.File
file_pb_basics_proto_rawDesc = nil
file_pb_basics_proto_goTypes = nil
file_pb_basics_proto_depIdxs = nil
}

@ -0,0 +1,25 @@
//
syntax = "proto3";
//
package pb;
//
option go_package = "../pb";
//
service Basics{
//
rpc Ping(BasicsRequest) returns (BasicsResponse){};
//
rpc Pong(BasicsRequest) returns (BasicsResponse){};
}
//
message BasicsRequest {
string message = 1;
}
//
message BasicsResponse {
string message = 1;
}

@ -0,0 +1,145 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pb/basics.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// BasicsClient is the client API for Basics service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type BasicsClient interface {
// 心跳
Ping(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error)
// 心跳
Pong(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error)
}
type basicsClient struct {
cc grpc.ClientConnInterface
}
func NewBasicsClient(cc grpc.ClientConnInterface) BasicsClient {
return &basicsClient{cc}
}
func (c *basicsClient) Ping(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) {
out := new(BasicsResponse)
err := c.cc.Invoke(ctx, "/pb.Basics/Ping", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *basicsClient) Pong(ctx context.Context, in *BasicsRequest, opts ...grpc.CallOption) (*BasicsResponse, error) {
out := new(BasicsResponse)
err := c.cc.Invoke(ctx, "/pb.Basics/Pong", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// BasicsServer is the server API for Basics service.
// All implementations must embed UnimplementedBasicsServer
// for forward compatibility
type BasicsServer interface {
// 心跳
Ping(context.Context, *BasicsRequest) (*BasicsResponse, error)
// 心跳
Pong(context.Context, *BasicsRequest) (*BasicsResponse, error)
mustEmbedUnimplementedBasicsServer()
}
// UnimplementedBasicsServer must be embedded to have forward compatible implementations.
type UnimplementedBasicsServer struct {
}
func (UnimplementedBasicsServer) Ping(context.Context, *BasicsRequest) (*BasicsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedBasicsServer) Pong(context.Context, *BasicsRequest) (*BasicsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Pong not implemented")
}
func (UnimplementedBasicsServer) mustEmbedUnimplementedBasicsServer() {}
// UnsafeBasicsServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to BasicsServer will
// result in compilation errors.
type UnsafeBasicsServer interface {
mustEmbedUnimplementedBasicsServer()
}
func RegisterBasicsServer(s grpc.ServiceRegistrar, srv BasicsServer) {
s.RegisterService(&Basics_ServiceDesc, srv)
}
func _Basics_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BasicsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BasicsServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.Basics/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BasicsServer).Ping(ctx, req.(*BasicsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Basics_Pong_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BasicsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BasicsServer).Pong(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.Basics/Pong",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BasicsServer).Pong(ctx, req.(*BasicsRequest))
}
return interceptor(ctx, in, info, handler)
}
// Basics_ServiceDesc is the grpc.ServiceDesc for Basics service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Basics_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.Basics",
HandlerType: (*BasicsServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _Basics_Ping_Handler,
},
{
MethodName: "Pong",
Handler: _Basics_Pong_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pb/basics.proto",
}

@ -0,0 +1,440 @@
// 版本
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: pb/pubsub.proto
// 包名
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 请求消息
type PublishRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Method string `protobuf:"bytes,3,opt,name=method,proto3" json:"method,omitempty"`
Ip string `protobuf:"bytes,4,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *PublishRequest) Reset() {
*x = PublishRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PublishRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PublishRequest) ProtoMessage() {}
func (x *PublishRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PublishRequest.ProtoReflect.Descriptor instead.
func (*PublishRequest) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{0}
}
func (x *PublishRequest) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *PublishRequest) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *PublishRequest) GetMethod() string {
if x != nil {
return x.Method
}
return ""
}
func (x *PublishRequest) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
// 响应消息
type PublishResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Ip string `protobuf:"bytes,3,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *PublishResponse) Reset() {
*x = PublishResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *PublishResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*PublishResponse) ProtoMessage() {}
func (x *PublishResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead.
func (*PublishResponse) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{1}
}
func (x *PublishResponse) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *PublishResponse) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *PublishResponse) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
// 请求消息
type SubscribeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Method string `protobuf:"bytes,3,opt,name=method,proto3" json:"method,omitempty"`
Ip string `protobuf:"bytes,4,opt,name=ip,proto3" json:"ip,omitempty"`
}
func (x *SubscribeRequest) Reset() {
*x = SubscribeRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SubscribeRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SubscribeRequest) ProtoMessage() {}
func (x *SubscribeRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SubscribeRequest.ProtoReflect.Descriptor instead.
func (*SubscribeRequest) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{2}
}
func (x *SubscribeRequest) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *SubscribeRequest) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *SubscribeRequest) GetMethod() string {
if x != nil {
return x.Method
}
return ""
}
func (x *SubscribeRequest) GetIp() string {
if x != nil {
return x.Ip
}
return ""
}
// 响应消息
type SubscribeResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
Method string `protobuf:"bytes,3,opt,name=method,proto3" json:"method,omitempty"`
}
func (x *SubscribeResponse) Reset() {
*x = SubscribeResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_pubsub_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *SubscribeResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*SubscribeResponse) ProtoMessage() {}
func (x *SubscribeResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_pubsub_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use SubscribeResponse.ProtoReflect.Descriptor instead.
func (*SubscribeResponse) Descriptor() ([]byte, []int) {
return file_pb_pubsub_proto_rawDescGZIP(), []int{3}
}
func (x *SubscribeResponse) GetId() string {
if x != nil {
return x.Id
}
return ""
}
func (x *SubscribeResponse) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
func (x *SubscribeResponse) GetMethod() string {
if x != nil {
return x.Method
}
return ""
}
var File_pb_pubsub_proto protoreflect.FileDescriptor
var file_pb_pubsub_proto_rawDesc = []byte{
0x0a, 0x0f, 0x70, 0x62, 0x2f, 0x70, 0x75, 0x62, 0x73, 0x75, 0x62, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x5e, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a,
0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d,
0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28,
0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x47, 0x0a, 0x0f, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70, 0x22, 0x60,
0x0a, 0x10, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02,
0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68,
0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64,
0x12, 0x0e, 0x0a, 0x02, 0x69, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x70,
0x22, 0x51, 0x0a, 0x11, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x6d,
0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74,
0x68, 0x6f, 0x64, 0x32, 0x78, 0x0a, 0x06, 0x50, 0x75, 0x62, 0x53, 0x75, 0x62, 0x12, 0x32, 0x0a,
0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x12, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x75,
0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70,
0x62, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x3a, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14,
0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72,
0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x42, 0x07, 0x5a,
0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_pubsub_proto_rawDescOnce sync.Once
file_pb_pubsub_proto_rawDescData = file_pb_pubsub_proto_rawDesc
)
func file_pb_pubsub_proto_rawDescGZIP() []byte {
file_pb_pubsub_proto_rawDescOnce.Do(func() {
file_pb_pubsub_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_pubsub_proto_rawDescData)
})
return file_pb_pubsub_proto_rawDescData
}
var file_pb_pubsub_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_pb_pubsub_proto_goTypes = []interface{}{
(*PublishRequest)(nil), // 0: pb.PublishRequest
(*PublishResponse)(nil), // 1: pb.PublishResponse
(*SubscribeRequest)(nil), // 2: pb.SubscribeRequest
(*SubscribeResponse)(nil), // 3: pb.SubscribeResponse
}
var file_pb_pubsub_proto_depIdxs = []int32{
0, // 0: pb.PubSub.Publish:input_type -> pb.PublishRequest
2, // 1: pb.PubSub.Subscribe:input_type -> pb.SubscribeRequest
1, // 2: pb.PubSub.Publish:output_type -> pb.PublishResponse
3, // 3: pb.PubSub.Subscribe:output_type -> pb.SubscribeResponse
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pb_pubsub_proto_init() }
func file_pb_pubsub_proto_init() {
if File_pb_pubsub_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_pubsub_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PublishRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pubsub_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PublishResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pubsub_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_pubsub_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*SubscribeResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_pubsub_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pb_pubsub_proto_goTypes,
DependencyIndexes: file_pb_pubsub_proto_depIdxs,
MessageInfos: file_pb_pubsub_proto_msgTypes,
}.Build()
File_pb_pubsub_proto = out.File
file_pb_pubsub_proto_rawDesc = nil
file_pb_pubsub_proto_goTypes = nil
file_pb_pubsub_proto_depIdxs = nil
}

@ -0,0 +1,46 @@
//
syntax = "proto3";
//
package pb;
//
option go_package = "../pb";
//
service PubSub {
// []
rpc Publish (PublishRequest) returns (PublishResponse);
// []
rpc Subscribe (SubscribeRequest) returns (stream SubscribeResponse);
}
//
message PublishRequest {
string id = 1;
string value = 2;
string method = 3;
string ip = 4;
}
//
message PublishResponse {
string id = 1;
string value = 2;
string ip = 3;
}
//
message SubscribeRequest {
string id = 1;
string value = 2;
string method = 3;
string ip = 4;
}
//
message SubscribeResponse {
string id = 1;
string value = 2;
string method = 3;
}

@ -0,0 +1,77 @@
package pb
import (
"context"
"go.dtapp.net/library/utils/gojobs/pubsub"
"log"
"strings"
"time"
)
type PubSubServerService struct {
pub *pubsub.Publisher
UnimplementedPubSubServer
}
func NewPubSubServerService() *PubSubServerService {
return &PubSubServerService{
// 新建一个Publisher对象
pub: pubsub.NewPublisher(time.Millisecond*100, 10),
}
}
// Publish 实现发布方法
func (p *PubSubServerService) Publish(ctx context.Context, req *PublishRequest) (*PublishResponse, error) {
log.Printf("[服务中转]{发布}编号:%s 类型:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetIp())
// 发布消息
p.pub.Publish(req.GetValue())
return &PublishResponse{
Id: req.GetId(),
Value: req.GetValue(),
Ip: req.GetIp(),
}, nil
}
// Subscribe 实现订阅方法
func (p *PubSubServerService) Subscribe(req *SubscribeRequest, stream PubSub_SubscribeServer) error {
// SubscribeTopic 增加一个使用函数过滤器的订阅者
// func(v interface{}) 定义函数过滤的规则
// SubscribeTopic 返回一个chan interface{}
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
log.Printf("[服务中转]{订阅}主题:%v\n", v)
// 接收数据是string并且key是以arg为前缀的
if key, ok := v.(string); ok {
if strings.HasPrefix(key, req.GetValue()) {
return true
}
}
return false
})
log.Printf("[服务中转]{订阅}编号:%s 类型:%s 方法:%s ip地址%s\n", req.GetId(), req.GetValue(), req.GetMethod(), req.GetIp())
log.Println("[服务中转]{订阅}工作线:", ch)
log.Println("[服务中转]{订阅}当前工作线数量:", p.pub.Len())
// 服务器遍历chan并将其中信息发送给订阅客户端
for v := range ch {
log.Println("[服务中转]{订阅}for ch", ch)
log.Println("[服务中转]{订阅}for v", v)
err := stream.Send(&SubscribeResponse{
Id: req.GetId(),
Value: req.GetValue(),
Method: req.GetMethod(),
})
if err != nil {
log.Println("[服务中转]{订阅}任务分配失败 ", err.Error())
return err
}
}
return nil
}

@ -0,0 +1,173 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pb/pubsub.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// PubSubClient is the client API for PubSub service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type PubSubClient interface {
// [发布] 消息
Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error)
// [订阅] 消息
Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error)
}
type pubSubClient struct {
cc grpc.ClientConnInterface
}
func NewPubSubClient(cc grpc.ClientConnInterface) PubSubClient {
return &pubSubClient{cc}
}
func (c *pubSubClient) Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error) {
out := new(PublishResponse)
err := c.cc.Invoke(ctx, "/pb.PubSub/Publish", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *pubSubClient) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (PubSub_SubscribeClient, error) {
stream, err := c.cc.NewStream(ctx, &PubSub_ServiceDesc.Streams[0], "/pb.PubSub/Subscribe", opts...)
if err != nil {
return nil, err
}
x := &pubSubSubscribeClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type PubSub_SubscribeClient interface {
Recv() (*SubscribeResponse, error)
grpc.ClientStream
}
type pubSubSubscribeClient struct {
grpc.ClientStream
}
func (x *pubSubSubscribeClient) Recv() (*SubscribeResponse, error) {
m := new(SubscribeResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// PubSubServer is the server API for PubSub service.
// All implementations must embed UnimplementedPubSubServer
// for forward compatibility
type PubSubServer interface {
// [发布] 消息
Publish(context.Context, *PublishRequest) (*PublishResponse, error)
// [订阅] 消息
Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error
mustEmbedUnimplementedPubSubServer()
}
// UnimplementedPubSubServer must be embedded to have forward compatible implementations.
type UnimplementedPubSubServer struct {
}
func (UnimplementedPubSubServer) Publish(context.Context, *PublishRequest) (*PublishResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented")
}
func (UnimplementedPubSubServer) Subscribe(*SubscribeRequest, PubSub_SubscribeServer) error {
return status.Errorf(codes.Unimplemented, "method Subscribe not implemented")
}
func (UnimplementedPubSubServer) mustEmbedUnimplementedPubSubServer() {}
// UnsafePubSubServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to PubSubServer will
// result in compilation errors.
type UnsafePubSubServer interface {
mustEmbedUnimplementedPubSubServer()
}
func RegisterPubSubServer(s grpc.ServiceRegistrar, srv PubSubServer) {
s.RegisterService(&PubSub_ServiceDesc, srv)
}
func _PubSub_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PublishRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(PubSubServer).Publish(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.PubSub/Publish",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PubSubServer).Publish(ctx, req.(*PublishRequest))
}
return interceptor(ctx, in, info, handler)
}
func _PubSub_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(PubSubServer).Subscribe(m, &pubSubSubscribeServer{stream})
}
type PubSub_SubscribeServer interface {
Send(*SubscribeResponse) error
grpc.ServerStream
}
type pubSubSubscribeServer struct {
grpc.ServerStream
}
func (x *pubSubSubscribeServer) Send(m *SubscribeResponse) error {
return x.ServerStream.SendMsg(m)
}
// PubSub_ServiceDesc is the grpc.ServiceDesc for PubSub service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var PubSub_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.PubSub",
HandlerType: (*PubSubServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Publish",
Handler: _PubSub_Publish_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Subscribe",
Handler: _PubSub_Subscribe_Handler,
ServerStreams: true,
},
},
Metadata: "pb/pubsub.proto",
}

@ -0,0 +1,234 @@
// 版本
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.0
// protoc v3.19.4
// source: pb/task.proto
// 包名
package pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// 请求消息
type TaskRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *TaskRequest) Reset() {
*x = TaskRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_task_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TaskRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TaskRequest) ProtoMessage() {}
func (x *TaskRequest) ProtoReflect() protoreflect.Message {
mi := &file_pb_task_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TaskRequest.ProtoReflect.Descriptor instead.
func (*TaskRequest) Descriptor() ([]byte, []int) {
return file_pb_task_proto_rawDescGZIP(), []int{0}
}
func (x *TaskRequest) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
// 响应消息
type TaskResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"`
}
func (x *TaskResponse) Reset() {
*x = TaskResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pb_task_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TaskResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TaskResponse) ProtoMessage() {}
func (x *TaskResponse) ProtoReflect() protoreflect.Message {
mi := &file_pb_task_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TaskResponse.ProtoReflect.Descriptor instead.
func (*TaskResponse) Descriptor() ([]byte, []int) {
return file_pb_task_proto_rawDescGZIP(), []int{1}
}
func (x *TaskResponse) GetMessage() string {
if x != nil {
return x.Message
}
return ""
}
var File_pb_task_proto protoreflect.FileDescriptor
var file_pb_task_proto_rawDesc = []byte{
0x0a, 0x0d, 0x70, 0x62, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x02, 0x70, 0x62, 0x22, 0x27, 0x0a, 0x0b, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x28, 0x0a, 0x0c,
0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07,
0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d,
0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0xfb, 0x01, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12,
0x30, 0x0a, 0x09, 0x55, 0x6e, 0x61, 0x72, 0x79, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70,
0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e,
0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
0x00, 0x12, 0x3c, 0x0a, 0x13, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x69, 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61,
0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54,
0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12,
0x3c, 0x0a, 0x13, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x69,
0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x61, 0x73,
0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x12, 0x45, 0x0a,
0x1a, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x53, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x0f, 0x2e, 0x70, 0x62,
0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x70,
0x62, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x28, 0x01, 0x30, 0x01, 0x42, 0x07, 0x5a, 0x05, 0x2e, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_pb_task_proto_rawDescOnce sync.Once
file_pb_task_proto_rawDescData = file_pb_task_proto_rawDesc
)
func file_pb_task_proto_rawDescGZIP() []byte {
file_pb_task_proto_rawDescOnce.Do(func() {
file_pb_task_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_task_proto_rawDescData)
})
return file_pb_task_proto_rawDescData
}
var file_pb_task_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_pb_task_proto_goTypes = []interface{}{
(*TaskRequest)(nil), // 0: pb.TaskRequest
(*TaskResponse)(nil), // 1: pb.TaskResponse
}
var file_pb_task_proto_depIdxs = []int32{
0, // 0: pb.Task.UnaryTask:input_type -> pb.TaskRequest
0, // 1: pb.Task.ServerStreamingTask:input_type -> pb.TaskRequest
0, // 2: pb.Task.ClientStreamingTask:input_type -> pb.TaskRequest
0, // 3: pb.Task.BidirectionalStreamingTask:input_type -> pb.TaskRequest
1, // 4: pb.Task.UnaryTask:output_type -> pb.TaskResponse
1, // 5: pb.Task.ServerStreamingTask:output_type -> pb.TaskResponse
1, // 6: pb.Task.ClientStreamingTask:output_type -> pb.TaskResponse
1, // 7: pb.Task.BidirectionalStreamingTask:output_type -> pb.TaskResponse
4, // [4:8] is the sub-list for method output_type
0, // [0:4] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pb_task_proto_init() }
func file_pb_task_proto_init() {
if File_pb_task_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pb_task_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TaskRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pb_task_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TaskResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pb_task_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pb_task_proto_goTypes,
DependencyIndexes: file_pb_task_proto_depIdxs,
MessageInfos: file_pb_task_proto_msgTypes,
}.Build()
File_pb_task_proto = out.File
file_pb_task_proto_rawDesc = nil
file_pb_task_proto_goTypes = nil
file_pb_task_proto_depIdxs = nil
}

@ -0,0 +1,30 @@
//
syntax = "proto3";
//
package pb;
//
option go_package = "../pb";
//
service Task{
//
rpc UnaryTask(TaskRequest) returns (TaskResponse){};
//
rpc ServerStreamingTask(TaskRequest) returns (stream TaskResponse){};
//
rpc ClientStreamingTask(stream TaskRequest) returns (TaskResponse){};
//
rpc BidirectionalStreamingTask(stream TaskRequest) returns (stream TaskResponse){};
}
//
message TaskRequest {
string message = 1;
}
//
message TaskResponse {
string message = 1;
}

@ -0,0 +1,315 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.4
// source: pb/task.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// TaskClient is the client API for Task service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type TaskClient interface {
// 普通一元方法
UnaryTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (*TaskResponse, error)
// 服务端推送流
ServerStreamingTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (Task_ServerStreamingTaskClient, error)
// 客户端推送流
ClientStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_ClientStreamingTaskClient, error)
// 双向推送流
BidirectionalStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_BidirectionalStreamingTaskClient, error)
}
type taskClient struct {
cc grpc.ClientConnInterface
}
func NewTaskClient(cc grpc.ClientConnInterface) TaskClient {
return &taskClient{cc}
}
func (c *taskClient) UnaryTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (*TaskResponse, error) {
out := new(TaskResponse)
err := c.cc.Invoke(ctx, "/pb.Task/UnaryTask", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *taskClient) ServerStreamingTask(ctx context.Context, in *TaskRequest, opts ...grpc.CallOption) (Task_ServerStreamingTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[0], "/pb.Task/ServerStreamingTask", opts...)
if err != nil {
return nil, err
}
x := &taskServerStreamingTaskClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Task_ServerStreamingTaskClient interface {
Recv() (*TaskResponse, error)
grpc.ClientStream
}
type taskServerStreamingTaskClient struct {
grpc.ClientStream
}
func (x *taskServerStreamingTaskClient) Recv() (*TaskResponse, error) {
m := new(TaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *taskClient) ClientStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_ClientStreamingTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[1], "/pb.Task/ClientStreamingTask", opts...)
if err != nil {
return nil, err
}
x := &taskClientStreamingTaskClient{stream}
return x, nil
}
type Task_ClientStreamingTaskClient interface {
Send(*TaskRequest) error
CloseAndRecv() (*TaskResponse, error)
grpc.ClientStream
}
type taskClientStreamingTaskClient struct {
grpc.ClientStream
}
func (x *taskClientStreamingTaskClient) Send(m *TaskRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *taskClientStreamingTaskClient) CloseAndRecv() (*TaskResponse, error) {
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
m := new(TaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *taskClient) BidirectionalStreamingTask(ctx context.Context, opts ...grpc.CallOption) (Task_BidirectionalStreamingTaskClient, error) {
stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[2], "/pb.Task/BidirectionalStreamingTask", opts...)
if err != nil {
return nil, err
}
x := &taskBidirectionalStreamingTaskClient{stream}
return x, nil
}
type Task_BidirectionalStreamingTaskClient interface {
Send(*TaskRequest) error
Recv() (*TaskResponse, error)
grpc.ClientStream
}
type taskBidirectionalStreamingTaskClient struct {
grpc.ClientStream
}
func (x *taskBidirectionalStreamingTaskClient) Send(m *TaskRequest) error {
return x.ClientStream.SendMsg(m)
}
func (x *taskBidirectionalStreamingTaskClient) Recv() (*TaskResponse, error) {
m := new(TaskResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// TaskServer is the server API for Task service.
// All implementations must embed UnimplementedTaskServer
// for forward compatibility
type TaskServer interface {
// 普通一元方法
UnaryTask(context.Context, *TaskRequest) (*TaskResponse, error)
// 服务端推送流
ServerStreamingTask(*TaskRequest, Task_ServerStreamingTaskServer) error
// 客户端推送流
ClientStreamingTask(Task_ClientStreamingTaskServer) error
// 双向推送流
BidirectionalStreamingTask(Task_BidirectionalStreamingTaskServer) error
mustEmbedUnimplementedTaskServer()
}
// UnimplementedTaskServer must be embedded to have forward compatible implementations.
type UnimplementedTaskServer struct {
}
func (UnimplementedTaskServer) UnaryTask(context.Context, *TaskRequest) (*TaskResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method UnaryTask not implemented")
}
func (UnimplementedTaskServer) ServerStreamingTask(*TaskRequest, Task_ServerStreamingTaskServer) error {
return status.Errorf(codes.Unimplemented, "method ServerStreamingTask not implemented")
}
func (UnimplementedTaskServer) ClientStreamingTask(Task_ClientStreamingTaskServer) error {
return status.Errorf(codes.Unimplemented, "method ClientStreamingTask not implemented")
}
func (UnimplementedTaskServer) BidirectionalStreamingTask(Task_BidirectionalStreamingTaskServer) error {
return status.Errorf(codes.Unimplemented, "method BidirectionalStreamingTask not implemented")
}
func (UnimplementedTaskServer) mustEmbedUnimplementedTaskServer() {}
// UnsafeTaskServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to TaskServer will
// result in compilation errors.
type UnsafeTaskServer interface {
mustEmbedUnimplementedTaskServer()
}
func RegisterTaskServer(s grpc.ServiceRegistrar, srv TaskServer) {
s.RegisterService(&Task_ServiceDesc, srv)
}
func _Task_UnaryTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(TaskRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(TaskServer).UnaryTask(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.Task/UnaryTask",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(TaskServer).UnaryTask(ctx, req.(*TaskRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Task_ServerStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(TaskRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(TaskServer).ServerStreamingTask(m, &taskServerStreamingTaskServer{stream})
}
type Task_ServerStreamingTaskServer interface {
Send(*TaskResponse) error
grpc.ServerStream
}
type taskServerStreamingTaskServer struct {
grpc.ServerStream
}
func (x *taskServerStreamingTaskServer) Send(m *TaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Task_ClientStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TaskServer).ClientStreamingTask(&taskClientStreamingTaskServer{stream})
}
type Task_ClientStreamingTaskServer interface {
SendAndClose(*TaskResponse) error
Recv() (*TaskRequest, error)
grpc.ServerStream
}
type taskClientStreamingTaskServer struct {
grpc.ServerStream
}
func (x *taskClientStreamingTaskServer) SendAndClose(m *TaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *taskClientStreamingTaskServer) Recv() (*TaskRequest, error) {
m := new(TaskRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func _Task_BidirectionalStreamingTask_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(TaskServer).BidirectionalStreamingTask(&taskBidirectionalStreamingTaskServer{stream})
}
type Task_BidirectionalStreamingTaskServer interface {
Send(*TaskResponse) error
Recv() (*TaskRequest, error)
grpc.ServerStream
}
type taskBidirectionalStreamingTaskServer struct {
grpc.ServerStream
}
func (x *taskBidirectionalStreamingTaskServer) Send(m *TaskResponse) error {
return x.ServerStream.SendMsg(m)
}
func (x *taskBidirectionalStreamingTaskServer) Recv() (*TaskRequest, error) {
m := new(TaskRequest)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Task_ServiceDesc is the grpc.ServiceDesc for Task service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Task_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.Task",
HandlerType: (*TaskServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "UnaryTask",
Handler: _Task_UnaryTask_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "ServerStreamingTask",
Handler: _Task_ServerStreamingTask_Handler,
ServerStreams: true,
},
{
StreamName: "ClientStreamingTask",
Handler: _Task_ClientStreamingTask_Handler,
ClientStreams: true,
},
{
StreamName: "BidirectionalStreamingTask",
Handler: _Task_BidirectionalStreamingTask_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "pb/task.proto",
}

@ -0,0 +1,125 @@
package pubsub
import (
"sync"
"time"
)
// 等待组放在共享内存池中减少GC
var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
// NewPublisher
// 第一个参数控制发布时最大阻塞时间
// 第二个参数是缓冲区大小控制每个订阅者的chan缓冲区大小
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
return &Publisher{
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
type subscriber chan interface{}
type topicFunc func(v interface{}) bool
type Publisher struct {
m sync.RWMutex // 控制订阅者map并发读写安全
buffer int // 每个订阅者chan缓冲区大小
timeout time.Duration // 发布阻塞超时时间
subscribers map[subscriber]topicFunc
}
// Len 返回订阅者数量
func (p *Publisher) Len() int {
p.m.RLock()
i := len(p.subscribers)
p.m.RUnlock()
return i
}
// Subscribe 无Topic订阅
func (p *Publisher) Subscribe() chan interface{} {
return p.SubscribeTopic(nil)
}
// SubscribeTopic 通过Topic订阅
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// SubscribeTopicWithBuffer 通过自定义chan缓冲区大小定义新的订阅者
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
ch := make(chan interface{}, buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// Evict 移除某个订阅者
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()
_, exists := p.subscribers[sub]
if exists {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// Publish 发布消息
func (p *Publisher) Publish(v interface{}) {
p.m.RLock()
if len(p.subscribers) == 0 {
p.m.RUnlock()
return
}
wg := wgPool.Get().(*sync.WaitGroup)
for sub, topic := range p.subscribers {
wg.Add(1)
go p.sendTopic(sub, topic, v, wg)
}
wg.Wait()
wgPool.Put(wg)
p.m.RUnlock()
}
// Close 关闭服务
func (p *Publisher) Close() {
p.m.Lock()
for sub := range p.subscribers {
delete(p.subscribers, sub)
close(sub)
}
p.m.Unlock()
}
// 真正发布消息的逻辑通过Timer根据传入的timeout控制每次发布消息最大阻塞时长
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) {
return
}
// 如果接收器不可用,请在选择“不阻止”下发送
if p.timeout > 0 {
timeout := time.NewTimer(p.timeout)
defer timeout.Stop()
select {
case sub <- v:
case <-timeout.C:
}
return
}
select {
case sub <- v:
default:
}
}

@ -0,0 +1,84 @@
package gojobs
import (
"errors"
"go.dtapp.net/library/utils/gojobs/pb"
"go.dtapp.net/library/utils/gojobs/pubsub"
"google.golang.org/grpc"
"log"
"net"
"strings"
"time"
)
// ServerConfig 服务配置
type ServerConfig struct {
PublishTimeout time.Duration // 控制发布时最大阻塞时间
PubBuffer int // 缓冲区大小控制每个订阅者的chan缓冲区大小
Address string // 服务端口 0.0.0.0:8888
}
// Server 服务
type Server struct {
ServerConfig // 配置
Pub *pubsub.Publisher // 订阅
Conn *grpc.Server // 链接信息
}
// NewServer 创建服务和注册
func NewServer(config *ServerConfig) *Server {
if config.Address == "" {
panic("[服务中转]请填写服务端口")
}
s := &Server{}
s.PublishTimeout = config.PublishTimeout
s.PubBuffer = config.PubBuffer
s.Address = config.Address
s.Pub = pubsub.NewPublisher(config.PublishTimeout, config.PubBuffer)
// 创建gRPC服务器
s.Conn = grpc.NewServer()
// 注册
pb.RegisterPubSubServer(s.Conn, pb.NewPubSubServerService())
return s
}
// StartCron 启动定时任务
func (s *Server) StartCron() {
cron := s.Pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
})
go func() {
log.Println("crontopic:", <-cron)
}()
}
// StartUp 启动服务
func (s *Server) StartUp() {
// 监听本地端口
lis, err := net.Listen("tcp", s.Address)
if err != nil {
panic(errors.New("[服务中转]{创建监听失败}" + err.Error()))
}
log.Println("[服务中转]{监听}", lis.Addr())
// 启动grpc
err = s.Conn.Serve(lis)
if err != nil {
panic(errors.New("[服务中转]{创建服务失败}" + err.Error()))
}
}

@ -0,0 +1,76 @@
package gojobs
import (
"context"
"go.dtapp.net/library/utils/gojobs/pb"
"go.dtapp.net/library/utils/gouuid"
"google.golang.org/grpc"
)
// WorkerConfig 工作配置
type WorkerConfig struct {
Address string // 服务端口 127.0.0.1:8888
ClientIp string // 自己的ip地址
}
// Worker 工作
type Worker struct {
WorkerConfig // 配置
Pub pb.PubSubClient // 订阅
Conn *grpc.ClientConn // 链接信息
}
// NewWorker 创建工作
func NewWorker(config *WorkerConfig) *Worker {
if config.Address == "" {
panic("[工作线]请填写服务端口")
}
if config.ClientIp == "" {
panic("[定时任务]请填写ip地址")
}
w := &Worker{}
w.Address = config.Address
w.ClientIp = config.ClientIp
var err error
// 建立连接 获取client
w.Conn, err = grpc.Dial(w.Address, grpc.WithInsecure())
if err != nil {
panic("[工作线]{连接失败}" + err.Error())
}
// 新建一个客户端
w.Pub = pb.NewPubSubClient(w.Conn)
return w
}
// SubscribeCron 订阅服务
func (w *Worker) SubscribeCron() pb.PubSub_SubscribeClient {
stream, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefix,
Ip: w.ClientIp,
})
if err != nil {
panic("[工作线]{订阅服务失败}" + err.Error())
}
return stream
}
// StartCron 启动任务
func (w *Worker) StartCron() pb.PubSub_SubscribeClient {
stream, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefixSprintf(w.ClientIp),
Ip: w.ClientIp,
})
if err != nil {
panic("[工作线]{启动任务失败}" + err.Error())
}
return stream
}
Loading…
Cancel
Save