This commit is contained in:
esrrhs 2019-10-24 21:42:46 +08:00
parent a2f409453b
commit 0c41f5e7d3
5 changed files with 145 additions and 50 deletions

View File

@ -186,16 +186,12 @@ func (p *Client) AcceptTcp() error {
conn, err := p.tcplistenConn.AcceptTCP() conn, err := p.tcplistenConn.AcceptTCP()
if err != nil { if err != nil {
if neterr, ok := err.(*net.OpError); ok { nerr, ok := err.(net.Error)
if neterr.Timeout() { if !ok || !nerr.Timeout() {
// Read timeout
continue
} else {
loggo.Error("Error accept tcp %s", err) loggo.Error("Error accept tcp %s", err)
continue continue
} }
} }
}
go p.AcceptTcpConn(conn) go p.AcceptTcpConn(conn)
} }
@ -209,6 +205,7 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems) fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems)
now := time.Now()
clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeTime: now, close: false, clientConn := &ClientConn{tcpaddr: tcpsrcaddr, id: uuid, activeTime: now, close: false,
fm: fm} fm: fm}
p.localAddrToConnMap[tcpsrcaddr.String()] = clientConn p.localAddrToConnMap[tcpsrcaddr.String()] = clientConn
@ -223,16 +220,12 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.Read(bytes) n, err := conn.Read(bytes)
if err != nil { if err != nil {
if neterr, ok := err.(*net.OpError); ok { nerr, ok := err.(net.Error)
if neterr.Timeout() { if !ok || !nerr.Timeout() {
// Read timeout
n = 0
} else {
loggo.Error("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err) loggo.Error("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
break break
} }
} }
}
if n > 0 { if n > 0 {
clientConn.fm.WriteSendBuffer(bytes[:n]) clientConn.fm.WriteSendBuffer(bytes[:n])
} }
@ -280,15 +273,14 @@ func (p *Client) Accept() error {
p.listenConn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) p.listenConn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, srcaddr, err := p.listenConn.ReadFromUDP(bytes) n, srcaddr, err := p.listenConn.ReadFromUDP(bytes)
if err != nil { if err != nil {
if neterr, ok := err.(*net.OpError); ok { nerr, ok := err.(net.Error)
if neterr.Timeout() { if !ok || !nerr.Timeout() {
// Read timeout
continue
} else {
loggo.Error("Error read udp %s", err) loggo.Error("Error read udp %s", err)
continue continue
} }
} }
if n <= 0 {
continue
} }
now := time.Now() now := time.Now()

View File

@ -55,6 +55,8 @@ func (fm *FrameMgr) Update() {
fm.processRecvList(tmpreq, tmpack, tmpackto) fm.processRecvList(tmpreq, tmpack, tmpackto)
fm.calSendList() fm.calSendList()
fm.combineWindowToRecvBuffer()
} }
func (fm *FrameMgr) cutSendBufferToWindow() { func (fm *FrameMgr) cutSendBufferToWindow() {
@ -72,7 +74,7 @@ func (fm *FrameMgr) cutSendBufferToWindow() {
fm.sendb.Read(f.Data) fm.sendb.Read(f.Data)
fm.sendid++ fm.sendid++
if fm.sendid > FRAME_MAX_ID { if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0 fm.sendid = 0
} }
@ -86,7 +88,7 @@ func (fm *FrameMgr) cutSendBufferToWindow() {
fm.sendb.Read(f.Data) fm.sendb.Read(f.Data)
fm.sendid++ fm.sendid++
if fm.sendid > FRAME_MAX_ID { if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0 fm.sendid = 0
} }
@ -180,6 +182,16 @@ func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int,
func (fm *FrameMgr) addToRecvWin(rf *Frame) { func (fm *FrameMgr) addToRecvWin(rf *Frame) {
begin := fm.recvid
end := fm.recvid + fm.windowsize
id := (int)(rf.Id)
if id < begin {
id += FRAME_MAX_ID
}
if id > end || id < begin {
return
}
for e := fm.recvwin.Front(); e != nil; e = e.Next() { for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame) f := e.Value.(*Frame)
if f.Id == rf.Id { if f.Id == rf.Id {
@ -189,15 +201,66 @@ func (fm *FrameMgr) addToRecvWin(rf *Frame) {
for e := fm.recvwin.Front(); e != nil; e = e.Next() { for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame) f := e.Value.(*Frame)
if rf.Id > (int32)(fm.recvid) && rf.Id < f.Id { if fm.compareId(rf, f) < 0 {
fm.recvwin.InsertBefore(rf, e) fm.recvwin.InsertBefore(rf, e)
return return
} }
} }
if fm.recvwin.Len() > 0 {
fm.recvwin.PushBack(rf) fm.recvwin.PushBack(rf)
}
func (fm *FrameMgr) compareId(lf *Frame, rf *Frame) int {
l := (int)(lf.Id)
r := (int)(rf.Id)
if l < fm.recvid {
l += FRAME_MAX_ID
}
if r < fm.recvid {
r += FRAME_MAX_ID
}
return l - r
}
func (fm *FrameMgr) combineWindowToRecvBuffer() {
id := fm.recvid
for {
done := false
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == (int32)(id) {
left := fm.recvb.Capacity() - fm.recvb.Size()
if left >= len(f.Data) {
fm.recvb.Write(f.Data)
fm.recvwin.Remove(e)
done = true
break
}
}
}
if !done {
break
} else { } else {
fm.recvwin.PushBack(rf) fm.recvid++
if fm.recvid >= FRAME_MAX_ID {
fm.recvid = 0
}
}
} }
} }
func (fm *FrameMgr) GetRecvBufferSize() int {
return fm.recvb.Size()
}
func (fm *FrameMgr) GetRecvReadLineBuffer() []byte {
return fm.recvb.GetReadLineBuffer()
}
func (fm *FrameMgr) SkipRecvBuffer(size int) {
fm.recvb.SkipRead(size)
}

View File

@ -136,5 +136,5 @@ func GetMd5String(s string) string {
const ( const (
FRAME_MAX_SIZE int = 888 FRAME_MAX_SIZE int = 888
FRAME_MAX_ID int = 999 FRAME_MAX_ID int = 10000
) )

View File

@ -6,7 +6,7 @@ import (
"testing" "testing"
) )
func Test0001(test *testing.T) { func Test0001(t *testing.T) {
my := &MyMsg{} my := &MyMsg{}
my.Id = "12345" my.Id = "12345"
@ -22,4 +22,32 @@ func Test0001(test *testing.T) {
proto.Unmarshal(dst[0:4], my1) proto.Unmarshal(dst[0:4], my1)
fmt.Println("my1 = ", my1) fmt.Println("my1 = ", my1)
fm := FrameMgr{}
fm.recvid = 0
fm.windowsize = 100
lr := &Frame{}
rr := &Frame{}
lr.Id = 1
rr.Id = 2
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId(lr, rr))
lr.Id = 99
rr.Id = 8
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId(lr, rr))
fm.recvid = 9000
lr.Id = 9998
rr.Id = 9999
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId(lr, rr))
fm.recvid = 9000
lr.Id = 9998
rr.Id = 8
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId(lr, rr))
fm.recvid = 0
lr.Id = 9998
rr.Id = 8
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId(lr, rr))
} }

View File

@ -188,16 +188,12 @@ func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) {
conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.tcpconn.Read(bytes) n, err := conn.tcpconn.Read(bytes)
if err != nil { if err != nil {
if neterr, ok := err.(*net.OpError); ok { nerr, ok := err.(net.Error)
if neterr.Timeout() { if !ok || !nerr.Timeout() {
// Read timeout
n = 0
} else {
loggo.Error("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) loggo.Error("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
break break
} }
} }
}
if n > 0 { if n > 0 {
conn.fm.WriteSendBuffer(bytes[:n]) conn.fm.WriteSendBuffer(bytes[:n])
} }
@ -226,7 +222,27 @@ func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) {
p.sendPacket++ p.sendPacket++
p.sendPacketSize += (uint64)(len(mb)) p.sendPacketSize += (uint64)(len(mb))
} }
if conn.fm.GetRecvBufferSize() > 0 {
rr := conn.fm.GetRecvReadLineBuffer()
conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.tcpconn.Write(rr)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
break
} }
}
if n > 0 {
conn.fm.SkipRecvBuffer(n)
}
}
}
loggo.Info("close tcp conn %s %s", conn.id, conn.tcpaddrTarget.String())
p.Close(conn)
} }
func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) { func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) {
@ -239,17 +255,13 @@ func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) {
conn.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) conn.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, _, err := conn.conn.ReadFromUDP(bytes) n, _, err := conn.conn.ReadFromUDP(bytes)
if err != nil { if err != nil {
if neterr, ok := err.(*net.OpError); ok { nerr, ok := err.(net.Error)
if neterr.Timeout() { if !ok || !nerr.Timeout() {
// Read timeout
continue
} else {
loggo.Error("ReadFromUDP Error read udp %s", err) loggo.Error("ReadFromUDP Error read udp %s", err)
conn.close = true conn.close = true
return return
} }
} }
}
now := time.Now() now := time.Now()
conn.activeTime = now conn.activeTime = now