diff --git a/jobs.go b/jobs.go index cb6d890..350121a 100644 --- a/jobs.go +++ b/jobs.go @@ -11,9 +11,9 @@ const ( // Cron type jobs interface { // Run 运行 - Run() + Run(info interface{}, status int, desc string) // RunAddLog 任务执行日志 - RunAddLog() + RunAddLog(id uint, runId string) // CreateInCustomId 创建正在运行任务 CreateInCustomId() // CreateInCustomIdOnly 创建正在运行唯一任务 diff --git a/jobs_gorm_check_task.go b/jobs_gorm_check_task.go index 8567ccf..a64a54e 100644 --- a/jobs_gorm_check_task.go +++ b/jobs_gorm_check_task.go @@ -7,14 +7,30 @@ import ( "log" ) -func (j *JobsGorm) Check(tx *gorm.DB, vs []jobs_gorm_model.Task) { - if j.mainService > 0 && len(vs) > 0 { +// CheckManyTask 多任务检查 +func (j *JobsGorm) CheckManyTask(tx *gorm.DB, vs []jobs_gorm_model.Task) { + if len(vs) > 0 { for _, v := range vs { diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time) if diffInSecondWithAbs >= v.Frequency*3 { log.Printf("每隔%v秒任务:%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs) - tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&jobs_gorm_model.TaskLogRun{}) // 删除 + statusDelete := tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&jobs_gorm_model.TaskLogRun{}) + if statusDelete.RowsAffected == 0 { + log.Println("删除失败", statusDelete.Error) + } } } } } + +// CheckSingleTask 单任务检查 +func (j *JobsGorm) CheckSingleTask(tx *gorm.DB, v jobs_gorm_model.Task) { + diffInSecondWithAbs := gotime.Current().DiffInSecondWithAbs(gotime.SetCurrentParse(v.UpdatedAt).Time) + if diffInSecondWithAbs >= v.Frequency*3 { + log.Printf("每隔%v秒任务:%v相差%v秒\n", v.Frequency, v.Id, diffInSecondWithAbs) + statusDelete := tx.Where("task_id = ?", v.Id).Where("run_id = ?", v.RunId).Delete(&jobs_gorm_model.TaskLogRun{}) + if statusDelete.RowsAffected == 0 { + log.Println("删除失败", statusDelete.Error) + } + } +} diff --git a/jobs_gorm_etcd.go b/jobs_gorm_etcd.go new file mode 100644 index 0000000..44cd4a5 --- /dev/null +++ b/jobs_gorm_etcd.go @@ -0,0 +1,46 @@ +package gojobs + +import ( + "errors" + "fmt" + "go.dtapp.net/gojobs/jobs_gorm_model" + "math/rand" + "time" +) + +// GetEtcdIssueAddress 获取ETCD下发地址 +func (j *JobsGorm) GetEtcdIssueAddress(server *Etcd, v jobs_gorm_model.Task) (address string, err error) { + var ( + currentIp = "" + appointIpStatus = false + ) + // 赋值ip + if v.SpecifyIp != "" { + currentIp = v.SpecifyIp + appointIpStatus = true + } + workers, err := server.ListWorkers() + if err != nil { + return address, errors.New(fmt.Sprintf("获取在线客户端列表失败:%s", err.Error())) + } + if len(workers) < 0 { + return address, errors.New("没有客户端在线") + } + // 判断是否指定某ip执行 + if len(workers) == 1 { + if appointIpStatus == true { + if currentIp == workers[0] { + return fmt.Sprintf("%s/%d", server.IssueWatchKey(v.SpecifyIp), v.Id), nil + } + return address, errors.New("执行的客户端不在线") + } + } + // 随机返回一个 + return fmt.Sprintf("%s/%d", server.IssueWatchKey(workers[j.random(0, len(workers))]), v.Id), err +} + +// 随机返回一个 +func (j *JobsGorm) random(min, max int) int { + rand.Seed(time.Now().Unix()) + return rand.Intn(max-min) + min +} diff --git a/jobs_gorm_etcd_test.go b/jobs_gorm_etcd_test.go new file mode 100644 index 0000000..d8fa835 --- /dev/null +++ b/jobs_gorm_etcd_test.go @@ -0,0 +1,7 @@ +package gojobs + +import "testing" + +func TestRandom(t *testing.T) { + //t.Log(random(0, 2)) +} diff --git a/version.go b/version.go index 759c7ba..0b02b60 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package gojobs -const Version = "1.0.34" +const Version = "1.0.35"