You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
go-library/utils/jobs/jobs.go

359 lines
11 KiB

2 years ago
package jobs
import (
2 years ago
"go.dtapp.net/library/utils/dorm"
2 years ago
"go.dtapp.net/library/utils/gojson"
"go.dtapp.net/library/utils/gotime"
2 years ago
"go.dtapp.net/library/utils/only"
2 years ago
"gorm.io/gorm"
"log"
2 years ago
)
type App struct {
2 years ago
RunVersion int `json:"run_version"` // 运行版本
Os string `json:"os"` // 系统类型
Arch string `json:"arch"` // 系统架构
MaxProCs int `json:"max_pro_cs"` // CPU核数
Version string `json:"version"` // GO版本
MacAddrS string `json:"mac_addr_s"` // Mac地址
InsideIp string `json:"inside_ip"` // 内网ip
OutsideIp string `json:"outside_ip"` // 外网ip
MainService int `json:"main_service"` // 主要服务
Db *gorm.DB // 数据库
Redis *dorm.RedisClient // 缓存数据库服务
2 years ago
}
// Add 添加任务
2 years ago
func (app *App) Add(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
return tx.Create(&Task{
2 years ago
Status: TASK_IN,
Params: gojson.JsonEncodeNoError(params),
StatusDesc: "首次添加任务",
Frequency: frequency,
2 years ago
RunId: only.GetUuId(),
2 years ago
Type: Type,
CreatedIp: app.OutsideIp,
UpdatedIp: app.OutsideIp,
CreatedAt: gotime.Current().Format(),
UpdatedAt: gotime.Current().Format(),
2 years ago
})
2 years ago
}
// AddCustomId 添加任务
2 years ago
func (app *App) AddCustomId(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string) *gorm.DB {
query := app.TaskCustomIdTake(tx, Type, customId)
2 years ago
if query.Id != 0 {
2 years ago
return tx
2 years ago
}
2 years ago
return tx.Create(&Task{
2 years ago
Status: TASK_IN,
Params: gojson.JsonEncodeNoError(params),
StatusDesc: "首次添加任务",
Frequency: frequency,
2 years ago
RunId: only.GetUuId(),
2 years ago
CustomId: customId,
Type: Type,
CreatedIp: app.OutsideIp,
UpdatedIp: app.OutsideIp,
CreatedAt: gotime.Current().Format(),
UpdatedAt: gotime.Current().Format(),
2 years ago
})
2 years ago
}
// AddCustomIdMaxNumber 添加任务并设置最大数量
2 years ago
func (app *App) AddCustomIdMaxNumber(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string, maxNumber int64) *gorm.DB {
query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
2 years ago
if query.Id != 0 {
2 years ago
return tx
2 years ago
}
2 years ago
return tx.Create(&Task{
2 years ago
Status: TASK_IN,
Params: gojson.JsonEncodeNoError(params),
StatusDesc: "首次添加任务",
Frequency: frequency,
MaxNumber: maxNumber,
2 years ago
RunId: only.GetUuId(),
2 years ago
CustomId: customId,
Type: Type,
CreatedIp: app.OutsideIp,
UpdatedIp: app.OutsideIp,
CreatedAt: gotime.Current().Format(),
UpdatedAt: gotime.Current().Format(),
2 years ago
})
2 years ago
}
// TaskParams is an alias of Task, used as the parameter type of the AddIn and
// AddWait helpers.
type TaskParams = Task
// AddInOrder 添加订单可执行任务
2 years ago
func (app *App) AddInOrder(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
2 years ago
var param TaskParams
param.Type = Type
param.Frequency = frequency
param.ParamsType = ParamsOrderType
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInOrderCustomId 添加订单可执行任务
2 years ago
func (app *App) AddInOrderCustomId(tx *gorm.DB, Type string, params interface{}, frequency int64, customId string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
2 years ago
if query.Id != 0 {
2 years ago
return tx
2 years ago
}
var param TaskParams
param.Type = Type
param.Frequency = frequency
param.CustomId = customId
param.ParamsType = ParamsOrderType
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInOrderCustomIdSpecifyIp 添加订单可执行任务
2 years ago
func (app *App) AddInOrderCustomIdSpecifyIp(tx *gorm.DB, Type string, params interface{}, frequency int64, customId, specifyIp string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
2 years ago
if query.Id != 0 {
2 years ago
return tx
2 years ago
}
var param TaskParams
param.Type = Type
param.Frequency = frequency
param.CustomId = customId
param.SpecifyIp = specifyIp
param.ParamsType = ParamsOrderType
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInMerchantGoldenBean 添加商家金豆可执行任务
2 years ago
func (app *App) AddInMerchantGoldenBean(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
2 years ago
var param TaskParams
param.Type = Type
param.Frequency = frequency
param.ParamsType = ParamsMerchantGoldenBeanType
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInTeamInv 添加团队邀请可执行任务
2 years ago
func (app *App) AddInTeamInv(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
2 years ago
var param TaskParams
param.Type = Type
param.Frequency = frequency
param.ParamsType = ParamsTeamInvType
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInUserShareInvitation 邀请好友
2 years ago
func (app *App) AddInUserShareInvitation(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
2 years ago
var param TaskParams
param.Type = Type
param.Frequency = frequency
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInNewService 添加企业自定义可执行任务
2 years ago
func (app *App) AddInNewService(tx *gorm.DB, Type string, params interface{}, frequency int64) *gorm.DB {
2 years ago
var param TaskParams
param.Type = Type
param.Frequency = frequency
param.ParamsType = ParamsNewServiceType
2 years ago
return app.AddIn(tx, param, params)
2 years ago
}
// AddInOrderCustomIdObservation 添加观察接口任务
2 years ago
func (app *App) AddInOrderCustomIdObservation(tx *gorm.DB, Type string, customId string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
2 years ago
if query.Id != 0 {
2 years ago
return tx
2 years ago
}
var param TaskParams
param.Type = Type
param.Frequency = 3600
param.MaxNumber = 24 * 5 // 一个星期
param.CustomId = customId
param.ParamsType = ParamsOrderType
2 years ago
return app.AddIn(tx, param, ParamsOrderId{
2 years ago
OrderId: customId,
})
}
// AddInOrderCustomIdObservationClone 观察接口关闭
2 years ago
func (app *App) AddInOrderCustomIdObservationClone(tx *gorm.DB, Type string, customId string) *gorm.DB {
query := app.TaskCustomIdTakeStatus(tx, Type, customId, TASK_IN)
2 years ago
if query.Id == 0 {
2 years ago
return tx
2 years ago
}
2 years ago
return app.Edit(tx, query.Id).Select("status", "status_desc", "run_id", "updated_ip", "updated_at").
Updates(Task{
Status: TASK_SUCCESS,
StatusDesc: "已完成,停止观察",
2 years ago
RunId: only.GetUuId(),
2 years ago
UpdatedIp: app.OutsideIp,
UpdatedAt: gotime.Current().Format(),
})
2 years ago
}
// AddIn 添加可执行任务
// params.Type 任务类型
// params.Frequency 任务频率
// params.CustomId 自定义编号
// params 任务参数
2 years ago
func (app *App) AddIn(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB {
2 years ago
param.Status = TASK_IN
param.StatusDesc = "首次添加任务"
2 years ago
param.RunId = only.GetUuId()
2 years ago
param.Params = gojson.JsonEncodeNoError(params)
param.CreatedIp = app.OutsideIp
param.UpdatedIp = app.OutsideIp
param.CreatedAt = gotime.Current().Format()
param.UpdatedAt = gotime.Current().Format()
2 years ago
status := tx.Create(&param)
if status.RowsAffected == 0 {
log.Println("AddIn", status.Error)
}
2 years ago
return status
2 years ago
}
// AddWaitNewServiceNext 添加企业自定义下一步等待执行任务
2 years ago
func (app *App) AddWaitNewServiceNext(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB {
2 years ago
param.ParamsType = ParamsNewServiceNextType
2 years ago
return app.AddWait(tx, param, params)
2 years ago
}
// AddWait 添加等待执行任务
// params.Type 任务类型
// params.Frequency 任务频率
// params.CustomId 自定义编号
// params.CustomSequence 自定义顺序
// params 任务参数
2 years ago
func (app *App) AddWait(tx *gorm.DB, param TaskParams, params interface{}) *gorm.DB {
2 years ago
param.Status = TASK_WAIT
param.StatusDesc = "首次添加任务"
2 years ago
param.RunId = only.GetUuId()
2 years ago
param.Params = gojson.JsonEncodeNoError(params)
param.CreatedIp = app.OutsideIp
param.UpdatedIp = app.OutsideIp
param.CreatedAt = gotime.Current().Format()
param.UpdatedAt = gotime.Current().Format()
2 years ago
return tx.Create(&param)
2 years ago
}
// Edit 任务修改
2 years ago
func (app *App) Edit(tx *gorm.DB, id uint) *gorm.DB {
return tx.Model(&Task{}).Where("id = ?", id)
2 years ago
}
// UpdateFrequency 更新任务频率
2 years ago
func (app *App) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB {
return app.Edit(tx, id).
Updates(map[string]interface{}{
"frequency": frequency,
})
2 years ago
}
// Start 任务启动
2 years ago
func (app *App) Start(tx *gorm.DB, customId string, customSequence int64) *gorm.DB {
return tx.Model(&Task{}).
2 years ago
Where("custom_id = ?", customId).
Where("custom_sequence = ?", customSequence).
Where("status = ?", TASK_WAIT).
2 years ago
Select("status", "status_desc", "updated_ip", "updated_at").
Updates(Task{
Status: TASK_IN,
StatusDesc: "启动任务",
UpdatedIp: app.OutsideIp,
UpdatedAt: gotime.Current().Format(),
})
2 years ago
}
// RunAddLog 任务执行日志
2 years ago
func (app *App) RunAddLog(tx *gorm.DB, id uint, runId string) *gorm.DB {
return tx.Create(&TaskLogRun{
2 years ago
TaskId: id,
RunId: runId,
InsideIp: app.InsideIp,
OutsideIp: app.OutsideIp,
Os: app.Os,
Arch: app.Arch,
Gomaxprocs: app.MaxProCs,
GoVersion: app.Version,
MacAddrs: app.MacAddrS,
CreatedAt: gotime.Current().Format(),
})
2 years ago
}
// Run 任务执行
2 years ago
func (app *App) Run(tx *gorm.DB, info Task, status int, desc string) {
2 years ago
// 请求函数记录
2 years ago
statusCreate := tx.Create(&TaskLog{
2 years ago
TaskId: info.Id,
StatusCode: status,
Desc: desc,
Version: app.RunVersion,
CreatedAt: gotime.Current().Format(),
})
if statusCreate.RowsAffected == 0 {
log.Println("statusCreate", statusCreate.Error)
}
2 years ago
if status == 0 {
2 years ago
statusEdit := app.Edit(tx, info.Id).Select("run_id").Updates(Task{
2 years ago
RunId: only.GetUuId(),
2 years ago
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
2 years ago
return
}
// 任务
if status == CodeSuccess {
2 years ago
// 执行成功
2 years ago
statusEdit := app.Edit(tx, info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{
2 years ago
StatusDesc: "执行成功",
Number: info.Number + 1,
2 years ago
RunId: only.GetUuId(),
2 years ago
UpdatedIp: app.OutsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
2 years ago
}
if status == CodeEnd {
2 years ago
// 执行成功、提前结束
2 years ago
statusEdit := app.Edit(tx, info.Id).Select("status", "status_desc", "number", "updated_ip", "updated_at", "result").Updates(Task{
2 years ago
Status: TASK_SUCCESS,
StatusDesc: "结束执行",
Number: info.Number + 1,
UpdatedIp: app.OutsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
2 years ago
}
if status == CodeError {
2 years ago
// 执行失败
2 years ago
statusEdit := app.Edit(tx, info.Id).Select("status_desc", "number", "run_id", "updated_ip", "updated_at", "result").Updates(Task{
2 years ago
StatusDesc: "执行失败",
Number: info.Number + 1,
2 years ago
RunId: only.GetUuId(),
2 years ago
UpdatedIp: app.OutsideIp,
UpdatedAt: gotime.Current().Format(),
Result: desc,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
2 years ago
}
if info.MaxNumber != 0 {
if info.Number+1 >= info.MaxNumber {
// 关闭执行
2 years ago
statusEdit := app.Edit(tx, info.Id).Select("status").Updates(Task{
2 years ago
Status: TASK_TIMEOUT,
})
if statusEdit.RowsAffected == 0 {
log.Println("statusEdit", statusEdit.Error)
}
2 years ago
}
}
}