diff --git a/README.md b/README.md index f9450b4..b57a3a5 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,11 @@ Go version at least `1.4` $ gosuv stop timetest program "timetest" stopped + $ gosuv tail timetest + line 1 + line 2 + line ... + # see more usage $ gosuv help diff --git a/gosuv.go b/gosuv.go index 7a99aa7..2fb2bc4 100644 --- a/gosuv.go +++ b/gosuv.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "io" "io/ioutil" "net" "net/http" @@ -164,6 +165,24 @@ func StopAction(ctx *cli.Context) { fmt.Println(res.Message) } +func TailAction(ctx *cli.Context, client pb.ProgramClient) { + req := &pb.Request{Name: ctx.Args().First()} + tailc, err := client.Tail(context.Background(), req) + if err != nil { + log.Fatal(err) + } + for { + line, err := tailc.Recv() + if err == io.EOF { + return + } + if err != nil { + log.Fatal(err) + } + fmt.Print(line.Line) + } +} + func Errorf(format string, v ...interface{}) { fmt.Printf(format, v...) os.Exit(1) @@ -277,6 +296,11 @@ func initCli() { Usage: "Stop running program", Action: wrap(StopAction), }, + { + Name: "tail", + Usage: "tail log", + Action: wrap(TailAction), + }, { Name: "shutdown", Usage: "Shutdown server", diff --git a/gosuvpb/gosuv.pb.go b/gosuvpb/gosuv.pb.go index 8828be1..3d10d22 100644 --- a/gosuvpb/gosuv.pb.go +++ b/gosuvpb/gosuv.pb.go @@ -220,6 +220,7 @@ var _GoSuv_serviceDesc = grpc.ServiceDesc{ type ProgramClient interface { Start(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) Stop(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) + Tail(ctx context.Context, in *Request, opts ...grpc.CallOption) (Program_TailClient, error) } type programClient struct { @@ -248,11 +249,44 @@ func (c *programClient) Stop(ctx context.Context, in *Request, opts ...grpc.Call return out, nil } +func (c *programClient) Tail(ctx context.Context, in *Request, opts ...grpc.CallOption) (Program_TailClient, error) { + stream, err := grpc.NewClientStream(ctx, &_Program_serviceDesc.Streams[0], c.cc, "/gosuvpb.Program/Tail", opts...) + if err != nil { + return nil, err + } + x := &programTailClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Program_TailClient interface { + Recv() (*LogLine, error) + grpc.ClientStream +} + +type programTailClient struct { + grpc.ClientStream +} + +func (x *programTailClient) Recv() (*LogLine, error) { + m := new(LogLine) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Server API for Program service type ProgramServer interface { Start(context.Context, *Request) (*Response, error) Stop(context.Context, *Request) (*Response, error) + Tail(*Request, Program_TailServer) error } func RegisterProgramServer(s *grpc.Server, srv ProgramServer) { @@ -283,6 +317,27 @@ func _Program_Stop_Handler(srv interface{}, ctx context.Context, codec grpc.Code return out, nil } +func _Program_Tail_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Request) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ProgramServer).Tail(m, &programTailServer{stream}) +} + +type Program_TailServer interface { + Send(*LogLine) error + grpc.ServerStream +} + +type programTailServer struct { + grpc.ServerStream +} + +func (x *programTailServer) Send(m *LogLine) error { + return x.ServerStream.SendMsg(m) +} + var _Program_serviceDesc = grpc.ServiceDesc{ ServiceName: "gosuvpb.Program", HandlerType: (*ProgramServer)(nil), @@ -296,5 +351,11 @@ var _Program_serviceDesc = grpc.ServiceDesc{ Handler: _Program_Stop_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Tail", + Handler: _Program_Tail_Handler, + ServerStreams: true, + }, + }, } diff --git a/gosuvpb/gosuv.proto b/gosuvpb/gosuv.proto index ad8c6bb..8e60b0a 100644 --- a/gosuvpb/gosuv.proto +++ b/gosuvpb/gosuv.proto @@ -42,5 +42,5 @@ service GoSuv { service Program { rpc Start(Request) returns (Response) {} rpc Stop(Request) returns (Response) {} - //rpc Tail(Request) returns (stream LogLine) {} + rpc Tail(Request) returns (stream LogLine) {} } diff --git a/program.go b/program.go index 83e987f..3ea472c 100644 --- a/program.go +++ b/program.go @@ -121,7 +121,7 @@ func (p *Program) InputData(event Event) { func (p *Program) createLog() (*os.File, error) { logDir := filepath.Join(GOSUV_HOME, "logs") os.MkdirAll(logDir, 0755) // just do it, err ignore it - logFile := filepath.Join(logDir, p.Info.Name+".log") + logFile := p.logFilePath() return os.OpenFile(logFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) } @@ -133,6 +133,11 @@ func (p *Program) sleep(d time.Duration) { } } +func (p *Program) logFilePath() string { + logDir := filepath.Join(GOSUV_HOME, "logs") + return filepath.Join(logDir, p.Info.Name+".log") +} + func (p *Program) RunWithRetry() { for p.retry = 0; p.retry < p.Info.StartRetries+1; p.retry += 1 { // wait program to exit diff --git a/service.go b/service.go index addd798..f2d9bee 100644 --- a/service.go +++ b/service.go @@ -3,6 +3,7 @@ package main import ( "net" "os" + "os/exec" "time" pb "github.com/codeskyblue/gosuv/gosuvpb" @@ -34,6 +35,34 @@ func (this *PbProgram) Stop(ctx context.Context, in *pb.Request) (res *pb.Respon return res, nil } +//func (this *PbProgram) Tail(ctx context.Context, in *pb.Request)(stream +func (c *PbProgram) Tail(in *pb.Request, stream pb.Program_TailServer) (err error) { + program, err := programTable.Get(in.Name) + if err != nil { + return + } + cmd := exec.Command("tail", "-n5", "-f", program.logFilePath()) + rd, err := cmd.StdoutPipe() + go cmd.Run() + defer func() { + if cmd.Process != nil { + cmd.Process.Kill() + } + }() + buf := make([]byte, 1024) + for { + nr, err := rd.Read(buf) + if err != nil { + break + } + line := &pb.LogLine{Line: string(buf[:nr])} + if err = stream.Send(line); err != nil { + return err + } + } + return nil +} + type PbSuvServer struct { lis net.Listener }