- update jobs

master
李光春 2 years ago
parent b434b89a71
commit 8ace334264

@ -2,17 +2,18 @@ package jobs
import ( import (
"go.dtapp.net/library/gotime" "go.dtapp.net/library/gotime"
"gorm.io/gorm"
"log" "log"
) )
// Check 任务检查 // Check 任务检查
func (app *App) Check(vs []Task) { func (app *App) Check(tx *gorm.DB, vs []Task) {
if app.MainService > 0 && len(vs) > 0 { if app.MainService > 0 && len(vs) > 0 {
for _, v := range vs { for _, v := range vs {
diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time) diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time)
if diffInSecondWithAbs >= v.Frequency*3 { if diffInSecondWithAbs >= v.Frequency*3 {
log.Printf("每隔%v秒任务%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs) log.Printf("每隔%v秒任务%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs)
app.Db.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&TaskLogRun{}) // 删除 tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&TaskLogRun{}) // 删除
} }
} }
} }

@ -2,9 +2,11 @@ package jobs
import ( import (
"go.dtapp.net/library/goip" "go.dtapp.net/library/goip"
"gorm.io/gorm"
) )
func (app *App) RefreshIp() { // RefreshIp 刷新Ip
func (app *App) RefreshIp(tx *gorm.DB) {
xip := goip.GetOutsideIp() xip := goip.GetOutsideIp()
if app.OutsideIp == "" || app.OutsideIp == "0.0.0.0" { if app.OutsideIp == "" || app.OutsideIp == "0.0.0.0" {
return return
@ -12,6 +14,6 @@ func (app *App) RefreshIp() {
if app.OutsideIp == xip { if app.OutsideIp == xip {
return return
} }
app.Db.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除 tx.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除
app.OutsideIp = xip app.OutsideIp = xip
} }

@ -24,8 +24,8 @@ type App struct {
} }
// Add 添加任务 // Add 添加任务
func (app *App) Add(Type string, params interface{}, frequency int64) int64 { func (app *App) Add(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
return app.Db.Create(&Task{ return tx.Create(&Task{
Status: TASK_IN, Status: TASK_IN,
Params: gojson.JsonEncodeNoError(params), Params: gojson.JsonEncodeNoError(params),
StatusDesc: "首次添加任务", StatusDesc: "首次添加任务",
@ -36,16 +36,16 @@ func (app *App) Add(Type string, params interface{}, frequency int64) int64 {
UpdatedIp: app.OutsideIp, UpdatedIp: app.OutsideIp,
CreatedAt: gotime.Current().Format(), CreatedAt: gotime.Current().Format(),
UpdatedAt: gotime.Current().Format(), UpdatedAt: gotime.Current().Format(),
}).RowsAffected })
} }
// AddCustomId 添加任务 // AddCustomId 添加任务
func (app *App) AddCustomId(Type string, params interface{}, frequency int64, customId string) int64 { func (app *App) AddCustomId(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string) *gorm.DB {
query := app.TaskCustomIdTake(Type, customId) query := app.TaskCustomIdTake(tx, Type, customId)
if query.Id != 0 { if query.Id != 0 {
return 0 return tx
} }
return app.Db.Create(&Task{ return tx.Create(&Task{
Status: TASK_IN, Status: TASK_IN,
Params: gojson.JsonEncodeNoError(params), Params: gojson.JsonEncodeNoError(params),
StatusDesc: "首次添加任务", StatusDesc: "首次添加任务",
@ -57,16 +57,16 @@ func (app *App) AddCustomId(Type string, params interface{}, frequency int64, cu
UpdatedIp: app.OutsideIp, UpdatedIp: app.OutsideIp,
CreatedAt: gotime.Current().Format(), CreatedAt: gotime.Current().Format(),
UpdatedAt: gotime.Current().Format(), UpdatedAt: gotime.Current().Format(),
}).RowsAffected })
} }
// AddCustomIdMaxNumber 添加任务并设置最大数量 // AddCustomIdMaxNumber 添加任务并设置最大数量
func (app *App) AddCustomIdMaxNumber(Type string, params interface{}, frequency int64, customId string, maxNumber int64) int64 { func (app *App) AddCustomIdMaxNumber(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string, maxNumber int64) *gorm.DB {
query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
if query.Id != 0 { if query.Id != 0 {
return 0 return tx
} }
return app.Db.Create(&Task{ return tx.Create(&Task{
Status: TASK_IN, Status: TASK_IN,
Params: gojson.JsonEncodeNoError(params), Params: gojson.JsonEncodeNoError(params),
StatusDesc: "首次添加任务", StatusDesc: "首次添加任务",
@ -79,39 +79,39 @@ func (app *App) AddCustomIdMaxNumber(Type string, params interface{}, frequency
UpdatedIp: app.OutsideIp, UpdatedIp: app.OutsideIp,
CreatedAt: gotime.Current().Format(), CreatedAt: gotime.Current().Format(),
UpdatedAt: gotime.Current().Format(), UpdatedAt: gotime.Current().Format(),
}).RowsAffected })
} }
type TaskParams = Task type TaskParams = Task
// AddInOrder 添加订单可执行任务 // AddInOrder 添加订单可执行任务
func (app *App) AddInOrder(Type string, params interface{}, frequency int64) int64 { func (app *App) AddInOrder(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
param.Frequency = frequency param.Frequency = frequency
param.ParamsType = ParamsOrderType param.ParamsType = ParamsOrderType
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInOrderCustomId 添加订单可执行任务 // AddInOrderCustomId 添加订单可执行任务
func (app *App) AddInOrderCustomId(Type string, params interface{}, frequency int64, customId string) int64 { func (app *App) AddInOrderCustomId(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
if query.Id != 0 { if query.Id != 0 {
return 0 return tx
} }
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
param.Frequency = frequency param.Frequency = frequency
param.CustomId = customId param.CustomId = customId
param.ParamsType = ParamsOrderType param.ParamsType = ParamsOrderType
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInOrderCustomIdSpecifyIp 添加订单可执行任务 // AddInOrderCustomIdSpecifyIp 添加订单可执行任务
func (app *App) AddInOrderCustomIdSpecifyIp(Type string, params interface{}, frequency int64, customId, specifyIp string) int64 { func (app *App) AddInOrderCustomIdSpecifyIp(tx *gorm.DB, Type string, params interface{}, frequency int64, customId, specifyIp string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
if query.Id != 0 { if query.Id != 0 {
return 0 return tx
} }
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
@ -119,49 +119,49 @@ func (app *App) AddInOrderCustomIdSpecifyIp(Type string, params interface{}, fre
param.CustomId = customId param.CustomId = customId
param.SpecifyIp = specifyIp param.SpecifyIp = specifyIp
param.ParamsType = ParamsOrderType param.ParamsType = ParamsOrderType
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInMerchantGoldenBean 添加商家金豆可执行任务 // AddInMerchantGoldenBean 添加商家金豆可执行任务
func (app *App) AddInMerchantGoldenBean(Type string, params interface{}, frequency int64) int64 { func (app *App) AddInMerchantGoldenBean(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
param.Frequency = frequency param.Frequency = frequency
param.ParamsType = ParamsMerchantGoldenBeanType param.ParamsType = ParamsMerchantGoldenBeanType
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInTeamInv 添加团队邀请可执行任务 // AddInTeamInv 添加团队邀请可执行任务
func (app *App) AddInTeamInv(Type string, params interface{}, frequency int64) int64 { func (app *App) AddInTeamInv(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
param.Frequency = frequency param.Frequency = frequency
param.ParamsType = ParamsTeamInvType param.ParamsType = ParamsTeamInvType
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInUserShareInvitation 邀请好友 // AddInUserShareInvitation 邀请好友
func (app *App) AddInUserShareInvitation(Type string, params interface{}, frequency int64) int64 { func (app *App) AddInUserShareInvitation(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
param.Frequency = frequency param.Frequency = frequency
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInNewService 添加企业自定义可执行任务 // AddInNewService 添加企业自定义可执行任务
func (app *App) AddInNewService(Type string, params interface{}, frequency int64) int64 { func (app *App) AddInNewService(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
param.Frequency = frequency param.Frequency = frequency
param.ParamsType = ParamsNewServiceType param.ParamsType = ParamsNewServiceType
return app.AddIn(param, params) return app.AddIn(tx, param, params)
} }
// AddInOrderCustomIdObservation 添加观察接口任务 // AddInOrderCustomIdObservation 添加观察接口任务
func (app *App) AddInOrderCustomIdObservation(Type string, customId string) int64 { func (app *App) AddInOrderCustomIdObservation(tx *gorm.DB, Type string, customId string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
if query.Id != 0 { if query.Id != 0 {
return int64(query.Id) return tx
} }
var param TaskParams var param TaskParams
param.Type = Type param.Type = Type
@ -169,24 +169,25 @@ func (app *App) AddInOrderCustomIdObservation(Type string, customId string) int6
param.MaxNumber = 24 * 5 // 一个星期 param.MaxNumber = 24 * 5 // 一个星期
param.CustomId = customId param.CustomId = customId
param.ParamsType = ParamsOrderType param.ParamsType = ParamsOrderType
return app.AddIn(param, ParamsOrderId{ return app.AddIn(tx, param, ParamsOrderId{
OrderId: customId, OrderId: customId,
}) })
} }
// AddInOrderCustomIdObservationClone 观察接口关闭 // AddInOrderCustomIdObservationClone 观察接口关闭
func (app *App) AddInOrderCustomIdObservationClone(Type string, customId string) int64 { func (app *App) AddInOrderCustomIdObservationClone(tx *gorm.DB, Type string, customId string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(Type, customId, TASK_IN) query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
if query.Id == 0 { if query.Id == 0 {
return 0 return tx
} }
return app.Edit(query.Id).Select("status", "status_desc", "run_id", "updated_ip", "updated_at").Updates(Task{ return app.Edit(tx, query.Id).Select("status", "status_desc", "run_id", "updated_ip", "updated_at").
Status: TASK_SUCCESS, Updates(Task{
StatusDesc: "已完成,停止观察", Status: TASK_SUCCESS,
RunId: gouuid.GetUuId(), StatusDesc: "已完成,停止观察",
UpdatedIp: app.OutsideIp, RunId: gouuid.GetUuId(),
UpdatedAt: gotime.Current().Format(), UpdatedIp: app.OutsideIp,
}).RowsAffected UpdatedAt: gotime.Current().Format(),
})
} }
// AddIn 添加可执行任务 // AddIn 添加可执行任务
@ -194,7 +195,7 @@ func (app *App) AddInOrderCustomIdObservationClone(Type string, customId string)
// params.Frequency 任务频率 // params.Frequency 任务频率
// params.CustomId 自定义编号 // params.CustomId 自定义编号
// params 任务参数 // params 任务参数
func (app *App) AddIn(param TaskParams, params interface{}) int64 { func (app *App) AddIn(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB {
param.Status = TASK_IN param.Status = TASK_IN
param.StatusDesc = "首次添加任务" param.StatusDesc = "首次添加任务"
param.RunId = gouuid.GetUuId() param.RunId = gouuid.GetUuId()
@ -203,17 +204,17 @@ func (app *App) AddIn(param TaskParams, params interface{}) int64 {
param.UpdatedIp = app.OutsideIp param.UpdatedIp = app.OutsideIp
param.CreatedAt = gotime.Current().Format() param.CreatedAt = gotime.Current().Format()
param.UpdatedAt = gotime.Current().Format() param.UpdatedAt = gotime.Current().Format()
status := app.Db.Create(&param) status := tx.Create(&param)
if status.RowsAffected == 0 { if status.RowsAffected == 0 {
log.Println("AddIn", status.Error) log.Println("AddIn", status.Error)
} }
return status.RowsAffected return status
} }
// AddWaitNewServiceNext 添加企业自定义下一步等待执行任务 // AddWaitNewServiceNext 添加企业自定义下一步等待执行任务
func (app *App) AddWaitNewServiceNext(param TaskParams, params interface{}) int64 { func (app *App) AddWaitNewServiceNext(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB {
param.ParamsType = ParamsNewServiceNextType param.ParamsType = ParamsNewServiceNextType
return app.AddWait(param, params) return app.AddWait(tx, param, params)
} }
// AddWait 添加等待执行任务 // AddWait 添加等待执行任务
@ -222,7 +223,7 @@ func (app *App) AddWaitNewServiceNext(param TaskParams, params interface{}) int6
// params.CustomId 自定义编号 // params.CustomId 自定义编号
// params.CustomSequence 自定义顺序 // params.CustomSequence 自定义顺序
// params 任务参数 // params 任务参数
func (app *App) AddWait(param TaskParams, params interface{}) int64 { func (app *App) AddWait(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB {
param.Status = TASK_WAIT param.Status = TASK_WAIT
param.StatusDesc = "首次添加任务" param.StatusDesc = "首次添加任务"
param.RunId = gouuid.GetUuId() param.RunId = gouuid.GetUuId()
@ -231,38 +232,40 @@ func (app *App) AddWait(param TaskParams, params interface{}) int64 {
param.UpdatedIp = app.OutsideIp param.UpdatedIp = app.OutsideIp
param.CreatedAt = gotime.Current().Format() param.CreatedAt = gotime.Current().Format()
param.UpdatedAt = gotime.Current().Format() param.UpdatedAt = gotime.Current().Format()
return app.Db.Create(&param).RowsAffected return tx.Create(&param)
} }
// Edit 任务修改 // Edit 任务修改
func (app *App) Edit(id uint) *gorm.DB { func (app *App) Edit(tx *gorm.DB, id uint) *gorm.DB {
return app.Db.Model(&Task{}).Where("id = ?", id) return tx.Model(&Task{}).Where("id = ?", id)
} }
// UpdateFrequency 更新任务频率 // UpdateFrequency 更新任务频率
func (app *App) UpdateFrequency(id uint, frequency int64) *gorm.DB { func (app *App) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB {
return app.Edit(id).Updates(map[string]interface{}{ return app.Edit(tx, id).
"frequency": frequency, Updates(map[string]interface{}{
}) "frequency": frequency,
})
} }
// Start 任务启动 // Start 任务启动
func (app *App) Start(customId string, customSequence int64) int64 { func (app *App) Start(tx *gorm.DB, customId string, customSequence int64) *gorm.DB {
return app.Db.Model(&Task{}). return tx.Model(&Task{}).
Where("custom_id = ?", customId). Where("custom_id = ?", customId).
Where("custom_sequence = ?", customSequence). Where("custom_sequence = ?", customSequence).
Where("status = ?", TASK_WAIT). Where("status = ?", TASK_WAIT).
Select("status", "status_desc", "updated_ip", "updated_at").Updates(Task{ Select("status", "status_desc", "updated_ip", "updated_at").
Status: TASK_IN, Updates(Task{
StatusDesc: "启动任务", Status: TASK_IN,
UpdatedIp: app.OutsideIp, StatusDesc: "启动任务",
UpdatedAt: gotime.Current().Format(), UpdatedIp: app.OutsideIp,
}).RowsAffected UpdatedAt: gotime.Current().Format(),
})
} }
// RunAddLog 任务执行日志 // RunAddLog 任务执行日志
func (app *App) RunAddLog(id uint, runId string) *gorm.DB { func (app *App) RunAddLog(tx *gorm.DB, id uint, runId string) *gorm.DB {
return app.Db.Create(&TaskLogRun{ return tx.Create(&TaskLogRun{
TaskId: id, TaskId: id,
RunId: runId, RunId: runId,
InsideIp: app.InsideIp, InsideIp: app.InsideIp,
@ -277,9 +280,9 @@ func (app *App) RunAddLog(id uint, runId string) *gorm.DB {
} }
// Run 任务执行 // Run 任务执行
func (app *App) Run(info Task, status int, desc string) { func (app *App) Run(tx *gorm.DB, info Task, status int, desc string) {
// 请求函数记录 // 请求函数记录
statusCreate := app.Db.Create(&TaskLog{ statusCreate := tx.Create(&TaskLog{
TaskId: info.Id, TaskId: info.Id,
StatusCode: status, StatusCode: status,
Desc: desc, Desc: desc,
@ -290,7 +293,7 @@ func (app *App) Run(info Task, status int, desc string) {
log.Println("statusCreate", statusCreate.Error) log.Println("statusCreate", statusCreate.Error)
} }
if status == 0 { if status == 0 {
statusEdit := app.Edit(info.Id).Select("run_id").Updates(Task{ statusEdit := app.Edit(tx, info.Id).Select("run_id").Updates(Task{
RunId: gouuid.GetUuId(), RunId: gouuid.GetUuId(),
}) })
if statusEdit.RowsAffected == 0 { if statusEdit.RowsAffected == 0 {
@ -301,7 +304,7 @@ func (app *App) Run(info Task, status int, desc string) {
// 任务 // 任务
if status == CodeSuccess { if status == CodeSuccess {
// 执行成功 // 执行成功
statusEdit := app.Edit(info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{ statusEdit := app.Edit(tx, info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{
StatusDesc: "执行成功", StatusDesc: "执行成功",
Number: info.Number + 1, Number: info.Number + 1,
RunId: gouuid.GetUuId(), RunId: gouuid.GetUuId(),
@ -315,7 +318,7 @@ func (app *App) Run(info Task, status int, desc string) {
} }
if status == CodeEnd { if status == CodeEnd {
// 执行成功、提前结束 // 执行成功、提前结束
statusEdit := app.Edit(info.Id).Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").Updates(Task{ statusEdit := app.Edit(tx, info.Id).Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").Updates(Task{
Status: TASK_SUCCESS, Status: TASK_SUCCESS,
StatusDesc: "结束执行", StatusDesc: "结束执行",
Number: info.Number + 1, Number: info.Number + 1,
@ -329,7 +332,7 @@ func (app *App) Run(info Task, status int, desc string) {
} }
if status == CodeError { if status == CodeError {
// 执行失败 // 执行失败
statusEdit := app.Edit(info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{ statusEdit := app.Edit(tx, info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{
StatusDesc: "执行失败", StatusDesc: "执行失败",
Number: info.Number + 1, Number: info.Number + 1,
RunId: gouuid.GetUuId(), RunId: gouuid.GetUuId(),
@ -344,7 +347,7 @@ func (app *App) Run(info Task, status int, desc string) {
if info.MaxNumber != 0 { if info.MaxNumber != 0 {
if info.Number+1 >= info.MaxNumber { if info.Number+1 >= info.MaxNumber {
// 关闭执行 // 关闭执行
statusEdit := app.Edit(info.Id).Select("status").Updates(Task{ statusEdit := app.Edit(tx, info.Id).Select("status").Updates(Task{
Status: TASK_TIMEOUT, Status: TASK_TIMEOUT,
}) })
if statusEdit.RowsAffected == 0 { if statusEdit.RowsAffected == 0 {

@ -1,6 +1,7 @@
package jobs package jobs
import ( import (
"gorm.io/gorm"
"log" "log"
"strings" "strings"
) )
@ -40,32 +41,32 @@ func (m *Task) TableName() string {
} }
// TaskTake 查询任务 // TaskTake 查询任务
func (app *App) TaskTake(customId string) (result Task) { func (app *App) TaskTake(tx *gorm.DB, customId string) (result Task) {
app.Db.Where("custom_id = ?", customId).Where("status = ?", TASK_IN).Take(&result) tx.Where("custom_id = ?", customId).Where("status = ?", TASK_IN).Take(&result)
return result return result
} }
// TaskCustomIdTake 查询任务 // TaskCustomIdTake 查询任务
func (app *App) TaskCustomIdTake(Type, customId string) (result Task) { func (app *App) TaskCustomIdTake(tx *gorm.DB, Type, customId string) (result Task) {
app.Db.Where("type = ?", Type).Where("custom_id = ?", customId).Take(&result) tx.Where("type = ?", Type).Where("custom_id = ?", customId).Take(&result)
return result return result
} }
// TaskCustomIdTakeStatus 查询任务 // TaskCustomIdTakeStatus 查询任务
func (app *App) TaskCustomIdTakeStatus(Type, customId, status string) (result Task) { func (app *App) TaskCustomIdTakeStatus(tx *gorm.DB, Type, customId, status string) (result Task) {
app.Db.Where("type = ?", Type).Where("custom_id = ?", customId).Where("status = ?", status).Take(&result) tx.Where("type = ?", Type).Where("custom_id = ?", customId).Where("status = ?", status).Take(&result)
return result return result
} }
// TaskFind 查询任务 // TaskFind 查询任务
func (app *App) TaskFind(frequency int64) (results []Task) { func (app *App) TaskFind(tx *gorm.DB, frequency int64) (results []Task) {
app.Db.Table("task").Select("task.*").Where("task.frequency = ?", frequency).Where("task.status = ?", TASK_IN).Where("task_ip.ips = ?", app.OutsideIp).Order("task.id asc").Joins("left join task_ip on task_ip.task_type = task.type").Find(&results) tx.Table("task").Select("task.*").Where("task.frequency = ?", frequency).Where("task.status = ?", TASK_IN).Where("task_ip.ips = ?", app.OutsideIp).Order("task.id asc").Joins("left join task_ip on task_ip.task_type = task.type").Find(&results)
return app.taskFindCheck(results) return app.taskFindCheck(results)
} }
// TaskFindAll 查询任务 // TaskFindAll 查询任务
func (app *App) TaskFindAll(frequency int64) (results []Task) { func (app *App) TaskFindAll(tx *gorm.DB, frequency int64) (results []Task) {
app.Db.Where("frequency = ?", frequency).Where("status = ?", TASK_IN).Order("id asc").Find(&results) tx.Where("frequency = ?", frequency).Where("status = ?", TASK_IN).Order("id asc").Find(&results)
return results return results
} }
@ -117,8 +118,8 @@ func (m *TaskLogRun) TableName() string {
} }
// TaskLogRunTake 查询任务执行日志 // TaskLogRunTake 查询任务执行日志
func (app *App) TaskLogRunTake(taskId uint, runId string) (result TaskLogRun) { func (app *App) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result TaskLogRun) {
app.Db.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result) tx.Select("id", "os", "arch", "outside_ip", "created_at").Where("task_id = ?", taskId).Where("run_id = ?", runId).Take(&result)
return result return result
} }
@ -133,31 +134,31 @@ func (m *TaskIp) TableName() string {
return "task_ip" return "task_ip"
} }
func (app *App) TaskIpUpdate(taskType, ips string) int64 { func (app *App) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB {
var query TaskIp var query TaskIp
app.Db.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&query) tx.Where("task_type = ?", taskType).Where("ips = ?", ips).Take(&query)
if query.Id != 0 { if query.Id != 0 {
return query.Id return tx
} }
updateStatus := app.Db.Create(&TaskIp{ updateStatus := tx.Create(&TaskIp{
TaskType: taskType, TaskType: taskType,
Ips: ips, Ips: ips,
}) })
if updateStatus.RowsAffected == 0 { if updateStatus.RowsAffected == 0 {
log.Println("任务更新失败:", updateStatus.Error) log.Println("任务更新失败:", updateStatus.Error)
} }
return updateStatus.RowsAffected return updateStatus
} }
// TaskIpInit 实例任务ip // TaskIpInit 实例任务ip
func (app *App) TaskIpInit(ips map[string]string) bool { func (app *App) TaskIpInit(tx *gorm.DB, ips map[string]string) bool {
if app.OutsideIp == "" || app.OutsideIp == "0.0.0.0" { if app.OutsideIp == "" || app.OutsideIp == "0.0.0.0" {
return false return false
} }
app.Db.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除 tx.Where("ips = ?", app.OutsideIp).Delete(&TaskIp{}) // 删除
for k, v := range ips { for k, v := range ips {
if v == "" { if v == "" {
app.TaskIpUpdate(k, app.OutsideIp) app.TaskIpUpdate(tx, k, app.OutsideIp)
} else { } else {
find := strings.Contains(v, ",") find := strings.Contains(v, ",")
if find == true { if find == true {
@ -165,13 +166,13 @@ func (app *App) TaskIpInit(ips map[string]string) bool {
parts := strings.Split(v, ",") parts := strings.Split(v, ",")
for _, vv := range parts { for _, vv := range parts {
if vv == app.OutsideIp { if vv == app.OutsideIp {
app.TaskIpUpdate(k, app.OutsideIp) app.TaskIpUpdate(tx, k, app.OutsideIp)
} }
} }
} else { } else {
// 不包含 // 不包含
if v == app.OutsideIp { if v == app.OutsideIp {
app.TaskIpUpdate(k, app.OutsideIp) app.TaskIpUpdate(tx, k, app.OutsideIp)
} }
} }
} }

@ -0,0 +1,3 @@
package jobs
const Version = "1.0.16"
Loading…
Cancel
Save