This commit is contained in:
esrrhs 2019-10-19 22:36:17 +08:00
parent 1402d824c6
commit a3036dcc66
4 changed files with 207 additions and 55 deletions

113
client.go
View File

@ -2,8 +2,6 @@ package pingtunnel
import ( import (
"github.com/esrrhs/go-engine/src/loggo" "github.com/esrrhs/go-engine/src/loggo"
"github.com/esrrhs/go-engine/src/pool"
"github.com/esrrhs/go-engine/src/rbuffergo"
"golang.org/x/net/icmp" "golang.org/x/net/icmp"
"math" "math"
"math/rand" "math/rand"
@ -34,29 +32,21 @@ func NewClient(addr string, server string, target string, timeout int, sproto in
return nil, err return nil, err
} }
var sendFramePool *pool.Pool
if tcpmode {
sendFramePool = pool.New(func() interface{} {
return Frame{size: 0, data: make([]byte, FRAME_MAX_SIZE)}
})
}
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
return &Client{ return &Client{
id: r.Intn(math.MaxInt16), id: r.Intn(math.MaxInt16),
ipaddr: ipaddr, ipaddr: ipaddr,
tcpaddr: tcpaddr, tcpaddr: tcpaddr,
addr: addr, addr: addr,
ipaddrServer: ipaddrServer, ipaddrServer: ipaddrServer,
addrServer: server, addrServer: server,
targetAddr: target, targetAddr: target,
timeout: timeout, timeout: timeout,
sproto: sproto, sproto: sproto,
rproto: rproto, rproto: rproto,
catch: catch, catch: catch,
key: key, key: key,
tcpmode: tcpmode, tcpmode: tcpmode,
sendFramePool: sendFramePool,
}, nil }, nil
} }
@ -64,14 +54,15 @@ type Client struct {
id int id int
sequence int sequence int
timeout int timeout int
sproto int sproto int
rproto int rproto int
catch int catch int
key int key int
tcpmode bool tcpmode bool
tcpmode_buffersize int
sendFramePool *pool.Pool tcpmode_maxwin int
tcpmode_resend_timems int
ipaddr *net.UDPAddr ipaddr *net.UDPAddr
tcpaddr *net.TCPAddr tcpaddr *net.TCPAddr
@ -105,8 +96,7 @@ type ClientConn struct {
activeTime time.Time activeTime time.Time
close bool close bool
sendb *rbuffergo.RBuffergo fm *FrameMgr
recvb *rbuffergo.RBuffergo
} }
func (p *Client) Addr() string { func (p *Client) Addr() string {
@ -221,57 +211,71 @@ func (p *Client) AcceptTcp() error {
} }
func (p *Client) AcceptTcpConn(conn *net.TCPConn) error { func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
now := time.Now() now := time.Now()
uuid := UniqueId() uuid := UniqueId()
tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr) tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr)
sendb := rbuffergo.New(1024*1024, false) fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems)
recvb := rbuffergo.New(1024*1024, false)
cutsize := FRAME_MAX_SIZE clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeTime: now, close: false,
sendwin := sendb.Capacity() / cutsize fm: fm}
clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeTime: now, close: false, sendb: sendb, recvb: recvb}
p.localAddrToConnMap[tcpsrcaddr.String()] = clientConn p.localAddrToConnMap[tcpsrcaddr.String()] = clientConn
p.localIdToConnMap[uuid] = clientConn p.localIdToConnMap[uuid] = clientConn
loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String()) loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String())
bytes := make([]byte, 10240) bytes := make([]byte, 10240)
sendwinmap := make(map[string]*ClientConn)
for { for {
left := sendb.Capacity() - sendb.Size() left := clientConn.fm.GetSendBufferLeft()
if left >= len(bytes) { if left >= len(bytes) {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, srcaddr, err := p.conn.ReadFrom(bytes) n, err := conn.Read(bytes)
if err != nil { if err != nil {
if neterr, ok := err.(*net.OpError); ok { if neterr, ok := err.(*net.OpError); ok {
if neterr.Timeout() { if neterr.Timeout() {
// Read timeout // Read timeout
continue n = 0
} else { } else {
loggo.Error("Error read tcp %s %s", srcaddr.String(), err) loggo.Error("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
break break
} }
} }
} }
if n > 0 { if n > 0 {
sendb.Write(bytes[:n]) clientConn.fm.WriteSendBuffer(bytes[:n])
} }
} }
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
clientConn.activeTime = now clientConn.activeTime = now
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(DATA), bytes[:n],
p.sproto, p.rproto, p.catch, p.key)
p.sequence++ for e := sendlist.Front(); e != nil; e = e.Next() {
p.sendPacket++ f := e.Value.(Frame)
p.sendPacketSize += (uint64)(n) mb, err := f.Marshal(0)
if err != nil {
loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err)
break
}
p.sequence++
p.sendPacket++
p.sendPacketSize += (uint64)(f.size)
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(DATA), mb,
p.sproto, p.rproto, p.catch, p.key)
}
} }
loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String())
conn.Close()
p.Close(clientConn)
} }
func (p *Client) Accept() error { func (p *Client) Accept() error {
@ -374,6 +378,11 @@ func (p *Client) Close(clientConn *ClientConn) {
} }
func (p *Client) checkTimeoutConn() { func (p *Client) checkTimeoutConn() {
if p.tcpmode {
return
}
now := time.Now() now := time.Now()
for _, conn := range p.localIdToConnMap { for _, conn := range p.localIdToConnMap {
diff := now.Sub(conn.activeTime) diff := now.Sub(conn.activeTime)

View File

@ -89,7 +89,8 @@ func main() {
fmt.Printf("server %s\n", *server) fmt.Printf("server %s\n", *server)
fmt.Printf("target %s\n", *target) fmt.Printf("target %s\n", *target)
c, err := pingtunnel.NewClient(*listen, *server, *target, *timeout, *sproto, *rproto, *catch, *key, *tcpmode) c, err := pingtunnel.NewClient(*listen, *server, *target, *timeout, *sproto, *rproto, *catch, *key,
*tcpmode)
if err != nil { if err != nil {
fmt.Printf("ERROR: %s\n", err.Error()) fmt.Printf("ERROR: %s\n", err.Error())
return return

107
framemgr.go Normal file
View File

@ -0,0 +1,107 @@
package pingtunnel
import (
"container/list"
"github.com/esrrhs/go-engine/src/rbuffergo"
"math/rand"
"sync"
"time"
)
type FrameMgr struct {
sendb *rbuffergo.RBuffergo
recvb *rbuffergo.RBuffergo
sendlock sync.Locker
recvlock sync.Locker
windowsize int
resend_timems int
win *list.List
sendid int
sendlist *list.List
}
func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
sendb := rbuffergo.New(buffersize, false)
recvb := rbuffergo.New(buffersize, false)
fm := &FrameMgr{sendb: sendb, recvb: recvb,
sendlock: &sync.Mutex{}, recvlock: &sync.Mutex{},
windowsize: windowsize, win: list.New(), sendid: rand.Int() % (FRAME_MAX_ID + 1),
resend_timems: resend_timems, sendlist: list.New()}
return fm
}
func (fm *FrameMgr) GetSendBufferLeft() int {
left := fm.sendb.Capacity() - fm.sendb.Size()
return left
}
func (fm *FrameMgr) WriteSendBuffer(data []byte) {
fm.sendlock.Lock()
defer fm.sendlock.Unlock()
fm.sendb.Write(data)
}
func (fm *FrameMgr) Update() {
fm.cutSendBufferToWindow()
fm.calSendList()
}
func (fm *FrameMgr) cutSendBufferToWindow() {
fm.sendlock.Lock()
defer fm.sendlock.Unlock()
sendall := false
if fm.sendb.Size() < FRAME_MAX_SIZE {
sendall = true
}
for fm.sendb.Size() > FRAME_MAX_SIZE && fm.win.Len() < fm.windowsize {
f := Frame{resend: false, sendtime: 0,
id: fm.sendid, size: FRAME_MAX_SIZE,
data: make([]byte, FRAME_MAX_SIZE)}
fm.sendb.Read(f.data)
fm.sendid++
if fm.sendid > FRAME_MAX_ID {
fm.sendid = 0
}
fm.win.PushBack(f)
}
if sendall && fm.sendb.Size() > 0 && fm.win.Len() < fm.windowsize {
f := Frame{resend: false, sendtime: 0,
id: fm.sendid, size: fm.sendb.Size(),
data: make([]byte, fm.sendb.Size())}
fm.sendb.Read(f.data)
fm.sendid++
if fm.sendid > FRAME_MAX_ID {
fm.sendid = 0
}
fm.win.PushBack(f)
}
}
func (fm *FrameMgr) calSendList() {
cur := time.Now().UnixNano()
fm.sendlist.Init()
for e := fm.win.Front(); e != nil; e = e.Next() {
f := e.Value.(Frame)
if f.resend || cur-f.sendtime > int64(fm.resend_timems*1000) {
f.sendtime = cur
fm.sendlist.PushBack(&f)
}
}
}
func (fm *FrameMgr) getSendList() *list.List {
return fm.sendlist
}

View File

@ -264,9 +264,44 @@ type CatchMsg struct {
const ( const (
FRAME_MAX_SIZE int = 888 FRAME_MAX_SIZE int = 888
FRAME_MAX_ID int = 999
)
const (
FRAME_TYPE_DATA int = 0x0101
FRAME_TYPE_REQ int = 0x0202
FRAME_TYPE_ACK int = 0x0303
) )
type Frame struct { type Frame struct {
size int ty int
data []byte resend bool
sendtime int64
id int
size int
data []byte
dataid []int
}
// Marshal implements the Marshal method of MessageBody interface.
func (p *Frame) Marshal(proto int) ([]byte, error) {
b := make([]byte, p.Len(proto))
binary.BigEndian.PutUint16(b[:2], uint16(p.ty))
datalen := len(p.data)
binary.BigEndian.PutUint16(b[2:4], uint16(datalen))
// TODO
return b, nil
}
// Len implements the Len method of MessageBody interface.
func (p *Frame) Len(proto int) int {
if p == nil {
return 0
}
return 4 + 2 + 4 + 4 // TODO
} }