From 88978c9039d209240624adfd2f06d08c5b503af5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Mon, 27 Jun 2022 21:36:53 +0800 Subject: [PATCH] - update jobs --- go.mod | 2 +- go.sum | 2 + utils/gojobs/jobs_gorm.go | 343 ++++-------------- utils/gojobs/jobs_gorm_etcd.go | 4 - utils/gojobs/jobs_gorm_get.go | 22 ++ utils/gojobs/jobs_gorm_ip.go | 8 +- utils/gojobs/jobs_gorm_lock.go | 69 +++- utils/gojobs/jobs_gorm_model.go | 14 +- utils/gojobs/jobs_gorm_run.go | 249 +++++++++++++ utils/gojobs/jobs_options.go | 25 -- utils/gojobs/operation_attr.go | 50 +++ .../baidubce/bce-sdk-go/bce/config.go | 2 +- vendor/modules.txt | 2 +- 13 files changed, 448 insertions(+), 344 deletions(-) create mode 100644 utils/gojobs/jobs_gorm_get.go create mode 100644 utils/gojobs/jobs_gorm_run.go delete mode 100644 utils/gojobs/jobs_options.go create mode 100644 utils/gojobs/operation_attr.go diff --git a/go.mod b/go.mod index 135d21f7..aa45ed9a 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/aliyun/aliyun-oss-go-sdk v2.2.4+incompatible github.com/allegro/bigcache/v3 v3.0.2 github.com/aws/aws-sdk-go v1.44.42 - github.com/baidubce/bce-sdk-go v0.9.125 + github.com/baidubce/bce-sdk-go v0.9.126 github.com/basgys/goxml2json v1.1.0 github.com/beego/beego/v2 v2.0.4 github.com/bradfitz/gomemcache v0.0.0-20220106215444-fb4bf637b56d diff --git a/go.sum b/go.sum index d104eaed..c02c18fb 100644 --- a/go.sum +++ b/go.sum @@ -44,6 +44,8 @@ github.com/aws/aws-sdk-go v1.44.42/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4 github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/baidubce/bce-sdk-go v0.9.125 h1:je4wqeQzch22S0islZZLKtmyzKld4KY6OVjuKOTwdOQ= github.com/baidubce/bce-sdk-go v0.9.125/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= +github.com/baidubce/bce-sdk-go v0.9.126 h1:wrpb94AN8gLtuGr3wfEm0DObrNulX3W13MeG3lo9SRg= +github.com/baidubce/bce-sdk-go v0.9.126/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f h1:ZNv7On9kyUzm7fvRZumSyy/IUiSC7AzL0I1jKKtwooA= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/basgys/goxml2json v1.1.0 h1:4ln5i4rseYfXNd86lGEB+Vi652IsIXIvggKM/BhUKVw= diff --git a/utils/gojobs/jobs_gorm.go b/utils/gojobs/jobs_gorm.go index f7f0e0b3..e875375b 100644 --- a/utils/gojobs/jobs_gorm.go +++ b/utils/gojobs/jobs_gorm.go @@ -3,44 +3,65 @@ package gojobs import ( "errors" "fmt" + "github.com/go-redis/redis/v8" "go.dtapp.net/library" - "go.dtapp.net/library/utils/dorm" "go.dtapp.net/library/utils/goarray" "go.dtapp.net/library/utils/goip" "go.dtapp.net/library/utils/gojobs/jobs_gorm_model" - "go.dtapp.net/library/utils/only" + "go.dtapp.net/library/utils/golock" + "go.etcd.io/etcd/client/v3" "gorm.io/gorm" - "log" "runtime" ) -// ConfigJobsGorm 配置 -type ConfigJobsGorm struct { - runVersion string // 运行版本 - os string // 系统类型 - arch string // 系统架构 - maxProCs int // CPU核数 - version string // GO版本 - macAddrS string // Mac地址 - insideIp string // 内网ip - OutsideIp string // 外网ip - MainService int // 主要服务 - Db *gorm.DB // 数据库 - Redis *dorm.RedisClient // 缓存数据库服务 - Debug bool // 是否开启调测 -} - // JobsGorm Gorm数据库驱动 type JobsGorm struct { - db *gorm.DB // 数据库 - redis *dorm.RedisClient // 缓存数据库服务 - config *ConfigJobsGorm // 配置 + db struct { + gormClient *gorm.DB // 数据库驱动 + redisClient *redis.Client // 缓存数据库驱动 + etcdClient *clientv3.Client // 分布式缓存驱动 + } + service struct { + gormClient *gorm.DB // 数据库驱动 + lockRedisClient *golock.LockRedis // 缓存数据库驱动 + lockEtcdClient *golock.LockEtcd // 分布式缓存驱动 + } // 服务 + config struct { + lockPrefix string // 锁Key前缀 + lockType string // 锁驱动类型 + runVersion string // 运行版本 + os string // 系统类型 + arch string // 系统架构 + maxProCs int // CPU核数 + version string // GO版本 + macAddrS string // Mac地址 + insideIp string // 内网ip + outsideIp string // 外网ip + } // 配置 } // NewJobsGorm 初始化 -func NewJobsGorm(config *ConfigJobsGorm) (*JobsGorm, error) { - - c := &JobsGorm{config: config} +// WithGormClient +func NewJobsGorm(attrs ...*OperationAttr) (*JobsGorm, error) { + + c := &JobsGorm{} + for _, attr := range attrs { + if attr.gormClient != nil { + c.db.gormClient = attr.gormClient + c.service.gormClient = attr.gormClient + } + if attr.redisClient != nil { + c.config.lockType = attr.lockType + c.db.redisClient = attr.redisClient + } + if attr.etcdClient != nil { + c.config.lockType = attr.lockType + c.db.etcdClient = attr.etcdClient + } + if attr.lockPrefix != "" { + c.config.lockPrefix = attr.lockPrefix + } + } c.config.runVersion = go_library.Version() c.config.os = runtime.GOOS @@ -50,21 +71,30 @@ func NewJobsGorm(config *ConfigJobsGorm) (*JobsGorm, error) { c.config.macAddrS = goarray.TurnString(goip.GetMacAddr()) c.config.insideIp = goip.GetInsideIp() - if c.config.OutsideIp == "" { + if c.config.outsideIp == "" { return nil, errors.New("需要配置当前的IP") } - c.db = c.config.Db - if c.db == nil { + if c.db.gormClient == nil { return nil, errors.New("需要配置数据库驱动") } - c.redis = c.config.Redis - if c.redis == nil { + switch c.config.lockType { + case lockTypeRedis: + if c.db.redisClient == nil { + return nil, errors.New("需要配置缓存驱动") + } + c.service.lockRedisClient = golock.NewLockRedis(c.db.redisClient) + case lockTypeEtcd: + if c.db.etcdClient == nil { + return nil, errors.New("需要配置缓存驱动") + } + c.service.lockEtcdClient = golock.NewLockEtcd(c.db.etcdClient) + default: return nil, errors.New("需要配置缓存驱动") } - err := c.db.AutoMigrate( + err := c.service.gormClient.AutoMigrate( &jobs_gorm_model.Task{}, &jobs_gorm_model.TaskLog{}, &jobs_gorm_model.TaskLogRun{}, @@ -74,256 +104,5 @@ func NewJobsGorm(config *ConfigJobsGorm) (*JobsGorm, error) { return nil, errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err)) } - if c.config.Debug == true { - log.Printf("jobs:%+v\n", c) - } - return c, nil } - -func (j *JobsGorm) GetDb() *gorm.DB { - return j.db -} - -func (j *JobsGorm) GetRedis() *dorm.RedisClient { - return j.redis -} - -// Run 运行 -func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string) { - // 请求函数记录 - statusCreate := j.db.Create(&jobs_gorm_model.TaskLog{ - TaskId: info.Id, - StatusCode: status, - Desc: desc, - Version: j.config.runVersion, - }) - if statusCreate.RowsAffected == 0 { - log.Println("statusCreate", statusCreate.Error) - } - if status == 0 { - statusEdit := j.EditTask(j.db, info.Id). - Select("run_id"). - Updates(jobs_gorm_model.Task{ - RunId: only.GetUuId(), - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - return - } - // 任务 - if status == CodeSuccess { - // 执行成功 - statusEdit := j.EditTask(j.db, info.Id). - Select("status_desc", "number", "run_id", "updated_ip", "result"). - Updates(jobs_gorm_model.Task{ - StatusDesc: "执行成功", - Number: info.Number + 1, - RunId: only.GetUuId(), - UpdatedIp: j.config.OutsideIp, - Result: desc, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - if status == CodeEnd { - // 执行成功、提前结束 - statusEdit := j.EditTask(j.db, info.Id). - Select("status", "status_desc", "number", "updated_ip", "result"). - Updates(jobs_gorm_model.Task{ - Status: TASK_SUCCESS, - StatusDesc: "结束执行", - Number: info.Number + 1, - UpdatedIp: j.config.OutsideIp, - Result: desc, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - if status == CodeError { - // 执行失败 - statusEdit := j.EditTask(j.db, info.Id). - Select("status_desc", "number", "run_id", "updated_ip", "result"). - Updates(jobs_gorm_model.Task{ - StatusDesc: "执行失败", - Number: info.Number + 1, - RunId: only.GetUuId(), - UpdatedIp: j.config.OutsideIp, - Result: desc, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - if info.MaxNumber != 0 { - if info.Number+1 >= info.MaxNumber { - // 关闭执行 - statusEdit := j.EditTask(j.db, info.Id). - Select("status"). - Updates(jobs_gorm_model.Task{ - Status: TASK_TIMEOUT, - }) - if statusEdit.RowsAffected == 0 { - log.Println("statusEdit", statusEdit.Error) - } - } - } -} - -// RunAddLog 任务执行日志 -func (j *JobsGorm) RunAddLog(id uint, runId string) *gorm.DB { - return j.db.Create(&jobs_gorm_model.TaskLogRun{ - TaskId: id, - RunId: runId, - InsideIp: j.config.insideIp, - OutsideIp: j.config.OutsideIp, - Os: j.config.os, - Arch: j.config.arch, - Gomaxprocs: j.config.maxProCs, - GoVersion: j.config.version, - MacAddrs: j.config.macAddrS, - }) -} - -// ConfigCreateInCustomId 创建正在运行任务 -type ConfigCreateInCustomId struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomId 创建正在运行任务 -func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error { - createStatus := config.Tx.Create(&jobs_gorm_model.Task{ - Status: TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - RunId: only.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: j.config.OutsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: j.config.OutsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// ConfigCreateInCustomIdOnly 创建正在运行唯一任务 -type ConfigCreateInCustomIdOnly struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomIdOnly 创建正在运行唯一任务 -func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error { - query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) - if query.Id != 0 { - return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) - } - createStatus := config.Tx.Create(&jobs_gorm_model.Task{ - Status: TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - RunId: only.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: j.config.OutsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: j.config.OutsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量 -type ConfigCreateInCustomIdMaxNumber struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - MaxNumber int64 // 最大次数 - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量 -func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error { - createStatus := config.Tx.Create(&jobs_gorm_model.Task{ - Status: TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - MaxNumber: config.MaxNumber, - RunId: only.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: j.config.OutsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: j.config.OutsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} - -// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 -type ConfigCreateInCustomIdMaxNumberOnly struct { - Tx *gorm.DB // 驱动 - Params string // 参数 - Frequency int64 // 频率(秒单位) - MaxNumber int64 // 最大次数 - CustomId string // 自定义编号 - CustomSequence int64 // 自定义顺序 - Type string // 类型 - SpecifyIp string // 指定外网IP -} - -// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 -func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error { - query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) - if query.Id != 0 { - return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) - } - createStatus := config.Tx.Create(&jobs_gorm_model.Task{ - Status: TASK_IN, - Params: config.Params, - StatusDesc: "首次添加任务", - Frequency: config.Frequency, - MaxNumber: config.MaxNumber, - RunId: only.GetUuId(), - CustomId: config.CustomId, - CustomSequence: config.CustomSequence, - Type: config.Type, - CreatedIp: j.config.OutsideIp, - SpecifyIp: config.SpecifyIp, - UpdatedIp: j.config.OutsideIp, - }) - if createStatus.RowsAffected == 0 { - return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) - } - return nil -} diff --git a/utils/gojobs/jobs_gorm_etcd.go b/utils/gojobs/jobs_gorm_etcd.go index a0e4ac3c..9724559b 100644 --- a/utils/gojobs/jobs_gorm_etcd.go +++ b/utils/gojobs/jobs_gorm_etcd.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "go.dtapp.net/library/utils/gojobs/jobs_gorm_model" - "log" "math/rand" "time" ) @@ -21,9 +20,6 @@ func (j *JobsGorm) GetEtcdIssueAddress(server *Etcd, v jobs_gorm_model.Task) (ad appointIpStatus = true } workers, err := server.ListWorkers() - if j.config.Debug == true { - log.Printf("客户端列表:%+v %v\n", workers, len(workers)) - } if err != nil { return address, errors.New(fmt.Sprintf("获取在线客户端列表失败:%s", err.Error())) } diff --git a/utils/gojobs/jobs_gorm_get.go b/utils/gojobs/jobs_gorm_get.go new file mode 100644 index 00000000..db4c67aa --- /dev/null +++ b/utils/gojobs/jobs_gorm_get.go @@ -0,0 +1,22 @@ +package gojobs + +import ( + "github.com/go-redis/redis/v8" + "go.etcd.io/etcd/client/v3" + "gorm.io/gorm" +) + +// GetDb 数据库驱动 +func (j *JobsGorm) GetDb() *gorm.DB { + return j.service.gormClient +} + +// GetRedis 缓存数据库驱动 +func (j *JobsGorm) GetRedis() *redis.Client { + return j.db.redisClient +} + +// GetEtcd 分布式缓存驱动 +func (j *JobsGorm) GetEtcd() *clientv3.Client { + return j.db.etcdClient +} diff --git a/utils/gojobs/jobs_gorm_ip.go b/utils/gojobs/jobs_gorm_ip.go index ec5ce96d..2265fc8a 100644 --- a/utils/gojobs/jobs_gorm_ip.go +++ b/utils/gojobs/jobs_gorm_ip.go @@ -9,12 +9,12 @@ import ( // RefreshIp 刷新Ip func (j *JobsGorm) RefreshIp(tx *gorm.DB) { xip := goip.GetOutsideIp() - if j.config.OutsideIp == "" || j.config.OutsideIp == "0.0.0.0" { + if j.config.outsideIp == "" || j.config.outsideIp == "0.0.0.0" { return } - if j.config.OutsideIp == xip { + if j.config.outsideIp == xip { return } - tx.Where("ips = ?", j.config.OutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 - j.config.OutsideIp = xip + tx.Where("ips = ?", j.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 + j.config.outsideIp = xip } diff --git a/utils/gojobs/jobs_gorm_lock.go b/utils/gojobs/jobs_gorm_lock.go index 38f41728..551923dc 100644 --- a/utils/gojobs/jobs_gorm_lock.go +++ b/utils/gojobs/jobs_gorm_lock.go @@ -1,36 +1,67 @@ package gojobs import ( + "errors" "fmt" - "go.dtapp.net/library/utils/dorm" "go.dtapp.net/library/utils/gojobs/jobs_gorm_model" - "time" ) // Lock 上锁 -func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) string { - cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) - judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("") - if judgeCache != "" { - return judgeCache +func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) (string, error) { + + if j.config.lockType == "" { + return "", errors.New("没有配置") + } + + var ( + redisKey = fmt.Sprintf("%s:%v:%v", j.config.lockPrefix, info.Type, id) + etcdKey = fmt.Sprintf("%s/%v/%v", j.config.lockPrefix, info.Type, id) + val = fmt.Sprintf("已在%s@%s机器上锁成功", j.config.insideIp, j.config.outsideIp) + ttl = info.Frequency * 3 + ) + + if j.config.lockType == lockTypeRedis { + return j.service.lockRedisClient.Lock(redisKey, val, ttl) } - j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器上锁成功", j.config.OutsideIp), dorm.WithExpire(time.Millisecond*time.Duration(info.Frequency)*3)) - return "" + + return j.service.lockEtcdClient.Lock(etcdKey, val, ttl) } // Unlock Lock 解锁 -func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) { - cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) - j.redis.NewStringOperation().Del(cacheName) +func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) error { + + if j.config.lockType == "" { + return errors.New("没有配置") + } + + var ( + redisKey = fmt.Sprintf("%s:%v:%v", j.config.lockPrefix, info.Type, id) + etcdKey = fmt.Sprintf("%s/%v/%v", j.config.lockPrefix, info.Type, id) + ) + + if j.config.lockType == lockTypeRedis { + return j.service.lockRedisClient.Unlock(redisKey) + } + + return j.service.lockEtcdClient.Unlock(etcdKey) } // LockForever 永远上锁 -func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) string { - cacheName := fmt.Sprintf("cron:%v:%v", info.Type, id) - judgeCache := j.redis.NewStringOperation().Get(cacheName).UnwrapOr("") - if judgeCache != "" { - return judgeCache +func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) (string, error) { + + if j.config.lockType == "" { + return "", errors.New("没有配置") } - j.redis.NewStringOperation().Set(cacheName, fmt.Sprintf("已在%v机器永远上锁成功", j.config.OutsideIp)) - return "" + + var ( + redisKey = fmt.Sprintf("%s:%v:%v", j.config.lockPrefix, info.Type, id) + etcdKey = fmt.Sprintf("%s/%v/%v", j.config.lockPrefix, info.Type, id) + val = fmt.Sprintf("已在%s@%s机器永远上锁成功", j.config.insideIp, j.config.outsideIp) + ) + + if j.config.lockType == lockTypeRedis { + return j.service.lockRedisClient.LockForever(redisKey, val) + } + + return j.service.lockEtcdClient.LockForever(etcdKey, val) } diff --git a/utils/gojobs/jobs_gorm_model.go b/utils/gojobs/jobs_gorm_model.go index 95f9f55d..5760b2ec 100644 --- a/utils/gojobs/jobs_gorm_model.go +++ b/utils/gojobs/jobs_gorm_model.go @@ -161,27 +161,27 @@ func (j *JobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB { // TaskIpInit 实例任务ip func (j *JobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool { - if j.config.OutsideIp == "" || j.config.OutsideIp == "0.0.0.0" { + if j.config.outsideIp == "" || j.config.outsideIp == "0.0.0.0" { return false } - tx.Where("ips = ?", j.config.OutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 + tx.Where("ips = ?", j.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 for k, v := range ips { if v == "" { - j.TaskIpUpdate(tx, k, j.config.OutsideIp) + j.TaskIpUpdate(tx, k, j.config.outsideIp) } else { find := strings.Contains(v, ",") if find == true { // 包含 parts := strings.Split(v, ",") for _, vv := range parts { - if vv == j.config.OutsideIp { - j.TaskIpUpdate(tx, k, j.config.OutsideIp) + if vv == j.config.outsideIp { + j.TaskIpUpdate(tx, k, j.config.outsideIp) } } } else { // 不包含 - if v == j.config.OutsideIp { - j.TaskIpUpdate(tx, k, j.config.OutsideIp) + if v == j.config.outsideIp { + j.TaskIpUpdate(tx, k, j.config.outsideIp) } } } diff --git a/utils/gojobs/jobs_gorm_run.go b/utils/gojobs/jobs_gorm_run.go new file mode 100644 index 00000000..3cd2a180 --- /dev/null +++ b/utils/gojobs/jobs_gorm_run.go @@ -0,0 +1,249 @@ +package gojobs + +import ( + "errors" + "fmt" + "go.dtapp.net/library/utils/gojobs/jobs_gorm_model" + "go.dtapp.net/library/utils/only" + "gorm.io/gorm" + "log" +) + +// Run 运行 +func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string) { + // 请求函数记录 + statusCreate := j.service.gormClient.Create(&jobs_gorm_model.TaskLog{ + TaskId: info.Id, + StatusCode: status, + Desc: desc, + Version: j.config.runVersion, + }) + if statusCreate.RowsAffected == 0 { + log.Println("statusCreate", statusCreate.Error) + } + if status == 0 { + statusEdit := j.EditTask(j.service.gormClient, info.Id). + Select("run_id"). + Updates(jobs_gorm_model.Task{ + RunId: only.GetUuId(), + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + return + } + // 任务 + if status == CodeSuccess { + // 执行成功 + statusEdit := j.EditTask(j.service.gormClient, info.Id). + Select("status_desc", "number", "run_id", "updated_ip", "result"). + Updates(jobs_gorm_model.Task{ + StatusDesc: "执行成功", + Number: info.Number + 1, + RunId: only.GetUuId(), + UpdatedIp: j.config.outsideIp, + Result: desc, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + if status == CodeEnd { + // 执行成功、提前结束 + statusEdit := j.EditTask(j.service.gormClient, info.Id). + Select("status", "status_desc", "number", "updated_ip", "result"). + Updates(jobs_gorm_model.Task{ + Status: TASK_SUCCESS, + StatusDesc: "结束执行", + Number: info.Number + 1, + UpdatedIp: j.config.outsideIp, + Result: desc, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + if status == CodeError { + // 执行失败 + statusEdit := j.EditTask(j.service.gormClient, info.Id). + Select("status_desc", "number", "run_id", "updated_ip", "result"). + Updates(jobs_gorm_model.Task{ + StatusDesc: "执行失败", + Number: info.Number + 1, + RunId: only.GetUuId(), + UpdatedIp: j.config.outsideIp, + Result: desc, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + if info.MaxNumber != 0 { + if info.Number+1 >= info.MaxNumber { + // 关闭执行 + statusEdit := j.EditTask(j.service.gormClient, info.Id). + Select("status"). + Updates(jobs_gorm_model.Task{ + Status: TASK_TIMEOUT, + }) + if statusEdit.RowsAffected == 0 { + log.Println("statusEdit", statusEdit.Error) + } + } + } +} + +// RunAddLog 任务执行日志 +func (j *JobsGorm) RunAddLog(id uint, runId string) *gorm.DB { + return j.service.gormClient.Create(&jobs_gorm_model.TaskLogRun{ + TaskId: id, + RunId: runId, + InsideIp: j.config.insideIp, + OutsideIp: j.config.outsideIp, + Os: j.config.os, + Arch: j.config.arch, + Gomaxprocs: j.config.maxProCs, + GoVersion: j.config.version, + MacAddrs: j.config.macAddrS, + }) +} + +// ConfigCreateInCustomId 创建正在运行任务 +type ConfigCreateInCustomId struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomId 创建正在运行任务 +func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error { + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + RunId: only.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.config.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.config.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} + +// ConfigCreateInCustomIdOnly 创建正在运行唯一任务 +type ConfigCreateInCustomIdOnly struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomIdOnly 创建正在运行唯一任务 +func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error { + query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) + if query.Id != 0 { + return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) + } + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + RunId: only.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.config.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.config.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} + +// ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量 +type ConfigCreateInCustomIdMaxNumber struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + MaxNumber int64 // 最大次数 + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量 +func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error { + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + MaxNumber: config.MaxNumber, + RunId: only.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.config.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.config.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} + +// ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 +type ConfigCreateInCustomIdMaxNumberOnly struct { + Tx *gorm.DB // 驱动 + Params string // 参数 + Frequency int64 // 频率(秒单位) + MaxNumber int64 // 最大次数 + CustomId string // 自定义编号 + CustomSequence int64 // 自定义顺序 + Type string // 类型 + SpecifyIp string // 指定外网IP +} + +// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 +func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error { + query := j.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) + if query.Id != 0 { + return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) + } + createStatus := config.Tx.Create(&jobs_gorm_model.Task{ + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + MaxNumber: config.MaxNumber, + RunId: only.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + CreatedIp: j.config.outsideIp, + SpecifyIp: config.SpecifyIp, + UpdatedIp: j.config.outsideIp, + }) + if createStatus.RowsAffected == 0 { + return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, createStatus.Error)) + } + return nil +} diff --git a/utils/gojobs/jobs_options.go b/utils/gojobs/jobs_options.go deleted file mode 100644 index cc2731f1..00000000 --- a/utils/gojobs/jobs_options.go +++ /dev/null @@ -1,25 +0,0 @@ -package gojobs - -// -//type JobsOption func(*JobsCron) -// -//// WithRedis 缓存服务驱动 -//func WithRedis(db *goredis.Client) JobsOption { -// return func(opts *JobsCron) { -// opts.redis = db -// } -//} -// -//// WithGorm 数据库服务驱动 -//func WithGorm(db *gorm.DB) JobsOption { -// return func(opts *JobsCron) { -// opts.db = db -// } -//} -// -//// WithMainService 是否主要服务(主要服务可删除过期服务) -//func WithMainService(status int) JobsOption { -// return func(opts *JobsCron) { -// opts.mainService = status -// } -//} diff --git a/utils/gojobs/operation_attr.go b/utils/gojobs/operation_attr.go new file mode 100644 index 00000000..62901b38 --- /dev/null +++ b/utils/gojobs/operation_attr.go @@ -0,0 +1,50 @@ +package gojobs + +import ( + "github.com/go-redis/redis/v8" + "go.dtapp.net/library/utils/goip" + "go.etcd.io/etcd/client/v3" + "gorm.io/gorm" +) + +const ( + lockTypeRedis = "redis" + lockTypeEtcd = "etcd" +) + +// OperationAttr 操作属性 +type OperationAttr struct { + gormClient *gorm.DB // 数据库驱动 + redisClient *redis.Client // 缓存数据库驱动 + etcdClient *clientv3.Client // 分布式缓存驱动 + lockPrefix string // 锁Key前缀 + ipService *goip.Client // ip服务 + lockType string // 锁驱动类型 +} + +// WithGormClient 设置数据库驱动 +func WithGormClient(client *gorm.DB) *OperationAttr { + return &OperationAttr{gormClient: client} +} + +// WithRedisClient 设置缓存数据库驱动 +func WithRedisClient(redisClient *redis.Client) *OperationAttr { + return &OperationAttr{redisClient: redisClient, lockType: lockTypeRedis} +} + +// WithEtcdClient 设置分布式缓存驱动 +func WithEtcdClient(etcdClient *clientv3.Client) *OperationAttr { + return &OperationAttr{etcdClient: etcdClient, lockType: lockTypeEtcd} +} + +// WithLockPrefix 设置锁Key前缀 +// redis:fmt.Sprintf("cron:lock:%v:%v", info.Type, id) +// etcd:fmt.Sprintf("cron/lock/%v/%v", info.Type, id) +func WithLockPrefix(lockPrefix string) *OperationAttr { + return &OperationAttr{lockPrefix: lockPrefix} +} + +// WithIpService 设置ip服务 +func WithIpService(ipService *goip.Client) *OperationAttr { + return &OperationAttr{ipService: ipService} +} diff --git a/vendor/github.com/baidubce/bce-sdk-go/bce/config.go b/vendor/github.com/baidubce/bce-sdk-go/bce/config.go index dd9d2fa8..07f0bc4b 100644 --- a/vendor/github.com/baidubce/bce-sdk-go/bce/config.go +++ b/vendor/github.com/baidubce/bce-sdk-go/bce/config.go @@ -26,7 +26,7 @@ import ( // Constants and default values for the package bce const ( - SDK_VERSION = "0.9.125" + SDK_VERSION = "0.9.126" URI_PREFIX = "/" // now support uri without prefix "v1" so just set root path DEFAULT_DOMAIN = "baidubce.com" DEFAULT_PROTOCOL = "http" diff --git a/vendor/modules.txt b/vendor/modules.txt index 9e3571dd..0c01c680 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -60,7 +60,7 @@ github.com/aws/aws-sdk-go/service/sso github.com/aws/aws-sdk-go/service/sso/ssoiface github.com/aws/aws-sdk-go/service/sts github.com/aws/aws-sdk-go/service/sts/stsiface -# github.com/baidubce/bce-sdk-go v0.9.125 +# github.com/baidubce/bce-sdk-go v0.9.126 ## explicit; go 1.11 github.com/baidubce/bce-sdk-go/auth github.com/baidubce/bce-sdk-go/bce