diff --git a/CHANGELOG.md b/CHANGELOG.md index 895154b..48f44fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## v1.0.22-28 +## v1.0.22-29 - update diff --git a/consts.go b/grpc.go similarity index 100% rename from consts.go rename to grpc.go diff --git a/client.go b/grpc_client.go similarity index 100% rename from client.go rename to grpc_client.go diff --git a/cron.go b/grpc_cron.go similarity index 100% rename from cron.go rename to grpc_cron.go diff --git a/server.go b/grpc_server.go similarity index 100% rename from server.go rename to grpc_server.go diff --git a/grpc_server_test.go b/grpc_server_test.go new file mode 100644 index 0000000..032b2a2 --- /dev/null +++ b/grpc_server_test.go @@ -0,0 +1,147 @@ +package gojobs + +import ( + "github.com/robfig/cron/v3" + "go.dtapp.net/gojobs/pb" + "go.dtapp.net/gouuid" + "io" + "log" + "sync" + "testing" + "time" +) + +func TestJobs(t *testing.T) { + wg := sync.WaitGroup{} + wg.Add(4) + go testServer(&wg) + go testCron(&wg) + go testWorker1(&wg) + go testWorker2(&wg) + wg.Wait() +} + +func testServer(wg *sync.WaitGroup) { + + server := NewServer(&ServerConfig{ + PublishTimeout: time.Millisecond * 100, + PubBuffer: 10, + Address: "0.0.0.0:8888", + }) + + // 启动定时任务 + server.StartCron() + + // 启动服务 + server.StartUp() + + <-make(chan bool) + + wg.Done() +} + +func testCron(wg *sync.WaitGroup) { + + server := NewCron(&CronConfig{ + Address: "localhost:8888", + }) + defer server.Conn.Close() + + // 创建一个cron实例 精确到秒 + c := cron.New(cron.WithSeconds()) + + // 每隔15秒执行一次 + _, _ = c.AddFunc("*/15 * * * * *", func() { + + server.Send(&pb.PublishRequest{ + Id: gouuid.GetUuId(), + Value: prefix, + Method: "wechat.1.send", + Ip: "127.0.0.1", + }) + + }) + + // 每隔30秒执行一次 + _, _ = c.AddFunc("*/30 * * * * *", func() { + + server.Send(&pb.PublishRequest{ + Id: gouuid.GetUuId(), + Value: prefix, + Method: "wechat.2.send", + Ip: "14.155.157.19", + }) + + }) + + // 启动任务 + c.Start() + + // 关闭任务 + defer c.Stop() + select {} + + wg.Done() +} + +func testWorker1(wg *sync.WaitGroup) { + + server := NewWorker(&WorkerConfig{ + Address: "localhost:8888", + ClientIp: "127.0.0.1", + }) + defer server.Conn.Close() + + // 订阅服务 + stream := server.SubscribeCron() + + // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 + //stream := server.StartCron() + + // 阻塞遍历流,输出结果 + for { + reply, err := stream.Recv() + if io.EOF == err { + log.Println("[跑业务1]已关闭:", err.Error()) + break + } + if nil != err { + log.Println("[跑业务1]异常:", err.Error()) + break + } + log.Printf("[跑业务1]{收到}编号:%s 方法:%s\n", reply.GetId(), reply.GetMethod()) + } + + wg.Done() +} + +func testWorker2(wg *sync.WaitGroup) { + + server := NewWorker(&WorkerConfig{ + Address: "localhost:8888", + ClientIp: "14.155.157.19", + }) + defer server.Conn.Close() + + // 订阅服务 + stream := server.SubscribeCron() + + // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 + //stream := server.StartCron() + + // 阻塞遍历流,输出结果 + for { + reply, err := stream.Recv() + if io.EOF == err { + log.Println("[跑业务2]已关闭:", err.Error()) + break + } + if nil != err { + log.Println("[跑业务2]异常:", err.Error()) + break + } + log.Printf("[跑业务2]{收到}编号:%s 方法:%s\n", reply.GetId(), reply.GetMethod()) + } + + wg.Done() +} diff --git a/worker.go b/grpc_worker.go similarity index 100% rename from worker.go rename to grpc_worker.go diff --git a/jobs.go b/jobs.go new file mode 100644 index 0000000..1c07121 --- /dev/null +++ b/jobs.go @@ -0,0 +1,70 @@ +package gojobs + +import ( + "fmt" + "net/http" +) + +const ( + CodeAbnormal = 0 // 异常 + CodeError = http.StatusInternalServerError // 失败 + CodeSuccess = http.StatusOK // 成功 + CodeEnd = http.StatusCreated // 结束 +) + +// 每隔n秒执行一次 +const specSeconds = "*/%d * * * * *" + +// GetSpecSeconds 每隔n秒执行一次 +var GetSpecSeconds = func(n int64) string { + if n < 0 && n > 59 { + return "" + } + return fmt.Sprintf(specSeconds, n) +} + +// GetFrequencySeconds 每隔n秒执行一次 +var GetFrequencySeconds = func(n int64) int64 { + if n < 0 && n > 59 { + return -1 + } + return n +} + +// 每隔n分钟执行一次 +const specMinutes = "0 */%d * * * *" + +// GetSpecMinutes 每隔n分钟执行一次 +var GetSpecMinutes = func(n int64) string { + if n < 0 && n > 59 { + return "" + } + return fmt.Sprintf(specMinutes, n) +} + +// GetFrequencyMinutes 每隔n分钟执行一次 +var GetFrequencyMinutes = func(n int64) int64 { + if n < 0 && n > 59 { + return -1 + } + return n * 60 +} + +// 每天n点执行一次 +const specHour = "0 0 */%d * * *" + +// GetSpecHour 每天n点执行一次 +var GetSpecHour = func(n int64) string { + if n < 0 && n > 23 { + return "" + } + return fmt.Sprintf(specHour, n) +} + +// GetFrequencyHour 每天n点执行一次 +var GetFrequencyHour = func(n int64) int64 { + if n < 0 && n > 23 { + return -1 + } + return n * 60 * 60 +} diff --git a/jobs_test.go b/jobs_test.go index 032b2a2..d60feb3 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -1,147 +1,18 @@ package gojobs -import ( - "github.com/robfig/cron/v3" - "go.dtapp.net/gojobs/pb" - "go.dtapp.net/gouuid" - "io" - "log" - "sync" - "testing" - "time" -) +import "testing" -func TestJobs(t *testing.T) { - wg := sync.WaitGroup{} - wg.Add(4) - go testServer(&wg) - go testCron(&wg) - go testWorker1(&wg) - go testWorker2(&wg) - wg.Wait() -} - -func testServer(wg *sync.WaitGroup) { - - server := NewServer(&ServerConfig{ - PublishTimeout: time.Millisecond * 100, - PubBuffer: 10, - Address: "0.0.0.0:8888", - }) - - // 启动定时任务 - server.StartCron() - - // 启动服务 - server.StartUp() - - <-make(chan bool) - - wg.Done() -} - -func testCron(wg *sync.WaitGroup) { - - server := NewCron(&CronConfig{ - Address: "localhost:8888", - }) - defer server.Conn.Close() - - // 创建一个cron实例 精确到秒 - c := cron.New(cron.WithSeconds()) - - // 每隔15秒执行一次 - _, _ = c.AddFunc("*/15 * * * * *", func() { - - server.Send(&pb.PublishRequest{ - Id: gouuid.GetUuId(), - Value: prefix, - Method: "wechat.1.send", - Ip: "127.0.0.1", - }) - - }) - - // 每隔30秒执行一次 - _, _ = c.AddFunc("*/30 * * * * *", func() { - - server.Send(&pb.PublishRequest{ - Id: gouuid.GetUuId(), - Value: prefix, - Method: "wechat.2.send", - Ip: "14.155.157.19", - }) - - }) - - // 启动任务 - c.Start() - - // 关闭任务 - defer c.Stop() - select {} - - wg.Done() -} - -func testWorker1(wg *sync.WaitGroup) { - - server := NewWorker(&WorkerConfig{ - Address: "localhost:8888", - ClientIp: "127.0.0.1", - }) - defer server.Conn.Close() - - // 订阅服务 - stream := server.SubscribeCron() - - // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 - //stream := server.StartCron() - - // 阻塞遍历流,输出结果 - for { - reply, err := stream.Recv() - if io.EOF == err { - log.Println("[跑业务1]已关闭:", err.Error()) - break - } - if nil != err { - log.Println("[跑业务1]异常:", err.Error()) - break - } - log.Printf("[跑业务1]{收到}编号:%s 方法:%s\n", reply.GetId(), reply.GetMethod()) - } - - wg.Done() -} - -func testWorker2(wg *sync.WaitGroup) { - - server := NewWorker(&WorkerConfig{ - Address: "localhost:8888", - ClientIp: "14.155.157.19", - }) - defer server.Conn.Close() - - // 订阅服务 - stream := server.SubscribeCron() - - // 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称 - //stream := server.StartCron() +func TestSpec(t *testing.T) { + t.Log(GetSpecSeconds(10)) + t.Log(GetFrequencySeconds(10)) - // 阻塞遍历流,输出结果 - for { - reply, err := stream.Recv() - if io.EOF == err { - log.Println("[跑业务2]已关闭:", err.Error()) - break - } - if nil != err { - log.Println("[跑业务2]异常:", err.Error()) - break - } - log.Printf("[跑业务2]{收到}编号:%s 方法:%s\n", reply.GetId(), reply.GetMethod()) - } + t.Log(GetSpecMinutes(1)) + t.Log(GetFrequencyMinutes(1)) + t.Log(GetSpecMinutes(10)) + t.Log(GetFrequencyMinutes(10)) + t.Log(GetSpecMinutes(30)) + t.Log(GetFrequencyMinutes(30)) - wg.Done() + t.Log(GetSpecHour(10)) + t.Log(GetFrequencyHour(10)) } diff --git a/version.go b/version.go index 03d94e0..4d1ae51 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package gojobs -const Version = "1.0.28" +const Version = "1.0.29"