- 优化KEY
continuous-integration/drone/tag Build is failing Details
continuous-integration/drone/push Build is failing Details

master v1.0.52
李光春 2 years ago
parent 622f6be827
commit 4f4e4ac2da

@ -1,3 +1,3 @@
package gojobs package gojobs
const Version = "1.0.51" const Version = "1.0.52"

@ -1,13 +0,0 @@
package gojobs
func (j *JobsGorm) getCornKeyIp() string {
return j.config.cornPrefix + "_" + j.config.outsideIp
}
func (j *JobsGorm) getCornKeyChannel() string {
return j.config.cornKeyIp
}
func (j *JobsGorm) getCornKeyChannels() string {
return j.config.cornKeyIp + "_*"
}

@ -13,13 +13,14 @@ import (
) )
type JobsGormConfig struct { type JobsGormConfig struct {
GormClient *dorm.GormClient // 数据库驱动 GormClient *dorm.GormClient // 数据库驱动
RedisClient *dorm.RedisClient // 缓存数据库驱动 RedisClient *dorm.RedisClient // 缓存数据库驱动
CurrentIp string // 当前ip CurrentIp string // 当前ip
LockPrefix string // 锁Key前缀 xxx_lock LockKeyPrefix string // 锁Key前缀 xxx_lock
LockSeparator string // 锁分隔符 xxx_lock LockKeySeparator string // 锁Key分隔符 :
CornPrefix string // 任务前缀 xxx_cron CornKeyPrefix string // 任务Key前缀 xxx_cron
Debug bool // 调试 CornKeyCustom string // 任务Key自定义 xxx_cron_自定义 xxx_cron_自定义_*
Debug bool // 调试
} }
// JobsGorm Gorm数据库驱动 // JobsGorm Gorm数据库驱动
@ -28,21 +29,19 @@ type JobsGorm struct {
redisClient *dorm.RedisClient // 缓存驱动 redisClient *dorm.RedisClient // 缓存驱动
lockClient *golock.LockRedis // 锁驱动 lockClient *golock.LockRedis // 锁驱动
config struct { config struct {
debug bool // 调试 debug bool // 调试
runVersion string // 运行版本 runVersion string // 运行版本
os string // 系统类型 os string // 系统类型
arch string // 系统架构 arch string // 系统架构
maxProCs int // CPU核数 maxProCs int // CPU核数
version string // GO版本 version string // GO版本
macAddrS string // Mac地址 macAddrS string // Mac地址
insideIp string // 内网ip insideIp string // 内网ip
outsideIp string // 外网ip outsideIp string // 外网ip
lockPrefix string // 锁Key前缀 lockKeyPrefix string // 锁Key前缀 xxx_lock
lockSeparator string // 锁分隔符 lockKeySeparator string // 锁Key分隔符 :
cornPrefix string // 任务key前缀 cornKeyPrefix string // 任务Key前缀 xxx_cron
cornKeyIp string // 任务key cornKeyCustom string // 任务Key自定义
cornKeyChannel string // 任务频道key(任务key+ip)
cornKeyChannels string // 任务频道key通配符匹配(任务key+ip+_*)
} }
} }
@ -50,14 +49,17 @@ type JobsGorm struct {
func NewJobsGorm(config *JobsGormConfig) (*JobsGorm, error) { func NewJobsGorm(config *JobsGormConfig) (*JobsGorm, error) {
// 判断 // 判断
if config.LockPrefix == "" { if config.LockKeyPrefix == "" {
return nil, errors.New("需要配置锁Key前缀") return nil, errors.New("需要配置锁Key前缀")
} }
if config.LockSeparator == "" { if config.LockKeySeparator == "" {
return nil, errors.New("需要配置锁分隔符") return nil, errors.New("需要配置锁Key分隔符")
} }
if config.CornPrefix == "" { if config.CornKeyPrefix == "" {
return nil, errors.New("需要配置任务前缀") return nil, errors.New("需要配置任务Key前缀")
}
if config.CornKeyCustom == "" {
return nil, errors.New("需要配置任务Key自定义")
} }
if config.CurrentIp == "" { if config.CurrentIp == "" {
return nil, errors.New("需要配置当前的IP") return nil, errors.New("需要配置当前的IP")
@ -73,9 +75,10 @@ func NewJobsGorm(config *JobsGormConfig) (*JobsGorm, error) {
c.gormClient = config.GormClient c.gormClient = config.GormClient
c.redisClient = config.RedisClient c.redisClient = config.RedisClient
c.config.outsideIp = config.CurrentIp c.config.outsideIp = config.CurrentIp
c.config.lockPrefix = config.LockPrefix c.config.lockKeyPrefix = config.LockKeyPrefix
c.config.lockSeparator = config.LockSeparator c.config.lockKeySeparator = config.LockKeySeparator
c.config.cornPrefix = config.CornPrefix c.config.cornKeyPrefix = config.CornKeyPrefix
c.config.cornKeyCustom = config.CornKeyCustom
c.config.debug = config.Debug c.config.debug = config.Debug
// 锁 // 锁
@ -101,20 +104,6 @@ func NewJobsGorm(config *JobsGormConfig) (*JobsGorm, error) {
return nil, errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err)) return nil, errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err))
} }
c.config.cornKeyIp = c.getCornKeyIp()
c.config.cornKeyChannel = c.getCornKeyChannel()
c.config.cornKeyChannels = c.getCornKeyChannels()
if c.config.cornKeyIp == "" {
return nil, errors.New(fmt.Sprintf("没有配置 cornKeyIp%s", c.config.cornKeyIp))
}
if c.config.cornKeyChannel == "" {
return nil, errors.New(fmt.Sprintf("没有配置 cornKeyChannel%s", c.config.cornKeyChannel))
}
if c.config.cornKeyChannels == "" {
return nil, errors.New(fmt.Sprintf("没有配置 cornKeyChannels%s", c.config.cornKeyChannels))
}
if c.config.debug == true { if c.config.debug == true {
log.Printf("JOBS配置%+v\n", c.config) log.Printf("JOBS配置%+v\n", c.config)
} }

@ -8,15 +8,15 @@ import (
// Lock 上锁 // Lock 上锁
func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) (string, error) { func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) (string, error) {
return j.lockClient.Lock(fmt.Sprintf("%s%s%v%s%v", j.config.lockPrefix, j.config.lockSeparator, info.Type, j.config.lockSeparator, id), fmt.Sprintf("已在%s@%s机器上锁成功", j.config.insideIp, j.config.outsideIp), time.Duration(info.Frequency)*3*time.Second) return j.lockClient.Lock(fmt.Sprintf("%s%s%v%s%v", j.config.lockKeyPrefix, j.config.lockKeySeparator, info.Type, j.config.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器上锁成功", j.config.insideIp, j.config.outsideIp), time.Duration(info.Frequency)*3*time.Second)
} }
// Unlock Lock 解锁 // Unlock Lock 解锁
func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) error { func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) error {
return j.lockClient.Unlock(fmt.Sprintf("%s%s%v%s%v", j.config.lockPrefix, j.config.lockSeparator, info.Type, j.config.lockSeparator, id)) return j.lockClient.Unlock(fmt.Sprintf("%s%s%v%s%v", j.config.lockKeyPrefix, j.config.lockKeySeparator, info.Type, j.config.lockKeySeparator, id))
} }
// LockForever 永远上锁 // LockForever 永远上锁
func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) (string, error) { func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) (string, error) {
return j.lockClient.LockForever(fmt.Sprintf("%s%s%v%s%v", j.config.lockPrefix, j.config.lockSeparator, info.Type, j.config.lockSeparator, id), fmt.Sprintf("已在%s@%s机器永远上锁成功", j.config.insideIp, j.config.outsideIp)) return j.lockClient.LockForever(fmt.Sprintf("%s%s%v%s%v", j.config.lockKeyPrefix, j.config.lockKeySeparator, info.Type, j.config.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器永远上锁成功", j.config.insideIp, j.config.outsideIp))
} }

@ -26,13 +26,13 @@ type SubscribeResult struct {
// Subscribe 订阅 // Subscribe 订阅
func (j *JobsGorm) Subscribe(ctx context.Context) SubscribeResult { func (j *JobsGorm) Subscribe(ctx context.Context) SubscribeResult {
return SubscribeResult{ return SubscribeResult{
Message: j.redisClient.Subscribe(ctx, j.config.cornKeyChannel), Message: j.redisClient.Subscribe(ctx, j.config.cornKeyPrefix+"_"+j.config.cornKeyCustom),
} }
} }
// PSubscribe 订阅,支持通配符匹配(ch_user_*) // PSubscribe 订阅,支持通配符匹配(ch_user_*)
func (j *JobsGorm) PSubscribe(ctx context.Context) SubscribeResult { func (j *JobsGorm) PSubscribe(ctx context.Context) SubscribeResult {
return SubscribeResult{ return SubscribeResult{
Message: j.redisClient.PSubscribe(ctx, j.config.cornKeyChannels), Message: j.redisClient.PSubscribe(ctx, j.config.cornKeyPrefix+"_"+j.config.cornKeyCustom+"_*"),
} }
} }

@ -42,18 +42,18 @@ func (j *JobsGorm) GetIssueAddress(workers []string, v *jobs_gorm_model.Task) (a
if appointIpStatus == true { if appointIpStatus == true {
// 判断是否指定某ip执行 // 判断是否指定某ip执行
if gostring.Contains(currentIp, workers[0]) == true { if gostring.Contains(currentIp, workers[0]) == true {
return j.config.cornPrefix + "_" + v.SpecifyIp, nil return j.config.cornKeyPrefix + "_" + v.SpecifyIp, nil
} }
return address, errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp)) return address, errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp))
} }
return j.config.cornPrefix + "_" + workers[0], nil return j.config.cornKeyPrefix + "_" + workers[0], nil
} }
// 优先处理指定某ip执行 // 优先处理指定某ip执行
if appointIpStatus == true { if appointIpStatus == true {
for wk, wv := range workers { for wk, wv := range workers {
if gostring.Contains(currentIp, wv) == true { if gostring.Contains(currentIp, wv) == true {
return j.config.cornPrefix + "_" + workers[wk], nil return j.config.cornKeyPrefix + "_" + workers[wk], nil
} }
} }
return address, errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp)) return address, errors.New(fmt.Sprintf("需要执行的[%s]客户端不在线", currentIp))
@ -63,7 +63,7 @@ func (j *JobsGorm) GetIssueAddress(workers []string, v *jobs_gorm_model.Task) (a
if zxIp == "" { if zxIp == "" {
return address, errors.New("获取执行的客户端异常") return address, errors.New("获取执行的客户端异常")
} }
address = j.config.cornPrefix + "_" + zxIp address = j.config.cornKeyPrefix + "_" + zxIp
return address, err return address, err
} }
} }
@ -72,11 +72,11 @@ func (j *JobsGorm) GetIssueAddress(workers []string, v *jobs_gorm_model.Task) (a
func (j *JobsGorm) GetSubscribeClientList(ctx context.Context) ([]string, error) { func (j *JobsGorm) GetSubscribeClientList(ctx context.Context) ([]string, error) {
if j.config.debug == true { if j.config.debug == true {
log.Printf("获取在线的客户端:%s\n", j.config.cornPrefix+"_*") log.Printf("获取在线的客户端:%s\n", j.config.cornKeyPrefix+"_*")
} }
// 扫描 // 扫描
values, err := j.redisClient.Keys(ctx, j.config.cornPrefix+"_*").Result() values, err := j.redisClient.Keys(ctx, j.config.cornKeyPrefix+"_*").Result()
if err != nil { if err != nil {
if err == errors.New("ERR wrong number of arguments for 'mget' command") { if err == errors.New("ERR wrong number of arguments for 'mget' command") {
return []string{}, nil return []string{}, nil

@ -11,9 +11,9 @@ import (
func (j *JobsGorm) Ping(ctx context.Context) { func (j *JobsGorm) Ping(ctx context.Context) {
c := cron.New(cron.WithSeconds()) c := cron.New(cron.WithSeconds())
_, _ = c.AddFunc(GetSeconds(2).Spec(), func() { _, _ = c.AddFunc(GetSeconds(2).Spec(), func() {
result, err := j.redisClient.Set(ctx, j.config.cornKeyIp, j.config.outsideIp, 3*time.Second).Result() result, err := j.redisClient.Set(ctx, j.config.cornKeyPrefix+"_"+j.config.cornKeyCustom, j.config.outsideIp, 3*time.Second).Result()
if j.config.debug == true { if j.config.debug == true {
log.Println("JOBS心跳", j.config.cornKeyIp, j.config.outsideIp, result, err) log.Println("JOBS心跳", j.config.cornKeyPrefix+"_"+j.config.cornKeyCustom, j.config.outsideIp, result, err)
} }
}) })
c.Start() c.Start()

Loading…
Cancel
Save