add tail function

master
shengxiang 9 years ago
parent 05d1a450b6
commit 0a2f8f2197

@ -24,6 +24,11 @@ Go version at least `1.4`
$ gosuv stop timetest $ gosuv stop timetest
program "timetest" stopped program "timetest" stopped
$ gosuv tail timetest
line 1
line 2
line ...
# see more usage # see more usage
$ gosuv help $ gosuv help

@ -2,6 +2,7 @@ package main
import ( import (
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"net" "net"
"net/http" "net/http"
@ -164,6 +165,24 @@ func StopAction(ctx *cli.Context) {
fmt.Println(res.Message) fmt.Println(res.Message)
} }
func TailAction(ctx *cli.Context, client pb.ProgramClient) {
req := &pb.Request{Name: ctx.Args().First()}
tailc, err := client.Tail(context.Background(), req)
if err != nil {
log.Fatal(err)
}
for {
line, err := tailc.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Fatal(err)
}
fmt.Print(line.Line)
}
}
func Errorf(format string, v ...interface{}) { func Errorf(format string, v ...interface{}) {
fmt.Printf(format, v...) fmt.Printf(format, v...)
os.Exit(1) os.Exit(1)
@ -277,6 +296,11 @@ func initCli() {
Usage: "Stop running program", Usage: "Stop running program",
Action: wrap(StopAction), Action: wrap(StopAction),
}, },
{
Name: "tail",
Usage: "tail log",
Action: wrap(TailAction),
},
{ {
Name: "shutdown", Name: "shutdown",
Usage: "Shutdown server", Usage: "Shutdown server",

@ -220,6 +220,7 @@ var _GoSuv_serviceDesc = grpc.ServiceDesc{
type ProgramClient interface { type ProgramClient interface {
Start(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) Start(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
Stop(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) Stop(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
Tail(ctx context.Context, in *Request, opts ...grpc.CallOption) (Program_TailClient, error)
} }
type programClient struct { type programClient struct {
@ -248,11 +249,44 @@ func (c *programClient) Stop(ctx context.Context, in *Request, opts ...grpc.Call
return out, nil return out, nil
} }
func (c *programClient) Tail(ctx context.Context, in *Request, opts ...grpc.CallOption) (Program_TailClient, error) {
stream, err := grpc.NewClientStream(ctx, &_Program_serviceDesc.Streams[0], c.cc, "/gosuvpb.Program/Tail", opts...)
if err != nil {
return nil, err
}
x := &programTailClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Program_TailClient interface {
Recv() (*LogLine, error)
grpc.ClientStream
}
type programTailClient struct {
grpc.ClientStream
}
func (x *programTailClient) Recv() (*LogLine, error) {
m := new(LogLine)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// Server API for Program service // Server API for Program service
type ProgramServer interface { type ProgramServer interface {
Start(context.Context, *Request) (*Response, error) Start(context.Context, *Request) (*Response, error)
Stop(context.Context, *Request) (*Response, error) Stop(context.Context, *Request) (*Response, error)
Tail(*Request, Program_TailServer) error
} }
func RegisterProgramServer(s *grpc.Server, srv ProgramServer) { func RegisterProgramServer(s *grpc.Server, srv ProgramServer) {
@ -283,6 +317,27 @@ func _Program_Stop_Handler(srv interface{}, ctx context.Context, codec grpc.Code
return out, nil return out, nil
} }
func _Program_Tail_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Request)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(ProgramServer).Tail(m, &programTailServer{stream})
}
type Program_TailServer interface {
Send(*LogLine) error
grpc.ServerStream
}
type programTailServer struct {
grpc.ServerStream
}
func (x *programTailServer) Send(m *LogLine) error {
return x.ServerStream.SendMsg(m)
}
var _Program_serviceDesc = grpc.ServiceDesc{ var _Program_serviceDesc = grpc.ServiceDesc{
ServiceName: "gosuvpb.Program", ServiceName: "gosuvpb.Program",
HandlerType: (*ProgramServer)(nil), HandlerType: (*ProgramServer)(nil),
@ -296,5 +351,11 @@ var _Program_serviceDesc = grpc.ServiceDesc{
Handler: _Program_Stop_Handler, Handler: _Program_Stop_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{
{
StreamName: "Tail",
Handler: _Program_Tail_Handler,
ServerStreams: true,
},
},
} }

@ -42,5 +42,5 @@ service GoSuv {
service Program { service Program {
rpc Start(Request) returns (Response) {} rpc Start(Request) returns (Response) {}
rpc Stop(Request) returns (Response) {} rpc Stop(Request) returns (Response) {}
//rpc Tail(Request) returns (stream LogLine) {} rpc Tail(Request) returns (stream LogLine) {}
} }

@ -121,7 +121,7 @@ func (p *Program) InputData(event Event) {
func (p *Program) createLog() (*os.File, error) { func (p *Program) createLog() (*os.File, error) {
logDir := filepath.Join(GOSUV_HOME, "logs") logDir := filepath.Join(GOSUV_HOME, "logs")
os.MkdirAll(logDir, 0755) // just do it, err ignore it os.MkdirAll(logDir, 0755) // just do it, err ignore it
logFile := filepath.Join(logDir, p.Info.Name+".log") logFile := p.logFilePath()
return os.OpenFile(logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) return os.OpenFile(logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
} }
@ -133,6 +133,11 @@ func (p *Program) sleep(d time.Duration) {
} }
} }
func (p *Program) logFilePath() string {
logDir := filepath.Join(GOSUV_HOME, "logs")
return filepath.Join(logDir, p.Info.Name+".log")
}
func (p *Program) RunWithRetry() { func (p *Program) RunWithRetry() {
for p.retry = 0; p.retry < p.Info.StartRetries+1; p.retry += 1 { for p.retry = 0; p.retry < p.Info.StartRetries+1; p.retry += 1 {
// wait program to exit // wait program to exit

@ -3,6 +3,7 @@ package main
import ( import (
"net" "net"
"os" "os"
"os/exec"
"time" "time"
pb "github.com/codeskyblue/gosuv/gosuvpb" pb "github.com/codeskyblue/gosuv/gosuvpb"
@ -34,6 +35,34 @@ func (this *PbProgram) Stop(ctx context.Context, in *pb.Request) (res *pb.Respon
return res, nil return res, nil
} }
//func (this *PbProgram) Tail(ctx context.Context, in *pb.Request)(stream
func (c *PbProgram) Tail(in *pb.Request, stream pb.Program_TailServer) (err error) {
program, err := programTable.Get(in.Name)
if err != nil {
return
}
cmd := exec.Command("tail", "-n5", "-f", program.logFilePath())
rd, err := cmd.StdoutPipe()
go cmd.Run()
defer func() {
if cmd.Process != nil {
cmd.Process.Kill()
}
}()
buf := make([]byte, 1024)
for {
nr, err := rd.Read(buf)
if err != nil {
break
}
line := &pb.LogLine{Line: string(buf[:nr])}
if err = stream.Send(line); err != nil {
return err
}
}
return nil
}
type PbSuvServer struct { type PbSuvServer struct {
lis net.Listener lis net.Listener
} }

Loading…
Cancel
Save