fix broadcast reuse buf, which make log read error

master
codeskyblue 8 years ago
parent e7554e1d2b
commit 8b9923ac50

@ -3,7 +3,6 @@ package main
import (
"errors"
"io"
"os"
"sync"
"time"
@ -11,126 +10,6 @@ import (
"github.com/qiniu/log"
)
type BroadcastString struct {
msgC chan string
writers map[chan string]bool
mu sync.Mutex
}
func NewBroadcastString() *BroadcastString {
b := &BroadcastString{
msgC: make(chan string, 20), // in case of cmd pipe error
writers: make(map[chan string]bool, 0),
}
go func() {
for message := range b.msgC {
b.writeToAll(message)
}
}()
return b
}
func (b *BroadcastString) writeToAll(message string) {
for c := range b.writers {
select {
case c <- message:
case <-time.After(500 * time.Millisecond):
log.Println("channel closed, remove from queue")
delete(b.writers, c)
}
}
}
func (b *BroadcastString) WriteMessage(message string) {
select {
case b.msgC <- message:
default:
}
}
func (b *BroadcastString) Reset() {
b.msgC = make(chan string, 20)
}
func (b *BroadcastString) AddListener(c chan string) chan string {
b.mu.Lock()
defer b.mu.Unlock()
if c == nil {
c = make(chan string, 4)
}
b.writers[c] = true
return c
}
func (b *BroadcastString) RemoveListener(c chan string) {
b.mu.Lock()
defer b.mu.Unlock()
delete(b.writers, c)
}
func (b *BroadcastString) Close() {
close(b.msgC)
}
type BufferBroadcast struct {
bs *BroadcastString
maxSize int
buf *rbuf.FixedSizeRingBuf // *bytes.Buffer
mu sync.Mutex
}
func NewBufferBroadcast(size int) *BufferBroadcast {
if size <= 0 {
size = 4 * 1024 // 4K
}
bufb := &BufferBroadcast{
maxSize: size,
bs: NewBroadcastString(),
buf: rbuf.NewFixedSizeRingBuf(size), // bytes.NewBuffer(nil), // buffer.NewRing(buffer.New(size)),
}
bufC := bufb.bs.AddListener(nil)
go func() {
for msg := range bufC {
bufb.buf.Write([]byte(msg))
}
}()
return bufb
}
func (b *BufferBroadcast) Write(data []byte) (n int, err error) {
b.bs.WriteMessage(string(data)) // should return immediatiely, in case of pipe error
return len(data), nil
}
func (b *BufferBroadcast) Reset() {
b.buf.Reset()
b.bs.Reset()
}
func (b *BufferBroadcast) AddHookFunc(wf func(string) error) chan error {
b.mu.Lock()
defer b.mu.Unlock()
c := b.bs.AddListener(nil)
errC := make(chan error, 1)
go func() {
data := b.buf.Bytes()
// data, _ := ioutil.ReadAll(b.buf)
if err := wf(string(data)); err != nil {
errC <- err
return
}
for msg := range c {
err := wf(msg)
if err != nil {
errC <- err
break
}
}
}()
return errC
}
// The new broadcast
type StreamWriter struct {
wc io.WriteCloser
@ -156,29 +35,24 @@ func NewWriteBroadcaster(size int) *WriteBroadcaster {
return bc
}
func (w *WriteBroadcaster) AddWriter(writer io.WriteCloser, stream string) []byte {
w.Lock()
defer w.Unlock()
if w.closed {
writer.Close()
return nil
}
sw := StreamWriter{wc: writer, stream: stream}
w.writers[sw] = true
return w.buf.Bytes()
}
func (wb *WriteBroadcaster) Closed() bool {
return wb.closed
}
func (wb *WriteBroadcaster) NewReader(name string) ([]byte, *io.PipeReader) {
r, w := io.Pipe()
return wb.AddWriter(w, name), r
}
// this is main func
func (wb *WriteBroadcaster) NewChanString(name string) chan string {
wb.Lock()
defer wb.Unlock()
func (wb *WriteBroadcaster) AddWriterFunc(name string, fn func([]byte) error) []byte {
return wb.AddWriter(&funcWriter{fn}, name)
wr := NewChanStrWriter()
if wb.closed {
wr.Close()
return nil
}
sw := StreamWriter{wc: wr, stream: name}
wb.writers[sw] = true
wr.Write(wb.buf.Bytes())
return wr.C
}
func (wb *WriteBroadcaster) Bytes() []byte {
@ -195,7 +69,7 @@ func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
for sw := range w.writers {
// set write timeout
err = GoTimeout(func() error {
if n, err := sw.wc.Write(p); err != nil || n != len(p) {
if _, err := sw.wc.Write(p); err != nil { //|| n != len(p) {
return errors.New("broadcast to " + sw.stream + " error")
}
return nil
@ -203,12 +77,21 @@ func (w *WriteBroadcaster) Write(p []byte) (n int, err error) {
if err != nil {
// On error, evict the writer
log.Warnf("broadcase write error: %s, %s", sw.stream, err)
sw.wc.Close()
delete(w.writers, sw)
}
}
return len(p), nil
}
func (w *WriteBroadcaster) CloseWriter(name string) {
for sw := range w.writers {
if sw.stream == name {
sw.wc.Close()
}
}
}
func (w *WriteBroadcaster) CloseWriters() error {
w.Lock()
defer w.Unlock()
@ -237,28 +120,44 @@ func NopWriteCloser(w io.Writer) io.WriteCloser {
return &nopWriteCloser{w}
}
// func writer
type funcWriter struct {
wrFn func([]byte) error
// chan string writer
type chanStrWriter struct {
C chan string
closed bool
}
func (c *chanStrWriter) Write(data []byte) (n int, err error) {
if c.closed {
return 0, errors.New("chan writer closed")
}
c.C <- string(data) // write timeout
return len(data), nil
}
func (f *funcWriter) Write(data []byte) (n int, err error) {
err = f.wrFn(data)
return len(data), err
func (c *chanStrWriter) Close() error {
if !c.closed {
c.closed = true
close(c.C)
}
return nil
}
func (f *funcWriter) Close() error { return nil }
func NewChanStrWriter() *chanStrWriter {
return &chanStrWriter{
C: make(chan string, 10),
}
}
// quick loss writer
type QuickLossBroadcastWriter struct {
*WriteBroadcaster
bufC chan []byte
bufC chan string
closed bool
}
func (w *QuickLossBroadcastWriter) Write(buf []byte) (int, error) {
select {
case w.bufC <- buf:
case w.bufC <- string(buf):
default:
}
return len(buf), nil
@ -275,19 +174,15 @@ func (w *QuickLossBroadcastWriter) Close() error {
func (w *QuickLossBroadcastWriter) drain() {
for data := range w.bufC {
w.WriteBroadcaster.Write(data)
w.WriteBroadcaster.Write([]byte(data))
}
}
func NewQuickLossBroadcastWriter(size int) *QuickLossBroadcastWriter {
qlw := &QuickLossBroadcastWriter{
WriteBroadcaster: NewWriteBroadcaster(size),
bufC: make(chan []byte, 20),
bufC: make(chan string, 20),
}
go qlw.drain()
qlw.AddWriterFunc("stdout", func(data []byte) error {
_, err := os.Stdout.Write(data)
return err
})
return qlw
}

@ -126,9 +126,9 @@ type Process struct {
*FSM `json:"-"`
Program `json:"program"`
cmd *kexec.KCommand
Stdout *QuickLossBroadcastWriter // *WriteBroadcaster // io.WriteCloser // *BufferBroadcast
Stderr *QuickLossBroadcastWriter // *BufferBroadcast
Output *QuickLossBroadcastWriter // *BufferBroadcast
Stdout *QuickLossBroadcastWriter
Stderr *QuickLossBroadcastWriter
Output *QuickLossBroadcastWriter
OutputFile *os.File
stopC chan syscall.Signal
retryLeft int
@ -216,7 +216,7 @@ func (p *Process) startCommand() {
errC := GoFunc(p.cmd.Run)
startTime := time.Now()
select {
case err := <-errC: //<-GoTimeoutFunc(time.Duration(p.StartSeconds)*time.Second, p.cmd.Run):
case err := <-errC:
log.Println(err, time.Since(startTime))
if time.Since(startTime) < time.Duration(p.StartSeconds)*time.Second {
if p.retryLeft == p.StartRetries { // If first time quit so fast, just set to fatal

@ -142,6 +142,7 @@ var vm = new Vue({
},
onmessage: function(evt) {
// strip ansi color
// console.log("DT:", evt.data)
that.log.content += evt.data.replace(/\033\[[0-9;]*m/g, "");
that.log.line_count = $.trim(that.log.content).split(/\r\n|\r|\n/).length;
if (that.log.follow) {

@ -38,7 +38,8 @@ type Supervisor struct {
procMap map[string]*Process
mu sync.Mutex
eventB *BroadcastString
eventB *WriteBroadcaster
// eventB *BroadcastString
}
func (s *Supervisor) programPath() string {
@ -56,11 +57,16 @@ func (s *Supervisor) newProcess(pg Program) *Process {
}
func (s *Supervisor) broadcastEvent(event string) {
s.eventB.WriteMessage(event)
s.eventB.Write([]byte(event))
}
func (s *Supervisor) addStatusChangeListener(c chan string) {
s.eventB.AddListener(c)
sChan := s.eventB.NewChanString(fmt.Sprintf("%d", time.Now().UnixNano()))
go func() {
for msg := range sChan {
c <- msg
}
}()
}
// Send Stop signal and wait program stops
@ -367,7 +373,7 @@ func (s *Supervisor) wsEvents(w http.ResponseWriter, r *http.Request) {
// Question: type 1 ?
c.WriteMessage(1, []byte(message))
}
s.eventB.RemoveListener(ch)
// s.eventB.RemoveListener(ch)
}()
for {
mt, message, err := c.ReadMessage()
@ -401,17 +407,13 @@ func (s *Supervisor) wsLog(w http.ResponseWriter, r *http.Request) {
}
defer c.Close()
wg := &sync.WaitGroup{}
wg.Add(1)
origData := proc.Output.AddWriterFunc(r.RemoteAddr, func(data []byte) error {
er := c.WriteMessage(1, data)
if er != nil {
wg.Done()
for data := range proc.Output.NewChanString(r.RemoteAddr) {
err := c.WriteMessage(1, []byte(data))
if err != nil {
proc.Output.CloseWriter(r.RemoteAddr)
break
}
return er
})
c.WriteMessage(1, origData)
wg.Wait()
}
}
func (s *Supervisor) Close() {
@ -443,8 +445,7 @@ func newSupervisorHandler() (hdlr http.Handler, err error) {
ConfigDir: defaultConfigDir,
pgMap: make(map[string]*Program, 0),
procMap: make(map[string]*Process, 0),
// eventCs: make(map[chan string]bool),
eventB: NewBroadcastString(),
eventB: NewWriteBroadcaster(4 * 1024),
}
if err = suv.loadDB(); err != nil {
return

Loading…
Cancel
Save