From 3d5d7c3a7de829513e1f0f77a0b4c6318b68b85f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Wed, 21 Sep 2022 16:22:54 +0800 Subject: [PATCH] - update context --- client.go | 6 --- const.go | 2 +- create_in.go | 4 +- gorm_model.go | 142 +++++++++++++++++++++++++++++++------------------- redis.go | 4 +- redis_get.go | 21 ++++---- run.go | 12 ++--- 7 files changed, 109 insertions(+), 82 deletions(-) diff --git a/client.go b/client.go index 88acaee..b408f6d 100644 --- a/client.go +++ b/client.go @@ -39,8 +39,6 @@ type Client struct { goVersion string // go版本 sdkVersion string // sdk版本 systemOutsideIp string // 外网ip - debug bool // 日志开关 - jsonStatus bool // json状态 } cache struct { redisClient *dorm.RedisClient // 数据库 @@ -65,10 +63,6 @@ func NewClient(config *ClientConfig) (*Client, error) { c.zapLog = config.ZapLog - c.config.debug = config.Debug - - c.config.jsonStatus = config.JsonStatus - // 配置外网ip if config.CurrentIp == "" { config.CurrentIp = goip.GetOutsideIp(ctx) diff --git a/const.go b/const.go index fea82f8..73ce8e4 100644 --- a/const.go +++ b/const.go @@ -1,6 +1,6 @@ package gojobs const ( - Version = "1.0.101" + Version = "1.0.102" SpecifyIpNull = "0.0.0.0" ) diff --git a/create_in.go b/create_in.go index 7c3f9da..ddaf148 100644 --- a/create_in.go +++ b/create_in.go @@ -62,7 +62,7 @@ type ConfigCreateInCustomIdOnly struct { // CreateInCustomIdOnly 创建正在运行唯一任务 func (c *Client) CreateInCustomIdOnly(ctx context.Context, config *ConfigCreateInCustomIdOnly) error { - query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) + query := c.TaskTypeTakeIn(ctx, config.Tx, config.CustomId, config.Type) if query.Id != 0 { return TaskIsExist } @@ -145,7 +145,7 @@ type ConfigCreateInCustomIdMaxNumberOnly struct { // CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 func (c *Client) CreateInCustomIdMaxNumberOnly(ctx context.Context, config *ConfigCreateInCustomIdMaxNumberOnly) error { - query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) + query := c.TaskTypeTakeIn(ctx, config.Tx, config.CustomId, config.Type) if query.Id != 0 { return TaskIsExist } diff --git a/gorm_model.go b/gorm_model.go index df82d68..4e261c2 100644 --- a/gorm_model.go +++ b/gorm_model.go @@ -1,140 +1,166 @@ package gojobs import ( + "context" "go.dtapp.net/gojobs/jobs_gorm_model" "gorm.io/gorm" ) -// TaskTakeId 查询单任务 -func (c *Client) TaskTakeId(tx *gorm.DB, id uint) (result jobs_gorm_model.Task) { - tx.Where("id = ?", id).Take(&result) +// TaskTakeId 编号查询任务 +func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result jobs_gorm_model.Task) { + err := tx.Where("id = ?", id).Take(&result).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]编号查询任务:%v", err) + } return result } -// TaskTake 查询单任务 -func (c *Client) TaskTake(tx *gorm.DB, customId string) (result jobs_gorm_model.Task) { - tx.Where("custom_id = ?", customId).Take(&result) +// TaskTake 自定义编号查询任务 +func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (result jobs_gorm_model.Task) { + err := tx.Where("custom_id = ?", customId).Take(&result).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]自定义编号查询任务:%v", err) + } return result } -// 查询单任务 -func (c *Client) taskTake(tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) { - tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result) +// 自定义编号加状态查询任务 +func (c *Client) taskTake(ctx context.Context, tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) { + err := tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]自定义编号加状态查询任务:%v", err) + } return result } // TaskTakeIn 查询单任务 - 任务运行 -func (c *Client) TaskTakeIn(tx *gorm.DB, customId string) jobs_gorm_model.Task { - return c.taskTake(tx, customId, TASK_IN) +func (c *Client) TaskTakeIn(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task { + return c.taskTake(ctx, tx, customId, TASK_IN) } // TaskTakeSuccess 查询单任务 - 任务完成 -func (c *Client) TaskTakeSuccess(tx *gorm.DB, customId string) jobs_gorm_model.Task { - return c.taskTake(tx, customId, TASK_SUCCESS) +func (c *Client) TaskTakeSuccess(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task { + return c.taskTake(ctx, tx, customId, TASK_SUCCESS) } // TaskTakeError 查询单任务 - 任务异常 -func (c *Client) TaskTakeError(tx *gorm.DB, customId string) jobs_gorm_model.Task { - return c.taskTake(tx, customId, TASK_ERROR) +func (c *Client) TaskTakeError(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task { + return c.taskTake(ctx, tx, customId, TASK_ERROR) } // TaskTakeTimeout 查询单任务 - 任务超时 -func (c *Client) TaskTakeTimeout(tx *gorm.DB, customId string) jobs_gorm_model.Task { - return c.taskTake(tx, customId, TASK_TIMEOUT) +func (c *Client) TaskTakeTimeout(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task { + return c.taskTake(ctx, tx, customId, TASK_TIMEOUT) } // TaskTakeWait 查询单任务 - 任务等待 -func (c *Client) TaskTakeWait(tx *gorm.DB, customId string) jobs_gorm_model.Task { - return c.taskTake(tx, customId, TASK_WAIT) +func (c *Client) TaskTakeWait(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task { + return c.taskTake(ctx, tx, customId, TASK_WAIT) } // TaskTypeTake 查询单任务 -func (c *Client) TaskTypeTake(tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) { - tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result) +func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) { + err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询单任务:%v", err) + } return result } // 查询单任务 -func (c *Client) taskTypeTake(tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) { - tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result) +func (c *Client) taskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) { + err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询单任务:%v", err) + } return result } // TaskTypeTakeIn 查询单任务 - 任务运行 -func (c *Client) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { - return c.taskTypeTake(tx, customId, Type, TASK_IN) +func (c *Client) TaskTypeTakeIn(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return c.taskTypeTake(ctx, tx, customId, Type, TASK_IN) } // TaskTypeTakeSuccess 查询单任务 - 任务完成 -func (c *Client) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { - return c.taskTypeTake(tx, customId, Type, TASK_SUCCESS) +func (c *Client) TaskTypeTakeSuccess(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return c.taskTypeTake(ctx, tx, customId, Type, TASK_SUCCESS) } // TaskTypeTakeError 查询单任务 - 任务异常 -func (c *Client) TaskTypeTakeError(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { - return c.taskTypeTake(tx, customId, Type, TASK_ERROR) +func (c *Client) TaskTypeTakeError(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return c.taskTypeTake(ctx, tx, customId, Type, TASK_ERROR) } // TaskTypeTakeTimeout 查询单任务 - 任务超时 -func (c *Client) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { - return c.taskTypeTake(tx, customId, Type, TASK_TIMEOUT) +func (c *Client) TaskTypeTakeTimeout(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return c.taskTypeTake(ctx, tx, customId, Type, TASK_TIMEOUT) } // TaskTypeTakeWait 查询单任务 - 任务等待 -func (c *Client) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { - return c.taskTypeTake(tx, customId, Type, TASK_WAIT) +func (c *Client) TaskTypeTakeWait(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task { + return c.taskTypeTake(ctx, tx, customId, Type, TASK_WAIT) } // TaskFindAll 查询多任务 -func (c *Client) TaskFindAll(tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) { - tx.Where("frequency = ?", frequency).Order("id asc").Find(&results) +func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) { + err := tx.Where("frequency = ?", frequency).Order("id asc").Find(&results).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询多任务:%v", err) + } return results } // 查询多任务 -func (c *Client) taskFindAll(tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) { - tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results) +func (c *Client) taskFindAll(ctx context.Context, tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) { + err := tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询多任务:%v", err) + } return results } // TaskFindAllIn 查询多任务 - 任务运行 -func (c *Client) TaskFindAllIn(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { - return c.taskFindAll(tx, frequency, TASK_IN) +func (c *Client) TaskFindAllIn(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return c.taskFindAll(ctx, tx, frequency, TASK_IN) } // TaskFindAllSuccess 查询多任务 - 任务完成 -func (c *Client) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { - return c.taskFindAll(tx, frequency, TASK_SUCCESS) +func (c *Client) TaskFindAllSuccess(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return c.taskFindAll(ctx, tx, frequency, TASK_SUCCESS) } // TaskFindAllError 查询多任务 - 任务异常 -func (c *Client) TaskFindAllError(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { - return c.taskFindAll(tx, frequency, TASK_ERROR) +func (c *Client) TaskFindAllError(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return c.taskFindAll(ctx, tx, frequency, TASK_ERROR) } // TaskFindAllTimeout 查询多任务 - 任务超时 -func (c *Client) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { - return c.taskFindAll(tx, frequency, TASK_TIMEOUT) +func (c *Client) TaskFindAllTimeout(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return c.taskFindAll(ctx, tx, frequency, TASK_TIMEOUT) } // TaskFindAllWait 查询多任务 - 任务等待 -func (c *Client) TaskFindAllWait(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { - return c.taskFindAll(tx, frequency, TASK_WAIT) +func (c *Client) TaskFindAllWait(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task { + return c.taskFindAll(ctx, tx, frequency, TASK_WAIT) } // StartTask 任务启动 -func (c *Client) StartTask(tx *gorm.DB, id uint) error { - return c.EditTask(tx, id). +func (c *Client) StartTask(ctx context.Context, tx *gorm.DB, id uint) error { + err := c.EditTask(tx, id). Select("status", "status_desc"). Updates(jobs_gorm_model.Task{ Status: TASK_IN, StatusDesc: "启动任务", }).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]任务启动失败:%v", err) + } + return err } // StartTaskCustom 任务启动自定义 -func (c *Client) StartTaskCustom(tx *gorm.DB, customId string, customSequence int64) error { - return tx.Model(&jobs_gorm_model.Task{}). +func (c *Client) StartTaskCustom(ctx context.Context, tx *gorm.DB, customId string, customSequence int64) error { + err := tx.Model(&jobs_gorm_model.Task{}). Where("custom_id = ?", customId). Where("custom_sequence = ?", customSequence). Where("status = ?", TASK_WAIT). @@ -143,6 +169,10 @@ func (c *Client) StartTaskCustom(tx *gorm.DB, customId string, customSequence in Status: TASK_IN, StatusDesc: "启动任务", }).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]任务启动自定义失败:%v", err) + } + return err } // EditTask 任务修改 @@ -151,10 +181,14 @@ func (c *Client) EditTask(tx *gorm.DB, id uint) *gorm.DB { } // UpdateFrequency 更新任务频率 -func (c *Client) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB { - return c.EditTask(tx, id). +func (c *Client) UpdateFrequency(ctx context.Context, tx *gorm.DB, id uint, frequency int64) error { + err := c.EditTask(tx, id). Select("frequency"). Updates(jobs_gorm_model.Task{ Frequency: frequency, - }) + }).Error + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]更新任务频率失败:%v", err) + } + return err } diff --git a/redis.go b/redis.go index e0d2c93..3c05704 100644 --- a/redis.go +++ b/redis.go @@ -11,8 +11,8 @@ import ( // message 消息 func (c *Client) Publish(ctx context.Context, channel string, message interface{}) error { publish, err := c.cache.redisClient.Publish(ctx, channel, message).Result() - if c.config.debug == true { - c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.Publish] %s %s %v %s\n", channel, message, publish, err) + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]发布失败 %s %s %v %s\n", channel, message, publish, err) } return err } diff --git a/redis_get.go b/redis_get.go index 0758075..37b2b4b 100644 --- a/redis_get.go +++ b/redis_get.go @@ -30,10 +30,10 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ // 只有一个客户端在线 if len(workers) == 1 { - if appointIpStatus == true { + if appointIpStatus { // 判断是否指定某ip执行 - if gostring.Contains(workers[0], currentIp) == true { - c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetIssueAddress]只有一个客户端在线,指定某ip执行:", workers[0], currentIp) + if gostring.Contains(workers[0], currentIp) { + c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]只有一个客户端在线,指定某ip执行:", workers[0], currentIp) return workers[0], nil } return "", errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp)) @@ -42,10 +42,10 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ } // 优先处理指定某ip执行 - if appointIpStatus == true { + if appointIpStatus { for wk, wv := range workers { - if gostring.Contains(wv, currentIp) == true { - c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetIssueAddress]优先处理指定某ip执行:", workers[wk], currentIp) + if gostring.Contains(wv, currentIp) { + c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]优先处理指定某ip执行:", workers[wk], currentIp) return workers[wk], nil } } @@ -56,7 +56,7 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ if address == "" { return address, errors.New("获取执行的客户端异常") } - c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetIssueAddress]随机返回一个:", address, currentIp) + c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]随机返回一个:", address, currentIp) return address, nil } } @@ -64,12 +64,11 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_ // GetSubscribeClientList 获取在线的客户端 func (c *Client) GetSubscribeClientList(ctx context.Context) (client []string, err error) { - if c.config.debug == true { - c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetSubscribeClientList]:", c.cache.cornKeyPrefix+"_*") - } - // 查询活跃的channel client, err = c.cache.redisClient.PubSubChannels(ctx, c.cache.cornKeyPrefix+"_*").Result() + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]获取在线的客户端失败:%s,%v", c.cache.cornKeyPrefix+"_*", err) + } return client, err } diff --git a/run.go b/run.go index 8859749..385beed 100644 --- a/run.go +++ b/run.go @@ -15,7 +15,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC runId := gotrace_id.GetTraceIdContext(ctx) if runId == "" { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run]:%s", "上下文没有跟踪编号") + c.zapLog.WithTraceId(ctx).Sugar().Error("[jobs]上下文没有跟踪编号") return } @@ -61,7 +61,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time, }).Error if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.0]:%s", err.Error()) + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error()) } return case CodeSuccess: @@ -77,7 +77,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time, }).Error if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.CodeSuccess]:%s", err.Error()) + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error()) } case CodeEnd: // 执行成功、提前结束 @@ -92,7 +92,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().Time, }).Error if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.CodeEnd]:%s", err.Error()) + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error()) } case CodeError: // 执行失败 @@ -107,7 +107,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time, }).Error if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.CodeError]:%s", err.Error()) + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error()) } } @@ -120,7 +120,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC Status: TASK_TIMEOUT, }).Error if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.TASK_TIMEOUT]:%s", err.Error()) + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error()) } } }