pingtunnel/framemgr.go

687 lines
16 KiB
Go
Raw Normal View History

2019-10-19 17:36:17 +03:00
package pingtunnel
import (
2019-10-27 14:06:55 +03:00
"bytes"
"compress/zlib"
2019-10-19 17:36:17 +03:00
"container/list"
2019-10-26 13:12:55 +03:00
"github.com/esrrhs/go-engine/src/common"
"github.com/esrrhs/go-engine/src/loggo"
2019-10-19 17:36:17 +03:00
"github.com/esrrhs/go-engine/src/rbuffergo"
2019-10-27 14:06:55 +03:00
"io"
2019-10-28 06:58:01 +03:00
"strconv"
2019-10-19 17:36:17 +03:00
"sync"
"time"
)
2019-10-28 06:58:01 +03:00
// FrameStat collects the traffic counters of a FrameMgr for one reporting
// interval. A fresh value is installed by resetStat after every report.
type FrameStat struct {
	// Totals of DATA, REQ and ACK traffic in each direction.
	sendDataNum, recvDataNum int
	sendReqNum, recvReqNum   int
	sendAckNum, recvAckNum   int

	// The same counters broken down by frame id (key) -> occurrences.
	sendDataNumsMap, recvDataNumsMap map[int32]int
	sendReqNumsMap, recvReqNumsMap   map[int32]int
	sendAckNumsMap, recvAckNumsMap   map[int32]int

	// Keep-alive traffic.
	sendping, sendpong int
	recvping, recvpong int
}
2019-10-19 17:36:17 +03:00
type FrameMgr struct {
2019-10-23 15:36:13 +03:00
sendb *rbuffergo.RBuffergo
recvb *rbuffergo.RBuffergo
2019-10-19 17:36:17 +03:00
recvlock sync.Locker
windowsize int
resend_timems int
2019-10-27 14:06:55 +03:00
compress int
2019-10-23 15:36:13 +03:00
sendwin *list.List
sendlist *list.List
sendid int
recvwin *list.List
recvlist *list.List
recvid int
2019-10-26 07:01:30 +03:00
close bool
remoteclosed bool
2019-10-26 12:51:38 +03:00
closesend bool
2019-10-26 14:54:49 +03:00
lastPingTime int64
2019-10-26 16:54:13 +03:00
rttns int64
reqmap map[int32]int64
sendmap map[int32]int64
2019-10-27 12:17:47 +03:00
2019-10-27 13:16:57 +03:00
connected bool
2019-10-28 06:58:01 +03:00
fs *FrameStat
openstat int
lastPrintStat int64
2019-10-19 17:36:17 +03:00
}
2019-10-28 06:58:01 +03:00
func NewFrameMgr(buffersize int, windowsize int, resend_timems int, compress int, openstat int) *FrameMgr {
2019-10-19 17:36:17 +03:00
sendb := rbuffergo.New(buffersize, false)
recvb := rbuffergo.New(buffersize, false)
fm := &FrameMgr{sendb: sendb, recvb: recvb,
2019-10-23 15:36:13 +03:00
recvlock: &sync.Mutex{},
2019-10-27 14:06:55 +03:00
windowsize: windowsize, resend_timems: resend_timems, compress: compress,
2019-10-23 15:36:13 +03:00
sendwin: list.New(), sendlist: list.New(), sendid: 0,
2019-10-26 07:01:30 +03:00
recvwin: list.New(), recvlist: list.New(), recvid: 0,
2019-10-26 14:54:49 +03:00
close: false, remoteclosed: false, closesend: false,
2019-10-26 16:54:13 +03:00
lastPingTime: time.Now().UnixNano(), rttns: (int64)(resend_timems * 1000),
2019-10-27 12:17:47 +03:00
reqmap: make(map[int32]int64), sendmap: make(map[int32]int64),
2019-10-28 06:58:01 +03:00
connected: false, openstat: openstat, lastPrintStat: time.Now().UnixNano()}
if openstat > 0 {
fm.resetStat()
}
2019-10-19 17:36:17 +03:00
return fm
}
// GetSendBufferLeft reports how many more bytes the send buffer can take,
// computed as its capacity minus its current size.
func (fm *FrameMgr) GetSendBufferLeft() int {
	return fm.sendb.Capacity() - fm.sendb.Size()
}
func (fm *FrameMgr) WriteSendBuffer(data []byte) {
fm.sendb.Write(data)
2019-10-26 13:33:22 +03:00
loggo.Debug("WriteSendBuffer %d %d", fm.sendb.Size(), len(data))
2019-10-19 17:36:17 +03:00
}
func (fm *FrameMgr) Update() {
fm.cutSendBufferToWindow()
2019-10-23 15:36:13 +03:00
fm.sendlist.Init()
2019-10-24 16:57:03 +03:00
2019-10-23 15:36:13 +03:00
tmpreq, tmpack, tmpackto := fm.preProcessRecvList()
fm.processRecvList(tmpreq, tmpack, tmpackto)
2019-10-24 16:42:46 +03:00
fm.combineWindowToRecvBuffer()
2019-10-24 16:57:03 +03:00
fm.calSendList()
2019-10-26 14:54:49 +03:00
fm.ping()
2019-10-28 06:58:01 +03:00
fm.printStat()
2019-10-19 17:36:17 +03:00
}
func (fm *FrameMgr) cutSendBufferToWindow() {
sendall := false
if fm.sendb.Size() < FRAME_MAX_SIZE {
sendall = true
}
2019-10-23 15:36:13 +03:00
for fm.sendb.Size() >= FRAME_MAX_SIZE && fm.sendwin.Len() < fm.windowsize {
2019-10-27 13:16:57 +03:00
fd := &FrameData{Type: (int32)(FrameData_USER_DATA),
2019-10-22 16:04:25 +03:00
Data: make([]byte, FRAME_MAX_SIZE)}
2019-10-27 13:16:57 +03:00
fm.sendb.Read(fd.Data)
2019-10-27 14:20:35 +03:00
if fm.compress > 0 && len(fd.Data) > fm.compress {
2019-10-27 14:06:55 +03:00
newb := fm.compressData(fd.Data)
if len(newb) < len(fd.Data) {
fd.Data = newb
fd.Compress = true
}
}
2019-10-27 13:16:57 +03:00
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
2019-10-19 17:36:17 +03:00
fm.sendid++
2019-10-24 16:42:46 +03:00
if fm.sendid >= FRAME_MAX_ID {
2019-10-19 17:36:17 +03:00
fm.sendid = 0
}
2019-10-23 15:36:13 +03:00
fm.sendwin.PushBack(f)
2019-10-26 13:33:22 +03:00
loggo.Debug("cut frame push to send win %d %d %d", f.Id, FRAME_MAX_SIZE, fm.sendwin.Len())
2019-10-19 17:36:17 +03:00
}
2019-10-23 15:36:13 +03:00
if sendall && fm.sendb.Size() > 0 && fm.sendwin.Len() < fm.windowsize {
2019-10-27 13:16:57 +03:00
fd := &FrameData{Type: (int32)(FrameData_USER_DATA),
2019-10-22 16:04:25 +03:00
Data: make([]byte, fm.sendb.Size())}
2019-10-27 13:16:57 +03:00
fm.sendb.Read(fd.Data)
2019-10-27 14:20:35 +03:00
if fm.compress > 0 && len(fd.Data) > fm.compress {
2019-10-27 14:06:55 +03:00
newb := fm.compressData(fd.Data)
if len(newb) < len(fd.Data) {
fd.Data = newb
fd.Compress = true
}
}
2019-10-27 13:16:57 +03:00
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
2019-10-19 17:36:17 +03:00
fm.sendid++
2019-10-24 16:42:46 +03:00
if fm.sendid >= FRAME_MAX_ID {
2019-10-19 17:36:17 +03:00
fm.sendid = 0
}
2019-10-23 15:36:13 +03:00
fm.sendwin.PushBack(f)
2019-10-27 13:16:57 +03:00
loggo.Debug("cut small frame push to send win %d %d %d", f.Id, len(f.Data.Data), fm.sendwin.Len())
2019-10-19 17:36:17 +03:00
}
2019-10-26 07:01:30 +03:00
2019-10-26 13:12:55 +03:00
if fm.sendb.Empty() && fm.close && !fm.closesend && fm.sendwin.Len() < fm.windowsize {
2019-10-27 13:16:57 +03:00
fd := &FrameData{Type: (int32)(FrameData_CLOSE)}
f := &Frame{Type: (int32)(Frame_DATA),
2019-10-26 07:01:30 +03:00
Id: (int32)(fm.sendid),
2019-10-27 13:16:57 +03:00
Data: fd}
2019-10-26 07:01:30 +03:00
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
2019-10-26 12:51:38 +03:00
2019-10-27 13:16:57 +03:00
fm.sendwin.PushBack(f)
2019-10-26 12:51:38 +03:00
fm.closesend = true
2019-10-26 13:12:55 +03:00
loggo.Debug("close frame push to send win %d %d", f.Id, fm.sendwin.Len())
2019-10-26 07:01:30 +03:00
}
2019-10-19 17:36:17 +03:00
}
func (fm *FrameMgr) calSendList() {
2019-10-27 12:17:47 +03:00
2019-10-19 17:36:17 +03:00
cur := time.Now().UnixNano()
2019-10-23 15:36:13 +03:00
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
2019-10-26 15:01:51 +03:00
if f.Resend || cur-f.Sendtime > int64(fm.resend_timems*(int)(time.Millisecond)) {
2019-10-26 16:54:13 +03:00
oldsend := fm.sendmap[f.Id]
if cur-oldsend > fm.rttns {
f.Sendtime = cur
fm.sendlist.PushBack(f)
f.Resend = false
fm.sendmap[f.Id] = cur
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.sendDataNum++
fm.fs.sendDataNumsMap[f.Id]++
}
2019-10-27 13:16:57 +03:00
loggo.Debug("push frame to sendlist %d %d", f.Id, len(f.Data.Data))
2019-10-26 16:54:13 +03:00
}
2019-10-19 17:36:17 +03:00
}
}
}
// getSendList returns the list of frames queued for transmission. The list is
// the manager's own; Update re-initialises it at the start of every round.
func (fm *FrameMgr) getSendList() *list.List {
	pending := fm.sendlist
	return pending
}
2019-10-23 15:36:13 +03:00
// OnRecvFrame queues an incoming frame for the next Update round. The queue
// is protected by recvlock.
func (fm *FrameMgr) OnRecvFrame(f *Frame) {
	lock := fm.recvlock
	lock.Lock()
	defer lock.Unlock()

	fm.recvlist.PushBack(f)
}
func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int32]*Frame) {
fm.recvlock.Lock()
defer fm.recvlock.Unlock()
tmpreq := make(map[int32]int)
tmpack := make(map[int32]int)
tmpackto := make(map[int32]*Frame)
for e := fm.recvlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Type == (int32)(Frame_REQ) {
for _, id := range f.Dataid {
tmpreq[id]++
2019-10-26 13:33:22 +03:00
loggo.Debug("recv req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
2019-10-23 15:36:13 +03:00
}
} else if f.Type == (int32)(Frame_ACK) {
for _, id := range f.Dataid {
tmpack[id]++
2019-10-26 13:33:22 +03:00
loggo.Debug("recv ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
2019-10-23 15:36:13 +03:00
}
} else if f.Type == (int32)(Frame_DATA) {
tmpackto[f.Id] = f
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.recvDataNum++
fm.fs.recvDataNumsMap[f.Id]++
}
2019-10-27 13:16:57 +03:00
loggo.Debug("recv data %d %d", f.Id, len(f.Data.Data))
2019-10-26 14:54:49 +03:00
} else if f.Type == (int32)(Frame_PING) {
fm.processPing(f)
} else if f.Type == (int32)(Frame_PONG) {
fm.processPong(f)
2019-10-27 13:16:57 +03:00
} else {
2019-10-27 14:20:35 +03:00
loggo.Error("error frame type %d", f.Type)
2019-10-23 15:36:13 +03:00
}
}
fm.recvlist.Init()
return tmpreq, tmpack, tmpackto
}
func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) {
2019-10-28 06:58:01 +03:00
for id, num := range tmpreq {
2019-10-23 15:36:13 +03:00
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == id {
f.Resend = true
2019-10-27 13:16:57 +03:00
loggo.Debug("choose resend win %d %d", f.Id, len(f.Data.Data))
2019-10-23 15:36:13 +03:00
break
}
}
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.recvReqNum += num
fm.fs.recvReqNumsMap[id] += num
}
2019-10-23 15:36:13 +03:00
}
2019-10-28 06:58:01 +03:00
for id, num := range tmpack {
2019-10-23 15:36:13 +03:00
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == id {
fm.sendwin.Remove(e)
2019-10-26 16:54:13 +03:00
delete(fm.sendmap, f.Id)
2019-10-27 13:16:57 +03:00
loggo.Debug("remove send win %d %d", f.Id, len(f.Data.Data))
2019-10-23 15:36:13 +03:00
break
}
}
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.recvAckNum += num
fm.fs.recvAckNumsMap[id] += num
}
2019-10-23 15:36:13 +03:00
}
if len(tmpackto) > 0 {
2019-10-26 16:00:15 +03:00
tmp := make([]int32, len(tmpackto))
2019-10-23 15:36:13 +03:00
index := 0
for id, rf := range tmpackto {
2019-10-26 14:54:49 +03:00
if fm.addToRecvWin(rf) {
2019-10-26 16:00:15 +03:00
tmp[index] = id
2019-10-26 14:54:49 +03:00
index++
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.sendAckNum++
fm.fs.sendAckNumsMap[id]++
}
2019-10-27 13:16:57 +03:00
loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data.Data))
2019-10-26 14:54:49 +03:00
}
2019-10-23 15:36:13 +03:00
}
2019-10-26 16:00:15 +03:00
if index > 0 {
f := &Frame{Type: (int32)(Frame_ACK), Resend: false, Sendtime: 0,
Id: 0,
Dataid: tmp[0:index]}
fm.sendlist.PushBack(f)
loggo.Debug("send ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
}
2019-10-23 15:36:13 +03:00
}
}
2019-10-26 14:54:49 +03:00
func (fm *FrameMgr) addToRecvWin(rf *Frame) bool {
2019-10-23 15:36:13 +03:00
2019-10-26 15:39:41 +03:00
if !fm.isIdInRange((int)(rf.Id), FRAME_MAX_ID) {
loggo.Debug("recv frame not in range %d %d", rf.Id, fm.recvid)
2019-10-26 16:26:19 +03:00
if fm.isIdOld((int)(rf.Id), FRAME_MAX_ID) {
2019-10-26 16:22:14 +03:00
return true
}
2019-10-26 14:54:49 +03:00
return false
2019-10-24 16:42:46 +03:00
}
2019-10-23 15:36:13 +03:00
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == rf.Id {
2019-10-27 13:16:57 +03:00
loggo.Debug("recv frame ignore %d %d", f.Id, len(f.Data.Data))
2019-10-26 14:54:49 +03:00
return true
2019-10-23 15:36:13 +03:00
}
}
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
2019-10-26 13:12:55 +03:00
loggo.Debug("start insert recv win %d %d %d", fm.recvid, rf.Id, f.Id)
2019-10-26 15:39:41 +03:00
if fm.compareId((int)(rf.Id), (int)(f.Id)) < 0 {
2019-10-23 15:36:13 +03:00
fm.recvwin.InsertBefore(rf, e)
2019-10-27 13:16:57 +03:00
loggo.Debug("insert recv win %d %d before %d", rf.Id, len(rf.Data.Data), f.Id)
2019-10-26 14:54:49 +03:00
return true
2019-10-23 15:36:13 +03:00
}
}
2019-10-24 16:42:46 +03:00
fm.recvwin.PushBack(rf)
2019-10-27 13:16:57 +03:00
loggo.Debug("insert recv win last %d %d", rf.Id, len(rf.Data.Data))
2019-10-26 14:54:49 +03:00
return true
2019-10-24 16:42:46 +03:00
}
2019-10-27 13:16:57 +03:00
func (fm *FrameMgr) processRecvFrame(f *Frame) bool {
if f.Data.Type == (int32)(FrameData_USER_DATA) {
left := fm.recvb.Capacity() - fm.recvb.Size()
if left >= len(f.Data.Data) {
2019-10-27 14:06:55 +03:00
src := f.Data.Data
if f.Data.Compress {
err, old := fm.deCompressData(src)
if err != nil {
loggo.Error("recv frame deCompressData error %d", f.Id)
return false
}
if left < len(old) {
return false
}
loggo.Debug("deCompressData recv frame %d %d %d",
f.Id, len(src), len(old))
src = old
}
fm.recvb.Write(src)
2019-10-27 13:16:57 +03:00
loggo.Debug("combined recv frame to recv buffer %d %d",
2019-10-27 14:06:55 +03:00
f.Id, len(src))
2019-10-27 13:16:57 +03:00
return true
}
2019-10-27 14:06:55 +03:00
return false
2019-10-27 13:16:57 +03:00
} else if f.Data.Type == (int32)(FrameData_CLOSE) {
fm.remoteclosed = true
loggo.Debug("recv remote close frame %d", f.Id)
return true
} else if f.Data.Type == (int32)(FrameData_CONN) {
fm.sendConnectRsp()
fm.connected = true
loggo.Debug("recv remote conn frame %d", f.Id)
return true
} else if f.Data.Type == (int32)(FrameData_CONNRSP) {
fm.connected = true
loggo.Debug("recv remote conn rsp frame %d", f.Id)
return true
} else {
loggo.Error("recv frame type error %d", f.Data.Type)
return false
2019-10-27 12:17:47 +03:00
}
2019-10-27 13:16:57 +03:00
}
func (fm *FrameMgr) combineWindowToRecvBuffer() {
2019-10-27 12:17:47 +03:00
2019-10-24 16:42:46 +03:00
for {
done := false
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
2019-10-26 16:00:15 +03:00
if f.Id == (int32)(fm.recvid) {
2019-10-27 13:16:57 +03:00
delete(fm.reqmap, f.Id)
if fm.processRecvFrame(f) {
2019-10-24 16:42:46 +03:00
fm.recvwin.Remove(e)
done = true
2019-10-27 13:16:57 +03:00
loggo.Debug("process recv frame ok %d %d",
f.Id, len(f.Data.Data))
2019-10-24 16:42:46 +03:00
break
}
}
}
if !done {
break
} else {
fm.recvid++
if fm.recvid >= FRAME_MAX_ID {
fm.recvid = 0
}
2019-10-26 13:12:55 +03:00
loggo.Debug("combined ok add recvid %d ", fm.recvid)
2019-10-24 16:42:46 +03:00
}
}
2019-10-24 16:57:03 +03:00
2019-10-26 16:54:13 +03:00
cur := time.Now().UnixNano()
2019-10-24 16:57:03 +03:00
reqtmp := make(map[int]int)
e := fm.recvwin.Front()
2019-10-26 16:00:15 +03:00
id := fm.recvid
2019-10-26 15:39:41 +03:00
for len(reqtmp) < fm.windowsize && len(reqtmp)*4 < FRAME_MAX_SIZE/2 && e != nil {
2019-10-24 16:57:03 +03:00
f := e.Value.(*Frame)
2019-10-26 14:54:49 +03:00
loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id)
2019-10-24 16:57:03 +03:00
if f.Id != (int32)(id) {
2019-10-26 16:54:13 +03:00
oldReq := fm.reqmap[f.Id]
if cur-oldReq > fm.rttns {
reqtmp[id]++
fm.reqmap[f.Id] = cur
loggo.Debug("add req id %d ", id)
}
2019-10-24 16:57:03 +03:00
} else {
e = e.Next()
}
id++
2019-10-26 07:01:30 +03:00
if id >= FRAME_MAX_ID {
id = 0
}
}
2019-10-26 12:51:38 +03:00
if len(reqtmp) > 0 {
f := &Frame{Type: (int32)(Frame_REQ), Resend: false, Sendtime: 0,
Id: 0,
Dataid: make([]int32, len(reqtmp))}
index := 0
for id, _ := range reqtmp {
f.Dataid[index] = (int32)(id)
index++
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.sendReqNum++
fm.fs.sendReqNumsMap[(int32)(id)]++
}
2019-10-26 12:51:38 +03:00
}
fm.sendlist.PushBack(f)
2019-10-26 13:33:22 +03:00
loggo.Debug("send req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
2019-10-24 16:57:03 +03:00
}
2019-10-24 16:42:46 +03:00
}
// GetRecvBufferSize reports the current size of the receive buffer, i.e. how
// much reassembled data is waiting to be read.
func (fm *FrameMgr) GetRecvBufferSize() int {
	size := fm.recvb.Size()
	return size
}
func (fm *FrameMgr) GetRecvReadLineBuffer() []byte {
2019-10-26 13:33:22 +03:00
ret := fm.recvb.GetReadLineBuffer()
loggo.Debug("GetRecvReadLineBuffer %d %d", fm.recvb.Size(), len(ret))
return ret
2019-10-24 16:42:46 +03:00
}
func (fm *FrameMgr) SkipRecvBuffer(size int) {
fm.recvb.SkipRead(size)
2019-10-26 13:33:22 +03:00
loggo.Debug("SkipRead %d %d", fm.recvb.Size(), size)
2019-10-23 15:36:13 +03:00
}
2019-10-26 07:01:30 +03:00
// Close records that the local side wants to shut down. The CLOSE frame
// itself is queued later by cutSendBufferToWindow, once the send buffer has
// drained.
func (fm *FrameMgr) Close() {
	lock := fm.recvlock
	lock.Lock()
	defer lock.Unlock()

	fm.close = true
}
// IsRemoteClosed reports whether a CLOSE frame from the peer has been
// delivered.
func (fm *FrameMgr) IsRemoteClosed() bool {
	closed := fm.remoteclosed
	return closed
}
2019-10-26 14:54:49 +03:00
func (fm *FrameMgr) ping() {
cur := time.Now().UnixNano()
2019-10-26 15:01:51 +03:00
if cur-fm.lastPingTime > (int64)(time.Second) {
2019-10-28 06:58:01 +03:00
fm.lastPingTime = cur
2019-10-26 14:54:49 +03:00
f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur,
Id: 0}
fm.sendlist.PushBack(f)
loggo.Debug("send ping %d", cur)
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.sendping++
}
2019-10-26 14:54:49 +03:00
}
}
func (fm *FrameMgr) processPing(f *Frame) {
rf := &Frame{Type: (int32)(Frame_PONG), Resend: false, Sendtime: f.Sendtime,
Id: 0}
fm.sendlist.PushBack(rf)
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.recvping++
fm.fs.sendpong++
}
2019-10-26 14:54:49 +03:00
loggo.Debug("recv ping %d", f.Sendtime)
}
func (fm *FrameMgr) processPong(f *Frame) {
cur := time.Now().UnixNano()
if cur > f.Sendtime {
2019-10-26 16:54:13 +03:00
rtt := cur - f.Sendtime
fm.rttns = (fm.rttns + rtt) / 2
2019-10-28 06:58:01 +03:00
if fm.openstat > 0 {
fm.fs.recvpong++
}
2019-10-26 16:54:13 +03:00
loggo.Debug("recv pong %d %dms", rtt, fm.rttns/1000/1000)
2019-10-26 14:54:49 +03:00
}
}
2019-10-26 15:39:41 +03:00
// isIdInRange reports whether id falls inside the receive window, i.e. the
// windowsize ids starting at recvid, taking wrap-around at maxid into
// account.
func (fm *FrameMgr) isIdInRange(id int, maxid int) bool {
	lo, hi := fm.recvid, fm.recvid+fm.windowsize
	if hi < maxid {
		// Window does not reach the wrap point.
		return lo <= id && id < hi
	}
	// Window wraps: [lo, maxid) plus [0, hi-maxid).
	wrapped := 0 <= id && id < hi-maxid
	upper := lo <= id && id < maxid
	return wrapped || upper
}
2019-10-26 16:22:14 +03:00
// compareId orders two frame ids relative to recvid: ids below recvid are
// treated as having wrapped and are shifted up by FRAME_MAX_ID before
// subtracting. The result is negative when l sorts before r, zero when equal
// and positive otherwise.
func (fm *FrameMgr) compareId(l int, r int) int {
	unwrap := func(id int) int {
		if id < fm.recvid {
			return id + FRAME_MAX_ID
		}
		return id
	}
	return unwrap(l) - unwrap(r)
}
// isIdOld reports whether id belongs to a frame that was already delivered,
// so that a late retransmission can be acknowledged again instead of being
// ignored.
//
// Ids live on a ring of size maxid. The windowsize*2 ids starting at recvid
// are treated as "current or future"; every other valid id is old. The
// distance is computed modulo maxid so that the answer stays correct right
// after recvid has wrapped: ids from just before the wrap (numerically far
// above recvid) are recognised as old too.
//
// Ids outside [0, maxid) are never old.
func (fm *FrameMgr) isIdOld(id int, maxid int) bool {
	if maxid <= 0 || id < 0 || id >= maxid {
		return false
	}
	// ahead is how far id lies in front of recvid on the ring, in [0, maxid).
	ahead := id - fm.recvid
	if ahead < 0 {
		ahead += maxid
	}
	return ahead >= fm.windowsize*2
}
2019-10-27 12:17:47 +03:00
2019-10-27 13:16:57 +03:00
func (fm *FrameMgr) IsConnected() bool {
return fm.connected
2019-10-27 12:17:47 +03:00
}
2019-10-27 13:16:57 +03:00
func (fm *FrameMgr) Connect() {
fd := &FrameData{Type: (int32)(FrameData_CONN)}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
2019-10-27 12:17:47 +03:00
}
2019-10-27 13:16:57 +03:00
fm.sendwin.PushBack(f)
loggo.Debug("start connect")
2019-10-27 12:17:47 +03:00
}
2019-10-27 13:16:57 +03:00
func (fm *FrameMgr) sendConnectRsp() {
fd := &FrameData{Type: (int32)(FrameData_CONNRSP)}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
2019-10-27 12:17:47 +03:00
2019-10-27 13:16:57 +03:00
fm.sendwin.PushBack(f)
loggo.Debug("send connect rsp")
2019-10-27 12:17:47 +03:00
}
2019-10-27 14:06:55 +03:00
func (fm *FrameMgr) compressData(src []byte) []byte {
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(src)
2019-10-27 14:20:35 +03:00
w.Close()
2019-10-27 14:06:55 +03:00
return b.Bytes()
}
// deCompressData inflates a zlib stream produced by compressData.
//
// It returns (nil, data) on success. On any failure — bad header, truncated
// stream, checksum mismatch — it returns (err, nil), so a damaged payload is
// never passed on as if it were valid. Note the unconventional result order
// (error first), kept for existing callers.
func (fm *FrameMgr) deCompressData(src []byte) (error, []byte) {
	r, err := zlib.NewReader(bytes.NewReader(src))
	if err != nil {
		return err, nil
	}

	var out bytes.Buffer
	_, copyErr := io.Copy(&out, r)
	// Always close the reader, but report the read error first: it is the
	// more specific of the two.
	closeErr := r.Close()
	if copyErr != nil {
		return copyErr, nil
	}
	if closeErr != nil {
		return closeErr, nil
	}
	return nil, out.Bytes()
}
2019-10-28 06:58:01 +03:00
// resetStat replaces the statistics block with a fresh one: all counters at
// zero and every per-id map allocated and empty.
func (fm *FrameMgr) resetStat() {
	fm.fs = &FrameStat{
		sendDataNumsMap: make(map[int32]int),
		recvDataNumsMap: make(map[int32]int),
		sendReqNumsMap:  make(map[int32]int),
		recvReqNumsMap:  make(map[int32]int),
		sendAckNumsMap:  make(map[int32]int),
		recvAckNumsMap:  make(map[int32]int),
	}
}
func (fm *FrameMgr) printStat() {
if fm.openstat > 0 {
cur := time.Now().UnixNano()
if cur-fm.lastPrintStat > (int64)(time.Second) {
fm.lastPrintStat = cur
fs := fm.fs
loggo.Info("\nsendDataNum %d\nrecvDataNum %d\nsendReqNum %d\nrecvReqNum %d\nsendAckNum %d\nrecvAckNum %d\n"+
"sendDataNumsMap %s\nrecvDataNumsMap %s\nsendReqNumsMap %s\nrecvReqNumsMap %s\nsendAckNumsMap %s\nrecvAckNumsMap %s\n"+
2019-10-28 07:29:56 +03:00
"sendping %d\nrecvping %d\nsendpong %d\nrecvpong %d\n"+
"sendwin %d\nrecvwin %d\n",
2019-10-28 06:58:01 +03:00
fs.sendDataNum, fs.recvDataNum,
fs.sendReqNum, fs.recvReqNum,
fs.sendAckNum, fs.recvAckNum,
fm.printStatMap(&fs.sendDataNumsMap), fm.printStatMap(&fs.recvDataNumsMap),
fm.printStatMap(&fs.sendReqNumsMap), fm.printStatMap(&fs.recvReqNumsMap),
fm.printStatMap(&fs.sendAckNumsMap), fm.printStatMap(&fs.recvAckNumsMap),
fs.sendping, fs.recvping,
2019-10-28 07:29:56 +03:00
fs.sendpong, fs.recvpong,
fm.sendwin.Len(), fm.recvwin.Len())
2019-10-28 06:58:01 +03:00
fm.resetStat()
}
}
}
// printStatMap renders a per-id counter map as a histogram string.
//
// For every count c from 1 up to the largest count in *m it emits
// "c->n," where n is the number of ids that were seen exactly c times.
// It returns "none" when the map has no positive counts.
func (fm *FrameMgr) printStatMap(m *map[int32]int) string {
	hist := make(map[int]int)
	top := 0
	for _, times := range *m {
		hist[times]++
		if times > top {
			top = times
		}
	}
	if top <= 0 {
		return "none"
	}

	var line []byte
	for c := 1; c <= top; c++ {
		line = strconv.AppendInt(line, int64(c), 10)
		line = append(line, "->"...)
		line = strconv.AppendInt(line, int64(hist[c]), 10)
		line = append(line, ',')
	}
	return string(line)
}