- update context

master v1.0.102
李光春 2 years ago
parent 3a9b47cc7f
commit 3d5d7c3a7d

@ -39,8 +39,6 @@ type Client struct {
goVersion string // go版本
sdkVersion string // sdk版本
systemOutsideIp string // 外网ip
debug bool // 日志开关
jsonStatus bool // json状态
}
cache struct {
redisClient *dorm.RedisClient // 数据库
@ -65,10 +63,6 @@ func NewClient(config *ClientConfig) (*Client, error) {
c.zapLog = config.ZapLog
c.config.debug = config.Debug
c.config.jsonStatus = config.JsonStatus
// 配置外网ip
if config.CurrentIp == "" {
config.CurrentIp = goip.GetOutsideIp(ctx)

@ -1,6 +1,6 @@
package gojobs
const (
Version = "1.0.101"
Version = "1.0.102"
SpecifyIpNull = "0.0.0.0"
)

@ -62,7 +62,7 @@ type ConfigCreateInCustomIdOnly struct {
// CreateInCustomIdOnly 创建正在运行唯一任务
func (c *Client) CreateInCustomIdOnly(ctx context.Context, config *ConfigCreateInCustomIdOnly) error {
query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
query := c.TaskTypeTakeIn(ctx, config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return TaskIsExist
}
@ -145,7 +145,7 @@ type ConfigCreateInCustomIdMaxNumberOnly struct {
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (c *Client) CreateInCustomIdMaxNumberOnly(ctx context.Context, config *ConfigCreateInCustomIdMaxNumberOnly) error {
query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
query := c.TaskTypeTakeIn(ctx, config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return TaskIsExist
}

@ -1,140 +1,166 @@
package gojobs
import (
"context"
"go.dtapp.net/gojobs/jobs_gorm_model"
"gorm.io/gorm"
)
// TaskTakeId 查询单任务
func (c *Client) TaskTakeId(tx *gorm.DB, id uint) (result jobs_gorm_model.Task) {
tx.Where("id = ?", id).Take(&result)
// TaskTakeId 编号查询任务
func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result jobs_gorm_model.Task) {
err := tx.Where("id = ?", id).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]编号查询任务:%v", err)
}
return result
}
// TaskTake 查询单任务
func (c *Client) TaskTake(tx *gorm.DB, customId string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Take(&result)
// TaskTake 自定义编号查询任务
func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]自定义编号查询任务:%v", err)
}
return result
}
// 查询单任务
func (c *Client) taskTake(tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result)
// 自定义编号加状态查询任务
func (c *Client) taskTake(ctx context.Context, tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]自定义编号加状态查询任务:%v", err)
}
return result
}
// TaskTakeIn 查询单任务 - 任务运行
func (c *Client) TaskTakeIn(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(tx, customId, TASK_IN)
func (c *Client) TaskTakeIn(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(ctx, tx, customId, TASK_IN)
}
// TaskTakeSuccess 查询单任务 - 任务完成
func (c *Client) TaskTakeSuccess(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(tx, customId, TASK_SUCCESS)
func (c *Client) TaskTakeSuccess(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(ctx, tx, customId, TASK_SUCCESS)
}
// TaskTakeError 查询单任务 - 任务异常
func (c *Client) TaskTakeError(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(tx, customId, TASK_ERROR)
func (c *Client) TaskTakeError(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(ctx, tx, customId, TASK_ERROR)
}
// TaskTakeTimeout 查询单任务 - 任务超时
func (c *Client) TaskTakeTimeout(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(tx, customId, TASK_TIMEOUT)
func (c *Client) TaskTakeTimeout(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(ctx, tx, customId, TASK_TIMEOUT)
}
// TaskTakeWait 查询单任务 - 任务等待
func (c *Client) TaskTakeWait(tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(tx, customId, TASK_WAIT)
func (c *Client) TaskTakeWait(ctx context.Context, tx *gorm.DB, customId string) jobs_gorm_model.Task {
return c.taskTake(ctx, tx, customId, TASK_WAIT)
}
// TaskTypeTake 查询单任务
func (c *Client) TaskTypeTake(tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result)
func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询单任务:%v", err)
}
return result
}
// 查询单任务
func (c *Client) taskTypeTake(tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) {
tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result)
func (c *Client) taskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询单任务:%v", err)
}
return result
}
// TaskTypeTakeIn 查询单任务 - 任务运行
func (c *Client) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(tx, customId, Type, TASK_IN)
func (c *Client) TaskTypeTakeIn(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(ctx, tx, customId, Type, TASK_IN)
}
// TaskTypeTakeSuccess 查询单任务 - 任务完成
func (c *Client) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(tx, customId, Type, TASK_SUCCESS)
func (c *Client) TaskTypeTakeSuccess(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(ctx, tx, customId, Type, TASK_SUCCESS)
}
// TaskTypeTakeError 查询单任务 - 任务异常
func (c *Client) TaskTypeTakeError(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(tx, customId, Type, TASK_ERROR)
func (c *Client) TaskTypeTakeError(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(ctx, tx, customId, Type, TASK_ERROR)
}
// TaskTypeTakeTimeout 查询单任务 - 任务超时
func (c *Client) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(tx, customId, Type, TASK_TIMEOUT)
func (c *Client) TaskTypeTakeTimeout(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(ctx, tx, customId, Type, TASK_TIMEOUT)
}
// TaskTypeTakeWait 查询单任务 - 任务等待
func (c *Client) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(tx, customId, Type, TASK_WAIT)
func (c *Client) TaskTypeTakeWait(ctx context.Context, tx *gorm.DB, customId, Type string) jobs_gorm_model.Task {
return c.taskTypeTake(ctx, tx, customId, Type, TASK_WAIT)
}
// TaskFindAll 查询多任务
func (c *Client) TaskFindAll(tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) {
tx.Where("frequency = ?", frequency).Order("id asc").Find(&results)
func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Order("id asc").Find(&results).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询多任务:%v", err)
}
return results
}
// 查询多任务
func (c *Client) taskFindAll(tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) {
tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results)
func (c *Client) taskFindAll(ctx context.Context, tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询多任务:%v", err)
}
return results
}
// TaskFindAllIn 查询多任务 - 任务运行
func (c *Client) TaskFindAllIn(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(tx, frequency, TASK_IN)
func (c *Client) TaskFindAllIn(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(ctx, tx, frequency, TASK_IN)
}
// TaskFindAllSuccess 查询多任务 - 任务完成
func (c *Client) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(tx, frequency, TASK_SUCCESS)
func (c *Client) TaskFindAllSuccess(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(ctx, tx, frequency, TASK_SUCCESS)
}
// TaskFindAllError 查询多任务 - 任务异常
func (c *Client) TaskFindAllError(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(tx, frequency, TASK_ERROR)
func (c *Client) TaskFindAllError(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(ctx, tx, frequency, TASK_ERROR)
}
// TaskFindAllTimeout 查询多任务 - 任务超时
func (c *Client) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(tx, frequency, TASK_TIMEOUT)
func (c *Client) TaskFindAllTimeout(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(ctx, tx, frequency, TASK_TIMEOUT)
}
// TaskFindAllWait 查询多任务 - 任务等待
func (c *Client) TaskFindAllWait(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(tx, frequency, TASK_WAIT)
func (c *Client) TaskFindAllWait(ctx context.Context, tx *gorm.DB, frequency int64) []jobs_gorm_model.Task {
return c.taskFindAll(ctx, tx, frequency, TASK_WAIT)
}
// StartTask 任务启动
func (c *Client) StartTask(tx *gorm.DB, id uint) error {
return c.EditTask(tx, id).
func (c *Client) StartTask(ctx context.Context, tx *gorm.DB, id uint) error {
err := c.EditTask(tx, id).
Select("status", "status_desc").
Updates(jobs_gorm_model.Task{
Status: TASK_IN,
StatusDesc: "启动任务",
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]任务启动失败:%v", err)
}
return err
}
// StartTaskCustom 任务启动自定义
func (c *Client) StartTaskCustom(tx *gorm.DB, customId string, customSequence int64) error {
return tx.Model(&jobs_gorm_model.Task{}).
func (c *Client) StartTaskCustom(ctx context.Context, tx *gorm.DB, customId string, customSequence int64) error {
err := tx.Model(&jobs_gorm_model.Task{}).
Where("custom_id = ?", customId).
Where("custom_sequence = ?", customSequence).
Where("status = ?", TASK_WAIT).
@ -143,6 +169,10 @@ func (c *Client) StartTaskCustom(tx *gorm.DB, customId string, customSequence in
Status: TASK_IN,
StatusDesc: "启动任务",
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]任务启动自定义失败:%v", err)
}
return err
}
// EditTask 任务修改
@ -151,10 +181,14 @@ func (c *Client) EditTask(tx *gorm.DB, id uint) *gorm.DB {
}
// UpdateFrequency 更新任务频率
func (c *Client) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB {
return c.EditTask(tx, id).
func (c *Client) UpdateFrequency(ctx context.Context, tx *gorm.DB, id uint, frequency int64) error {
err := c.EditTask(tx, id).
Select("frequency").
Updates(jobs_gorm_model.Task{
Frequency: frequency,
})
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]更新任务频率失败:%v", err)
}
return err
}

@ -11,8 +11,8 @@ import (
// message 消息
func (c *Client) Publish(ctx context.Context, channel string, message interface{}) error {
publish, err := c.cache.redisClient.Publish(ctx, channel, message).Result()
if c.config.debug == true {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.Publish] %s %s %v %s\n", channel, message, publish, err)
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]发布失败 %s %s %v %s\n", channel, message, publish, err)
}
return err
}

@ -30,10 +30,10 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
// 只有一个客户端在线
if len(workers) == 1 {
if appointIpStatus == true {
if appointIpStatus {
// 判断是否指定某ip执行
if gostring.Contains(workers[0], currentIp) == true {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetIssueAddress]只有一个客户端在线指定某ip执行", workers[0], currentIp)
if gostring.Contains(workers[0], currentIp) {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]只有一个客户端在线指定某ip执行", workers[0], currentIp)
return workers[0], nil
}
return "", errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp))
@ -42,10 +42,10 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
}
// 优先处理指定某ip执行
if appointIpStatus == true {
if appointIpStatus {
for wk, wv := range workers {
if gostring.Contains(wv, currentIp) == true {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetIssueAddress]优先处理指定某ip执行", workers[wk], currentIp)
if gostring.Contains(wv, currentIp) {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]优先处理指定某ip执行", workers[wk], currentIp)
return workers[wk], nil
}
}
@ -56,7 +56,7 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if address == "" {
return address, errors.New("获取执行的客户端异常")
}
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetIssueAddress]随机返回一个:", address, currentIp)
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]随机返回一个:", address, currentIp)
return address, nil
}
}
@ -64,12 +64,11 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
// GetSubscribeClientList 获取在线的客户端
func (c *Client) GetSubscribeClientList(ctx context.Context) (client []string, err error) {
if c.config.debug == true {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs.GetSubscribeClientList]", c.cache.cornKeyPrefix+"_*")
}
// 查询活跃的channel
client, err = c.cache.redisClient.PubSubChannels(ctx, c.cache.cornKeyPrefix+"_*").Result()
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]获取在线的客户端失败:%s%v", c.cache.cornKeyPrefix+"_*", err)
}
return client, err
}

@ -15,7 +15,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
runId := gotrace_id.GetTraceIdContext(ctx)
if runId == "" {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run]%s", "上下文没有跟踪编号")
c.zapLog.WithTraceId(ctx).Sugar().Error("[jobs]上下文没有跟踪编号")
return
}
@ -61,7 +61,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.0]%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败%s", err.Error())
}
return
case CodeSuccess:
@ -77,7 +77,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.CodeSuccess]%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败%s", err.Error())
}
case CodeEnd:
// 执行成功、提前结束
@ -92,7 +92,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.CodeEnd]%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败%s", err.Error())
}
case CodeError:
// 执行失败
@ -107,7 +107,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.CodeError]%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败%s", err.Error())
}
}
@ -120,7 +120,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
Status: TASK_TIMEOUT,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.TASK_TIMEOUT]%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败%s", err.Error())
}
}
}

Loading…
Cancel
Save