use pb to replace start and stop

master
shengxiang 9 years ago
parent 9c63dde1a0
commit 1e4475afb6

@ -1,7 +1,6 @@
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
@ -33,40 +32,9 @@ func MkdirIfNoExists(dir string) error {
return nil
}
func ShellTestFile(flag string, file string) bool {
finfo, err := os.Stat(file)
switch flag {
case "x":
if err == nil && (finfo.Mode()&os.ModeExclusive) == 0 {
return true
}
}
return false
}
func chttp(method string, url string, v ...interface{}) (res *JSONResponse, err error) {
var resp *http.Response
switch method {
case "GET":
resp, err = http.Get(url)
case "POST":
resp, err = http.PostForm(url, nil)
}
if err != nil {
return nil, err
}
defer resp.Body.Close()
res = &JSONResponse{}
err = json.NewDecoder(resp.Body).Decode(res)
return
}
func wrapAction(f func(*cli.Context)) func(*cli.Context) {
return func(c *cli.Context) {
// check if serer alive
//host := c.GlobalString("host")
//port := c.GlobalInt("port")
//ServeAddr(host, port)
_, err := goreq.Request{
Method: "GET",
Uri: buildURI(c, "/api/version"),
@ -156,16 +124,41 @@ func buildURI(ctx *cli.Context, uri string) string {
ctx.GlobalString("host"), ctx.GlobalInt("port"), uri)
}
func buildpbURI(ctx *cli.Context) string {
return buildURI(ctx, "/protobuf")
func StopAction(ctx *cli.Context) {
conn, err := connect(ctx)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
name := ctx.Args().First()
client := pb.NewProgramClient(conn)
res, err := client.Stop(context.Background(), &pb.Request{Name: proto.String(name)})
if err != nil {
Errorf("ERR: %#v\n", err)
}
fmt.Println(res.GetMessage())
}
func StopAction(ctx *cli.Context) {
log.Println(buildpbURI(ctx))
req := &pb.CtrlRequest{
Action: proto.String("stop"),
func Errorf(format string, v ...interface{}) {
fmt.Printf(format, v...)
os.Exit(1)
}
func StartAction(ctx *cli.Context) {
conn, err := connect(ctx)
if err != nil {
log.Fatal(err)
}
_ = req
defer conn.Close()
name := ctx.Args().First()
client := pb.NewProgramClient(conn)
res, err := client.Start(context.Background(), &pb.Request{Name: proto.String(name)})
if err != nil {
Errorf("ERR: %#v\n", err)
}
fmt.Println(res.GetMessage())
}
// grpc.Dial can't set network, so I have to implement this func
@ -176,6 +169,12 @@ func grpcDial(network, addr string) (*grpc.ClientConn, error) {
}))
}
func connect(ctx *cli.Context) (cc *grpc.ClientConn, err error) {
sockPath := filepath.Join(GOSUV_HOME, "gosuv.sock")
conn, err := grpcDial("unix", sockPath)
return conn, err
}
func ShutdownAction(ctx *cli.Context) {
sockPath := filepath.Join(GOSUV_HOME, "gosuv.sock")
conn, err := grpcDial("unix", sockPath)
@ -194,12 +193,16 @@ func ShutdownAction(ctx *cli.Context) {
func VersionAction(ctx *cli.Context) {
fmt.Printf("Client: %s\n", GOSUV_VERSION)
res, err := chttp("GET", fmt.Sprintf("http://%s:%d/api/version",
ctx.GlobalString("host"), ctx.GlobalInt("port")))
res, err := goreq.Request{
Method: "GET",
Uri: buildURI(ctx, "/api/version"),
}.Do()
if err != nil {
panic(err)
}
fmt.Printf("Server: %s\n", res.Message)
var reply JSONResponse
res.Body.FromJsonTo(&reply)
fmt.Printf("Server: %s\n", reply.Message)
}
var app *cli.App
@ -252,10 +255,15 @@ func init() {
},
Action: wrapAction(AddAction),
},
{
Name: "start",
Usage: "start a not running program",
Action: wrapAction(StartAction),
},
{
Name: "stop",
Usage: "Stop running program",
Action: StopAction,
Action: wrapAction(StopAction),
},
{
Name: "shutdown",

@ -1,4 +1,4 @@
#!/bin/bash -x
#!/bin/bash -ex
#
cd $(dirname $0)
protoc --go_out=plugins=grpc:. *.proto

@ -13,6 +13,7 @@ It has these top-level messages:
CtrlResponse
NopRequest
Response
Request
*/
package gosuvpb
@ -79,8 +80,9 @@ func (m *NopRequest) String() string { return proto.CompactTextString(m) }
func (*NopRequest) ProtoMessage() {}
type Response struct {
Code *int32 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"`
XXX_unrecognized []byte `json:"-"`
Code *int32 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"`
Message *string `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Response) Reset() { *m = Response{} }
@ -94,6 +96,29 @@ func (m *Response) GetCode() int32 {
return 0
}
func (m *Response) GetMessage() string {
if m != nil && m.Message != nil {
return *m.Message
}
return ""
}
type Request struct {
Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (m *Request) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
@ -181,3 +206,87 @@ var _GoSuv_serviceDesc = grpc.ServiceDesc{
},
Streams: []grpc.StreamDesc{},
}
// Client API for Program service
type ProgramClient interface {
Start(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
Stop(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
}
type programClient struct {
cc *grpc.ClientConn
}
func NewProgramClient(cc *grpc.ClientConn) ProgramClient {
return &programClient{cc}
}
func (c *programClient) Start(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response)
err := grpc.Invoke(ctx, "/gosuvpb.Program/Start", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *programClient) Stop(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response)
err := grpc.Invoke(ctx, "/gosuvpb.Program/Stop", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Program service
type ProgramServer interface {
Start(context.Context, *Request) (*Response, error)
Stop(context.Context, *Request) (*Response, error)
}
func RegisterProgramServer(s *grpc.Server, srv ProgramServer) {
s.RegisterService(&_Program_serviceDesc, srv)
}
func _Program_Start_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
in := new(Request)
if err := codec.Unmarshal(buf, in); err != nil {
return nil, err
}
out, err := srv.(ProgramServer).Start(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
func _Program_Stop_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
in := new(Request)
if err := codec.Unmarshal(buf, in); err != nil {
return nil, err
}
out, err := srv.(ProgramServer).Stop(ctx, in)
if err != nil {
return nil, err
}
return out, nil
}
var _Program_serviceDesc = grpc.ServiceDesc{
ServiceName: "gosuvpb.Program",
HandlerType: (*ProgramServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Start",
Handler: _Program_Start_Handler,
},
{
MethodName: "Stop",
Handler: _Program_Stop_Handler,
},
},
Streams: []grpc.StreamDesc{},
}

@ -14,9 +14,19 @@ message NopRequest {
message Response {
optional int32 code = 1;
optional string message = 2;
}
message Request {
required string name = 1;
}
service GoSuv {
rpc Control(CtrlRequest) returns (CtrlResponse) {};
rpc Shutdown(NopRequest) returns (Response) {};
}
service Program {
rpc Start(Request) returns (Response) {};
rpc Stop(Request) returns (Response) {};
}

@ -9,6 +9,7 @@ import (
"os/exec"
"path/filepath"
"sync"
"syscall"
"github.com/codeskyblue/kproc"
"github.com/qiniu/log"
@ -23,7 +24,7 @@ func GoFunc(f func() error) chan error {
}
const (
ST_PENDING = "pending"
ST_STANDBY = "standby"
ST_RUNNING = "running"
ST_STOPPED = "stopped"
ST_FATAL = "fatal"
@ -46,12 +47,30 @@ type Program struct {
func NewProgram(cmd *exec.Cmd, info *ProgramInfo) *Program {
return &Program{
Process: kproc.ProcCommand(cmd),
Status: ST_PENDING,
Status: ST_STANDBY,
Sig: make(chan os.Signal),
Info: info,
}
}
func (p *Program) setStatus(st string) {
// TODO: status change hook
p.Status = st
}
func (p *Program) InputData(event Event) {
switch event {
case EVENT_START:
if p.Status != ST_RUNNING {
go p.Run()
}
case EVENT_STOP:
if p.Status == ST_RUNNING {
p.Stop()
}
}
}
func (p *Program) createLog() (*os.File, error) {
logDir := os.ExpandEnv("$HOME/.gosuv/logs")
os.MkdirAll(logDir, 0755) // just do it, err ignore it
@ -59,18 +78,33 @@ func (p *Program) createLog() (*os.File, error) {
return os.Create(logFile)
}
func (p *Program) InputData(evevt Event) {
if p.Status == ST_PENDING {
go p.Run()
}
}
func (p *Program) Run() error {
func (p *Program) Run() (err error) {
if err := p.Start(); err != nil {
p.Status = ST_FATAL
p.setStatus(ST_FATAL)
return err
}
return p.Wait()
// TODO: retry needed here
p.setStatus(ST_RUNNING)
defer func() {
if out, ok := p.Cmd.Stdout.(io.Closer); ok {
out.Close()
}
if err != nil {
log.Warnf("program finish: %v", err)
p.setStatus(ST_FATAL)
} else {
p.setStatus(ST_STOPPED)
}
}()
err = p.Wait()
return
}
func (p *Program) Stop() error {
p.Terminate(syscall.SIGKILL)
p.Wait()
p.setStatus(ST_STOPPED)
return nil
}
func (p *Program) Start() error {
@ -84,30 +118,19 @@ func (p *Program) Start() error {
}
// wait func finish, also accept signal
func (p *Program) Wait() (err error) {
log.Println("Wait program to finish")
p.Status = ST_RUNNING
defer func() {
if out, ok := p.Cmd.Stdout.(io.Closer); ok {
out.Close()
}
if err != nil {
log.Warnf("program finish: %v", err)
p.Status = ST_FATAL
} else {
p.Status = ST_STOPPED
}
}()
ch := GoFunc(p.Cmd.Wait)
for {
select {
case err = <-ch:
return err
case sig := <-p.Sig:
p.Terminate(sig)
}
}
}
// func (p *Program) Wait() (err error) {
// log.Println("Wait program to finish")
// ch := GoFunc(p.Cmd.Wait)
// for {
// select {
// case err = <-ch:
// return err
// case sig := <-p.Sig:
// p.Terminate(sig)
// }
// }
// }
type ProgramInfo struct {
Name string `json:"name"`
@ -135,6 +158,7 @@ type ProgramTable struct {
var (
ErrProgramDuplicate = errors.New("program duplicate")
ErrProgramNotExists = errors.New("program not exists")
)
func (pt *ProgramTable) saveConfig() error {
@ -193,3 +217,19 @@ func (pt *ProgramTable) Programs() []*Program {
}
return ps
}
func (pt *ProgramTable) Get(name string) (*Program, error) {
program, exists := pt.table[name]
if !exists {
return nil, ErrProgramNotExists
}
return program, nil
}
func (pt *ProgramTable) StopAll() {
pt.mu.Lock()
defer pt.mu.Unlock()
for _, program := range pt.table {
program.Stop()
}
}

@ -0,0 +1,57 @@
package main
import (
"net"
"os"
"time"
pb "github.com/codeskyblue/gosuv/gosuvpb"
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
)
type PbProgram struct {
}
func (this *PbProgram) Start(ctx context.Context, in *pb.Request) (res *pb.Response, err error) {
res = &pb.Response{}
program, err := programTable.Get(in.GetName())
if err != nil {
return
}
program.InputData(EVENT_START)
res.Message = proto.String("program started")
return res, nil
}
func (this *PbProgram) Stop(ctx context.Context, in *pb.Request) (res *pb.Response, err error) {
res = &pb.Response{}
program, err := programTable.Get(in.GetName())
if err != nil {
return
}
program.InputData(EVENT_STOP)
res.Message = proto.String("program stopped")
return res, nil
}
type PbSuvServer struct {
lis net.Listener
}
func (s *PbSuvServer) Control(ctx context.Context, in *pb.CtrlRequest) (*pb.CtrlResponse, error) {
res := &pb.CtrlResponse{}
res.Value = proto.String("Hi")
return res, nil
}
func (s *PbSuvServer) Shutdown(ctx context.Context, in *pb.NopRequest) (*pb.Response, error) {
go func() {
time.Sleep(50 * time.Millisecond)
s.lis.Close()
os.Exit(2)
}()
res := &pb.Response{}
res.Code = proto.Int32(200)
return res, nil
}

@ -7,15 +7,14 @@ import (
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
pb "github.com/codeskyblue/gosuv/gosuvpb"
"github.com/golang/protobuf/proto"
"github.com/lunny/log"
"github.com/lunny/tango"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
@ -89,25 +88,21 @@ func shutdownHandler(w http.ResponseWriter, r *http.Request) {
})
}
type SuvServer struct {
lis net.Listener
}
func (s *SuvServer) Control(ctx context.Context, in *pb.CtrlRequest) (*pb.CtrlResponse, error) {
res := &pb.CtrlResponse{}
res.Value = proto.String("Hi")
return res, nil
}
func (s *SuvServer) Shutdown(ctx context.Context, in *pb.NopRequest) (*pb.Response, error) {
func handleSignal(lis net.Listener) {
sigc := make(chan os.Signal, 2)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGHUP)
go func() {
time.Sleep(50 * time.Millisecond)
s.lis.Close()
os.Exit(2)
for sig := range sigc {
log.Println("Receive signal:", sig)
if sig == syscall.SIGHUP {
return // ignore
}
lis.Close()
programTable.StopAll()
os.Exit(0)
return
}
}()
res := &pb.Response{}
res.Code = proto.Int32(200)
return res, nil
}
func ServeAddr(host string, port int) error {
@ -122,26 +117,22 @@ func ServeAddr(host string, port int) error {
})
addr := fmt.Sprintf("%s:%d", host, port)
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
t.Run(addr)
wg.Done()
}()
go func() {
grpcServ := grpc.NewServer()
pbServ := &SuvServer{}
pb.RegisterGoSuvServer(grpcServ, pbServ)
go t.Run(addr)
lis, err := net.Listen("unix", filepath.Join(GOSUV_HOME, "gosuv.sock"))
if err != nil {
log.Fatal(err)
}
pbServ.lis = lis
//defer lis.Close()
grpcServ.Serve(lis)
wg.Done()
}()
wg.Wait()
lis, err := net.Listen("unix", filepath.Join(GOSUV_HOME, "gosuv.sock"))
if err != nil {
log.Fatal(err)
}
handleSignal(lis)
pbServ := &PbSuvServer{}
pbProgram := &PbProgram{}
grpcServ := grpc.NewServer()
pb.RegisterGoSuvServer(grpcServ, pbServ)
pb.RegisterProgramServer(grpcServ, pbProgram)
pbServ.lis = lis
grpcServ.Serve(lis)
return fmt.Errorf("Address: %s has been used", addr)
}

Loading…
Cancel
Save