diff --git a/.fsw.yml b/.fsw.yml index c19f82d..1c84aaa 100644 --- a/.fsw.yml +++ b/.fsw.yml @@ -7,7 +7,7 @@ triggers: - '**/*.py' env: DEBUG: "1" - cmd: sh ./run.sh + cmd: sh ./build.sh shell: true delay: 100ms signal: TERM diff --git a/README.md b/README.md index 8a62096..6c9a348 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,13 @@ Features Go version at least `1.6+` ## Installation -Standalone binary can be download from [Stable1](https://dl.equinox.io/shengxiang/gosuv/stable) [Stable github](https://github.com/codeskyblue/gosuv/releases/latest) [Develop](https://dl.equinox.io/shengxiang/gosuv/dev) +Standalone binary can be download from + +| Name | Branch | Address | +|--------|--------|---------| +| equinox| stable | | +| github-release | stable | | +| equinox| develop | | Or if you have go enviroment, you can also build from source. diff --git a/broadcast.go b/broadcast.go index 03d107e..5bc0277 100644 --- a/broadcast.go +++ b/broadcast.go @@ -3,6 +3,7 @@ package main import ( "errors" "io" + "os" "sync" "time" @@ -155,15 +156,16 @@ func NewWriteBroadcaster(size int) *WriteBroadcaster { return bc } -func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) { +func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) []byte { w.Lock() defer w.Unlock() if w.closed { writer.Close() - return + return nil } sw := StreamWriter{wc: writer, stream: stream} w.writers[sw] = true + return w.buf.Bytes() } func (wb *WriteBroadcaster) Closed() bool { @@ -172,8 +174,11 @@ func (wb *WriteBroadcaster) Closed() bool { func (wb *WriteBroadcaster) NewReader(name string) ([]byte, *io.PipeReader) { r, w := io.Pipe() - wb.AddWriter(w, name) - return wb.buf.Bytes(), r + return wb.AddWriter(w, name), r +} + +func (wb *WriteBroadcaster) AddWriterFunc(name string, fn func([]byte) error) []byte { + return wb.AddWriter(&funcWriter{fn}, name) } func (wb *WriteBroadcaster) Bytes() []byte { @@ -228,3 +233,58 @@ func (w *nopWriteCloser) Close() error { return nil } func NopWriteCloser(w io.Writer) io.WriteCloser { return &nopWriteCloser{w} } + +// func writer +type funcWriter struct { + wrFn func([]byte) error +} + +func (f *funcWriter) Write(data []byte) (n int, err error) { + err = f.wrFn(data) + return len(data), err +} + +func (f *funcWriter) Close() error { return nil } + +// quick loss writer +type QuickLossBroadcastWriter struct { + *WriteBroadcaster + bufC chan []byte + closed bool +} + +func (w *QuickLossBroadcastWriter) Write(buf []byte) (int, error) { + select { + case w.bufC <- buf: + default: + } + return len(buf), nil +} + +func (w *QuickLossBroadcastWriter) Close() error { + if !w.closed { + w.closed = true + close(w.bufC) + w.WriteBroadcaster.CloseWriters() + } + return nil +} + +func (w *QuickLossBroadcastWriter) drain() { + for data := range w.bufC { + w.WriteBroadcaster.Write(data) + } +} + +func NewQuickLossBroadcastWriter(size int) *QuickLossBroadcastWriter { + qlw := &QuickLossBroadcastWriter{ + WriteBroadcaster: NewWriteBroadcaster(size), + bufC: make(chan []byte, 20), + } + go qlw.drain() + qlw.AddWriterFunc("stdout", func(data []byte) error { + _, err := os.Stdout.Write(data) + return err + }) + return qlw +} diff --git a/fsm.go b/fsm.go index b024c85..61bbcfc 100644 --- a/fsm.go +++ b/fsm.go @@ -126,9 +126,9 @@ type Process struct { *FSM `json:"-"` Program `json:"program"` cmd *kexec.KCommand - Stdout *BufferBroadcast - Stderr *BufferBroadcast - Output *BufferBroadcast + Stdout *QuickLossBroadcastWriter // *WriteBroadcaster // io.WriteCloser // *BufferBroadcast + Stderr *QuickLossBroadcastWriter // *BufferBroadcast + Output *QuickLossBroadcastWriter // *BufferBroadcast OutputFile *os.File stopC chan syscall.Signal retryLeft int @@ -205,8 +205,8 @@ func (p *Process) IsRunning() bool { func (p *Process) startCommand() { p.stopCommand() - p.Stdout.Reset() - p.Stderr.Reset() + // p.Stdout.Reset() + // p.Stderr.Reset() // p.Output.Reset() // Donot reset because log is still needed. log.Printf("start cmd(%s): %s", p.Name, p.Command) p.cmd = p.buildCommand() @@ -241,9 +241,9 @@ func NewProcess(pg Program) *Process { stopC: make(chan syscall.Signal), retryLeft: pg.StartRetries, Status: string(Stopped), - Output: NewBufferBroadcast(outputBufferSize), - Stdout: NewBufferBroadcast(outputBufferSize), - Stderr: NewBufferBroadcast(outputBufferSize), + Output: NewQuickLossBroadcastWriter(outputBufferSize), // NewBufferBroadcast(outputBufferSize), + Stdout: NewQuickLossBroadcastWriter(outputBufferSize), // NewBufferBroadcast(outputBufferSize), + Stderr: NewQuickLossBroadcastWriter(outputBufferSize), // NewBufferBroadcast(outputBufferSize), } pr.StateChange = func(_, newStatus FSMState) { pr.Status = string(newStatus) diff --git a/proctrl.go b/proctrl.go deleted file mode 100644 index 535f8d0..0000000 --- a/proctrl.go +++ /dev/null @@ -1,176 +0,0 @@ -package main - -// import ( -// "io" -// "os" -// "path/filepath" -// "sync" -// "syscall" -// "time" - -// "github.com/codeskyblue/kexec" -// "github.com/qiniu/log" -// ) - -// const ( -// ST_RUNNING = "RUNNING" -// ST_STOPPED = "STOPPED" -// ST_FATAL = "FATAL" -// ST_RETRYWAIT = "RETRYWAIT" // some like python-supervisor EXITED -// ) - -// type Event int - -// const ( -// EVENT_START = Event(iota) -// EVENT_STOP -// ) - -// type Program struct { -// Name string `yaml:"name"` -// Command string `yaml:"command"` -// Environ []string `yaml:"environ"` -// Dir string `yaml:"directory"` -// AutoStart bool `yaml:"autostart"` // change to *bool, which support unexpected -// StartRetries int `yaml:"startretries"` -// StartSeconds int `yaml:"startsecs"` -// LogDir string `yaml:"logdir"` -// } - -// func (p *Program) buildCmd() *kexec.KCommand { -// cmd := kexec.CommandString(p.Command) // Not tested here, I think it should work -// // cmd := kexec.Command(p.Command[0], p.Command[1:]...) -// cmd.Dir = p.Dir -// cmd.Env = append(os.Environ(), p.Environ...) -// return cmd -// } - -// type Process struct { -// *kexec.KCommand `json:"-"` -// Status string `json:"state"` -// Sig chan os.Signal `json:"-"` -// Info *Program `json:"conf"` -// mu sync.Mutex - -// retry int -// stopC chan bool -// } - -// func NewProcess(info *Program) *Process { -// // set default values -// if info.StartRetries <= 0 { -// info.StartRetries = 3 -// } -// if info.StartSeconds <= 0 { -// info.StartSeconds = 3 -// } -// return &Process{ -// Status: ST_STOPPED, -// Sig: make(chan os.Signal), -// Info: info, -// stopC: make(chan bool), -// } -// } - -// func (p *Process) setStatus(st string) { -// p.Status = st -// } - -// // STOP and START Should not run parallel -// func (p *Process) Operate(event Event) { -// p.mu.Lock() -// defer p.mu.Unlock() - -// switch event { -// case EVENT_START: -// if p.Status == ST_STOPPED || p.Status == ST_FATAL { -// go p.RunWithRetry() -// } -// case EVENT_STOP: -// if p.Status == ST_RUNNING || p.Status == ST_RETRYWAIT { -// p.stop() -// } -// } -// } - -// func (p *Process) LogFilePath() string { -// logDir := p.Info.LogDir -// if logDir == "" { -// logDir = filepath.Join(p.Dir, "logs") -// } -// return filepath.Join(logDir, p.Info.Name+".log") -// } - -// func (p *Process) createLog() (*os.File, error) { -// logDir := filepath.Join(p.Info.LogDir, "logs") -// os.MkdirAll(logDir, 0755) // just do it, err ignore it -// logFile := p.LogFilePath() -// return os.OpenFile(logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) -// } - -// func (p *Process) RunWithRetry() { -// var err error -// for p.retry = 0; p.retry < p.Info.StartRetries+1; p.retry += 1 { -// // wait program to exit -// errC := GoFunc(p.run) - -// PROGRAM_WAIT: -// // Here is RUNNING State -// select { -// case err = <-errC: -// log.Info(p.Info.Name, err) -// case <-time.After(time.Second * time.Duration(p.Info.StartSeconds)): // reset retry -// p.retry = 0 -// goto PROGRAM_WAIT -// case <-p.stopC: -// return -// } - -// // Enter RETRY_WAIT State -// if p.retry < p.Info.StartRetries { -// p.setStatus(ST_RETRYWAIT) -// select { -// case <-p.stopC: -// return -// case <-time.After(time.Second * 2): -// } -// } -// } -// p.setStatus(ST_FATAL) -// } - -// func (p *Process) start() error { -// p.KCommand = p.Info.buildCmd() -// logFd, err := p.createLog() -// if err != nil { -// return err -// } -// p.Cmd.Stdout = logFd -// p.Cmd.Stderr = logFd -// return p.Cmd.Start() -// } - -// func (p *Process) run() (err error) { -// if err = p.start(); err != nil { -// return -// } -// p.setStatus(ST_RUNNING) -// defer func() { -// if out, ok := p.Cmd.Stdout.(io.Closer); ok { -// out.Close() -// } -// log.Warnf("Process finish: %v", err) -// }() -// err = p.Wait() -// return -// } - -// func (p *Process) stop() error { -// select { -// case p.stopC <- true: // stopC may not recevied, when in FATAL, in a very low probality -// case <-time.After(time.Millisecond * 200): // 0.2s -// } -// p.Terminate(syscall.SIGKILL) -// p.setStatus(ST_STOPPED) -// return nil -// } diff --git a/web.go b/web.go index 1e5ae05..b73228e 100644 --- a/web.go +++ b/web.go @@ -401,9 +401,17 @@ func (s *Supervisor) wsLog(w http.ResponseWriter, r *http.Request) { } defer c.Close() - <-proc.Output.AddHookFunc(func(message string) error { - return c.WriteMessage(1, []byte(message)) + wg := &sync.WaitGroup{} + wg.Add(1) + origData := proc.Output.AddWriterFunc(r.RemoteAddr, func(data []byte) error { + er := c.WriteMessage(1, data) + if er != nil { + wg.Done() + } + return er }) + c.WriteMessage(1, origData) + wg.Wait() } func (s *Supervisor) Close() {