parent
5be7f800ce
commit
494f65c727
@ -0,0 +1,98 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type BroadcastString struct {
|
||||
writers map[chan string]bool
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewBroadcastString() *BroadcastString {
|
||||
return &BroadcastString{
|
||||
writers: make(map[chan string]bool, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BroadcastString) WriteMessage(message string) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
for c := range b.writers {
|
||||
select {
|
||||
case c <- message:
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
log.Println("channel closed, remove from queue")
|
||||
delete(b.writers, c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BroadcastString) AddListener(c chan string) chan string {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if c == nil {
|
||||
c = make(chan string, 0)
|
||||
}
|
||||
b.writers[c] = true
|
||||
return c
|
||||
}
|
||||
|
||||
type BufferBroadcast struct {
|
||||
bs *BroadcastString
|
||||
|
||||
maxSize int
|
||||
buf *bytes.Buffer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewBufferBroadcast(size int) *BufferBroadcast {
|
||||
if size <= 0 {
|
||||
size = 4 * 1024 // 4K
|
||||
}
|
||||
return &BufferBroadcast{
|
||||
maxSize: size,
|
||||
bs: NewBroadcastString(),
|
||||
buf: bytes.NewBuffer(nil), // buffer.NewRing(buffer.New(size)),
|
||||
}
|
||||
}
|
||||
|
||||
func (b *BufferBroadcast) Write(data []byte) (n int, err error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
if b.buf.Len() >= b.maxSize*2 {
|
||||
b.buf = bytes.NewBuffer(b.buf.Bytes()[b.buf.Len()-b.maxSize : b.buf.Len()])
|
||||
}
|
||||
b.bs.WriteMessage(string(data))
|
||||
return b.buf.Write(data)
|
||||
}
|
||||
|
||||
func (b *BufferBroadcast) Reset() {
|
||||
b.buf.Reset()
|
||||
}
|
||||
|
||||
func (b *BufferBroadcast) AddHookFunc(wf func(string) error) chan error {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
c := b.bs.AddListener(nil)
|
||||
errC := make(chan error, 1)
|
||||
go func() {
|
||||
data := b.buf.Bytes()
|
||||
// data, _ := ioutil.ReadAll(b.buf)
|
||||
if err := wf(string(data)); err != nil {
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
for msg := range c {
|
||||
err := wf(msg)
|
||||
if err != nil {
|
||||
errC <- err
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
return errC
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBroadcast(t *testing.T) {
|
||||
bs := NewBroadcastString()
|
||||
bs.WriteMessage("hello")
|
||||
c1 := bs.AddListener(nil)
|
||||
go func() {
|
||||
bs.WriteMessage("world")
|
||||
}()
|
||||
message := <-c1
|
||||
if message != "world" {
|
||||
t.Fatalf("expect message world, but got %s", message)
|
||||
}
|
||||
c2 := bs.AddListener(nil)
|
||||
go func() {
|
||||
bs.WriteMessage("tab")
|
||||
}()
|
||||
|
||||
// test write multi
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
message = <-c2
|
||||
if message != "tab" {
|
||||
t.Errorf("expect tab, but got %s", message)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
message = <-c1
|
||||
if message != "tab" {
|
||||
t.Errorf("expect tab, but got %s", message)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
The MIT License (MIT)
|
||||
Copyright (c) 2016 codeskyblue
|
||||
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
|
||||
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
|
||||
OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
@ -1,7 +1,53 @@
|
||||
package kexec
|
||||
|
||||
import "os/exec"
|
||||
import (
|
||||
"errors"
|
||||
"os/exec"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type KCommand struct {
|
||||
*exec.Cmd
|
||||
|
||||
errChs []chan error
|
||||
err error
|
||||
finished bool
|
||||
once sync.Once
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (c *KCommand) Run() error {
|
||||
if err := c.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Wait()
|
||||
}
|
||||
|
||||
func (k *KCommand) Wait() error {
|
||||
if k.Process == nil {
|
||||
return errors.New("exec: not started")
|
||||
}
|
||||
k.once.Do(func() {
|
||||
if k.errChs == nil {
|
||||
k.errChs = make([]chan error, 0)
|
||||
}
|
||||
go func() {
|
||||
k.err = k.Cmd.Wait()
|
||||
k.mu.Lock()
|
||||
k.finished = true
|
||||
for _, errC := range k.errChs {
|
||||
errC <- k.err
|
||||
}
|
||||
k.mu.Unlock()
|
||||
}()
|
||||
})
|
||||
k.mu.Lock()
|
||||
if k.finished {
|
||||
k.mu.Unlock()
|
||||
return k.err
|
||||
}
|
||||
errC := make(chan error, 1)
|
||||
k.errChs = append(k.errChs, errC)
|
||||
k.mu.Unlock()
|
||||
return <-errC
|
||||
}
|
||||
|
Loading…
Reference in new issue