try to fix branch, but log still duplicate

master
codeskyblue 8 years ago
parent e34d1b80dd
commit c8c6923f63

@ -7,7 +7,7 @@ triggers:
- '**/*.py'
env:
DEBUG: "1"
cmd: sh ./run.sh
cmd: sh ./build.sh
shell: true
delay: 100ms
signal: TERM

@ -25,7 +25,13 @@ Features
Go version at least `1.6+`
## Installation
Standalone binary can be download from [Stable1](https://dl.equinox.io/shengxiang/gosuv/stable) [Stable github](https://github.com/codeskyblue/gosuv/releases/latest) [Develop](https://dl.equinox.io/shengxiang/gosuv/dev)
Standalone binary can be download from
| Name | Branch | Address |
|--------|--------|---------|
| equinox| stable | <https://dl.equinox.io/shengxiang/gosuv/stable> |
| github-release | stable | <https://github.com/codeskyblue/gosuv/releases/latest>|
| equinox| develop | <https://dl.equinox.io/shengxiang/gosuv/dev> |
Or if you have go enviroment, you can also build from source.

@ -3,6 +3,7 @@ package main
import (
"errors"
"io"
"os"
"sync"
"time"
@ -155,15 +156,16 @@ func NewWriteBroadcaster(size int) *WriteBroadcaster {
return bc
}
func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) {
func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) []byte {
w.Lock()
defer w.Unlock()
if w.closed {
writer.Close()
return
return nil
}
sw := StreamWriter{wc: writer, stream: stream}
w.writers[sw] = true
return w.buf.Bytes()
}
func (wb *WriteBroadcaster) Closed() bool {
@ -172,8 +174,11 @@ func (wb *WriteBroadcaster) Closed() bool {
func (wb *WriteBroadcaster) NewReader(name string) ([]byte, *io.PipeReader) {
r, w := io.Pipe()
wb.AddWriter(w, name)
return wb.buf.Bytes(), r
return wb.AddWriter(w, name), r
}
func (wb *WriteBroadcaster) AddWriterFunc(name string, fn func([]byte) error) []byte {
return wb.AddWriter(&funcWriter{fn}, name)
}
func (wb *WriteBroadcaster) Bytes() []byte {
@ -228,3 +233,58 @@ func (w *nopWriteCloser) Close() error { return nil }
func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
// func writer
type funcWriter struct {
wrFn func([]byte) error
}
func (f *funcWriter) Write(data []byte) (n int, err error) {
err = f.wrFn(data)
return len(data), err
}
func (f *funcWriter) Close() error { return nil }
// quick loss writer
type QuickLossBroadcastWriter struct {
*WriteBroadcaster
bufC chan []byte
closed bool
}
func (w *QuickLossBroadcastWriter) Write(buf []byte) (int, error) {
select {
case w.bufC <- buf:
default:
}
return len(buf), nil
}
func (w *QuickLossBroadcastWriter) Close() error {
if !w.closed {
w.closed = true
close(w.bufC)
w.WriteBroadcaster.CloseWriters()
}
return nil
}
func (w *QuickLossBroadcastWriter) drain() {
for data := range w.bufC {
w.WriteBroadcaster.Write(data)
}
}
func NewQuickLossBroadcastWriter(size int) *QuickLossBroadcastWriter {
qlw := &QuickLossBroadcastWriter{
WriteBroadcaster: NewWriteBroadcaster(size),
bufC: make(chan []byte, 20),
}
go qlw.drain()
qlw.AddWriterFunc("stdout", func(data []byte) error {
_, err := os.Stdout.Write(data)
return err
})
return qlw
}

@ -126,9 +126,9 @@ type Process struct {
*FSM `json:"-"`
Program `json:"program"`
cmd *kexec.KCommand
Stdout *BufferBroadcast
Stderr *BufferBroadcast
Output *BufferBroadcast
Stdout *QuickLossBroadcastWriter // *WriteBroadcaster // io.WriteCloser // *BufferBroadcast
Stderr *QuickLossBroadcastWriter // *BufferBroadcast
Output *QuickLossBroadcastWriter // *BufferBroadcast
OutputFile *os.File
stopC chan syscall.Signal
retryLeft int
@ -205,8 +205,8 @@ func (p *Process) IsRunning() bool {
func (p *Process) startCommand() {
p.stopCommand()
p.Stdout.Reset()
p.Stderr.Reset()
// p.Stdout.Reset()
// p.Stderr.Reset()
// p.Output.Reset() // Donot reset because log is still needed.
log.Printf("start cmd(%s): %s", p.Name, p.Command)
p.cmd = p.buildCommand()
@ -241,9 +241,9 @@ func NewProcess(pg Program) *Process {
stopC: make(chan syscall.Signal),
retryLeft: pg.StartRetries,
Status: string(Stopped),
Output: NewBufferBroadcast(outputBufferSize),
Stdout: NewBufferBroadcast(outputBufferSize),
Stderr: NewBufferBroadcast(outputBufferSize),
Output: NewQuickLossBroadcastWriter(outputBufferSize), // NewBufferBroadcast(outputBufferSize),
Stdout: NewQuickLossBroadcastWriter(outputBufferSize), // NewBufferBroadcast(outputBufferSize),
Stderr: NewQuickLossBroadcastWriter(outputBufferSize), // NewBufferBroadcast(outputBufferSize),
}
pr.StateChange = func(_, newStatus FSMState) {
pr.Status = string(newStatus)

@ -1,176 +0,0 @@
package main
// import (
// "io"
// "os"
// "path/filepath"
// "sync"
// "syscall"
// "time"
// "github.com/codeskyblue/kexec"
// "github.com/qiniu/log"
// )
// const (
// ST_RUNNING = "RUNNING"
// ST_STOPPED = "STOPPED"
// ST_FATAL = "FATAL"
// ST_RETRYWAIT = "RETRYWAIT" // some like python-supervisor EXITED
// )
// type Event int
// const (
// EVENT_START = Event(iota)
// EVENT_STOP
// )
// type Program struct {
// Name string `yaml:"name"`
// Command string `yaml:"command"`
// Environ []string `yaml:"environ"`
// Dir string `yaml:"directory"`
// AutoStart bool `yaml:"autostart"` // change to *bool, which support unexpected
// StartRetries int `yaml:"startretries"`
// StartSeconds int `yaml:"startsecs"`
// LogDir string `yaml:"logdir"`
// }
// func (p *Program) buildCmd() *kexec.KCommand {
// cmd := kexec.CommandString(p.Command) // Not tested here, I think it should work
// // cmd := kexec.Command(p.Command[0], p.Command[1:]...)
// cmd.Dir = p.Dir
// cmd.Env = append(os.Environ(), p.Environ...)
// return cmd
// }
// type Process struct {
// *kexec.KCommand `json:"-"`
// Status string `json:"state"`
// Sig chan os.Signal `json:"-"`
// Info *Program `json:"conf"`
// mu sync.Mutex
// retry int
// stopC chan bool
// }
// func NewProcess(info *Program) *Process {
// // set default values
// if info.StartRetries <= 0 {
// info.StartRetries = 3
// }
// if info.StartSeconds <= 0 {
// info.StartSeconds = 3
// }
// return &Process{
// Status: ST_STOPPED,
// Sig: make(chan os.Signal),
// Info: info,
// stopC: make(chan bool),
// }
// }
// func (p *Process) setStatus(st string) {
// p.Status = st
// }
// // STOP and START Should not run parallel
// func (p *Process) Operate(event Event) {
// p.mu.Lock()
// defer p.mu.Unlock()
// switch event {
// case EVENT_START:
// if p.Status == ST_STOPPED || p.Status == ST_FATAL {
// go p.RunWithRetry()
// }
// case EVENT_STOP:
// if p.Status == ST_RUNNING || p.Status == ST_RETRYWAIT {
// p.stop()
// }
// }
// }
// func (p *Process) LogFilePath() string {
// logDir := p.Info.LogDir
// if logDir == "" {
// logDir = filepath.Join(p.Dir, "logs")
// }
// return filepath.Join(logDir, p.Info.Name+".log")
// }
// func (p *Process) createLog() (*os.File, error) {
// logDir := filepath.Join(p.Info.LogDir, "logs")
// os.MkdirAll(logDir, 0755) // just do it, err ignore it
// logFile := p.LogFilePath()
// return os.OpenFile(logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
// }
// func (p *Process) RunWithRetry() {
// var err error
// for p.retry = 0; p.retry < p.Info.StartRetries+1; p.retry += 1 {
// // wait program to exit
// errC := GoFunc(p.run)
// PROGRAM_WAIT:
// // Here is RUNNING State
// select {
// case err = <-errC:
// log.Info(p.Info.Name, err)
// case <-time.After(time.Second * time.Duration(p.Info.StartSeconds)): // reset retry
// p.retry = 0
// goto PROGRAM_WAIT
// case <-p.stopC:
// return
// }
// // Enter RETRY_WAIT State
// if p.retry < p.Info.StartRetries {
// p.setStatus(ST_RETRYWAIT)
// select {
// case <-p.stopC:
// return
// case <-time.After(time.Second * 2):
// }
// }
// }
// p.setStatus(ST_FATAL)
// }
// func (p *Process) start() error {
// p.KCommand = p.Info.buildCmd()
// logFd, err := p.createLog()
// if err != nil {
// return err
// }
// p.Cmd.Stdout = logFd
// p.Cmd.Stderr = logFd
// return p.Cmd.Start()
// }
// func (p *Process) run() (err error) {
// if err = p.start(); err != nil {
// return
// }
// p.setStatus(ST_RUNNING)
// defer func() {
// if out, ok := p.Cmd.Stdout.(io.Closer); ok {
// out.Close()
// }
// log.Warnf("Process finish: %v", err)
// }()
// err = p.Wait()
// return
// }
// func (p *Process) stop() error {
// select {
// case p.stopC <- true: // stopC may not recevied, when in FATAL, in a very low probality
// case <-time.After(time.Millisecond * 200): // 0.2s
// }
// p.Terminate(syscall.SIGKILL)
// p.setStatus(ST_STOPPED)
// return nil
// }

@ -401,9 +401,17 @@ func (s *Supervisor) wsLog(w http.ResponseWriter, r *http.Request) {
}
defer c.Close()
<-proc.Output.AddHookFunc(func(message string) error {
return c.WriteMessage(1, []byte(message))
wg := &sync.WaitGroup{}
wg.Add(1)
origData := proc.Output.AddWriterFunc(r.RemoteAddr, func(data []byte) error {
er := c.WriteMessage(1, data)
if er != nil {
wg.Done()
}
return er
})
c.WriteMessage(1, origData)
wg.Wait()
}
func (s *Supervisor) Close() {

Loading…
Cancel
Save