From 14315b85ab3f363117ea1c812e4fa2ffefeec7c6 Mon Sep 17 00:00:00 2001 From: esrrhs Date: Wed, 23 Oct 2019 20:36:13 +0800 Subject: [PATCH] add --- client.go | 4 +- framemgr.go | 147 +++++++++++++++++++++++++++++++++++++++++++--------- server.go | 92 +++++++++++++++++++++++--------- 3 files changed, 191 insertions(+), 52 deletions(-) diff --git a/client.go b/client.go index 1a75c74..f57e8bb 100644 --- a/client.go +++ b/client.go @@ -204,7 +204,6 @@ func (p *Client) AcceptTcp() error { func (p *Client) AcceptTcpConn(conn *net.TCPConn) { - now := time.Now() uuid := UniqueId() tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr) @@ -243,6 +242,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) { sendlist := clientConn.fm.getSendList() + now := time.Now() clientConn.activeTime = now for e := sendlist.Front(); e != nil; e = e.Next() { @@ -251,7 +251,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) { mb, err := proto.Marshal(&f) if err != nil { loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err) - break + continue } p.sequence++ diff --git a/framemgr.go b/framemgr.go index 5243d67..9708e81 100644 --- a/framemgr.go +++ b/framemgr.go @@ -8,15 +8,20 @@ import ( ) type FrameMgr struct { - sendb *rbuffergo.RBuffergo - recvb *rbuffergo.RBuffergo - sendlock sync.Locker + sendb *rbuffergo.RBuffergo + recvb *rbuffergo.RBuffergo + recvlock sync.Locker windowsize int resend_timems int - win *list.List - sendid int - sendlist *list.List + + sendwin *list.List + sendlist *list.List + sendid int + + recvwin *list.List + recvlist *list.List + recvid int } func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { @@ -25,9 +30,10 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { recvb := rbuffergo.New(buffersize, false) fm := &FrameMgr{sendb: sendb, recvb: recvb, - sendlock: &sync.Mutex{}, recvlock: &sync.Mutex{}, - windowsize: windowsize, win: list.New(), sendid: 0, - resend_timems: resend_timems, sendlist: list.New()} + recvlock: &sync.Mutex{}, + windowsize: windowsize, resend_timems: resend_timems, + sendwin: list.New(), sendlist: list.New(), sendid: 0, + recvwin: list.New(), recvlist: list.New(), recvid: 0} return fm } @@ -38,19 +44,20 @@ func (fm *FrameMgr) GetSendBufferLeft() int { } func (fm *FrameMgr) WriteSendBuffer(data []byte) { - fm.sendlock.Lock() - defer fm.sendlock.Unlock() fm.sendb.Write(data) } func (fm *FrameMgr) Update() { fm.cutSendBufferToWindow() + + fm.sendlist.Init() + tmpreq, tmpack, tmpackto := fm.preProcessRecvList() + fm.processRecvList(tmpreq, tmpack, tmpackto) + fm.calSendList() } func (fm *FrameMgr) cutSendBufferToWindow() { - fm.sendlock.Lock() - defer fm.sendlock.Unlock() sendall := false @@ -58,8 +65,8 @@ func (fm *FrameMgr) cutSendBufferToWindow() { sendall = true } - for fm.sendb.Size() > FRAME_MAX_SIZE && fm.win.Len() < fm.windowsize { - f := Frame{Resend: false, Sendtime: 0, + for fm.sendb.Size() >= FRAME_MAX_SIZE && fm.sendwin.Len() < fm.windowsize { + f := &Frame{Type: (int32)(Frame_DATA), Resend: false, Sendtime: 0, Id: (int32)(fm.sendid), Data: make([]byte, FRAME_MAX_SIZE)} fm.sendb.Read(f.Data) @@ -69,11 +76,11 @@ func (fm *FrameMgr) cutSendBufferToWindow() { fm.sendid = 0 } - fm.win.PushBack(f) + fm.sendwin.PushBack(f) } - if sendall && fm.sendb.Size() > 0 && fm.win.Len() < fm.windowsize { - f := Frame{Resend: false, Sendtime: 0, + if sendall && fm.sendb.Size() > 0 && fm.sendwin.Len() < fm.windowsize { + f := &Frame{Type: (int32)(Frame_DATA), Resend: false, Sendtime: 0, Id: (int32)(fm.sendid), Data: make([]byte, fm.sendb.Size())} fm.sendb.Read(f.Data) @@ -83,20 +90,18 @@ func (fm *FrameMgr) cutSendBufferToWindow() { fm.sendid = 0 } - fm.win.PushBack(f) + fm.sendwin.PushBack(f) } } func (fm *FrameMgr) calSendList() { cur := time.Now().UnixNano() - - fm.sendlist.Init() - - for e := fm.win.Front(); e != nil; e = e.Next() { - f := e.Value.(Frame) + for e := fm.sendwin.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) if f.Resend || cur-f.Sendtime > int64(fm.resend_timems*1000) { f.Sendtime = cur - fm.sendlist.PushBack(&f) + fm.sendlist.PushBack(f) + f.Resend = false } } } @@ -104,3 +109,95 @@ func (fm *FrameMgr) calSendList() { func (fm *FrameMgr) getSendList() *list.List { return fm.sendlist } + +func (fm *FrameMgr) OnRecvFrame(f *Frame) { + fm.recvlock.Lock() + defer fm.recvlock.Unlock() + + fm.recvlist.PushBack(f) +} + +func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int32]*Frame) { + fm.recvlock.Lock() + defer fm.recvlock.Unlock() + + tmpreq := make(map[int32]int) + tmpack := make(map[int32]int) + tmpackto := make(map[int32]*Frame) + for e := fm.recvlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + if f.Type == (int32)(Frame_REQ) { + for _, id := range f.Dataid { + tmpreq[id]++ + } + } else if f.Type == (int32)(Frame_ACK) { + for _, id := range f.Dataid { + tmpack[id]++ + } + } else if f.Type == (int32)(Frame_DATA) { + tmpackto[f.Id] = f + } + } + fm.recvlist.Init() + return tmpreq, tmpack, tmpackto +} + +func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) { + + for id, _ := range tmpreq { + for e := fm.sendwin.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + if f.Id == id { + f.Resend = true + break + } + } + } + + for id, _ := range tmpack { + for e := fm.sendwin.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + if f.Id == id { + fm.sendwin.Remove(e) + break + } + } + } + + if len(tmpackto) > 0 { + f := &Frame{Type: (int32)(Frame_ACK), Resend: false, Sendtime: 0, + Id: 0, + Dataid: make([]int32, len(tmpackto))} + index := 0 + for id, rf := range tmpackto { + f.Dataid[index] = id + index++ + fm.addToRecvWin(rf) + } + fm.sendlist.PushBack(f) + } +} + +func (fm *FrameMgr) addToRecvWin(rf *Frame) { + + for e := fm.recvwin.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + if f.Id == rf.Id { + return + } + } + + for e := fm.recvwin.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + if rf.Id < f.Id { + fm.recvwin.InsertBefore(rf, e) + return + } + } + + if fm.recvwin.Len() > 0 { + fm.recvwin.PushBack(rf) + } else { + fm.recvwin.PushBack(rf) + } +} diff --git a/server.go b/server.go index a9a1768..ab172b4 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package pingtunnel import ( "github.com/esrrhs/go-engine/src/loggo" + "github.com/golang/protobuf/proto" "golang.org/x/net/icmp" "net" "time" @@ -40,6 +41,8 @@ type ServerConn struct { activeTime time.Time close bool rproto int + fm *FrameMgr + tcpmode int } func (p *Server) Run() { @@ -112,8 +115,10 @@ func (p *Server) processPacket(packet *Packet) { return } + fm := NewFrameMgr((int)(packet.my.TcpmodeBuffersize), (int)(packet.my.TcpmodeMaxwin), (int)(packet.my.TcpmodeResendTimems)) + localConn = &ServerConn{tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeTime: now, close: false, - rproto: (int)(packet.my.Rproto)} + rproto: (int)(packet.my.Rproto), fm: fm, tcpmode: (int)(packet.my.Tcpmode)} p.localConnMap[id] = localConn @@ -135,7 +140,7 @@ func (p *Server) processPacket(packet *Packet) { } localConn = &ServerConn{conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeTime: now, close: false, - rproto: (int)(packet.my.Rproto)} + rproto: (int)(packet.my.Rproto), tcpmode: (int)(packet.my.Tcpmode)} p.localConnMap[id] = localConn @@ -147,11 +152,23 @@ func (p *Server) processPacket(packet *Packet) { if packet.my.Type == (int32)(MyMsg_DATA) { - _, err := localConn.conn.Write(packet.my.Data) - if err != nil { - loggo.Error("WriteToUDP Error %s", err) - localConn.close = true - return + if packet.my.Tcpmode > 0 { + f := &Frame{} + err := proto.Unmarshal(packet.my.Data, f) + if err != nil { + loggo.Error("Unmarshal tcp Error %s", err) + return + } + + localConn.fm.OnRecvFrame(f) + + } else { + _, err := localConn.conn.Write(packet.my.Data) + if err != nil { + loggo.Error("WriteToUDP Error %s", err) + localConn.close = true + return + } } p.recvPacket++ @@ -163,33 +180,52 @@ func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) { loggo.Info("server waiting target response %s -> %s %s", conn.tcpaddrTarget.String(), conn.id, conn.tcpconn.LocalAddr().String()) - for { - bytes := make([]byte, 2000) + bytes := make([]byte, 10240) - conn.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, _, err := conn.conn.ReadFromUDP(bytes) - if err != nil { - if neterr, ok := err.(*net.OpError); ok { - if neterr.Timeout() { - // Read timeout - continue - } else { - loggo.Error("ReadFromUDP Error read udp %s", err) - conn.close = true - return + for { + left := conn.fm.GetSendBufferLeft() + if left >= len(bytes) { + conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) + n, err := conn.tcpconn.Read(bytes) + if err != nil { + if neterr, ok := err.(*net.OpError); ok { + if neterr.Timeout() { + // Read timeout + n = 0 + } else { + loggo.Error("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + break + } } } + if n > 0 { + conn.fm.WriteSendBuffer(bytes[:n]) + } } + conn.fm.Update() + + sendlist := conn.fm.getSendList() + now := time.Now() conn.activeTime = now - sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), bytes[:n], - conn.rproto, -1, p.key, - 0, 0, 0, 0) + for e := sendlist.Front(); e != nil; e = e.Next() { - p.sendPacket++ - p.sendPacketSize += (uint64)(n) + f := e.Value.(Frame) + mb, err := proto.Marshal(&f) + if err != nil { + loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + continue + } + + sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, + conn.rproto, -1, p.key, + 0, 0, 0, 0) + + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } } } @@ -238,6 +274,9 @@ func (p *Server) checkTimeoutConn() { now := time.Now() for _, conn := range p.localConnMap { + if conn.tcpmode > 0 { + continue + } diff := now.Sub(conn.activeTime) if diff > time.Second*(time.Duration(p.timeout)) { conn.close = true @@ -245,6 +284,9 @@ func (p *Server) checkTimeoutConn() { } for id, conn := range p.localConnMap { + if conn.tcpmode > 0 { + continue + } if conn.close { loggo.Info("close inactive conn %s %s", id, conn.ipaddrTarget.String()) p.Close(conn)