This commit is contained in:
esrrhs 2019-10-27 17:17:47 +08:00
parent b67d5d372d
commit f93b24d3d1
5 changed files with 174 additions and 36 deletions

View File

@ -216,6 +216,36 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
p.localIdToConnMap[uuid] = clientConn
loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String())
loggo.Info("start connect remote tcp %s %s", uuid, tcpsrcaddr.String())
startConnectTime := time.Now()
for {
if clientConn.fm.IsRemoteConnected() {
break
}
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems,
p.timeout)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
time.Sleep(time.Millisecond * 10)
now := time.Now()
diffclose := now.Sub(startConnectTime)
if diffclose > time.Second*(time.Duration(p.timeout)) {
loggo.Info("can not connect remote tcp %s %s", uuid, tcpsrcaddr.String())
p.Close(clientConn)
return
}
}
loggo.Info("connected remote tcp %s %s", uuid, tcpsrcaddr.String())
bytes := make([]byte, 10240)
tcpActiveRecvTime := time.Now()

View File

@ -34,6 +34,8 @@ type FrameMgr struct {
reqmap map[int32]int64
sendmap map[int32]int64
remote_connected bool
}
func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
@ -48,7 +50,8 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
recvwin: list.New(), recvlist: list.New(), recvid: 0,
close: false, remoteclosed: false, closesend: false,
lastPingTime: time.Now().UnixNano(), rttns: (int64)(resend_timems * 1000),
reqmap: make(map[int32]int64), sendmap: make(map[int32]int64)}
reqmap: make(map[int32]int64), sendmap: make(map[int32]int64),
remote_connected: false}
return fm
}
@ -73,6 +76,8 @@ func (fm *FrameMgr) Update() {
fm.combineWindowToRecvBuffer()
fm.tryConnect()
fm.calSendList()
fm.ping()
@ -80,6 +85,10 @@ func (fm *FrameMgr) Update() {
func (fm *FrameMgr) cutSendBufferToWindow() {
if !fm.remote_connected {
return
}
sendall := false
if fm.sendb.Size() < FRAME_MAX_SIZE {
@ -133,6 +142,11 @@ func (fm *FrameMgr) cutSendBufferToWindow() {
}
func (fm *FrameMgr) calSendList() {
if !fm.remote_connected {
return
}
cur := time.Now().UnixNano()
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
@ -185,6 +199,14 @@ func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int3
fm.processPing(f)
} else if f.Type == (int32)(Frame_PONG) {
fm.processPong(f)
} else if f.Type == (int32)(Frame_REG) {
fm.processReg(f)
} else if f.Type == (int32)(Frame_REGACK) {
fm.processRegAck(f)
} else if f.Type == (int32)(Frame_REG) {
fm.processRegAgain(f)
fm.remote_connected = true
loggo.Debug("recv reg again %d %d", f.Id, len(f.Data))
}
}
fm.recvlist.Init()
@ -193,6 +215,10 @@ func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int3
func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) {
if !fm.remote_connected {
return
}
for id, _ := range tmpreq {
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
@ -271,6 +297,10 @@ func (fm *FrameMgr) addToRecvWin(rf *Frame) bool {
func (fm *FrameMgr) combineWindowToRecvBuffer() {
if !fm.remote_connected {
return
}
for {
done := false
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
@ -367,6 +397,10 @@ func (fm *FrameMgr) IsRemoteClosed() bool {
}
func (fm *FrameMgr) ping() {
if !fm.remote_connected {
return
}
cur := time.Now().UnixNano()
if cur-fm.lastPingTime > (int64)(time.Second) {
f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur,
@ -438,3 +472,33 @@ func (fm *FrameMgr) isIdOld(id int, maxid int) bool {
return false
}
func (fm *FrameMgr) IsRemoteConnected() bool {
return fm.remote_connected
}
func (fm *FrameMgr) tryConnect() {
if !fm.remote_connected {
f := &Frame{Type: (int32)(Frame_REG)}
fm.sendlist.PushBack(f)
loggo.Debug("try connect")
}
}
func (fm *FrameMgr) processReg(f *Frame) {
rf := &Frame{Type: (int32)(Frame_REGACK)}
fm.sendlist.PushBack(rf)
loggo.Debug("recv reg ")
}
func (fm *FrameMgr) processRegAck(f *Frame) {
rf := &Frame{Type: (int32)(Frame_REGAGAIN)}
fm.sendlist.PushBack(rf)
fm.remote_connected = true
loggo.Debug("recv reg ack ")
}
func (fm *FrameMgr) processRegAgain(f *Frame) {
fm.remote_connected = true
loggo.Debug("recv reg ack again ")
}

View File

@ -51,11 +51,14 @@ func (MyMsg_TYPE) EnumDescriptor() ([]byte, []int) {
type Frame_TYPE int32
const (
Frame_DATA Frame_TYPE = 0
Frame_REQ Frame_TYPE = 1
Frame_ACK Frame_TYPE = 2
Frame_PING Frame_TYPE = 3
Frame_PONG Frame_TYPE = 4
Frame_DATA Frame_TYPE = 0
Frame_REQ Frame_TYPE = 1
Frame_ACK Frame_TYPE = 2
Frame_PING Frame_TYPE = 3
Frame_PONG Frame_TYPE = 4
Frame_REG Frame_TYPE = 5
Frame_REGACK Frame_TYPE = 6
Frame_REGAGAIN Frame_TYPE = 7
)
var Frame_TYPE_name = map[int32]string{
@ -64,14 +67,20 @@ var Frame_TYPE_name = map[int32]string{
2: "ACK",
3: "PING",
4: "PONG",
5: "REG",
6: "REGACK",
7: "REGAGAIN",
}
var Frame_TYPE_value = map[string]int32{
"DATA": 0,
"REQ": 1,
"ACK": 2,
"PING": 3,
"PONG": 4,
"DATA": 0,
"REQ": 1,
"ACK": 2,
"PING": 3,
"PONG": 4,
"REG": 5,
"REGACK": 6,
"REGAGAIN": 7,
}
func (x Frame_TYPE) String() string {
@ -298,29 +307,31 @@ func init() {
func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) }
var fileDescriptor_c06e4cca6c2cc899 = []byte{
// 383 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0x5d, 0xab, 0xd3, 0x30,
0x18, 0x36, 0x6d, 0xd3, 0x75, 0xef, 0x99, 0x87, 0x9e, 0xf8, 0x41, 0xf0, 0xaa, 0x14, 0xc4, 0xde,
0xe8, 0x85, 0x82, 0xf7, 0x3d, 0xc7, 0xe3, 0x18, 0xd2, 0x39, 0xc3, 0x6e, 0xf4, 0x66, 0x74, 0x6b,
0x56, 0x82, 0xa6, 0x2d, 0x6d, 0x86, 0xce, 0xff, 0xe4, 0x4f, 0xf0, 0xca, 0xff, 0x25, 0x92, 0x77,
0x59, 0x61, 0x78, 0x95, 0xe7, 0x8b, 0xe6, 0x69, 0x1e, 0x98, 0xea, 0xa1, 0x7e, 0xd5, 0xf5, 0xad,
0x69, 0xd3, 0xbf, 0x1e, 0xd0, 0xe2, 0x58, 0x0c, 0x35, 0xbb, 0x06, 0x4f, 0x55, 0x9c, 0x24, 0x24,
0x9b, 0x0a, 0x4f, 0x55, 0x8c, 0x41, 0x60, 0x8e, 0x9d, 0xe4, 0x5e, 0x42, 0x32, 0x2a, 0x10, 0xb3,
0xa7, 0x10, 0x9a, 0xb2, 0xaf, 0xa5, 0xe1, 0x3e, 0xe6, 0x1c, 0xb3, 0xd9, 0xaa, 0x34, 0x25, 0x0f,
0x12, 0x92, 0xcd, 0x04, 0x62, 0x9b, 0xed, 0xf1, 0x0e, 0x4e, 0x13, 0x92, 0xdd, 0x08, 0xc7, 0xd8,
0x63, 0xa0, 0xba, 0xac, 0xd5, 0x8e, 0x87, 0x28, 0x9f, 0x08, 0x8b, 0xc1, 0xff, 0x2a, 0x8f, 0x7c,
0x82, 0x9a, 0x85, 0x8c, 0xc3, 0xc4, 0xec, 0x3a, 0xdd, 0x56, 0x92, 0x47, 0x58, 0xe1, 0x4c, 0xd9,
0x4b, 0x60, 0x0e, 0x6e, 0xb6, 0x87, 0xfd, 0x5e, 0xf6, 0x83, 0xfa, 0x29, 0xf9, 0x14, 0x43, 0x37,
0xce, 0xb9, 0x1d, 0x0d, 0xf6, 0x1c, 0xae, 0xcf, 0x71, 0x5d, 0xfe, 0xf8, 0xae, 0x1a, 0x0e, 0x18,
0x7d, 0xe8, 0xd4, 0x02, 0x45, 0xf6, 0x1a, 0x9e, 0x9c, 0x63, 0xbd, 0x1c, 0x64, 0x53, 0x6d, 0x8c,
0xd2, 0x52, 0x0f, 0xfc, 0x0a, 0xd3, 0x8f, 0x9c, 0x29, 0xd0, 0x5b, 0xa3, 0x85, 0x1d, 0x95, 0x96,
0xed, 0xc1, 0xf0, 0x99, 0xeb, 0x78, 0xa2, 0xe9, 0x0b, 0x08, 0xd6, 0x9f, 0x57, 0xf7, 0x2c, 0x82,
0xe0, 0x5d, 0xbe, 0xce, 0xe3, 0x07, 0x16, 0xad, 0x16, 0xcb, 0x79, 0x4c, 0xd8, 0x15, 0xd0, 0x22,
0x9f, 0x2f, 0xee, 0xe2, 0x5f, 0xbf, 0xfd, 0xf4, 0x0f, 0x01, 0xfa, 0xbe, 0x2f, 0xb5, 0x1c, 0x1f,
0x9c, 0x5c, 0x3e, 0xf8, 0xa9, 0x0c, 0xce, 0x10, 0x09, 0xc7, 0xd8, 0x33, 0x88, 0xec, 0x69, 0x6f,
0xc3, 0x29, 0x7c, 0x31, 0x72, 0x37, 0x64, 0x80, 0x5f, 0x71, 0x43, 0xe2, 0x38, 0xf4, 0x72, 0x1c,
0x7b, 0xaa, 0x8a, 0x87, 0x89, 0x9f, 0x51, 0xe1, 0x58, 0xfa, 0xf6, 0xbf, 0xda, 0x13, 0xf0, 0xc5,
0xfd, 0xa7, 0x98, 0x58, 0x90, 0xdf, 0x7d, 0x88, 0xbd, 0xf1, 0x47, 0x7c, 0x44, 0x1f, 0x97, 0xf3,
0x38, 0xb8, 0x9d, 0x7d, 0x81, 0x4e, 0x35, 0xb5, 0x39, 0x34, 0x8d, 0xfc, 0xb6, 0x0d, 0x71, 0xe9,
0x37, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x10, 0x46, 0x79, 0xfa, 0x68, 0x02, 0x00, 0x00,
// 407 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x92, 0xcd, 0x8e, 0xd3, 0x30,
0x14, 0x85, 0x71, 0x12, 0xa7, 0xe9, 0x9d, 0x32, 0xf2, 0x5c, 0x7e, 0x64, 0xb1, 0x8a, 0x2a, 0x21,
0xb2, 0x81, 0x05, 0x3c, 0x41, 0x66, 0x28, 0x51, 0x85, 0x5a, 0x06, 0xab, 0x9b, 0x61, 0x33, 0xca,
0x4c, 0x3c, 0x91, 0x05, 0x4e, 0xab, 0xc4, 0x15, 0x94, 0x77, 0xe2, 0x11, 0x78, 0x1d, 0x1e, 0x03,
0x21, 0xdf, 0xba, 0x95, 0x2a, 0x56, 0x3e, 0xe7, 0xde, 0x4f, 0xf5, 0xa9, 0x4f, 0x60, 0x6c, 0x87,
0xf6, 0xcd, 0xa6, 0x5f, 0xbb, 0xf5, 0xf4, 0x6f, 0x04, 0x7c, 0xb1, 0x5b, 0x0c, 0x2d, 0x9e, 0x43,
0x64, 0x1a, 0xc9, 0x72, 0x56, 0x8c, 0x55, 0x64, 0x1a, 0x44, 0x48, 0xdc, 0x6e, 0xa3, 0x65, 0x94,
0xb3, 0x82, 0x2b, 0xd2, 0xf8, 0x1c, 0x52, 0x57, 0xf7, 0xad, 0x76, 0x32, 0x26, 0x2e, 0x38, 0xcf,
0x36, 0xb5, 0xab, 0x65, 0x92, 0xb3, 0x62, 0xa2, 0x48, 0x7b, 0xb6, 0xa7, 0x3b, 0x24, 0xcf, 0x59,
0x71, 0xa1, 0x82, 0xc3, 0xa7, 0xc0, 0x6d, 0xdd, 0x9a, 0x7b, 0x99, 0xd2, 0x78, 0x6f, 0x50, 0x40,
0xfc, 0x55, 0xef, 0xe4, 0x88, 0x66, 0x5e, 0xa2, 0x84, 0x91, 0xbb, 0xdf, 0xd8, 0x75, 0xa3, 0x65,
0x46, 0x11, 0x0e, 0x16, 0x5f, 0x03, 0x06, 0x79, 0x7b, 0xb7, 0x7d, 0x78, 0xd0, 0xfd, 0x60, 0x7e,
0x6a, 0x39, 0x26, 0xe8, 0x22, 0x6c, 0x2e, 0x8f, 0x0b, 0x7c, 0x09, 0xe7, 0x07, 0xdc, 0xd6, 0x3f,
0xbe, 0x9b, 0x4e, 0x02, 0xa1, 0x8f, 0xc3, 0x74, 0x41, 0x43, 0x7c, 0x0b, 0xcf, 0x0e, 0x58, 0xaf,
0x07, 0xdd, 0x35, 0xb7, 0xce, 0x58, 0x6d, 0x07, 0x79, 0x46, 0xf4, 0x93, 0xb0, 0x54, 0xb4, 0x5b,
0xd1, 0x8a, 0x32, 0x1a, 0xab, 0xd7, 0x5b, 0x27, 0x27, 0x21, 0xe3, 0xde, 0x4e, 0x5f, 0x41, 0xb2,
0xba, 0xb9, 0x9e, 0x61, 0x06, 0xc9, 0xfb, 0x72, 0x55, 0x8a, 0x47, 0x5e, 0x5d, 0xcf, 0x97, 0x95,
0x60, 0x78, 0x06, 0x7c, 0x51, 0x56, 0xf3, 0x2b, 0xf1, 0xeb, 0x77, 0x3c, 0xfd, 0xc3, 0x80, 0x7f,
0xe8, 0x6b, 0xab, 0x8f, 0x0f, 0xce, 0x4e, 0x1f, 0x7c, 0x1f, 0x86, 0x6a, 0xc8, 0x54, 0x70, 0xf8,
0x02, 0x32, 0x7f, 0xfa, 0xdb, 0xa8, 0x8a, 0x58, 0x1d, 0x7d, 0x28, 0x32, 0xa1, 0x5f, 0x09, 0x45,
0x52, 0x39, 0xfc, 0xb4, 0x1c, 0x7f, 0x9a, 0x46, 0xa6, 0x79, 0x5c, 0x70, 0x15, 0xdc, 0xf4, 0xe6,
0xbf, 0xd8, 0x23, 0x88, 0xd5, 0xec, 0xb3, 0x60, 0x5e, 0x94, 0x57, 0x1f, 0x45, 0x74, 0xfc, 0x23,
0x31, 0xa9, 0x4f, 0xcb, 0x4a, 0x24, 0x7b, 0xaa, 0x12, 0x1c, 0x01, 0x52, 0x35, 0xab, 0x3c, 0x98,
0xe2, 0x04, 0x32, 0xaf, 0xab, 0x72, 0xbe, 0x14, 0xa3, 0xcb, 0xc9, 0x17, 0xd8, 0x98, 0xae, 0x75,
0xdb, 0xae, 0xd3, 0xdf, 0xee, 0x52, 0xfa, 0x18, 0xde, 0xfd, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x4e,
0xfd, 0x1d, 0x91, 0x8b, 0x02, 0x00, 0x00,
}

View File

@ -29,6 +29,9 @@ message Frame {
ACK = 2;
PING = 3;
PONG = 4;
REG = 5;
REGACK = 6;
REGAGAIN = 7;
}
int32 type = 1;

View File

@ -182,6 +182,36 @@ func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) {
loggo.Info("server waiting target response %s -> %s %s", conn.tcpaddrTarget.String(), conn.id, conn.tcpconn.LocalAddr().String())
loggo.Info("start wait remote connect tcp %s %s", conn.id, conn.tcpaddrTarget.String())
startConnectTime := time.Now()
for {
if conn.fm.IsRemoteConnected() {
break
}
conn.fm.Update()
sendlist := conn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb,
conn.rproto, -1, p.key,
0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
time.Sleep(time.Millisecond * 10)
now := time.Now()
diffclose := now.Sub(startConnectTime)
if diffclose > time.Second*(time.Duration(conn.timeout)) {
loggo.Info("can not connect remote tcp %s %s", conn.id, conn.tcpaddrTarget.String())
p.Close(conn)
return
}
}
loggo.Info("remote connected tcp %s %s", conn.id, conn.tcpaddrTarget.String())
bytes := make([]byte, 10240)
tcpActiveRecvTime := time.Now()