From 633489b4804c6396ae770f302273d9694f823a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=85=89=E6=98=A5?= Date: Tue, 24 May 2022 12:42:56 +0800 Subject: [PATCH] =?UTF-8?q?-=20=E6=B3=A8=E5=86=8Cworker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etcd.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ version.go | 2 +- 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/etcd.go b/etcd.go index 9576900..473f2d4 100644 --- a/etcd.go +++ b/etcd.go @@ -3,8 +3,11 @@ package gojobs import ( "context" "errors" + "fmt" + "go.dtapp.net/goip" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/v3" + "log" "strings" "time" ) @@ -101,6 +104,63 @@ func (e Etcd) ListWorkers() (workerArr []string, err error) { return } +// RegisterWorker 注册worker +func (e Etcd) RegisterWorker() { + var ( + regKey string + leaseGrantResp *clientv3.LeaseGrantResponse + err error + keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse + keepAliveResp *clientv3.LeaseKeepAliveResponse + cancelCtx context.Context + cancelFunc context.CancelFunc + ) + + localIP := goip.GetOutsideIp() + + for { + // 注册路径 + regKey = JobWorkerDir + localIP + + cancelFunc = nil + + // 创建租约 + if leaseGrantResp, err = e.lease.Grant(context.TODO(), 10); err != nil { + goto RETRY + } + + // 自动续租 + if keepAliveChan, err = e.lease.KeepAlive(context.TODO(), leaseGrantResp.ID); err != nil { + goto RETRY + } + + cancelCtx, cancelFunc = context.WithCancel(context.TODO()) + + // 注册到etcd + if _, err = e.kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)); err != nil { + log.Println(fmt.Sprintf(" %s 服务注册失败:%s", regKey, err)) + goto RETRY + } + + // 处理续租应答 + for { + select { + case keepAliveResp = <-keepAliveChan: + if keepAliveResp == nil { // 续租失败 + log.Println("续租失败") + goto RETRY + } + } + } + + RETRY: + time.Sleep(1 * time.Second) + if cancelFunc != nil { + cancelFunc() + } + } +} + const ( // JobWorkerDir 服务注册目录 JobWorkerDir = "/cron/workers/" diff --git a/version.go b/version.go index c8a5c4c..8eb7d60 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package gojobs -const Version = "1.0.7" +const Version = "1.0.8"