You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gosuv/web.go

696 lines
16 KiB

package main
import (
"encoding/json"
"errors"
"fmt"
rice "github.com/GeertJohan/go.rice"
"html/template"
"io"
"io/ioutil"
"net/http"
"os"
"os/signal"
"os/user"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
"syscall"
"time"
"github.com/codeskyblue/gosuv/gops"
"github.com/codeskyblue/kexec"
"github.com/go-yaml/yaml"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
_ "github.com/shurcooL/vfsgen"
log "github.com/sirupsen/logrus"
)
var defaultConfigDir string
func init() {
Assets = rice.MustFindBox("res").HTTPBox()
defaultConfigDir = os.Getenv("GOSUV_HOME_DIR")
if defaultConfigDir == "" {
defaultConfigDir = filepath.Join(UserHomeDir(), ".gosuv")
}
//http.Handle("/res/", http.StripPrefix("/res/", http.FileServer(Assets))) // http.StripPrefix("/res/", Assets))
http.Handle("/res/", http.StripPrefix("/res/", http.FileServer(Assets)))
//http.Handle("/res/", http.FileServer(Assets))
}
type Supervisor struct {
ConfigDir string
names []string // order of programs
pgMap map[string]Program
procMap map[string]*Process
mu sync.Mutex
eventB *WriteBroadcaster
}
func (s *Supervisor) programs() []Program {
pgs := make([]Program, 0, len(s.names))
for _, name := range s.names {
pgs = append(pgs, s.pgMap[name])
}
return pgs
}
func (s *Supervisor) procs() []*Process {
ps := make([]*Process, 0, len(s.names))
for _, name := range s.names {
ps = append(ps, s.procMap[name])
}
return ps
}
func (s *Supervisor) programPath() string {
return filepath.Join(s.ConfigDir, "programs.yml")
}
func (s *Supervisor) newProcess(pg Program) *Process {
p := NewProcess(pg)
origFunc := p.StateChange
p.StateChange = func(oldState, newState FSMState) {
s.broadcastEvent(fmt.Sprintf("%s state: %s -> %s", p.Name, string(oldState), string(newState)))
origFunc(oldState, newState)
}
return p
}
func (s *Supervisor) broadcastEvent(event string) {
s.eventB.Write([]byte(event))
}
func (s *Supervisor) addStatusChangeListener(c chan string) {
sChan := s.eventB.NewChanString(fmt.Sprintf("%d", time.Now().UnixNano()))
go func() {
for msg := range sChan {
c <- msg
}
}()
}
// Send Stop signal and wait program stops
func (s *Supervisor) stopAndWait(name string) error {
p, ok := s.procMap[name]
if !ok {
return errors.New("No such program")
}
if !p.IsRunning() {
return nil
}
c := make(chan string, 0)
s.addStatusChangeListener(c)
p.Operate(StopEvent)
for {
select {
case <-c:
if !p.IsRunning() {
return nil
}
case <-time.After(1 * time.Second): // In case some event not catched
if !p.IsRunning() {
return nil
}
}
}
}
func (s *Supervisor) addOrUpdateProgram(pg Program) error {
// defer s.broadcastEvent(pg.Name + " add or update")
if err := pg.Check(); err != nil {
return err
}
origPg, ok := s.pgMap[pg.Name]
if ok {
if reflect.DeepEqual(origPg, pg) {
return nil
}
s.broadcastEvent(pg.Name + " update")
log.Println("Update:", pg.Name)
origProc := s.procMap[pg.Name]
isRunning := origProc.IsRunning()
go func() {
s.stopAndWait(origProc.Name)
newProc := s.newProcess(pg)
s.procMap[pg.Name] = newProc
s.pgMap[pg.Name] = pg // update origin
if isRunning {
newProc.Operate(StartEvent)
}
s.saveDB()
}()
} else {
s.names = append(s.names, pg.Name)
s.pgMap[pg.Name] = pg
s.procMap[pg.Name] = s.newProcess(pg)
s.broadcastEvent(pg.Name + " added")
}
return nil
}
// Check
// - Yaml format
// - Duplicated program
func (s *Supervisor) readConfigFromDB() (pgs []Program, err error) {
data, err := ioutil.ReadFile(s.programPath())
if err != nil {
data = []byte("")
}
pgs = make([]Program, 0)
if err = yaml.Unmarshal(data, &pgs); err != nil {
return nil, err
}
visited := map[string]bool{}
for _, pg := range pgs {
if visited[pg.Name] {
return nil, fmt.Errorf("Duplicated program name: %s", pg.Name)
}
visited[pg.Name] = true
}
return
}
func (s *Supervisor) loadDB() error {
s.mu.Lock()
defer s.mu.Unlock()
pgs, err := s.readConfigFromDB()
if err != nil {
return err
}
// add or update program
visited := map[string]bool{}
names := make([]string, 0, len(pgs))
for _, pg := range pgs {
names = append(names, pg.Name)
visited[pg.Name] = true
s.addOrUpdateProgram(pg)
}
s.names = names
// delete not exists program
for _, pg := range s.pgMap {
if visited[pg.Name] {
continue
}
s.removeProgram(pg.Name)
// name := pg.Name
// log.Printf("stop before delete program: %s", name)
// s.stopAndWait(name)
// delete(s.procMap, name)
// delete(s.pgMap, name)
// s.broadcastEvent(name + " deleted")
}
return nil
}
func (s *Supervisor) saveDB() error {
s.mu.Lock()
defer s.mu.Unlock()
data, err := yaml.Marshal(s.programs())
if err != nil {
return err
}
return ioutil.WriteFile(s.programPath(), data, 0644)
}
func (s *Supervisor) removeProgram(name string) {
names := make([]string, 0, len(s.names))
for _, pName := range s.names {
if pName == name {
continue
}
names = append(names, pName)
}
s.names = names
log.Printf("stop before delete program: %s", name)
s.stopAndWait(name)
delete(s.procMap, name)
delete(s.pgMap, name)
s.broadcastEvent(name + " deleted")
}
type WebConfig struct {
User string
Version string
}
func (s *Supervisor) renderHTML(r *http.Request, w http.ResponseWriter, name string, data interface{}) {
file, err := Assets.Open(name + ".html")
if err != nil {
panic(err)
}
defer file.Close()
body, _ := ioutil.ReadAll(file)
if data == nil {
wc := WebConfig{}
wc.Version = version
user, err := user.Current()
if err == nil {
wc.User = user.Username + "@" + cfg.Server.ServerName
}
data = wc
}
w.Header().Set("Content-Type", "text/html")
template.Must(template.New("t").Delims("[[", "]]").Parse(string(body))).Execute(w, data)
}
type JSONResponse struct {
Status int `json:"status"`
Value interface{} `json:"value"`
}
func (s *Supervisor) renderJSON(w http.ResponseWriter, data JSONResponse) {
w.Header().Set("Content-Type", "application/json")
bytes, _ := json.Marshal(data)
w.Write(bytes)
}
func (s *Supervisor) hIndex(w http.ResponseWriter, r *http.Request) {
s.renderHTML(r, w, "index", nil)
}
func (s *Supervisor) hSetting(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
s.renderHTML(r, w, "setting", map[string]string{
"Name": name,
})
}
func (s *Supervisor) hStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
data, _ := json.Marshal(map[string]interface{}{
"status": 0,
"value": "server is running",
})
w.Write(data)
}
func (s *Supervisor) hShutdown(w http.ResponseWriter, r *http.Request) {
s.Close()
s.renderJSON(w, JSONResponse{
Status: 0,
Value: "gosuv server has been shutdown",
})
go func() {
time.Sleep(500 * time.Millisecond)
os.Exit(0)
}()
}
func (s *Supervisor) hReload(w http.ResponseWriter, r *http.Request) {
err := s.loadDB()
log.Println("reload config file")
if err == nil {
s.renderJSON(w, JSONResponse{
Status: 0,
Value: "load config success",
})
} else {
s.renderJSON(w, JSONResponse{
Status: 1,
Value: err.Error(),
})
}
}
func (s *Supervisor) hGetProgramList(w http.ResponseWriter, r *http.Request) {
data, err := json.Marshal(s.procs())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(data)
}
func (s *Supervisor) hGetProgram(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
proc, ok := s.procMap[name]
if !ok {
s.renderJSON(w, JSONResponse{
Status: 1,
Value: "program not exists",
})
return
} else {
s.renderJSON(w, JSONResponse{
Status: 0,
Value: proc,
})
}
}
func (s *Supervisor) hAddProgram(w http.ResponseWriter, r *http.Request) {
retries, err := strconv.Atoi(r.FormValue("retries"))
if err != nil {
http.Error(w, err.Error(), http.StatusForbidden)
return
}
envs := strings.Split(strings.ReplaceAll(r.FormValue("environ"), ":", ";"), ";")
envsok := []string{}
for _, env := range envs {
if strings.Contains(env, "=") {
envsok = append(envsok, env)
}
}
pg := Program{
Name: r.FormValue("name"),
Command: r.FormValue("command"),
Environ: envsok,
Dir: r.FormValue("dir"),
User: r.FormValue("user"),
StartAuto: r.FormValue("autostart") == "on",
StartRetries: retries,
// TODO: missing other values
}
if pg.Dir == "" {
pg.Dir = "/"
}
if err := pg.Check(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
w.Header().Set("Content-Type", "application/json")
var data []byte
if _, ok := s.pgMap[pg.Name]; ok {
data, _ = json.Marshal(map[string]interface{}{
"status": 1,
"error": fmt.Sprintf("Program %s already exists", strconv.Quote(pg.Name)),
})
} else {
if err := s.addOrUpdateProgram(pg); err != nil {
data, _ = json.Marshal(map[string]interface{}{
"status": 1,
"error": err.Error(),
})
} else {
s.saveDB()
data, _ = json.Marshal(map[string]interface{}{
"status": 0,
})
}
}
w.Write(data)
}
func (s *Supervisor) hUpdateProgram(w http.ResponseWriter, r *http.Request) {
// name := mux.Vars(r)["name"]
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
pg := Program{}
err := json.NewDecoder(r.Body).Decode(&pg)
if err != nil {
json.NewEncoder(w).Encode(map[string]interface{}{
"status": 1,
"error": err.Error(),
})
return
}
err = s.addOrUpdateProgram(pg)
if err != nil {
json.NewEncoder(w).Encode(map[string]interface{}{
"status": 2,
"error": err.Error(),
})
return
}
json.NewEncoder(w).Encode(map[string]interface{}{
"status": 0,
"description": "program updated",
})
}
func (s *Supervisor) hDelProgram(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
w.Header().Set("Content-Type", "application/json")
var data []byte
if _, ok := s.pgMap[name]; !ok {
data, _ = json.Marshal(map[string]interface{}{
"status": 1,
"error": fmt.Sprintf("Program %s not exists", strconv.Quote(name)),
})
} else {
s.removeProgram(name)
s.saveDB()
data, _ = json.Marshal(map[string]interface{}{
"status": 0,
})
}
w.Write(data)
}
func (s *Supervisor) hStartProgram(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
proc, ok := s.procMap[name]
var data []byte
if !ok {
data, _ = json.Marshal(map[string]interface{}{
"status": 1,
"error": fmt.Sprintf("Process %s not exists", strconv.Quote(name)),
})
} else {
proc.Operate(StartEvent)
data, _ = json.Marshal(map[string]interface{}{
"status": 0,
"name": name,
})
}
w.Write(data)
}
func (s *Supervisor) hStopProgram(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
proc, ok := s.procMap[name]
var data []byte
if !ok {
data, _ = json.Marshal(map[string]interface{}{
"status": 1,
"error": fmt.Sprintf("Process %s not exists", strconv.Quote(name)),
})
} else {
proc.Operate(StopEvent)
data, _ = json.Marshal(map[string]interface{}{
"status": 0,
"name": name,
})
}
w.Write(data)
}
func (s *Supervisor) hWebhook(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
name, category := vars["name"], vars["category"]
proc, ok := s.procMap[name]
if !ok {
http.Error(w, fmt.Sprintf("proc %s not exist", strconv.Quote(name)), http.StatusForbidden)
return
}
hook := proc.Program.WebHook
if category == "github" {
gh := hook.Github
_ = gh.Secret
isRunning := proc.IsRunning()
s.stopAndWait(name)
go func() {
cmd := kexec.CommandString(hook.Command)
cmd.Dir = proc.Program.Dir
cmd.Stdout = proc.Output
cmd.Stderr = proc.Output
err := GoTimeout(cmd.Run, time.Duration(hook.Timeout)*time.Second)
if err == ErrGoTimeout {
cmd.Terminate(syscall.SIGTERM)
}
if err != nil {
log.Warnf("webhook command error: %v", err)
// Trigger pushover notification
}
if isRunning {
proc.Operate(StartEvent)
}
}()
io.WriteString(w, "success triggered")
} else {
log.Warnf("unknown webhook category: %v", category)
}
}
var upgrader = websocket.Upgrader{}
func (s *Supervisor) wsEvents(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
ch := make(chan string, 0)
s.addStatusChangeListener(ch)
go func() {
_, _ = <-ch // ignore the history messages
for message := range ch {
// Question: type 1 ?
c.WriteMessage(1, []byte(message))
}
// s.eventB.RemoveListener(ch)
}()
for {
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", mt, err)
break
}
log.Printf("recv: %v %s", mt, message)
err = c.WriteMessage(mt, message)
if err != nil {
log.Println("write:", err)
break
}
}
}
func (s *Supervisor) wsLog(w http.ResponseWriter, r *http.Request) {
name := mux.Vars(r)["name"]
log.Println(name)
proc, ok := s.procMap[name]
if !ok {
log.Println("No such process")
// TODO: raise error here?
return
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
for data := range proc.Output.NewChanString(r.RemoteAddr) {
err := c.WriteMessage(1, []byte(data))
if err != nil {
proc.Output.CloseWriter(r.RemoteAddr)
break
}
}
}
// Performance
func (s *Supervisor) wsPerf(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}
defer c.Close()
name := mux.Vars(r)["name"]
proc, ok := s.procMap[name]
if !ok {
log.Println("No such process")
// TODO: raise error here?
return
}
for {
// c.SetWriteDeadline(time.Now().Add(3 * time.Second))
if proc.cmd == nil || proc.cmd.Process == nil {
log.Println("process not running")
return
}
pid := proc.cmd.Process.Pid
ps, err := gops.NewProcess(pid)
if err != nil {
break
}
mainPinfo, err := ps.ProcInfo()
if err != nil {
break
}
pi := ps.ChildrenProcInfo(true)
pi.Add(mainPinfo)
err = c.WriteJSON(pi)
if err != nil {
break
}
time.Sleep(700 * time.Millisecond)
}
}
func (s *Supervisor) Close() {
for _, proc := range s.procMap {
s.stopAndWait(proc.Name)
}
log.Println("server closed")
}
func (s *Supervisor) catchExitSignal() {
sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
go func() {
for sig := range sigC {
if sig == syscall.SIGHUP {
log.Println("Receive SIGHUP, just ignore")
continue
}
log.Printf("Got signal: %v, stopping all running process\n", sig)
s.Close()
break
}
os.Exit(0)
}()
}
func (s *Supervisor) AutoStartPrograms() {
for _, proc := range s.procMap {
if proc.Program.StartAuto {
log.Printf("auto start %s", strconv.Quote(proc.Name))
proc.Operate(StartEvent)
}
}
}
func newSupervisorHandler() (suv *Supervisor, hdlr http.Handler, err error) {
suv = &Supervisor{
ConfigDir: defaultConfigDir,
pgMap: make(map[string]Program, 0),
procMap: make(map[string]*Process, 0),
eventB: NewWriteBroadcaster(4 * 1024),
}
if err = suv.loadDB(); err != nil {
return
}
suv.catchExitSignal()
r := mux.NewRouter()
r.HandleFunc("/", suv.hIndex)
r.HandleFunc("/settings/{name}", suv.hSetting)
r.HandleFunc("/api/status", suv.hStatus)
r.HandleFunc("/api/shutdown", suv.hShutdown).Methods("POST")
r.HandleFunc("/api/reload", suv.hReload).Methods("POST")
r.HandleFunc("/api/programs", suv.hGetProgramList).Methods("GET")
r.HandleFunc("/api/programs/{name}", suv.hGetProgram).Methods("GET")
r.HandleFunc("/api/programs/{name}", suv.hDelProgram).Methods("DELETE")
r.HandleFunc("/api/programs/{name}", suv.hUpdateProgram).Methods("PUT")
r.HandleFunc("/api/programs", suv.hAddProgram).Methods("POST")
r.HandleFunc("/api/programs/{name}/start", suv.hStartProgram).Methods("POST")
r.HandleFunc("/api/programs/{name}/stop", suv.hStopProgram).Methods("POST")
r.HandleFunc("/ws/events", suv.wsEvents)
r.HandleFunc("/ws/logs/{name}", suv.wsLog)
r.HandleFunc("/ws/perfs/{name}", suv.wsPerf)
r.HandleFunc("/webhooks/{name}/{category}", suv.hWebhook).Methods("POST")
return suv, r, nil
}