From 0ed7ceff615fbaf839d8a3c8551ff8406dd0f5c5 Mon Sep 17 00:00:00 2001 From: wangyj Date: Wed, 17 Jan 2018 19:12:40 +0800 Subject: [PATCH 1/3] fix file fd leak --- fsm.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fsm.go b/fsm.go index 2346e8d..49d3206 100644 --- a/fsm.go +++ b/fsm.go @@ -194,7 +194,7 @@ func (p *Program) RunNotification(state FSMState) { ding := dingtalk.NewWebhook(group.Secret) err := ding.SendTextMsg(msg, false, group.Mobiles...) if err != nil { - log.Error("钉钉通知失败:", err) + log.Error("钉钉通知失败:", msg, err) } } } @@ -288,6 +288,9 @@ func (p *Process) buildCommand() *kexec.KCommand { } func (p *Process) waitNextRetry() { + if p.OutputFile != nil { + p.OutputFile.Close() + } p.SetState(RetryWait) if p.retryLeft <= 0 { p.retryLeft = p.StartRetries @@ -376,11 +379,13 @@ func (p *Process) startCommand() { return } } + p.waitNextRetry() case <-p.stopC: log.Println("recv stop command") p.stopCommand() // clean up all process } + }() } From 56552ccc07579e11dadcaddbba555d4edc993cba Mon Sep 17 00:00:00 2001 From: wangyj Date: Wed, 24 Jan 2018 14:47:20 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E5=90=8E=E7=AB=AF=20slaves=20=E6=8E=92?= =?UTF-8?q?=E5=BA=8F=EF=BC=8C=E8=A7=A3=E5=86=B3=E9=A1=B5=E9=9D=A2=E8=B7=B3?= =?UTF-8?q?=E5=8A=A8=E9=97=AE=E9=A2=98=EF=BC=8Cslaves=20cache=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=88=B01000?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- distributed.go | 16 ++++++++++++---- fsm.go | 19 +++++++++++++++++-- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/distributed.go b/distributed.go index 7ff5fb7..bbb32fd 100644 --- a/distributed.go +++ b/distributed.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sort" "strconv" "strings" "time" @@ -102,10 +103,18 @@ func (cluster *Cluster) cmdJoinCluster(w http.ResponseWriter, r *http.Request) { //获取分布式系统下所有的内容 func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *http.Request) { + + w.Header().Set("Content-Type", "application/json") + slaves := []string{} + for _, v := range cluster.slaves.GetALL() { + if slave, ok := v.(string); ok { + slaves = append(slaves, slave) + } + } + sort.Strings(slaves) jsonOut := "{" idx := 0 - for _, v := range cluster.slaves.GetALL() { - slave := v.(string) + for _, slave := range slaves { reqUrl := fmt.Sprintf("http://%s/api/programs", slave) if body, err := cluster.requestSlave(reqUrl, http.MethodGet, nil); err == nil { jsonOut += fmt.Sprintf("\"%s\":%s", slave, body) @@ -116,7 +125,6 @@ func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *ht idx += 1 } jsonOut += "}" - w.Header().Set("Content-Type", "application/json") w.Write([]byte(jsonOut)) } @@ -236,6 +244,6 @@ func newDistributed(suv *Supervisor, hdlr http.Handler) error { } var cluster = Cluster{ - slaves: gcache.New(10).LRU().Expiration(time.Second * 3).Build(), + slaves: gcache.New(1000).LRU().Expiration(time.Second * 3).Build(), client: new(http.Client), } diff --git a/fsm.go b/fsm.go index 49d3206..db764e5 100644 --- a/fsm.go +++ b/fsm.go @@ -29,6 +29,7 @@ import ( "syscall" "time" + "github.com/axgle/pinyin" "github.com/kennygrant/sanitize" "github.com/lunny/dingtalk_webhook" "github.com/natefinch/lumberjack" @@ -225,7 +226,7 @@ type Process struct { func (p *Process) buildCommand() *kexec.KCommand { cmd := kexec.CommandString(p.Command) // cmd := kexec.Command(p.Command[0], p.Command[1:]...) - logDir := filepath.Join(defaultGosuvDir, "log", sanitize.Name(p.Name)) + logDir := filepath.Join(defaultGosuvDir, "log", sanitize.Name(pinyin.Convert(p.Name))) if !IsDir(logDir) { os.MkdirAll(logDir, 0755) } @@ -308,6 +309,7 @@ func (p *Process) waitNextRetry() { } func (p *Process) stopCommand() { + fmt.Println(p.Command, "###", p.Name, "stopCommand 1111111111111111") p.mu.Lock() defer p.mu.Unlock() defer p.SetState(Stopped) @@ -317,8 +319,20 @@ func (p *Process) stopCommand() { p.SetState(Stopping) if p.cmd.Process != nil { - p.cmd.Process.Signal(syscall.SIGTERM) // TODO(ssx): add it to config + stopch := make(chan bool) + go func() { + p.cmd.Process.Signal(syscall.SIGTERM) + stopch <- true + }() + select { + case <-stopch: // TODO(ssx): add it to config + log.Println(p.Name, "停止完成") + case <-time.After(10 * time.Second): + log.Println(p.Name, "停止超时,强制 kill") + p.cmd.Process.Signal(syscall.SIGKILL) + } } + fmt.Println(p.Command, "###", p.Name, "stopCommand 2222222222222222222") select { case <-GoFunc(p.cmd.Wait): p.RunNotification(FSMState("quit normally")) @@ -340,6 +354,7 @@ func (p *Process) stopCommand() { p.OutputFile = nil } p.cmd = nil + fmt.Println(p.Command, "###", p.Name, "3333333333333333333333") } func (p *Process) IsRunning() bool { From 0d2be4371381e6b7d0618601948dbd6b4085a5b3 Mon Sep 17 00:00:00 2001 From: soopsio <32614138+soopsio@users.noreply.github.com> Date: Sat, 27 Jan 2018 04:22:27 +0800 Subject: [PATCH 3/3] Update fsm.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 删除调试的 fmt.Print --- fsm.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/fsm.go b/fsm.go index db764e5..1aa00e9 100644 --- a/fsm.go +++ b/fsm.go @@ -309,7 +309,6 @@ func (p *Process) waitNextRetry() { } func (p *Process) stopCommand() { - fmt.Println(p.Command, "###", p.Name, "stopCommand 1111111111111111") p.mu.Lock() defer p.mu.Unlock() defer p.SetState(Stopped) @@ -332,7 +331,6 @@ func (p *Process) stopCommand() { p.cmd.Process.Signal(syscall.SIGKILL) } } - fmt.Println(p.Command, "###", p.Name, "stopCommand 2222222222222222222") select { case <-GoFunc(p.cmd.Wait): p.RunNotification(FSMState("quit normally")) @@ -354,7 +352,6 @@ func (p *Process) stopCommand() { p.OutputFile = nil } p.cmd = nil - fmt.Println(p.Command, "###", p.Name, "3333333333333333333333") } func (p *Process) IsRunning() bool {