- update jobs

master
李光春 2 years ago
parent 5947e93c93
commit 88978c9039

@ -7,7 +7,7 @@ require (
github.com/aliyun/aliyun-oss-go-sdk v2.2.4+incompatible
github.com/allegro/bigcache/v3 v3.0.2
github.com/aws/aws-sdk-go v1.44.42
github.com/baidubce/bce-sdk-go v0.9.125
github.com/baidubce/bce-sdk-go v0.9.126
github.com/basgys/goxml2json v1.1.0
github.com/beego/beego/v2 v2.0.4
github.com/bradfitz/gomemcache v0.0.0-20220106215444-fb4bf637b56d

@ -44,6 +44,8 @@ github.com/aws/aws-sdk-go v1.44.42/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4
github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g=
github.com/baidubce/bce-sdk-go v0.9.125 h1:je4wqeQzch22S0islZZLKtmyzKld4KY6OVjuKOTwdOQ=
github.com/baidubce/bce-sdk-go v0.9.125/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/baidubce/bce-sdk-go v0.9.126 h1:wrpb94AN8gLtuGr3wfEm0DObrNulX3W13MeG3lo9SRg=
github.com/baidubce/bce-sdk-go v0.9.126/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA=
github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc=
github.com/basgys/goxml2json v1.1.0 h1:4ln5i4rseYfXNd86lGEB+Vi652IsIXIvggKM/BhUKVw=

@ -3,44 +3,65 @@ package gojobs
import (
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"go.dtapp.net/library"
"go.dtapp.net/library/utils/dorm"
"go.dtapp.net/library/utils/goarray"
"go.dtapp.net/library/utils/goip"
"go.dtapp.net/library/utils/gojobs/jobs_gorm_model"
"go.dtapp.net/library/utils/only"
"go.dtapp.net/library/utils/golock"
"go.etcd.io/etcd/client/v3"
"gorm.io/gorm"
"log"
"runtime"
)
// ConfigJobsGorm 配置
type ConfigJobsGorm struct {
runVersion string // 运行版本
os string // 系统类型
arch string // 系统架构
maxProCs int // CPU核数
version string // GO版本
macAddrS string // Mac地址
insideIp string // 内网ip
OutsideIp string // 外网ip
MainService int // 主要服务
Db *gorm.DB // 数据库
Redis *dorm.RedisClient // 缓存数据库服务
Debug bool // 是否开启调测
}
// JobsGorm Gorm数据库驱动
type JobsGorm struct {
db *gorm.DB // 数据库
redis *dorm.RedisClient // 缓存数据库服务
config *ConfigJobsGorm // 配置
db struct {
gormClient *gorm.DB // 数据库驱动
redisClient *redis.Client // 缓存数据库驱动
etcdClient *clientv3.Client // 分布式缓存驱动
}
service struct {
gormClient *gorm.DB // 数据库驱动
lockRedisClient *golock.LockRedis // 缓存数据库驱动
lockEtcdClient *golock.LockEtcd // 分布式缓存驱动
} // 服务
config struct {
lockPrefix string // 锁Key前缀
lockType string // 锁驱动类型
runVersion string // 运行版本
os string // 系统类型
arch string // 系统架构
maxProCs int // CPU核数
version string // GO版本
macAddrS string // Mac地址
insideIp string // 内网ip
outsideIp string // 外网ip
} // 配置
}
// NewJobsGorm 初始化
func NewJobsGorm(config *ConfigJobsGorm) (*JobsGorm, error) {
c := &JobsGorm{config: config}
// WithGormClient
func NewJobsGorm(attrs ...*OperationAttr) (*JobsGorm, error) {
c := &JobsGorm{}
for _, attr := range attrs {
if attr.gormClient != nil {
c.db.gormClient = attr.gormClient
c.service.gormClient = attr.gormClient
}
if attr.redisClient != nil {
c.config.lockType = attr.lockType
c.db.redisClient = attr.redisClient
}
if attr.etcdClient != nil {
c.config.lockType = attr.lockType
c.db.etcdClient = attr.etcdClient
}
if attr.lockPrefix != "" {
c.config.lockPrefix = attr.lockPrefix
}
}
c.config.runVersion = go_library.Version()
c.config.os = runtime.GOOS
@ -50,21 +71,30 @@ func NewJobsGorm(config *ConfigJobsGorm) (*JobsGorm, error) {
c.config.macAddrS = goarray.TurnString(goip.GetMacAddr())
c.config.insideIp = goip.GetInsideIp()
if c.config.OutsideIp == "" {
if c.config.outsideIp == "" {
return nil, errors.New("需要配置当前的IP")
}
c.db = c.config.Db
if c.db == nil {
if c.db.gormClient == nil {
return nil, errors.New("需要配置数据库驱动")
}
c.redis = c.config.Redis
if c.redis == nil {
switch c.config.lockType {
case lockTypeRedis:
if c.db.redisClient == nil {
return nil, errors.New("需要配置缓存驱动")
}
c.service.lockRedisClient = golock.NewLockRedis(c.db.redisClient)
case lockTypeEtcd:
if c.db.etcdClient == nil {
return nil, errors.New("需要配置缓存驱动")
}
c.service.lockEtcdClient = golock.NewLockEtcd(c.db.etcdClient)
default:
return nil, errors.New("需要配置缓存驱动")
}
err := c.db.AutoMigrate(
err := c.service.gormClient.AutoMigrate(
&jobs_gorm_model.Task{},
&jobs_gorm_model.TaskLog{},
&jobs_gorm_model.TaskLogRun{},
@ -74,256 +104,5 @@ func NewJobsGorm(config *ConfigJobsGorm) (*JobsGorm, error) {
return nil, errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err))
}
if c.config.Debug == true {
log.Printf("jobs%+v\n", c)
}
return c, nil
}
func (j *JobsGorm) GetDb() *gorm.DB {
return j.db
}
func (j *JobsGorm) GetRedis() *dorm.RedisClient {
return j.redis
}
// Run 运行
func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string) {
// 请求函数记录
statusCreate := j.db.Create(&jobs_gorm_model.TaskLog{
TaskId: info.Id,
StatusCode: status,
Desc: desc,
Version: j.config.runVersion,
})
if statusCreate.RowsAffected == 0 {
log.Println("statusCreate", statusCreate.Error)
}
if status == 0 {
statusEdit := j.EditTask(j.db, info.Id).
Select("run_id").
Updates(jobs_gorm_model.Task{
RunId: only.GetUuId(),
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
return
}
// 任务
if status == CodeSuccess {
// 执行成功
statusEdit := j.EditTask(j.db, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "result").
Updates(jobs_gorm_model.Task{
StatusDesc: "执行成功",
Number: info.Number + 1,
RunId: only.GetUuId(),
UpdatedIp: j.config.OutsideIp,
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeEnd {
// 执行成功、提前结束
statusEdit := j.EditTask(j.db, info.Id).
Select("status", "status_desc", "number", "updated_ip", "result").
Updates(jobs_gorm_model.Task{
Status: TASK_SUCCESS,
StatusDesc: "结束执行",
Number: info.Number + 1,
UpdatedIp: j.config.OutsideIp,
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeError {
// 执行失败
statusEdit := j.EditTask(j.db, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "result").
Updates(jobs_gorm_model.Task{
StatusDesc: "执行失败",
Number: info.Number + 1,
RunId: only.GetUuId(),
UpdatedIp: j.config.OutsideIp,
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if info.MaxNumber != 0 {
if info.Number+1 >= info.MaxNumber {
// 关闭执行
statusEdit := j.EditTask(j.db, info.Id).
Select("status").
Updates(jobs_gorm_model.Task{
Status: TASK_TIMEOUT,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
}
}
// RunAddLog 任务执行日志
func (j *JobsGorm) RunAddLog(id uint, runId string) *gorm.DB {
return j.db.Create(&jobs_gorm_model.TaskLogRun{
TaskId: id,
RunId: runId,
InsideIp: j.config.insideIp,
OutsideIp: j.config.OutsideIp,
Os: j.config.os,
Arch: j.config.arch,
Gomaxprocs: j.config.maxProCs,
GoVersion: j.config.version,
MacAddrs: j.config.macAddrS,
})
}
// ConfigCreateInCustomId 创建正在运行任务
type ConfigCreateInCustomId struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomId 创建正在运行任务
func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error {
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.OutsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.OutsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdOnly 创建正在运行唯一任务
type ConfigCreateInCustomIdOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdOnly 创建正在运行唯一任务
func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error {
query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.OutsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.OutsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量
type ConfigCreateInCustomIdMaxNumber struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error {
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.OutsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.OutsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
type ConfigCreateInCustomIdMaxNumberOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error {
query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.OutsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.OutsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}

@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"go.dtapp.net/library/utils/gojobs/jobs_gorm_model"
"log"
"math/rand"
"time"
)
@ -21,9 +20,6 @@ func (j *JobsGorm) GetEtcdIssueAddress(server *Etcd, v jobs_gorm_model.Task) (ad
appointIpStatus = true
}
workers, err := server.ListWorkers()
if j.config.Debug == true {
log.Printf("客户端列表:%+v %v\n", workers, len(workers))
}
if err != nil {
return address, errors.New(fmt.Sprintf("获取在线客户端列表失败:%s", err.Error()))
}

@ -0,0 +1,22 @@
package gojobs
import (
"github.com/go-redis/redis/v8"
"go.etcd.io/etcd/client/v3"
"gorm.io/gorm"
)
// GetDb 数据库驱动
func (j *JobsGorm) GetDb() *gorm.DB {
return j.service.gormClient
}
// GetRedis 缓存数据库驱动
func (j *JobsGorm) GetRedis() *redis.Client {
return j.db.redisClient
}
// GetEtcd 分布式缓存驱动
func (j *JobsGorm) GetEtcd() *clientv3.Client {
return j.db.etcdClient
}

@ -9,12 +9,12 @@ import (
// RefreshIp 刷新Ip
func (j *JobsGorm) RefreshIp(tx *gorm.DB) {
xip := goip.GetOutsideIp()
if j.config.OutsideIp == "" || j.config.OutsideIp == "0.0.0.0" {
if j.config.outsideIp == "" || j.config.outsideIp == "0.0.0.0" {
return
}
if j.config.OutsideIp == xip {
if j.config.outsideIp == xip {
return
}
tx.Where("ips = ?", j.config.OutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
j.config.OutsideIp = xip
tx.Where("ips = ?", j.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
j.config.outsideIp = xip
}

@ -1,36 +1,67 @@
package gojobs
import (
"errors"
"fmt"
"go.dtapp.net/library/utils/dorm"
"go.dtapp.net/library/utils/gojobs/jobs_gorm_model"
"time"
)
// Lock 上锁
func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) string {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("")
if judgeCache != "" {
return judgeCache
func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) (string, error) {
if j.config.lockType == "" {
return "", errors.New("没有配置")
}
var (
redisKey = fmt.Sprintf("%s:%v:%v", j.config.lockPrefix, info.Type, id)
etcdKey = fmt.Sprintf("%s/%v/%v", j.config.lockPrefix, info.Type, id)
val = fmt.Sprintf("已在%s@%s机器上锁成功", j.config.insideIp, j.config.outsideIp)
ttl = info.Frequency * 3
)
if j.config.lockType == lockTypeRedis {
return j.service.lockRedisClient.Lock(redisKey, val, ttl)
}
j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器上锁成功", j.config.OutsideIp), dorm.WithExpire(time.Millisecond*time.Duration(info.Frequency)*3))
return ""
return j.service.lockEtcdClient.Lock(etcdKey, val, ttl)
}
// Unlock Lock 解锁
func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
j.redis.NewStringOperation().Del(cacheName)
func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) error {
if j.config.lockType == "" {
return errors.New("没有配置")
}
var (
redisKey = fmt.Sprintf("%s:%v:%v", j.config.lockPrefix, info.Type, id)
etcdKey = fmt.Sprintf("%s/%v/%v", j.config.lockPrefix, info.Type, id)
)
if j.config.lockType == lockTypeRedis {
return j.service.lockRedisClient.Unlock(redisKey)
}
return j.service.lockEtcdClient.Unlock(etcdKey)
}
// LockForever 永远上锁
func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) string {
cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id)
judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("")
if judgeCache != "" {
return judgeCache
func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) (string, error) {
if j.config.lockType == "" {
return "", errors.New("没有配置")
}
j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器永远上锁成功", j.config.OutsideIp))
return ""
var (
redisKey = fmt.Sprintf("%s:%v:%v", j.config.lockPrefix, info.Type, id)
etcdKey = fmt.Sprintf("%s/%v/%v", j.config.lockPrefix, info.Type, id)
val = fmt.Sprintf("已在%s@%s机器永远上锁成功", j.config.insideIp, j.config.outsideIp)
)
if j.config.lockType == lockTypeRedis {
return j.service.lockRedisClient.LockForever(redisKey, val)
}
return j.service.lockEtcdClient.LockForever(etcdKey, val)
}

@ -161,27 +161,27 @@ func (j *JobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB {
// TaskIpInit 实例任务ip
func (j *JobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool {
if j.config.OutsideIp == "" || j.config.OutsideIp == "0.0.0.0" {
if j.config.outsideIp == "" || j.config.outsideIp == "0.0.0.0" {
return false
}
tx.Where("ips = ?", j.config.OutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
tx.Where("ips = ?", j.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
for k, v := range ips {
if v == "" {
j.TaskIpUpdate(tx, k, j.config.OutsideIp)
j.TaskIpUpdate(tx, k, j.config.outsideIp)
} else {
find := strings.Contains(v, ",")
if find == true {
// 包含
parts := strings.Split(v, ",")
for _, vv := range parts {
if vv == j.config.OutsideIp {
j.TaskIpUpdate(tx, k, j.config.OutsideIp)
if vv == j.config.outsideIp {
j.TaskIpUpdate(tx, k, j.config.outsideIp)
}
}
} else {
// 不包含
if v == j.config.OutsideIp {
j.TaskIpUpdate(tx, k, j.config.OutsideIp)
if v == j.config.outsideIp {
j.TaskIpUpdate(tx, k, j.config.outsideIp)
}
}
}

@ -0,0 +1,249 @@
package gojobs
import (
"errors"
"fmt"
"go.dtapp.net/library/utils/gojobs/jobs_gorm_model"
"go.dtapp.net/library/utils/only"
"gorm.io/gorm"
"log"
)
// Run 运行
func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string) {
// 请求函数记录
statusCreate := j.service.gormClient.Create(&jobs_gorm_model.TaskLog{
TaskId: info.Id,
StatusCode: status,
Desc: desc,
Version: j.config.runVersion,
})
if statusCreate.RowsAffected == 0 {
log.Println("statusCreate", statusCreate.Error)
}
if status == 0 {
statusEdit := j.EditTask(j.service.gormClient, info.Id).
Select("run_id").
Updates(jobs_gorm_model.Task{
RunId: only.GetUuId(),
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
return
}
// 任务
if status == CodeSuccess {
// 执行成功
statusEdit := j.EditTask(j.service.gormClient, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "result").
Updates(jobs_gorm_model.Task{
StatusDesc: "执行成功",
Number: info.Number + 1,
RunId: only.GetUuId(),
UpdatedIp: j.config.outsideIp,
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeEnd {
// 执行成功、提前结束
statusEdit := j.EditTask(j.service.gormClient, info.Id).
Select("status", "status_desc", "number", "updated_ip", "result").
Updates(jobs_gorm_model.Task{
Status: TASK_SUCCESS,
StatusDesc: "结束执行",
Number: info.Number + 1,
UpdatedIp: j.config.outsideIp,
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if status == CodeError {
// 执行失败
statusEdit := j.EditTask(j.service.gormClient, info.Id).
Select("status_desc", "number", "run_id", "updated_ip", "result").
Updates(jobs_gorm_model.Task{
StatusDesc: "执行失败",
Number: info.Number + 1,
RunId: only.GetUuId(),
UpdatedIp: j.config.outsideIp,
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
if info.MaxNumber != 0 {
if info.Number+1 >= info.MaxNumber {
// 关闭执行
statusEdit := j.EditTask(j.service.gormClient, info.Id).
Select("status").
Updates(jobs_gorm_model.Task{
Status: TASK_TIMEOUT,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
}
}
}
// RunAddLog 任务执行日志
func (j *JobsGorm) RunAddLog(id uint, runId string) *gorm.DB {
return j.service.gormClient.Create(&jobs_gorm_model.TaskLogRun{
TaskId: id,
RunId: runId,
InsideIp: j.config.insideIp,
OutsideIp: j.config.outsideIp,
Os: j.config.os,
Arch: j.config.arch,
Gomaxprocs: j.config.maxProCs,
GoVersion: j.config.version,
MacAddrs: j.config.macAddrS,
})
}
// ConfigCreateInCustomId 创建正在运行任务
type ConfigCreateInCustomId struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomId 创建正在运行任务
func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error {
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdOnly 创建正在运行唯一任务
type ConfigCreateInCustomIdOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdOnly 创建正在运行唯一任务
func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error {
query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量
type ConfigCreateInCustomIdMaxNumber struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error {
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}
// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
type ConfigCreateInCustomIdMaxNumberOnly struct {
Tx *gorm.DB // 驱动
Params string // 参数
Frequency int64 // 频率(秒单位)
MaxNumber int64 // 最大次数
CustomId string // 自定义编号
CustomSequence int64 // 自定义顺序
Type string // 类型
SpecifyIp string // 指定外网IP
}
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error {
query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
createStatus := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: only.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
CreatedIp: j.config.outsideIp,
SpecifyIp: config.SpecifyIp,
UpdatedIp: j.config.outsideIp,
})
if createStatus.RowsAffected == 0 {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error))
}
return nil
}

@ -1,25 +0,0 @@
package gojobs
//
//type JobsOption func(*JobsCron)
//
//// WithRedis 缓存服务驱动
//func WithRedis(db *goredis.Client) JobsOption {
// return func(opts *JobsCron) {
// opts.redis = db
// }
//}
//
//// WithGorm 数据库服务驱动
//func WithGorm(db *gorm.DB) JobsOption {
// return func(opts *JobsCron) {
// opts.db = db
// }
//}
//
//// WithMainService 是否主要服务(主要服务可删除过期服务)
//func WithMainService(status int) JobsOption {
// return func(opts *JobsCron) {
// opts.mainService = status
// }
//}

@ -0,0 +1,50 @@
package gojobs
import (
"github.com/go-redis/redis/v8"
"go.dtapp.net/library/utils/goip"
"go.etcd.io/etcd/client/v3"
"gorm.io/gorm"
)
const (
lockTypeRedis = "redis"
lockTypeEtcd = "etcd"
)
// OperationAttr 操作属性
type OperationAttr struct {
gormClient *gorm.DB // 数据库驱动
redisClient *redis.Client // 缓存数据库驱动
etcdClient *clientv3.Client // 分布式缓存驱动
lockPrefix string // 锁Key前缀
ipService *goip.Client // ip服务
lockType string // 锁驱动类型
}
// WithGormClient 设置数据库驱动
func WithGormClient(client *gorm.DB) *OperationAttr {
return &OperationAttr{gormClient: client}
}
// WithRedisClient 设置缓存数据库驱动
func WithRedisClient(redisClient *redis.Client) *OperationAttr {
return &OperationAttr{redisClient: redisClient, lockType: lockTypeRedis}
}
// WithEtcdClient 设置分布式缓存驱动
func WithEtcdClient(etcdClient *clientv3.Client) *OperationAttr {
return &OperationAttr{etcdClient: etcdClient, lockType: lockTypeEtcd}
}
// WithLockPrefix 设置锁Key前缀
// redisfmt.Sprintf("cron:lock:%v:%v", info.Type, id)
// etcdfmt.Sprintf("cron/lock/%v/%v", info.Type, id)
func WithLockPrefix(lockPrefix string) *OperationAttr {
return &OperationAttr{lockPrefix: lockPrefix}
}
// WithIpService 设置ip服务
func WithIpService(ipService *goip.Client) *OperationAttr {
return &OperationAttr{ipService: ipService}
}

@ -26,7 +26,7 @@ import (
// Constants and default values for the package bce
const (
SDK_VERSION = "0.9.125"
SDK_VERSION = "0.9.126"
URI_PREFIX = "/" // now support uri without prefix "v1" so just set root path
DEFAULT_DOMAIN = "baidubce.com"
DEFAULT_PROTOCOL = "http"

@ -60,7 +60,7 @@ github.com/aws/aws-sdk-go/service/sso
github.com/aws/aws-sdk-go/service/sso/ssoiface
github.com/aws/aws-sdk-go/service/sts
github.com/aws/aws-sdk-go/service/sts/stsiface
# github.com/baidubce/bce-sdk-go v0.9.125
# github.com/baidubce/bce-sdk-go v0.9.126
## explicit; go 1.11
github.com/baidubce/bce-sdk-go/auth
github.com/baidubce/bce-sdk-go/bce

Loading…
Cancel
Save