fix pipe error

master
codeskyblue 8 years ago
parent 79fc7d1966
commit 76e5617248

@ -1,27 +1,33 @@
package main
import (
"log"
"sync"
"time"
"github.com/glycerine/rbuf"
"github.com/qiniu/log"
)
type BroadcastString struct {
msgC chan string
writers map[chan string]bool
mu sync.Mutex
}
func NewBroadcastString() *BroadcastString {
return &BroadcastString{
b := &BroadcastString{
msgC: make(chan string, 20), // in case of cmd pipe error
writers: make(map[chan string]bool, 0),
}
go func() {
for message := range b.msgC {
b.writeToAll(message)
}
}()
return b
}
func (b *BroadcastString) WriteMessage(message string) {
b.mu.Lock()
defer b.mu.Unlock()
func (b *BroadcastString) writeToAll(message string) {
for c := range b.writers {
select {
case c <- message:
@ -32,6 +38,17 @@ func (b *BroadcastString) WriteMessage(message string) {
}
}
func (b *BroadcastString) WriteMessage(message string) {
select {
case b.msgC <- message:
default:
}
}
func (b *BroadcastString) Reset() {
b.msgC = make(chan string, 20)
}
func (b *BroadcastString) AddListener(c chan string) chan string {
b.mu.Lock()
defer b.mu.Unlock()
@ -48,6 +65,10 @@ func (b *BroadcastString) RemoveListener(c chan string) {
delete(b.writers, c)
}
func (b *BroadcastString) Close() {
close(b.msgC)
}
type BufferBroadcast struct {
bs *BroadcastString
@ -60,25 +81,28 @@ func NewBufferBroadcast(size int) *BufferBroadcast {
if size <= 0 {
size = 4 * 1024 // 4K
}
return &BufferBroadcast{
bufb := &BufferBroadcast{
maxSize: size,
bs: NewBroadcastString(),
buf: rbuf.NewFixedSizeRingBuf(size), // bytes.NewBuffer(nil), // buffer.NewRing(buffer.New(size)),
}
bufC := bufb.bs.AddListener(nil)
go func() {
for msg := range bufC {
bufb.buf.Write([]byte(msg))
}
}()
return bufb
}
func (b *BufferBroadcast) Write(data []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
// if b.buf.Len() >= b.maxSize*2 {
// b.buf = bytes.NewBuffer(b.buf.Bytes()[b.buf.Len()-b.maxSize : b.buf.Len()])
// }
b.bs.WriteMessage(string(data))
return b.buf.Write(data)
b.bs.WriteMessage(string(data)) // should return immediatiely, in case of pipe error
return len(data), nil
}
func (b *BufferBroadcast) Reset() {
b.buf.Reset()
b.bs.Reset()
}
func (b *BufferBroadcast) AddHookFunc(wf func(string) error) chan error {

Loading…
Cancel
Save