This commit is contained in:
esrrhs 2019-10-26 19:54:49 +08:00
parent a2f8819782
commit 2ffba2224b
4 changed files with 86 additions and 39 deletions

View File

@ -41,11 +41,11 @@ Usage:
-tcp_bs tcp的发送接收缓冲区大小默认1MB -tcp_bs tcp的发送接收缓冲区大小默认1MB
Tcp send and receive buffer size, default 1MB Tcp send and receive buffer size, default 1MB
-tcp_mw tcp的最大窗口默认100 -tcp_mw tcp的最大窗口默认10000
The maximum window of tcp, the default is 100 The maximum window of tcp, the default is 100
-tcp_rst tcp的超时发送时间默认200ms -tcp_rst tcp的超时发送时间默认400ms
Tcp timeout resend time, default 200ms Tcp timeout resend time, default 400ms
` `
func main() { func main() {
@ -59,7 +59,7 @@ func main() {
tcpmode := flag.Int("tcp", 0, "tcp mode") tcpmode := flag.Int("tcp", 0, "tcp mode")
tcpmode_buffersize := flag.Int("tcp_bs", 1024*1024, "tcp mode buffer size") tcpmode_buffersize := flag.Int("tcp_bs", 1024*1024, "tcp mode buffer size")
tcpmode_maxwin := flag.Int("tcp_mw", 100, "tcp mode max win") tcpmode_maxwin := flag.Int("tcp_mw", 100, "tcp mode max win")
tcpmode_resend_timems := flag.Int("tcp_rst", 200, "tcp mode resend time ms") tcpmode_resend_timems := flag.Int("tcp_rst", 400, "tcp mode resend time ms")
flag.Usage = func() { flag.Usage = func() {
fmt.Printf(usage) fmt.Printf(usage)
} }

View File

@ -28,6 +28,9 @@ type FrameMgr struct {
close bool close bool
remoteclosed bool remoteclosed bool
closesend bool closesend bool
lastPingTime int64
rttms int
} }
func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
@ -40,7 +43,8 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
windowsize: windowsize, resend_timems: resend_timems, windowsize: windowsize, resend_timems: resend_timems,
sendwin: list.New(), sendlist: list.New(), sendid: 0, sendwin: list.New(), sendlist: list.New(), sendid: 0,
recvwin: list.New(), recvlist: list.New(), recvid: 0, recvwin: list.New(), recvlist: list.New(), recvid: 0,
close: false, remoteclosed: false, closesend: false} close: false, remoteclosed: false, closesend: false,
lastPingTime: time.Now().UnixNano(), rttms: resend_timems}
return fm return fm
} }
@ -66,6 +70,8 @@ func (fm *FrameMgr) Update() {
fm.combineWindowToRecvBuffer() fm.combineWindowToRecvBuffer()
fm.calSendList() fm.calSendList()
fm.ping()
} }
func (fm *FrameMgr) cutSendBufferToWindow() { func (fm *FrameMgr) cutSendBufferToWindow() {
@ -166,6 +172,10 @@ func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int3
} else if f.Type == (int32)(Frame_DATA) { } else if f.Type == (int32)(Frame_DATA) {
tmpackto[f.Id] = f tmpackto[f.Id] = f
loggo.Debug("recv data %d %d", f.Id, len(f.Data)) loggo.Debug("recv data %d %d", f.Id, len(f.Data))
} else if f.Type == (int32)(Frame_PING) {
fm.processPing(f)
} else if f.Type == (int32)(Frame_PONG) {
fm.processPong(f)
} }
} }
fm.recvlist.Init() fm.recvlist.Init()
@ -202,17 +212,18 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
Dataid: make([]int32, len(tmpackto))} Dataid: make([]int32, len(tmpackto))}
index := 0 index := 0
for id, rf := range tmpackto { for id, rf := range tmpackto {
if fm.addToRecvWin(rf) {
f.Dataid[index] = id f.Dataid[index] = id
index++ index++
fm.addToRecvWin(rf)
loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data)) loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data))
} }
}
fm.sendlist.PushBack(f) fm.sendlist.PushBack(f)
loggo.Debug("send ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ",")) loggo.Debug("send ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
} }
} }
func (fm *FrameMgr) addToRecvWin(rf *Frame) { func (fm *FrameMgr) addToRecvWin(rf *Frame) bool {
begin := fm.recvid begin := fm.recvid
end := fm.recvid + fm.windowsize end := fm.recvid + fm.windowsize
@ -222,14 +233,14 @@ func (fm *FrameMgr) addToRecvWin(rf *Frame) {
} }
if id > end || id < begin { if id > end || id < begin {
loggo.Debug("recv frame not in range %d %d %d", begin, end, id) loggo.Debug("recv frame not in range %d %d %d", begin, end, id)
return return false
} }
for e := fm.recvwin.Front(); e != nil; e = e.Next() { for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame) f := e.Value.(*Frame)
if f.Id == rf.Id { if f.Id == rf.Id {
loggo.Debug("recv frame ignore %d %d", f.Id, len(f.Data)) loggo.Debug("recv frame ignore %d %d", f.Id, len(f.Data))
return return true
} }
} }
@ -239,12 +250,13 @@ func (fm *FrameMgr) addToRecvWin(rf *Frame) {
if fm.compareId(rf, f) < 0 { if fm.compareId(rf, f) < 0 {
fm.recvwin.InsertBefore(rf, e) fm.recvwin.InsertBefore(rf, e)
loggo.Debug("insert recv win %d %d before %d", rf.Id, len(rf.Data), f.Id) loggo.Debug("insert recv win %d %d before %d", rf.Id, len(rf.Data), f.Id)
return return true
} }
} }
fm.recvwin.PushBack(rf) fm.recvwin.PushBack(rf)
loggo.Debug("insert recv win last %d %d", rf.Id, len(rf.Data)) loggo.Debug("insert recv win last %d %d", rf.Id, len(rf.Data))
return true
} }
func (fm *FrameMgr) compareId(lf *Frame, rf *Frame) int { func (fm *FrameMgr) compareId(lf *Frame, rf *Frame) int {
@ -300,7 +312,7 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
id = fm.recvid id = fm.recvid
for len(reqtmp) < fm.windowsize && e != nil { for len(reqtmp) < fm.windowsize && e != nil {
f := e.Value.(*Frame) f := e.Value.(*Frame)
loggo.Debug("start add req id %d %d", f.Id, id) loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id)
if f.Id != (int32)(id) { if f.Id != (int32)(id) {
reqtmp[id]++ reqtmp[id]++
loggo.Debug("add req id %d ", id) loggo.Debug("add req id %d ", id)
@ -353,3 +365,30 @@ func (fm *FrameMgr) Close() {
func (fm *FrameMgr) IsRemoteClosed() bool { func (fm *FrameMgr) IsRemoteClosed() bool {
return fm.remoteclosed return fm.remoteclosed
} }
func (fm *FrameMgr) ping() {
cur := time.Now().UnixNano()
if cur-fm.lastPingTime > int64(1000*1000) {
f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur,
Id: 0}
fm.sendlist.PushBack(f)
loggo.Debug("send ping %d", cur)
fm.lastPingTime = cur
}
}
func (fm *FrameMgr) processPing(f *Frame) {
rf := &Frame{Type: (int32)(Frame_PONG), Resend: false, Sendtime: f.Sendtime,
Id: 0}
fm.sendlist.PushBack(rf)
loggo.Debug("recv ping %d", f.Sendtime)
}
func (fm *FrameMgr) processPong(f *Frame) {
cur := time.Now().UnixNano()
if cur > f.Sendtime {
rtt := (cur - f.Sendtime) / 1000
fm.rttms = (fm.rttms + (int)(rtt)) / 2
loggo.Debug("recv pong %d %d", rtt, fm.rttms)
}
}

View File

@ -54,18 +54,24 @@ const (
Frame_DATA Frame_TYPE = 0 Frame_DATA Frame_TYPE = 0
Frame_REQ Frame_TYPE = 1 Frame_REQ Frame_TYPE = 1
Frame_ACK Frame_TYPE = 2 Frame_ACK Frame_TYPE = 2
Frame_PING Frame_TYPE = 3
Frame_PONG Frame_TYPE = 4
) )
var Frame_TYPE_name = map[int32]string{ var Frame_TYPE_name = map[int32]string{
0: "DATA", 0: "DATA",
1: "REQ", 1: "REQ",
2: "ACK", 2: "ACK",
3: "PING",
4: "PONG",
} }
var Frame_TYPE_value = map[string]int32{ var Frame_TYPE_value = map[string]int32{
"DATA": 0, "DATA": 0,
"REQ": 1, "REQ": 1,
"ACK": 2, "ACK": 2,
"PING": 3,
"PONG": 4,
} }
func (x Frame_TYPE) String() string { func (x Frame_TYPE) String() string {
@ -292,29 +298,29 @@ func init() {
func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) } func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) }
var fileDescriptor_c06e4cca6c2cc899 = []byte{ var fileDescriptor_c06e4cca6c2cc899 = []byte{
// 372 bytes of a gzipped FileDescriptorProto // 383 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0xdd, 0xca, 0xd3, 0x30, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0x5d, 0xab, 0xd3, 0x30,
0x18, 0x36, 0x6d, 0xd3, 0x75, 0xef, 0x37, 0x3f, 0xba, 0xf8, 0x43, 0xf0, 0xa8, 0x14, 0xc4, 0x9e, 0x18, 0x36, 0x6d, 0xd3, 0x75, 0xef, 0x99, 0x87, 0x9e, 0xf8, 0x41, 0xf0, 0xaa, 0x14, 0xc4, 0xde,
0xe8, 0x81, 0x5e, 0x41, 0x37, 0xe7, 0x18, 0x52, 0x99, 0x61, 0x27, 0x7a, 0x32, 0xba, 0x35, 0x2b, 0xe8, 0x85, 0x82, 0xf7, 0x3d, 0xc7, 0xe3, 0x18, 0xd2, 0x39, 0xc3, 0x6e, 0xf4, 0x66, 0x74, 0x6b,
0x41, 0xd3, 0x96, 0x36, 0x43, 0xe7, 0x3d, 0x89, 0x57, 0xe0, 0xad, 0x89, 0xe4, 0x5d, 0x36, 0x18, 0x56, 0x82, 0xa6, 0x2d, 0x6d, 0x86, 0xce, 0xff, 0xe4, 0x4f, 0xf0, 0xca, 0xff, 0x25, 0x92, 0x77,
0xdf, 0x51, 0x9e, 0x3f, 0x92, 0x87, 0x3c, 0x30, 0xd6, 0x43, 0xfd, 0xa6, 0xeb, 0x5b, 0xd3, 0xa6, 0x59, 0x61, 0x78, 0x95, 0xe7, 0x8b, 0xe6, 0x69, 0x1e, 0x98, 0xea, 0xa1, 0x7e, 0xd5, 0xf5, 0xad,
0xff, 0x3c, 0xa0, 0xc5, 0xa9, 0x18, 0x6a, 0x76, 0x0f, 0x9e, 0xaa, 0x38, 0x49, 0x48, 0x36, 0x16, 0x69, 0xd3, 0xbf, 0x1e, 0xd0, 0xe2, 0x58, 0x0c, 0x35, 0xbb, 0x06, 0x4f, 0x55, 0x9c, 0x24, 0x24,
0x9e, 0xaa, 0x18, 0x83, 0xc0, 0x9c, 0x3a, 0xc9, 0xbd, 0x84, 0x64, 0x54, 0x20, 0x66, 0xcf, 0x21, 0x9b, 0x0a, 0x4f, 0x55, 0x8c, 0x41, 0x60, 0x8e, 0x9d, 0xe4, 0x5e, 0x42, 0x32, 0x2a, 0x10, 0xb3,
0x34, 0x65, 0x5f, 0x4b, 0xc3, 0x7d, 0xcc, 0x39, 0x66, 0xb3, 0x55, 0x69, 0x4a, 0x1e, 0x24, 0x24, 0xa7, 0x10, 0x9a, 0xb2, 0xaf, 0xa5, 0xe1, 0x3e, 0xe6, 0x1c, 0xb3, 0xd9, 0xaa, 0x34, 0x25, 0x0f,
0x9b, 0x08, 0xc4, 0x36, 0xdb, 0xe3, 0x1b, 0x9c, 0x26, 0x24, 0x9b, 0x0a, 0xc7, 0xd8, 0x53, 0xa0, 0x12, 0x92, 0xcd, 0x04, 0x62, 0x9b, 0xed, 0xf1, 0x0e, 0x4e, 0x13, 0x92, 0xdd, 0x08, 0xc7, 0xd8,
0xba, 0xac, 0xd5, 0x9e, 0x87, 0x28, 0x9f, 0x09, 0x8b, 0xc1, 0xff, 0x26, 0x4f, 0x7c, 0x84, 0x9a, 0x63, 0xa0, 0xba, 0xac, 0xd5, 0x8e, 0x87, 0x28, 0x9f, 0x08, 0x8b, 0xc1, 0xff, 0x2a, 0x8f, 0x7c,
0x85, 0x8c, 0xc3, 0xc8, 0xec, 0x3b, 0xdd, 0x56, 0x92, 0x47, 0x58, 0xe1, 0x42, 0xd9, 0x6b, 0x60, 0x82, 0x9a, 0x85, 0x8c, 0xc3, 0xc4, 0xec, 0x3a, 0xdd, 0x56, 0x92, 0x47, 0x58, 0xe1, 0x4c, 0xd9,
0x0e, 0x6e, 0x77, 0xc7, 0xc3, 0x41, 0xf6, 0x83, 0xfa, 0x25, 0xf9, 0x18, 0x43, 0x53, 0xe7, 0xcc, 0x4b, 0x60, 0x0e, 0x6e, 0xb6, 0x87, 0xfd, 0x5e, 0xf6, 0x83, 0xfa, 0x29, 0xf9, 0x14, 0x43, 0x37,
0xae, 0x06, 0x7b, 0x09, 0xf7, 0x97, 0xb8, 0x2e, 0x7f, 0xfe, 0x50, 0x0d, 0x07, 0x8c, 0x3e, 0x76, 0xce, 0xb9, 0x1d, 0x0d, 0xf6, 0x1c, 0xae, 0xcf, 0x71, 0x5d, 0xfe, 0xf8, 0xae, 0x1a, 0x0e, 0x18,
0x6a, 0x81, 0x22, 0x7b, 0x0b, 0xcf, 0x2e, 0xb1, 0x5e, 0x0e, 0xb2, 0xa9, 0xb6, 0x46, 0x69, 0xa9, 0x7d, 0xe8, 0xd4, 0x02, 0x45, 0xf6, 0x1a, 0x9e, 0x9c, 0x63, 0xbd, 0x1c, 0x64, 0x53, 0x6d, 0x8c,
0x07, 0x7e, 0x87, 0xe9, 0x27, 0xce, 0x14, 0xe8, 0x6d, 0xd0, 0xc2, 0x8e, 0x4a, 0xcb, 0xf6, 0x68, 0xd2, 0x52, 0x0f, 0xfc, 0x0a, 0xd3, 0x8f, 0x9c, 0x29, 0xd0, 0x5b, 0xa3, 0x85, 0x1d, 0x95, 0x96,
0xf8, 0xc4, 0x75, 0x3c, 0xd3, 0xf4, 0x15, 0x04, 0x9b, 0x2f, 0xeb, 0x05, 0x8b, 0x20, 0x78, 0x9f, 0xed, 0xc1, 0xf0, 0x99, 0xeb, 0x78, 0xa2, 0xe9, 0x0b, 0x08, 0xd6, 0x9f, 0x57, 0xf7, 0x2c, 0x82,
0x6f, 0xf2, 0xf8, 0x91, 0x45, 0xeb, 0xd5, 0xa7, 0x65, 0x4c, 0xd8, 0x1d, 0xd0, 0x22, 0x5f, 0xae, 0xe0, 0x5d, 0xbe, 0xce, 0xe3, 0x07, 0x16, 0xad, 0x16, 0xcb, 0x79, 0x4c, 0xd8, 0x15, 0xd0, 0x22,
0xe6, 0xf1, 0xef, 0xbf, 0x7e, 0xfa, 0x87, 0x00, 0xfd, 0xd0, 0x97, 0x5a, 0x5e, 0x3f, 0x9c, 0xdc, 0x9f, 0x2f, 0xee, 0xe2, 0x5f, 0xbf, 0xfd, 0xf4, 0x0f, 0x01, 0xfa, 0xbe, 0x2f, 0xb5, 0x1c, 0x1f,
0x7e, 0xf8, 0xb9, 0x0c, 0xce, 0x10, 0x09, 0xc7, 0xd8, 0x0b, 0x88, 0xec, 0x69, 0x5f, 0xc3, 0x29, 0x9c, 0x5c, 0x3e, 0xf8, 0xa9, 0x0c, 0xce, 0x10, 0x09, 0xc7, 0xd8, 0x33, 0x88, 0xec, 0x69, 0x6f,
0x7c, 0x71, 0xe5, 0x6e, 0xc8, 0x00, 0x6f, 0x71, 0x43, 0xe2, 0x38, 0xf4, 0x76, 0x1c, 0x7b, 0xaa, 0xc3, 0x29, 0x7c, 0x31, 0x72, 0x37, 0x64, 0x80, 0x5f, 0x71, 0x43, 0xe2, 0x38, 0xf4, 0x72, 0x1c,
0x8a, 0x87, 0x89, 0x9f, 0x51, 0xe1, 0x58, 0x9a, 0x3e, 0xa8, 0x3d, 0x02, 0x5f, 0x2c, 0x3e, 0xc7, 0x7b, 0xaa, 0x8a, 0x87, 0x89, 0x9f, 0x51, 0xe1, 0x58, 0xfa, 0xf6, 0xbf, 0xda, 0x13, 0xf0, 0xc5,
0xc4, 0x82, 0x7c, 0xfe, 0x31, 0xf6, 0x66, 0x93, 0xaf, 0xd0, 0xa9, 0xa6, 0x36, 0xc7, 0xa6, 0x91, 0xfd, 0xa7, 0x98, 0x58, 0x90, 0xdf, 0x7d, 0x88, 0xbd, 0xf1, 0x47, 0x7c, 0x44, 0x1f, 0x97, 0xf3,
0xdf, 0x77, 0x21, 0xae, 0xfa, 0xee, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6c, 0x65, 0x2c, 0xe9, 0x38, 0xb8, 0x9d, 0x7d, 0x81, 0x4e, 0x35, 0xb5, 0x39, 0x34, 0x8d, 0xfc, 0xb6, 0x0d, 0x71, 0xe9,
0x54, 0x02, 0x00, 0x00, 0x37, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x10, 0x46, 0x79, 0xfa, 0x68, 0x02, 0x00, 0x00,
} }

View File

@ -27,6 +27,8 @@ message Frame {
DATA = 0; DATA = 0;
REQ = 1; REQ = 1;
ACK = 2; ACK = 2;
PING = 3;
PONG = 4;
} }
int32 type = 1; int32 type = 1;