From c66aa424b35e6334ad44472f65f52f9567724bb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Mon, 12 Sep 2022 18:22:27 +0800 Subject: [PATCH] - update mongo model --- client.go | 34 +++--- cofing.go | 14 +-- create_in.go | 254 +++++++++++++++++++++++++++++++++++++-- create_wait.go | 68 ++++++++++- get.go | 2 +- gorm_model.go | 14 +-- ip.go | 8 +- jobs_mongo_model/task.go | 80 +++++------- lock.go | 4 +- lock_id.go | 4 +- model.go | 18 +-- publish.go | 55 --------- run.go | 67 ++--------- 13 files changed, 399 insertions(+), 223 deletions(-) delete mode 100644 publish.go diff --git a/client.go b/client.go index cbd9e9e..cc7af7d 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package gojobs import ( "context" "go.dtapp.net/dorm" + "go.dtapp.net/goip" "go.dtapp.net/golog" ) @@ -38,15 +39,16 @@ type ClientConfig struct { type Client struct { zapLog *golog.ZapLog // 日志服务 config struct { - debug bool // 日志开关 - runVersion string // 运行版本 - os string // 系统类型 - arch string // 系统架构 - maxProCs int // CPU核数 - version string // GO版本 - macAddrS string // Mac地址 - insideIp string // 内网ip - outsideIp string // 外网ip + systemHostName string // 主机名 + systemInsideIp string // 内网ip + systemOs string // 系统类型 + systemArch string // 系统架构 + systemCpuQuantity int // cpu核数 + goVersion string // go版本 + sdkVersion string // sdk版本 + systemMacAddrS string // Mac地址 + systemOutsideIp string // 外网ip + debug bool // 日志开关 } cache struct { redisClient *dorm.RedisClient // 数据库 @@ -75,9 +77,11 @@ func NewClient(config *ClientConfig) (*Client, error) { c.config.debug = config.Debug if config.CurrentIp == "" { - return nil, currentIpNoConfig + config.CurrentIp = goip.GetOutsideIp(ctx) + } + if config.CurrentIp != "" && config.CurrentIp != "0.0.0.0" { + c.config.systemOutsideIp = config.CurrentIp } - c.config.outsideIp = config.CurrentIp // 缓存 redisClient := config.RedisClientFun() @@ -100,10 +104,10 @@ func NewClient(config *ClientConfig) (*Client, error) { if gormClient != nil && gormClient.Db != nil { c.db.gormClient = gormClient - c.autoMigrateTask() - c.autoMigrateTaskIp() - c.autoMigrateTaskLog() - c.autoMigrateTaskLogRun() + c.autoMigrateTask(ctx) + c.autoMigrateTaskIp(ctx) + c.autoMigrateTaskLog(ctx) + c.autoMigrateTaskLogRun(ctx) } else { return nil, gormClientFunNoConfig } diff --git a/cofing.go b/cofing.go index deb3c98..00c1a9d 100644 --- a/cofing.go +++ b/cofing.go @@ -8,11 +8,11 @@ import ( ) func (c *Client) setConfig(ctx context.Context) { - c.config.runVersion = Version - c.config.os = runtime.GOOS - c.config.arch = runtime.GOARCH - c.config.maxProCs = runtime.GOMAXPROCS(0) - c.config.version = runtime.Version() - c.config.macAddrS = goarray.TurnString(goip.GetMacAddr(ctx)) - c.config.insideIp = goip.GetInsideIp(ctx) + c.config.sdkVersion = Version + c.config.systemOs = runtime.GOOS + c.config.systemArch = runtime.GOARCH + c.config.systemCpuQuantity = runtime.GOMAXPROCS(0) + c.config.goVersion = runtime.Version() + c.config.systemMacAddrS = goarray.TurnString(goip.GetMacAddr(ctx)) + c.config.systemInsideIp = goip.GetInsideIp(ctx) } diff --git a/create_in.go b/create_in.go index 7a9d3ce..0b1eefd 100644 --- a/create_in.go +++ b/create_in.go @@ -1,10 +1,14 @@ package gojobs import ( + "context" "errors" "fmt" "go.dtapp.net/gojobs/jobs_gorm_model" + "go.dtapp.net/gojobs/jobs_mongo_model" "go.dtapp.net/gostring" + "go.dtapp.net/gotime" + "go.mongodb.org/mongo-driver/bson/primitive" "gorm.io/gorm" ) @@ -18,13 +22,13 @@ type ConfigCreateInCustomId struct { Type string // 类型 TypeName string // 类型名称 SpecifyIp string // 指定外网IP - CurrentIp string // 当前ip + CurrentIp string // 当前外网IP } // CreateInCustomId 创建正在运行任务 -func (c *Client) CreateInCustomId(config *ConfigCreateInCustomId) error { +func (c *Client) CreateInCustomId(ctx context.Context, config *ConfigCreateInCustomId) error { if config.CurrentIp == "" { - config.CurrentIp = c.config.outsideIp + config.CurrentIp = c.config.systemOutsideIp } err := config.Tx.Create(&jobs_gorm_model.Task{ Status: TASK_IN, @@ -43,6 +47,62 @@ func (c *Client) CreateInCustomId(config *ConfigCreateInCustomId) error { if err != nil { return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error())) } + if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { + go func() { + _, err = c.db.mongoClient.Database(c.db.mongoDatabaseName). + Collection(jobs_mongo_model.Task{}.TableName()). + InsertOne(&jobs_mongo_model.Task{ + Id: primitive.NewObjectID(), + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + RunId: gostring.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + TypeName: config.TypeName, + SpecifyIp: config.SpecifyIp, + CreateRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + CurrentRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + NextRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(), + RunIp: config.CurrentIp, + }, + CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time), + }) + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomId]:%s", err.Error()) + } + }() + } return nil } @@ -56,17 +116,17 @@ type ConfigCreateInCustomIdOnly struct { Type string // 类型 TypeName string // 类型名称 SpecifyIp string // 指定外网IP - CurrentIp string // 当前ip + CurrentIp string // 当前外网IP } // CreateInCustomIdOnly 创建正在运行唯一任务 -func (c *Client) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error { +func (c *Client) CreateInCustomIdOnly(ctx context.Context, config *ConfigCreateInCustomIdOnly) error { query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) if query.Id != 0 { return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) } if config.CurrentIp == "" { - config.CurrentIp = c.config.outsideIp + config.CurrentIp = c.config.systemOutsideIp } err := config.Tx.Create(&jobs_gorm_model.Task{ Status: TASK_IN, @@ -85,6 +145,62 @@ func (c *Client) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error if err != nil { return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error())) } + if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { + go func() { + _, err = c.db.mongoClient.Database(c.db.mongoDatabaseName). + Collection(jobs_mongo_model.Task{}.TableName()). + InsertOne(&jobs_mongo_model.Task{ + Id: primitive.NewObjectID(), + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + RunId: gostring.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + TypeName: config.TypeName, + SpecifyIp: config.SpecifyIp, + CreateRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + CurrentRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + NextRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(), + RunIp: config.CurrentIp, + }, + CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time), + }) + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomIdOnly]:%s", err.Error()) + } + }() + } return nil } @@ -99,13 +215,13 @@ type ConfigCreateInCustomIdMaxNumber struct { Type string // 类型 TypeName string // 类型名称 SpecifyIp string // 指定外网IP - CurrentIp string // 当前ip + CurrentIp string // 当前外网IP } // CreateInCustomIdMaxNumber 创建正在运行任务并限制数量 -func (c *Client) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error { +func (c *Client) CreateInCustomIdMaxNumber(ctx context.Context, config *ConfigCreateInCustomIdMaxNumber) error { if config.CurrentIp == "" { - config.CurrentIp = c.config.outsideIp + config.CurrentIp = c.config.systemOutsideIp } err := config.Tx.Create(&jobs_gorm_model.Task{ Status: TASK_IN, @@ -125,6 +241,63 @@ func (c *Client) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumb if err != nil { return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error())) } + if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { + go func() { + _, err = c.db.mongoClient.Database(c.db.mongoDatabaseName). + Collection(jobs_mongo_model.Task{}.TableName()). + InsertOne(&jobs_mongo_model.Task{ + Id: primitive.NewObjectID(), + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + MaxNumber: config.MaxNumber, + RunId: gostring.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + TypeName: config.TypeName, + SpecifyIp: config.SpecifyIp, + CreateRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + CurrentRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + NextRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(), + RunIp: config.CurrentIp, + }, + CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time), + }) + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomIdMaxNumber]:%s", err.Error()) + } + }() + } return nil } @@ -139,17 +312,17 @@ type ConfigCreateInCustomIdMaxNumberOnly struct { Type string // 类型 TypeName string // 类型名称 SpecifyIp string // 指定外网IP - CurrentIp string // 当前ip + CurrentIp string // 当前外网IP } // CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量 -func (c *Client) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error { +func (c *Client) CreateInCustomIdMaxNumberOnly(ctx context.Context, config *ConfigCreateInCustomIdMaxNumberOnly) error { query := c.TaskTypeTakeIn(config.Tx, config.CustomId, config.Type) if query.Id != 0 { return errors.New(fmt.Sprintf("%d:[%s@%s]任务已存在", query.Id, config.CustomId, config.Type)) } if config.CurrentIp == "" { - config.CurrentIp = c.config.outsideIp + config.CurrentIp = c.config.systemOutsideIp } err := config.Tx.Create(&jobs_gorm_model.Task{ Status: TASK_IN, @@ -169,5 +342,62 @@ func (c *Client) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMax if err != nil { return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error())) } + if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { + go func() { + _, err = c.db.mongoClient.Database(c.db.mongoDatabaseName). + Collection(jobs_mongo_model.Task{}.TableName()). + InsertOne(&jobs_mongo_model.Task{ + Id: primitive.NewObjectID(), + Status: TASK_IN, + Params: config.Params, + StatusDesc: "首次添加任务", + Frequency: config.Frequency, + MaxNumber: config.MaxNumber, + RunId: gostring.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + TypeName: config.TypeName, + SpecifyIp: config.SpecifyIp, + CreateRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + CurrentRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + NextRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(), + RunIp: config.CurrentIp, + }, + CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time), + }) + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateInCustomIdMaxNumberOnly]:%s", err.Error()) + } + }() + } return nil } diff --git a/create_wait.go b/create_wait.go index 1bdadbb..9b38e3a 100644 --- a/create_wait.go +++ b/create_wait.go @@ -1,10 +1,14 @@ package gojobs import ( + "context" "errors" "fmt" "go.dtapp.net/gojobs/jobs_gorm_model" + "go.dtapp.net/gojobs/jobs_mongo_model" "go.dtapp.net/gostring" + "go.dtapp.net/gotime" + "go.mongodb.org/mongo-driver/bson/primitive" "gorm.io/gorm" ) @@ -18,18 +22,18 @@ type ConfigCreateWaitCustomId struct { Type string // 类型 TypeName string // 类型名称 SpecifyIp string // 指定外网IP - CurrentIp string // 当前ip + CurrentIp string // 当前外网IP } // CreateWaitCustomId 创建正在运行任务 -func (c *Client) CreateWaitCustomId(config *ConfigCreateWaitCustomId) error { +func (c *Client) CreateWaitCustomId(ctx context.Context, config *ConfigCreateWaitCustomId) error { if config.CurrentIp == "" { - config.CurrentIp = c.config.outsideIp + config.CurrentIp = c.config.systemOutsideIp } err := config.Tx.Create(&jobs_gorm_model.Task{ Status: TASK_WAIT, Params: config.Params, - StatusDesc: "首次添加任务", + StatusDesc: "首次添加等待任务", Frequency: config.Frequency, RunId: gostring.GetUuId(), CustomId: config.CustomId, @@ -43,5 +47,61 @@ func (c *Client) CreateWaitCustomId(config *ConfigCreateWaitCustomId) error { if err != nil { return errors.New(fmt.Sprintf("创建[%s@%s]任务失败:%s", config.CustomId, config.Type, err.Error())) } + if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { + go func() { + _, err = c.db.mongoClient.Database(c.db.mongoDatabaseName). + Collection(jobs_mongo_model.Task{}.TableName()). + InsertOne(&jobs_mongo_model.Task{ + Id: primitive.NewObjectID(), + Status: TASK_WAIT, + Params: config.Params, + StatusDesc: "首次添加等待任务", + Frequency: config.Frequency, + RunId: gostring.GetUuId(), + CustomId: config.CustomId, + CustomSequence: config.CustomSequence, + Type: config.Type, + TypeName: config.TypeName, + SpecifyIp: config.SpecifyIp, + CreateRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + CurrentRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().Format(), + RunIp: config.CurrentIp, + }, + NextRunInfo: jobs_mongo_model.TaskRunInfo{ + SystemHostName: c.config.systemHostName, + SystemInsideIp: c.config.systemInsideIp, + SystemOs: c.config.systemOs, + SystemArch: c.config.systemArch, + SystemCpuQuantity: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + RunTime: gotime.Current().AfterSeconds(config.Frequency).Format(), + RunIp: config.CurrentIp, + }, + CreateTime: primitive.NewDateTimeFromTime(gotime.Current().Time), + }) + if err != nil { + c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.CreateWaitCustomId]:%s", err.Error()) + } + }() + } return nil } diff --git a/get.go b/get.go index 9b581cc..4d3d7f3 100644 --- a/get.go +++ b/get.go @@ -23,7 +23,7 @@ func (c *Client) GetRedis() *redis.Client { // GetCurrentIp 获取当前ip func (c *Client) GetCurrentIp() string { - return c.config.outsideIp + return c.config.systemOutsideIp } // GetSubscribeAddress 获取订阅地址 diff --git a/gorm_model.go b/gorm_model.go index 69e563e..8356326 100644 --- a/gorm_model.go +++ b/gorm_model.go @@ -184,27 +184,27 @@ func (c *Client) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB { // TaskIpInit 实例任务ip func (c *Client) TaskIpInit(tx *gorm.DB, ips map[string]string) bool { - if c.config.outsideIp == "" || c.config.outsideIp == "0.0.0.0" { + if c.config.systemOutsideIp == "" || c.config.systemOutsideIp == "0.0.0.0" { return false } - tx.Where("ips = ?", c.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 + tx.Where("ips = ?", c.config.systemOutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 for k, v := range ips { if v == "" { - c.TaskIpUpdate(tx, k, c.config.outsideIp) + c.TaskIpUpdate(tx, k, c.config.systemOutsideIp) } else { find := strings.Contains(v, ",") if find == true { // 包含 parts := strings.Split(v, ",") for _, vv := range parts { - if vv == c.config.outsideIp { - c.TaskIpUpdate(tx, k, c.config.outsideIp) + if vv == c.config.systemOutsideIp { + c.TaskIpUpdate(tx, k, c.config.systemOutsideIp) } } } else { // 不包含 - if v == c.config.outsideIp { - c.TaskIpUpdate(tx, k, c.config.outsideIp) + if v == c.config.systemOutsideIp { + c.TaskIpUpdate(tx, k, c.config.systemOutsideIp) } } } diff --git a/ip.go b/ip.go index a42b2e2..7ac236e 100644 --- a/ip.go +++ b/ip.go @@ -10,12 +10,12 @@ import ( // RefreshIp 刷新Ip func (c *Client) RefreshIp(ctx context.Context, tx *gorm.DB) { xip := goip.GetOutsideIp(ctx) - if c.config.outsideIp == "" || c.config.outsideIp == "0.0.0.0" { + if c.config.systemOutsideIp == "" || c.config.systemOutsideIp == "0.0.0.0" { return } - if c.config.outsideIp == xip { + if c.config.systemOutsideIp == xip { return } - tx.Where("ips = ?", c.config.outsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 - c.config.outsideIp = xip + tx.Where("ips = ?", c.config.systemOutsideIp).Delete(&jobs_gorm_model.TaskIp{}) // 删除 + c.config.systemOutsideIp = xip } diff --git a/jobs_mongo_model/task.go b/jobs_mongo_model/task.go index 40e032d..58e9838 100644 --- a/jobs_mongo_model/task.go +++ b/jobs_mongo_model/task.go @@ -4,58 +4,38 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +type TaskRunInfo struct { + SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名 + SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip + SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型 + SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构 + SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数 + GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本 + SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本 + RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间 + RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip + RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果 +} + // Task 任务 type Task struct { - Id primitive.ObjectID `json:"id,omitempty" bson:"_id,omitempty"` //【系统】记录编号 - Status string `json:"status,omitempty" bson:"status,omitempty"` //【系统】状态码 - Params string `json:"params,omitempty" bson:"params,omitempty"` //【系统】参数 - StatusDesc string `json:"status_desc,omitempty" bson:"status_desc,omitempty"` //【系统】状态描述 - Frequency int64 `json:"frequency,omitempty" bson:"frequency,omitempty"` //【系统】频率(秒单位) - Number int64 `json:"number,omitempty" bson:"number,omitempty"` //【系统】当前次数 - MaxNumber int64 `json:"max_number,omitempty" bson:"max_number,omitempty"` //【系统】最大次数 - RunId string `json:"run_id,omitempty" bson:"run_id,omitempty"` //【系统】执行编号 - CustomId string `json:"custom_id,omitempty" bson:"custom_id,omitempty"` //【系统】自定义编号 - CustomSequence int64 `json:"custom_sequence,omitempty" bson:"custom_sequence,omitempty"` //【系统】自定义顺序 - Type string `json:"type,omitempty" bson:"type,omitempty"` //【系统】类型 - TypeName string `json:"type_name,omitempty" bson:"type_name,omitempty"` //【系统】类型名称 - SpecifyIp string `json:"specify_ip,omitempty" bson:"specify_ip,omitempty"` //【系统】指定外网IP - CreateRunInfo struct { - SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名 - SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip - SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型 - SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构 - SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数 - GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本 - SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本 - RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间 - RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip - RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果 - } `json:"create_run_info,omitempty" bson:"create_run_info,omitempty"` //【系统】创建运行信息 - CurrentRunInfo struct { - SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名 - SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip - SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型 - SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构 - SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数 - GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本 - SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本 - RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间 - RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip - RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果 - } `json:"current_run_info,omitempty" bson:"current_run_info,omitempty"` //【系统】当前运行信息 - NextRunInfo struct { - SystemHostName string `json:"system_host_name,omitempty" bson:"system_host_name,omitempty"` //【系统】主机名 - SystemInsideIp string `json:"system_inside_ip,omitempty" bson:"system_inside_ip,omitempty"` //【系统】内网ip - SystemOs string `json:"system_os,omitempty" bson:"system_os,omitempty"` //【系统】系统类型 - SystemArch string `json:"system_arch,omitempty" bson:"system_arch,omitempty"` //【系统】系统架构 - SystemCpuQuantity int `json:"system_cpu_quantity,omitempty" bson:"system_cpu_quantity,omitempty"` //【系统】CPU核数 - GoVersion string `json:"go_version,omitempty" bson:"go_version,omitempty"` //【程序】Go版本 - SdkVersion string `json:"sdk_version,omitempty" bson:"sdk_version,omitempty"` //【程序】Sdk版本 - RunTime string `json:"run_time,omitempty" bson:"run_time,omitempty"` //【系统】运行时间 - RunIp string `json:"run_ip,omitempty" bson:"run_ip,omitempty"` //【系统】外网ip - RunResult string `json:"run_result,omitempty" bson:"run_result,omitempty"` //【系统】结果 - } `json:"next_run_info,omitempty" bson:"next_run_info,omitempty"` //【系统】下一次运行信息 - CurrentTime primitive.DateTime `json:"current_time,omitempty" bson:"current_time,omitempty"` //【系统】创建时间 + Id primitive.ObjectID `json:"id,omitempty" bson:"_id,omitempty"` //【系统】记录编号 + Status string `json:"status,omitempty" bson:"status,omitempty"` //【系统】状态码 + Params string `json:"params,omitempty" bson:"params,omitempty"` //【系统】参数 + StatusDesc string `json:"status_desc,omitempty" bson:"status_desc,omitempty"` //【系统】状态描述 + Frequency int64 `json:"frequency,omitempty" bson:"frequency,omitempty"` //【系统】频率(秒单位) + Number int64 `json:"number,omitempty" bson:"number,omitempty"` //【系统】当前次数 + MaxNumber int64 `json:"max_number,omitempty" bson:"max_number,omitempty"` //【系统】最大次数 + RunId string `json:"run_id,omitempty" bson:"run_id,omitempty"` //【系统】执行编号 + CustomId string `json:"custom_id,omitempty" bson:"custom_id,omitempty"` //【系统】自定义编号 + CustomSequence int64 `json:"custom_sequence,omitempty" bson:"custom_sequence,omitempty"` //【系统】自定义顺序 + Type string `json:"type,omitempty" bson:"type,omitempty"` //【系统】类型 + TypeName string `json:"type_name,omitempty" bson:"type_name,omitempty"` //【系统】类型名称 + SpecifyIp string `json:"specify_ip,omitempty" bson:"specify_ip,omitempty"` //【系统】指定外网IP + CreateRunInfo TaskRunInfo `json:"create_run_info,omitempty" bson:"create_run_info,omitempty"` //【系统】创建运行信息 + CurrentRunInfo TaskRunInfo `json:"current_run_info,omitempty" bson:"current_run_info,omitempty"` //【系统】当前运行信息 + NextRunInfo TaskRunInfo `json:"next_run_info,omitempty" bson:"next_run_info,omitempty"` //【系统】下一次运行信息 + CreateTime primitive.DateTime `json:"create_time,omitempty" bson:"create_time,omitempty"` //【系统】创建时间 } func (Task) TableName() string { diff --git a/lock.go b/lock.go index 332efd2..9c93693 100644 --- a/lock.go +++ b/lock.go @@ -10,7 +10,7 @@ import ( // Lock 上锁 func (c *Client) Lock(ctx context.Context, info jobs_gorm_model.Task, id any) (string, error) { - return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器上锁成功,%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second) + return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器上锁成功,%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second) } // Unlock Lock 解锁 @@ -20,5 +20,5 @@ func (c *Client) Unlock(ctx context.Context, info jobs_gorm_model.Task, id any) // LockForever 永远上锁 func (c *Client) LockForever(ctx context.Context, info jobs_gorm_model.Task, id any) (string, error) { - return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器永远上锁成功,%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format())) + return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, id), fmt.Sprintf("已在%s@%s机器永远上锁成功,%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format())) } diff --git a/lock_id.go b/lock_id.go index df2377f..4a7d58f 100644 --- a/lock_id.go +++ b/lock_id.go @@ -10,7 +10,7 @@ import ( // LockId 上锁 func (c *Client) LockId(ctx context.Context, info jobs_gorm_model.Task) (string, error) { - return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器上锁成功,%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second) + return c.cache.redisLockClient.Lock(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器上锁成功,%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format()), time.Duration(info.Frequency)*3*time.Second) } // UnlockId Lock 解锁 @@ -20,5 +20,5 @@ func (c *Client) UnlockId(ctx context.Context, info jobs_gorm_model.Task) error // LockForeverId 永远上锁 func (c *Client) LockForeverId(ctx context.Context, info jobs_gorm_model.Task) (string, error) { - return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器永远上锁成功,%v", c.config.insideIp, c.config.outsideIp, gotime.Current().Format())) + return c.cache.redisLockClient.LockForever(ctx, fmt.Sprintf("%s%s%v%s%v", c.cache.lockKeyPrefix, c.cache.lockKeySeparator, info.Type, c.cache.lockKeySeparator, info.Id), fmt.Sprintf("已在%s@%s机器永远上锁成功,%v", c.config.systemInsideIp, c.config.systemOutsideIp, gotime.Current().Format())) } diff --git a/model.go b/model.go index 945106b..006021b 100644 --- a/model.go +++ b/model.go @@ -10,23 +10,23 @@ import ( ) // 创建模型 -func (c *Client) autoMigrateTask() { - c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.Task{})) +func (c *Client) autoMigrateTask(ctx context.Context) { + c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.Task{})) } // 创建模型 -func (c *Client) autoMigrateTaskIp() { - c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskIp{})) +func (c *Client) autoMigrateTaskIp(ctx context.Context) { + c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskIp{})) } // 创建模型 -func (c *Client) autoMigrateTaskLog() { - c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLog{})) +func (c *Client) autoMigrateTaskLog(ctx context.Context) { + c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLog{})) } // 创建模型 -func (c *Client) autoMigrateTaskLogRun() { - c.zapLog.WithLogger().Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLogRun{})) +func (c *Client) autoMigrateTaskLogRun(ctx context.Context) { + c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.gormClient.Db.AutoMigrate(&jobs_gorm_model.TaskLogRun{})) } // 创建时间序列集合 @@ -38,7 +38,7 @@ func (c *Client) mongoCreateCollectionTask(ctx context.Context) { if commandErr != nil { c.zapLog.WithTraceId(ctx).Sugar().Errorf("检查时间序列集合:%s", commandErr) } else { - c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.mongoClient.Db.Database(c.db.mongoDatabaseName).CreateCollection(ctx, jobs_mongo_model.Task{}.TableName(), options.CreateCollection().SetTimeSeriesOptions(options.TimeSeries().SetTimeField("current_time")))) + c.zapLog.WithTraceId(ctx).Sugar().Info(c.db.mongoClient.Db.Database(c.db.mongoDatabaseName).CreateCollection(ctx, jobs_mongo_model.Task{}.TableName(), options.CreateCollection().SetTimeSeriesOptions(options.TimeSeries().SetTimeField("create_time")))) } } diff --git a/publish.go b/publish.go deleted file mode 100644 index fe31330..0000000 --- a/publish.go +++ /dev/null @@ -1,55 +0,0 @@ -package gojobs - -import ( - "context" - "go.dtapp.net/dorm" - "go.dtapp.net/gojobs/jobs_gorm_model" - "go.dtapp.net/gojobs/jobs_mongo_model" - "go.dtapp.net/gotime" - "go.mongodb.org/mongo-driver/bson/primitive" -) - -// PublishLog 发布记录 -func (c *Client) PublishLog(ctx context.Context, info jobs_gorm_model.Task, recordAddress string) { - _, err := c.db.mongoClient.Database(c.db.mongoDatabaseName). - Collection(jobs_mongo_model.TaskIssueRecord{}.TableName()). - InsertOne(&jobs_mongo_model.TaskIssueRecord{ - Id: primitive.NewObjectID(), - TaskInfo: jobs_mongo_model.TaskIssueRecordTaskInfo{ - Id: info.Id, - Status: info.Status, - Params: info.Params, - ParamsType: info.ParamsType, - StatusDesc: info.StatusDesc, - Frequency: info.Frequency, - Number: info.Number, - MaxNumber: info.MaxNumber, - RunId: info.RunId, - CustomId: info.CustomId, - CustomSequence: info.CustomSequence, - Type: info.Type, - TypeName: info.TypeName, - CreatedIp: info.CreatedIp, - SpecifyIp: info.SpecifyIp, - UpdatedIp: info.UpdatedIp, - Result: info.Result, - NextRunTime: dorm.BsonTime(info.NextRunTime), - CreatedAt: dorm.BsonTime(info.CreatedAt), - UpdatedAt: dorm.BsonTime(info.UpdatedAt), - }, - SystemInfo: jobs_mongo_model.TaskIssueRecordSystemInfo{ - InsideIp: c.config.insideIp, - OutsideIp: c.config.outsideIp, - Os: c.config.os, - Arch: c.config.arch, - Gomaxprocs: c.config.maxProCs, - GoVersion: c.config.version, - SdkVersion: c.config.runVersion, - }, - RecordAddress: recordAddress, - RecordTime: primitive.NewDateTimeFromTime(gotime.Current().Time), - }) - if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.RunAddLog.jobs_mongo_model.TaskIssueRecord]:%s", err.Error()) - } -} diff --git a/run.go b/run.go index 6b4443c..207e9fd 100644 --- a/run.go +++ b/run.go @@ -3,10 +3,8 @@ package gojobs import ( "context" "go.dtapp.net/gojobs/jobs_gorm_model" - "go.dtapp.net/gojobs/jobs_mongo_model" "go.dtapp.net/gostring" "go.dtapp.net/gotime" - "go.mongodb.org/mongo-driver/bson/primitive" ) // Run 运行 @@ -16,29 +14,11 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int, TaskId: info.Id, StatusCode: status, Desc: result, - Version: c.config.runVersion, + Version: c.config.sdkVersion, }).Error if err != nil { c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.Create]:%s", err.Error()) } - // 记录 - if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { - go func() { - _, err = c.db.mongoClient.Database(c.db.mongoDatabaseName). - Collection(jobs_mongo_model.TaskLog{}.TableName()). - InsertOne(&jobs_mongo_model.TaskLog{ - Id: primitive.NewObjectID(), - TaskId: info.Id, - StatusCode: status, - Desc: result, - Version: c.config.runVersion, - CreatedAt: primitive.NewDateTimeFromTime(gotime.Current().Time), - }) - if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.Run.jobs_mongo_model.TaskLog]:%s", err.Error()) - } - }() - } if status == 0 { err = c.EditTask(c.db.gormClient.Db, info.Id). Select("run_id", "result", "next_run_time"). @@ -61,7 +41,7 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int, StatusDesc: "执行成功", Number: info.Number + 1, RunId: gostring.GetUuId(), - UpdatedIp: c.config.outsideIp, + UpdatedIp: c.config.systemOutsideIp, Result: result, NextRunTime: gotime.Current().AfterSeconds(info.Frequency).Time, }).Error @@ -77,7 +57,7 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int, Status: TASK_SUCCESS, StatusDesc: "结束执行", Number: info.Number + 1, - UpdatedIp: c.config.outsideIp, + UpdatedIp: c.config.systemOutsideIp, Result: result, NextRunTime: gotime.Current().Time, }).Error @@ -93,7 +73,7 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int, StatusDesc: "执行失败", Number: info.Number + 1, RunId: gostring.GetUuId(), - UpdatedIp: c.config.outsideIp, + UpdatedIp: c.config.systemOutsideIp, Result: result, NextRunTime: gotime.Current().AfterSeconds(info.Frequency).Time, }).Error @@ -118,39 +98,16 @@ func (c *Client) Run(ctx context.Context, info jobs_gorm_model.Task, status int, // RunAddLog 任务执行日志 func (c *Client) RunAddLog(ctx context.Context, id uint, runId string) error { - if c.db.mongoClient != nil && c.db.mongoClient.Db != nil { - go func() { - _, err := c.db.mongoClient.Database(c.db.mongoDatabaseName). - Collection(jobs_mongo_model.TaskLogRun{}.TableName()). - InsertOne(&jobs_mongo_model.TaskLogRun{ - Id: primitive.NewObjectID(), - TaskId: id, - RunId: runId, - InsideIp: c.config.insideIp, - OutsideIp: c.config.outsideIp, - Os: c.config.os, - Arch: c.config.arch, - Gomaxprocs: c.config.maxProCs, - GoVersion: c.config.version, - SdkVersion: c.config.runVersion, - MacAddrs: c.config.macAddrS, - CreatedAt: primitive.NewDateTimeFromTime(gotime.Current().Time), - }) - if err != nil { - c.zapLog.WithTraceId(ctx).Sugar().Errorf("[gojobs.RunAddLog.jobs_mongo_model.TaskLogRun]:%s", err.Error()) - } - }() - } return c.db.gormClient.Db.Create(&jobs_gorm_model.TaskLogRun{ TaskId: id, RunId: runId, - InsideIp: c.config.insideIp, - OutsideIp: c.config.outsideIp, - Os: c.config.os, - Arch: c.config.arch, - Gomaxprocs: c.config.maxProCs, - GoVersion: c.config.version, - SdkVersion: c.config.runVersion, - MacAddrs: c.config.macAddrS, + InsideIp: c.config.systemInsideIp, + OutsideIp: c.config.systemOutsideIp, + Os: c.config.systemOs, + Arch: c.config.systemArch, + Gomaxprocs: c.config.systemCpuQuantity, + GoVersion: c.config.goVersion, + SdkVersion: c.config.sdkVersion, + MacAddrs: c.config.systemMacAddrS, }).Error }