diff --git a/CHANGELOG.md b/CHANGELOG.md index cbf4928..49acf9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## v1.0.21 + +- 增加任务自动创建 + ## v1.0.20 - 优化任务 diff --git a/etcd_server_test.go b/etcd_server_test.go new file mode 100644 index 0000000..67b95c1 --- /dev/null +++ b/etcd_server_test.go @@ -0,0 +1,67 @@ +package gojobs + +import ( + "context" + "github.com/robfig/cron/v3" + "log" + "testing" + "time" +) + +func TestEtcdServer(t *testing.T) { + server, err := NewEtcdServer(&EtcdConfig{ + Endpoints: []string{"http://127.0.0.1:2379"}, + DialTimeout: time.Second * 5, + Username: "root", + Password: "p5sttPYcFWw7Z7aP", + }) + if err != nil { + panic(err) + } + defer server.Close() + + // 创建一个cron实例 精确到秒 + c := cron.New(cron.WithSeconds()) + + // 每隔15秒执行一次 + _, _ = c.AddFunc("*/15 * * * * *", func() { + + create, err := server.Create(context.TODO(), server.IssueWatchKey("116.30.228.12")+"/"+"wechat_1_test", "每隔15秒执行一次") + if err != nil { + log.Println("创建任务失败", err) + } + log.Println("创建任务成功", create, err) + + }) + + // 每隔30秒执行一次 + _, _ = c.AddFunc("*/30 * * * * *", func() { + + create, err := server.Create(context.TODO(), server.IssueWatchKey("127.0.0.1")+"/"+"wechat_2_test", "每隔30秒执行一次") + if err != nil { + log.Println("创建任务失败", err) + } + log.Println("创建任务成功", create, err) + + }) + + // 每隔1分钟执行一次 + _, _ = c.AddFunc("0 */1 * * * *", func() { + + create, err := server.Create(context.TODO(), server.IssueWatchKey("116.30.228.12")+"/"+"wechat_3_test", "每隔1分钟执行一次") + if err != nil { + log.Println("创建任务失败", err) + } + log.Println("创建任务成功", create, err) + + workers, _ := server.ListWorkers() + log.Println("ListWorkers", workers) + }) + + // 启动任务 + c.Start() + + // 关闭任务 + defer c.Stop() + select {} +} diff --git a/etcd_worker_test.go b/etcd_worker_test.go new file mode 100644 index 0000000..bb07877 --- /dev/null +++ b/etcd_worker_test.go @@ -0,0 +1,70 @@ +package gojobs + +import ( + "context" + "fmt" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/client/v3" + "log" + "sync" + "testing" + "time" +) + +func TestEtcdWorker(t *testing.T) { + server, err := NewEtcdWorker(&EtcdConfig{ + Endpoints: []string{"http://127.0.0.1:2379"}, + DialTimeout: time.Second * 5, + Username: "root", + Password: "p5sttPYcFWw7Z7aP", + }) + if err != nil { + panic(err) + } + defer server.Close() + + // 监听 + go func() { + rch := server.Watch(context.TODO(), server.GetWatchKey()+"/", clientv3.WithPrefix()) + // 处理监听事件 + for watchResp := range rch { + for _, watchEvent := range watchResp.Events { + switch watchEvent.Type { + case mvccpb.PUT: + + // 收到任务 + log.Printf("监听收到任务 键名:%s 值:%s\n", watchEvent.Kv.Key, watchEvent.Kv.Value) + log.Println("处理监听") + + wg := sync.WaitGroup{} + wg.Add(1) + go run(&wg) + wg.Wait() + + _, err = server.Delete(context.TODO(), string(watchEvent.Kv.Key)) + if err != nil { + log.Println("删除失败", err) + } else { + log.Println("删除成功") + } + case mvccpb.DELETE: + // 任务被删除了 + + } + + } + } + log.Println("out") + }() + + select {} +} + +func run(wg *sync.WaitGroup) { + log.Println("等待开始") + fmt.Println(time.Now()) + time.Sleep(time.Second * 10) + fmt.Println(time.Now()) + log.Println("等待结束") + wg.Done() +} diff --git a/jobs_gorm.go b/jobs_gorm.go index d30b620..c1353a9 100644 --- a/jobs_gorm.go +++ b/jobs_gorm.go @@ -1,6 +1,8 @@ package gojobs import ( + "errors" + "fmt" "go.dtapp.net/gojobs/jobs_gorm" "go.dtapp.net/goredis" "gorm.io/gorm" @@ -13,12 +15,25 @@ type ConfigJobsGorm struct { } func NewJobsGorm(config *ConfigJobsGorm) *jobs_gorm.JobsGorm { + var ( jobsGorm = &jobs_gorm.JobsGorm{} ) + jobsGorm = jobs_gorm.NewGorm(jobs_gorm.JobsGorm{ Db: config.Db, Redis: config.Redis, }, config.MainService, Version) + + err := jobsGorm.Db.AutoMigrate( + &jobs_gorm.Task{}, + &jobs_gorm.TaskLog{}, + &jobs_gorm.TaskLogRun{}, + &jobs_gorm.TaskIp{}, + ) + if err != nil { + panic(errors.New(fmt.Sprintf("创建任务模型失败:%v\n", err))) + } + return jobsGorm } diff --git a/version.go b/version.go index 53e5dca..0794d3e 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package gojobs -const Version = "1.0.20" +const Version = "1.0.21"