diff --git a/jobs_gorm/const.go b/cron_spec.go similarity index 89% rename from jobs_gorm/const.go rename to cron_spec.go index 9db92cf..420f312 100644 --- a/jobs_gorm/const.go +++ b/cron_spec.go @@ -1,4 +1,4 @@ -package jobs_gorm +package gojobs import ( "fmt" @@ -17,7 +17,7 @@ const specSeconds = "*/%d * * * * *" // GetSpecSeconds 每隔n秒执行一次 var GetSpecSeconds = func(n int64) string { - if n < 0 && n > 59 { + if n < 0 || n > 59 { return "" } return fmt.Sprintf(specSeconds, n) @@ -25,7 +25,7 @@ var GetSpecSeconds = func(n int64) string { // GetFrequencySeconds 每隔n秒执行一次 var GetFrequencySeconds = func(n int64) int64 { - if n < 0 && n > 59 { + if n < 0 || n > 59 { return -1 } return n @@ -36,7 +36,7 @@ const specMinutes = "0 */%d * * * *" // GetSpecMinutes 每隔n分钟执行一次 var GetSpecMinutes = func(n int64) string { - if n < 0 && n > 59 { + if n < 0 || n > 59 { return "" } return fmt.Sprintf(specMinutes, n) @@ -44,7 +44,7 @@ var GetSpecMinutes = func(n int64) string { // GetFrequencyMinutes 每隔n分钟执行一次 var GetFrequencyMinutes = func(n int64) int64 { - if n < 0 && n > 59 { + if n < 0 || n > 59 { return -1 } return n * 60 @@ -55,7 +55,7 @@ const specHour = "0 0 */%d * * *" // GetSpecHour 每天n点执行一次 var GetSpecHour = func(n int64) string { - if n < 0 && n > 23 { + if n < 0 || n > 23 { return "" } return fmt.Sprintf(specHour, n) @@ -63,7 +63,7 @@ var GetSpecHour = func(n int64) string { // GetFrequencyHour 每天n点执行一次 var GetFrequencyHour = func(n int64) int64 { - if n < 0 && n > 23 { + if n < 0 || n > 23 { return -1 } return n * 60 * 60 diff --git a/cron_spec_test.go b/cron_spec_test.go new file mode 100644 index 0000000..e14f9aa --- /dev/null +++ b/cron_spec_test.go @@ -0,0 +1,20 @@ +package gojobs + +import "testing" + +func TestSpec(t *testing.T) { + t.Log("每隔10秒执行一次:", GetSpecSeconds(10)) + t.Log("每隔10秒执行一次:", GetFrequencySeconds(10)) + + t.Log("每隔60秒执行一次:", GetSpecSeconds(60)) + t.Log("每隔60秒执行一次:", GetFrequencySeconds(60)) + + t.Log("每隔10分钟执行一次:", GetSpecMinutes(10)) + t.Log("每隔10分钟执行一次:", GetFrequencyMinutes(10)) + + t.Log("每隔60分钟执行一次:", GetSpecMinutes(60)) + t.Log("每隔60分钟执行一次:", GetFrequencyMinutes(60)) + + t.Log("每天n点执行一次:", GetSpecHour(10)) + t.Log("每天n点执行一次:", GetFrequencyHour(10)) +} diff --git a/cron_test.go b/cron_test.go new file mode 100644 index 0000000..d4793f3 --- /dev/null +++ b/cron_test.go @@ -0,0 +1,45 @@ +package gojobs + +import ( + "fmt" + "github.com/jasonlvhit/gocron" + "log" + "testing" +) + +func TestCron1(t *testing.T) { + + // 创建一个cron实例 精确到秒 + crontab := NewCron() + + log.Println(crontab) + + err := crontab.AddJobByFunc("1", "*/1 * * * * *", func() { + log.Println("哈哈哈哈") + }) + if err != nil { + fmt.Printf("添加任务时出错:%s", err) + return + } + + err = crontab.AddJobByFunc("2", "*/2 * * * * *", func() { + log.Println("啊啊啊啊") + }) + if err != nil { + fmt.Printf("添加任务时出错:%s", err) + return + } + + crontab.Start() + select {} +} + +func TestCron2(t *testing.T) { + i := 0 + s := gocron.NewScheduler() + s.Every(5).Seconds().Do(func() { + i++ + log.Println("execute per 5 seconds", i) + }) + <-s.Start() +} diff --git a/go.mod b/go.mod index a968f24..727c5e4 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,9 @@ go 1.18 require ( gitee.com/chunanyong/zorm v1.5.5 - github.com/beego/beego/v2 v2.0.3 + github.com/beego/beego/v2 v2.0.4 + github.com/jasonlvhit/gocron v0.0.1 + github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 go.dtapp.net/goarray v1.0.0 go.dtapp.net/goip v1.0.17 @@ -35,7 +37,6 @@ require ( github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/saracen/go7z v0.0.0-20191010121135-9c09b6bd7fda // indirect github.com/saracen/solidblock v0.0.0-20190426153529-45df20abab6f // indirect github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect diff --git a/go.sum b/go.sum index 64afd46..d01e93f 100644 --- a/go.sum +++ b/go.sum @@ -27,8 +27,8 @@ github.com/aryann/difflib v0.0.0-20170710044230-e206f873d14a/go.mod h1:DAHtR1m6l github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= -github.com/beego/beego/v2 v2.0.3 h1:vLrjDsn3JcxvIUqduDs4i0BdWuu5v7YN2FRKQcTWIDI= -github.com/beego/beego/v2 v2.0.3/go.mod h1:svcOCy6uDaGYHwcO3nppzKwFigeXm8WHkZfgnvemYNM= +github.com/beego/beego/v2 v2.0.4 h1:1NjpVkcqYVdKE06VJTQUVzsgZqFcaj0MqjHna57bWsA= +github.com/beego/beego/v2 v2.0.4/go.mod h1:21YTlo+jRYqrM/dLC0knzmo9C25x0pqddoKqy8kxev8= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -97,6 +97,7 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -239,6 +240,8 @@ github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0f github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jasonlvhit/gocron v0.0.1 h1:qTt5qF3b3srDjeOIR4Le1LfeyvoYzJlYpqvG7tJX5YU= +github.com/jasonlvhit/gocron v0.0.1/go.mod h1:k9a3TV8VcU73XZxfVHCHWMWF9SOqgoku0/QlY2yvlA4= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= @@ -325,8 +328,10 @@ github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= diff --git a/jobs.go b/jobs.go index 1c07121..cb6d890 100644 --- a/jobs.go +++ b/jobs.go @@ -1,70 +1,25 @@ package gojobs -import ( - "fmt" - "net/http" -) - const ( - CodeAbnormal = 0 // 异常 - CodeError = http.StatusInternalServerError // 失败 - CodeSuccess = http.StatusOK // 成功 - CodeEnd = http.StatusCreated // 结束 + TASK_IN = "IN" // 任务运行 + TASK_SUCCESS = "SUCCESS" // 任务完成 + TASK_ERROR = "ERROR" // 任务异常 + TASK_TIMEOUT = "TIMEOUT" // 任务超时 + TASK_WAIT = "WAIT" // 任务等待 ) -// 每隔n秒执行一次 -const specSeconds = "*/%d * * * * *" - -// GetSpecSeconds 每隔n秒执行一次 -var GetSpecSeconds = func(n int64) string { - if n < 0 && n > 59 { - return "" - } - return fmt.Sprintf(specSeconds, n) -} - -// GetFrequencySeconds 每隔n秒执行一次 -var GetFrequencySeconds = func(n int64) int64 { - if n < 0 && n > 59 { - return -1 - } - return n -} - -// 每隔n分钟执行一次 -const specMinutes = "0 */%d * * * *" - -// GetSpecMinutes 每隔n分钟执行一次 -var GetSpecMinutes = func(n int64) string { - if n < 0 && n > 59 { - return "" - } - return fmt.Sprintf(specMinutes, n) -} - -// GetFrequencyMinutes 每隔n分钟执行一次 -var GetFrequencyMinutes = func(n int64) int64 { - if n < 0 && n > 59 { - return -1 - } - return n * 60 -} - -// 每天n点执行一次 -const specHour = "0 0 */%d * * *" - -// GetSpecHour 每天n点执行一次 -var GetSpecHour = func(n int64) string { - if n < 0 && n > 23 { - return "" - } - return fmt.Sprintf(specHour, n) -} - -// GetFrequencyHour 每天n点执行一次 -var GetFrequencyHour = func(n int64) int64 { - if n < 0 && n > 23 { - return -1 - } - return n * 60 * 60 +// Cron +type jobs interface { + // Run 运行 + Run() + // RunAddLog 任务执行日志 + RunAddLog() + // CreateInCustomId 创建正在运行任务 + CreateInCustomId() + // CreateInCustomIdOnly 创建正在运行唯一任务 + CreateInCustomIdOnly() + // CreateInCustomIdMaxNumber 创建正在运行任务并限制数量 + CreateInCustomIdMaxNumber() + // CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 + CreateInCustomIdMaxNumberOnly() } diff --git a/jobs_common/jobs_common.go b/jobs_common/jobs_common.go deleted file mode 100644 index 96b5c20..0000000 --- a/jobs_common/jobs_common.go +++ /dev/null @@ -1,9 +0,0 @@ -package jobs_common - -const ( - TASK_IN = "IN" // 任务运行 - TASK_SUCCESS = "SUCCESS" // 任务完成 - TASK_ERROR = "ERROR" // 任务异常 - TASK_TIMEOUT = "TIMEOUT" // 任务超时 - TASK_WAIT = "WAIT" // 任务等待 -) diff --git a/jobs_gorm.go b/jobs_gorm.go index 4f8d4f6..3d87e08 100644 --- a/jobs_gorm.go +++ b/jobs_gorm.go @@ -3,37 +3,309 @@ package gojobs import ( "errors" "fmt" - "go.dtapp.net/gojobs/jobs_gorm" + "go.dtapp.net/goarray" + "go.dtapp.net/goip" + "go.dtapp.net/gojobs/jobs_gorm_model" "go.dtapp.net/goredis" + "go.dtapp.net/gotime" + "go.dtapp.net/gouuid" "gorm.io/gorm" + "log" + "runtime" ) +// ConfigJobsGorm 配置 type ConfigJobsGorm struct { MainService int // 主要服务 Db *gorm.DB // 数据库 Redis *goredis.Client // 缓存数据库服务 } -func NewJobsGorm(config *ConfigJobsGorm) *jobs_gorm.JobsGorm { +// Gorm数据库驱动 +type jobsGorm struct { + runVersion string // 运行版本 + os string // 系统类型 + arch string // 系统架构 + maxProCs int // CPU核数 + version string // GO版本 + macAddrS string // Mac地址 + insideIp string // 内网ip + outsideIp string // 外网ip + mainService int // 主要服务 + db *gorm.DB // 数据库 + redis *goredis.Client // 缓存数据库服务 +} + +// NewJobsGorm 初始化 +func NewJobsGorm(config *ConfigJobsGorm) *jobsGorm { var ( - jobsGorm = &jobs_gorm.JobsGorm{} + j = &jobsGorm{} ) - jobsGorm = jobs_gorm.NewGorm(jobs_gorm.JobsGorm{ - Db: config.Db, - Redis: config.Redis, - }, config.MainService, Version) + j.runVersion = Version + j.os = runtime.GOOS + j.arch = runtime.GOARCH + j.maxProCs = runtime.GOMAXPROCS(0) + j.version = runtime.Version() + j.macAddrS = goarray.TurnString(goip.GetMacAddr()) + j.insideIp = goip.GetInsideIp() + j.outsideIp = goip.GetOutsideIp() + j.mainService = config.MainService + j.db = config.Db + j.redis = config.Redis - err := jobsGorm.Db.AutoMigrate( - &jobs_gorm.Task{}, - &jobs_gorm.TaskLog{}, - &jobs_gorm.TaskLogRun{}, - &jobs_gorm.TaskIp{}, + err := j.db.AutoMigrate( + &jobs_gorm_model.Task{}, + &jobs_gorm_model.TaskLog{}, + &jobs_gorm_model.TaskLogRun{}, + &jobs_gorm_model.TaskIp{}, ) if err != nil { panic(errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err))) } - return jobsGorm + return j +} + +// Run 运行 +func (j *jobsGorm) Run(info jobs_gorm_model.Task, status int, desc string) { + // 请求函数记录 + statusCreate := j.db.Create(&jobs_gorm_model.TaskLog{ + TaskId: info.Id, + StatusCode: status, + Desc: desc, + Version: j.runVersion, + CreatedAt: gotime.Current().Format(), + }) + if statusCreate.RowsAffected == 0 { + log.Println("statusCreate", statusCreate.Error) + } + if status == 0 { + statusEdit := j.EditTask(j.db, info.Id).Select("run_id").Updates(jobs_gorm_model.Task{ + RunId: gouuid.GetUuId(), + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + return + } + // 任务 + if status == CodeSuccess { + // 执行成功 + statusEdit := j.EditTask(j.db, info.Id). + Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result"). + Updates(jobs_gorm_model.Task{ + StatusDesc: "执行成功", + Number: info.Number + 1, + RunId: gouuid.GetUuId(), + UpdatedIp: j.outsideIp, + UpdatedAt: gotime.Current().Format(), + Result: desc, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + if status == CodeEnd { + // 执行成功、提前结束 + statusEdit := j.EditTask(j.db, info.Id). + Select("status", "status_desc", "number", "updated_ip", "updated_at", "result"). + Updates(jobs_gorm_model.Task{ + Status: TASK_SUCCESS, + StatusDesc: "结束执行", + Number: info.Number + 1, + UpdatedIp: j.outsideIp, + UpdatedAt: gotime.Current().Format(), + Result: desc, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + if status == CodeError { + // 执行失败 + statusEdit := j.EditTask(j.db, info.Id). + Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result"). + Updates(jobs_gorm_model.Task{ + StatusDesc: "执行失败", + Number: info.Number + 1, + RunId: gouuid.GetUuId(), + UpdatedIp: j.outsideIp, + UpdatedAt: gotime.Current().Format(), + Result: desc, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + if info.MaxNumber != 0 { + if info.Number+1 >= info.MaxNumber { + // 关闭执行 + statusEdit := j.EditTask(j.db, info.Id). + Select("status"). + Updates(jobs_gorm_model.Task{ + Status: TASK_TIMEOUT, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + } +} + +// RunAddLog 任务执行日志 +func (j *jobsGorm) RunAddLog(id uint, runId string) *gorm.DB { + return j.db.Create(&jobs_gorm_model.TaskLogRun{ + TaskId: id, + RunId: runId, + InsideIp: j.insideIp, + OutsideIp: j.outsideIp, + Os: j.os, + Arch: j.arch, + Gomaxprocs: j.maxProCs, + GoVersion: j.version, + MacAddrs: j.macAddrS, + CreatedAt: gotime.Current().Format(), + }) +} + +// ConfigCreateInCustomId 创建正在运行任务 +type ConfigCreateInCustomId struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomId 创建正在运行任务 +func (j *jobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error { + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + RunId: gouuid.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} + +// ConfigCreateInCustomIdOnly 创建正在运行唯一任务 +type ConfigCreateInCustomIdOnly struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomIdOnly 创建正在运行唯一任务 +func (j *jobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error { + query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) + if query.Id != 0 { + return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) + } + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + RunId: gouuid.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} + +// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量 +type ConfigCreateInCustomIdMaxNumber struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + MaxNumber int64 // 最大次数 + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量 +func (j *jobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error { + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + MaxNumber: config.MaxNumber, + RunId: gouuid.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} + +// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 +type ConfigCreateInCustomIdMaxNumberOnly struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + MaxNumber int64 // 最大次数 + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 +func (j *jobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error { + query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) + if query.Id != 0 { + return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) + } + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + MaxNumber: config.MaxNumber, + RunId: gouuid.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil } diff --git a/jobs_gorm/const_test.go b/jobs_gorm/const_test.go deleted file mode 100644 index b42935f..0000000 --- a/jobs_gorm/const_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package jobs_gorm - -import "testing" - -func TestSpec(t *testing.T) { - t.Log(GetSpecSeconds(10)) - t.Log(GetFrequencySeconds(10)) - - t.Log(GetSpecMinutes(1)) - t.Log(GetFrequencyMinutes(1)) - t.Log(GetSpecMinutes(10)) - t.Log(GetFrequencyMinutes(10)) - t.Log(GetSpecMinutes(30)) - t.Log(GetFrequencyMinutes(30)) - - t.Log(GetSpecHour(10)) - t.Log(GetFrequencyHour(10)) -} diff --git a/jobs_gorm/ip.go b/jobs_gorm/ip.go deleted file mode 100644 index 4627ede..0000000 --- a/jobs_gorm/ip.go +++ /dev/null @@ -1,19 +0,0 @@ -package jobs_gorm - -import ( - "go.dtapp.net/goip" - "gorm.io/gorm" -) - -// RefreshIp 刷新Ip -func (jobsGorm *JobsGorm) RefreshIp(tx *gorm.DB) { - xip := goip.GetOutsideIp() - if jobsGorm.outsideIp == "" || jobsGorm.outsideIp == "0.0.0.0" { - return - } - if jobsGorm.outsideIp == xip { - return - } - tx.Where("ips = ?", jobsGorm.outsideIp).Delete(&TaskIp{}) // 删除 - jobsGorm.outsideIp = xip -} diff --git a/jobs_gorm/jobs_gorm.go b/jobs_gorm/jobs_gorm.go deleted file mode 100644 index e435b2c..0000000 --- a/jobs_gorm/jobs_gorm.go +++ /dev/null @@ -1,286 +0,0 @@ -package jobs_gorm - -import ( - "errors" - "fmt" - "go.dtapp.net/goarray" - "go.dtapp.net/goip" - "go.dtapp.net/gojobs/jobs_common" - "go.dtapp.net/goredis" - "go.dtapp.net/gotime" - "go.dtapp.net/gouuid" - "gorm.io/gorm" - "log" - "runtime" -) - -// JobsGorm 任务 -type JobsGorm struct { - runVersion string // 运行版本 - os string // 系统类型 - arch string // 系统架构 - maxProCs int // CPU核数 - version string // GO版本 - macAddrS string // Mac地址 - insideIp string // 内网ip - outsideIp string // 外网ip - mainService int // 主要服务 - Db *gorm.DB // 数据库 - Redis *goredis.Client // 缓存数据库服务 -} - -// NewGorm 任务 -func NewGorm(jobsGorm JobsGorm, mainService int, runVersion string) *JobsGorm { - jobsGorm.runVersion = runVersion - jobsGorm.os = runtime.GOOS - jobsGorm.arch = runtime.GOARCH - jobsGorm.maxProCs = runtime.GOMAXPROCS(0) - jobsGorm.version = runtime.Version() - jobsGorm.macAddrS = goarray.TurnString(goip.GetMacAddr()) - jobsGorm.insideIp = goip.GetInsideIp() - jobsGorm.outsideIp = goip.GetOutsideIp() - jobsGorm.mainService = mainService - return &jobsGorm -} - -// ConfigCreateInCustomId 创建正在运行任务 -type ConfigCreateInCustomId struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomId 创建正在运行任务 -func (jobsGorm *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error { - createStatus := config.Tx.Create(&Task{ - Status: jobs_common.TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - RunId: gouuid.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: jobsGorm.outsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: jobsGorm.outsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// ConfigCreateInCustomIdOnly 创建正在运行唯一任务 -type ConfigCreateInCustomIdOnly struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomIdOnly 创建正在运行唯一任务 -func (jobsGorm *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error { - query := jobsGorm.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) - if query.Id != 0 { - return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) - } - createStatus := config.Tx.Create(&Task{ - Status: jobs_common.TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - RunId: gouuid.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: jobsGorm.outsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: jobsGorm.outsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量 -type ConfigCreateInCustomIdMaxNumber struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - MaxNumber int64 // 最大次数 - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量 -func (jobsGorm *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error { - createStatus := config.Tx.Create(&Task{ - Status: jobs_common.TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - MaxNumber: config.MaxNumber, - RunId: gouuid.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: jobsGorm.outsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: jobsGorm.outsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 -type ConfigCreateInCustomIdMaxNumberOnly struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - MaxNumber int64 // 最大次数 - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 -func (jobsGorm *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error { - query := jobsGorm.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) - if query.Id != 0 { - return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) - } - createStatus := config.Tx.Create(&Task{ - Status: jobs_common.TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - MaxNumber: config.MaxNumber, - RunId: gouuid.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: jobsGorm.outsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: jobsGorm.outsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// RunAddLog 任务执行日志 -func (jobsGorm *JobsGorm) RunAddLog(tx *gorm.DB, id uint, runId string) *gorm.DB { - return tx.Create(&TaskLogRun{ - TaskId: id, - RunId: runId, - InsideIp: jobsGorm.insideIp, - OutsideIp: jobsGorm.outsideIp, - Os: jobsGorm.os, - Arch: jobsGorm.arch, - Gomaxprocs: jobsGorm.maxProCs, - GoVersion: jobsGorm.version, - MacAddrs: jobsGorm.macAddrS, - CreatedAt: gotime.Current().Format(), - }) -} - -// Run 任务执行 -func (jobsGorm *JobsGorm) Run(tx *gorm.DB, info Task, status int, desc string) { - // 请求函数记录 - statusCreate := tx.Create(&TaskLog{ - TaskId: info.Id, - StatusCode: status, - Desc: desc, - Version: jobsGorm.runVersion, - CreatedAt: gotime.Current().Format(), - }) - if statusCreate.RowsAffected == 0 { - log.Println("statusCreate", statusCreate.Error) - } - if status == 0 { - statusEdit := jobsGorm.EditTask(tx, info.Id).Select("run_id").Updates(Task{ - RunId: gouuid.GetUuId(), - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - return - } - // 任务 - if status == CodeSuccess { - // 执行成功 - statusEdit := jobsGorm.EditTask(tx, info.Id). - Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result"). - Updates(Task{ - StatusDesc: "执行成功", - Number: info.Number + 1, - RunId: gouuid.GetUuId(), - UpdatedIp: jobsGorm.outsideIp, - UpdatedAt: gotime.Current().Format(), - Result: desc, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - if status == CodeEnd { - // 执行成功、提前结束 - statusEdit := jobsGorm.EditTask(tx, info.Id). - Select("status", "status_desc", "number", "updated_ip", "updated_at", "result"). - Updates(Task{ - Status: jobs_common.TASK_SUCCESS, - StatusDesc: "结束执行", - Number: info.Number + 1, - UpdatedIp: jobsGorm.outsideIp, - UpdatedAt: gotime.Current().Format(), - Result: desc, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - if status == CodeError { - // 执行失败 - statusEdit := jobsGorm.EditTask(tx, info.Id). - Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result"). - Updates(Task{ - StatusDesc: "执行失败", - Number: info.Number + 1, - RunId: gouuid.GetUuId(), - UpdatedIp: jobsGorm.outsideIp, - UpdatedAt: gotime.Current().Format(), - Result: desc, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - if info.MaxNumber != 0 { - if info.Number+1 >= info.MaxNumber { - // 关闭执行 - statusEdit := jobsGorm.EditTask(tx, info.Id). - Select("status"). - Updates(Task{ - Status: jobs_common.TASK_TIMEOUT, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - } -} diff --git a/jobs_gorm/lock.go b/jobs_gorm/lock.go deleted file mode 100644 index 81201ba..0000000 --- a/jobs_gorm/lock.go +++ /dev/null @@ -1,35 +0,0 @@ -package jobs_gorm - -import ( - "fmt" - "go.dtapp.net/goredis" - "time" -) - -// Lock 上锁 -func (jobsGorm *JobsGorm) Lock(info Task, id any) string { - cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) - judgeCache := jobsGorm.Redis.NewStringOperation().Get(cacheName).UnwrapOr("") - if judgeCache != "" { - return judgeCache - } - jobsGorm.Redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器上锁成功", jobsGorm.outsideIp), goredis.WithExpire(time.Millisecond*time.Duration(info.Frequency)*3)) - return "" -} - -// Unlock Lock 解锁 -func (jobsGorm *JobsGorm) Unlock(info Task, id any) { - cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) - jobsGorm.Redis.NewStringOperation().Del(cacheName) -} - -// LockForever 永远上锁 -func (jobsGorm *JobsGorm) LockForever(info Task, id any) string { - cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) - judgeCache := jobsGorm.Redis.NewStringOperation().Get(cacheName).UnwrapOr("") - if judgeCache != "" { - return judgeCache - } - jobsGorm.Redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器永远上锁成功", jobsGorm.outsideIp)) - return "" -} diff --git a/jobs_gorm/model_task.go b/jobs_gorm/model_task.go deleted file mode 100644 index 7f073e6..0000000 --- a/jobs_gorm/model_task.go +++ /dev/null @@ -1,158 +0,0 @@ -package jobs_gorm - -import ( - "go.dtapp.net/gojobs/jobs_common" - "gorm.io/gorm" -) - -// Task 任务 -type Task struct { - Id uint `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号 - Status string `gorm:"index;comment:状态码" json:"status"` // 状态码 - Params string `gorm:"comment:参数" json:"params"` // 参数 - ParamsType string `gorm:"comment:参数类型" json:"params_type"` // 参数类型 - StatusDesc string `gorm:"comment:状态描述" json:"status_desc"` // 状态描述 - Frequency int64 `gorm:"index;comment:频率(秒单位)" json:"frequency"` // 频率(秒单位) - Number int64 `gorm:"comment:当前次数" json:"number"` // 当前次数 - MaxNumber int64 `gorm:"comment:最大次数" json:"max_number"` // 最大次数 - RunId string `gorm:"comment:执行编号" json:"run_id"` // 执行编号 - CustomId string `gorm:"index;comment:自定义编号" json:"custom_id"` // 自定义编号 - CustomSequence int64 `gorm:"comment:自定义顺序" json:"custom_sequence"` // 自定义顺序 - Type string `gorm:"index;comment:类型" json:"type"` // 类型 - CreatedIp string `gorm:"comment:创建外网IP" json:"created_ip"` // 创建外网IP - SpecifyIp string `gorm:"comment:指定外网IP" json:"specify_ip"` // 指定外网IP - UpdatedIp string `gorm:"comment:更新外网IP" json:"updated_ip"` // 更新外网IP - Result string `gorm:"comment:结果" json:"result"` // 结果 - CreatedAt string `gorm:"type:text;comment:创建时间" json:"created_at"` // 创建时间 - UpdatedAt string `gorm:"type:text;comment:更新时间" json:"updated_at"` // 更新时间 - DeletedAt gorm.DeletedAt `gorm:"type:text;index;comment:删除时间" json:"deleted_at"` // 删除时间 -} - -func (m *Task) TableName() string { - return "task" -} - -// TaskTake 查询单任务 -func (jobsGorm *JobsGorm) TaskTake(tx *gorm.DB, customId string) (result Task) { - tx.Where("custom_id = ?", customId).Take(&result) - return result -} - -// 查询单任务 -func (jobsGorm *JobsGorm) taskTake(tx *gorm.DB, customId, status string) (result Task) { - tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result) - return result -} - -// TaskTakeIn 查询单任务 - 任务运行 -func (jobsGorm *JobsGorm) TaskTakeIn(tx *gorm.DB, customId string) Task { - return jobsGorm.taskTake(tx, customId, jobs_common.TASK_IN) -} - -// TaskTakeSuccess 查询单任务 - 任务完成 -func (jobsGorm *JobsGorm) TaskTakeSuccess(tx *gorm.DB, customId string) Task { - return jobsGorm.taskTake(tx, customId, jobs_common.TASK_SUCCESS) -} - -// TaskTakeError 查询单任务 - 任务异常 -func (jobsGorm *JobsGorm) TaskTakeError(tx *gorm.DB, customId string) Task { - return jobsGorm.taskTake(tx, customId, jobs_common.TASK_ERROR) -} - -// TaskTakeTimeout 查询单任务 - 任务超时 -func (jobsGorm *JobsGorm) TaskTakeTimeout(tx *gorm.DB, customId string) Task { - return jobsGorm.taskTake(tx, customId, jobs_common.TASK_TIMEOUT) -} - -// TaskTakeWait 查询单任务 - 任务等待 -func (jobsGorm *JobsGorm) TaskTakeWait(tx *gorm.DB, customId string) Task { - return jobsGorm.taskTake(tx, customId, jobs_common.TASK_WAIT) -} - -// TaskTypeTake 查询单任务 -func (jobsGorm *JobsGorm) TaskTypeTake(tx *gorm.DB, customId, Type string) (result Task) { - tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result) - return result -} - -// 查询单任务 -func (jobsGorm *JobsGorm) taskTypeTake(tx *gorm.DB, customId, Type, status string) (result Task) { - tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result) - return result -} - -// TaskTypeTakeIn 查询单任务 - 任务运行 -func (jobsGorm *JobsGorm) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) Task { - return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_IN) -} - -// TaskTypeTakeSuccess 查询单任务 - 任务完成 -func (jobsGorm *JobsGorm) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) Task { - return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_SUCCESS) -} - -// TaskTypeTakeError 查询单任务 - 任务异常 -func (jobsGorm *JobsGorm) TaskTypeTakeError(tx *gorm.DB, customId, Type string) Task { - return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_ERROR) -} - -// TaskTypeTakeTimeout 查询单任务 - 任务超时 -func (jobsGorm *JobsGorm) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) Task { - return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_TIMEOUT) -} - -// TaskTypeTakeWait 查询单任务 - 任务等待 -func (jobsGorm *JobsGorm) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) Task { - return jobsGorm.taskTypeTake(tx, customId, Type, jobs_common.TASK_WAIT) -} - -// TaskFindAll 查询多任务 -func (jobsGorm *JobsGorm) TaskFindAll(tx *gorm.DB, frequency int64) (results []Task) { - tx.Where("frequency = ?", frequency).Order("id asc").Find(&results) - return results -} - -// 查询多任务 -func (jobsGorm *JobsGorm) taskFindAll(tx *gorm.DB, frequency int64, status string) (results []Task) { - tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results) - return results -} - -// TaskFindAllIn 查询多任务 - 任务运行 -func (jobsGorm *JobsGorm) TaskFindAllIn(tx *gorm.DB, frequency int64) []Task { - return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_IN) -} - -// TaskFindAllSuccess 查询多任务 - 任务完成 -func (jobsGorm *JobsGorm) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []Task { - return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_SUCCESS) -} - -// TaskFindAllError 查询多任务 - 任务异常 -func (jobsGorm *JobsGorm) TaskFindAllError(tx *gorm.DB, frequency int64) []Task { - return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_ERROR) -} - -// TaskFindAllTimeout 查询多任务 - 任务超时 -func (jobsGorm *JobsGorm) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []Task { - return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_TIMEOUT) -} - -// TaskFindAllWait 查询多任务 - 任务等待 -func (jobsGorm *JobsGorm) TaskFindAllWait(tx *gorm.DB, frequency int64) []Task { - return jobsGorm.taskFindAll(tx, frequency, jobs_common.TASK_WAIT) -} - -// EditTask 任务修改 -func (jobsGorm *JobsGorm) EditTask(tx *gorm.DB, id uint) *gorm.DB { - return tx.Model(&Task{}).Where("id = ?", id) -} - -// UpdateFrequency 更新任务频率 -func (jobsGorm *JobsGorm) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB { - return jobsGorm.EditTask(tx, id). - Select("frequency"). - Updates(Task{ - Frequency: frequency, - }) -} diff --git a/jobs_gorm/model_task_ip.go b/jobs_gorm/model_task_ip.go deleted file mode 100644 index dde0fde..0000000 --- a/jobs_gorm/model_task_ip.go +++ /dev/null @@ -1,69 +0,0 @@ -package jobs_gorm - -import ( - "gorm.io/gorm" - "log" - "strings" -) - -// TaskIp 任务Ip -type TaskIp struct { - Id int64 `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号 - TaskType string `gorm:"comment:任务编号" json:"task_type"` // 任务编号 - Ips string `gorm:"comment:任务IP" json:"ips"` // 任务IP -} - -func (m *TaskIp) TableName() string { - return "task_ip" -} - -func (jobsGorm *JobsGorm) taskIpTake(tx *gorm.DB, taskType, ips string) (result TaskIp) { - tx.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&result) - return result -} - -// TaskIpUpdate 更新ip -func (jobsGorm *JobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB { - query := jobsGorm.taskIpTake(tx, taskType, ips) - if query.Id != 0 { - return tx - } - updateStatus := tx.Create(&TaskIp{ - TaskType: taskType, - Ips: ips, - }) - if updateStatus.RowsAffected == 0 { - log.Println("任务更新失败:", updateStatus.Error) - } - return updateStatus -} - -// TaskIpInit 实例任务ip -func (jobsGorm *JobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool { - if jobsGorm.outsideIp == "" || jobsGorm.outsideIp == "0.0.0.0" { - return false - } - tx.Where("ips = ?", jobsGorm.outsideIp).Delete(&TaskIp{}) // 删除 - for k, v := range ips { - if v == "" { - jobsGorm.TaskIpUpdate(tx, k, jobsGorm.outsideIp) - } else { - find := strings.Contains(v, ",") - if find == true { - // 包含 - parts := strings.Split(v, ",") - for _, vv := range parts { - if vv == jobsGorm.outsideIp { - jobsGorm.TaskIpUpdate(tx, k, jobsGorm.outsideIp) - } - } - } else { - // 不包含 - if v == jobsGorm.outsideIp { - jobsGorm.TaskIpUpdate(tx, k, jobsGorm.outsideIp) - } - } - } - } - return true -} diff --git a/jobs_gorm/params.go b/jobs_gorm/params.go deleted file mode 100644 index 33e1893..0000000 --- a/jobs_gorm/params.go +++ /dev/null @@ -1,29 +0,0 @@ -package jobs_gorm - -var ParamsOrderType = "order" - -// ParamsOrderId 订单任务 -type ParamsOrderId struct { - OrderId string `json:"order_id,omitempty"` -} - -var ParamsMerchantGoldenBeanType = "merchant.golden_bean" - -var ParamsNewServiceType = "new_service" - -// ParamsTaskId 企业自定义任务 -type ParamsTaskId struct { - TaskId int64 `json:"task_id,omitempty"` -} - -var ParamsNewServiceNextType = "new_service.next" - -// ParamsTaskIdNext 企业自定义下一步任务 -type ParamsTaskIdNext struct { - TaskId int64 `json:"task_id,omitempty"` - MerchantUserId int64 `json:"merchant_user_id,omitempty"` - CurrentNumber int `json:"current_number,omitempty"` - MaxNumber int `json:"max_number,omitempty"` -} - -var ParamsTeamInvType = "team.inv" diff --git a/jobs_gorm/type.go b/jobs_gorm/type.go deleted file mode 100644 index cf9d1c7..0000000 --- a/jobs_gorm/type.go +++ /dev/null @@ -1,25 +0,0 @@ -package jobs_gorm - -func GetTypeApiPaySubmit(Type string) string { - return "api.pay.submit." + Type -} - -func GetTypeWechatRefundsSubmit(Type string) string { - return "wechat.refunds.submit." + Type -} - -func GetTypeWechatRefundsQuery(Type string) string { - return "wechat.refunds.query." + Type -} - -func GetTypeGoldenBeansIssue(Type string) string { - return "golden_beans.issue." + Type -} - -func GetTypeGoldenBeansRefunds(Type string) string { - return "golden_beans.refunds." + Type -} - -func GetTypeCustomerAuto(Type string) string { - return "customer.auto." + Type -} diff --git a/jobs_gorm/check_task.go b/jobs_gorm_check_task.go similarity index 66% rename from jobs_gorm/check_task.go rename to jobs_gorm_check_task.go index 4e35960..f00cbe2 100644 --- a/jobs_gorm/check_task.go +++ b/jobs_gorm_check_task.go @@ -1,18 +1,19 @@ -package jobs_gorm +package gojobs import ( + "go.dtapp.net/gojobs/jobs_gorm_model" "go.dtapp.net/gotime" "gorm.io/gorm" "log" ) -func (jobsGorm *JobsGorm) Check(tx *gorm.DB, vs []Task) { - if jobsGorm.mainService > 0 && len(vs) > 0 { +func (j *jobsGorm) Check(tx *gorm.DB, vs []jobs_gorm_model.Task) { + if j.mainService > 0 && len(vs) > 0 { for _, v := range vs { diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time) if diffInSecondWithAbs >= v.Frequency*3 { log.Printf("每隔%v秒任务:%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs) - tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&TaskLogRun{}) // 删除 + tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&jobs_gorm_model.TaskLogRun{}) // 删除 } } } diff --git a/jobs_gorm_ip.go b/jobs_gorm_ip.go new file mode 100644 index 0000000..1d0edb1 --- /dev/null +++ b/jobs_gorm_ip.go @@ -0,0 +1,20 @@ +package gojobs + +import ( + "go.dtapp.net/goip" + "go.dtapp.net/gojobs/jobs_gorm_model" + "gorm.io/gorm" +) + +// RefreshIp 刷新Ip +func (j *jobsGorm) RefreshIp(tx *gorm.DB) { + xip := goip.GetOutsideIp() + if j.outsideIp == "" || j.outsideIp == "0.0.0.0" { + return + } + if j.outsideIp == xip { + return + } + tx.Where("ips = ?", j.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 + j.outsideIp = xip +} diff --git a/jobs_gorm_lock.go b/jobs_gorm_lock.go new file mode 100644 index 0000000..28eed35 --- /dev/null +++ b/jobs_gorm_lock.go @@ -0,0 +1,36 @@ +package gojobs + +import ( + "fmt" + "go.dtapp.net/gojobs/jobs_gorm_model" + "go.dtapp.net/goredis" + "time" +) + +// Lock 上锁 +func (j *jobsGorm) Lock(info jobs_gorm_model.Task, id any) string { + cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) + judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("") + if judgeCache != "" { + return judgeCache + } + j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器上锁成功", j.outsideIp), goredis.WithExpire(time.Millisecond*time.Duration(info.Frequency)*3)) + return "" +} + +// Unlock Lock 解锁 +func (j *jobsGorm) Unlock(info jobs_gorm_model.Task, id any) { + cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) + j.redis.NewStringOperation().Del(cacheName) +} + +// LockForever 永远上锁 +func (j *jobsGorm) LockForever(info jobs_gorm_model.Task, id any) string { + cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) + judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("") + if judgeCache != "" { + return judgeCache + } + j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器永远上锁成功", j.outsideIp)) + return "" +} diff --git a/jobs_gorm_model.go b/jobs_gorm_model.go new file mode 100644 index 0000000..2ffc1d9 --- /dev/null +++ b/jobs_gorm_model.go @@ -0,0 +1,190 @@ +package gojobs + +import ( + "go.dtapp.net/gojobs/jobs_gorm_model" + "gorm.io/gorm" + "log" + "strings" +) + +// TaskTake 查询单任务 +func (j *jobsGorm) TaskTake(tx *gorm.DB, customId string) (result jobs_gorm_model.Task) { + tx.Where("custom_id = ?", customId).Take(&result) + return result +} + +// 查询单任务 +func (j *jobsGorm) taskTake(tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) { + tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result) + return result +} + +// TaskTakeIn 查询单任务 - 任务运行 +func (j *jobsGorm) TaskTakeIn(tx *gorm.DB, customId string) jobs_gorm_model.Task { + return j.taskTake(tx, customId, TASK_IN) +} + +// TaskTakeSuccess 查询单任务 - 任务完成 +func (j *jobsGorm) TaskTakeSuccess(tx *gorm.DB, customId string) jobs_gorm_model.Task { + return j.taskTake(tx, customId, TASK_SUCCESS) +} + +// TaskTakeError 查询单任务 - 任务异常 +func (j *jobsGorm) TaskTakeError(tx *gorm.DB, customId string) jobs_gorm_model.Task { + return j.taskTake(tx, customId, TASK_ERROR) +} + +// TaskTakeTimeout 查询单任务 - 任务超时 +func (j *jobsGorm) TaskTakeTimeout(tx *gorm.DB, customId string) jobs_gorm_model.Task { + return j.taskTake(tx, customId, TASK_TIMEOUT) +} + +// TaskTakeWait 查询单任务 - 任务等待 +func (j *jobsGorm) TaskTakeWait(tx *gorm.DB, customId string) jobs_gorm_model.Task { + return j.taskTake(tx, customId, TASK_WAIT) +} + +// TaskTypeTake 查询单任务 +func (j *jobsGorm) TaskTypeTake(tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) { + tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result) + return result +} + +// 查询单任务 +func (j *jobsGorm) taskTypeTake(tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) { + tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result) + return result +} + +// TaskTypeTakeIn 查询单任务 - 任务运行 +func (j *jobsGorm) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return j.taskTypeTake(tx, customId, Type, TASK_IN) +} + +// TaskTypeTakeSuccess 查询单任务 - 任务完成 +func (j *jobsGorm) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return j.taskTypeTake(tx, customId, Type, TASK_SUCCESS) +} + +// TaskTypeTakeError 查询单任务 - 任务异常 +func (j *jobsGorm) TaskTypeTakeError(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return j.taskTypeTake(tx, customId, Type, TASK_ERROR) +} + +// TaskTypeTakeTimeout 查询单任务 - 任务超时 +func (j *jobsGorm) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return j.taskTypeTake(tx, customId, Type, TASK_TIMEOUT) +} + +// TaskTypeTakeWait 查询单任务 - 任务等待 +func (j *jobsGorm) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return j.taskTypeTake(tx, customId, Type, TASK_WAIT) +} + +// TaskFindAll 查询多任务 +func (j *jobsGorm) TaskFindAll(tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) { + tx.Where("frequency = ?", frequency).Order("id asc").Find(&results) + return results +} + +// 查询多任务 +func (j *jobsGorm) taskFindAll(tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) { + tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results) + return results +} + +// TaskFindAllIn 查询多任务 - 任务运行 +func (j *jobsGorm) TaskFindAllIn(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return j.taskFindAll(tx, frequency, TASK_IN) +} + +// TaskFindAllSuccess 查询多任务 - 任务完成 +func (j *jobsGorm) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return j.taskFindAll(tx, frequency, TASK_SUCCESS) +} + +// TaskFindAllError 查询多任务 - 任务异常 +func (j *jobsGorm) TaskFindAllError(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return j.taskFindAll(tx, frequency, TASK_ERROR) +} + +// TaskFindAllTimeout 查询多任务 - 任务超时 +func (j *jobsGorm) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return j.taskFindAll(tx, frequency, TASK_TIMEOUT) +} + +// TaskFindAllWait 查询多任务 - 任务等待 +func (j *jobsGorm) TaskFindAllWait(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return j.taskFindAll(tx, frequency, TASK_WAIT) +} + +// EditTask 任务修改 +func (j *jobsGorm) EditTask(tx *gorm.DB, id uint) *gorm.DB { + return tx.Model(&jobs_gorm_model.Task{}).Where("id = ?", id) +} + +// UpdateFrequency 更新任务频率 +func (j *jobsGorm) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB { + return j.EditTask(tx, id). + Select("frequency"). + Updates(jobs_gorm_model.Task{ + Frequency: frequency, + }) +} + +func (j *jobsGorm) taskIpTake(tx *gorm.DB, taskType, ips string) (result jobs_gorm_model.TaskIp) { + tx.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&result) + return result +} + +// TaskIpUpdate 更新ip +func (j *jobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB { + query := j.taskIpTake(tx, taskType, ips) + if query.Id != 0 { + return tx + } + updateStatus := tx.Create(&jobs_gorm_model.TaskIp{ + TaskType: taskType, + Ips: ips, + }) + if updateStatus.RowsAffected == 0 { + log.Println("任务更新失败:", updateStatus.Error) + } + return updateStatus +} + +// TaskIpInit 实例任务ip +func (j *jobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool { + if j.outsideIp == "" || j.outsideIp == "0.0.0.0" { + return false + } + tx.Where("ips = ?", j.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 + for k, v := range ips { + if v == "" { + j.TaskIpUpdate(tx, k, j.outsideIp) + } else { + find := strings.Contains(v, ",") + if find == true { + // 包含 + parts := strings.Split(v, ",") + for _, vv := range parts { + if vv == j.outsideIp { + j.TaskIpUpdate(tx, k, j.outsideIp) + } + } + } else { + // 不包含 + if v == j.outsideIp { + j.TaskIpUpdate(tx, k, j.outsideIp) + } + } + } + } + return true +} + +// TaskLogRunTake 查询任务执行日志 +func (j *jobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result jobs_gorm_model.TaskLogRun) { + tx.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result) + return result +} diff --git a/jobs_gorm_model/task.go b/jobs_gorm_model/task.go new file mode 100644 index 0000000..5b6c957 --- /dev/null +++ b/jobs_gorm_model/task.go @@ -0,0 +1,32 @@ +package jobs_gorm_model + +import ( + "gorm.io/gorm" +) + +// Task 任务 +type Task struct { + Id uint `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号 + Status string `gorm:"index;comment:状态码" json:"status"` // 状态码 + Params string `gorm:"comment:参数" json:"params"` // 参数 + ParamsType string `gorm:"comment:参数类型" json:"params_type"` // 参数类型 + StatusDesc string `gorm:"comment:状态描述" json:"status_desc"` // 状态描述 + Frequency int64 `gorm:"index;comment:频率(秒单位)" json:"frequency"` // 频率(秒单位) + Number int64 `gorm:"comment:当前次数" json:"number"` // 当前次数 + MaxNumber int64 `gorm:"comment:最大次数" json:"max_number"` // 最大次数 + RunId string `gorm:"comment:执行编号" json:"run_id"` // 执行编号 + CustomId string `gorm:"index;comment:自定义编号" json:"custom_id"` // 自定义编号 + CustomSequence int64 `gorm:"comment:自定义顺序" json:"custom_sequence"` // 自定义顺序 + Type string `gorm:"index;comment:类型" json:"type"` // 类型 + CreatedIp string `gorm:"comment:创建外网IP" json:"created_ip"` // 创建外网IP + SpecifyIp string `gorm:"comment:指定外网IP" json:"specify_ip"` // 指定外网IP + UpdatedIp string `gorm:"comment:更新外网IP" json:"updated_ip"` // 更新外网IP + Result string `gorm:"comment:结果" json:"result"` // 结果 + CreatedAt string `gorm:"type:text;comment:创建时间" json:"created_at"` // 创建时间 + UpdatedAt string `gorm:"type:text;comment:更新时间" json:"updated_at"` // 更新时间 + DeletedAt gorm.DeletedAt `gorm:"type:text;index;comment:删除时间" json:"deleted_at"` // 删除时间 +} + +func (m *Task) TableName() string { + return "task" +} diff --git a/jobs_gorm_model/task_ip.go b/jobs_gorm_model/task_ip.go new file mode 100644 index 0000000..931013e --- /dev/null +++ b/jobs_gorm_model/task_ip.go @@ -0,0 +1,12 @@ +package jobs_gorm_model + +// TaskIp 任务Ip +type TaskIp struct { + Id int64 `gorm:"primaryKey;comment:记录编号" json:"id"` // 记录编号 + TaskType string `gorm:"comment:任务编号" json:"task_type"` // 任务编号 + Ips string `gorm:"comment:任务IP" json:"ips"` // 任务IP +} + +func (m *TaskIp) TableName() string { + return "task_ip" +} diff --git a/jobs_gorm/model_task_log.go b/jobs_gorm_model/task_log.go similarity index 96% rename from jobs_gorm/model_task_log.go rename to jobs_gorm_model/task_log.go index 42cf307..fac2de3 100644 --- a/jobs_gorm/model_task_log.go +++ b/jobs_gorm_model/task_log.go @@ -1,4 +1,4 @@ -package jobs_gorm +package jobs_gorm_model // TaskLog 任务日志模型 type TaskLog struct { diff --git a/jobs_gorm/model_task_log_run.go b/jobs_gorm_model/task_log_run.go similarity index 76% rename from jobs_gorm/model_task_log_run.go rename to jobs_gorm_model/task_log_run.go index 6f40b2c..7df23c7 100644 --- a/jobs_gorm/model_task_log_run.go +++ b/jobs_gorm_model/task_log_run.go @@ -1,8 +1,4 @@ -package jobs_gorm - -import ( - "gorm.io/gorm" -) +package jobs_gorm_model // TaskLogRun 任务执行日志模型 type TaskLogRun struct { @@ -22,9 +18,3 @@ type TaskLogRun struct { func (m *TaskLogRun) TableName() string { return "task_log_run" } - -// TaskLogRunTake 查询任务执行日志 -func (jobsGorm *JobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result TaskLogRun) { - tx.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result) - return result -} diff --git a/jobs_options.go b/jobs_options.go new file mode 100644 index 0000000..cc2731f --- /dev/null +++ b/jobs_options.go @@ -0,0 +1,25 @@ +package gojobs + +// +//type JobsOption func(*JobsCron) +// +//// WithRedis 缓存服务驱动 +//func WithRedis(db *goredis.Client) JobsOption { +// return func(opts *JobsCron) { +// opts.redis = db +// } +//} +// +//// WithGorm 数据库服务驱动 +//func WithGorm(db *gorm.DB) JobsOption { +// return func(opts *JobsCron) { +// opts.db = db +// } +//} +// +//// WithMainService 是否主要服务(主要服务可删除过期服务) +//func WithMainService(status int) JobsOption { +// return func(opts *JobsCron) { +// opts.mainService = status +// } +//} diff --git a/jobs_test.go b/jobs_test.go index f79863b..e592a16 100644 --- a/jobs_test.go +++ b/jobs_test.go @@ -1,16 +1,13 @@ package gojobs -import ( - "testing" -) - -func TestSpec(t *testing.T) { - t.Log("每隔n秒执行一次:", GetSpecSeconds(10)) - t.Log("每隔n秒执行一次:", GetFrequencySeconds(10)) - - t.Log("每隔n分钟执行一次:", GetSpecMinutes(10)) - t.Log("每隔n分钟执行一次:", GetFrequencyMinutes(10)) - - t.Log("每天n点执行一次:", GetSpecHour(10)) - t.Log("每天n点执行一次:", GetFrequencyHour(10)) +import "testing" + +func TestJobs1(t *testing.T) { + //t.Log(NewJobsGorm(&ConfigJobsGorm{ + // MainService: 0, + // Db: nil, + // Redis: nil, + //}).Run()) + //t.Log(NewJobsXorm().Run()) + //t.Log(NewJobsZorm().Run()) } diff --git a/jobs_xorm.go b/jobs_xorm.go index 41dbbe2..74661fa 100644 --- a/jobs_xorm.go +++ b/jobs_xorm.go @@ -2,14 +2,20 @@ package gojobs import "xorm.io/xorm" -type JobsXorm struct { - Db *xorm.Engine +// Xorm数据库驱动 +type jobsXorm struct { + db *xorm.Engine } -func newJobsXorm(db *xorm.Engine) *JobsXorm { +// NewJobsXorm 初始化 +func NewJobsXorm(db *xorm.Engine) *jobsXorm { var ( - jobsXorm = &JobsXorm{} + j = &jobsXorm{} ) - jobsXorm.Db = db - return jobsXorm + j.db = db + return j +} + +func (j *jobsXorm) Run() { + } diff --git a/jobs_zorm.go b/jobs_zorm.go index e069bf3..a38e58a 100644 --- a/jobs_zorm.go +++ b/jobs_zorm.go @@ -2,14 +2,20 @@ package gojobs import "gitee.com/chunanyong/zorm" -type JobsZorm struct { - Db *zorm.DBDao +// Zorm数据库驱动 +type jobsZorm struct { + db *zorm.DBDao } -func NewJobsZorm(db *zorm.DBDao) *JobsZorm { +// NewJobsZorm 初始化 +func NewJobsZorm(db *zorm.DBDao) *jobsZorm { var ( - jobsZorm = &JobsZorm{} + j = &jobsZorm{} ) - jobsZorm.Db = db - return jobsZorm + j.db = db + return j +} + +func (j *jobsZorm) Run() { + } diff --git a/version.go b/version.go index 4d1ae51..f821713 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package gojobs -const Version = "1.0.29" +const Version = "1.0.30"