From 63d838f8537d7c46ac8f3db01c3d96c7c70f8aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Tue, 24 May 2022 00:00:26 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- jobs_test.go | 75 ++++++++++++++++++---------------------------------- server.go | 17 ++++++++++++ worker.go | 34 +++++++++++++++++++++++- 3 files changed, 76 insertions(+), 50 deletions(-) diff --git a/jobs_test.go b/jobs_test.go index d595047..bd71a14 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -1,13 +1,11 @@ package gojobs import ( - "context" "github.com/robfig/cron/v3" "go.dtapp.net/gojobs/pb" "go.dtapp.net/gouuid" "io" "log" - "strings" "sync" "testing" "time" @@ -31,24 +29,9 @@ func testServer(wg *sync.WaitGroup) { Address: "0.0.0.0:8888", }) - cronServer := server.Pub.SubscribeTopic(func(v interface{}) bool { - if key, ok := v.(string); ok { - if strings.HasPrefix(key, prefix) { - return true - } - } - return false - }) + server.StartCron() - go func() { - log.Println("cronServer:topic:", <-cronServer) - }() - - err := server.StartUp() - if err != nil { - log.Panicf("创建服务失败:%v\n", err) - return - } + server.StartUp() <-make(chan bool) @@ -69,8 +52,9 @@ func testCron(wg *sync.WaitGroup) { _, _ = c.AddFunc("*/15 * * * * *", func() { server.Send(&pb.PublishRequest{ - Id: gouuid.GetUuId(), - Value: prefix + "wechat.send" + " 我是定时任务", + Id: gouuid.GetUuId(), + //Value: prefix + "wechat.send" + " 我是定时任务", + Value: prefixSprintf("127.0.0.1") + "wechat.send" + " 我是定时任务", Ip: "127.0.0.1", }) @@ -80,8 +64,9 @@ func testCron(wg *sync.WaitGroup) { _, _ = c.AddFunc("*/30 * * * * *", func() { server.Send(&pb.PublishRequest{ - Id: gouuid.GetUuId(), - Value: prefix + "wechat.send" + " 我是定时任务", + Id: gouuid.GetUuId(), + //Value: prefix + "wechat.send" + " 我是定时任务", + Value: prefixSprintf("14.155.157.19") + "wechat.send" + " 我是定时任务", Ip: "14.155.157.19", }) @@ -99,21 +84,17 @@ func testCron(wg *sync.WaitGroup) { func testWorker1(wg *sync.WaitGroup) { - server := NewCron(&CronConfig{ - Address: "localhost:8888", + server := NewWorker(&WorkerConfig{ + Address: "localhost:8888", + ClientIp: "127.0.0.1", }) defer server.Conn.Close() - // 订阅服务,传入参数是 cron: - // 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 - stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ - Id: gouuid.GetUuId(), - Value: prefix, - Ip: "127.0.0.1", - }) - if err != nil { - log.Printf("[跑业务1]发送失败:%v\n", err) - } + // 订阅服务 + server.SubscribeCron() + + // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 + stream := server.StartCron() // 阻塞遍历流,输出结果 for { @@ -126,7 +107,7 @@ func testWorker1(wg *sync.WaitGroup) { log.Println("[跑业务1]异常:", err.Error()) break } - log.Println("[跑业务1]:", reply) + log.Println("[跑业务1]收到:", reply) } wg.Done() @@ -134,21 +115,17 @@ func testWorker1(wg *sync.WaitGroup) { func testWorker2(wg *sync.WaitGroup) { - server := NewCron(&CronConfig{ - Address: "localhost:8888", + server := NewWorker(&WorkerConfig{ + Address: "localhost:8888", + ClientIp: "14.155.157.19", }) defer server.Conn.Close() - // 订阅服务,传入参数是 cron: - // 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 - stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ - Id: gouuid.GetUuId(), - Value: prefix, - Ip: "14.155.157.19", - }) - if err != nil { - log.Printf("[跑业务2]发送失败:%v\n", err) - } + // 订阅服务 + server.SubscribeCron() + + // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 + stream := server.StartCron() // 阻塞遍历流,输出结果 for { @@ -161,7 +138,7 @@ func testWorker2(wg *sync.WaitGroup) { log.Println("[跑业务2]异常:", err.Error()) break } - log.Println("[跑业务2]:", reply) + log.Println("[跑业务2]收到:", reply) } wg.Done() diff --git a/server.go b/server.go index a3f3e7c..979cf4a 100644 --- a/server.go +++ b/server.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" "log" "net" + "strings" "time" ) @@ -48,6 +49,22 @@ func NewServer(config *ServerConfig) *Server { return s } +// StartCron 启动定时任务 +func (s *Server) StartCron() { + cron := s.Pub.SubscribeTopic(func(v interface{}) bool { + if key, ok := v.(string); ok { + if strings.HasPrefix(key, prefix) { + return true + } + } + return false + }) + + go func() { + log.Println("cron:topic:", <-cron) + }() +} + // StartUp 启动 func (s *Server) StartUp() { diff --git a/worker.go b/worker.go index dabccfb..24a146c 100644 --- a/worker.go +++ b/worker.go @@ -1,13 +1,16 @@ package gojobs import ( + "context" "go.dtapp.net/gojobs/pb" + "go.dtapp.net/gouuid" "google.golang.org/grpc" ) // WorkerConfig 工作配置 type WorkerConfig struct { - Address string // 服务端口 127.0.0.1:8888 + Address string // 服务端口 127.0.0.1:8888 + ClientIp string // 自己的ip地址 } // Worker 工作 @@ -23,10 +26,14 @@ func NewWorker(config *WorkerConfig) *Worker { if config.Address == "" { panic("[工作线]请填写服务端口") } + if config.ClientIp == "" { + panic("[定时任务]请填写ip地址") + } w := &Worker{} w.Address = config.Address + w.ClientIp = config.ClientIp var err error @@ -41,3 +48,28 @@ func NewWorker(config *WorkerConfig) *Worker { return w } + +// SubscribeCron 订阅服务 +func (w *Worker) SubscribeCron() { + _, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ + Id: gouuid.GetUuId(), + Value: prefix, + Ip: w.ClientIp, + }) + if err != nil { + panic("[工作线]{订阅服务失败}" + err.Error()) + } +} + +// StartCron 启动任务 +func (w *Worker) StartCron() pb.PubSub_SubscribeClient { + stream, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ + Id: gouuid.GetUuId(), + Value: prefixSprintf(w.ClientIp), + Ip: w.ClientIp, + }) + if err != nil { + panic("[工作线]{启动任务失败}" + err.Error()) + } + return stream +}