- update jobs

master
李光春 8 months ago
parent b612eab3d8
commit 0bf72b945e

@ -19,14 +19,12 @@ type ClientConfig struct {
GormClientFun dorm.GormClientFun // 数据库驱动
RedisClientFun dorm.RedisClientFun // 数据库驱动
RedisPrefixFun redisPrefixFun // 前缀
SLog *golog.SLog // 日志服务
CurrentIp string // 当前ip
CurrentIp string // 当前IP
}
// Client 实例
type Client struct {
gormClient *dorm.GormClient // 数据库
sLog *golog.SLog // 日志服务
config struct {
systemHostname string // 主机名
systemOs string // 系统类型
@ -53,6 +51,10 @@ type Client struct {
cornKeyPrefix string // 任务Key前缀 xxx_cron
cornKeyCustom string // 任务Key自定义
}
slog struct {
status bool // 状态
client *golog.SLog // 日志服务
}
}
// NewClient 创建实例
@ -62,8 +64,6 @@ func NewClient(config *ClientConfig) (*Client, error) {
c := &Client{}
c.sLog = config.SLog
if config.CurrentIp != "" && config.CurrentIp != "0.0.0.0" {
c.config.systemOutsideIp = config.CurrentIp
}

@ -4,6 +4,7 @@ import (
"context"
"github.com/dtapps/go-library"
"github.com/dtapps/go-library/utils/goip"
"github.com/dtapps/go-library/utils/golog"
"github.com/redis/go-redis/v9"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/host"
@ -11,6 +12,15 @@ import (
"runtime"
)
// ConfigSLogClientFun 日志配置
func (c *Client) ConfigSLogClientFun(sLogFun golog.SLogFun) {
sLog := sLogFun()
if sLog != nil {
c.slog.client = sLog
c.slog.status = true
}
}
type systemResult struct {
SystemHostname string // 主机名
SystemOs string // 系统类型

@ -11,7 +11,9 @@ import (
func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result jobs_gorm_model.Task) {
err := tx.Where("id = ?", id).Take(&result).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("编号查询任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("编号查询任务:%v", err)
}
}
return result
}
@ -20,7 +22,9 @@ func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result j
func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Take(&result).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("自定义编号查询任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("自定义编号查询任务:%v", err)
}
}
return result
}
@ -29,7 +33,9 @@ func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (re
func (c *Client) taskTake(ctx context.Context, tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("自定义编号加状态查询任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("自定义编号加状态查询任务:%v", err)
}
}
return result
}
@ -63,7 +69,9 @@ func (c *Client) TaskTakeWait(ctx context.Context, tx *gorm.DB, customId string)
func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询单任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询单任务:%v", err)
}
}
return result
}
@ -72,7 +80,9 @@ func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type s
func (c *Client) taskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询单任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询单任务:%v", err)
}
}
return result
}
@ -106,7 +116,9 @@ func (c *Client) TaskTypeTakeWait(ctx context.Context, tx *gorm.DB, customId, Ty
func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Order("id asc").Find(&results).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
}
}
return results
}
@ -115,7 +127,9 @@ func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64)
func (c *Client) TaskFindAllType(ctx context.Context, tx *gorm.DB, Type string, frequency int64) (results []jobs_gorm_model.Task) {
err := tx.Where("type = ?", Type).Where("frequency = ?", frequency).Order("id asc").Find(&results).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
}
}
return results
}
@ -124,7 +138,9 @@ func (c *Client) TaskFindAllType(ctx context.Context, tx *gorm.DB, Type string,
func (c *Client) taskFindAll(ctx context.Context, tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
}
}
return results
}
@ -134,13 +150,17 @@ func (c *Client) taskFindAllType(ctx context.Context, tx *gorm.DB, Type string,
if frequency == 0 {
err := tx.Where("type = ?", Type).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
}
}
return results
}
err := tx.Where("type = ?", Type).Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
}
}
return results
}
@ -204,7 +224,9 @@ func (c *Client) StartTask(ctx context.Context, tx *gorm.DB, id uint) error {
StatusDesc: "启动任务",
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("任务启动失败:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("任务启动失败:%v", err)
}
}
return err
}
@ -221,7 +243,9 @@ func (c *Client) StartTaskCustom(ctx context.Context, tx *gorm.DB, customId stri
StatusDesc: "启动任务",
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("任务启动自定义失败:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("任务启动自定义失败:%v", err)
}
}
return err
}
@ -240,7 +264,9 @@ func (c *Client) UpdateFrequency(ctx context.Context, tx *gorm.DB, id uint, freq
NextRunTime: gotime.Current().AfterSeconds(frequency).Time,
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("更新任务频率失败:%v", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("更新任务频率失败:%v", err)
}
}
return err
}

@ -11,7 +11,9 @@ import (
func (c *Client) autoMigrateTask(ctx context.Context) {
err := c.gormClient.GetDb().AutoMigrate(&jobs_gorm_model.Task{})
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("创建模型:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("创建模型:%s", err)
}
}
}
@ -19,7 +21,9 @@ func (c *Client) autoMigrateTask(ctx context.Context) {
func (c *Client) autoMigrateTaskLog(ctx context.Context) {
err := c.gormClient.GetDb().AutoMigrate(&jobs_gorm_model.TaskLog{})
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("创建模型:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("创建模型:%s", err)
}
}
}
@ -27,7 +31,9 @@ func (c *Client) autoMigrateTaskLog(ctx context.Context) {
func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
}
}
return err
}
@ -36,7 +42,9 @@ func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error {
func (c *Client) GormTaskLogInDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_IN).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
}
}
return err
}
@ -45,7 +53,9 @@ func (c *Client) GormTaskLogInDelete(ctx context.Context, hour int64) error {
func (c *Client) GormTaskLogSuccessDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_SUCCESS).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
}
}
return err
}
@ -54,7 +64,9 @@ func (c *Client) GormTaskLogSuccessDelete(ctx context.Context, hour int64) error
func (c *Client) GormTaskLogErrorDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_ERROR).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
}
}
return err
}
@ -63,7 +75,9 @@ func (c *Client) GormTaskLogErrorDelete(ctx context.Context, hour int64) error {
func (c *Client) GormTaskLogTimeoutDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_TIMEOUT).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
}
}
return err
}
@ -72,7 +86,9 @@ func (c *Client) GormTaskLogTimeoutDelete(ctx context.Context, hour int64) error
func (c *Client) GormTaskLogWaitDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_WAIT).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
}
}
return err
}
@ -101,8 +117,10 @@ func (c *Client) GormTaskLogRecord(ctx context.Context, task jobs_gorm_model.Tas
}
err := c.gormClient.GetDb().Create(&taskLog).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("记录失败:%s", err)
c.sLog.WithTraceId(ctx).Errorf("记录数据:%+v", taskLog)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("记录失败:%s", err)
c.slog.client.WithTraceId(ctx).Errorf("记录数据:%+v", taskLog)
}
}
}

@ -12,7 +12,9 @@ import (
func (c *Client) Publish(ctx context.Context, channel string, message interface{}) error {
publish, err := c.cache.redisClient.Publish(ctx, channel, message).Result()
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("发布失败:%s %s %v %s", channel, message, publish, err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("发布失败:%s %s %v %s", channel, message, publish, err)
}
}
return err
}

@ -38,7 +38,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if appointIpStatus {
// 判断是否指定某ip执行
if gostring.Contains(workers[0], currentIp) {
c.sLog.WithTraceId(ctx).Info("只有一个客户端在线指定某ip执行", workers[0], currentIp)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Info("只有一个客户端在线指定某ip执行", workers[0], currentIp)
}
return workers[0], nil
}
return "", errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp))
@ -50,7 +52,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if appointIpStatus {
for wk, wv := range workers {
if gostring.Contains(wv, currentIp) {
c.sLog.WithTraceId(ctx).Info("优先处理指定某ip执行", workers[wk], currentIp)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Info("优先处理指定某ip执行", workers[wk], currentIp)
}
return workers[wk], nil
}
}
@ -61,7 +65,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if address == "" {
return address, errors.New("获取执行的客户端异常")
}
c.sLog.WithTraceId(ctx).Info("随机返回一个:", address, currentIp)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Info("随机返回一个:", address, currentIp)
}
return address, nil
}
}
@ -72,7 +78,9 @@ func (c *Client) GetSubscribeClientList(ctx context.Context) (client []string, e
// 查询活跃的channel
client, err = c.cache.redisClient.PubSubChannels(ctx, c.cache.cornKeyPrefix+"_*").Result()
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("获取在线的客户端失败:%s%v", c.cache.cornKeyPrefix+"_*", err)
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("获取在线的客户端失败:%s%v", c.cache.cornKeyPrefix+"_*", err)
}
}
return client, err

@ -77,7 +77,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
runId := gotrace_id.GetTraceIdContext(ctx)
if runId == "" {
c.sLog.WithTraceId(ctx).Error("上下文没有跟踪编号")
if c.slog.status {
c.slog.client.WithTraceId(ctx).Error("上下文没有跟踪编号")
}
return
}
@ -93,7 +95,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
}
}
return
case CodeSuccess:
@ -109,7 +113,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
}
}
case CodeEnd:
// 执行成功、提前结束
@ -124,7 +130,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().Time,
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
}
}
case CodeError:
// 执行失败
@ -139,7 +147,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
}
}
}
@ -152,7 +162,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
Status: TASK_TIMEOUT,
}).Error
if err != nil {
c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
if c.slog.status {
c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
}
}
}
}

Loading…
Cancel
Save