From 8f50892a5e63fb3ecad58425d2c16df72a6e12e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Mon, 23 May 2022 21:25:49 +0800 Subject: [PATCH] - repair bug --- consts.go | 8 +++-- go.mod | 4 ++- go.sum | 6 ++++ jobs_test.go | 95 +++++++++++++++++++++++++++++++++++++++++----------- 4 files changed, 90 insertions(+), 23 deletions(-) diff --git a/consts.go b/consts.go index d45b03b..1c3ab57 100644 --- a/consts.go +++ b/consts.go @@ -11,8 +11,10 @@ func configIp() { ip = goip.GetOutsideIp() } -const prefix = "cron_%s:" +const prefix = "cron:" -func prefixSprintf() string { - return fmt.Sprintf(prefix, ip) +const prefixIp = "cron_%s:" + +func prefixSprintf(str string) string { + return fmt.Sprintf(prefixIp, str) } diff --git a/go.mod b/go.mod index 67f2596..b78b7cc 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,9 @@ module go.dtapp.net/gojobs go 1.18 require ( + github.com/robfig/cron/v3 v3.0.1 + go.dtapp.net/goip v1.0.16 + go.dtapp.net/gouuid v1.0.0 google.golang.org/grpc v1.46.2 google.golang.org/protobuf v1.28.0 ) @@ -12,7 +15,6 @@ require ( github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda // indirect github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect github.com/ulikunitz/xz v0.5.10 // indirect - go.dtapp.net/goip v1.0.16 // indirect go.dtapp.net/gorequest v1.0.18 // indirect go.dtapp.net/gostring v1.0.3 // indirect go.dtapp.net/gotime v1.0.2 // indirect diff --git a/go.sum b/go.sum index fd1a1ef..c37feb5 100644 --- a/go.sum +++ b/go.sum @@ -47,9 +47,12 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda h1:h+YpzUB/bGVJcLqW+d5GghcCmE/A25KbzjXvWJQi/+o= github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda/go.mod h1:MSotTrCv1PwoR8QgU1JurEx+lNNbtr25I+m0zbLyAGw= +github.com/saracen/go7z-fixtures v0.0.0-20190623165746-aa6b8fba1d2f h1:PF9WV5j/x6MT+x/sauUHd4objCvJbZb0wdxZkHSdd5A= github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f h1:1cJITU3JUI8qNS5T0BlXwANsVdyoJQHQ4hvOxbunPCw= github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f/go.mod h1:LyBTue+RWeyIfN3ZJ4wVxvDuvlGJtDgCLgCb6HCPgps= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -65,6 +68,8 @@ go.dtapp.net/gostring v1.0.3 h1:KSOq4D77/g5yZN/bqWfZ0kOOaPr/P1240vg03+XdENI= go.dtapp.net/gostring v1.0.3/go.mod h1:+ggrOvgQDQturi1QGsXEpyRN/ZPoRDaqhMujIk5lrgQ= go.dtapp.net/gotime v1.0.2 h1:CFIJHQXC/4t9bsJhk2cLhjHd6rpdPcJXr8BcHKHDuQo= go.dtapp.net/gotime v1.0.2/go.mod h1:Gq7eNLr2iMLP18UNWONRq4V3Uhf/ADp4bIrS+Tc6ktY= +go.dtapp.net/gouuid v1.0.0 h1:JCTkrsAmylcVBxsnt2Hu8e0KZUQCdv6KXCLNLrVxXYI= +go.dtapp.net/gouuid v1.0.0/go.mod h1:OwgJyZcvMDelwWIdicxMwJqQiIK82F3XvBizNEfYP2U= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -110,6 +115,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= diff --git a/jobs_test.go b/jobs_test.go index 27b575c..d595047 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -2,7 +2,9 @@ package gojobs import ( "context" + "github.com/robfig/cron/v3" "go.dtapp.net/gojobs/pb" + "go.dtapp.net/gouuid" "io" "log" "strings" @@ -13,10 +15,11 @@ import ( func TestJobs(t *testing.T) { wg := sync.WaitGroup{} - wg.Add(3) + wg.Add(4) go testServer(&wg) go testCron(&wg) - go testWorker(&wg) + go testWorker1(&wg) + go testWorker2(&wg) wg.Wait() } @@ -29,9 +32,8 @@ func testServer(wg *sync.WaitGroup) { }) cronServer := server.Pub.SubscribeTopic(func(v interface{}) bool { - log.Println("SubscribeTopic:", v) if key, ok := v.(string); ok { - if strings.HasPrefix(key, "cron:") { + if strings.HasPrefix(key, prefix) { return true } } @@ -60,23 +62,77 @@ func testCron(wg *sync.WaitGroup) { }) defer server.Conn.Close() - t1 := time.NewTimer(time.Second * 10) - for { - select { - case <-t1.C: + // 创建一个cron实例 精确到秒 + c := cron.New(cron.WithSeconds()) + + // 每隔15秒执行一次 + _, _ = c.AddFunc("*/15 * * * * *", func() { + + server.Send(&pb.PublishRequest{ + Id: gouuid.GetUuId(), + Value: prefix + "wechat.send" + " 我是定时任务", + Ip: "127.0.0.1", + }) + + }) + + // 每隔30秒执行一次 + _, _ = c.AddFunc("*/30 * * * * *", func() { + + server.Send(&pb.PublishRequest{ + Id: gouuid.GetUuId(), + Value: prefix + "wechat.send" + " 我是定时任务", + Ip: "14.155.157.19", + }) + + }) + + // 启动任务 + c.Start() + + // 关闭任务 + defer c.Stop() + select {} + + wg.Done() +} + +func testWorker1(wg *sync.WaitGroup) { + + server := NewCron(&CronConfig{ + Address: "localhost:8888", + }) + defer server.Conn.Close() - server.Send(&pb.PublishRequest{ - Value: "cron:" + "wechat.send", - }) + // 订阅服务,传入参数是 cron: + // 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 + stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ + Id: gouuid.GetUuId(), + Value: prefix, + Ip: "127.0.0.1", + }) + if err != nil { + log.Printf("[跑业务1]发送失败:%v\n", err) + } - t1.Reset(time.Second * 10) + // 阻塞遍历流,输出结果 + for { + reply, err := stream.Recv() + if io.EOF == err { + log.Println("[跑业务1]已关闭:", err.Error()) + break } + if nil != err { + log.Println("[跑业务1]异常:", err.Error()) + break + } + log.Println("[跑业务1]:", reply) } wg.Done() } -func testWorker(wg *sync.WaitGroup) { +func testWorker2(wg *sync.WaitGroup) { server := NewCron(&CronConfig{ Address: "localhost:8888", @@ -86,25 +142,26 @@ func testWorker(wg *sync.WaitGroup) { // 订阅服务,传入参数是 cron: // 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{ - Value: "cron:", - Ip: "127.0.0.1", + Id: gouuid.GetUuId(), + Value: prefix, + Ip: "14.155.157.19", }) if err != nil { - log.Printf("[跑业务]发送失败:%v\n", err) + log.Printf("[跑业务2]发送失败:%v\n", err) } // 阻塞遍历流,输出结果 for { reply, err := stream.Recv() if io.EOF == err { - log.Println("[跑业务]已关闭:", err.Error()) + log.Println("[跑业务2]已关闭:", err.Error()) break } if nil != err { - log.Println("[跑业务]异常:", err.Error()) + log.Println("[跑业务2]异常:", err.Error()) break } - log.Println("[跑业务]:", reply) + log.Println("[跑业务2]:", reply) } wg.Done()