From a3036dcc664cda9ef2fa8d718edfcd0bbfc4e356 Mon Sep 17 00:00:00 2001 From: esrrhs Date: Sat, 19 Oct 2019 22:36:17 +0800 Subject: [PATCH] add --- client.go | 113 +++++++++++++++++++++++++++----------------------- cmd/main.go | 3 +- framemgr.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++ pingtunnel.go | 39 ++++++++++++++++- 4 files changed, 207 insertions(+), 55 deletions(-) create mode 100644 framemgr.go diff --git a/client.go b/client.go index ab80019..baa925d 100644 --- a/client.go +++ b/client.go @@ -2,8 +2,6 @@ package pingtunnel import ( "github.com/esrrhs/go-engine/src/loggo" - "github.com/esrrhs/go-engine/src/pool" - "github.com/esrrhs/go-engine/src/rbuffergo" "golang.org/x/net/icmp" "math" "math/rand" @@ -34,29 +32,21 @@ func NewClient(addr string, server string, target string, timeout int, sproto in return nil, err } - var sendFramePool *pool.Pool - if tcpmode { - sendFramePool = pool.New(func() interface{} { - return Frame{size: 0, data: make([]byte, FRAME_MAX_SIZE)} - }) - } - r := rand.New(rand.NewSource(time.Now().UnixNano())) return &Client{ - id: r.Intn(math.MaxInt16), - ipaddr: ipaddr, - tcpaddr: tcpaddr, - addr: addr, - ipaddrServer: ipaddrServer, - addrServer: server, - targetAddr: target, - timeout: timeout, - sproto: sproto, - rproto: rproto, - catch: catch, - key: key, - tcpmode: tcpmode, - sendFramePool: sendFramePool, + id: r.Intn(math.MaxInt16), + ipaddr: ipaddr, + tcpaddr: tcpaddr, + addr: addr, + ipaddrServer: ipaddrServer, + addrServer: server, + targetAddr: target, + timeout: timeout, + sproto: sproto, + rproto: rproto, + catch: catch, + key: key, + tcpmode: tcpmode, }, nil } @@ -64,14 +54,15 @@ type Client struct { id int sequence int - timeout int - sproto int - rproto int - catch int - key int - tcpmode bool - - sendFramePool *pool.Pool + timeout int + sproto int + rproto int + catch int + key int + tcpmode bool + tcpmode_buffersize int + tcpmode_maxwin int + tcpmode_resend_timems int ipaddr *net.UDPAddr tcpaddr *net.TCPAddr @@ -105,8 +96,7 @@ type ClientConn struct { activeTime time.Time close bool - sendb *rbuffergo.RBuffergo - recvb *rbuffergo.RBuffergo + fm *FrameMgr } func (p *Client) Addr() string { @@ -221,57 +211,71 @@ func (p *Client) AcceptTcp() error { } -func (p *Client) AcceptTcpConn(conn *net.TCPConn) error { +func (p *Client) AcceptTcpConn(conn *net.TCPConn) { now := time.Now() uuid := UniqueId() tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr) - sendb := rbuffergo.New(1024*1024, false) - recvb := rbuffergo.New(1024*1024, false) + fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems) - cutsize := FRAME_MAX_SIZE - sendwin := sendb.Capacity() / cutsize - - clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeTime: now, close: false, sendb: sendb, recvb: recvb} + clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeTime: now, close: false, + fm: fm} p.localAddrToConnMap[tcpsrcaddr.String()] = clientConn p.localIdToConnMap[uuid] = clientConn loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String()) bytes := make([]byte, 10240) - sendwinmap := make(map[string]*ClientConn) - for { - left := sendb.Capacity() - sendb.Size() + left := clientConn.fm.GetSendBufferLeft() if left >= len(bytes) { conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, srcaddr, err := p.conn.ReadFrom(bytes) + n, err := conn.Read(bytes) if err != nil { if neterr, ok := err.(*net.OpError); ok { if neterr.Timeout() { // Read timeout - continue + n = 0 } else { - loggo.Error("Error read tcp %s %s", srcaddr.String(), err) + loggo.Error("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err) break } } } if n > 0 { - sendb.Write(bytes[:n]) + clientConn.fm.WriteSendBuffer(bytes[:n]) } } + clientConn.fm.Update() + + sendlist := clientConn.fm.getSendList() + clientConn.activeTime = now - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(DATA), bytes[:n], - p.sproto, p.rproto, p.catch, p.key) - p.sequence++ + for e := sendlist.Front(); e != nil; e = e.Next() { - p.sendPacket++ - p.sendPacketSize += (uint64)(n) + f := e.Value.(Frame) + mb, err := f.Marshal(0) + if err != nil { + loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err) + break + } + + p.sequence++ + + p.sendPacket++ + p.sendPacketSize += (uint64)(f.size) + + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(DATA), mb, + p.sproto, p.rproto, p.catch, p.key) + } } + + loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String()) + conn.Close() + p.Close(clientConn) } func (p *Client) Accept() error { @@ -374,6 +378,11 @@ func (p *Client) Close(clientConn *ClientConn) { } func (p *Client) checkTimeoutConn() { + + if p.tcpmode { + return + } + now := time.Now() for _, conn := range p.localIdToConnMap { diff := now.Sub(conn.activeTime) diff --git a/cmd/main.go b/cmd/main.go index 8484ca8..e1e78b5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -89,7 +89,8 @@ func main() { fmt.Printf("server %s\n", *server) fmt.Printf("target %s\n", *target) - c, err := pingtunnel.NewClient(*listen, *server, *target, *timeout, *sproto, *rproto, *catch, *key, *tcpmode) + c, err := pingtunnel.NewClient(*listen, *server, *target, *timeout, *sproto, *rproto, *catch, *key, + *tcpmode) if err != nil { fmt.Printf("ERROR: %s\n", err.Error()) return diff --git a/framemgr.go b/framemgr.go new file mode 100644 index 0000000..2bb67d4 --- /dev/null +++ b/framemgr.go @@ -0,0 +1,107 @@ +package pingtunnel + +import ( + "container/list" + "github.com/esrrhs/go-engine/src/rbuffergo" + "math/rand" + "sync" + "time" +) + +type FrameMgr struct { + sendb *rbuffergo.RBuffergo + recvb *rbuffergo.RBuffergo + sendlock sync.Locker + recvlock sync.Locker + windowsize int + resend_timems int + win *list.List + sendid int + sendlist *list.List +} + +func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { + + sendb := rbuffergo.New(buffersize, false) + recvb := rbuffergo.New(buffersize, false) + + fm := &FrameMgr{sendb: sendb, recvb: recvb, + sendlock: &sync.Mutex{}, recvlock: &sync.Mutex{}, + windowsize: windowsize, win: list.New(), sendid: rand.Int() % (FRAME_MAX_ID + 1), + resend_timems: resend_timems, sendlist: list.New()} + + return fm +} + +func (fm *FrameMgr) GetSendBufferLeft() int { + left := fm.sendb.Capacity() - fm.sendb.Size() + return left +} + +func (fm *FrameMgr) WriteSendBuffer(data []byte) { + fm.sendlock.Lock() + defer fm.sendlock.Unlock() + fm.sendb.Write(data) +} + +func (fm *FrameMgr) Update() { + fm.cutSendBufferToWindow() + fm.calSendList() +} + +func (fm *FrameMgr) cutSendBufferToWindow() { + fm.sendlock.Lock() + defer fm.sendlock.Unlock() + + sendall := false + + if fm.sendb.Size() < FRAME_MAX_SIZE { + sendall = true + } + + for fm.sendb.Size() > FRAME_MAX_SIZE && fm.win.Len() < fm.windowsize { + f := Frame{resend: false, sendtime: 0, + id: fm.sendid, size: FRAME_MAX_SIZE, + data: make([]byte, FRAME_MAX_SIZE)} + fm.sendb.Read(f.data) + + fm.sendid++ + if fm.sendid > FRAME_MAX_ID { + fm.sendid = 0 + } + + fm.win.PushBack(f) + } + + if sendall && fm.sendb.Size() > 0 && fm.win.Len() < fm.windowsize { + f := Frame{resend: false, sendtime: 0, + id: fm.sendid, size: fm.sendb.Size(), + data: make([]byte, fm.sendb.Size())} + fm.sendb.Read(f.data) + + fm.sendid++ + if fm.sendid > FRAME_MAX_ID { + fm.sendid = 0 + } + + fm.win.PushBack(f) + } +} + +func (fm *FrameMgr) calSendList() { + cur := time.Now().UnixNano() + + fm.sendlist.Init() + + for e := fm.win.Front(); e != nil; e = e.Next() { + f := e.Value.(Frame) + if f.resend || cur-f.sendtime > int64(fm.resend_timems*1000) { + f.sendtime = cur + fm.sendlist.PushBack(&f) + } + } +} + +func (fm *FrameMgr) getSendList() *list.List { + return fm.sendlist +} diff --git a/pingtunnel.go b/pingtunnel.go index 24c6f93..8fbae8d 100644 --- a/pingtunnel.go +++ b/pingtunnel.go @@ -264,9 +264,44 @@ type CatchMsg struct { const ( FRAME_MAX_SIZE int = 888 + FRAME_MAX_ID int = 999 +) + +const ( + FRAME_TYPE_DATA int = 0x0101 + FRAME_TYPE_REQ int = 0x0202 + FRAME_TYPE_ACK int = 0x0303 ) type Frame struct { - size int - data []byte + ty int + resend bool + sendtime int64 + id int + size int + data []byte + dataid []int +} + +// Marshal implements the Marshal method of MessageBody interface. +func (p *Frame) Marshal(proto int) ([]byte, error) { + + b := make([]byte, p.Len(proto)) + + binary.BigEndian.PutUint16(b[:2], uint16(p.ty)) + + datalen := len(p.data) + binary.BigEndian.PutUint16(b[2:4], uint16(datalen)) + + // TODO + + return b, nil +} + +// Len implements the Len method of MessageBody interface. +func (p *Frame) Len(proto int) int { + if p == nil { + return 0 + } + return 4 + 2 + 4 + 4 // TODO }