- repair bug

master
李光春 2 years ago
parent cdc7ff7fbf
commit 8f50892a5e

@ -11,8 +11,10 @@ func configIp() {
ip = goip.GetOutsideIp()
}
const prefix = "cron_%s:"
const prefix = "cron:"
func prefixSprintf() string {
return fmt.Sprintf(prefix, ip)
const prefixIp = "cron_%s:"
func prefixSprintf(str string) string {
return fmt.Sprintf(prefixIp, str)
}

@ -3,6 +3,9 @@ module go.dtapp.net/gojobs
go 1.18
require (
github.com/robfig/cron/v3 v3.0.1
go.dtapp.net/goip v1.0.16
go.dtapp.net/gouuid v1.0.0
google.golang.org/grpc v1.46.2
google.golang.org/protobuf v1.28.0
)
@ -12,7 +15,6 @@ require (
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda // indirect
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect
github.com/ulikunitz/xz v0.5.10 // indirect
go.dtapp.net/goip v1.0.16 // indirect
go.dtapp.net/gorequest v1.0.18 // indirect
go.dtapp.net/gostring v1.0.3 // indirect
go.dtapp.net/gotime v1.0.2 // indirect

@ -47,9 +47,12 @@ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda h1:h+YpzUB/bGVJcLqW+d5GghcCmE/A25KbzjXvWJQi/+o=
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda/go.mod h1:MSotTrCv1PwoR8QgU1JurEx+lNNbtr25I+m0zbLyAGw=
github.com/saracen/go7z-fixtures v0.0.0-20190623165746-aa6b8fba1d2f h1:PF9WV5j/x6MT+x/sauUHd4objCvJbZb0wdxZkHSdd5A=
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f h1:1cJITU3JUI8qNS5T0BlXwANsVdyoJQHQ4hvOxbunPCw=
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f/go.mod h1:LyBTue+RWeyIfN3ZJ4wVxvDuvlGJtDgCLgCb6HCPgps=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -65,6 +68,8 @@ go.dtapp.net/gostring v1.0.3 h1:KSOq4D77/g5yZN/bqWfZ0kOOaPr/P1240vg03+XdENI=
go.dtapp.net/gostring v1.0.3/go.mod h1:+ggrOvgQDQturi1QGsXEpyRN/ZPoRDaqhMujIk5lrgQ=
go.dtapp.net/gotime v1.0.2 h1:CFIJHQXC/4t9bsJhk2cLhjHd6rpdPcJXr8BcHKHDuQo=
go.dtapp.net/gotime v1.0.2/go.mod h1:Gq7eNLr2iMLP18UNWONRq4V3Uhf/ADp4bIrS+Tc6ktY=
go.dtapp.net/gouuid v1.0.0 h1:JCTkrsAmylcVBxsnt2Hu8e0KZUQCdv6KXCLNLrVxXYI=
go.dtapp.net/gouuid v1.0.0/go.mod h1:OwgJyZcvMDelwWIdicxMwJqQiIK82F3XvBizNEfYP2U=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
@ -110,6 +115,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=

@ -2,7 +2,9 @@ package gojobs
import (
"context"
"github.com/robfig/cron/v3"
"go.dtapp.net/gojobs/pb"
"go.dtapp.net/gouuid"
"io"
"log"
"strings"
@ -13,10 +15,11 @@ import (
func TestJobs(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(3)
wg.Add(4)
go testServer(&wg)
go testCron(&wg)
go testWorker(&wg)
go testWorker1(&wg)
go testWorker2(&wg)
wg.Wait()
}
@ -29,9 +32,8 @@ func testServer(wg *sync.WaitGroup) {
})
cronServer := server.Pub.SubscribeTopic(func(v interface{}) bool {
log.Println("SubscribeTopic:", v)
if key, ok := v.(string); ok {
if strings.HasPrefix(key, "cron:") {
if strings.HasPrefix(key, prefix) {
return true
}
}
@ -60,23 +62,77 @@ func testCron(wg *sync.WaitGroup) {
})
defer server.Conn.Close()
t1 := time.NewTimer(time.Second * 10)
for {
select {
case <-t1.C:
// 创建一个cron实例 精确到秒
c := cron.New(cron.WithSeconds())
// 每隔15秒执行一次
_, _ = c.AddFunc("*/15 * * * * *", func() {
server.Send(&pb.PublishRequest{
Id: gouuid.GetUuId(),
Value: prefix + "wechat.send" + " 我是定时任务",
Ip: "127.0.0.1",
})
})
// 每隔30秒执行一次
_, _ = c.AddFunc("*/30 * * * * *", func() {
server.Send(&pb.PublishRequest{
Id: gouuid.GetUuId(),
Value: prefix + "wechat.send" + " 我是定时任务",
Ip: "14.155.157.19",
})
})
// 启动任务
c.Start()
// 关闭任务
defer c.Stop()
select {}
wg.Done()
}
func testWorker1(wg *sync.WaitGroup) {
server := NewCron(&CronConfig{
Address: "localhost:8888",
})
defer server.Conn.Close()
server.Send(&pb.PublishRequest{
Value: "cron:" + "wechat.send",
})
// 订阅服务,传入参数是 cron:
// 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefix,
Ip: "127.0.0.1",
})
if err != nil {
log.Printf("[跑业务1]发送失败:%v\n", err)
}
t1.Reset(time.Second * 10)
// 阻塞遍历流,输出结果
for {
reply, err := stream.Recv()
if io.EOF == err {
log.Println("[跑业务1]已关闭:", err.Error())
break
}
if nil != err {
log.Println("[跑业务1]异常:", err.Error())
break
}
log.Println("[跑业务1]:", reply)
}
wg.Done()
}
func testWorker(wg *sync.WaitGroup) {
func testWorker2(wg *sync.WaitGroup) {
server := NewCron(&CronConfig{
Address: "localhost:8888",
@ -86,25 +142,26 @@ func testWorker(wg *sync.WaitGroup) {
// 订阅服务,传入参数是 cron:
// 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Value: "cron:",
Ip: "127.0.0.1",
Id: gouuid.GetUuId(),
Value: prefix,
Ip: "14.155.157.19",
})
if err != nil {
log.Printf("[跑业务]发送失败:%v\n", err)
log.Printf("[跑业务2]发送失败:%v\n", err)
}
// 阻塞遍历流,输出结果
for {
reply, err := stream.Recv()
if io.EOF == err {
log.Println("[跑业务]已关闭:", err.Error())
log.Println("[跑业务2]已关闭:", err.Error())
break
}
if nil != err {
log.Println("[跑业务]异常:", err.Error())
log.Println("[跑业务2]异常:", err.Error())
break
}
log.Println("[跑业务]:", reply)
log.Println("[跑业务2]:", reply)
}
wg.Done()

Loading…
Cancel
Save