diff --git a/reverse.go b/reverse.go index 8db2167..0c783c1 100644 --- a/reverse.go +++ b/reverse.go @@ -9,6 +9,14 @@ import ( "time" ) +var startTime int64 + +func ts() int64 { + if startTime == 0 { + startTime = time.Now().Unix() + } + return time.Now().Unix() - startTime +} const MAXPACKET = 999 func b(s string) []byte { @@ -105,7 +113,7 @@ func (s *ReverseServer) Run() { } func (s *ReverseServer) handleSession(ses *LrcpSession) { - r := bufio.NewReader(ses) + r := bufio.NewReaderSize(ses, 1024*1024) for { data, err := r.ReadBytes('\n') if err != nil { return } @@ -155,7 +163,7 @@ func NewLrcpSession(sessionId uint32, pc net.PacketConn, addr net.Addr) *LrcpSes func (s *LrcpSession) snd(buf []byte) error { var err error - fmt.Printf("> %s\n", bytes.Replace(buf, b("\n"), b("\\n"), -1)) + fmt.Printf("> %d %s\n", ts(), bytes.Replace(buf, b("\n"), b("\\n"), -1)) _, err = s.pc.WriteTo(buf, s.addr) return err @@ -196,7 +204,7 @@ func (s *LrcpSession) waitAck() bool { for ! acked { select { case <- s.ackQ: acked = s.seq <= s.acked - case <-time.After(3 * time.Second): return false + case <-time.After(1 * time.Second): return false } } @@ -346,7 +354,7 @@ func (s *LrcpServer) Accept() *LrcpSession { } func (s *LrcpServer) handle(addr net.Addr, req []byte) { - fmt.Printf("< %s\n", bytes.Replace(req, b("\n"), []byte("\\n"), -1)) + fmt.Printf("< %d %s\n", ts(), bytes.Replace(req, b("\n"), []byte("\\n"), -1)) if ! bytes.HasPrefix(req, b("/")) || ! bytes.HasSuffix(req, b("/")) { return } @@ -414,6 +422,9 @@ func (s *LrcpServer) hdlAck(sessionId uint32, req []byte) error { strPos, req := splitSlash(req) pos, err:= b2i(strPos) if err != nil { return err } + if pos > ses.seq { + return s.hdlClose(sessionId) + } ses.acked = Max(ses.acked, pos) ses.ackQ <- pos return nil