diff --git a/distributed.go b/distributed.go index 7ff5fb7..bbb32fd 100644 --- a/distributed.go +++ b/distributed.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sort" "strconv" "strings" "time" @@ -102,10 +103,18 @@ func (cluster *Cluster) cmdJoinCluster(w http.ResponseWriter, r *http.Request) { //获取分布式系统下所有的内容 func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *http.Request) { + + w.Header().Set("Content-Type", "application/json") + slaves := []string{} + for _, v := range cluster.slaves.GetALL() { + if slave, ok := v.(string); ok { + slaves = append(slaves, slave) + } + } + sort.Strings(slaves) jsonOut := "{" idx := 0 - for _, v := range cluster.slaves.GetALL() { - slave := v.(string) + for _, slave := range slaves { reqUrl := fmt.Sprintf("http://%s/api/programs", slave) if body, err := cluster.requestSlave(reqUrl, http.MethodGet, nil); err == nil { jsonOut += fmt.Sprintf("\"%s\":%s", slave, body) @@ -116,7 +125,6 @@ func (cluster *Cluster) cmdQueryDistributedPrograms(w http.ResponseWriter, r *ht idx += 1 } jsonOut += "}" - w.Header().Set("Content-Type", "application/json") w.Write([]byte(jsonOut)) } @@ -236,6 +244,6 @@ func newDistributed(suv *Supervisor, hdlr http.Handler) error { } var cluster = Cluster{ - slaves: gcache.New(10).LRU().Expiration(time.Second * 3).Build(), + slaves: gcache.New(1000).LRU().Expiration(time.Second * 3).Build(), client: new(http.Client), } diff --git a/fsm.go b/fsm.go index 49d3206..db764e5 100644 --- a/fsm.go +++ b/fsm.go @@ -29,6 +29,7 @@ import ( "syscall" "time" + "github.com/axgle/pinyin" "github.com/kennygrant/sanitize" "github.com/lunny/dingtalk_webhook" "github.com/natefinch/lumberjack" @@ -225,7 +226,7 @@ type Process struct { func (p *Process) buildCommand() *kexec.KCommand { cmd := kexec.CommandString(p.Command) // cmd := kexec.Command(p.Command[0], p.Command[1:]...) - logDir := filepath.Join(defaultGosuvDir, "log", sanitize.Name(p.Name)) + logDir := filepath.Join(defaultGosuvDir, "log", sanitize.Name(pinyin.Convert(p.Name))) if !IsDir(logDir) { os.MkdirAll(logDir, 0755) } @@ -308,6 +309,7 @@ func (p *Process) waitNextRetry() { } func (p *Process) stopCommand() { + fmt.Println(p.Command, "###", p.Name, "stopCommand 1111111111111111") p.mu.Lock() defer p.mu.Unlock() defer p.SetState(Stopped) @@ -317,8 +319,20 @@ func (p *Process) stopCommand() { p.SetState(Stopping) if p.cmd.Process != nil { - p.cmd.Process.Signal(syscall.SIGTERM) // TODO(ssx): add it to config + stopch := make(chan bool) + go func() { + p.cmd.Process.Signal(syscall.SIGTERM) + stopch <- true + }() + select { + case <-stopch: // TODO(ssx): add it to config + log.Println(p.Name, "停止完成") + case <-time.After(10 * time.Second): + log.Println(p.Name, "停止超时,强制 kill") + p.cmd.Process.Signal(syscall.SIGKILL) + } } + fmt.Println(p.Command, "###", p.Name, "stopCommand 2222222222222222222") select { case <-GoFunc(p.cmd.Wait): p.RunNotification(FSMState("quit normally")) @@ -340,6 +354,7 @@ func (p *Process) stopCommand() { p.OutputFile = nil } p.cmd = nil + fmt.Println(p.Command, "###", p.Name, "3333333333333333333333") } func (p *Process) IsRunning() bool {