From 0bf72b945ec6635e1a0323154c032b47b330fb40 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?=
Date: Fri, 8 Sep 2023 12:11:38 +0800
Subject: [PATCH] - update jobs
---
utils/gojobs/client.go | 10 ++++----
utils/gojobs/cofing.go | 10 ++++++++
utils/gojobs/gorm_model.go | 52 ++++++++++++++++++++++++++++----------
utils/gojobs/model.go | 38 ++++++++++++++++++++--------
utils/gojobs/redis.go | 4 ++-
utils/gojobs/redis_get.go | 16 +++++++++---
utils/gojobs/run.go | 24 +++++++++++++-----
7 files changed, 115 insertions(+), 39 deletions(-)
diff --git a/utils/gojobs/client.go b/utils/gojobs/client.go
index 1c671a88..eee7eb8f 100644
--- a/utils/gojobs/client.go
+++ b/utils/gojobs/client.go
@@ -19,14 +19,12 @@ type ClientConfig struct {
GormClientFun dorm.GormClientFun // 数据库驱动
RedisClientFun dorm.RedisClientFun // 数据库驱动
RedisPrefixFun redisPrefixFun // 前缀
- SLog *golog.SLog // 日志服务
- CurrentIp string // 当前ip
+ CurrentIp string // 当前IP
}
// Client 实例
type Client struct {
gormClient *dorm.GormClient // 数据库
- sLog *golog.SLog // 日志服务
config struct {
systemHostname string // 主机名
systemOs string // 系统类型
@@ -53,6 +51,10 @@ type Client struct {
cornKeyPrefix string // 任务Key前缀 xxx_cron
cornKeyCustom string // 任务Key自定义
}
+ slog struct {
+ status bool // 状态
+ client *golog.SLog // 日志服务
+ }
}
// NewClient 创建实例
@@ -62,8 +64,6 @@ func NewClient(config *ClientConfig) (*Client, error) {
c := &Client{}
- c.sLog = config.SLog
-
if config.CurrentIp != "" && config.CurrentIp != "0.0.0.0" {
c.config.systemOutsideIp = config.CurrentIp
}
diff --git a/utils/gojobs/cofing.go b/utils/gojobs/cofing.go
index 83f57655..e29b0580 100644
--- a/utils/gojobs/cofing.go
+++ b/utils/gojobs/cofing.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/dtapps/go-library"
"github.com/dtapps/go-library/utils/goip"
+ "github.com/dtapps/go-library/utils/golog"
"github.com/redis/go-redis/v9"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/host"
@@ -11,6 +12,15 @@ import (
"runtime"
)
+// ConfigSLogClientFun 日志配置
+func (c *Client) ConfigSLogClientFun(sLogFun golog.SLogFun) {
+ sLog := sLogFun()
+ if sLog != nil {
+ c.slog.client = sLog
+ c.slog.status = true
+ }
+}
+
type systemResult struct {
SystemHostname string // 主机名
SystemOs string // 系统类型
diff --git a/utils/gojobs/gorm_model.go b/utils/gojobs/gorm_model.go
index ecc8c9eb..34490cb4 100644
--- a/utils/gojobs/gorm_model.go
+++ b/utils/gojobs/gorm_model.go
@@ -11,7 +11,9 @@ import (
func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result jobs_gorm_model.Task) {
err := tx.Where("id = ?", id).Take(&result).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("编号查询任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("编号查询任务:%v", err)
+ }
}
return result
}
@@ -20,7 +22,9 @@ func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result j
func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Take(&result).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("自定义编号查询任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("自定义编号查询任务:%v", err)
+ }
}
return result
}
@@ -29,7 +33,9 @@ func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (re
func (c *Client) taskTake(ctx context.Context, tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("自定义编号加状态查询任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("自定义编号加状态查询任务:%v", err)
+ }
}
return result
}
@@ -63,7 +69,9 @@ func (c *Client) TaskTakeWait(ctx context.Context, tx *gorm.DB, customId string)
func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询单任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询单任务:%v", err)
+ }
}
return result
}
@@ -72,7 +80,9 @@ func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type s
func (c *Client) taskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询单任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询单任务:%v", err)
+ }
}
return result
}
@@ -106,7 +116,9 @@ func (c *Client) TaskTypeTakeWait(ctx context.Context, tx *gorm.DB, customId, Ty
func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Order("id asc").Find(&results).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ }
}
return results
}
@@ -115,7 +127,9 @@ func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64)
func (c *Client) TaskFindAllType(ctx context.Context, tx *gorm.DB, Type string, frequency int64) (results []jobs_gorm_model.Task) {
err := tx.Where("type = ?", Type).Where("frequency = ?", frequency).Order("id asc").Find(&results).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ }
}
return results
}
@@ -124,7 +138,9 @@ func (c *Client) TaskFindAllType(ctx context.Context, tx *gorm.DB, Type string,
func (c *Client) taskFindAll(ctx context.Context, tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ }
}
return results
}
@@ -134,13 +150,17 @@ func (c *Client) taskFindAllType(ctx context.Context, tx *gorm.DB, Type string,
if frequency == 0 {
err := tx.Where("type = ?", Type).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ }
}
return results
}
err := tx.Where("type = ?", Type).Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err)
+ }
}
return results
}
@@ -204,7 +224,9 @@ func (c *Client) StartTask(ctx context.Context, tx *gorm.DB, id uint) error {
StatusDesc: "启动任务",
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("任务启动失败:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("任务启动失败:%v", err)
+ }
}
return err
}
@@ -221,7 +243,9 @@ func (c *Client) StartTaskCustom(ctx context.Context, tx *gorm.DB, customId stri
StatusDesc: "启动任务",
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("任务启动自定义失败:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("任务启动自定义失败:%v", err)
+ }
}
return err
}
@@ -240,7 +264,9 @@ func (c *Client) UpdateFrequency(ctx context.Context, tx *gorm.DB, id uint, freq
NextRunTime: gotime.Current().AfterSeconds(frequency).Time,
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("更新任务频率失败:%v", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("更新任务频率失败:%v", err)
+ }
}
return err
}
diff --git a/utils/gojobs/model.go b/utils/gojobs/model.go
index 8e22ce17..df2389f9 100644
--- a/utils/gojobs/model.go
+++ b/utils/gojobs/model.go
@@ -11,7 +11,9 @@ import (
func (c *Client) autoMigrateTask(ctx context.Context) {
err := c.gormClient.GetDb().AutoMigrate(&jobs_gorm_model.Task{})
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("创建模型:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("创建模型:%s", err)
+ }
}
}
@@ -19,7 +21,9 @@ func (c *Client) autoMigrateTask(ctx context.Context) {
func (c *Client) autoMigrateTaskLog(ctx context.Context) {
err := c.gormClient.GetDb().AutoMigrate(&jobs_gorm_model.TaskLog{})
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("创建模型:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("创建模型:%s", err)
+ }
}
}
@@ -27,7 +31,9 @@ func (c *Client) autoMigrateTaskLog(ctx context.Context) {
func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ }
}
return err
}
@@ -36,7 +42,9 @@ func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error {
func (c *Client) GormTaskLogInDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_IN).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ }
}
return err
}
@@ -45,7 +53,9 @@ func (c *Client) GormTaskLogInDelete(ctx context.Context, hour int64) error {
func (c *Client) GormTaskLogSuccessDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_SUCCESS).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ }
}
return err
}
@@ -54,7 +64,9 @@ func (c *Client) GormTaskLogSuccessDelete(ctx context.Context, hour int64) error
func (c *Client) GormTaskLogErrorDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_ERROR).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ }
}
return err
}
@@ -63,7 +75,9 @@ func (c *Client) GormTaskLogErrorDelete(ctx context.Context, hour int64) error {
func (c *Client) GormTaskLogTimeoutDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_TIMEOUT).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ }
}
return err
}
@@ -72,7 +86,9 @@ func (c *Client) GormTaskLogTimeoutDelete(ctx context.Context, hour int64) error
func (c *Client) GormTaskLogWaitDelete(ctx context.Context, hour int64) error {
err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_WAIT).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err)
+ }
}
return err
}
@@ -101,8 +117,10 @@ func (c *Client) GormTaskLogRecord(ctx context.Context, task jobs_gorm_model.Tas
}
err := c.gormClient.GetDb().Create(&taskLog).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("记录失败:%s", err)
- c.sLog.WithTraceId(ctx).Errorf("记录数据:%+v", taskLog)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("记录失败:%s", err)
+ c.slog.client.WithTraceId(ctx).Errorf("记录数据:%+v", taskLog)
+ }
}
}
diff --git a/utils/gojobs/redis.go b/utils/gojobs/redis.go
index 71a6208e..40126db0 100644
--- a/utils/gojobs/redis.go
+++ b/utils/gojobs/redis.go
@@ -12,7 +12,9 @@ import (
func (c *Client) Publish(ctx context.Context, channel string, message interface{}) error {
publish, err := c.cache.redisClient.Publish(ctx, channel, message).Result()
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("发布失败:%s %s %v %s", channel, message, publish, err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("发布失败:%s %s %v %s", channel, message, publish, err)
+ }
}
return err
}
diff --git a/utils/gojobs/redis_get.go b/utils/gojobs/redis_get.go
index 29a2e7ad..103ac1b3 100644
--- a/utils/gojobs/redis_get.go
+++ b/utils/gojobs/redis_get.go
@@ -38,7 +38,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if appointIpStatus {
// 判断是否指定某ip执行
if gostring.Contains(workers[0], currentIp) {
- c.sLog.WithTraceId(ctx).Info("只有一个客户端在线,指定某ip执行:", workers[0], currentIp)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Info("只有一个客户端在线,指定某ip执行:", workers[0], currentIp)
+ }
return workers[0], nil
}
return "", errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp))
@@ -50,7 +52,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if appointIpStatus {
for wk, wv := range workers {
if gostring.Contains(wv, currentIp) {
- c.sLog.WithTraceId(ctx).Info("优先处理指定某ip执行:", workers[wk], currentIp)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Info("优先处理指定某ip执行:", workers[wk], currentIp)
+ }
return workers[wk], nil
}
}
@@ -61,7 +65,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if address == "" {
return address, errors.New("获取执行的客户端异常")
}
- c.sLog.WithTraceId(ctx).Info("随机返回一个:", address, currentIp)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Info("随机返回一个:", address, currentIp)
+ }
return address, nil
}
}
@@ -72,7 +78,9 @@ func (c *Client) GetSubscribeClientList(ctx context.Context) (client []string, e
// 查询活跃的channel
client, err = c.cache.redisClient.PubSubChannels(ctx, c.cache.cornKeyPrefix+"_*").Result()
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("获取在线的客户端失败:%s,%v", c.cache.cornKeyPrefix+"_*", err)
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("获取在线的客户端失败:%s,%v", c.cache.cornKeyPrefix+"_*", err)
+ }
}
return client, err
diff --git a/utils/gojobs/run.go b/utils/gojobs/run.go
index 97f76636..1cefc4ac 100644
--- a/utils/gojobs/run.go
+++ b/utils/gojobs/run.go
@@ -77,7 +77,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
runId := gotrace_id.GetTraceIdContext(ctx)
if runId == "" {
- c.sLog.WithTraceId(ctx).Error("上下文没有跟踪编号")
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Error("上下文没有跟踪编号")
+ }
return
}
@@ -93,7 +95,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ }
}
return
case CodeSuccess:
@@ -109,7 +113,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ }
}
case CodeEnd:
// 执行成功、提前结束
@@ -124,7 +130,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().Time,
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ }
}
case CodeError:
// 执行失败
@@ -139,7 +147,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ }
}
}
@@ -152,7 +162,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
Status: TASK_TIMEOUT,
}).Error
if err != nil {
- c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ if c.slog.status {
+ c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error())
+ }
}
}
}