send events to browser every time when status change

master
codeskyblue 8 years ago
parent 2965cf0b85
commit 6aed1f5b12

@ -46,6 +46,9 @@
<button class="btn btn-default btn-sm" v-on:click="refresh">
<span class="glyphicon glyphicon-refresh"></span> Refresh
</button>
<button class="btn btn-default btn-sm" v-on:click="test">
<span class="glyphicon glyphicon-test"></span> Test
</button>
</div>
<div class="col-md-12">
<table class="table table-hover">

@ -12,6 +12,9 @@ function getQueryString(name) {
return null;
}
var ws;
var wsProtocol = location.protocol == "https:" ? "wss" : "ws";
var vm = new Vue({
el: "#app",
data: {
@ -57,6 +60,7 @@ var vm = new Vue({
return this.breadcrumb;
},
refresh: function() {
// ws.send("Hello")
console.log("RR");
$.ajax({
url: "/api/programs",
@ -65,6 +69,9 @@ var vm = new Vue({
}
});
},
test: function() {
ws.send("Test");
},
cmdStart: function(name) {
console.log(name);
$.ajax({
@ -120,4 +127,23 @@ $(function() {
e.preventDefault()
});
console.log("HEE")
ws = new WebSocket(wsProtocol + "://" + location.host + "/ws/events");
ws.onopen = function(evt) {
console.log("OPEN");
}
ws.onclose = function(evt) {
console.log("CLOSE");
ws = null;
}
ws.onmessage = function(evt) {
console.log("response:" + evt.data);
vm.refresh();
}
ws.onerror = function(evt) {
console.log("error:", evt.data);
}
// ws.send("Hello")
});

@ -10,10 +10,12 @@ import (
"path/filepath"
"reflect"
"strconv"
"sync"
"time"
"github.com/go-yaml/yaml"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/qiniu/log"
)
@ -22,12 +24,42 @@ type Supervisor struct {
pgs []*Program
pgMap map[string]*Program
procMap map[string]*Process
eventCs []chan string // channels
mu sync.Mutex
}
func (s *Supervisor) programPath() string {
return filepath.Join(s.ConfigDir, "programs.yml")
}
func (s *Supervisor) newProcess(pg Program) *Process {
p := NewProcess(pg)
origFunc := p.StateChange
p.StateChange = func(oldState, newState FSMState) {
log.Println(newState)
s.broadcastEvent(string(newState))
origFunc(oldState, newState)
}
return p
}
func (s *Supervisor) broadcastEvent(event string) {
s.mu.Lock()
defer s.mu.Unlock()
validEventCs := make([]chan string, 0, len(s.eventCs))
for _, c := range s.eventCs {
log.Println("start send events")
select {
case c <- event:
log.Println("Send events")
validEventCs = append(validEventCs, c)
case <-time.After(500 * time.Millisecond):
log.Println("Chan closed, remove from queue")
}
}
s.eventCs = validEventCs
}
func (s *Supervisor) addOrUpdateProgram(pg Program) error {
origPg, ok := s.pgMap[pg.Name]
if ok {
@ -41,7 +73,7 @@ func (s *Supervisor) addOrUpdateProgram(pg Program) error {
// TODO: wait state change
time.Sleep(2 * time.Second)
newProc := NewProcess(pg)
newProc := s.newProcess(pg)
s.procMap[pg.Name] = newProc
if isRunning {
newProc.Operate(StartEvent)
@ -51,7 +83,7 @@ func (s *Supervisor) addOrUpdateProgram(pg Program) error {
} else {
s.pgs = append(s.pgs, &pg)
s.pgMap[pg.Name] = &pg
s.procMap[pg.Name] = NewProcess(pg)
s.procMap[pg.Name] = s.newProcess(pg)
log.Println("Add:", pg.Name)
}
return s.saveDB()
@ -131,9 +163,7 @@ func (s *Supervisor) hGetProgram(w http.ResponseWriter, r *http.Request) {
for _, pg := range s.pgs {
procs = append(procs, s.procMap[pg.Name])
}
log.Println(procs[0])
data, err := json.Marshal(procs)
log.Println(string(data))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -200,11 +230,47 @@ func (s *Supervisor) hStartProgram(w http.ResponseWriter, r *http.Request) {
w.Write(data)
}
var upgrader = websocket.Upgrader{}
func (s *Supervisor) wsEvents(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
ch := make(chan string, 0)
s.eventCs = append(s.eventCs, ch)
go func() {
for message := range ch {
log.Println(message)
c.WriteMessage(1, []byte(message))
// c.WriteMessage(mt, message)
}
close(ch)
}()
for {
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", mt, err)
break
}
log.Printf("recv: %v %s", mt, message)
err = c.WriteMessage(mt, message)
if err != nil {
log.Println("write:", err)
break
}
}
}
func init() {
suv := &Supervisor{
ConfigDir: filepath.Join(UserHomeDir(), ".gosuv"),
pgMap: make(map[string]*Program, 0),
procMap: make(map[string]*Process, 0),
eventCs: make([]chan string, 0),
}
if err := suv.loadDB(); err != nil {
log.Fatal(err)
@ -214,6 +280,7 @@ func init() {
r.HandleFunc("/api/programs", suv.hGetProgram).Methods("GET")
r.HandleFunc("/api/programs", suv.hAddProgram).Methods("POST")
r.HandleFunc("/api/programs/{name}/start", suv.hStartProgram).Methods("POST")
r.HandleFunc("/ws/events", suv.wsEvents)
fs := http.FileServer(http.Dir("res"))
http.Handle("/", r)

Loading…
Cancel
Save