diff --git a/etcd.go b/etcd.go index 7a5417a..74cf270 100644 --- a/etcd.go +++ b/etcd.go @@ -9,17 +9,35 @@ import ( type EtcdConfig struct { Endpoints []string // 接口 []string{"http://127.0.0.1:2379"} DialTimeout time.Duration // time.Second * 5 + LocalIP string // 本机IP } // Etcd etcd type Etcd struct { EtcdConfig // 配置 - client *clientv3.Client // 驱动 - kv clientv3.KV - lease clientv3.Lease + Client *clientv3.Client // 驱动 + Kv clientv3.KV // Kv api + Lease clientv3.Lease // Lease api } // Close 关闭 func (e Etcd) Close() { - e.client.Close() + e.Client.Close() +} + +const ( + // JobSaveDir 定时任务任务保存目录 + JobSaveDir = "/cron/jobs/" + // JobWorkerDir 服务注册目录 + JobWorkerDir = "/cron/workers/" +) + +// GetWatchKey 监听的key +func (e Etcd) GetWatchKey() string { + return JobSaveDir + e.LocalIP +} + +// IssueWatchKey 下发的key +func (e Etcd) IssueWatchKey(ip string) string { + return JobSaveDir + ip + "/" } diff --git a/etcd_curd.go b/etcd_curd.go index 540cefc..ffc3af1 100644 --- a/etcd_curd.go +++ b/etcd_curd.go @@ -7,25 +7,25 @@ import ( // Watch 监听 func (e Etcd) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { - return e.client.Watch(ctx, key, opts...) + return e.Client.Watch(ctx, key, opts...) } // Create 创建 func (e Etcd) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { - return e.client.Put(ctx, key, val, opts...) + return e.Client.Put(ctx, key, val, opts...) } // Get 获取 func (e Etcd) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { - return e.client.Get(ctx, key, opts...) + return e.Client.Get(ctx, key, opts...) } // Update 更新 func (e Etcd) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { - return e.client.Put(ctx, key, val, opts...) + return e.Client.Put(ctx, key, val, opts...) } // Delete 删除 func (e Etcd) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { - return e.client.Delete(ctx, key, opts...) + return e.Client.Delete(ctx, key, opts...) } diff --git a/etcd_server.go b/etcd_server.go index 839f509..d017971 100644 --- a/etcd_server.go +++ b/etcd_server.go @@ -9,12 +9,17 @@ import ( ) // NewEtcdServer 创建 etcd server -func NewEtcdServer(config *EtcdConfig) (e *Etcd, err error) { +func NewEtcdServer(config *EtcdConfig) (*Etcd, error) { + + var ( + e = &Etcd{} + err error + ) e.Endpoints = config.Endpoints e.DialTimeout = config.DialTimeout - e.client, err = clientv3.New(clientv3.Config{ + e.Client, err = clientv3.New(clientv3.Config{ Endpoints: e.Endpoints, DialTimeout: e.DialTimeout, }) @@ -23,8 +28,8 @@ func NewEtcdServer(config *EtcdConfig) (e *Etcd, err error) { } // 得到KV和Lease的API子集 - e.kv = clientv3.NewKV(e.client) - e.lease = clientv3.NewLease(e.client) + e.Kv = clientv3.NewKV(e.Client) + e.Lease = clientv3.NewLease(e.Client) return e, nil } @@ -41,7 +46,7 @@ func (e Etcd) ListWorkers() (workerArr []string, err error) { workerArr = make([]string, 0) // 获取目录下所有Kv - if getResp, err = e.kv.Get(context.TODO(), JobWorkerDir, clientv3.WithPrefix()); err != nil { + if getResp, err = e.Kv.Get(context.TODO(), JobWorkerDir, clientv3.WithPrefix()); err != nil { return } diff --git a/etcd_worker.go b/etcd_worker.go index 39a0cd7..483afa2 100644 --- a/etcd_worker.go +++ b/etcd_worker.go @@ -4,24 +4,23 @@ import ( "context" "errors" "fmt" - "go.dtapp.net/goip" "go.etcd.io/etcd/client/v3" "log" "time" ) -const ( - // JobWorkerDir 服务注册目录 - JobWorkerDir = "/cron/workers/" -) - // NewEtcdWorker 创建 etcd Worker -func NewEtcdWorker(config *EtcdConfig) (e *Etcd, err error) { +func NewEtcdWorker(config *EtcdConfig) (*Etcd, error) { + + var ( + e = &Etcd{} + err error + ) e.Endpoints = config.Endpoints e.DialTimeout = config.DialTimeout - e.client, err = clientv3.New(clientv3.Config{ + e.Client, err = clientv3.New(clientv3.Config{ Endpoints: e.Endpoints, DialTimeout: e.DialTimeout, }) @@ -30,9 +29,10 @@ func NewEtcdWorker(config *EtcdConfig) (e *Etcd, err error) { } // 得到KV和Lease的API子集 - e.kv = clientv3.NewKV(e.client) - e.lease = clientv3.NewLease(e.client) + e.Kv = clientv3.NewKV(e.Client) + e.Lease = clientv3.NewLease(e.Client) + // 注册 go e.RegisterWorker() return e, nil @@ -50,32 +50,30 @@ func (e Etcd) RegisterWorker() { cancelFunc context.CancelFunc ) - localIP := goip.GetOutsideIp() - for { // 注册路径 - regKey = JobWorkerDir + localIP + regKey = JobWorkerDir + e.LocalIP cancelFunc = nil // 创建租约 - leaseGrantResp, err = e.lease.Grant(context.TODO(), 10) + leaseGrantResp, err = e.Lease.Grant(context.TODO(), 10) + log.Println("创建租约") if err != nil { - log.Println("创建租约") goto RETRY } // 自动续租 - keepAliveChan, err = e.lease.KeepAlive(context.TODO(), leaseGrantResp.ID) + keepAliveChan, err = e.Lease.KeepAlive(context.TODO(), leaseGrantResp.ID) + log.Println("自动续租") if err != nil { - log.Println("自动续租") goto RETRY } cancelCtx, cancelFunc = context.WithCancel(context.TODO()) // 注册到etcd - _, err = e.kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)) + _, err = e.Kv.Put(cancelCtx, regKey, "", clientv3.WithLease(leaseGrantResp.ID)) if err != nil { log.Println(fmt.Sprintf(" %s 服务注册失败:%s", regKey, err)) goto RETRY diff --git a/go.mod b/go.mod index b7a43d5..c768201 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 go.dtapp.net/goip v1.0.16 go.dtapp.net/gouuid v1.0.0 + go.etcd.io/etcd/api/v3 v3.5.4 go.etcd.io/etcd/client/v3 v3.5.4 google.golang.org/grpc v1.46.2 google.golang.org/protobuf v1.28.0 @@ -22,7 +23,6 @@ require ( go.dtapp.net/gorequest v1.0.18 // indirect go.dtapp.net/gostring v1.0.3 // indirect go.dtapp.net/gotime v1.0.2 // indirect - go.etcd.io/etcd/api/v3 v3.5.4 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.4 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect @@ -30,5 +30,5 @@ require ( golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20220523171625-347a074981d8 // indirect + google.golang.org/genproto v0.0.0-20220524023933-508584e28198 // indirect ) diff --git a/go.sum b/go.sum index 4057d7f..fe75a0a 100644 --- a/go.sum +++ b/go.sum @@ -265,8 +265,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20220523171625-347a074981d8 h1:4NSrVrQGh6+UqBEd+Kwdh6ZDwESH0Sj2bNUQN+VjoQk= -google.golang.org/genproto v0.0.0-20220523171625-347a074981d8/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= +google.golang.org/genproto v0.0.0-20220524023933-508584e28198 h1:a1g7i05I2vUwq5eYrmxBJy6rPbw/yo7WzzwPJmcC0P4= +google.golang.org/genproto v0.0.0-20220524023933-508584e28198/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/version.go b/version.go index 609cc25..e4b6f2a 100644 --- a/version.go +++ b/version.go @@ -1,3 +1,3 @@ package gojobs -const Version = "1.0.9" +const Version = "1.0.10"