- update jobs
continuous-integration/drone/push Build is failing Details
continuous-integration/drone/tag Build is failing Details

master v1.0.30
李光春 2 years ago
parent f95054baa8
commit 4d2e183b47

@ -1,4 +1,4 @@
package jobs_gorm
package gojobs
import (
"fmt"
@ -17,7 +17,7 @@ const specSeconds = "*/%d * * * * *"
// GetSpecSeconds 每隔n秒执行一次
var GetSpecSeconds = func(n int64) string {
if n < 0 && n > 59 {
if n < 0 || n > 59 {
return ""
}
return fmt.Sprintf(specSeconds, n)
@ -25,7 +25,7 @@ var GetSpecSeconds = func(n int64) string {
// GetFrequencySeconds 每隔n秒执行一次
var GetFrequencySeconds = func(n int64) int64 {
if n < 0 && n > 59 {
if n < 0 || n > 59 {
return -1
}
return n
@ -36,7 +36,7 @@ const specMinutes = "0 */%d * * * *"
// GetSpecMinutes 每隔n分钟执行一次
var GetSpecMinutes = func(n int64) string {
if n < 0 && n > 59 {
if n < 0 || n > 59 {
return ""
}
return fmt.Sprintf(specMinutes, n)
@ -44,7 +44,7 @@ var GetSpecMinutes = func(n int64) string {
// GetFrequencyMinutes 每隔n分钟执行一次
var GetFrequencyMinutes = func(n int64) int64 {
if n < 0 && n > 59 {
if n < 0 || n > 59 {
return -1
}
return n * 60
@ -55,7 +55,7 @@ const specHour = "0 0 */%d * * *"
// GetSpecHour 每天n点执行一次
var GetSpecHour = func(n int64) string {
if n < 0 && n > 23 {
if n < 0 || n > 23 {
return ""
}
return fmt.Sprintf(specHour, n)
@ -63,7 +63,7 @@ var GetSpecHour = func(n int64) string {
// GetFrequencyHour 每天n点执行一次
var GetFrequencyHour = func(n int64) int64 {
if n < 0 && n > 23 {
if n < 0 || n > 23 {
return -1
}
return n * 60 * 60

@ -0,0 +1,20 @@
package gojobs
import "testing"
func TestSpec(t *testing.T) {
t.Log("每隔10秒执行一次", GetSpecSeconds(10))
t.Log("每隔10秒执行一次", GetFrequencySeconds(10))
t.Log("每隔60秒执行一次", GetSpecSeconds(60))
t.Log("每隔60秒执行一次", GetFrequencySeconds(60))
t.Log("每隔10分钟执行一次", GetSpecMinutes(10))
t.Log("每隔10分钟执行一次", GetFrequencyMinutes(10))
t.Log("每隔60分钟执行一次", GetSpecMinutes(60))
t.Log("每隔60分钟执行一次", GetFrequencyMinutes(60))
t.Log("每天n点执行一次", GetSpecHour(10))
t.Log("每天n点执行一次", GetFrequencyHour(10))
}

@ -0,0 +1,45 @@
package gojobs
import (
"fmt"
"github.com/jasonlvhit/gocron"
"log"
"testing"
)
func TestCron1(t *testing.T) {
// 创建一个cron实例 精确到秒
crontab := NewCron()
log.Println(crontab)
err := crontab.AddJobByFunc("1", "*/1 * * * * *", func() {
log.Println("哈哈哈哈")
})
if err != nil {
fmt.Printf("添加任务时出错:%s", err)
return
}
err = crontab.AddJobByFunc("2", "*/2 * * * * *", func() {
log.Println("啊啊啊啊")
})
if err != nil {
fmt.Printf("添加任务时出错:%s", err)
return
}
crontab.Start()
select {}
}
func TestCron2(t *testing.T) {
i := 0
s := gocron.NewScheduler()
s.Every(5).Seconds().Do(func() {
i++
log.Println("execute per 5 seconds", i)
})
<-s.Start()
}

@ -4,7 +4,9 @@ go 1.18
require (
gitee.com/chunanyong/zorm v1.5.5
github.com/beego/beego/v2 v2.0.3
github.com/beego/beego/v2 v2.0.4
github.com/jasonlvhit/gocron v0.0.1
github.com/pkg/errors v0.9.1
github.com/robfig/cron/v3 v3.0.1
go.dtapp.net/goarray v1.0.0
go.dtapp.net/goip v1.0.17
@ -35,7 +37,6 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda // indirect
github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect

@ -27,8 +27,8 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l
github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/beego/beego/v2 v2.0.3 h1:vLrjDsn3JcxvIUqduDs4i0BdWuu5v7YN2FRKQcTWIDI=
github.com/beego/beego/v2 v2.0.3/go.mod h1:svcOCy6uDaGYHwcO3nppzKwFigeXm8WHkZfgnvemYNM=
github.com/beego/beego/v2 v2.0.4 h1:1NjpVkcqYVdKE06VJTQUVzsgZqFcaj0MqjHna57bWsA=
github.com/beego/beego/v2 v2.0.4/go.mod h1:21YTlo+jRYqrM/dLC0knzmo9C25x0pqddoKqy8kxev8=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@ -97,6 +97,7 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
@ -239,6 +240,8 @@ github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0f
github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jasonlvhit/gocron v0.0.1 h1:qTt5qF3b3srDjeOIR4Le1LfeyvoYzJlYpqvG7tJX5YU=
github.com/jasonlvhit/gocron v0.0.1/go.mod h1:k9a3TV8VcU73XZxfVHCHWMWF9SOqgoku0/QlY2yvlA4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
@ -325,8 +328,10 @@ github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQ
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=

@ -1,70 +1,25 @@
package gojobs
import (
"fmt"
"net/http"
)
const (
CodeAbnormal = 0 // 异常
CodeError = http.StatusInternalServerError // 失败
CodeSuccess = http.StatusOK // 成功
CodeEnd = http.StatusCreated // 结束
TASK_IN = "IN" // 任务运行
TASK_SUCCESS = "SUCCESS" // 任务完成
TASK_ERROR = "ERROR" // 任务异常
TASK_TIMEOUT = "TIMEOUT" // 任务超时
TASK_WAIT = "WAIT" // 任务等待
)
// 每隔n秒执行一次
const specSeconds = "*/%d * * * * *"
// GetSpecSeconds 每隔n秒执行一次
var GetSpecSeconds = func(n int64) string {
if n < 0 && n > 59 {
return ""
}
return fmt.Sprintf(specSeconds, n)
}
// GetFrequencySeconds 每隔n秒执行一次
var GetFrequencySeconds = func(n int64) int64 {
if n < 0 && n > 59 {
return -1
}
return n
}
// 每隔n分钟执行一次
const specMinutes = "0 */%d * * * *"
// GetSpecMinutes 每隔n分钟执行一次
var GetSpecMinutes = func(n int64) string {
if n < 0 && n > 59 {
return ""
}
return fmt.Sprintf(specMinutes, n)
}
// GetFrequencyMinutes 每隔n分钟执行一次
var GetFrequencyMinutes = func(n int64) int64 {
if n < 0 && n > 59 {
return -1
}
return n * 60
}
// 每天n点执行一次
const specHour = "0 0 */%d * * *"
// GetSpecHour 每天n点执行一次
var GetSpecHour = func(n int64) string {
if n < 0 && n > 23 {
return ""
}
return fmt.Sprintf(specHour, n)
}
// GetFrequencyHour 每天n点执行一次
var GetFrequencyHour = func(n int64) int64 {
if n < 0 && n > 23 {
return -1
}
return n * 60 * 60
// Cron
type jobs interface {
// Run 运行
Run()
// RunAddLog 任务执行日志
RunAddLog()
// CreateInCustomId 创建正在运行任务
CreateInCustomId()
// CreateInCustomIdOnly 创建正在运行唯一任务
CreateInCustomIdOnly()
// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
CreateInCustomIdMaxNumber()
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
CreateInCustomIdMaxNumberOnly()
}

@ -1,9 +0,0 @@
package jobs_common
const (
TASK_IN = "IN" // 任务运行
TASK_SUCCESS = "SUCCESS" // 任务完成
TASK_ERROR = "ERROR" // 任务异常
TASK_TIMEOUT = "TIMEOUT" // 任务超时
TASK_WAIT = "WAIT" // 任务等待
)

@ -3,37 +3,309 @@ package gojobs
import (
"errors"
"fmt"
"go.dtapp.net/gojobs/jobs_gorm"
"go.dtapp.net/goarray"
"go.dtapp.net/goip"
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/goredis"
"go.dtapp.net/gotime"
"go.dtapp.net/gouuid"
"gorm.io/gorm"
"log"
"runtime"
)
// ConfigJobsGorm 配置
type ConfigJobsGorm struct {
MainService int // 主要服务
Db *gorm.DB // 数据库
Redis *goredis.Client // 缓存数据库服务
}
func NewJobsGorm(config *ConfigJobsGorm) *jobs_gorm.JobsGorm {
// Gorm数据库驱动
type jobsGorm struct {
runVersion string // 运行版本
os string // 系统类型
arch string // 系统架构
maxProCs int // CPU核数
version string // GO版本
macAddrS string // Mac地址
insideIp string // 内网ip
outsideIp string // 外网ip
mainService int // 主要服务
db *gorm.DB // 数据库
redis *goredis.Client // 缓存数据库服务
}
// NewJobsGorm 初始化
func NewJobsGorm(config *ConfigJobsGorm) *jobsGorm {
var (
jobsGorm = &jobs_gorm.JobsGorm{}
j = &jobsGorm{}
)
jobsGorm = jobs_gorm.NewGorm(jobs_gorm.JobsGorm{
Db: config.Db,
Redis: config.Redis,
}, config.MainService, Version)
j.runVersion = Version
j.os = runtime.GOOS
j.arch = runtime.GOARCH
j.maxProCs = runtime.GOMAXPROCS(0)
j.version = runtime.Version()
j.macAddrS = goarray.TurnString(goip.GetMacAddr())
j.insideIp = goip.GetInsideIp()
j.outsideIp = goip.GetOutsideIp()
j.mainService = config.MainService
j.db = config.Db
j.redis = config.Redis
err := jobsGorm.Db.AutoMigrate(
&jobs_gorm.Task{},
&jobs_gorm.TaskLog{},
&jobs_gorm.TaskLogRun{},
&jobs_gorm.TaskIp{},
err := j.db.AutoMigrate(
&jobs_gorm_model.Task{},
&jobs_gorm_model.TaskLog{},
&jobs_gorm_model.TaskLogRun{},
&jobs_gorm_model.TaskIp{},
)
if err != nil {
panic(errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err)))
}
return jobsGorm
return j
}
// Run 运行
func (j *jobsGorm) Run(info jobs_gorm_model.Task, status int, desc string) {
// 请求函数记录
statusCreate := j.db.Create(&jobs_gorm_model.TaskLog{
TaskId: info.Id,
StatusCode: status,
Desc: desc,
Version: j.runVersion,
CreatedAt: gotime.Current().Format(),
})
if statusCreate.RowsAffected == 0 {
log.Println("statusCreate", statusCreate.Error)
}
if status == 0 {
statusEdit := j.EditTask(j.db, info.Id).Select("run_id").Updates(jobs_gorm_model.Task{
RunId: gouuid.GetUuId(),
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
return
}
// 任务
if status == CodeSuccess {
// 执行成功
statusEdit := j.EditTask(j.db, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").
Updates(jobs_gorm_model.Task{
StatusDesc: "执行成功",
Number: info.Number + 1,
RunId: gouuid.GetUuId(),
UpdatedIp: j.outsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeEnd {
// 执行成功、提前结束
statusEdit := j.EditTask(j.db, info.Id).
Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").
Updates(jobs_gorm_model.Task{
Status: TASK_SUCCESS,
StatusDesc: "结束执行",
Number: info.Number + 1,
UpdatedIp: j.outsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeError {
// 执行失败
statusEdit := j.EditTask(j.db, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").
Updates(jobs_gorm_model.Task{
StatusDesc: "执行失败",
Number: info.Number + 1,
RunId: gouuid.GetUuId(),
UpdatedIp: j.outsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if info.MaxNumber != 0 {
if info.Number+1 >= info.MaxNumber {
// 关闭执行
statusEdit := j.EditTask(j.db, info.Id).
Select("status").
Updates(jobs_gorm_model.Task{
Status: TASK_TIMEOUT,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
}
}
// RunAddLog 任务执行日志
func (j *jobsGorm) RunAddLog(id uint, runId string) *gorm.DB {
return j.db.Create(&jobs_gorm_model.TaskLogRun{
TaskId: id,
RunId: runId,
InsideIp: j.insideIp,
OutsideIp: j.outsideIp,
Os: j.os,
Arch: j.arch,
Gomaxprocs: j.maxProCs,
GoVersion: j.version,
MacAddrs: j.macAddrS,
CreatedAt: gotime.Current().Format(),
})
}
// ConfigCreateInCustomId 创建正在运行任务
type ConfigCreateInCustomId struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomId 创建正在运行任务
func (j *jobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error {
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdOnly 创建正在运行唯一任务
type ConfigCreateInCustomIdOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdOnly 创建正在运行唯一任务
func (j *jobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error {
query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量
type ConfigCreateInCustomIdMaxNumber struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
func (j *jobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error {
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
type ConfigCreateInCustomIdMaxNumberOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (j *jobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error {
query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}

@ -1,18 +0,0 @@
package jobs_gorm
import "testing"
func TestSpec(t *testing.T) {
t.Log(GetSpecSeconds(10))
t.Log(GetFrequencySeconds(10))
t.Log(GetSpecMinutes(1))
t.Log(GetFrequencyMinutes(1))
t.Log(GetSpecMinutes(10))
t.Log(GetFrequencyMinutes(10))
t.Log(GetSpecMinutes(30))
t.Log(GetFrequencyMinutes(30))
t.Log(GetSpecHour(10))
t.Log(GetFrequencyHour(10))
}

@ -1,19 +0,0 @@
package jobs_gorm
import (
"go.dtapp.net/goip"
"gorm.io/gorm"
)
// RefreshIp 刷新Ip
func (jobsGorm *JobsGorm) RefreshIp(tx *gorm.DB) {
xip := goip.GetOutsideIp()
if jobsGorm.outsideIp == "" || jobsGorm.outsideIp == "0.0.0.0" {
return
}
if jobsGorm.outsideIp == xip {
return
}
tx.Where("ips = ?", jobsGorm.outsideIp).Delete(&TaskIp{}) // 删除
jobsGorm.outsideIp = xip
}

@ -1,286 +0,0 @@
package jobs_gorm
import (
"errors"
"fmt"
"go.dtapp.net/goarray"
"go.dtapp.net/goip"
"go.dtapp.net/gojobs/jobs_common"
"go.dtapp.net/goredis"
"go.dtapp.net/gotime"
"go.dtapp.net/gouuid"
"gorm.io/gorm"
"log"
"runtime"
)
// JobsGorm 任务
type JobsGorm struct {
runVersion string // 运行版本
os string // 系统类型
arch string // 系统架构
maxProCs int // CPU核数
version string // GO版本
macAddrS string // Mac地址
insideIp string // 内网ip
outsideIp string // 外网ip
mainService int // 主要服务
Db *gorm.DB // 数据库
Redis *goredis.Client // 缓存数据库服务
}
// NewGorm 任务
func NewGorm(jobsGorm JobsGorm, mainService int, runVersion string) *JobsGorm {
jobsGorm.runVersion = runVersion
jobsGorm.os = runtime.GOOS
jobsGorm.arch = runtime.GOARCH
jobsGorm.maxProCs = runtime.GOMAXPROCS(0)
jobsGorm.version = runtime.Version()
jobsGorm.macAddrS = goarray.TurnString(goip.GetMacAddr())
jobsGorm.insideIp = goip.GetInsideIp()
jobsGorm.outsideIp = goip.GetOutsideIp()
jobsGorm.mainService = mainService
return &jobsGorm
}
// ConfigCreateInCustomId 创建正在运行任务
type ConfigCreateInCustomId struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomId 创建正在运行任务
func (jobsGorm *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error {
createStatus := config.Tx.Create(&Task{
Status: jobs_common.TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: jobsGorm.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: jobsGorm.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdOnly 创建正在运行唯一任务
type ConfigCreateInCustomIdOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdOnly 创建正在运行唯一任务
func (jobsGorm *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error {
query := jobsGorm.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&Task{
Status: jobs_common.TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: jobsGorm.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: jobsGorm.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量
type ConfigCreateInCustomIdMaxNumber struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
func (jobsGorm *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error {
createStatus := config.Tx.Create(&Task{
Status: jobs_common.TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: jobsGorm.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: jobsGorm.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
type ConfigCreateInCustomIdMaxNumberOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (jobsGorm *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error {
query := jobsGorm.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&Task{
Status: jobs_common.TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: gouuid.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: jobsGorm.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: jobsGorm.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// RunAddLog 任务执行日志
func (jobsGorm *JobsGorm) RunAddLog(tx *gorm.DB, id uint, runId string) *gorm.DB {
return tx.Create(&TaskLogRun{
TaskId: id,
RunId: runId,
InsideIp: jobsGorm.insideIp,
OutsideIp: jobsGorm.outsideIp,
Os: jobsGorm.os,
Arch: jobsGorm.arch,
Gomaxprocs: jobsGorm.maxProCs,
GoVersion: jobsGorm.version,
MacAddrs: jobsGorm.macAddrS,
CreatedAt: gotime.Current().Format(),
})
}
// Run 任务执行
func (jobsGorm *JobsGorm) Run(tx *gorm.DB, info Task, status int, desc string) {
// 请求函数记录
statusCreate := tx.Create(&TaskLog{
TaskId: info.Id,
StatusCode: status,
Desc: desc,
Version: jobsGorm.runVersion,
CreatedAt: gotime.Current().Format(),
})
if statusCreate.RowsAffected == 0 {
log.Println("statusCreate", statusCreate.Error)
}
if status == 0 {
statusEdit := jobsGorm.EditTask(tx, info.Id).Select("run_id").Updates(Task{
RunId: gouuid.GetUuId(),
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
return
}
// 任务
if status == CodeSuccess {
// 执行成功
statusEdit := jobsGorm.EditTask(tx, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").
Updates(Task{
StatusDesc: "执行成功",
Number: info.Number + 1,
RunId: gouuid.GetUuId(),
UpdatedIp: jobsGorm.outsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeEnd {
// 执行成功、提前结束
statusEdit := jobsGorm.EditTask(tx, info.Id).
Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").
Updates(Task{
Status: jobs_common.TASK_SUCCESS,
StatusDesc: "结束执行",
Number: info.Number + 1,
UpdatedIp: jobsGorm.outsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeError {
// 执行失败
statusEdit := jobsGorm.EditTask(tx, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").
Updates(Task{
StatusDesc: "执行失败",
Number: info.Number + 1,
RunId: gouuid.GetUuId(),
UpdatedIp: jobsGorm.outsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if info.MaxNumber != 0 {
if info.Number+1 >= info.MaxNumber {
// 关闭执行
statusEdit := jobsGorm.EditTask(tx, info.Id).
Select("status").
Updates(Task{
Status: jobs_common.TASK_TIMEOUT,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
}
}

@ -1,35 +0,0 @@
package jobs_gorm
import (
"fmt"
"go.dtapp.net/goredis"
"time"
)
// Lock 上锁
func (jobsGorm *JobsGorm) Lock(info Task, id any) string {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
judgeCache := jobsGorm.Redis.NewStringOperation().Get(cacheName).UnwrapOr("")
if judgeCache != "" {
return judgeCache
}
jobsGorm.Redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器上锁成功", jobsGorm.outsideIp), goredis.WithExpire(time.Millisecond*time.Duration(info.Frequency)*3))
return ""
}
// Unlock Lock 解锁
func (jobsGorm *JobsGorm) Unlock(info Task, id any) {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
jobsGorm.Redis.NewStringOperation().Del(cacheName)
}
// LockForever 永远上锁
func (jobsGorm *JobsGorm) LockForever(info Task, id any) string {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
judgeCache := jobsGorm.Redis.NewStringOperation().Get(cacheName).UnwrapOr("")
if judgeCache != "" {
return judgeCache
}
jobsGorm.Redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器永远上锁成功", jobsGorm.outsideIp))
return ""
}

@ -1,158 +0,0 @@
package jobs_gorm
import (
"go.dtapp.net/gojobs/jobs_common"
"gorm.io/gorm"
)
// Task 任务
type Task struct {
Id uint `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号
Status string `gorm:"index;comment:状态码" json:"status"` // 状态码
Params string `gorm:"comment:参数" json:"params"` // 参数
ParamsType string `gorm:"comment:参数类型" json:"params_type"` // 参数类型
StatusDesc string `gorm:"comment:状态描述" json:"status_desc"` // 状态描述
Frequency int64 `gorm:"index;comment:频率(秒单位)" json:"frequency"` // 频率(秒单位)
Number int64 `gorm:"comment:当前次数" json:"number"` // 当前次数
MaxNumber int64 `gorm:"comment:最大次数" json:"max_number"` // 最大次数
RunId string `gorm:"comment:执行编号" json:"run_id"` // 执行编号
CustomId string `gorm:"index;comment:自定义编号" json:"custom_id"` // 自定义编号
CustomSequence int64 `gorm:"comment:自定义顺序" json:"custom_sequence"` // 自定义顺序
Type string `gorm:"index;comment:类型" json:"type"` // 类型
CreatedIp string `gorm:"comment:创建外网IP" json:"created_ip"` // 创建外网IP
SpecifyIp string `gorm:"comment:指定外网IP" json:"specify_ip"` // 指定外网IP
UpdatedIp string `gorm:"comment:更新外网IP" json:"updated_ip"` // 更新外网IP
Result string `gorm:"comment:结果" json:"result"` // 结果
CreatedAt string `gorm:"type:text;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt string `gorm:"type:text;comment:更新时间" json:"updated_at"` // 更新时间
DeletedAt gorm.DeletedAt `gorm:"type:text;index;comment:删除时间" json:"deleted_at"` // 删除时间
}
func (m *Task) TableName() string {
return "task"
}
// TaskTake 查询单任务
func (jobsGorm *JobsGorm) TaskTake(tx *gorm.DB, customId string) (result Task) {
tx.Where("custom_id = ?", customId).Take(&result)
return result
}
// 查询单任务
func (jobsGorm *JobsGorm) taskTake(tx *gorm.DB, customId, status string) (result Task) {
tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result)
return result
}
// TaskTakeIn 查询单任务 - 任务运行
func (jobsGorm *JobsGorm) TaskTakeIn(tx *gorm.DB, customId string) Task {
return jobsGorm.taskTake(tx, customId, jobs_common.TASK_IN)
}
// TaskTakeSuccess 查询单任务 - 任务完成
func (jobsGorm *JobsGorm) TaskTakeSuccess(tx *gorm.DB, customId string) Task {
return jobsGorm.taskTake(tx, customId, jobs_common.TASK_SUCCESS)
}
// TaskTakeError 查询单任务 - 任务异常
func (jobsGorm *JobsGorm) TaskTakeError(tx *gorm.DB, customId string) Task {
return jobsGorm.taskTake(tx, customId, jobs_common.TASK_ERROR)
}
// TaskTakeTimeout 查询单任务 - 任务超时
func (jobsGorm *JobsGorm) TaskTakeTimeout(tx *gorm.DB, customId string) Task {
return jobsGorm.taskTake(tx, customId, jobs_common.TASK_TIMEOUT)
}
// TaskTakeWait 查询单任务 - 任务等待
func (jobsGorm *JobsGorm) TaskTakeWait(tx *gorm.DB, customId string) Task {
return jobsGorm.taskTake(tx, customId, jobs_common.TASK_WAIT)
}
// TaskTypeTake 查询单任务
func (jobsGorm *JobsGorm) TaskTypeTake(tx *gorm.DB, customId, Type string) (result Task) {
tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result)
return result
}
// 查询单任务
func (jobsGorm *JobsGorm) taskTypeTake(tx *gorm.DB, customId, Type, status string) (result Task) {
tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result)
return result
}
// TaskTypeTakeIn 查询单任务 - 任务运行
func (jobsGorm *JobsGorm) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) Task {
return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_IN)
}
// TaskTypeTakeSuccess 查询单任务 - 任务完成
func (jobsGorm *JobsGorm) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) Task {
return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_SUCCESS)
}
// TaskTypeTakeError 查询单任务 - 任务异常
func (jobsGorm *JobsGorm) TaskTypeTakeError(tx *gorm.DB, customId, Type string) Task {
return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_ERROR)
}
// TaskTypeTakeTimeout 查询单任务 - 任务超时
func (jobsGorm *JobsGorm) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) Task {
return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_TIMEOUT)
}
// TaskTypeTakeWait 查询单任务 - 任务等待
func (jobsGorm *JobsGorm) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) Task {
return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_WAIT)
}
// TaskFindAll 查询多任务
func (jobsGorm *JobsGorm) TaskFindAll(tx *gorm.DB, frequency int64) (results []Task) {
tx.Where("frequency = ?", frequency).Order("id asc").Find(&results)
return results
}
// 查询多任务
func (jobsGorm *JobsGorm) taskFindAll(tx *gorm.DB, frequency int64, status string) (results []Task) {
tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results)
return results
}
// TaskFindAllIn 查询多任务 - 任务运行
func (jobsGorm *JobsGorm) TaskFindAllIn(tx *gorm.DB, frequency int64) []Task {
return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_IN)
}
// TaskFindAllSuccess 查询多任务 - 任务完成
func (jobsGorm *JobsGorm) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []Task {
return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_SUCCESS)
}
// TaskFindAllError 查询多任务 - 任务异常
func (jobsGorm *JobsGorm) TaskFindAllError(tx *gorm.DB, frequency int64) []Task {
return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_ERROR)
}
// TaskFindAllTimeout 查询多任务 - 任务超时
func (jobsGorm *JobsGorm) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []Task {
return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_TIMEOUT)
}
// TaskFindAllWait 查询多任务 - 任务等待
func (jobsGorm *JobsGorm) TaskFindAllWait(tx *gorm.DB, frequency int64) []Task {
return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_WAIT)
}
// EditTask 任务修改
func (jobsGorm *JobsGorm) EditTask(tx *gorm.DB, id uint) *gorm.DB {
return tx.Model(&Task{}).Where("id = ?", id)
}
// UpdateFrequency 更新任务频率
func (jobsGorm *JobsGorm) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB {
return jobsGorm.EditTask(tx, id).
Select("frequency").
Updates(Task{
Frequency: frequency,
})
}

@ -1,69 +0,0 @@
package jobs_gorm
import (
"gorm.io/gorm"
"log"
"strings"
)
// TaskIp 任务Ip
type TaskIp struct {
Id int64 `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号
TaskType string `gorm:"comment:任务编号" json:"task_type"` // 任务编号
Ips string `gorm:"comment:任务IP" json:"ips"` // 任务IP
}
func (m *TaskIp) TableName() string {
return "task_ip"
}
func (jobsGorm *JobsGorm) taskIpTake(tx *gorm.DB, taskType, ips string) (result TaskIp) {
tx.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&result)
return result
}
// TaskIpUpdate 更新ip
func (jobsGorm *JobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB {
query := jobsGorm.taskIpTake(tx, taskType, ips)
if query.Id != 0 {
return tx
}
updateStatus := tx.Create(&TaskIp{
TaskType: taskType,
Ips: ips,
})
if updateStatus.RowsAffected == 0 {
log.Println("任务更新失败:", updateStatus.Error)
}
return updateStatus
}
// TaskIpInit 实例任务ip
func (jobsGorm *JobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool {
if jobsGorm.outsideIp == "" || jobsGorm.outsideIp == "0.0.0.0" {
return false
}
tx.Where("ips = ?", jobsGorm.outsideIp).Delete(&TaskIp{}) // 删除
for k, v := range ips {
if v == "" {
jobsGorm.TaskIpUpdate(tx, k, jobsGorm.outsideIp)
} else {
find := strings.Contains(v, ",")
if find == true {
// 包含
parts := strings.Split(v, ",")
for _, vv := range parts {
if vv == jobsGorm.outsideIp {
jobsGorm.TaskIpUpdate(tx, k, jobsGorm.outsideIp)
}
}
} else {
// 不包含
if v == jobsGorm.outsideIp {
jobsGorm.TaskIpUpdate(tx, k, jobsGorm.outsideIp)
}
}
}
}
return true
}

@ -1,29 +0,0 @@
package jobs_gorm
var ParamsOrderType = "order"
// ParamsOrderId 订单任务
type ParamsOrderId struct {
OrderId string `json:"order_id,omitempty"`
}
var ParamsMerchantGoldenBeanType = "merchant.golden_bean"
var ParamsNewServiceType = "new_service"
// ParamsTaskId 企业自定义任务
type ParamsTaskId struct {
TaskId int64 `json:"task_id,omitempty"`
}
var ParamsNewServiceNextType = "new_service.next"
// ParamsTaskIdNext 企业自定义下一步任务
type ParamsTaskIdNext struct {
TaskId int64 `json:"task_id,omitempty"`
MerchantUserId int64 `json:"merchant_user_id,omitempty"`
CurrentNumber int `json:"current_number,omitempty"`
MaxNumber int `json:"max_number,omitempty"`
}
var ParamsTeamInvType = "team.inv"

@ -1,25 +0,0 @@
package jobs_gorm
func GetTypeApiPaySubmit(Type string) string {
return "api.pay.submit." + Type
}
func GetTypeWechatRefundsSubmit(Type string) string {
return "wechat.refunds.submit." + Type
}
func GetTypeWechatRefundsQuery(Type string) string {
return "wechat.refunds.query." + Type
}
func GetTypeGoldenBeansIssue(Type string) string {
return "golden_beans.issue." + Type
}
func GetTypeGoldenBeansRefunds(Type string) string {
return "golden_beans.refunds." + Type
}
func GetTypeCustomerAuto(Type string) string {
return "customer.auto." + Type
}

@ -1,18 +1,19 @@
package jobs_gorm
package gojobs
import (
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/gotime"
"gorm.io/gorm"
"log"
)
func (jobsGorm *JobsGorm) Check(tx *gorm.DB, vs []Task) {
if jobsGorm.mainService > 0 && len(vs) > 0 {
func (j *jobsGorm) Check(tx *gorm.DB, vs []jobs_gorm_model.Task) {
if j.mainService > 0 && len(vs) > 0 {
for _, v := range vs {
diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time)
if diffInSecondWithAbs >= v.Frequency*3 {
log.Printf("每隔%v秒任务%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs)
tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&TaskLogRun{}) // 删除
tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&jobs_gorm_model.TaskLogRun{}) // 删除
}
}
}

@ -0,0 +1,20 @@
package gojobs
import (
"go.dtapp.net/goip"
"go.dtapp.net/gojobs/jobs_gorm_model"
"gorm.io/gorm"
)
// RefreshIp 刷新Ip
func (j *jobsGorm) RefreshIp(tx *gorm.DB) {
xip := goip.GetOutsideIp()
if j.outsideIp == "" || j.outsideIp == "0.0.0.0" {
return
}
if j.outsideIp == xip {
return
}
tx.Where("ips = ?", j.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
j.outsideIp = xip
}

@ -0,0 +1,36 @@
package gojobs
import (
"fmt"
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/goredis"
"time"
)
// Lock 上锁
func (j *jobsGorm) Lock(info jobs_gorm_model.Task, id any) string {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("")
if judgeCache != "" {
return judgeCache
}
j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器上锁成功", j.outsideIp), goredis.WithExpire(time.Millisecond*time.Duration(info.Frequency)*3))
return ""
}
// Unlock Lock 解锁
func (j *jobsGorm) Unlock(info jobs_gorm_model.Task, id any) {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
j.redis.NewStringOperation().Del(cacheName)
}
// LockForever 永远上锁
func (j *jobsGorm) LockForever(info jobs_gorm_model.Task, id any) string {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("")
if judgeCache != "" {
return judgeCache
}
j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器永远上锁成功", j.outsideIp))
return ""
}

@ -0,0 +1,190 @@
package gojobs
import (
"go.dtapp.net/gojobs/jobs_gorm_model"
"gorm.io/gorm"
"log"
"strings"
)
// TaskTake 查询单任务
func (j *jobsGorm) TaskTake(tx *gorm.DB, customId string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Take(&result)
return result
}
// 查询单任务
func (j *jobsGorm) taskTake(tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result)
return result
}
// TaskTakeIn 查询单任务 - 任务运行
func (j *jobsGorm) TaskTakeIn(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return j.taskTake(tx, customId, TASK_IN)
}
// TaskTakeSuccess 查询单任务 - 任务完成
func (j *jobsGorm) TaskTakeSuccess(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return j.taskTake(tx, customId, TASK_SUCCESS)
}
// TaskTakeError 查询单任务 - 任务异常
func (j *jobsGorm) TaskTakeError(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return j.taskTake(tx, customId, TASK_ERROR)
}
// TaskTakeTimeout 查询单任务 - 任务超时
func (j *jobsGorm) TaskTakeTimeout(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return j.taskTake(tx, customId, TASK_TIMEOUT)
}
// TaskTakeWait 查询单任务 - 任务等待
func (j *jobsGorm) TaskTakeWait(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return j.taskTake(tx, customId, TASK_WAIT)
}
// TaskTypeTake 查询单任务
func (j *jobsGorm) TaskTypeTake(tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result)
return result
}
// 查询单任务
func (j *jobsGorm) taskTypeTake(tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result)
return result
}
// TaskTypeTakeIn 查询单任务 - 任务运行
func (j *jobsGorm) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return j.taskTypeTake(tx, customId, Type, TASK_IN)
}
// TaskTypeTakeSuccess 查询单任务 - 任务完成
func (j *jobsGorm) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return j.taskTypeTake(tx, customId, Type, TASK_SUCCESS)
}
// TaskTypeTakeError 查询单任务 - 任务异常
func (j *jobsGorm) TaskTypeTakeError(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return j.taskTypeTake(tx, customId, Type, TASK_ERROR)
}
// TaskTypeTakeTimeout 查询单任务 - 任务超时
func (j *jobsGorm) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return j.taskTypeTake(tx, customId, Type, TASK_TIMEOUT)
}
// TaskTypeTakeWait 查询单任务 - 任务等待
func (j *jobsGorm) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return j.taskTypeTake(tx, customId, Type, TASK_WAIT)
}
// TaskFindAll 查询多任务
func (j *jobsGorm) TaskFindAll(tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) {
tx.Where("frequency = ?", frequency).Order("id asc").Find(&results)
return results
}
// 查询多任务
func (j *jobsGorm) taskFindAll(tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) {
tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results)
return results
}
// TaskFindAllIn 查询多任务 - 任务运行
func (j *jobsGorm) TaskFindAllIn(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return j.taskFindAll(tx, frequency, TASK_IN)
}
// TaskFindAllSuccess 查询多任务 - 任务完成
func (j *jobsGorm) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return j.taskFindAll(tx, frequency, TASK_SUCCESS)
}
// TaskFindAllError 查询多任务 - 任务异常
func (j *jobsGorm) TaskFindAllError(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return j.taskFindAll(tx, frequency, TASK_ERROR)
}
// TaskFindAllTimeout 查询多任务 - 任务超时
func (j *jobsGorm) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return j.taskFindAll(tx, frequency, TASK_TIMEOUT)
}
// TaskFindAllWait 查询多任务 - 任务等待
func (j *jobsGorm) TaskFindAllWait(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return j.taskFindAll(tx, frequency, TASK_WAIT)
}
// EditTask 任务修改
func (j *jobsGorm) EditTask(tx *gorm.DB, id uint) *gorm.DB {
return tx.Model(&jobs_gorm_model.Task{}).Where("id = ?", id)
}
// UpdateFrequency 更新任务频率
func (j *jobsGorm) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB {
return j.EditTask(tx, id).
Select("frequency").
Updates(jobs_gorm_model.Task{
Frequency: frequency,
})
}
func (j *jobsGorm) taskIpTake(tx *gorm.DB, taskType, ips string) (result jobs_gorm_model.TaskIp) {
tx.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&result)
return result
}
// TaskIpUpdate 更新ip
func (j *jobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB {
query := j.taskIpTake(tx, taskType, ips)
if query.Id != 0 {
return tx
}
updateStatus := tx.Create(&jobs_gorm_model.TaskIp{
TaskType: taskType,
Ips: ips,
})
if updateStatus.RowsAffected == 0 {
log.Println("任务更新失败:", updateStatus.Error)
}
return updateStatus
}
// TaskIpInit 实例任务ip
func (j *jobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool {
if j.outsideIp == "" || j.outsideIp == "0.0.0.0" {
return false
}
tx.Where("ips = ?", j.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
for k, v := range ips {
if v == "" {
j.TaskIpUpdate(tx, k, j.outsideIp)
} else {
find := strings.Contains(v, ",")
if find == true {
// 包含
parts := strings.Split(v, ",")
for _, vv := range parts {
if vv == j.outsideIp {
j.TaskIpUpdate(tx, k, j.outsideIp)
}
}
} else {
// 不包含
if v == j.outsideIp {
j.TaskIpUpdate(tx, k, j.outsideIp)
}
}
}
}
return true
}
// TaskLogRunTake 查询任务执行日志
func (j *jobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result jobs_gorm_model.TaskLogRun) {
tx.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result)
return result
}

@ -0,0 +1,32 @@
package jobs_gorm_model
import (
"gorm.io/gorm"
)
// Task 任务
type Task struct {
Id uint `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号
Status string `gorm:"index;comment:状态码" json:"status"` // 状态码
Params string `gorm:"comment:参数" json:"params"` // 参数
ParamsType string `gorm:"comment:参数类型" json:"params_type"` // 参数类型
StatusDesc string `gorm:"comment:状态描述" json:"status_desc"` // 状态描述
Frequency int64 `gorm:"index;comment:频率(秒单位)" json:"frequency"` // 频率(秒单位)
Number int64 `gorm:"comment:当前次数" json:"number"` // 当前次数
MaxNumber int64 `gorm:"comment:最大次数" json:"max_number"` // 最大次数
RunId string `gorm:"comment:执行编号" json:"run_id"` // 执行编号
CustomId string `gorm:"index;comment:自定义编号" json:"custom_id"` // 自定义编号
CustomSequence int64 `gorm:"comment:自定义顺序" json:"custom_sequence"` // 自定义顺序
Type string `gorm:"index;comment:类型" json:"type"` // 类型
CreatedIp string `gorm:"comment:创建外网IP" json:"created_ip"` // 创建外网IP
SpecifyIp string `gorm:"comment:指定外网IP" json:"specify_ip"` // 指定外网IP
UpdatedIp string `gorm:"comment:更新外网IP" json:"updated_ip"` // 更新外网IP
Result string `gorm:"comment:结果" json:"result"` // 结果
CreatedAt string `gorm:"type:text;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt string `gorm:"type:text;comment:更新时间" json:"updated_at"` // 更新时间
DeletedAt gorm.DeletedAt `gorm:"type:text;index;comment:删除时间" json:"deleted_at"` // 删除时间
}
func (m *Task) TableName() string {
return "task"
}

@ -0,0 +1,12 @@
package jobs_gorm_model
// TaskIp 任务Ip
type TaskIp struct {
Id int64 `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号
TaskType string `gorm:"comment:任务编号" json:"task_type"` // 任务编号
Ips string `gorm:"comment:任务IP" json:"ips"` // 任务IP
}
func (m *TaskIp) TableName() string {
return "task_ip"
}

@ -1,4 +1,4 @@
package jobs_gorm
package jobs_gorm_model
// TaskLog 任务日志模型
type TaskLog struct {

@ -1,8 +1,4 @@
package jobs_gorm
import (
"gorm.io/gorm"
)
package jobs_gorm_model
// TaskLogRun 任务执行日志模型
type TaskLogRun struct {
@ -22,9 +18,3 @@ type TaskLogRun struct {
func (m *TaskLogRun) TableName() string {
return "task_log_run"
}
// TaskLogRunTake 查询任务执行日志
func (jobsGorm *JobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result TaskLogRun) {
tx.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result)
return result
}

@ -0,0 +1,25 @@
package gojobs
//
//type JobsOption func(*JobsCron)
//
//// WithRedis 缓存服务驱动
//func WithRedis(db *goredis.Client) JobsOption {
// return func(opts *JobsCron) {
// opts.redis = db
// }
//}
//
//// WithGorm 数据库服务驱动
//func WithGorm(db *gorm.DB) JobsOption {
// return func(opts *JobsCron) {
// opts.db = db
// }
//}
//
//// WithMainService 是否主要服务(主要服务可删除过期服务)
//func WithMainService(status int) JobsOption {
// return func(opts *JobsCron) {
// opts.mainService = status
// }
//}

@ -1,16 +1,13 @@
package gojobs
import (
"testing"
)
func TestSpec(t *testing.T) {
t.Log("每隔n秒执行一次", GetSpecSeconds(10))
t.Log("每隔n秒执行一次", GetFrequencySeconds(10))
t.Log("每隔n分钟执行一次", GetSpecMinutes(10))
t.Log("每隔n分钟执行一次", GetFrequencyMinutes(10))
t.Log("每天n点执行一次", GetSpecHour(10))
t.Log("每天n点执行一次", GetFrequencyHour(10))
import "testing"
func TestJobs1(t *testing.T) {
//t.Log(NewJobsGorm(&ConfigJobsGorm{
// MainService: 0,
// Db: nil,
// Redis: nil,
//}).Run())
//t.Log(NewJobsXorm().Run())
//t.Log(NewJobsZorm().Run())
}

@ -2,14 +2,20 @@ package gojobs
import "xorm.io/xorm"
type JobsXorm struct {
Db *xorm.Engine
// Xorm数据库驱动
type jobsXorm struct {
db *xorm.Engine
}
func newJobsXorm(db *xorm.Engine) *JobsXorm {
// NewJobsXorm 初始化
func NewJobsXorm(db *xorm.Engine) *jobsXorm {
var (
jobsXorm = &JobsXorm{}
j = &jobsXorm{}
)
jobsXorm.Db = db
return jobsXorm
j.db = db
return j
}
func (j *jobsXorm) Run() {
}

@ -2,14 +2,20 @@ package gojobs
import "gitee.com/chunanyong/zorm"
type JobsZorm struct {
Db *zorm.DBDao
// Zorm数据库驱动
type jobsZorm struct {
db *zorm.DBDao
}
func NewJobsZorm(db *zorm.DBDao) *JobsZorm {
// NewJobsZorm 初始化
func NewJobsZorm(db *zorm.DBDao) *jobsZorm {
var (
jobsZorm = &JobsZorm{}
j = &jobsZorm{}
)
jobsZorm.Db = db
return jobsZorm
j.db = db
return j
}
func (j *jobsZorm) Run() {
}

@ -1,3 +1,3 @@
package gojobs
const Version = "1.0.29"
const Version = "1.0.30"

Loading…
Cancel
Save