- 优化任务

master
李光春 2 years ago
parent f1da238556
commit 63d838f853

@ -1,13 +1,11 @@
package gojobs
import (
"context"
"github.com/robfig/cron/v3"
"go.dtapp.net/gojobs/pb"
"go.dtapp.net/gouuid"
"io"
"log"
"strings"
"sync"
"testing"
"time"
@ -31,24 +29,9 @@ func testServer(wg *sync.WaitGroup) {
Address: "0.0.0.0:8888",
})
cronServer := server.Pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
})
server.StartCron()
go func() {
log.Println("cronServertopic:", <-cronServer)
}()
err := server.StartUp()
if err != nil {
log.Panicf("创建服务失败:%v\n", err)
return
}
server.StartUp()
<-make(chan bool)
@ -69,8 +52,9 @@ func testCron(wg *sync.WaitGroup) {
_, _ = c.AddFunc("*/15 * * * * *", func() {
server.Send(&pb.PublishRequest{
Id: gouuid.GetUuId(),
Value: prefix + "wechat.send" + " 我是定时任务",
Id: gouuid.GetUuId(),
//Value: prefix + "wechat.send" + " 我是定时任务",
Value: prefixSprintf("127.0.0.1") + "wechat.send" + " 我是定时任务",
Ip: "127.0.0.1",
})
@ -80,8 +64,9 @@ func testCron(wg *sync.WaitGroup) {
_, _ = c.AddFunc("*/30 * * * * *", func() {
server.Send(&pb.PublishRequest{
Id: gouuid.GetUuId(),
Value: prefix + "wechat.send" + " 我是定时任务",
Id: gouuid.GetUuId(),
//Value: prefix + "wechat.send" + " 我是定时任务",
Value: prefixSprintf("14.155.157.19") + "wechat.send" + " 我是定时任务",
Ip: "14.155.157.19",
})
@ -99,21 +84,17 @@ func testCron(wg *sync.WaitGroup) {
func testWorker1(wg *sync.WaitGroup) {
server := NewCron(&CronConfig{
Address: "localhost:8888",
server := NewWorker(&WorkerConfig{
Address: "localhost:8888",
ClientIp: "127.0.0.1",
})
defer server.Conn.Close()
// 订阅服务,传入参数是 cron:
// 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefix,
Ip: "127.0.0.1",
})
if err != nil {
log.Printf("[跑业务1]发送失败:%v\n", err)
}
// 订阅服务
server.SubscribeCron()
// 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream := server.StartCron()
// 阻塞遍历流,输出结果
for {
@ -126,7 +107,7 @@ func testWorker1(wg *sync.WaitGroup) {
log.Println("[跑业务1]异常:", err.Error())
break
}
log.Println("[跑业务1]:", reply)
log.Println("[跑业务1]收到:", reply)
}
wg.Done()
@ -134,21 +115,17 @@ func testWorker1(wg *sync.WaitGroup) {
func testWorker2(wg *sync.WaitGroup) {
server := NewCron(&CronConfig{
Address: "localhost:8888",
server := NewWorker(&WorkerConfig{
Address: "localhost:8888",
ClientIp: "14.155.157.19",
})
defer server.Conn.Close()
// 订阅服务,传入参数是 cron:
// 会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream, err := server.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefix,
Ip: "14.155.157.19",
})
if err != nil {
log.Printf("[跑业务2]发送失败:%v\n", err)
}
// 订阅服务
server.SubscribeCron()
// 启动任务,会想过滤器函数,订阅者应该收到的信息为 cron:任务名称
stream := server.StartCron()
// 阻塞遍历流,输出结果
for {
@ -161,7 +138,7 @@ func testWorker2(wg *sync.WaitGroup) {
log.Println("[跑业务2]异常:", err.Error())
break
}
log.Println("[跑业务2]:", reply)
log.Println("[跑业务2]收到:", reply)
}
wg.Done()

@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
"log"
"net"
"strings"
"time"
)
@ -48,6 +49,22 @@ func NewServer(config *ServerConfig) *Server {
return s
}
// StartCron 启动定时任务
func (s *Server) StartCron() {
cron := s.Pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
if strings.HasPrefix(key, prefix) {
return true
}
}
return false
})
go func() {
log.Println("crontopic:", <-cron)
}()
}
// StartUp 启动
func (s *Server) StartUp() {

@ -1,13 +1,16 @@
package gojobs
import (
"context"
"go.dtapp.net/gojobs/pb"
"go.dtapp.net/gouuid"
"google.golang.org/grpc"
)
// WorkerConfig 工作配置
type WorkerConfig struct {
Address string // 服务端口 127.0.0.1:8888
Address string // 服务端口 127.0.0.1:8888
ClientIp string // 自己的ip地址
}
// Worker 工作
@ -23,10 +26,14 @@ func NewWorker(config *WorkerConfig) *Worker {
if config.Address == "" {
panic("[工作线]请填写服务端口")
}
if config.ClientIp == "" {
panic("[定时任务]请填写ip地址")
}
w := &Worker{}
w.Address = config.Address
w.ClientIp = config.ClientIp
var err error
@ -41,3 +48,28 @@ func NewWorker(config *WorkerConfig) *Worker {
return w
}
// SubscribeCron 订阅服务
func (w *Worker) SubscribeCron() {
_, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefix,
Ip: w.ClientIp,
})
if err != nil {
panic("[工作线]{订阅服务失败}" + err.Error())
}
}
// StartCron 启动任务
func (w *Worker) StartCron() pb.PubSub_SubscribeClient {
stream, err := w.Pub.Subscribe(context.Background(), &pb.SubscribeRequest{
Id: gouuid.GetUuId(),
Value: prefixSprintf(w.ClientIp),
Ip: w.ClientIp,
})
if err != nil {
panic("[工作线]{启动任务失败}" + err.Error())
}
return stream
}

Loading…
Cancel
Save