- update mongo model

master v1.0.80
李光春 2 years ago
parent 649ace79cf
commit c66aa424b3

@ -3,6 +3,7 @@ package gojobs
import (
"context"
"go.dtapp.net/dorm"
"go.dtapp.net/goip"
"go.dtapp.net/golog"
)
@ -38,15 +39,16 @@ type ClientConfig struct {
type Client struct {
zapLog *golog.ZapLog // 日志服务
config struct {
debug bool // 日志开关
runVersion string // 运行版本
os string // 系统类型
arch string // 系统架构
maxProCs int // CPU核数
version string // GO版本
macAddrS string // Mac地址
insideIp string // 内网ip
outsideIp string // 外网ip
systemHostName string // 主机名
systemInsideIp string // 内网ip
systemOs string // 系统类型
systemArch string // 系统架构
systemCpuQuantity int // cpu核数
goVersion string // go版本
sdkVersion string // sdk版本
systemMacAddrS string // Mac地址
systemOutsideIp string // 外网ip
debug bool // 日志开关
}
cache struct {
redisClient *dorm.RedisClient // 数据库
@ -75,9 +77,11 @@ func NewClient(config *ClientConfig) (*Client, error) {
c.config.debug = config.Debug
if config.CurrentIp == "" {
return nil, currentIpNoConfig
config.CurrentIp = goip.GetOutsideIp(ctx)
}
if config.CurrentIp != "" && config.CurrentIp != "0.0.0.0" {
c.config.systemOutsideIp = config.CurrentIp
}
c.config.outsideIp = config.CurrentIp
// 缓存
redisClient := config.RedisClientFun()
@ -100,10 +104,10 @@ func NewClient(config *ClientConfig) (*Client, error) {
if gormClient != nil && gormClient.Db != nil {
c.db.gormClient = gormClient
c.autoMigrateTask()
c.autoMigrateTaskIp()
c.autoMigrateTaskLog()
c.autoMigrateTaskLogRun()
c.autoMigrateTask(ctx)
c.autoMigrateTaskIp(ctx)
c.autoMigrateTaskLog(ctx)
c.autoMigrateTaskLogRun(ctx)
} else {
return nil, gormClientFunNoConfig
}

@ -8,11 +8,11 @@ import (
)
func (c *Client) setConfig(ctx context.Context) {
c.config.runVersion = Version
c.config.os = runtime.GOOS
c.config.arch = runtime.GOARCH
c.config.maxProCs = runtime.GOMAXPROCS(0)
c.config.version = runtime.Version()
c.config.macAddrS = goarray.TurnString(goip.GetMacAddr(ctx))
c.config.insideIp = goip.GetInsideIp(ctx)
c.config.sdkVersion = Version
c.config.systemOs = runtime.GOOS
c.config.systemArch = runtime.GOARCH
c.config.systemCpuQuantity = runtime.GOMAXPROCS(0)
c.config.goVersion = runtime.Version()
c.config.systemMacAddrS = goarray.TurnString(goip.GetMacAddr(ctx))
c.config.systemInsideIp = goip.GetInsideIp(ctx)
}

@ -1,10 +1,14 @@
package gojobs
import (
"context"
"errors"
"fmt"
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/gojobs/jobs_mongo_model"
"go.dtapp.net/gostring"
"go.dtapp.net/gotime"
"go.mongodb.org/mongo-driver/bson/primitive"
"gorm.io/gorm"
)
@ -18,13 +22,13 @@ type ConfigCreateInCustomId struct {
Type string // 类型
TypeName string // 类型名称
SpecifyIp string // 指定外网IP
CurrentIp string // 当前ip
CurrentIp string // 当前外网IP
}
// CreateInCustomId 创建正在运行任务
func (c *Client) CreateInCustomId(config *ConfigCreateInCustomId) error {
func (c *Client) CreateInCustomId(ctx context.Context, config *ConfigCreateInCustomId) error {
if config.CurrentIp == "" {
config.CurrentIp = c.config.outsideIp
config.CurrentIp = c.config.systemOutsideIp
}
err := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
@ -43,6 +47,62 @@ func (c *Client) CreateInCustomId(config *ConfigCreateInCustomId) error {
if err != nil {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error()))
}
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err = c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.Task{}.TableName()).
InsertOne(&jobs_mongo_model.Task{
Id: primitive.NewObjectID(),
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: gostring.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
TypeName: config.TypeName,
SpecifyIp: config.SpecifyIp,
CreateRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
CurrentRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
NextRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(),
RunIp: config.CurrentIp,
},
CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomId]%s", err.Error())
}
}()
}
return nil
}
@ -56,17 +116,17 @@ type ConfigCreateInCustomIdOnly struct {
Type string // 类型
TypeName string // 类型名称
SpecifyIp string // 指定外网IP
CurrentIp string // 当前ip
CurrentIp string // 当前外网IP
}
// CreateInCustomIdOnly 创建正在运行唯一任务
func (c *Client) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error {
func (c *Client) CreateInCustomIdOnly(ctx context.Context, config *ConfigCreateInCustomIdOnly) error {
query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
if config.CurrentIp == "" {
config.CurrentIp = c.config.outsideIp
config.CurrentIp = c.config.systemOutsideIp
}
err := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
@ -85,6 +145,62 @@ func (c *Client) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error
if err != nil {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error()))
}
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err = c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.Task{}.TableName()).
InsertOne(&jobs_mongo_model.Task{
Id: primitive.NewObjectID(),
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
RunId: gostring.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
TypeName: config.TypeName,
SpecifyIp: config.SpecifyIp,
CreateRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
CurrentRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
NextRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(),
RunIp: config.CurrentIp,
},
CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomIdOnly]%s", err.Error())
}
}()
}
return nil
}
@ -99,13 +215,13 @@ type ConfigCreateInCustomIdMaxNumber struct {
Type string // 类型
TypeName string // 类型名称
SpecifyIp string // 指定外网IP
CurrentIp string // 当前ip
CurrentIp string // 当前外网IP
}
// CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
func (c *Client) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error {
func (c *Client) CreateInCustomIdMaxNumber(ctx context.Context, config *ConfigCreateInCustomIdMaxNumber) error {
if config.CurrentIp == "" {
config.CurrentIp = c.config.outsideIp
config.CurrentIp = c.config.systemOutsideIp
}
err := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
@ -125,6 +241,63 @@ func (c *Client) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumb
if err != nil {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error()))
}
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err = c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.Task{}.TableName()).
InsertOne(&jobs_mongo_model.Task{
Id: primitive.NewObjectID(),
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: gostring.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
TypeName: config.TypeName,
SpecifyIp: config.SpecifyIp,
CreateRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
CurrentRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
NextRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(),
RunIp: config.CurrentIp,
},
CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomIdMaxNumber]%s", err.Error())
}
}()
}
return nil
}
@ -139,17 +312,17 @@ type ConfigCreateInCustomIdMaxNumberOnly struct {
Type string // 类型
TypeName string // 类型名称
SpecifyIp string // 指定外网IP
CurrentIp string // 当前ip
CurrentIp string // 当前外网IP
}
// CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (c *Client) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error {
func (c *Client) CreateInCustomIdMaxNumberOnly(ctx context.Context, config *ConfigCreateInCustomIdMaxNumberOnly) error {
query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type)
if query.Id != 0 {
return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type))
}
if config.CurrentIp == "" {
config.CurrentIp = c.config.outsideIp
config.CurrentIp = c.config.systemOutsideIp
}
err := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_IN,
@ -169,5 +342,62 @@ func (c *Client) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMax
if err != nil {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error()))
}
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err = c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.Task{}.TableName()).
InsertOne(&jobs_mongo_model.Task{
Id: primitive.NewObjectID(),
Status: TASK_IN,
Params: config.Params,
StatusDesc: "首次添加任务",
Frequency: config.Frequency,
MaxNumber: config.MaxNumber,
RunId: gostring.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
TypeName: config.TypeName,
SpecifyIp: config.SpecifyIp,
CreateRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
CurrentRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
NextRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(),
RunIp: config.CurrentIp,
},
CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomIdMaxNumberOnly]%s", err.Error())
}
}()
}
return nil
}

@ -1,10 +1,14 @@
package gojobs
import (
"context"
"errors"
"fmt"
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/gojobs/jobs_mongo_model"
"go.dtapp.net/gostring"
"go.dtapp.net/gotime"
"go.mongodb.org/mongo-driver/bson/primitive"
"gorm.io/gorm"
)
@ -18,18 +22,18 @@ type ConfigCreateWaitCustomId struct {
Type string // 类型
TypeName string // 类型名称
SpecifyIp string // 指定外网IP
CurrentIp string // 当前ip
CurrentIp string // 当前外网IP
}
// CreateWaitCustomId 创建正在运行任务
func (c *Client) CreateWaitCustomId(config *ConfigCreateWaitCustomId) error {
func (c *Client) CreateWaitCustomId(ctx context.Context, config *ConfigCreateWaitCustomId) error {
if config.CurrentIp == "" {
config.CurrentIp = c.config.outsideIp
config.CurrentIp = c.config.systemOutsideIp
}
err := config.Tx.Create(&jobs_gorm_model.Task{
Status: TASK_WAIT,
Params: config.Params,
StatusDesc: "首次添加任务",
StatusDesc: "首次添加等待任务",
Frequency: config.Frequency,
RunId: gostring.GetUuId(),
CustomId: config.CustomId,
@ -43,5 +47,61 @@ func (c *Client) CreateWaitCustomId(config *ConfigCreateWaitCustomId) error {
if err != nil {
return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error()))
}
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err = c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.Task{}.TableName()).
InsertOne(&jobs_mongo_model.Task{
Id: primitive.NewObjectID(),
Status: TASK_WAIT,
Params: config.Params,
StatusDesc: "首次添加等待任务",
Frequency: config.Frequency,
RunId: gostring.GetUuId(),
CustomId: config.CustomId,
CustomSequence: config.CustomSequence,
Type: config.Type,
TypeName: config.TypeName,
SpecifyIp: config.SpecifyIp,
CreateRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
CurrentRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().Format(),
RunIp: config.CurrentIp,
},
NextRunInfo: jobs_mongo_model.TaskRunInfo{
SystemHostName: c.config.systemHostName,
SystemInsideIp: c.config.systemInsideIp,
SystemOs: c.config.systemOs,
SystemArch: c.config.systemArch,
SystemCpuQuantity: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(),
RunIp: config.CurrentIp,
},
CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateWaitCustomId]%s", err.Error())
}
}()
}
return nil
}

@ -23,7 +23,7 @@ func (c *Client) GetRedis() *redis.Client {
// GetCurrentIp 获取当前ip
func (c *Client) GetCurrentIp() string {
return c.config.outsideIp
return c.config.systemOutsideIp
}
// GetSubscribeAddress 获取订阅地址

@ -184,27 +184,27 @@ func (c *Client) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB {
// TaskIpInit 实例任务ip
func (c *Client) TaskIpInit(tx *gorm.DB, ips map[string]string) bool {
if c.config.outsideIp == "" || c.config.outsideIp == "0.0.0.0" {
if c.config.systemOutsideIp == "" || c.config.systemOutsideIp == "0.0.0.0" {
return false
}
tx.Where("ips = ?", c.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
tx.Where("ips = ?", c.config.systemOutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
for k, v := range ips {
if v == "" {
c.TaskIpUpdate(tx, k, c.config.outsideIp)
c.TaskIpUpdate(tx, k, c.config.systemOutsideIp)
} else {
find := strings.Contains(v, ",")
if find == true {
// 包含
parts := strings.Split(v, ",")
for _, vv := range parts {
if vv == c.config.outsideIp {
c.TaskIpUpdate(tx, k, c.config.outsideIp)
if vv == c.config.systemOutsideIp {
c.TaskIpUpdate(tx, k, c.config.systemOutsideIp)
}
}
} else {
// 不包含
if v == c.config.outsideIp {
c.TaskIpUpdate(tx, k, c.config.outsideIp)
if v == c.config.systemOutsideIp {
c.TaskIpUpdate(tx, k, c.config.systemOutsideIp)
}
}
}

@ -10,12 +10,12 @@ import (
// RefreshIp 刷新Ip
func (c *Client) RefreshIp(ctx context.Context, tx *gorm.DB) {
xip := goip.GetOutsideIp(ctx)
if c.config.outsideIp == "" || c.config.outsideIp == "0.0.0.0" {
if c.config.systemOutsideIp == "" || c.config.systemOutsideIp == "0.0.0.0" {
return
}
if c.config.outsideIp == xip {
if c.config.systemOutsideIp == xip {
return
}
tx.Where("ips = ?", c.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
c.config.outsideIp = xip
tx.Where("ips = ?", c.config.systemOutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除
c.config.systemOutsideIp = xip
}

@ -4,58 +4,38 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
type TaskRunInfo struct {
SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名
SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip
SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型
SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构
SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数
GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本
SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本
RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间
RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip
RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果
}
// Task 任务
type Task struct {
Id primitive.ObjectID `json:"id,omitempty" bson:"_id,omitempty"` //【系统】记录编号
Status string `json:"status,omitempty" bson:"status,omitempty"` //【系统】状态码
Params string `json:"params,omitempty" bson:"params,omitempty"` //【系统】参数
StatusDesc string `json:"status_desc,omitempty" bson:"status_desc,omitempty"` //【系统】状态描述
Frequency int64 `json:"frequency,omitempty" bson:"frequency,omitempty"` //【系统】频率(秒单位)
Number int64 `json:"number,omitempty" bson:"number,omitempty"` //【系统】当前次数
MaxNumber int64 `json:"max_number,omitempty" bson:"max_number,omitempty"` //【系统】最大次数
RunId string `json:"run_id,omitempty" bson:"run_id,omitempty"` //【系统】执行编号
CustomId string `json:"custom_id,omitempty" bson:"custom_id,omitempty"` //【系统】自定义编号
CustomSequence int64 `json:"custom_sequence,omitempty" bson:"custom_sequence,omitempty"` //【系统】自定义顺序
Type string `json:"type,omitempty" bson:"type,omitempty"` //【系统】类型
TypeName string `json:"type_name,omitempty" bson:"type_name,omitempty"` //【系统】类型名称
SpecifyIp string `json:"specify_ip,omitempty" bson:"specify_ip,omitempty"` //【系统】指定外网IP
CreateRunInfo struct {
SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名
SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip
SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型
SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构
SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数
GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本
SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本
RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间
RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip
RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果
} `json:"create_run_info,omitempty" bson:"create_run_info,omitempty"` //【系统】创建运行信息
CurrentRunInfo struct {
SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名
SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip
SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型
SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构
SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数
GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本
SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本
RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间
RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip
RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果
} `json:"current_run_info,omitempty" bson:"current_run_info,omitempty"` //【系统】当前运行信息
NextRunInfo struct {
SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名
SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip
SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型
SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构
SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数
GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本
SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本
RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间
RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip
RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果
} `json:"next_run_info,omitempty" bson:"next_run_info,omitempty"` //【系统】下一次运行信息
CurrentTime primitive.DateTime `json:"current_time,omitempty" bson:"current_time,omitempty"` //【系统】创建时间
Id primitive.ObjectID `json:"id,omitempty" bson:"_id,omitempty"` //【系统】记录编号
Status string `json:"status,omitempty" bson:"status,omitempty"` //【系统】状态码
Params string `json:"params,omitempty" bson:"params,omitempty"` //【系统】参数
StatusDesc string `json:"status_desc,omitempty" bson:"status_desc,omitempty"` //【系统】状态描述
Frequency int64 `json:"frequency,omitempty" bson:"frequency,omitempty"` //【系统】频率(秒单位)
Number int64 `json:"number,omitempty" bson:"number,omitempty"` //【系统】当前次数
MaxNumber int64 `json:"max_number,omitempty" bson:"max_number,omitempty"` //【系统】最大次数
RunId string `json:"run_id,omitempty" bson:"run_id,omitempty"` //【系统】执行编号
CustomId string `json:"custom_id,omitempty" bson:"custom_id,omitempty"` //【系统】自定义编号
CustomSequence int64 `json:"custom_sequence,omitempty" bson:"custom_sequence,omitempty"` //【系统】自定义顺序
Type string `json:"type,omitempty" bson:"type,omitempty"` //【系统】类型
TypeName string `json:"type_name,omitempty" bson:"type_name,omitempty"` //【系统】类型名称
SpecifyIp string `json:"specify_ip,omitempty" bson:"specify_ip,omitempty"` //【系统】指定外网IP
CreateRunInfo TaskRunInfo `json:"create_run_info,omitempty" bson:"create_run_info,omitempty"` //【系统】创建运行信息
CurrentRunInfo TaskRunInfo `json:"current_run_info,omitempty" bson:"current_run_info,omitempty"` //【系统】当前运行信息
NextRunInfo TaskRunInfo `json:"next_run_info,omitempty" bson:"next_run_info,omitempty"` //【系统】下一次运行信息
CreateTime primitive.DateTime `json:"create_time,omitempty" bson:"create_time,omitempty"` //【系统】创建时间
}
func (Task) TableName() string {

@ -10,7 +10,7 @@ import (
// Lock 上锁
func (c *Client) Lock(ctx context.Context, info jobs_gorm_model.Task, id any) (string, error) {
return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器上锁成功%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second)
return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器上锁成功%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second)
}
// Unlock Lock 解锁
@ -20,5 +20,5 @@ func (c *Client) Unlock(ctx context.Context, info jobs_gorm_model.Task, id any)
// LockForever 永远上锁
func (c *Client) LockForever(ctx context.Context, info jobs_gorm_model.Task, id any) (string, error) {
return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器永远上锁成功%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format()))
return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器永远上锁成功%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format()))
}

@ -10,7 +10,7 @@ import (
// LockId 上锁
func (c *Client) LockId(ctx context.Context, info jobs_gorm_model.Task) (string, error) {
return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器上锁成功%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second)
return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器上锁成功%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second)
}
// UnlockId Lock 解锁
@ -20,5 +20,5 @@ func (c *Client) UnlockId(ctx context.Context, info jobs_gorm_model.Task) error
// LockForeverId 永远上锁
func (c *Client) LockForeverId(ctx context.Context, info jobs_gorm_model.Task) (string, error) {
return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器永远上锁成功%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format()))
return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器永远上锁成功%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format()))
}

@ -10,23 +10,23 @@ import (
)
// 创建模型
func (c *Client) autoMigrateTask() {
c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.Task{}))
func (c *Client) autoMigrateTask(ctx context.Context) {
c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.Task{}))
}
// 创建模型
func (c *Client) autoMigrateTaskIp() {
c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskIp{}))
func (c *Client) autoMigrateTaskIp(ctx context.Context) {
c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskIp{}))
}
// 创建模型
func (c *Client) autoMigrateTaskLog() {
c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLog{}))
func (c *Client) autoMigrateTaskLog(ctx context.Context) {
c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLog{}))
}
// 创建模型
func (c *Client) autoMigrateTaskLogRun() {
c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLogRun{}))
func (c *Client) autoMigrateTaskLogRun(ctx context.Context) {
c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLogRun{}))
}
// 创建时间序列集合
@ -38,7 +38,7 @@ func (c *Client) mongoCreateCollectionTask(ctx context.Context) {
if commandErr != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("检查时间序列集合:%s", commandErr)
} else {
c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.mongoClient.Db.Database(c.db.mongoDatabaseName).CreateCollection(ctx, jobs_mongo_model.Task{}.TableName(), options.CreateCollection().SetTimeSeriesOptions(options.TimeSeries().SetTimeField("current_time"))))
c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.mongoClient.Db.Database(c.db.mongoDatabaseName).CreateCollection(ctx, jobs_mongo_model.Task{}.TableName(), options.CreateCollection().SetTimeSeriesOptions(options.TimeSeries().SetTimeField("create_time"))))
}
}

@ -1,55 +0,0 @@
package gojobs
import (
"context"
"go.dtapp.net/dorm"
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/gojobs/jobs_mongo_model"
"go.dtapp.net/gotime"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// PublishLog 发布记录
func (c *Client) PublishLog(ctx context.Context, info jobs_gorm_model.Task, recordAddress string) {
_, err := c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.TaskIssueRecord{}.TableName()).
InsertOne(&jobs_mongo_model.TaskIssueRecord{
Id: primitive.NewObjectID(),
TaskInfo: jobs_mongo_model.TaskIssueRecordTaskInfo{
Id: info.Id,
Status: info.Status,
Params: info.Params,
ParamsType: info.ParamsType,
StatusDesc: info.StatusDesc,
Frequency: info.Frequency,
Number: info.Number,
MaxNumber: info.MaxNumber,
RunId: info.RunId,
CustomId: info.CustomId,
CustomSequence: info.CustomSequence,
Type: info.Type,
TypeName: info.TypeName,
CreatedIp: info.CreatedIp,
SpecifyIp: info.SpecifyIp,
UpdatedIp: info.UpdatedIp,
Result: info.Result,
NextRunTime: dorm.BsonTime(info.NextRunTime),
CreatedAt: dorm.BsonTime(info.CreatedAt),
UpdatedAt: dorm.BsonTime(info.UpdatedAt),
},
SystemInfo: jobs_mongo_model.TaskIssueRecordSystemInfo{
InsideIp: c.config.insideIp,
OutsideIp: c.config.outsideIp,
Os: c.config.os,
Arch: c.config.arch,
Gomaxprocs: c.config.maxProCs,
GoVersion: c.config.version,
SdkVersion: c.config.runVersion,
},
RecordAddress: recordAddress,
RecordTime: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.RunAddLog.jobs_mongo_model.TaskIssueRecord]%s", err.Error())
}
}

@ -3,10 +3,8 @@ package gojobs
import (
"context"
"go.dtapp.net/gojobs/jobs_gorm_model"
"go.dtapp.net/gojobs/jobs_mongo_model"
"go.dtapp.net/gostring"
"go.dtapp.net/gotime"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// Run 运行
@ -16,29 +14,11 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int,
TaskId: info.Id,
StatusCode: status,
Desc: result,
Version: c.config.runVersion,
Version: c.config.sdkVersion,
}).Error
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.Create]%s", err.Error())
}
// 记录
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err = c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.TaskLog{}.TableName()).
InsertOne(&jobs_mongo_model.TaskLog{
Id: primitive.NewObjectID(),
TaskId: info.Id,
StatusCode: status,
Desc: result,
Version: c.config.runVersion,
CreatedAt: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.jobs_mongo_model.TaskLog]%s", err.Error())
}
}()
}
if status == 0 {
err = c.EditTask(c.db.gormClient.Db, info.Id).
Select("run_id", "result", "next_run_time").
@ -61,7 +41,7 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int,
StatusDesc: "执行成功",
Number: info.Number + 1,
RunId: gostring.GetUuId(),
UpdatedIp: c.config.outsideIp,
UpdatedIp: c.config.systemOutsideIp,
Result: result,
NextRunTime: gotime.Current().AfterSeconds(info.Frequency).Time,
}).Error
@ -77,7 +57,7 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int,
Status: TASK_SUCCESS,
StatusDesc: "结束执行",
Number: info.Number + 1,
UpdatedIp: c.config.outsideIp,
UpdatedIp: c.config.systemOutsideIp,
Result: result,
NextRunTime: gotime.Current().Time,
}).Error
@ -93,7 +73,7 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int,
StatusDesc: "执行失败",
Number: info.Number + 1,
RunId: gostring.GetUuId(),
UpdatedIp: c.config.outsideIp,
UpdatedIp: c.config.systemOutsideIp,
Result: result,
NextRunTime: gotime.Current().AfterSeconds(info.Frequency).Time,
}).Error
@ -118,39 +98,16 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int,
// RunAddLog 任务执行日志
func (c *Client) RunAddLog(ctx context.Context, id uint, runId string) error {
if c.db.mongoClient != nil && c.db.mongoClient.Db != nil {
go func() {
_, err := c.db.mongoClient.Database(c.db.mongoDatabaseName).
Collection(jobs_mongo_model.TaskLogRun{}.TableName()).
InsertOne(&jobs_mongo_model.TaskLogRun{
Id: primitive.NewObjectID(),
TaskId: id,
RunId: runId,
InsideIp: c.config.insideIp,
OutsideIp: c.config.outsideIp,
Os: c.config.os,
Arch: c.config.arch,
Gomaxprocs: c.config.maxProCs,
GoVersion: c.config.version,
SdkVersion: c.config.runVersion,
MacAddrs: c.config.macAddrS,
CreatedAt: primitive.NewDateTimeFromTime(gotime.Current().Time),
})
if err != nil {
c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.RunAddLog.jobs_mongo_model.TaskLogRun]%s", err.Error())
}
}()
}
return c.db.gormClient.Db.Create(&jobs_gorm_model.TaskLogRun{
TaskId: id,
RunId: runId,
InsideIp: c.config.insideIp,
OutsideIp: c.config.outsideIp,
Os: c.config.os,
Arch: c.config.arch,
Gomaxprocs: c.config.maxProCs,
GoVersion: c.config.version,
SdkVersion: c.config.runVersion,
MacAddrs: c.config.macAddrS,
InsideIp: c.config.systemInsideIp,
OutsideIp: c.config.systemOutsideIp,
Os: c.config.systemOs,
Arch: c.config.systemArch,
Gomaxprocs: c.config.systemCpuQuantity,
GoVersion: c.config.goVersion,
SdkVersion: c.config.sdkVersion,
MacAddrs: c.config.systemMacAddrS,
}).Error
}

Loading…
Cancel
Save