This commit is contained in:
esrrhs 2019-10-26 21:54:13 +08:00
parent b1fd10de27
commit 1222033a82

View File

@ -30,7 +30,10 @@ type FrameMgr struct {
closesend bool closesend bool
lastPingTime int64 lastPingTime int64
rttms int rttns int64
reqmap map[int32]int64
sendmap map[int32]int64
} }
func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
@ -44,7 +47,8 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
sendwin: list.New(), sendlist: list.New(), sendid: 0, sendwin: list.New(), sendlist: list.New(), sendid: 0,
recvwin: list.New(), recvlist: list.New(), recvid: 0, recvwin: list.New(), recvlist: list.New(), recvid: 0,
close: false, remoteclosed: false, closesend: false, close: false, remoteclosed: false, closesend: false,
lastPingTime: time.Now().UnixNano(), rttms: resend_timems} lastPingTime: time.Now().UnixNano(), rttns: (int64)(resend_timems * 1000),
reqmap: make(map[int32]int64), sendmap: make(map[int32]int64)}
return fm return fm
} }
@ -133,12 +137,16 @@ func (fm *FrameMgr) calSendList() {
for e := fm.sendwin.Front(); e != nil; e = e.Next() { for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame) f := e.Value.(*Frame)
if f.Resend || cur-f.Sendtime > int64(fm.resend_timems*(int)(time.Millisecond)) { if f.Resend || cur-f.Sendtime > int64(fm.resend_timems*(int)(time.Millisecond)) {
oldsend := fm.sendmap[f.Id]
if cur-oldsend > fm.rttns {
f.Sendtime = cur f.Sendtime = cur
fm.sendlist.PushBack(f) fm.sendlist.PushBack(f)
f.Resend = false f.Resend = false
fm.sendmap[f.Id] = cur
loggo.Debug("push frame to sendlist %d %d", f.Id, len(f.Data)) loggo.Debug("push frame to sendlist %d %d", f.Id, len(f.Data))
} }
} }
}
} }
func (fm *FrameMgr) getSendList() *list.List { func (fm *FrameMgr) getSendList() *list.List {
@ -201,6 +209,7 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
f := e.Value.(*Frame) f := e.Value.(*Frame)
if f.Id == id { if f.Id == id {
fm.sendwin.Remove(e) fm.sendwin.Remove(e)
delete(fm.sendmap, f.Id)
loggo.Debug("remove send win %d %d", f.Id, len(f.Data)) loggo.Debug("remove send win %d %d", f.Id, len(f.Data))
break break
} }
@ -275,6 +284,7 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
} }
fm.recvb.Write(f.Data) fm.recvb.Write(f.Data)
fm.recvwin.Remove(e) fm.recvwin.Remove(e)
delete(fm.reqmap, f.Id)
done = true done = true
loggo.Debug("combined recv frame to recv buffer %d %d", f.Id, len(f.Data)) loggo.Debug("combined recv frame to recv buffer %d %d", f.Id, len(f.Data))
break break
@ -292,6 +302,7 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
} }
} }
cur := time.Now().UnixNano()
reqtmp := make(map[int]int) reqtmp := make(map[int]int)
e := fm.recvwin.Front() e := fm.recvwin.Front()
id := fm.recvid id := fm.recvid
@ -299,8 +310,12 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
f := e.Value.(*Frame) f := e.Value.(*Frame)
loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id) loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id)
if f.Id != (int32)(id) { if f.Id != (int32)(id) {
oldReq := fm.reqmap[f.Id]
if cur-oldReq > fm.rttns {
reqtmp[id]++ reqtmp[id]++
fm.reqmap[f.Id] = cur
loggo.Debug("add req id %d ", id) loggo.Debug("add req id %d ", id)
}
} else { } else {
e = e.Next() e = e.Next()
} }
@ -372,9 +387,9 @@ func (fm *FrameMgr) processPing(f *Frame) {
func (fm *FrameMgr) processPong(f *Frame) { func (fm *FrameMgr) processPong(f *Frame) {
cur := time.Now().UnixNano() cur := time.Now().UnixNano()
if cur > f.Sendtime { if cur > f.Sendtime {
rtt := (cur - f.Sendtime) / (int64)(time.Millisecond) rtt := cur - f.Sendtime
fm.rttms = (fm.rttms + (int)(rtt)) / 2 fm.rttns = (fm.rttns + rtt) / 2
loggo.Debug("recv pong %d %d", rtt, fm.rttms) loggo.Debug("recv pong %d %dms", rtt, fm.rttns/1000/1000)
} }
} }