This commit is contained in:
esrrhs 2019-10-26 12:01:30 +08:00
parent ff3daaed0f
commit a13cd98e29
8 changed files with 265 additions and 103 deletions

View File

@ -1,6 +1,7 @@
package pingtunnel
import (
"github.com/esrrhs/go-engine/src/common"
"github.com/esrrhs/go-engine/src/loggo"
"github.com/golang/protobuf/proto"
"golang.org/x/net/icmp"
@ -189,7 +190,7 @@ func (p *Client) AcceptTcp() error {
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error accept tcp %s", err)
loggo.Info("Error accept tcp %s", err)
continue
}
}
@ -217,15 +218,21 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
bytes := make([]byte, 10240)
tcpActiveRecvTime := time.Now()
tcpActiveSendTime := time.Now()
for {
left := clientConn.fm.GetSendBufferLeft()
if left >= len(bytes) {
now := time.Now()
left := common.MinOfInt(clientConn.fm.GetSendBufferLeft(), len(bytes))
if left > 0 {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.Read(bytes)
n, err := conn.Read(bytes[0:left])
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
clientConn.fm.Close()
break
}
}
@ -237,38 +244,33 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
now := time.Now()
if sendlist.Len() > 0 {
clientConn.activeSendTime = now
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, err := proto.Marshal(f)
if err != nil {
loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err)
continue
}
p.sequence++
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
}
if clientConn.fm.GetRecvBufferSize() > 0 {
rr := clientConn.fm.GetRecvReadLineBuffer()
conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.Write(rr)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
loggo.Info("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
clientConn.fm.Close()
break
}
}
@ -279,10 +281,66 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) {
diffrecv := now.Sub(clientConn.activeRecvTime)
diffsend := now.Sub(clientConn.activeSendTime)
if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) {
tcpdiffrecv := now.Sub(tcpActiveRecvTime)
tcpdiffsend := now.Sub(tcpActiveSendTime)
if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) ||
tcpdiffrecv > time.Second*(time.Duration(p.timeout)) || tcpdiffsend > time.Second*(time.Duration(p.timeout)) {
loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String())
clientConn.fm.Close()
break
}
if clientConn.fm.IsRemoteClosed() {
loggo.Info("closed by remote conn %s %s", clientConn.id, clientConn.tcpaddr.String())
clientConn.fm.Close()
break
}
}
startCloseTime := time.Now()
for {
now := time.Now()
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
nodatarecv := true
if clientConn.fm.GetRecvBufferSize() > 0 {
rr := clientConn.fm.GetRecvReadLineBuffer()
conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, _ := conn.Write(rr)
if n > 0 {
clientConn.fm.SkipRecvBuffer(n)
nodatarecv = false
}
}
diffclose := now.Sub(startCloseTime)
timeout := diffclose > time.Second*(time.Duration(p.timeout))
remoteclosed := clientConn.fm.IsRemoteClosed()
if timeout {
loggo.Info("close conn had timeout %s %s", clientConn.id, clientConn.tcpaddr.String())
break
}
if remoteclosed && nodatarecv {
loggo.Info("remote conn had closed %s %s", clientConn.id, clientConn.tcpaddr.String())
break
}
time.Sleep(time.Millisecond * 100)
}
loggo.Info("close tcp conn %s %s", clientConn.id, clientConn.tcpaddr.String())
@ -302,7 +360,7 @@ func (p *Client) Accept() error {
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error read udp %s", err)
loggo.Info("Error read udp %s", err)
continue
}
}
@ -379,7 +437,7 @@ func (p *Client) processPacket(packet *Packet) {
} else {
_, err := p.listenConn.WriteToUDP(packet.my.Data, addr)
if err != nil {
loggo.Error("WriteToUDP Error read udp %s", err)
loggo.Info("WriteToUDP Error read udp %s", err)
clientConn.close = true
return
}

View File

@ -76,7 +76,7 @@ func main() {
loggo.Info("key %d", *key)
if *t == "server" {
s, err := pingtunnel.NewServer(*timeout, *key)
s, err := pingtunnel.NewServer(*key)
if err != nil {
loggo.Error("ERROR: %s", err.Error())
return

View File

@ -22,6 +22,9 @@ type FrameMgr struct {
recvwin *list.List
recvlist *list.List
recvid int
close bool
remoteclosed bool
}
func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
@ -33,7 +36,8 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr {
recvlock: &sync.Mutex{},
windowsize: windowsize, resend_timems: resend_timems,
sendwin: list.New(), sendlist: list.New(), sendid: 0,
recvwin: list.New(), recvlist: list.New(), recvid: 0}
recvwin: list.New(), recvlist: list.New(), recvid: 0,
close: false, remoteclosed: false}
return fm
}
@ -95,6 +99,18 @@ func (fm *FrameMgr) cutSendBufferToWindow() {
fm.sendwin.PushBack(f)
}
if fm.sendb.Empty() && fm.close {
f := &Frame{Type: (int32)(Frame_DATA), Resend: false, Sendtime: 0,
Id: (int32)(fm.sendid),
Data: make([]byte, 0)}
fm.sendwin.PushBack(f)
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
}
}
func (fm *FrameMgr) calSendList() {
@ -116,7 +132,6 @@ func (fm *FrameMgr) getSendList() *list.List {
func (fm *FrameMgr) OnRecvFrame(f *Frame) {
fm.recvlock.Lock()
defer fm.recvlock.Unlock()
fm.recvlist.PushBack(f)
}
@ -236,6 +251,9 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
if f.Id == (int32)(id) {
left := fm.recvb.Capacity() - fm.recvb.Size()
if left >= len(f.Data) {
if len(f.Data) == 0 {
fm.remoteclosed = true
}
fm.recvb.Write(f.Data)
fm.recvwin.Remove(e)
done = true
@ -261,16 +279,24 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() {
if f.Id != (int32)(id) {
reqtmp[id]++
} else {
reqtmp[id]++
e = e.Next()
}
id++
if fm.recvid >= FRAME_MAX_ID {
fm.recvid = 0
if id >= FRAME_MAX_ID {
id = 0
}
}
for len(reqtmp) < fm.windowsize {
reqtmp[id]++
id++
if id >= FRAME_MAX_ID {
id = 0
}
break
}
f := &Frame{Type: (int32)(Frame_REQ), Resend: false, Sendtime: 0,
Id: 0,
Dataid: make([]int32, len(reqtmp))}
@ -293,3 +319,14 @@ func (fm *FrameMgr) GetRecvReadLineBuffer() []byte {
func (fm *FrameMgr) SkipRecvBuffer(size int) {
fm.recvb.SkipRead(size)
}
func (fm *FrameMgr) Close() {
fm.recvlock.Lock()
defer fm.recvlock.Unlock()
fm.close = true
}
func (fm *FrameMgr) IsRemoteClosed() bool {
return fm.remoteclosed
}

View File

@ -88,6 +88,7 @@ type MyMsg struct {
TcpmodeBuffersize int32 `protobuf:"varint,9,opt,name=tcpmode_buffersize,json=tcpmodeBuffersize,proto3" json:"tcpmode_buffersize,omitempty"`
TcpmodeMaxwin int32 `protobuf:"varint,10,opt,name=tcpmode_maxwin,json=tcpmodeMaxwin,proto3" json:"tcpmode_maxwin,omitempty"`
TcpmodeResendTimems int32 `protobuf:"varint,11,opt,name=tcpmode_resend_timems,json=tcpmodeResendTimems,proto3" json:"tcpmode_resend_timems,omitempty"`
Timeout int32 `protobuf:"varint,12,opt,name=timeout,proto3" json:"timeout,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -195,6 +196,13 @@ func (m *MyMsg) GetTcpmodeResendTimems() int32 {
return 0
}
func (m *MyMsg) GetTimeout() int32 {
if m != nil {
return m.Timeout
}
return 0
}
type Frame struct {
Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
Resend bool `protobuf:"varint,2,opt,name=resend,proto3" json:"resend,omitempty"`
@ -284,28 +292,29 @@ func init() {
func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) }
var fileDescriptor_c06e4cca6c2cc899 = []byte{
// 366 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0xdd, 0x8a, 0xda, 0x40,
0x18, 0xed, 0x24, 0x99, 0x18, 0x3f, 0xad, 0xc4, 0xe9, 0x0f, 0x43, 0xaf, 0x42, 0xa0, 0x34, 0x37,
0xed, 0x45, 0xfb, 0x04, 0xd1, 0x5a, 0x91, 0x92, 0x62, 0x07, 0x6f, 0xda, 0x1b, 0x89, 0x66, 0x0c,
0x43, 0x9b, 0x18, 0x92, 0x91, 0x5d, 0xf7, 0x9d, 0x96, 0x7d, 0x82, 0x7d, 0x9b, 0x7d, 0x90, 0x65,
0x3e, 0x47, 0x41, 0xf6, 0x6a, 0xce, 0xf9, 0xce, 0x61, 0xbe, 0x8f, 0x73, 0xa0, 0x5f, 0x75, 0xe5,
0x97, 0xa6, 0xdd, 0xeb, 0x7d, 0xfc, 0xe4, 0x00, 0xcd, 0x8e, 0x59, 0x57, 0xb2, 0x11, 0x38, 0xaa,
0xe0, 0x24, 0x22, 0x49, 0x5f, 0x38, 0xaa, 0x60, 0x0c, 0x3c, 0x7d, 0x6c, 0x24, 0x77, 0x22, 0x92,
0x50, 0x81, 0x98, 0xbd, 0x07, 0x5f, 0xe7, 0x6d, 0x29, 0x35, 0x77, 0xd1, 0x67, 0x99, 0xf1, 0x16,
0xb9, 0xce, 0xb9, 0x17, 0x91, 0x64, 0x28, 0x10, 0x1b, 0x6f, 0x8b, 0x3b, 0x38, 0x8d, 0x48, 0x32,
0x16, 0x96, 0xb1, 0xb7, 0x40, 0xab, 0xbc, 0x54, 0x5b, 0xee, 0xe3, 0xf8, 0x44, 0x58, 0x08, 0xee,
0x3f, 0x79, 0xe4, 0x3d, 0x9c, 0x19, 0xc8, 0x38, 0xf4, 0xf4, 0xb6, 0xa9, 0xf6, 0x85, 0xe4, 0x01,
0x9e, 0x70, 0xa6, 0xec, 0x33, 0x30, 0x0b, 0xd7, 0x9b, 0xc3, 0x6e, 0x27, 0xdb, 0x4e, 0xdd, 0x49,
0xde, 0x47, 0xd3, 0xd8, 0x2a, 0x93, 0x8b, 0xc0, 0x3e, 0xc2, 0xe8, 0x6c, 0xaf, 0xf2, 0xdb, 0x1b,
0x55, 0x73, 0x40, 0xeb, 0x6b, 0x3b, 0xcd, 0x70, 0xc8, 0xbe, 0xc2, 0xbb, 0xb3, 0xad, 0x95, 0x9d,
0xac, 0x8b, 0xb5, 0x56, 0x95, 0xac, 0x3a, 0x3e, 0x40, 0xf7, 0x1b, 0x2b, 0x0a, 0xd4, 0x56, 0x28,
0xc5, 0x9f, 0xc0, 0x5b, 0xfd, 0x59, 0xce, 0x58, 0x00, 0xde, 0xf7, 0x74, 0x95, 0x86, 0xaf, 0x0c,
0x5a, 0x2e, 0x7e, 0xcd, 0x43, 0xc2, 0x06, 0x40, 0xb3, 0x74, 0xbe, 0x98, 0x86, 0xf7, 0x8f, 0x6e,
0xfc, 0x40, 0x80, 0xfe, 0x68, 0xf3, 0x4a, 0x5e, 0x62, 0x25, 0xd7, 0xb1, 0x9e, 0x56, 0x62, 0xd8,
0x81, 0xb0, 0x8c, 0x7d, 0x80, 0xc0, 0xbc, 0xe6, 0x0e, 0x0c, 0xdc, 0x15, 0x17, 0x6e, 0xeb, 0xf2,
0xf0, 0x17, 0x5b, 0x17, 0x56, 0x40, 0xaf, 0x2b, 0x30, 0xaf, 0x2a, 0xb8, 0x1f, 0xb9, 0x09, 0x15,
0x96, 0xc5, 0xf1, 0x8b, 0xb3, 0x7b, 0xe0, 0x8a, 0xd9, 0xef, 0x90, 0x18, 0x90, 0x4e, 0x7f, 0x86,
0xce, 0x64, 0xf8, 0x17, 0x1a, 0x55, 0x97, 0xfa, 0x50, 0xd7, 0xf2, 0xff, 0xc6, 0xc7, 0xee, 0xbe,
0x3d, 0x07, 0x00, 0x00, 0xff, 0xff, 0xa7, 0xdc, 0xfd, 0xb2, 0x3a, 0x02, 0x00, 0x00,
// 372 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0xdd, 0xca, 0xd3, 0x30,
0x18, 0x36, 0x6d, 0xd3, 0x75, 0xef, 0x37, 0x3f, 0xba, 0xf8, 0x43, 0xf0, 0xa8, 0x14, 0xc4, 0x9e,
0xe8, 0x81, 0x5e, 0x41, 0x37, 0xe7, 0x18, 0x52, 0x99, 0x61, 0x27, 0x7a, 0x32, 0xba, 0x35, 0x2b,
0x41, 0xd3, 0x96, 0x36, 0x43, 0xe7, 0x3d, 0x89, 0x57, 0xe0, 0xad, 0x89, 0xe4, 0x5d, 0x36, 0x18,
0xdf, 0x51, 0x9e, 0x3f, 0x92, 0x87, 0x3c, 0x30, 0xd6, 0x43, 0xfd, 0xa6, 0xeb, 0x5b, 0xd3, 0xa6,
0xff, 0x3c, 0xa0, 0xc5, 0xa9, 0x18, 0x6a, 0x76, 0x0f, 0x9e, 0xaa, 0x38, 0x49, 0x48, 0x36, 0x16,
0x9e, 0xaa, 0x18, 0x83, 0xc0, 0x9c, 0x3a, 0xc9, 0xbd, 0x84, 0x64, 0x54, 0x20, 0x66, 0xcf, 0x21,
0x34, 0x65, 0x5f, 0x4b, 0xc3, 0x7d, 0xcc, 0x39, 0x66, 0xb3, 0x55, 0x69, 0x4a, 0x1e, 0x24, 0x24,
0x9b, 0x08, 0xc4, 0x36, 0xdb, 0xe3, 0x1b, 0x9c, 0x26, 0x24, 0x9b, 0x0a, 0xc7, 0xd8, 0x53, 0xa0,
0xba, 0xac, 0xd5, 0x9e, 0x87, 0x28, 0x9f, 0x09, 0x8b, 0xc1, 0xff, 0x26, 0x4f, 0x7c, 0x84, 0x9a,
0x85, 0x8c, 0xc3, 0xc8, 0xec, 0x3b, 0xdd, 0x56, 0x92, 0x47, 0x58, 0xe1, 0x42, 0xd9, 0x6b, 0x60,
0x0e, 0x6e, 0x77, 0xc7, 0xc3, 0x41, 0xf6, 0x83, 0xfa, 0x25, 0xf9, 0x18, 0x43, 0x53, 0xe7, 0xcc,
0xae, 0x06, 0x7b, 0x09, 0xf7, 0x97, 0xb8, 0x2e, 0x7f, 0xfe, 0x50, 0x0d, 0x07, 0x8c, 0x3e, 0x76,
0x6a, 0x81, 0x22, 0x7b, 0x0b, 0xcf, 0x2e, 0xb1, 0x5e, 0x0e, 0xb2, 0xa9, 0xb6, 0x46, 0x69, 0xa9,
0x07, 0x7e, 0x87, 0xe9, 0x27, 0xce, 0x14, 0xe8, 0x6d, 0xd0, 0xc2, 0x8e, 0x4a, 0xcb, 0xf6, 0x68,
0xf8, 0xc4, 0x75, 0x3c, 0xd3, 0xf4, 0x15, 0x04, 0x9b, 0x2f, 0xeb, 0x05, 0x8b, 0x20, 0x78, 0x9f,
0x6f, 0xf2, 0xf8, 0x91, 0x45, 0xeb, 0xd5, 0xa7, 0x65, 0x4c, 0xd8, 0x1d, 0xd0, 0x22, 0x5f, 0xae,
0xe6, 0xf1, 0xef, 0xbf, 0x7e, 0xfa, 0x87, 0x00, 0xfd, 0xd0, 0x97, 0x5a, 0x5e, 0x3f, 0x9c, 0xdc,
0x7e, 0xf8, 0xb9, 0x0c, 0xce, 0x10, 0x09, 0xc7, 0xd8, 0x0b, 0x88, 0xec, 0x69, 0x5f, 0xc3, 0x29,
0x7c, 0x71, 0xe5, 0x6e, 0xc8, 0x00, 0x6f, 0x71, 0x43, 0xe2, 0x38, 0xf4, 0x76, 0x1c, 0x7b, 0xaa,
0x8a, 0x87, 0x89, 0x9f, 0x51, 0xe1, 0x58, 0x9a, 0x3e, 0xa8, 0x3d, 0x02, 0x5f, 0x2c, 0x3e, 0xc7,
0xc4, 0x82, 0x7c, 0xfe, 0x31, 0xf6, 0x66, 0x93, 0xaf, 0xd0, 0xa9, 0xa6, 0x36, 0xc7, 0xa6, 0x91,
0xdf, 0x77, 0x21, 0xae, 0xfa, 0xee, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6c, 0x65, 0x2c, 0xe9,
0x54, 0x02, 0x00, 0x00,
}

View File

@ -19,6 +19,7 @@ message MyMsg {
int32 tcpmode_buffersize = 9;
int32 tcpmode_maxwin = 10;
int32 tcpmode_resend_timems = 11;
int32 timeout = 12;
}
message Frame {

View File

@ -65,7 +65,7 @@ func sendICMP(id int, sequence int, conn icmp.PacketConn, server *net.IPAddr, ta
continue
}
}
loggo.Error("sendICMP WriteTo error %s %s", server.String(), err)
loggo.Info("sendICMP WriteTo error %s %s", server.String(), err)
}
break
}
@ -83,7 +83,7 @@ func recvICMP(conn icmp.PacketConn, recv chan<- *Packet) {
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error read icmp message %s", err)
loggo.Info("Error read icmp message %s", err)
continue
}
}
@ -107,11 +107,6 @@ func recvICMP(conn icmp.PacketConn, recv chan<- *Packet) {
continue
}
if my.Data == nil {
loggo.Error("processPacket data nil %s", my.Id)
continue
}
recv <- &Packet{my: my,
src: srcaddr.(*net.IPAddr),
echoId: echoId, echoSeq: echoSeq}

View File

@ -12,13 +12,14 @@ func Test0001(t *testing.T) {
my.Id = "12345"
my.Target = "111:11"
my.Type = 12
my.Data = make([]byte, 3)
my.Data = make([]byte, 0)
dst, _ := proto.Marshal(my)
fmt.Println("dst = ", dst)
my1 := &MyMsg{}
proto.Unmarshal(dst, my1)
fmt.Println("my1 = ", my1)
fmt.Println("my1.Data = ", my1.Data)
proto.Unmarshal(dst[0:4], my1)
fmt.Println("my1 = ", my1)

103
server.go
View File

@ -1,6 +1,7 @@
package pingtunnel
import (
"github.com/esrrhs/go-engine/src/common"
"github.com/esrrhs/go-engine/src/loggo"
"github.com/golang/protobuf/proto"
"golang.org/x/net/icmp"
@ -8,15 +9,13 @@ import (
"time"
)
func NewServer(timeout int, key int) (*Server, error) {
func NewServer(key int) (*Server, error) {
return &Server{
timeout: timeout,
key: key,
}, nil
}
type Server struct {
timeout int
key int
conn *icmp.PacketConn
@ -33,6 +32,7 @@ type Server struct {
}
type ServerConn struct {
timeout int
ipaddrTarget *net.UDPAddr
conn *net.UDPConn
tcpaddrTarget *net.TCPAddr
@ -118,7 +118,7 @@ func (p *Server) processPacket(packet *Packet) {
fm := NewFrameMgr((int)(packet.my.TcpmodeBuffersize), (int)(packet.my.TcpmodeMaxwin), (int)(packet.my.TcpmodeResendTimems))
localConn = &ServerConn{tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false,
localConn = &ServerConn{timeout: (int)(packet.my.Timeout), tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false,
rproto: (int)(packet.my.Rproto), fm: fm, tcpmode: (int)(packet.my.Tcpmode)}
p.localConnMap[id] = localConn
@ -140,7 +140,7 @@ func (p *Server) processPacket(packet *Packet) {
return
}
localConn = &ServerConn{conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false,
localConn = &ServerConn{timeout: (int)(packet.my.Timeout), conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false,
rproto: (int)(packet.my.Rproto), tcpmode: (int)(packet.my.Tcpmode)}
p.localConnMap[id] = localConn
@ -166,7 +166,7 @@ func (p *Server) processPacket(packet *Packet) {
} else {
_, err := localConn.conn.Write(packet.my.Data)
if err != nil {
loggo.Error("WriteToUDP Error %s", err)
loggo.Info("WriteToUDP Error %s", err)
localConn.close = true
return
}
@ -183,72 +183,133 @@ func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) {
bytes := make([]byte, 10240)
tcpActiveRecvTime := time.Now()
tcpActiveSendTime := time.Now()
for {
left := conn.fm.GetSendBufferLeft()
if left >= len(bytes) {
now := time.Now()
left := common.MinOfInt(conn.fm.GetSendBufferLeft(), len(bytes))
if left > 0 {
conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.tcpconn.Read(bytes)
n, err := conn.tcpconn.Read(bytes[0:left])
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
loggo.Info("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
conn.fm.Close()
break
}
}
if n > 0 {
conn.fm.WriteSendBuffer(bytes[:n])
tcpActiveRecvTime = now
}
}
conn.fm.Update()
sendlist := conn.fm.getSendList()
now := time.Now()
if sendlist.Len() > 0 {
conn.activeSendTime = now
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, err := proto.Marshal(f)
if err != nil {
loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
continue
}
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb,
conn.rproto, -1, p.key,
0, 0, 0, 0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
}
if conn.fm.GetRecvBufferSize() > 0 {
rr := conn.fm.GetRecvReadLineBuffer()
conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, err := conn.tcpconn.Write(rr)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
loggo.Info("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
conn.fm.Close()
break
}
}
if n > 0 {
conn.fm.SkipRecvBuffer(n)
tcpActiveSendTime = now
}
}
diffrecv := now.Sub(conn.activeRecvTime)
diffsend := now.Sub(conn.activeSendTime)
if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) {
tcpdiffrecv := now.Sub(tcpActiveRecvTime)
tcpdiffsend := now.Sub(tcpActiveSendTime)
if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) ||
tcpdiffrecv > time.Second*(time.Duration(conn.timeout)) || tcpdiffsend > time.Second*(time.Duration(conn.timeout)) {
loggo.Info("close inactive conn %s %s", conn.id, conn.tcpaddrTarget.String())
conn.fm.Close()
break
}
if conn.fm.IsRemoteClosed() {
loggo.Info("closed by remote conn %s %s", conn.id, conn.tcpaddrTarget.String())
conn.fm.Close()
break
}
}
startCloseTime := time.Now()
for {
now := time.Now()
conn.fm.Update()
sendlist := conn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb,
conn.rproto, -1, p.key,
0, 0, 0, 0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
nodatarecv := true
if conn.fm.GetRecvBufferSize() > 0 {
rr := conn.fm.GetRecvReadLineBuffer()
conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, _ := conn.tcpconn.Write(rr)
if n > 0 {
conn.fm.SkipRecvBuffer(n)
nodatarecv = false
}
}
diffclose := now.Sub(startCloseTime)
timeout := diffclose > time.Second*(time.Duration(conn.timeout))
remoteclosed := conn.fm.IsRemoteClosed()
if timeout {
loggo.Info("close conn had timeout %s %s", conn.id, conn.tcpaddrTarget.String())
break
}
if remoteclosed && nodatarecv {
loggo.Info("remote conn had closed %s %s", conn.id, conn.tcpaddrTarget.String())
break
}
time.Sleep(time.Millisecond * 100)
}
time.Sleep(time.Second)
loggo.Info("close tcp conn %s %s", conn.id, conn.tcpaddrTarget.String())
p.Close(conn)
}
@ -265,7 +326,7 @@ func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) {
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Error("ReadFromUDP Error read udp %s", err)
loggo.Info("ReadFromUDP Error read udp %s", err)
conn.close = true
return
}
@ -304,7 +365,7 @@ func (p *Server) checkTimeoutConn() {
}
diffrecv := now.Sub(conn.activeRecvTime)
diffsend := now.Sub(conn.activeSendTime)
if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) {
if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) {
conn.close = true
}
}