master v1.0.104
李光春 2 years ago
parent d84a9656a7
commit 625deb44ca

@ -3,7 +3,6 @@ package gojobs
import (
"context"
"go.dtapp.net/dorm"
"go.dtapp.net/goip"
"go.dtapp.net/golog"
)
@ -20,10 +19,8 @@ type ClientConfig struct {
MongoClientFun dorm.MongoClientFun // 数据库驱动
RedisClientFun dorm.RedisClientFun // 数据库驱动
RedisPrefixFun redisPrefixFun // 前缀
Debug bool // 日志开关
ZapLog *golog.ZapLog // 日志服务
CurrentIp string // 当前ip
JsonStatus bool // json状态
}
// Client 实例
@ -63,10 +60,6 @@ func NewClient(config *ClientConfig) (*Client, error) {
c.zapLog = config.ZapLog
// 配置外网ip
if config.CurrentIp == "" {
config.CurrentIp = goip.GetOutsideIp(ctx)
}
if config.CurrentIp != "" && config.CurrentIp != "0.0.0.0" {
c.config.systemOutsideIp = config.CurrentIp
}

@ -1,6 +1,6 @@
package gojobs
const (
Version = "1.0.103"
Version = "1.0.104"
SpecifyIpNull = "0.0.0.0"
)

@ -7,8 +7,8 @@ require (
github.com/jasonlvhit/gocron v0.0.1
github.com/robfig/cron/v3 v3.0.1
go.dtapp.net/dorm v1.0.42
go.dtapp.net/goip v1.0.37
go.dtapp.net/golog v1.0.90
go.dtapp.net/goip v1.0.38
go.dtapp.net/golog v1.0.92
go.dtapp.net/gostring v1.0.10
go.dtapp.net/gotime v1.0.5
go.dtapp.net/gotrace_id v1.0.6

@ -505,10 +505,10 @@ github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxt
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go.dtapp.net/dorm v1.0.42 h1:bugzTYBr5goLDf9s8vft6xG/fAiinLm1jo+9uFD7JRM=
go.dtapp.net/dorm v1.0.42/go.mod h1:LCy6tqg7uClOhMb8zgF9x9mTFoVd9Ud40g9a2Z6bbwM=
go.dtapp.net/goip v1.0.37 h1:QwdBxnEhB6Vm+Ma2X1PQb/1Sm96dJFGgnOkC4Dr1oxk=
go.dtapp.net/goip v1.0.37/go.mod h1:N2YFFr2OO+5VQwMqyKtg7c4MVrDJOoog/QmIvYUfi1c=
go.dtapp.net/golog v1.0.90 h1:sDmpZZ9/gEy9HWxpPQO4gjK1JC7R876w4DzKWHIak6I=
go.dtapp.net/golog v1.0.90/go.mod h1:6kSMLr7RSDONS1+2xYWK1f8X3YHrElmHg2Ti/IlFKAU=
go.dtapp.net/goip v1.0.38 h1:WHIqXV0qWUM9XDtRaMIMyCKWyd9dWfSvSRdDr7vF7xU=
go.dtapp.net/goip v1.0.38/go.mod h1:N2YFFr2OO+5VQwMqyKtg7c4MVrDJOoog/QmIvYUfi1c=
go.dtapp.net/golog v1.0.92 h1:LbkCNxT7AJUmbXQPaQzWO+b8/dDK4s4FOtZ9ZtVXKOI=
go.dtapp.net/golog v1.0.92/go.mod h1:3AGlz/yrCNMD0RSKThQaTSwZHkMnlgeBlLJ9voTvaZY=
go.dtapp.net/gorandom v1.0.1 h1:IWfMClh1ECPvyUjlqD7MwLq4mZdUusD1qAwAdsvEJBs=
go.dtapp.net/gorandom v1.0.1/go.mod h1:ZPdgalKpvFV/ATQqR0k4ns/F/IpITAZpx6WkWirr5Y8=
go.dtapp.net/gorequest v1.0.31 h1:r/OoU5Y00TbJjkQtpvwjsb/pllqO0UQQjFRY1veZYZc=

@ -10,7 +10,7 @@ import (
func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result jobs_gorm_model.Task) {
err := tx.Where("id = ?", id).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]编号查询任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("编号查询任务:%v", err)
}
return result
}
@ -19,7 +19,7 @@ func (c *Client) TaskTakeId(ctx context.Context, tx *gorm.DB, id uint) (result j
func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]自定义编号查询任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("自定义编号查询任务:%v", err)
}
return result
}
@ -28,7 +28,7 @@ func (c *Client) TaskTake(ctx context.Context, tx *gorm.DB, customId string) (re
func (c *Client) taskTake(ctx context.Context, tx *gorm.DB, customId, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("status = ?", status).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]自定义编号加状态查询任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("自定义编号加状态查询任务:%v", err)
}
return result
}
@ -62,7 +62,7 @@ func (c *Client) TaskTakeWait(ctx context.Context, tx *gorm.DB, customId string)
func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询单任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("查询单任务:%v", err)
}
return result
}
@ -71,7 +71,7 @@ func (c *Client) TaskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type s
func (c *Client) taskTypeTake(ctx context.Context, tx *gorm.DB, customId, Type, status string) (result jobs_gorm_model.Task) {
err := tx.Where("custom_id = ?", customId).Where("type = ?", Type).Where("status = ?", status).Take(&result).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询单任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("查询单任务:%v", err)
}
return result
}
@ -105,7 +105,7 @@ func (c *Client) TaskTypeTakeWait(ctx context.Context, tx *gorm.DB, customId, Ty
func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Order("id asc").Find(&results).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询多任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("查询多任务:%v", err)
}
return results
}
@ -114,7 +114,7 @@ func (c *Client) TaskFindAll(ctx context.Context, tx *gorm.DB, frequency int64)
func (c *Client) taskFindAll(ctx context.Context, tx *gorm.DB, frequency int64, status string) (results []jobs_gorm_model.Task) {
err := tx.Where("frequency = ?", frequency).Where("status = ?", status).Order("id asc").Find(&results).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]查询多任务:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("查询多任务:%v", err)
}
return results
}
@ -153,7 +153,7 @@ func (c *Client) StartTask(ctx context.Context, tx *gorm.DB, id uint) error {
StatusDesc: "启动任务",
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]任务启动失败:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("任务启动失败:%v", err)
}
return err
}
@ -170,7 +170,7 @@ func (c *Client) StartTaskCustom(ctx context.Context, tx *gorm.DB, customId stri
StatusDesc: "启动任务",
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]任务启动自定义失败:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("任务启动自定义失败:%v", err)
}
return err
}
@ -188,7 +188,7 @@ func (c *Client) UpdateFrequency(ctx context.Context, tx *gorm.DB, id uint, freq
Frequency: frequency,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]更新任务频率失败:%v", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("更新任务频率失败:%v", err)
}
return err
}

@ -29,7 +29,11 @@ func (c *Client) autoMigrateTaskLog(ctx context.Context) {
// GormTaskLogDelete 删除
func (c *Client) GormTaskLogDelete(ctx context.Context, hour int64) error {
return c.gormClient.GetDb().Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
err := c.gormClient.GetDb().Where("log_time < ?", gotime.Current().BeforeHour(hour).Format()).Delete(&jobs_gorm_model.TaskLog{}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("删除失败:%s", err)
}
return err
}
// MongoTaskLogDelete 删除

@ -12,7 +12,7 @@ import (
func (c *Client) Publish(ctx context.Context, channel string, message interface{}) error {
publish, err := c.cache.redisClient.Publish(ctx, channel, message).Result()
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]发布失败 %s %s %v %s\n", channel, message, publish, err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("发布失败:%s %s %v %s\n", channel, message, publish, err)
}
return err
}

@ -38,7 +38,7 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if appointIpStatus {
// 判断是否指定某ip执行
if gostring.Contains(workers[0], currentIp) {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]只有一个客户端在线指定某ip执行", workers[0], currentIp)
c.zapLog.WithTraceId(ctx).Sugar().Info("只有一个客户端在线指定某ip执行", workers[0], currentIp)
return workers[0], nil
}
return "", errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp))
@ -50,7 +50,7 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if appointIpStatus {
for wk, wv := range workers {
if gostring.Contains(wv, currentIp) {
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]优先处理指定某ip执行", workers[wk], currentIp)
c.zapLog.WithTraceId(ctx).Sugar().Info("优先处理指定某ip执行", workers[wk], currentIp)
return workers[wk], nil
}
}
@ -61,7 +61,7 @@ func (c *Client) GetIssueAddress(ctx context.Context, workers []string, v *jobs_
if address == "" {
return address, errors.New("获取执行的客户端异常")
}
c.zapLog.WithTraceId(ctx).Sugar().Info("[jobs]随机返回一个:", address, currentIp)
c.zapLog.WithTraceId(ctx).Sugar().Info("随机返回一个:", address, currentIp)
return address, nil
}
}
@ -72,7 +72,7 @@ func (c *Client) GetSubscribeClientList(ctx context.Context) (client []string, e
// 查询活跃的channel
client, err = c.cache.redisClient.PubSubChannels(ctx, c.cache.cornKeyPrefix+"_*").Result()
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]获取在线的客户端失败:%s%v", c.cache.cornKeyPrefix+"_*", err)
c.zapLog.WithTraceId(ctx).Sugar().Errorf("获取在线的客户端失败:%s%v", c.cache.cornKeyPrefix+"_*", err)
}
return client, err

@ -15,7 +15,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
runId := gotrace_id.GetTraceIdContext(ctx)
if runId == "" {
c.zapLog.WithTraceId(ctx).Sugar().Error("[jobs]上下文没有跟踪编号")
c.zapLog.WithTraceId(ctx).Sugar().Error("上下文没有跟踪编号")
return
}
@ -61,7 +61,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("保存失败:%s", err.Error())
}
return
case CodeSuccess:
@ -77,7 +77,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("保存失败:%s", err.Error())
}
case CodeEnd:
// 执行成功、提前结束
@ -92,7 +92,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("保存失败:%s", err.Error())
}
case CodeError:
// 执行失败
@ -107,7 +107,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
NextRunTime: gotime.Current().AfterSeconds(task.Frequency).Time,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("保存失败:%s", err.Error())
}
}
@ -120,7 +120,7 @@ func (c *Client) Run(ctx context.Context, task jobs_gorm_model.Task, taskResultC
Status: TASK_TIMEOUT,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[jobs]保存失败:%s", err.Error())
c.zapLog.WithTraceId(ctx).Sugar().Errorf("保存失败:%s", err.Error())
}
}
}

Loading…
Cancel
Save