diff --git a/utils/gojobs/client.go b/utils/gojobs/client.go index 1c671a88..eee7eb8f 100644 --- a/utils/gojobs/client.go +++ b/utils/gojobs/client.go @@ -19,14 +19,12 @@ type ClientConfig struct { GormClientFun dorm.GormClientFun // 数据库驱动 RedisClientFun dorm.RedisClientFun // 数据库驱动 RedisPrefixFun redisPrefixFun // 前缀 - SLog *golog.SLog // 日志服务 - CurrentIp string // 当前ip + CurrentIp string // 当前IP } // Client 实例 type Client struct { gormClient *dorm.GormClient // 数据库 - sLog *golog.SLog // 日志服务 config struct { systemHostname string // 主机名 systemOs string // 系统类型 @@ -53,6 +51,10 @@ type Client struct { cornKeyPrefix string // 任务Key前缀 xxx_cron cornKeyCustom string // 任务Key自定义 } + slog struct { + status bool // 状态 + client *golog.SLog // 日志服务 + } } // NewClient 创建实例 @@ -62,8 +64,6 @@ func NewClient(config *ClientConfig) (*Client, error) { c := &Client{} - c.sLog = config.SLog - if config.CurrentIp != "" && config.CurrentIp != "0.0.0.0" { c.config.systemOutsideIp = config.CurrentIp } diff --git a/utils/gojobs/cofing.go b/utils/gojobs/cofing.go index 83f57655..e29b0580 100644 --- a/utils/gojobs/cofing.go +++ b/utils/gojobs/cofing.go @@ -4,6 +4,7 @@ import ( "context" "github.com/dtapps/go-library" "github.com/dtapps/go-library/utils/goip" + "github.com/dtapps/go-library/utils/golog" "github.com/redis/go-redis/v9" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/host" @@ -11,6 +12,15 @@ import ( "runtime" ) +// ConfigSLogClientFun 日志配置 +func (c *Client) ConfigSLogClientFun(sLogFun golog.SLogFun) { + sLog := sLogFun() + if sLog != nil { + c.slog.client = sLog + c.slog.status = true + } +} + type systemResult struct { SystemHostname string // 主机名 SystemOs string // 系统类型 diff --git a/utils/gojobs/gorm_model.go b/utils/gojobs/gorm_model.go index ecc8c9eb..34490cb4 100644 --- a/utils/gojobs/gorm_model.go +++ b/utils/gojobs/gorm_model.go @@ -11,7 +11,9 @@ import ( func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result jobs_gorm_model.Task) { err := tx.Where("id = ?", id).Take(&result).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("编号查询任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("编号查询任务:%v", err) + } } return result } @@ -20,7 +22,9 @@ func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result j func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (result jobs_gorm_model.Task) { err := tx.Where("custom_id = ?", customId).Take(&result).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("自定义编号查询任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("自定义编号查询任务:%v", err) + } } return result } @@ -29,7 +33,9 @@ func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (re func (c *Client) taskTake(ctx context.Context, tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) { err := tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("自定义编号加状态查询任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("自定义编号加状态查询任务:%v", err) + } } return result } @@ -63,7 +69,9 @@ func (c *Client) TaskTakeWait(ctx context.Context, tx *gorm.DB, customId string) func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) { err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询单任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询单任务:%v", err) + } } return result } @@ -72,7 +80,9 @@ func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type s func (c *Client) taskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) { err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询单任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询单任务:%v", err) + } } return result } @@ -106,7 +116,9 @@ func (c *Client) TaskTypeTakeWait(ctx context.Context, tx *gorm.DB, customId, Ty func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) { err := tx.Where("frequency = ?", frequency).Order("id asc").Find(&results).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err) + } } return results } @@ -115,7 +127,9 @@ func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) func (c *Client) TaskFindAllType(ctx context.Context, tx *gorm.DB, Type string, frequency int64) (results []jobs_gorm_model.Task) { err := tx.Where("type = ?", Type).Where("frequency = ?", frequency).Order("id asc").Find(&results).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err) + } } return results } @@ -124,7 +138,9 @@ func (c *Client) TaskFindAllType(ctx context.Context, tx *gorm.DB, Type string, func (c *Client) taskFindAll(ctx context.Context, tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) { err := tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err) + } } return results } @@ -134,13 +150,17 @@ func (c *Client) taskFindAllType(ctx context.Context, tx *gorm.DB, Type string, if frequency == 0 { err := tx.Where("type = ?", Type).Where("status = ?", status).Order("id asc").Find(&results).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err) + } } return results } err := tx.Where("type = ?", Type).Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("查询多任务:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("查询多任务:%v", err) + } } return results } @@ -204,7 +224,9 @@ func (c *Client) StartTask(ctx context.Context, tx *gorm.DB, id uint) error { StatusDesc: "启动任务", }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("任务启动失败:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("任务启动失败:%v", err) + } } return err } @@ -221,7 +243,9 @@ func (c *Client) StartTaskCustom(ctx context.Context, tx *gorm.DB, customId stri StatusDesc: "启动任务", }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("任务启动自定义失败:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("任务启动自定义失败:%v", err) + } } return err } @@ -240,7 +264,9 @@ func (c *Client) UpdateFrequency(ctx context.Context, tx *gorm.DB, id uint, freq NextRunTime: gotime.Current().AfterSeconds(frequency).Time, }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("更新任务频率失败:%v", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("更新任务频率失败:%v", err) + } } return err } diff --git a/utils/gojobs/model.go b/utils/gojobs/model.go index 8e22ce17..df2389f9 100644 --- a/utils/gojobs/model.go +++ b/utils/gojobs/model.go @@ -11,7 +11,9 @@ import ( func (c *Client) autoMigrateTask(ctx context.Context) { err := c.gormClient.GetDb().AutoMigrate(&jobs_gorm_model.Task{}) if err != nil { - c.sLog.WithTraceId(ctx).Errorf("创建模型:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("创建模型:%s", err) + } } } @@ -19,7 +21,9 @@ func (c *Client) autoMigrateTask(ctx context.Context) { func (c *Client) autoMigrateTaskLog(ctx context.Context) { err := c.gormClient.GetDb().AutoMigrate(&jobs_gorm_model.TaskLog{}) if err != nil { - c.sLog.WithTraceId(ctx).Errorf("创建模型:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("创建模型:%s", err) + } } } @@ -27,7 +31,9 @@ func (c *Client) autoMigrateTaskLog(ctx context.Context) { func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error { err := c.gormClient.GetDb().Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err) + } } return err } @@ -36,7 +42,9 @@ func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error { func (c *Client) GormTaskLogInDelete(ctx context.Context, hour int64) error { err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_IN).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err) + } } return err } @@ -45,7 +53,9 @@ func (c *Client) GormTaskLogInDelete(ctx context.Context, hour int64) error { func (c *Client) GormTaskLogSuccessDelete(ctx context.Context, hour int64) error { err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_SUCCESS).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err) + } } return err } @@ -54,7 +64,9 @@ func (c *Client) GormTaskLogSuccessDelete(ctx context.Context, hour int64) error func (c *Client) GormTaskLogErrorDelete(ctx context.Context, hour int64) error { err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_ERROR).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err) + } } return err } @@ -63,7 +75,9 @@ func (c *Client) GormTaskLogErrorDelete(ctx context.Context, hour int64) error { func (c *Client) GormTaskLogTimeoutDelete(ctx context.Context, hour int64) error { err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_TIMEOUT).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err) + } } return err } @@ -72,7 +86,9 @@ func (c *Client) GormTaskLogTimeoutDelete(ctx context.Context, hour int64) error func (c *Client) GormTaskLogWaitDelete(ctx context.Context, hour int64) error { err := c.gormClient.GetDb().Where("task_result_status = ?", TASK_WAIT).Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("删除失败:%s", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("删除失败:%s", err) + } } return err } @@ -101,8 +117,10 @@ func (c *Client) GormTaskLogRecord(ctx context.Context, task jobs_gorm_model.Tas } err := c.gormClient.GetDb().Create(&taskLog).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("记录失败:%s", err) - c.sLog.WithTraceId(ctx).Errorf("记录数据:%+v", taskLog) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("记录失败:%s", err) + c.slog.client.WithTraceId(ctx).Errorf("记录数据:%+v", taskLog) + } } } diff --git a/utils/gojobs/redis.go b/utils/gojobs/redis.go index 71a6208e..40126db0 100644 --- a/utils/gojobs/redis.go +++ b/utils/gojobs/redis.go @@ -12,7 +12,9 @@ import ( func (c *Client) Publish(ctx context.Context, channel string, message interface{}) error { publish, err := c.cache.redisClient.Publish(ctx, channel, message).Result() if err != nil { - c.sLog.WithTraceId(ctx).Errorf("发布失败:%s %s %v %s", channel, message, publish, err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("发布失败:%s %s %v %s", channel, message, publish, err) + } } return err } diff --git a/utils/gojobs/redis_get.go b/utils/gojobs/redis_get.go index 29a2e7ad..103ac1b3 100644 --- a/utils/gojobs/redis_get.go +++ b/utils/gojobs/redis_get.go @@ -38,7 +38,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ if appointIpStatus { // 判断是否指定某ip执行 if gostring.Contains(workers[0], currentIp) { - c.sLog.WithTraceId(ctx).Info("只有一个客户端在线,指定某ip执行:", workers[0], currentIp) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Info("只有一个客户端在线,指定某ip执行:", workers[0], currentIp) + } return workers[0], nil } return "", errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp)) @@ -50,7 +52,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ if appointIpStatus { for wk, wv := range workers { if gostring.Contains(wv, currentIp) { - c.sLog.WithTraceId(ctx).Info("优先处理指定某ip执行:", workers[wk], currentIp) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Info("优先处理指定某ip执行:", workers[wk], currentIp) + } return workers[wk], nil } } @@ -61,7 +65,9 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ if address == "" { return address, errors.New("获取执行的客户端异常") } - c.sLog.WithTraceId(ctx).Info("随机返回一个:", address, currentIp) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Info("随机返回一个:", address, currentIp) + } return address, nil } } @@ -72,7 +78,9 @@ func (c *Client) GetSubscribeClientList(ctx context.Context) (client []string, e // 查询活跃的channel client, err = c.cache.redisClient.PubSubChannels(ctx, c.cache.cornKeyPrefix+"_*").Result() if err != nil { - c.sLog.WithTraceId(ctx).Errorf("获取在线的客户端失败:%s,%v", c.cache.cornKeyPrefix+"_*", err) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("获取在线的客户端失败:%s,%v", c.cache.cornKeyPrefix+"_*", err) + } } return client, err diff --git a/utils/gojobs/run.go b/utils/gojobs/run.go index 97f76636..1cefc4ac 100644 --- a/utils/gojobs/run.go +++ b/utils/gojobs/run.go @@ -77,7 +77,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC runId := gotrace_id.GetTraceIdContext(ctx) if runId == "" { - c.sLog.WithTraceId(ctx).Error("上下文没有跟踪编号") + if c.slog.status { + c.slog.client.WithTraceId(ctx).Error("上下文没有跟踪编号") + } return } @@ -93,7 +95,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time, }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + } } return case CodeSuccess: @@ -109,7 +113,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time, }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + } } case CodeEnd: // 执行成功、提前结束 @@ -124,7 +130,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().Time, }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + } } case CodeError: // 执行失败 @@ -139,7 +147,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time, }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + } } } @@ -152,7 +162,9 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC Status: TASK_TIMEOUT, }).Error if err != nil { - c.sLog.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + if c.slog.status { + c.slog.client.WithTraceId(ctx).Errorf("保存失败:%s", err.Error()) + } } } }