pingtunnel/framemgr.go
2019-10-26 12:01:30 +08:00

333 lines
6.3 KiB
Go

package pingtunnel
import (
"container/list"
"github.com/esrrhs/go-engine/src/rbuffergo"
"sync"
"time"
)
// FrameMgr holds the sliding-window state of one tunnel endpoint: bytes
// waiting to be framed and sent, frames waiting for acknowledgement, frames
// received from the peer, and the bytes reassembled from them.
type FrameMgr struct {
	// sendb receives the bytes passed to WriteSendBuffer; recvb receives the
	// payload of data frames once they can be delivered in id order.
	sendb, recvb *rbuffergo.RBuffergo

	// recvlock is held while recvlist is appended to or drained, and while
	// Close sets the close flag.
	recvlock sync.Locker

	// windowsize is the window size in frames; resend_timems is the resend
	// interval handed to NewFrameMgr.
	windowsize, resend_timems int

	// sendwin keeps data frames cut from sendb until they are acknowledged;
	// sendlist is rebuilt by every Update with the frames to transmit.
	sendwin, sendlist *list.List
	// sendid is the id assigned to the next outgoing data frame.
	sendid int

	// recvwin keeps received data frames ordered by id until they are written
	// to recvb; recvlist queues frames handed in through OnRecvFrame.
	recvwin, recvlist *list.List
	// recvid is the id of the next data frame to be written to recvb.
	recvid int

	// close is set by Close; remoteclosed is set once a zero-length data
	// frame from the peer has been delivered.
	close, remoteclosed bool
}
// NewFrameMgr builds a FrameMgr whose send and receive buffers are both
// created with rbuffergo.New(buffersize, false). windowsize and resend_timems
// are stored as given; all lists start empty and both id counters start at 0.
func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
	return &FrameMgr{
		sendb:         rbuffergo.New(buffersize, false),
		recvb:         rbuffergo.New(buffersize, false),
		recvlock:      new(sync.Mutex),
		windowsize:    windowsize,
		resend_timems: resend_timems,
		sendwin:       list.New(),
		sendlist:      list.New(),
		recvwin:       list.New(),
		recvlist:      list.New(),
	}
}
// GetSendBufferLeft reports the free room in the send buffer, computed as its
// capacity minus its current size.
func (fm *FrameMgr) GetSendBufferLeft() int {
	return fm.sendb.Capacity() - fm.sendb.Size()
}
// WriteSendBuffer hands data to the send buffer; Update later cuts the
// buffered bytes into frames.
// NOTE(review): any result of Write is not inspected here; callers presumably
// check GetSendBufferLeft first - confirm.
func (fm *FrameMgr) WriteSendBuffer(data []byte) {
	fm.sendb.Write(data)
}
// Update runs one processing round: it frames pending send-buffer bytes,
// resets the send list, handles the frames queued by OnRecvFrame, moves
// deliverable data into the receive buffer and finally picks the data frames
// to transmit. The resulting send list is available through getSendList.
func (fm *FrameMgr) Update() {
	// Cut buffered bytes into data frames and append them to the send window.
	fm.cutSendBufferToWindow()
	// Drop the previous round's send list; the steps below refill it.
	fm.sendlist.Init()
	// Drain the receive queue (under recvlock) and apply REQ/ACK/DATA frames.
	tmpreq, tmpack, tmpackto := fm.preProcessRecvList()
	fm.processRecvList(tmpreq, tmpack, tmpackto)
	// Deliver in-order data to the receive buffer and queue a REQ frame.
	fm.combineWindowToRecvBuffer()
	// Queue data frames that are flagged for resend or whose timer expired.
	fm.calSendList()
}
// cutSendBufferToWindow moves bytes from the send buffer into data frames and
// appends them to the send window, never letting the window exceed
// windowsize frames.
//
//   - While at least FRAME_MAX_SIZE bytes are buffered, full-size frames are cut.
//   - A shorter frame is cut only when the buffer held less than one full
//     frame on entry, so a partial tail is not flushed right after full frames.
//   - Once Close has been called and the buffer is empty, a zero-length data
//     frame is appended as the close marker.
//
// The close marker obeys the window limit like every other frame; without
// that limit a new marker would be appended on every call and the send window
// would grow without bound while the peer is not acknowledging.
func (fm *FrameMgr) cutSendBufferToWindow() {
	// hasRoom reports whether the send window can take another frame.
	hasRoom := func() bool {
		return fm.sendwin.Len() < fm.windowsize
	}

	// push builds a data frame carrying the next size bytes of the send
	// buffer, assigns it the current send id, advances the id with
	// wrap-around and appends the frame to the send window.
	push := func(size int) {
		f := &Frame{Type: (int32)(Frame_DATA), Resend: false, Sendtime: 0,
			Id:   (int32)(fm.sendid),
			Data: make([]byte, size)}
		if size > 0 {
			fm.sendb.Read(f.Data)
		}
		fm.sendid++
		if fm.sendid >= FRAME_MAX_ID {
			fm.sendid = 0
		}
		fm.sendwin.PushBack(f)
	}

	// Decided before any full frame is cut, see the function comment.
	sendall := fm.sendb.Size() < FRAME_MAX_SIZE

	for fm.sendb.Size() >= FRAME_MAX_SIZE && hasRoom() {
		push(FRAME_MAX_SIZE)
	}

	if sendall && fm.sendb.Size() > 0 && hasRoom() {
		push(fm.sendb.Size())
	}

	// Close writes the flag while holding recvlock, so read it the same way.
	fm.recvlock.Lock()
	closing := fm.close
	fm.recvlock.Unlock()

	if closing && fm.sendb.Empty() && hasRoom() {
		push(0)
	}
}
// calSendList appends to the send list every send-window frame that is either
// flagged for resend or was last sent more than resend_timems milliseconds
// ago. Each selected frame gets its Sendtime refreshed and its Resend flag
// cleared. Frames that were never sent have Sendtime 0 and are therefore
// always selected.
func (fm *FrameMgr) calSendList() {
	cur := time.Now().UnixNano()
	// Sendtime is kept in nanoseconds (UnixNano), so the millisecond interval
	// has to be converted to nanoseconds before comparing.
	resendAfter := int64(fm.resend_timems) * int64(time.Millisecond)
	for e := fm.sendwin.Front(); e != nil; e = e.Next() {
		f := e.Value.(*Frame)
		if f.Resend || cur-f.Sendtime > resendAfter {
			f.Sendtime = cur
			f.Resend = false
			fm.sendlist.PushBack(f)
		}
	}
}
// getSendList exposes the list of frames collected by the latest Update. The
// list itself is returned, not a copy.
func (fm *FrameMgr) getSendList() *list.List {
	pending := fm.sendlist
	return pending
}
// OnRecvFrame queues a frame received from the peer. The queue is appended to
// under recvlock and is drained by the next Update.
func (fm *FrameMgr) OnRecvFrame(f *Frame) {
	mu := fm.recvlock
	mu.Lock()
	defer mu.Unlock()

	fm.recvlist.PushBack(f)
}
// preProcessRecvList drains the receive queue under recvlock and sorts its
// content into three maps:
//
//   - ids named by REQ frames, with the number of times each was named,
//   - ids named by ACK frames, with the number of times each was named,
//   - received DATA frames keyed by their own id (a later frame with the same
//     id replaces an earlier one).
//
// Frames of any other type are ignored. The queue is empty afterwards.
func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int32]*Frame) {
	fm.recvlock.Lock()
	defer fm.recvlock.Unlock()

	var (
		requested = make(map[int32]int)
		acked     = make(map[int32]int)
		received  = make(map[int32]*Frame)
	)

	for e := fm.recvlist.Front(); e != nil; e = e.Next() {
		switch f := e.Value.(*Frame); f.Type {
		case (int32)(Frame_REQ):
			for _, id := range f.Dataid {
				requested[id] += 1
			}
		case (int32)(Frame_ACK):
			for _, id := range f.Dataid {
				acked[id] += 1
			}
		case (int32)(Frame_DATA):
			received[f.Id] = f
		}
	}

	fm.recvlist.Init()
	return requested, acked, received
}
// processRecvList applies the result of preProcessRecvList:
//
//   - every id in tmpreq flags the matching send-window frame for resend,
//   - every id in tmpack removes the matching frame from the send window,
//   - every frame in tmpackto is offered to the receive window, and a single
//     ACK frame listing all of their ids is appended to the send list.
//
// Ids that match no send-window frame are ignored.
func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) {
	// lookup returns the first send-window element whose frame has the given
	// id, or nil when there is none.
	lookup := func(id int32) *list.Element {
		for e := fm.sendwin.Front(); e != nil; e = e.Next() {
			if e.Value.(*Frame).Id == id {
				return e
			}
		}
		return nil
	}

	for id := range tmpreq {
		if e := lookup(id); e != nil {
			e.Value.(*Frame).Resend = true
		}
	}

	for id := range tmpack {
		if e := lookup(id); e != nil {
			fm.sendwin.Remove(e)
		}
	}

	if len(tmpackto) == 0 {
		return
	}

	ack := &Frame{Type: (int32)(Frame_ACK), Resend: false, Sendtime: 0,
		Id:     0,
		Dataid: make([]int32, 0, len(tmpackto))}
	for id, rf := range tmpackto {
		ack.Dataid = append(ack.Dataid, id)
		fm.addToRecvWin(rf)
	}
	fm.sendlist.PushBack(ack)
}
// addToRecvWin stores a received data frame in the receive window, keeping
// the window ordered according to compareId.
//
// The frame is dropped when its id lies outside recvid..recvid+windowsize
// (both ends inclusive; ids below recvid are shifted up by FRAME_MAX_ID first
// to account for wrap-around), or when a frame with the same id is already
// stored.
func (fm *FrameMgr) addToRecvWin(rf *Frame) {
	lo, hi := fm.recvid, fm.recvid+fm.windowsize

	pos := (int)(rf.Id)
	if pos < lo {
		pos += FRAME_MAX_ID
	}
	if pos < lo || pos > hi {
		return
	}

	// One pass: reject duplicates anywhere in the window and remember the
	// first stored frame that has to come after rf.
	var successor *list.Element
	for e := fm.recvwin.Front(); e != nil; e = e.Next() {
		stored := e.Value.(*Frame)
		if stored.Id == rf.Id {
			return
		}
		if successor == nil && fm.compareId(rf, stored) < 0 {
			successor = e
		}
	}

	if successor == nil {
		fm.recvwin.PushBack(rf)
		return
	}
	fm.recvwin.InsertBefore(rf, successor)
}
// compareId orders two frames by id relative to the current recvid. An id
// below recvid is treated as wrapped around and is shifted up by FRAME_MAX_ID
// before comparing. The result is negative when lf sorts before rf, zero when
// both ids are equal and positive otherwise.
func (fm *FrameMgr) compareId(lf *Frame, rf *Frame) int {
	// rank maps a frame id onto its position counted from recvid upwards.
	rank := func(id int32) int {
		v := (int)(id)
		if v < fm.recvid {
			v += FRAME_MAX_ID
		}
		return v
	}
	return rank(lf.Id) - rank(rf.Id)
}
// combineWindowToRecvBuffer does two things:
//
//  1. It moves data frames from the receive window into the receive buffer
//     for as long as the frame with id recvid is present and its payload
//     fits into the free room of the buffer. recvid advances (wrapping at
//     FRAME_MAX_ID) after every delivered frame. A delivered frame with an
//     empty payload marks the remote side as closed.
//  2. It appends a REQ frame to the send list naming the ids that are
//     missing in front of or between the frames still held in the receive
//     window, plus the id following the last inspected one, limited to
//     windowsize ids in total.
func (fm *FrameMgr) combineWindowToRecvBuffer() {
	// deliverNext writes the frame carrying the current recvid to the receive
	// buffer and removes it from the window. It reports false when that frame
	// is absent or does not fit. The comparison must use the live fm.recvid:
	// a copy taken before the loop would stop delivery after a single frame.
	deliverNext := func() bool {
		for e := fm.recvwin.Front(); e != nil; e = e.Next() {
			f := e.Value.(*Frame)
			if f.Id != (int32)(fm.recvid) {
				continue
			}
			left := fm.recvb.Capacity() - fm.recvb.Size()
			if left < len(f.Data) {
				// Ids are unique inside the window, so nothing else matches.
				return false
			}
			if len(f.Data) == 0 {
				fm.remoteclosed = true
			}
			fm.recvb.Write(f.Data)
			fm.recvwin.Remove(e)
			return true
		}
		return false
	}

	for deliverNext() {
		fm.recvid++
		if fm.recvid >= FRAME_MAX_ID {
			fm.recvid = 0
		}
	}

	// Walk the remaining window next to a running id counter; every id the
	// counter passes without meeting the matching frame is a gap to request.
	reqtmp := make(map[int]int)
	e := fm.recvwin.Front()
	id := fm.recvid
	for len(reqtmp) < fm.windowsize && e != nil {
		f := e.Value.(*Frame)
		if f.Id != (int32)(id) {
			reqtmp[id]++
		} else {
			e = e.Next()
		}
		id++
		if id >= FRAME_MAX_ID {
			id = 0
		}
	}

	// Additionally request the id after the last one inspected, if the
	// request still has room.
	if len(reqtmp) < fm.windowsize {
		reqtmp[id]++
	}

	f := &Frame{Type: (int32)(Frame_REQ), Resend: false, Sendtime: 0,
		Id:     0,
		Dataid: make([]int32, 0, len(reqtmp))}
	for missing := range reqtmp {
		f.Dataid = append(f.Dataid, (int32)(missing))
	}
	fm.sendlist.PushBack(f)
}
// GetRecvBufferSize reports the current size of the receive buffer, i.e. the
// amount of reassembled data not yet consumed from it.
func (fm *FrameMgr) GetRecvBufferSize() int {
	size := fm.recvb.Size()
	return size
}
// GetRecvReadLineBuffer returns the result of GetReadLineBuffer on the
// receive buffer.
// NOTE(review): presumably the contiguous readable part of the ring buffer,
// to be released with SkipRecvBuffer afterwards - confirm against rbuffergo.
func (fm *FrameMgr) GetRecvReadLineBuffer() []byte {
	readable := fm.recvb.GetReadLineBuffer()
	return readable
}
// SkipRecvBuffer forwards size to SkipRead on the receive buffer.
// NOTE(review): presumably discards size already-consumed bytes from the
// front of the buffer - confirm against rbuffergo.
func (fm *FrameMgr) SkipRecvBuffer(size int) {
	fm.recvb.SkipRead(size)
}
// Close marks the local side as closed. The flag is written while holding
// recvlock; cutSendBufferToWindow reacts to it by emitting a zero-length data
// frame once the send buffer is empty.
func (fm *FrameMgr) Close() {
	mu := fm.recvlock
	mu.Lock()
	fm.close = true
	mu.Unlock()
}
// IsRemoteClosed reports whether a zero-length data frame from the peer has
// been delivered to the receive buffer, which is how the peer signals close.
func (fm *FrameMgr) IsRemoteClosed() bool {
	closed := fm.remoteclosed
	return closed
}