From 8ace334264ee060dede2a64f6e7169659f0ade84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Sat, 11 Jun 2022 10:54:28 +0800 Subject: [PATCH] - update jobs --- jobs/check.go | 5 +- jobs/ip.go | 6 +- jobs/jobs.go | 153 ++++++++++++++++++++++++------------------------ jobs/model.go | 45 +++++++------- jobs/version.go | 3 + 5 files changed, 111 insertions(+), 101 deletions(-) create mode 100644 jobs/version.go diff --git a/jobs/check.go b/jobs/check.go index 223498a4..5547bc6e 100644 --- a/jobs/check.go +++ b/jobs/check.go @@ -2,17 +2,18 @@ package jobs import ( "go.dtapp.net/library/gotime" + "gorm.io/gorm" "log" ) // Check 任务检查 -func (app *App) Check(vs []Task) { +func (app *App) Check(tx *gorm.DB, vs []Task) { if app.MainService > 0 && len(vs) > 0 { for _, v := range vs { diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time) if diffInSecondWithAbs >= v.Frequency*3 { log.Printf("每隔%v秒任务:%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs) - app.Db.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&TaskLogRun{}) // 删除 + tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&TaskLogRun{}) // 删除 } } } diff --git a/jobs/ip.go b/jobs/ip.go index 305c76a7..0acfc05f 100644 --- a/jobs/ip.go +++ b/jobs/ip.go @@ -2,9 +2,11 @@ package jobs import ( "go.dtapp.net/library/goip" + "gorm.io/gorm" ) -func (app *App) RefreshIp() { +// RefreshIp 刷新Ip +func (app *App) RefreshIp(tx *gorm.DB) { xip := goip.GetOutsideIp() if app.OutsideIp == "" || app.OutsideIp == "0.0.0.0" { return @@ -12,6 +14,6 @@ func (app *App) RefreshIp() { if app.OutsideIp == xip { return } - app.Db.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除 + tx.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除 app.OutsideIp = xip } diff --git a/jobs/jobs.go b/jobs/jobs.go index 4d0ec34f..5b7b8ec4 100644 --- a/jobs/jobs.go +++ b/jobs/jobs.go @@ -24,8 +24,8 @@ type App struct { } // Add 添加任务 -func (app *App) Add(Type string, params interface{}, frequency int64) int64 { - return app.Db.Create(&Task{ +func (app *App) Add(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB { + return tx.Create(&Task{ Status: TASK_IN, Params: gojson.JsonEncodeNoError(params), StatusDesc: "首次添加任务", @@ -36,16 +36,16 @@ func (app *App) Add(Type string, params interface{}, frequency int64) int64 { UpdatedIp: app.OutsideIp, CreatedAt: gotime.Current().Format(), UpdatedAt: gotime.Current().Format(), - }).RowsAffected + }) } // AddCustomId 添加任务 -func (app *App) AddCustomId(Type string, params interface{}, frequency int64, customId string) int64 { - query := app.TaskCustomIdTake(Type, customId) +func (app *App) AddCustomId(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string) *gorm.DB { + query := app.TaskCustomIdTake(tx, Type, customId) if query.Id != 0 { - return 0 + return tx } - return app.Db.Create(&Task{ + return tx.Create(&Task{ Status: TASK_IN, Params: gojson.JsonEncodeNoError(params), StatusDesc: "首次添加任务", @@ -57,16 +57,16 @@ func (app *App) AddCustomId(Type string, params interface{}, frequency int64, cu UpdatedIp: app.OutsideIp, CreatedAt: gotime.Current().Format(), UpdatedAt: gotime.Current().Format(), - }).RowsAffected + }) } // AddCustomIdMaxNumber 添加任务并设置最大数量 -func (app *App) AddCustomIdMaxNumber(Type string, params interface{}, frequency int64, customId string, maxNumber int64) int64 { - query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) +func (app *App) AddCustomIdMaxNumber(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string, maxNumber int64) *gorm.DB { + query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN) if query.Id != 0 { - return 0 + return tx } - return app.Db.Create(&Task{ + return tx.Create(&Task{ Status: TASK_IN, Params: gojson.JsonEncodeNoError(params), StatusDesc: "首次添加任务", @@ -79,39 +79,39 @@ func (app *App) AddCustomIdMaxNumber(Type string, params interface{}, frequency UpdatedIp: app.OutsideIp, CreatedAt: gotime.Current().Format(), UpdatedAt: gotime.Current().Format(), - }).RowsAffected + }) } type TaskParams = Task // AddInOrder 添加订单可执行任务 -func (app *App) AddInOrder(Type string, params interface{}, frequency int64) int64 { +func (app *App) AddInOrder(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB { var param TaskParams param.Type = Type param.Frequency = frequency param.ParamsType = ParamsOrderType - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInOrderCustomId 添加订单可执行任务 -func (app *App) AddInOrderCustomId(Type string, params interface{}, frequency int64, customId string) int64 { - query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) +func (app *App) AddInOrderCustomId(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string) *gorm.DB { + query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN) if query.Id != 0 { - return 0 + return tx } var param TaskParams param.Type = Type param.Frequency = frequency param.CustomId = customId param.ParamsType = ParamsOrderType - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInOrderCustomIdSpecifyIp 添加订单可执行任务 -func (app *App) AddInOrderCustomIdSpecifyIp(Type string, params interface{}, frequency int64, customId, specifyIp string) int64 { - query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) +func (app *App) AddInOrderCustomIdSpecifyIp(tx *gorm.DB, Type string, params interface{}, frequency int64, customId, specifyIp string) *gorm.DB { + query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN) if query.Id != 0 { - return 0 + return tx } var param TaskParams param.Type = Type @@ -119,49 +119,49 @@ func (app *App) AddInOrderCustomIdSpecifyIp(Type string, params interface{}, fre param.CustomId = customId param.SpecifyIp = specifyIp param.ParamsType = ParamsOrderType - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInMerchantGoldenBean 添加商家金豆可执行任务 -func (app *App) AddInMerchantGoldenBean(Type string, params interface{}, frequency int64) int64 { +func (app *App) AddInMerchantGoldenBean(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB { var param TaskParams param.Type = Type param.Frequency = frequency param.ParamsType = ParamsMerchantGoldenBeanType - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInTeamInv 添加团队邀请可执行任务 -func (app *App) AddInTeamInv(Type string, params interface{}, frequency int64) int64 { +func (app *App) AddInTeamInv(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB { var param TaskParams param.Type = Type param.Frequency = frequency param.ParamsType = ParamsTeamInvType - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInUserShareInvitation 邀请好友 -func (app *App) AddInUserShareInvitation(Type string, params interface{}, frequency int64) int64 { +func (app *App) AddInUserShareInvitation(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB { var param TaskParams param.Type = Type param.Frequency = frequency - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInNewService 添加企业自定义可执行任务 -func (app *App) AddInNewService(Type string, params interface{}, frequency int64) int64 { +func (app *App) AddInNewService(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB { var param TaskParams param.Type = Type param.Frequency = frequency param.ParamsType = ParamsNewServiceType - return app.AddIn(param, params) + return app.AddIn(tx, param, params) } // AddInOrderCustomIdObservation 添加观察接口任务 -func (app *App) AddInOrderCustomIdObservation(Type string, customId string) int64 { - query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) +func (app *App) AddInOrderCustomIdObservation(tx *gorm.DB, Type string, customId string) *gorm.DB { + query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN) if query.Id != 0 { - return int64(query.Id) + return tx } var param TaskParams param.Type = Type @@ -169,24 +169,25 @@ func (app *App) AddInOrderCustomIdObservation(Type string, customId string) int6 param.MaxNumber = 24 * 5 // 一个星期 param.CustomId = customId param.ParamsType = ParamsOrderType - return app.AddIn(param, ParamsOrderId{ + return app.AddIn(tx, param, ParamsOrderId{ OrderId: customId, }) } // AddInOrderCustomIdObservationClone 观察接口关闭 -func (app *App) AddInOrderCustomIdObservationClone(Type string, customId string) int64 { - query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) +func (app *App) AddInOrderCustomIdObservationClone(tx *gorm.DB, Type string, customId string) *gorm.DB { + query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN) if query.Id == 0 { - return 0 + return tx } - return app.Edit(query.Id).Select("status", "status_desc", "run_id", "updated_ip", "updated_at").Updates(Task{ - Status: TASK_SUCCESS, - StatusDesc: "已完成,停止观察", - RunId: gouuid.GetUuId(), - UpdatedIp: app.OutsideIp, - UpdatedAt: gotime.Current().Format(), - }).RowsAffected + return app.Edit(tx, query.Id).Select("status", "status_desc", "run_id", "updated_ip", "updated_at"). + Updates(Task{ + Status: TASK_SUCCESS, + StatusDesc: "已完成,停止观察", + RunId: gouuid.GetUuId(), + UpdatedIp: app.OutsideIp, + UpdatedAt: gotime.Current().Format(), + }) } // AddIn 添加可执行任务 @@ -194,7 +195,7 @@ func (app *App) AddInOrderCustomIdObservationClone(Type string, customId string) // params.Frequency 任务频率 // params.CustomId 自定义编号 // params 任务参数 -func (app *App) AddIn(param TaskParams, params interface{}) int64 { +func (app *App) AddIn(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB { param.Status = TASK_IN param.StatusDesc = "首次添加任务" param.RunId = gouuid.GetUuId() @@ -203,17 +204,17 @@ func (app *App) AddIn(param TaskParams, params interface{}) int64 { param.UpdatedIp = app.OutsideIp param.CreatedAt = gotime.Current().Format() param.UpdatedAt = gotime.Current().Format() - status := app.Db.Create(¶m) + status := tx.Create(¶m) if status.RowsAffected == 0 { log.Println("AddIn:", status.Error) } - return status.RowsAffected + return status } // AddWaitNewServiceNext 添加企业自定义下一步等待执行任务 -func (app *App) AddWaitNewServiceNext(param TaskParams, params interface{}) int64 { +func (app *App) AddWaitNewServiceNext(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB { param.ParamsType = ParamsNewServiceNextType - return app.AddWait(param, params) + return app.AddWait(tx, param, params) } // AddWait 添加等待执行任务 @@ -222,7 +223,7 @@ func (app *App) AddWaitNewServiceNext(param TaskParams, params interface{}) int6 // params.CustomId 自定义编号 // params.CustomSequence 自定义顺序 // params 任务参数 -func (app *App) AddWait(param TaskParams, params interface{}) int64 { +func (app *App) AddWait(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB { param.Status = TASK_WAIT param.StatusDesc = "首次添加任务" param.RunId = gouuid.GetUuId() @@ -231,38 +232,40 @@ func (app *App) AddWait(param TaskParams, params interface{}) int64 { param.UpdatedIp = app.OutsideIp param.CreatedAt = gotime.Current().Format() param.UpdatedAt = gotime.Current().Format() - return app.Db.Create(¶m).RowsAffected + return tx.Create(¶m) } // Edit 任务修改 -func (app *App) Edit(id uint) *gorm.DB { - return app.Db.Model(&Task{}).Where("id = ?", id) +func (app *App) Edit(tx *gorm.DB, id uint) *gorm.DB { + return tx.Model(&Task{}).Where("id = ?", id) } // UpdateFrequency 更新任务频率 -func (app *App) UpdateFrequency(id uint, frequency int64) *gorm.DB { - return app.Edit(id).Updates(map[string]interface{}{ - "frequency": frequency, - }) +func (app *App) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB { + return app.Edit(tx, id). + Updates(map[string]interface{}{ + "frequency": frequency, + }) } // Start 任务启动 -func (app *App) Start(customId string, customSequence int64) int64 { - return app.Db.Model(&Task{}). +func (app *App) Start(tx *gorm.DB, customId string, customSequence int64) *gorm.DB { + return tx.Model(&Task{}). Where("custom_id = ?", customId). Where("custom_sequence = ?", customSequence). Where("status = ?", TASK_WAIT). - Select("status", "status_desc", "updated_ip", "updated_at").Updates(Task{ - Status: TASK_IN, - StatusDesc: "启动任务", - UpdatedIp: app.OutsideIp, - UpdatedAt: gotime.Current().Format(), - }).RowsAffected + Select("status", "status_desc", "updated_ip", "updated_at"). + Updates(Task{ + Status: TASK_IN, + StatusDesc: "启动任务", + UpdatedIp: app.OutsideIp, + UpdatedAt: gotime.Current().Format(), + }) } // RunAddLog 任务执行日志 -func (app *App) RunAddLog(id uint, runId string) *gorm.DB { - return app.Db.Create(&TaskLogRun{ +func (app *App) RunAddLog(tx *gorm.DB, id uint, runId string) *gorm.DB { + return tx.Create(&TaskLogRun{ TaskId: id, RunId: runId, InsideIp: app.InsideIp, @@ -277,9 +280,9 @@ func (app *App) RunAddLog(id uint, runId string) *gorm.DB { } // Run 任务执行 -func (app *App) Run(info Task, status int, desc string) { +func (app *App) Run(tx *gorm.DB, info Task, status int, desc string) { // 请求函数记录 - statusCreate := app.Db.Create(&TaskLog{ + statusCreate := tx.Create(&TaskLog{ TaskId: info.Id, StatusCode: status, Desc: desc, @@ -290,7 +293,7 @@ func (app *App) Run(info Task, status int, desc string) { log.Println("statusCreate", statusCreate.Error) } if status == 0 { - statusEdit := app.Edit(info.Id).Select("run_id").Updates(Task{ + statusEdit := app.Edit(tx, info.Id).Select("run_id").Updates(Task{ RunId: gouuid.GetUuId(), }) if statusEdit.RowsAffected == 0 { @@ -301,7 +304,7 @@ func (app *App) Run(info Task, status int, desc string) { // 任务 if status == CodeSuccess { // 执行成功 - statusEdit := app.Edit(info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{ + statusEdit := app.Edit(tx, info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{ StatusDesc: "执行成功", Number: info.Number + 1, RunId: gouuid.GetUuId(), @@ -315,7 +318,7 @@ func (app *App) Run(info Task, status int, desc string) { } if status == CodeEnd { // 执行成功、提前结束 - statusEdit := app.Edit(info.Id).Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").Updates(Task{ + statusEdit := app.Edit(tx, info.Id).Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").Updates(Task{ Status: TASK_SUCCESS, StatusDesc: "结束执行", Number: info.Number + 1, @@ -329,7 +332,7 @@ func (app *App) Run(info Task, status int, desc string) { } if status == CodeError { // 执行失败 - statusEdit := app.Edit(info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{ + statusEdit := app.Edit(tx, info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{ StatusDesc: "执行失败", Number: info.Number + 1, RunId: gouuid.GetUuId(), @@ -344,7 +347,7 @@ func (app *App) Run(info Task, status int, desc string) { if info.MaxNumber != 0 { if info.Number+1 >= info.MaxNumber { // 关闭执行 - statusEdit := app.Edit(info.Id).Select("status").Updates(Task{ + statusEdit := app.Edit(tx, info.Id).Select("status").Updates(Task{ Status: TASK_TIMEOUT, }) if statusEdit.RowsAffected == 0 { diff --git a/jobs/model.go b/jobs/model.go index ba8ed21b..8187d411 100644 --- a/jobs/model.go +++ b/jobs/model.go @@ -1,6 +1,7 @@ package jobs import ( + "gorm.io/gorm" "log" "strings" ) @@ -40,32 +41,32 @@ func (m *Task) TableName() string { } // TaskTake 查询任务 -func (app *App) TaskTake(customId string) (result Task) { - app.Db.Where("custom_id = ?", customId).Where("status = ?", TASK_IN).Take(&result) +func (app *App) TaskTake(tx *gorm.DB, customId string) (result Task) { + tx.Where("custom_id = ?", customId).Where("status = ?", TASK_IN).Take(&result) return result } // TaskCustomIdTake 查询任务 -func (app *App) TaskCustomIdTake(Type, customId string) (result Task) { - app.Db.Where("type = ?", Type).Where("custom_id = ?", customId).Take(&result) +func (app *App) TaskCustomIdTake(tx *gorm.DB, Type, customId string) (result Task) { + tx.Where("type = ?", Type).Where("custom_id = ?", customId).Take(&result) return result } // TaskCustomIdTakeStatus 查询任务 -func (app *App) TaskCustomIdTakeStatus(Type, customId, status string) (result Task) { - app.Db.Where("type = ?", Type).Where("custom_id = ?", customId).Where("status = ?", status).Take(&result) +func (app *App) TaskCustomIdTakeStatus(tx *gorm.DB, Type, customId, status string) (result Task) { + tx.Where("type = ?", Type).Where("custom_id = ?", customId).Where("status = ?", status).Take(&result) return result } // TaskFind 查询任务 -func (app *App) TaskFind(frequency int64) (results []Task) { - app.Db.Table("task").Select("task.*").Where("task.frequency = ?", frequency).Where("task.status = ?", TASK_IN).Where("task_ip.ips = ?", app.OutsideIp).Order("task.id asc").Joins("left join task_ip on task_ip.task_type = task.type").Find(&results) +func (app *App) TaskFind(tx *gorm.DB, frequency int64) (results []Task) { + tx.Table("task").Select("task.*").Where("task.frequency = ?", frequency).Where("task.status = ?", TASK_IN).Where("task_ip.ips = ?", app.OutsideIp).Order("task.id asc").Joins("left join task_ip on task_ip.task_type = task.type").Find(&results) return app.taskFindCheck(results) } // TaskFindAll 查询任务 -func (app *App) TaskFindAll(frequency int64) (results []Task) { - app.Db.Where("frequency = ?", frequency).Where("status = ?", TASK_IN).Order("id asc").Find(&results) +func (app *App) TaskFindAll(tx *gorm.DB, frequency int64) (results []Task) { + tx.Where("frequency = ?", frequency).Where("status = ?", TASK_IN).Order("id asc").Find(&results) return results } @@ -117,8 +118,8 @@ func (m *TaskLogRun) TableName() string { } // TaskLogRunTake 查询任务执行日志 -func (app *App) TaskLogRunTake(taskId uint, runId string) (result TaskLogRun) { - app.Db.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result) +func (app *App) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result TaskLogRun) { + tx.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result) return result } @@ -133,31 +134,31 @@ func (m *TaskIp) TableName() string { return "task_ip" } -func (app *App) TaskIpUpdate(taskType, ips string) int64 { +func (app *App) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB { var query TaskIp - app.Db.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&query) + tx.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&query) if query.Id != 0 { - return query.Id + return tx } - updateStatus := app.Db.Create(&TaskIp{ + updateStatus := tx.Create(&TaskIp{ TaskType: taskType, Ips: ips, }) if updateStatus.RowsAffected == 0 { log.Println("任务更新失败:", updateStatus.Error) } - return updateStatus.RowsAffected + return updateStatus } // TaskIpInit 实例任务ip -func (app *App) TaskIpInit(ips map[string]string) bool { +func (app *App) TaskIpInit(tx *gorm.DB, ips map[string]string) bool { if app.OutsideIp == "" || app.OutsideIp == "0.0.0.0" { return false } - app.Db.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除 + tx.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除 for k, v := range ips { if v == "" { - app.TaskIpUpdate(k, app.OutsideIp) + app.TaskIpUpdate(tx, k, app.OutsideIp) } else { find := strings.Contains(v, ",") if find == true { @@ -165,13 +166,13 @@ func (app *App) TaskIpInit(ips map[string]string) bool { parts := strings.Split(v, ",") for _, vv := range parts { if vv == app.OutsideIp { - app.TaskIpUpdate(k, app.OutsideIp) + app.TaskIpUpdate(tx, k, app.OutsideIp) } } } else { // 不包含 if v == app.OutsideIp { - app.TaskIpUpdate(k, app.OutsideIp) + app.TaskIpUpdate(tx, k, app.OutsideIp) } } } diff --git a/jobs/version.go b/jobs/version.go new file mode 100644 index 00000000..b74f0e51 --- /dev/null +++ b/jobs/version.go @@ -0,0 +1,3 @@ +package jobs + +const Version = "1.0.16"