diff --git a/cron.go b/cron.go new file mode 100644 index 0000000..d14c5c2 --- /dev/null +++ b/cron.go @@ -0,0 +1,111 @@ +package gojobs + +import ( + "github.com/pkg/errors" + "github.com/robfig/cron/v3" + "sync" +) + +// Cron 定时任务管理器 +type Cron struct { + inner *cron.Cron + ids map[string]cron.EntryID + mutex sync.Mutex +} + +// NewCron 创建一个定时任务管理器 +func NewCron() *Cron { + return &Cron{ + inner: cron.New(cron.WithSeconds()), + ids: make(map[string]cron.EntryID), + } +} + +// Start 启动任务 +func (c *Cron) Start() { + c.inner.Start() +} + +// Stop 关闭任务 +func (c *Cron) Stop() { + c.inner.Stop() +} + +// DelByID 删除任务 +// id:唯一任务id +func (c *Cron) DelByID(id string) { + c.mutex.Lock() + defer c.mutex.Unlock() + + eid, ok := c.ids[id] + if !ok { + return + } + c.inner.Remove(eid) + delete(c.ids, id) +} + +// AddJobByInterface 实现接口的方式添加定时任务 +// id:唯一任务id +// spec:配置定时执行时间表达式 +// cmd:需要执行的任务方法 +func (c *Cron) AddJobByInterface(id string, spec string, cmd cron.Job) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if _, ok := c.ids[id]; ok { + return errors.Errorf("任务已存在") + } + eid, err := c.inner.AddJob(spec, cmd) + if err != nil { + return err + } + c.ids[id] = eid + return nil +} + +// AddJobByFunc 添加函数作为定时任务 +// id:唯一任务id +// spec:配置定时执行时间表达式 +// f:需要执行的任务方法 +func (c *Cron) AddJobByFunc(id string, spec string, f func()) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if _, ok := c.ids[id]; ok { + return errors.Errorf("任务已存在") + } + eid, err := c.inner.AddFunc(spec, f) + if err != nil { + return err + } + c.ids[id] = eid + return nil +} + +// IsExistsJob 判断是否存在任务 +// id:唯一任务id +func (c *Cron) IsExistsJob(id string) bool { + _, exist := c.ids[id] + return exist +} + +// Ids ... +func (c *Cron) Ids() []string { + c.mutex.Lock() + defer c.mutex.Unlock() + validIds := make([]string, 0, len(c.ids)) + invalidIds := make([]string, 0) + for sid, eid := range c.ids { + e := c.inner.Entry(eid) + if e.ID != eid { + invalidIds = append(invalidIds, sid) + continue + } + validIds = append(validIds, sid) + } + for _, id := range validIds { + delete(c.ids, id) + } + return validIds +} diff --git a/grpc_cron.go b/grpc_cron.go index e804727..1e92fc7 100644 --- a/grpc_cron.go +++ b/grpc_cron.go @@ -12,21 +12,21 @@ type CronConfig struct { Address string // 服务端口 127.0.0.1:8888 } -// Cron 定时任务 -type Cron struct { +// GrpcCron 定时任务 +type GrpcCron struct { CronConfig // 配置 Pub pb.PubSubClient // 订阅 Conn *grpc.ClientConn // 链接信息 } -// NewCron 创建定时任务 -func NewCron(config *CronConfig) *Cron { +// NewGrpcCron 创建定时任务 +func NewGrpcCron(config *CronConfig) *GrpcCron { if config.Address == "" { panic("[定时任务]请填写服务端口") } - c := &Cron{} + c := &GrpcCron{} c.Address = config.Address @@ -45,7 +45,7 @@ func NewCron(config *CronConfig) *Cron { } // Send 发送 -func (c *Cron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error) { +func (c *GrpcCron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error) { log.Printf("[定时任务]{广播开始}编号:%s 类型:%s ip:%s\n", in.GetId(), in.GetValue(), in.GetIp()) stream, err := c.Pub.Publish(context.Background(), in) if err != nil { diff --git a/jobs_test.go b/jobs_test.go index d60feb3..f79863b 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -1,18 +1,16 @@ package gojobs -import "testing" +import ( + "testing" +) func TestSpec(t *testing.T) { - t.Log(GetSpecSeconds(10)) - t.Log(GetFrequencySeconds(10)) + t.Log("每隔n秒执行一次:", GetSpecSeconds(10)) + t.Log("每隔n秒执行一次:", GetFrequencySeconds(10)) - t.Log(GetSpecMinutes(1)) - t.Log(GetFrequencyMinutes(1)) - t.Log(GetSpecMinutes(10)) - t.Log(GetFrequencyMinutes(10)) - t.Log(GetSpecMinutes(30)) - t.Log(GetFrequencyMinutes(30)) + t.Log("每隔n分钟执行一次:", GetSpecMinutes(10)) + t.Log("每隔n分钟执行一次:", GetFrequencyMinutes(10)) - t.Log(GetSpecHour(10)) - t.Log(GetFrequencyHour(10)) + t.Log("每天n点执行一次:", GetSpecHour(10)) + t.Log("每天n点执行一次:", GetFrequencyHour(10)) }