diff --git a/distributed.go b/distributed.go index 7ff5fb7..bbb32fd 100644 --- a/distributed.go +++ b/distributed.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sort" "strconv" "strings" "time" @@ -102,10 +103,18 @@ func (cluster *Cluster) cmdJoinCluster(w http.ResponseWriter, r *http.Request) { //获取分布式系统下所有的内容 func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *http.Request) { + + w.Header().Set("Content-Type", "application/json") + slaves := []string{} + for _, v := range cluster.slaves.GetALL() { + if slave, ok := v.(string); ok { + slaves = append(slaves, slave) + } + } + sort.Strings(slaves) jsonOut := "{" idx := 0 - for _, v := range cluster.slaves.GetALL() { - slave := v.(string) + for _, slave := range slaves { reqUrl := fmt.Sprintf("http://%s/api/programs", slave) if body, err := cluster.requestSlave(reqUrl, http.MethodGet, nil); err == nil { jsonOut += fmt.Sprintf("\"%s\":%s", slave, body) @@ -116,7 +125,6 @@ func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *ht idx += 1 } jsonOut += "}" - w.Header().Set("Content-Type", "application/json") w.Write([]byte(jsonOut)) } @@ -236,6 +244,6 @@ func newDistributed(suv *Supervisor, hdlr http.Handler) error { } var cluster = Cluster{ - slaves: gcache.New(10).LRU().Expiration(time.Second * 3).Build(), + slaves: gcache.New(1000).LRU().Expiration(time.Second * 3).Build(), client: new(http.Client), } diff --git a/fsm.go b/fsm.go index 2346e8d..1aa00e9 100644 --- a/fsm.go +++ b/fsm.go @@ -29,6 +29,7 @@ import ( "syscall" "time" + "github.com/axgle/pinyin" "github.com/kennygrant/sanitize" "github.com/lunny/dingtalk_webhook" "github.com/natefinch/lumberjack" @@ -194,7 +195,7 @@ func (p *Program) RunNotification(state FSMState) { ding := dingtalk.NewWebhook(group.Secret) err := ding.SendTextMsg(msg, false, group.Mobiles...) if err != nil { - log.Error("钉钉通知失败:", err) + log.Error("钉钉通知失败:", msg, err) } } } @@ -225,7 +226,7 @@ type Process struct { func (p *Process) buildCommand() *kexec.KCommand { cmd := kexec.CommandString(p.Command) // cmd := kexec.Command(p.Command[0], p.Command[1:]...) - logDir := filepath.Join(defaultGosuvDir, "log", sanitize.Name(p.Name)) + logDir := filepath.Join(defaultGosuvDir, "log", sanitize.Name(pinyin.Convert(p.Name))) if !IsDir(logDir) { os.MkdirAll(logDir, 0755) } @@ -288,6 +289,9 @@ func (p *Process) buildCommand() *kexec.KCommand { } func (p *Process) waitNextRetry() { + if p.OutputFile != nil { + p.OutputFile.Close() + } p.SetState(RetryWait) if p.retryLeft <= 0 { p.retryLeft = p.StartRetries @@ -314,7 +318,18 @@ func (p *Process) stopCommand() { p.SetState(Stopping) if p.cmd.Process != nil { - p.cmd.Process.Signal(syscall.SIGTERM) // TODO(ssx): add it to config + stopch := make(chan bool) + go func() { + p.cmd.Process.Signal(syscall.SIGTERM) + stopch <- true + }() + select { + case <-stopch: // TODO(ssx): add it to config + log.Println(p.Name, "停止完成") + case <-time.After(10 * time.Second): + log.Println(p.Name, "停止超时,强制 kill") + p.cmd.Process.Signal(syscall.SIGKILL) + } } select { case <-GoFunc(p.cmd.Wait): @@ -376,11 +391,13 @@ func (p *Process) startCommand() { return } } + p.waitNextRetry() case <-p.stopC: log.Println("recv stop command") p.stopCommand() // clean up all process } + }() }