This commit is contained in:
esrrhs 2019-11-01 21:10:32 +08:00
parent d84320fffb
commit db35790150
10 changed files with 2 additions and 2833 deletions

721
client.go
View File

@ -1,721 +0,0 @@
package pingtunnel
import (
"github.com/esrrhs/go-engine/src/common"
"github.com/esrrhs/go-engine/src/loggo"
"github.com/golang/protobuf/proto"
"golang.org/x/net/icmp"
"math"
"math/rand"
"net"
"sync"
"time"
)
const (
SEND_PROTO int = 8
RECV_PROTO int = 0
)
func NewClient(addr string, server string, target string, timeout int, key int,
tcpmode int, tcpmode_buffersize int, tcpmode_maxwin int, tcpmode_resend_timems int, tcpmode_compress int,
tcpmode_stat int, open_sock5 int, maxconn int) (*Client, error) {
var ipaddr *net.UDPAddr
var tcpaddr *net.TCPAddr
var err error
if tcpmode > 0 {
tcpaddr, err = net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
} else {
ipaddr, err = net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
}
ipaddrServer, err := net.ResolveIPAddr("ip", server)
if err != nil {
return nil, err
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
return &Client{
exit: false,
rtt: 0,
id: r.Intn(math.MaxInt16),
ipaddr: ipaddr,
tcpaddr: tcpaddr,
addr: addr,
ipaddrServer: ipaddrServer,
addrServer: server,
targetAddr: target,
timeout: timeout,
key: key,
tcpmode: tcpmode,
tcpmode_buffersize: tcpmode_buffersize,
tcpmode_maxwin: tcpmode_maxwin,
tcpmode_resend_timems: tcpmode_resend_timems,
tcpmode_compress: tcpmode_compress,
tcpmode_stat: tcpmode_stat,
open_sock5: open_sock5,
maxconn: maxconn,
}, nil
}
type Client struct {
exit bool
rtt time.Duration
interval *time.Ticker
workResultLock sync.WaitGroup
maxconn int
id int
sequence int
timeout int
sproto int
rproto int
key int
tcpmode int
tcpmode_buffersize int
tcpmode_maxwin int
tcpmode_resend_timems int
tcpmode_compress int
tcpmode_stat int
open_sock5 int
ipaddr *net.UDPAddr
tcpaddr *net.TCPAddr
addr string
ipaddrServer *net.IPAddr
addrServer string
targetAddr string
conn *icmp.PacketConn
listenConn *net.UDPConn
tcplistenConn *net.TCPListener
localAddrToConnMap sync.Map
localIdToConnMap sync.Map
sendPacket uint64
recvPacket uint64
sendPacketSize uint64
recvPacketSize uint64
localAddrToConnMapSize int
localIdToConnMapSize int
}
type ClientConn struct {
exit bool
ipaddr *net.UDPAddr
tcpaddr *net.TCPAddr
id string
activeRecvTime time.Time
activeSendTime time.Time
close bool
fm *FrameMgr
}
func (p *Client) Addr() string {
return p.addr
}
func (p *Client) IPAddr() *net.UDPAddr {
return p.ipaddr
}
func (p *Client) TargetAddr() string {
return p.targetAddr
}
func (p *Client) ServerIPAddr() *net.IPAddr {
return p.ipaddrServer
}
func (p *Client) ServerAddr() string {
return p.addrServer
}
func (p *Client) RTT() time.Duration {
return p.rtt
}
func (p *Client) RecvPacketSize() uint64 {
return p.recvPacketSize
}
func (p *Client) SendPacketSize() uint64 {
return p.sendPacketSize
}
func (p *Client) RecvPacket() uint64 {
return p.recvPacket
}
func (p *Client) SendPacket() uint64 {
return p.sendPacket
}
func (p *Client) LocalIdToConnMapSize() int {
return p.localIdToConnMapSize
}
func (p *Client) LocalAddrToConnMapSize() int {
return p.localAddrToConnMapSize
}
func (p *Client) Run() error {
conn, err := icmp.ListenPacket("ip4:icmp", "")
if err != nil {
loggo.Error("Error listening for ICMP packets: %s", err.Error())
return err
}
p.conn = conn
if p.tcpmode > 0 {
tcplistenConn, err := net.ListenTCP("tcp", p.tcpaddr)
if err != nil {
loggo.Error("Error listening for tcp packets: %s", err.Error())
return err
}
p.tcplistenConn = tcplistenConn
} else {
listener, err := net.ListenUDP("udp", p.ipaddr)
if err != nil {
loggo.Error("Error listening for udp packets: %s", err.Error())
return err
}
p.listenConn = listener
}
if p.tcpmode > 0 {
go p.AcceptTcp()
} else {
go p.Accept()
}
recv := make(chan *Packet, 10000)
go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv)
p.interval = time.NewTicker(time.Second)
go func() {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
for !p.exit {
select {
case <-p.interval.C:
p.checkTimeoutConn()
p.ping()
p.showNet()
case r := <-recv:
p.processPacket(r)
}
}
}()
return nil
}
func (p *Client) Stop() {
p.exit = true
p.workResultLock.Wait()
p.conn.Close()
if p.tcplistenConn != nil {
p.tcplistenConn.Close()
}
if p.listenConn != nil {
p.listenConn.Close()
}
p.interval.Stop()
}
func (p *Client) AcceptTcp() error {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
loggo.Info("client waiting local accept tcp")
for !p.exit {
p.tcplistenConn.SetDeadline(time.Now().Add(time.Millisecond * 1000))
conn, err := p.tcplistenConn.AcceptTCP()
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error accept tcp %s", err)
continue
}
}
if conn != nil {
if p.open_sock5 > 0 {
go p.AcceptSock5Conn(conn)
} else {
go p.AcceptTcpConn(conn, p.targetAddr)
}
}
}
return nil
}
func (p *Client) AcceptTcpConn(conn *net.TCPConn, targetAddr string) {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr)
if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn {
loggo.Info("too many connections %d, client accept new local tcp fail %s", p.localIdToConnMapSize, tcpsrcaddr.String())
return
}
uuid := UniqueId()
fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat)
now := time.Now()
clientConn := &ClientConn{exit: false, tcpaddr: tcpsrcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false,
fm: fm}
p.addClientConn(uuid, tcpsrcaddr.String(), clientConn)
loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String())
loggo.Info("start connect remote tcp %s %s", uuid, tcpsrcaddr.String())
clientConn.fm.Connect()
startConnectTime := time.Now()
for !p.exit && !clientConn.exit {
if clientConn.fm.IsConnected() {
break
}
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat,
p.timeout)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
time.Sleep(time.Millisecond * 10)
now := time.Now()
diffclose := now.Sub(startConnectTime)
if diffclose > time.Second*(time.Duration(p.timeout)) {
loggo.Info("can not connect remote tcp %s %s", uuid, tcpsrcaddr.String())
p.close(clientConn)
return
}
}
if !clientConn.exit {
loggo.Info("connected remote tcp %s %s", uuid, tcpsrcaddr.String())
}
bytes := make([]byte, 10240)
tcpActiveRecvTime := time.Now()
tcpActiveSendTime := time.Now()
for !p.exit && !clientConn.exit {
now := time.Now()
sleep := true
left := common.MinOfInt(clientConn.fm.GetSendBufferLeft(), len(bytes))
if left > 0 {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 1))
n, err := conn.Read(bytes[0:left])
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
clientConn.fm.Close()
break
}
}
if n > 0 {
sleep = false
clientConn.fm.WriteSendBuffer(bytes[:n])
tcpActiveRecvTime = now
}
}
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
if sendlist.Len() > 0 {
sleep = false
clientConn.activeSendTime = now
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, err := proto.Marshal(f)
if err != nil {
loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err)
continue
}
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
}
if clientConn.fm.GetRecvBufferSize() > 0 {
sleep = false
rr := clientConn.fm.GetRecvReadLineBuffer()
conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 1))
n, err := conn.Write(rr)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err)
clientConn.fm.Close()
break
}
}
if n > 0 {
clientConn.fm.SkipRecvBuffer(n)
tcpActiveSendTime = now
}
}
if sleep {
time.Sleep(time.Millisecond * 10)
}
diffrecv := now.Sub(clientConn.activeRecvTime)
diffsend := now.Sub(clientConn.activeSendTime)
tcpdiffrecv := now.Sub(tcpActiveRecvTime)
tcpdiffsend := now.Sub(tcpActiveSendTime)
if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) ||
tcpdiffrecv > time.Second*(time.Duration(p.timeout)) || tcpdiffsend > time.Second*(time.Duration(p.timeout)) {
loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String())
clientConn.fm.Close()
break
}
if clientConn.fm.IsRemoteClosed() {
loggo.Info("closed by remote conn %s %s", clientConn.id, clientConn.tcpaddr.String())
clientConn.fm.Close()
break
}
}
startCloseTime := time.Now()
for !p.exit && !clientConn.exit {
now := time.Now()
clientConn.fm.Update()
sendlist := clientConn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
p.sequence++
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb,
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
nodatarecv := true
if clientConn.fm.GetRecvBufferSize() > 0 {
rr := clientConn.fm.GetRecvReadLineBuffer()
conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, _ := conn.Write(rr)
if n > 0 {
clientConn.fm.SkipRecvBuffer(n)
nodatarecv = false
}
}
diffclose := now.Sub(startCloseTime)
timeout := diffclose > time.Second*(time.Duration(p.timeout))
remoteclosed := clientConn.fm.IsRemoteClosed()
if timeout {
loggo.Info("close conn had timeout %s %s", clientConn.id, clientConn.tcpaddr.String())
break
}
if remoteclosed && nodatarecv {
loggo.Info("remote conn had closed %s %s", clientConn.id, clientConn.tcpaddr.String())
break
}
time.Sleep(time.Millisecond * 100)
}
loggo.Info("close tcp conn %s %s", clientConn.id, clientConn.tcpaddr.String())
conn.Close()
p.close(clientConn)
}
func (p *Client) Accept() error {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
loggo.Info("client waiting local accept udp")
bytes := make([]byte, 10240)
for !p.exit {
p.listenConn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, srcaddr, err := p.listenConn.ReadFromUDP(bytes)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error read udp %s", err)
continue
}
}
if n <= 0 {
continue
}
now := time.Now()
clientConn := p.getClientConnByAddr(srcaddr.String())
if clientConn == nil {
if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn {
loggo.Info("too many connections %d, client accept new local udp fail %s", p.localIdToConnMapSize, srcaddr.String())
continue
}
uuid := UniqueId()
clientConn = &ClientConn{exit: false, ipaddr: srcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false}
p.addClientConn(uuid, srcaddr.String(), clientConn)
loggo.Info("client accept new local udp %s %s", uuid, srcaddr.String())
}
clientConn.activeSendTime = now
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), bytes[:n],
SEND_PROTO, RECV_PROTO, p.key,
p.tcpmode, 0, 0, 0, 0, 0,
p.timeout)
p.sequence++
p.sendPacket++
p.sendPacketSize += (uint64)(n)
}
return nil
}
func (p *Client) processPacket(packet *Packet) {
if packet.my.Rproto >= 0 {
return
}
if packet.my.Key != (int32)(p.key) {
return
}
if packet.echoId != p.id {
return
}
if packet.my.Type == (int32)(MyMsg_PING) {
t := time.Time{}
t.UnmarshalBinary(packet.my.Data)
d := time.Now().Sub(t)
loggo.Info("pong from %s %s", packet.src.String(), d.String())
p.rtt = d
return
}
if packet.my.Type == (int32)(MyMsg_KICK) {
clientConn := p.getClientConnById(packet.my.Id)
if clientConn != nil {
p.close(clientConn)
loggo.Info("remote kick local %s", packet.my.Id)
}
return
}
loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data))
clientConn := p.getClientConnById(packet.my.Id)
if clientConn == nil {
loggo.Debug("processPacket no conn %s ", packet.my.Id)
p.remoteError(packet.my.Id)
return
}
now := time.Now()
clientConn.activeRecvTime = now
if p.tcpmode > 0 {
f := &Frame{}
err := proto.Unmarshal(packet.my.Data, f)
if err != nil {
loggo.Error("Unmarshal tcp Error %s", err)
return
}
clientConn.fm.OnRecvFrame(f)
} else {
addr := clientConn.ipaddr
_, err := p.listenConn.WriteToUDP(packet.my.Data, addr)
if err != nil {
loggo.Info("WriteToUDP Error read udp %s", err)
clientConn.close = true
return
}
}
p.recvPacket++
p.recvPacketSize += (uint64)(len(packet.my.Data))
}
func (p *Client) close(clientConn *ClientConn) {
clientConn.exit = true
p.deleteClientConn(clientConn.id, clientConn.ipaddr.String())
p.deleteClientConn(clientConn.id, clientConn.tcpaddr.String())
}
func (p *Client) checkTimeoutConn() {
if p.tcpmode > 0 {
return
}
tmp := make(map[string]*ClientConn)
p.localIdToConnMap.Range(func(key, value interface{}) bool {
id := key.(string)
clientConn := value.(*ClientConn)
tmp[id] = clientConn
return true
})
now := time.Now()
for _, conn := range tmp {
diffrecv := now.Sub(conn.activeRecvTime)
diffsend := now.Sub(conn.activeSendTime)
if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) {
conn.close = true
}
}
for id, conn := range tmp {
if conn.close {
loggo.Info("close inactive conn %s %s", id, conn.ipaddr.String())
p.close(conn)
}
}
}
func (p *Client) ping() {
now := time.Now()
b, _ := now.MarshalBinary()
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", "", (uint32)(MyMsg_PING), b,
SEND_PROTO, RECV_PROTO, p.key,
0, 0, 0, 0, 0, 0,
0)
loggo.Info("ping %s %s %d %d %d %d", p.addrServer, now.String(), p.sproto, p.rproto, p.id, p.sequence)
p.sequence++
}
func (p *Client) showNet() {
p.localAddrToConnMapSize = 0
p.localIdToConnMap.Range(func(key, value interface{}) bool {
p.localAddrToConnMapSize++
return true
})
p.localIdToConnMapSize = 0
p.localIdToConnMap.Range(func(key, value interface{}) bool {
p.localIdToConnMapSize++
return true
})
loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %d/%dConnections",
p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localAddrToConnMapSize, p.localIdToConnMapSize)
p.sendPacket = 0
p.recvPacket = 0
p.sendPacketSize = 0
p.recvPacketSize = 0
}
func (p *Client) AcceptSock5Conn(conn *net.TCPConn) {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
var err error = nil
if err = sock5Handshake(conn); err != nil {
loggo.Error("socks handshake: %s", err)
conn.Close()
return
}
_, addr, err := sock5GetRequest(conn)
if err != nil {
loggo.Error("error getting request: %s", err)
conn.Close()
return
}
// Sending connection established message immediately to client.
// This some round trip time for creating socks connection with the client.
// But if connection failed, the client will get connection reset error.
_, err = conn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x08, 0x43})
if err != nil {
loggo.Error("send connection confirmation:", err)
conn.Close()
return
}
loggo.Info("accept new sock5 conn: %s", addr)
p.AcceptTcpConn(conn, addr)
}
func (p *Client) addClientConn(uuid string, addr string, clientConn *ClientConn) {
p.localAddrToConnMap.Store(addr, clientConn)
p.localIdToConnMap.Store(uuid, clientConn)
}
func (p *Client) getClientConnByAddr(addr string) *ClientConn {
ret, ok := p.localAddrToConnMap.Load(addr)
if !ok {
return nil
}
return ret.(*ClientConn)
}
func (p *Client) getClientConnById(uuid string) *ClientConn {
ret, ok := p.localIdToConnMap.Load(uuid)
if !ok {
return nil
}
return ret.(*ClientConn)
}
func (p *Client) deleteClientConn(uuid string, addr string) {
p.localIdToConnMap.Delete(uuid)
p.localAddrToConnMap.Delete(addr)
}
func (p *Client) remoteError(uuid string) {
sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", uuid, (uint32)(MyMsg_KICK), []byte{},
SEND_PROTO, RECV_PROTO, p.key,
0, 0, 0, 0, 0, 0,
0)
}

View File

@ -1,686 +0,0 @@
package pingtunnel
import (
"bytes"
"compress/zlib"
"container/list"
"github.com/esrrhs/go-engine/src/common"
"github.com/esrrhs/go-engine/src/loggo"
"github.com/esrrhs/go-engine/src/rbuffergo"
"io"
"strconv"
"sync"
"time"
)
type FrameStat struct {
sendDataNum int
recvDataNum int
sendReqNum int
recvReqNum int
sendAckNum int
recvAckNum int
sendDataNumsMap map[int32]int
recvDataNumsMap map[int32]int
sendReqNumsMap map[int32]int
recvReqNumsMap map[int32]int
sendAckNumsMap map[int32]int
recvAckNumsMap map[int32]int
sendping int
sendpong int
recvping int
recvpong int
}
type FrameMgr struct {
sendb *rbuffergo.RBuffergo
recvb *rbuffergo.RBuffergo
recvlock sync.Locker
windowsize int
resend_timems int
compress int
sendwin *list.List
sendlist *list.List
sendid int
recvwin *list.List
recvlist *list.List
recvid int
close bool
remoteclosed bool
closesend bool
lastPingTime int64
rttns int64
reqmap map[int32]int64
sendmap map[int32]int64
connected bool
fs *FrameStat
openstat int
lastPrintStat int64
}
func NewFrameMgr(buffersize int, windowsize int, resend_timems int, compress int, openstat int) *FrameMgr {
sendb := rbuffergo.New(buffersize, false)
recvb := rbuffergo.New(buffersize, false)
fm := &FrameMgr{sendb: sendb, recvb: recvb,
recvlock: &sync.Mutex{},
windowsize: windowsize, resend_timems: resend_timems, compress: compress,
sendwin: list.New(), sendlist: list.New(), sendid: 0,
recvwin: list.New(), recvlist: list.New(), recvid: 0,
close: false, remoteclosed: false, closesend: false,
lastPingTime: time.Now().UnixNano(), rttns: (int64)(resend_timems * 1000),
reqmap: make(map[int32]int64), sendmap: make(map[int32]int64),
connected: false, openstat: openstat, lastPrintStat: time.Now().UnixNano()}
if openstat > 0 {
fm.resetStat()
}
return fm
}
func (fm *FrameMgr) GetSendBufferLeft() int {
left := fm.sendb.Capacity() - fm.sendb.Size()
return left
}
func (fm *FrameMgr) WriteSendBuffer(data []byte) {
fm.sendb.Write(data)
loggo.Debug("WriteSendBuffer %d %d", fm.sendb.Size(), len(data))
}
func (fm *FrameMgr) Update() {
fm.cutSendBufferToWindow()
fm.sendlist.Init()
tmpreq, tmpack, tmpackto := fm.preProcessRecvList()
fm.processRecvList(tmpreq, tmpack, tmpackto)
fm.combineWindowToRecvBuffer()
fm.calSendList()
fm.ping()
fm.printStat()
}
func (fm *FrameMgr) cutSendBufferToWindow() {
sendall := false
if fm.sendb.Size() < FRAME_MAX_SIZE {
sendall = true
}
for fm.sendb.Size() >= FRAME_MAX_SIZE && fm.sendwin.Len() < fm.windowsize {
fd := &FrameData{Type: (int32)(FrameData_USER_DATA),
Data: make([]byte, FRAME_MAX_SIZE)}
fm.sendb.Read(fd.Data)
if fm.compress > 0 && len(fd.Data) > fm.compress {
newb := fm.compressData(fd.Data)
if len(newb) < len(fd.Data) {
fd.Data = newb
fd.Compress = true
}
}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
fm.sendwin.PushBack(f)
loggo.Debug("cut frame push to send win %d %d %d", f.Id, FRAME_MAX_SIZE, fm.sendwin.Len())
}
if sendall && fm.sendb.Size() > 0 && fm.sendwin.Len() < fm.windowsize {
fd := &FrameData{Type: (int32)(FrameData_USER_DATA),
Data: make([]byte, fm.sendb.Size())}
fm.sendb.Read(fd.Data)
if fm.compress > 0 && len(fd.Data) > fm.compress {
newb := fm.compressData(fd.Data)
if len(newb) < len(fd.Data) {
fd.Data = newb
fd.Compress = true
}
}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
fm.sendwin.PushBack(f)
loggo.Debug("cut small frame push to send win %d %d %d", f.Id, len(f.Data.Data), fm.sendwin.Len())
}
if fm.sendb.Empty() && fm.close && !fm.closesend && fm.sendwin.Len() < fm.windowsize {
fd := &FrameData{Type: (int32)(FrameData_CLOSE)}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
fm.sendwin.PushBack(f)
fm.closesend = true
loggo.Debug("close frame push to send win %d %d", f.Id, fm.sendwin.Len())
}
}
func (fm *FrameMgr) calSendList() {
cur := time.Now().UnixNano()
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Resend || cur-f.Sendtime > int64(fm.resend_timems*(int)(time.Millisecond)) {
oldsend := fm.sendmap[f.Id]
if cur-oldsend > fm.rttns {
f.Sendtime = cur
fm.sendlist.PushBack(f)
f.Resend = false
fm.sendmap[f.Id] = cur
if fm.openstat > 0 {
fm.fs.sendDataNum++
fm.fs.sendDataNumsMap[f.Id]++
}
loggo.Debug("push frame to sendlist %d %d", f.Id, len(f.Data.Data))
}
}
}
}
func (fm *FrameMgr) getSendList() *list.List {
return fm.sendlist
}
func (fm *FrameMgr) OnRecvFrame(f *Frame) {
fm.recvlock.Lock()
defer fm.recvlock.Unlock()
fm.recvlist.PushBack(f)
}
func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int32]*Frame) {
fm.recvlock.Lock()
defer fm.recvlock.Unlock()
tmpreq := make(map[int32]int)
tmpack := make(map[int32]int)
tmpackto := make(map[int32]*Frame)
for e := fm.recvlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Type == (int32)(Frame_REQ) {
for _, id := range f.Dataid {
tmpreq[id]++
loggo.Debug("recv req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
}
} else if f.Type == (int32)(Frame_ACK) {
for _, id := range f.Dataid {
tmpack[id]++
loggo.Debug("recv ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
}
} else if f.Type == (int32)(Frame_DATA) {
tmpackto[f.Id] = f
if fm.openstat > 0 {
fm.fs.recvDataNum++
fm.fs.recvDataNumsMap[f.Id]++
}
loggo.Debug("recv data %d %d", f.Id, len(f.Data.Data))
} else if f.Type == (int32)(Frame_PING) {
fm.processPing(f)
} else if f.Type == (int32)(Frame_PONG) {
fm.processPong(f)
} else {
loggo.Error("error frame type %d", f.Type)
}
}
fm.recvlist.Init()
return tmpreq, tmpack, tmpackto
}
func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) {
for id, num := range tmpreq {
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == id {
f.Resend = true
loggo.Debug("choose resend win %d %d", f.Id, len(f.Data.Data))
break
}
}
if fm.openstat > 0 {
fm.fs.recvReqNum += num
fm.fs.recvReqNumsMap[id] += num
}
}
for id, num := range tmpack {
for e := fm.sendwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == id {
fm.sendwin.Remove(e)
delete(fm.sendmap, f.Id)
loggo.Debug("remove send win %d %d", f.Id, len(f.Data.Data))
break
}
}
if fm.openstat > 0 {
fm.fs.recvAckNum += num
fm.fs.recvAckNumsMap[id] += num
}
}
if len(tmpackto) > 0 {
tmp := make([]int32, len(tmpackto))
index := 0
for id, rf := range tmpackto {
if fm.addToRecvWin(rf) {
tmp[index] = id
index++
if fm.openstat > 0 {
fm.fs.sendAckNum++
fm.fs.sendAckNumsMap[id]++
}
loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data.Data))
}
}
if index > 0 {
f := &Frame{Type: (int32)(Frame_ACK), Resend: false, Sendtime: 0,
Id: 0,
Dataid: tmp[0:index]}
fm.sendlist.PushBack(f)
loggo.Debug("send ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
}
}
}
func (fm *FrameMgr) addToRecvWin(rf *Frame) bool {
if !fm.isIdInRange((int)(rf.Id), FRAME_MAX_ID) {
loggo.Debug("recv frame not in range %d %d", rf.Id, fm.recvid)
if fm.isIdOld((int)(rf.Id), FRAME_MAX_ID) {
return true
}
return false
}
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == rf.Id {
loggo.Debug("recv frame ignore %d %d", f.Id, len(f.Data.Data))
return true
}
}
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
loggo.Debug("start insert recv win %d %d %d", fm.recvid, rf.Id, f.Id)
if fm.compareId((int)(rf.Id), (int)(f.Id)) < 0 {
fm.recvwin.InsertBefore(rf, e)
loggo.Debug("insert recv win %d %d before %d", rf.Id, len(rf.Data.Data), f.Id)
return true
}
}
fm.recvwin.PushBack(rf)
loggo.Debug("insert recv win last %d %d", rf.Id, len(rf.Data.Data))
return true
}
func (fm *FrameMgr) processRecvFrame(f *Frame) bool {
if f.Data.Type == (int32)(FrameData_USER_DATA) {
left := fm.recvb.Capacity() - fm.recvb.Size()
if left >= len(f.Data.Data) {
src := f.Data.Data
if f.Data.Compress {
err, old := fm.deCompressData(src)
if err != nil {
loggo.Error("recv frame deCompressData error %d", f.Id)
return false
}
if left < len(old) {
return false
}
loggo.Debug("deCompressData recv frame %d %d %d",
f.Id, len(src), len(old))
src = old
}
fm.recvb.Write(src)
loggo.Debug("combined recv frame to recv buffer %d %d",
f.Id, len(src))
return true
}
return false
} else if f.Data.Type == (int32)(FrameData_CLOSE) {
fm.remoteclosed = true
loggo.Debug("recv remote close frame %d", f.Id)
return true
} else if f.Data.Type == (int32)(FrameData_CONN) {
fm.sendConnectRsp()
fm.connected = true
loggo.Debug("recv remote conn frame %d", f.Id)
return true
} else if f.Data.Type == (int32)(FrameData_CONNRSP) {
fm.connected = true
loggo.Debug("recv remote conn rsp frame %d", f.Id)
return true
} else {
loggo.Error("recv frame type error %d", f.Data.Type)
return false
}
}
func (fm *FrameMgr) combineWindowToRecvBuffer() {
for {
done := false
for e := fm.recvwin.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
if f.Id == (int32)(fm.recvid) {
delete(fm.reqmap, f.Id)
if fm.processRecvFrame(f) {
fm.recvwin.Remove(e)
done = true
loggo.Debug("process recv frame ok %d %d",
f.Id, len(f.Data.Data))
break
}
}
}
if !done {
break
} else {
fm.recvid++
if fm.recvid >= FRAME_MAX_ID {
fm.recvid = 0
}
loggo.Debug("combined ok add recvid %d ", fm.recvid)
}
}
cur := time.Now().UnixNano()
reqtmp := make(map[int]int)
e := fm.recvwin.Front()
id := fm.recvid
for len(reqtmp) < fm.windowsize && len(reqtmp)*4 < FRAME_MAX_SIZE/2 && e != nil {
f := e.Value.(*Frame)
loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id)
if f.Id != (int32)(id) {
oldReq := fm.reqmap[f.Id]
if cur-oldReq > fm.rttns {
reqtmp[id]++
fm.reqmap[f.Id] = cur
loggo.Debug("add req id %d ", id)
}
} else {
e = e.Next()
}
id++
if id >= FRAME_MAX_ID {
id = 0
}
}
if len(reqtmp) > 0 {
f := &Frame{Type: (int32)(Frame_REQ), Resend: false, Sendtime: 0,
Id: 0,
Dataid: make([]int32, len(reqtmp))}
index := 0
for id, _ := range reqtmp {
f.Dataid[index] = (int32)(id)
index++
if fm.openstat > 0 {
fm.fs.sendReqNum++
fm.fs.sendReqNumsMap[(int32)(id)]++
}
}
fm.sendlist.PushBack(f)
loggo.Debug("send req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ","))
}
}
func (fm *FrameMgr) GetRecvBufferSize() int {
return fm.recvb.Size()
}
func (fm *FrameMgr) GetRecvReadLineBuffer() []byte {
ret := fm.recvb.GetReadLineBuffer()
loggo.Debug("GetRecvReadLineBuffer %d %d", fm.recvb.Size(), len(ret))
return ret
}
func (fm *FrameMgr) SkipRecvBuffer(size int) {
fm.recvb.SkipRead(size)
loggo.Debug("SkipRead %d %d", fm.recvb.Size(), size)
}
func (fm *FrameMgr) Close() {
fm.recvlock.Lock()
defer fm.recvlock.Unlock()
fm.close = true
}
func (fm *FrameMgr) IsRemoteClosed() bool {
return fm.remoteclosed
}
func (fm *FrameMgr) ping() {
cur := time.Now().UnixNano()
if cur-fm.lastPingTime > (int64)(time.Second) {
fm.lastPingTime = cur
f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur,
Id: 0}
fm.sendlist.PushBack(f)
loggo.Debug("send ping %d", cur)
if fm.openstat > 0 {
fm.fs.sendping++
}
}
}
func (fm *FrameMgr) processPing(f *Frame) {
rf := &Frame{Type: (int32)(Frame_PONG), Resend: false, Sendtime: f.Sendtime,
Id: 0}
fm.sendlist.PushBack(rf)
if fm.openstat > 0 {
fm.fs.recvping++
fm.fs.sendpong++
}
loggo.Debug("recv ping %d", f.Sendtime)
}
func (fm *FrameMgr) processPong(f *Frame) {
cur := time.Now().UnixNano()
if cur > f.Sendtime {
rtt := cur - f.Sendtime
fm.rttns = (fm.rttns + rtt) / 2
if fm.openstat > 0 {
fm.fs.recvpong++
}
loggo.Debug("recv pong %d %dms", rtt, fm.rttns/1000/1000)
}
}
func (fm *FrameMgr) isIdInRange(id int, maxid int) bool {
begin := fm.recvid
end := fm.recvid + fm.windowsize
if end >= maxid {
if id >= 0 && id < end-maxid {
return true
}
end = maxid
}
if id >= begin && id < end {
return true
}
return false
}
func (fm *FrameMgr) compareId(l int, r int) int {
if l < fm.recvid {
l += FRAME_MAX_ID
}
if r < fm.recvid {
r += FRAME_MAX_ID
}
return l - r
}
func (fm *FrameMgr) isIdOld(id int, maxid int) bool {
if id > fm.recvid {
return false
}
end := fm.recvid + fm.windowsize*2
if end >= maxid {
if id >= end-maxid && id < fm.recvid {
return true
}
} else {
if id < fm.recvid {
return true
}
}
return false
}
func (fm *FrameMgr) IsConnected() bool {
return fm.connected
}
func (fm *FrameMgr) Connect() {
fd := &FrameData{Type: (int32)(FrameData_CONN)}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
fm.sendwin.PushBack(f)
loggo.Debug("start connect")
}
func (fm *FrameMgr) sendConnectRsp() {
fd := &FrameData{Type: (int32)(FrameData_CONNRSP)}
f := &Frame{Type: (int32)(Frame_DATA),
Id: (int32)(fm.sendid),
Data: fd}
fm.sendid++
if fm.sendid >= FRAME_MAX_ID {
fm.sendid = 0
}
fm.sendwin.PushBack(f)
loggo.Debug("send connect rsp")
}
func (fm *FrameMgr) compressData(src []byte) []byte {
var b bytes.Buffer
w := zlib.NewWriter(&b)
w.Write(src)
w.Close()
return b.Bytes()
}
func (fm *FrameMgr) deCompressData(src []byte) (error, []byte) {
b := bytes.NewReader(src)
r, err := zlib.NewReader(b)
if err != nil {
return err, nil
}
var out bytes.Buffer
io.Copy(&out, r)
r.Close()
return nil, out.Bytes()
}
func (fm *FrameMgr) resetStat() {
fm.fs = &FrameStat{}
fm.fs.sendDataNumsMap = make(map[int32]int)
fm.fs.recvDataNumsMap = make(map[int32]int)
fm.fs.sendReqNumsMap = make(map[int32]int)
fm.fs.recvReqNumsMap = make(map[int32]int)
fm.fs.sendAckNumsMap = make(map[int32]int)
fm.fs.recvAckNumsMap = make(map[int32]int)
}
func (fm *FrameMgr) printStat() {
if fm.openstat > 0 {
cur := time.Now().UnixNano()
if cur-fm.lastPrintStat > (int64)(time.Second) {
fm.lastPrintStat = cur
fs := fm.fs
loggo.Info("\nsendDataNum %d\nrecvDataNum %d\nsendReqNum %d\nrecvReqNum %d\nsendAckNum %d\nrecvAckNum %d\n"+
"sendDataNumsMap %s\nrecvDataNumsMap %s\nsendReqNumsMap %s\nrecvReqNumsMap %s\nsendAckNumsMap %s\nrecvAckNumsMap %s\n"+
"sendping %d\nrecvping %d\nsendpong %d\nrecvpong %d\n"+
"sendwin %d\nrecvwin %d\n",
fs.sendDataNum, fs.recvDataNum,
fs.sendReqNum, fs.recvReqNum,
fs.sendAckNum, fs.recvAckNum,
fm.printStatMap(&fs.sendDataNumsMap), fm.printStatMap(&fs.recvDataNumsMap),
fm.printStatMap(&fs.sendReqNumsMap), fm.printStatMap(&fs.recvReqNumsMap),
fm.printStatMap(&fs.sendAckNumsMap), fm.printStatMap(&fs.recvAckNumsMap),
fs.sendping, fs.recvping,
fs.sendpong, fs.recvpong,
fm.sendwin.Len(), fm.recvwin.Len())
fm.resetStat()
}
}
}
func (fm *FrameMgr) printStatMap(m *map[int32]int) string {
tmp := make(map[int]int)
for _, v := range *m {
tmp[v]++
}
max := 0
for k, _ := range tmp {
if k > max {
max = k
}
}
var ret string
for i := 1; i <= max; i++ {
ret += strconv.Itoa(i) + "->" + strconv.Itoa(tmp[i]) + ","
}
if len(ret) <= 0 {
ret = "none"
}
return ret
}

View File

@ -1 +0,0 @@
protoc --go_out=. *.proto

View File

@ -1,10 +1,10 @@
package main package pingtunnel
import ( import (
"flag" "flag"
"fmt" "fmt"
"github.com/esrrhs/go-engine/src/loggo" "github.com/esrrhs/go-engine/src/loggo"
"github.com/esrrhs/pingtunnel" "github.com/esrrhs/go-engine/src/pingtunnel"
"strconv" "strconv"
"time" "time"
) )

441
msg.pb.go
View File

@ -1,441 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: msg.proto
package pingtunnel
import (
fmt "fmt"
proto "github.com/golang/protobuf/proto"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type MyMsg_TYPE int32
const (
MyMsg_DATA MyMsg_TYPE = 0
MyMsg_PING MyMsg_TYPE = 1
MyMsg_KICK MyMsg_TYPE = 2
MyMsg_MAGIC MyMsg_TYPE = 57005
)
var MyMsg_TYPE_name = map[int32]string{
0: "DATA",
1: "PING",
2: "KICK",
57005: "MAGIC",
}
var MyMsg_TYPE_value = map[string]int32{
"DATA": 0,
"PING": 1,
"KICK": 2,
"MAGIC": 57005,
}
func (x MyMsg_TYPE) String() string {
return proto.EnumName(MyMsg_TYPE_name, int32(x))
}
func (MyMsg_TYPE) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{0, 0}
}
type FrameData_TYPE int32
const (
FrameData_USER_DATA FrameData_TYPE = 0
FrameData_CONN FrameData_TYPE = 1
FrameData_CONNRSP FrameData_TYPE = 2
FrameData_CLOSE FrameData_TYPE = 3
)
var FrameData_TYPE_name = map[int32]string{
0: "USER_DATA",
1: "CONN",
2: "CONNRSP",
3: "CLOSE",
}
var FrameData_TYPE_value = map[string]int32{
"USER_DATA": 0,
"CONN": 1,
"CONNRSP": 2,
"CLOSE": 3,
}
func (x FrameData_TYPE) String() string {
return proto.EnumName(FrameData_TYPE_name, int32(x))
}
func (FrameData_TYPE) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{1, 0}
}
type Frame_TYPE int32
const (
Frame_DATA Frame_TYPE = 0
Frame_REQ Frame_TYPE = 1
Frame_ACK Frame_TYPE = 2
Frame_PING Frame_TYPE = 3
Frame_PONG Frame_TYPE = 4
)
var Frame_TYPE_name = map[int32]string{
0: "DATA",
1: "REQ",
2: "ACK",
3: "PING",
4: "PONG",
}
var Frame_TYPE_value = map[string]int32{
"DATA": 0,
"REQ": 1,
"ACK": 2,
"PING": 3,
"PONG": 4,
}
func (x Frame_TYPE) String() string {
return proto.EnumName(Frame_TYPE_name, int32(x))
}
func (Frame_TYPE) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{2, 0}
}
type MyMsg struct {
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"`
Target string `protobuf:"bytes,3,opt,name=target,proto3" json:"target,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
Rproto int32 `protobuf:"zigzag32,5,opt,name=rproto,proto3" json:"rproto,omitempty"`
Magic int32 `protobuf:"zigzag32,6,opt,name=magic,proto3" json:"magic,omitempty"`
Key int32 `protobuf:"zigzag32,7,opt,name=key,proto3" json:"key,omitempty"`
Timeout int32 `protobuf:"varint,8,opt,name=timeout,proto3" json:"timeout,omitempty"`
Tcpmode int32 `protobuf:"varint,9,opt,name=tcpmode,proto3" json:"tcpmode,omitempty"`
TcpmodeBuffersize int32 `protobuf:"varint,10,opt,name=tcpmode_buffersize,json=tcpmodeBuffersize,proto3" json:"tcpmode_buffersize,omitempty"`
TcpmodeMaxwin int32 `protobuf:"varint,11,opt,name=tcpmode_maxwin,json=tcpmodeMaxwin,proto3" json:"tcpmode_maxwin,omitempty"`
TcpmodeResendTimems int32 `protobuf:"varint,12,opt,name=tcpmode_resend_timems,json=tcpmodeResendTimems,proto3" json:"tcpmode_resend_timems,omitempty"`
TcpmodeCompress int32 `protobuf:"varint,13,opt,name=tcpmode_compress,json=tcpmodeCompress,proto3" json:"tcpmode_compress,omitempty"`
TcpmodeStat int32 `protobuf:"varint,14,opt,name=tcpmode_stat,json=tcpmodeStat,proto3" json:"tcpmode_stat,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *MyMsg) Reset() { *m = MyMsg{} }
func (m *MyMsg) String() string { return proto.CompactTextString(m) }
func (*MyMsg) ProtoMessage() {}
func (*MyMsg) Descriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{0}
}
func (m *MyMsg) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_MyMsg.Unmarshal(m, b)
}
func (m *MyMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_MyMsg.Marshal(b, m, deterministic)
}
func (m *MyMsg) XXX_Merge(src proto.Message) {
xxx_messageInfo_MyMsg.Merge(m, src)
}
func (m *MyMsg) XXX_Size() int {
return xxx_messageInfo_MyMsg.Size(m)
}
func (m *MyMsg) XXX_DiscardUnknown() {
xxx_messageInfo_MyMsg.DiscardUnknown(m)
}
var xxx_messageInfo_MyMsg proto.InternalMessageInfo
func (m *MyMsg) GetId() string {
if m != nil {
return m.Id
}
return ""
}
func (m *MyMsg) GetType() int32 {
if m != nil {
return m.Type
}
return 0
}
func (m *MyMsg) GetTarget() string {
if m != nil {
return m.Target
}
return ""
}
func (m *MyMsg) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *MyMsg) GetRproto() int32 {
if m != nil {
return m.Rproto
}
return 0
}
func (m *MyMsg) GetMagic() int32 {
if m != nil {
return m.Magic
}
return 0
}
func (m *MyMsg) GetKey() int32 {
if m != nil {
return m.Key
}
return 0
}
func (m *MyMsg) GetTimeout() int32 {
if m != nil {
return m.Timeout
}
return 0
}
func (m *MyMsg) GetTcpmode() int32 {
if m != nil {
return m.Tcpmode
}
return 0
}
func (m *MyMsg) GetTcpmodeBuffersize() int32 {
if m != nil {
return m.TcpmodeBuffersize
}
return 0
}
func (m *MyMsg) GetTcpmodeMaxwin() int32 {
if m != nil {
return m.TcpmodeMaxwin
}
return 0
}
func (m *MyMsg) GetTcpmodeResendTimems() int32 {
if m != nil {
return m.TcpmodeResendTimems
}
return 0
}
func (m *MyMsg) GetTcpmodeCompress() int32 {
if m != nil {
return m.TcpmodeCompress
}
return 0
}
func (m *MyMsg) GetTcpmodeStat() int32 {
if m != nil {
return m.TcpmodeStat
}
return 0
}
type FrameData struct {
Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
Compress bool `protobuf:"varint,3,opt,name=compress,proto3" json:"compress,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FrameData) Reset() { *m = FrameData{} }
func (m *FrameData) String() string { return proto.CompactTextString(m) }
func (*FrameData) ProtoMessage() {}
func (*FrameData) Descriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{1}
}
func (m *FrameData) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FrameData.Unmarshal(m, b)
}
func (m *FrameData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FrameData.Marshal(b, m, deterministic)
}
func (m *FrameData) XXX_Merge(src proto.Message) {
xxx_messageInfo_FrameData.Merge(m, src)
}
func (m *FrameData) XXX_Size() int {
return xxx_messageInfo_FrameData.Size(m)
}
func (m *FrameData) XXX_DiscardUnknown() {
xxx_messageInfo_FrameData.DiscardUnknown(m)
}
var xxx_messageInfo_FrameData proto.InternalMessageInfo
func (m *FrameData) GetType() int32 {
if m != nil {
return m.Type
}
return 0
}
func (m *FrameData) GetData() []byte {
if m != nil {
return m.Data
}
return nil
}
func (m *FrameData) GetCompress() bool {
if m != nil {
return m.Compress
}
return false
}
type Frame struct {
Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
Resend bool `protobuf:"varint,2,opt,name=resend,proto3" json:"resend,omitempty"`
Sendtime int64 `protobuf:"varint,3,opt,name=sendtime,proto3" json:"sendtime,omitempty"`
Id int32 `protobuf:"varint,4,opt,name=id,proto3" json:"id,omitempty"`
Data *FrameData `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"`
Dataid []int32 `protobuf:"varint,6,rep,packed,name=dataid,proto3" json:"dataid,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Frame) Reset() { *m = Frame{} }
func (m *Frame) String() string { return proto.CompactTextString(m) }
func (*Frame) ProtoMessage() {}
func (*Frame) Descriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{2}
}
func (m *Frame) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Frame.Unmarshal(m, b)
}
func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Frame.Marshal(b, m, deterministic)
}
func (m *Frame) XXX_Merge(src proto.Message) {
xxx_messageInfo_Frame.Merge(m, src)
}
func (m *Frame) XXX_Size() int {
return xxx_messageInfo_Frame.Size(m)
}
func (m *Frame) XXX_DiscardUnknown() {
xxx_messageInfo_Frame.DiscardUnknown(m)
}
var xxx_messageInfo_Frame proto.InternalMessageInfo
func (m *Frame) GetType() int32 {
if m != nil {
return m.Type
}
return 0
}
func (m *Frame) GetResend() bool {
if m != nil {
return m.Resend
}
return false
}
func (m *Frame) GetSendtime() int64 {
if m != nil {
return m.Sendtime
}
return 0
}
func (m *Frame) GetId() int32 {
if m != nil {
return m.Id
}
return 0
}
func (m *Frame) GetData() *FrameData {
if m != nil {
return m.Data
}
return nil
}
func (m *Frame) GetDataid() []int32 {
if m != nil {
return m.Dataid
}
return nil
}
func init() {
proto.RegisterEnum("MyMsg_TYPE", MyMsg_TYPE_name, MyMsg_TYPE_value)
proto.RegisterEnum("FrameData_TYPE", FrameData_TYPE_name, FrameData_TYPE_value)
proto.RegisterEnum("Frame_TYPE", Frame_TYPE_name, Frame_TYPE_value)
proto.RegisterType((*MyMsg)(nil), "MyMsg")
proto.RegisterType((*FrameData)(nil), "FrameData")
proto.RegisterType((*Frame)(nil), "Frame")
}
func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) }
var fileDescriptor_c06e4cca6c2cc899 = []byte{
// 499 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcb, 0x6e, 0xd3, 0x4c,
0x14, 0xfe, 0xc7, 0x97, 0x24, 0x3e, 0xb9, 0xfc, 0xd3, 0xe1, 0xa2, 0x11, 0x0b, 0x14, 0x2c, 0x21,
0x99, 0x05, 0x95, 0x28, 0x12, 0xac, 0x53, 0x37, 0x44, 0x51, 0xc9, 0x85, 0x49, 0x58, 0xc0, 0x26,
0x72, 0xe3, 0xa9, 0x65, 0x81, 0x2f, 0xb2, 0x27, 0x82, 0xf0, 0x04, 0xbc, 0x0c, 0x8f, 0xc0, 0x33,
0xb0, 0xe3, 0x79, 0xd0, 0x9c, 0x8e, 0xdd, 0x4a, 0xb0, 0xf2, 0x77, 0x4b, 0xfc, 0xf9, 0x9c, 0x03,
0x5e, 0x56, 0x27, 0xa7, 0x65, 0x55, 0xa8, 0xc2, 0xff, 0x6d, 0x83, 0xbb, 0x38, 0x2e, 0xea, 0x84,
0x8d, 0xc0, 0x4a, 0x63, 0x4e, 0xc6, 0x24, 0xf0, 0x84, 0x95, 0xc6, 0x8c, 0x81, 0xa3, 0x8e, 0xa5,
0xe4, 0xd6, 0x98, 0x04, 0xae, 0x40, 0xcc, 0x1e, 0x42, 0x47, 0x45, 0x55, 0x22, 0x15, 0xb7, 0x31,
0x67, 0x98, 0xce, 0xc6, 0x91, 0x8a, 0xb8, 0x33, 0x26, 0xc1, 0x40, 0x20, 0xd6, 0xd9, 0x0a, 0xdf,
0xc1, 0xdd, 0x31, 0x09, 0x4e, 0x84, 0x61, 0xec, 0x3e, 0xb8, 0x59, 0x94, 0xa4, 0x7b, 0xde, 0x41,
0xf9, 0x86, 0x30, 0x0a, 0xf6, 0x27, 0x79, 0xe4, 0x5d, 0xd4, 0x34, 0x64, 0x1c, 0xba, 0x2a, 0xcd,
0x64, 0x71, 0x50, 0xbc, 0x87, 0x15, 0x1a, 0x8a, 0xce, 0xbe, 0xcc, 0x8a, 0x58, 0x72, 0xcf, 0x38,
0x37, 0x94, 0x3d, 0x07, 0x66, 0xe0, 0xee, 0xea, 0x70, 0x7d, 0x2d, 0xab, 0x3a, 0xfd, 0x26, 0x39,
0x60, 0xe8, 0xc4, 0x38, 0xe7, 0xad, 0xc1, 0x9e, 0xc2, 0xa8, 0x89, 0x67, 0xd1, 0xd7, 0x2f, 0x69,
0xce, 0xfb, 0x18, 0x1d, 0x1a, 0x75, 0x81, 0x22, 0x3b, 0x83, 0x07, 0x4d, 0xac, 0x92, 0xb5, 0xcc,
0xe3, 0x9d, 0x6e, 0x92, 0xd5, 0x7c, 0x80, 0xe9, 0x7b, 0xc6, 0x14, 0xe8, 0x6d, 0xd1, 0x62, 0xcf,
0x80, 0x36, 0xbf, 0xd9, 0x17, 0x59, 0x59, 0xc9, 0xba, 0xe6, 0x43, 0x8c, 0xff, 0x6f, 0xf4, 0xd0,
0xc8, 0xec, 0x09, 0x0c, 0x9a, 0x68, 0xad, 0x22, 0xc5, 0x47, 0x18, 0xeb, 0x1b, 0x6d, 0xa3, 0x22,
0xe5, 0xbf, 0x00, 0x67, 0xfb, 0x61, 0x3d, 0x65, 0x3d, 0x70, 0x2e, 0x26, 0xdb, 0x09, 0xfd, 0x4f,
0xa3, 0xf5, 0x7c, 0x39, 0xa3, 0x44, 0xa3, 0xcb, 0x79, 0x78, 0x49, 0x2d, 0xd6, 0x07, 0x77, 0x31,
0x99, 0xcd, 0x43, 0xfa, 0xe3, 0xa7, 0xed, 0x7f, 0x27, 0xe0, 0xbd, 0xa9, 0xa2, 0x4c, 0x5e, 0xe8,
0x65, 0x34, 0xcb, 0x24, 0x77, 0x96, 0xd9, 0x2c, 0xcd, 0xba, 0xb3, 0xb4, 0x47, 0xd0, 0x6b, 0xeb,
0xea, 0x15, 0xf7, 0x44, 0xcb, 0xfd, 0xd7, 0xa6, 0xc4, 0x10, 0xbc, 0xf7, 0x9b, 0xa9, 0xd8, 0xdd,
0x36, 0x09, 0x57, 0xcb, 0x25, 0x25, 0xac, 0x0f, 0x5d, 0x8d, 0xc4, 0x66, 0x4d, 0x2d, 0xe6, 0x81,
0x1b, 0xbe, 0x5d, 0x6d, 0xa6, 0xd4, 0xf6, 0x7f, 0x11, 0x70, 0xb1, 0xca, 0x3f, 0x6b, 0xe8, 0x3b,
0xc1, 0xc9, 0x61, 0x91, 0x9e, 0x30, 0x4c, 0x57, 0xd1, 0x4f, 0x3d, 0x6a, 0xac, 0x62, 0x8b, 0x96,
0x9b, 0x5b, 0x75, 0xf0, 0x5f, 0xf4, 0xad, 0x3e, 0x36, 0x9f, 0xa2, 0x2f, 0xad, 0x7f, 0x06, 0xa7,
0xed, 0x87, 0xdf, 0xde, 0xa2, 0x7e, 0xa6, 0x31, 0xef, 0x8c, 0xed, 0xc0, 0x15, 0x86, 0xf9, 0xaf,
0xfe, 0x9a, 0x6b, 0x17, 0x6c, 0x31, 0x7d, 0x47, 0x89, 0x06, 0x13, 0x9c, 0x6a, 0x33, 0x69, 0x1b,
0xd1, 0x6a, 0x39, 0xa3, 0xce, 0xf9, 0xe0, 0x23, 0x94, 0x69, 0x9e, 0xa8, 0x43, 0x9e, 0xcb, 0xcf,
0x57, 0x1d, 0x3c, 0xec, 0x97, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x15, 0x89, 0x25, 0x68, 0x57,
0x03, 0x00, 0x00,
}

View File

@ -1,55 +0,0 @@
syntax = "proto3";
option go_package = "pingtunnel";
message MyMsg {
enum TYPE {
DATA = 0;
PING = 1;
KICK = 2;
MAGIC = 0xdead;
}
string id = 1;
int32 type = 2;
string target = 3;
bytes data = 4;
sint32 rproto = 5;
sint32 magic = 6;
sint32 key = 7;
int32 timeout = 8;
int32 tcpmode = 9;
int32 tcpmode_buffersize = 10;
int32 tcpmode_maxwin = 11;
int32 tcpmode_resend_timems = 12;
int32 tcpmode_compress = 13;
int32 tcpmode_stat = 14;
}
message FrameData {
enum TYPE {
USER_DATA = 0;
CONN = 1;
CONNRSP = 2;
CLOSE = 3;
}
int32 type = 1;
bytes data = 2;
bool compress = 3;
}
message Frame {
enum TYPE {
DATA = 0;
REQ = 1;
ACK = 2;
PING = 3;
PONG = 4;
}
int32 type = 1;
bool resend = 2;
int64 sendtime = 3;
int32 id = 4;
FrameData data = 5;
repeated int32 dataid = 6;
}

View File

@ -1,149 +0,0 @@
package pingtunnel
import (
"crypto/md5"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"github.com/esrrhs/go-engine/src/loggo"
"github.com/golang/protobuf/proto"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"io"
"net"
"sync"
"syscall"
"time"
)
func sendICMP(id int, sequence int, conn icmp.PacketConn, server *net.IPAddr, target string,
connId string, msgType uint32, data []byte, sproto int, rproto int, key int,
tcpmode int, tcpmode_buffer_size int, tcpmode_maxwin int, tcpmode_resend_time int, tcpmode_compress int, tcpmode_stat int,
timeout int) {
m := &MyMsg{
Id: connId,
Type: (int32)(msgType),
Target: target,
Data: data,
Rproto: (int32)(rproto),
Key: (int32)(key),
Tcpmode: (int32)(tcpmode),
TcpmodeBuffersize: (int32)(tcpmode_buffer_size),
TcpmodeMaxwin: (int32)(tcpmode_maxwin),
TcpmodeResendTimems: (int32)(tcpmode_resend_time),
TcpmodeCompress: (int32)(tcpmode_compress),
TcpmodeStat: (int32)(tcpmode_stat),
Timeout: (int32)(timeout),
Magic: (int32)(MyMsg_MAGIC),
}
mb, err := proto.Marshal(m)
if err != nil {
loggo.Error("sendICMP Marshal MyMsg error %s %s", server.String(), err)
return
}
body := &icmp.Echo{
ID: id,
Seq: sequence,
Data: mb,
}
msg := &icmp.Message{
Type: (ipv4.ICMPType)(sproto),
Code: 0,
Body: body,
}
bytes, err := msg.Marshal(nil)
if err != nil {
loggo.Error("sendICMP Marshal error %s %s", server.String(), err)
return
}
for {
if _, err := conn.WriteTo(bytes, server); err != nil {
if neterr, ok := err.(*net.OpError); ok {
if neterr.Err == syscall.ENOBUFS {
continue
}
}
loggo.Info("sendICMP WriteTo error %s %s", server.String(), err)
}
break
}
return
}
func recvICMP(workResultLock *sync.WaitGroup, exit *bool, conn icmp.PacketConn, recv chan<- *Packet) {
(*workResultLock).Add(1)
defer (*workResultLock).Done()
bytes := make([]byte, 10240)
for !*exit {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, srcaddr, err := conn.ReadFrom(bytes)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error read icmp message %s", err)
continue
}
}
if n <= 0 {
continue
}
echoId := int(binary.BigEndian.Uint16(bytes[4:6]))
echoSeq := int(binary.BigEndian.Uint16(bytes[6:8]))
my := &MyMsg{}
err = proto.Unmarshal(bytes[8:n], my)
if err != nil {
loggo.Debug("Unmarshal MyMsg error: %s", err)
continue
}
if my.Magic != (int32)(MyMsg_MAGIC) {
loggo.Debug("processPacket data invalid %s", my.Id)
continue
}
recv <- &Packet{my: my,
src: srcaddr.(*net.IPAddr),
echoId: echoId, echoSeq: echoSeq}
}
}
type Packet struct {
my *MyMsg
src *net.IPAddr
echoId int
echoSeq int
}
func UniqueId() string {
b := make([]byte, 48)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return ""
}
return GetMd5String(base64.URLEncoding.EncodeToString(b))
}
func GetMd5String(s string) string {
h := md5.New()
h.Write([]byte(s))
return hex.EncodeToString(h.Sum(nil))
}
const (
FRAME_MAX_SIZE int = 888
FRAME_MAX_ID int = 100000
)

View File

@ -1,116 +0,0 @@
package pingtunnel
import (
"fmt"
"github.com/golang/protobuf/proto"
"testing"
)
func Test0001(t *testing.T) {
my := &MyMsg{}
my.Id = "12345"
my.Target = "111:11"
my.Type = 12
my.Data = make([]byte, 0)
dst, _ := proto.Marshal(my)
fmt.Println("dst = ", dst)
my1 := &MyMsg{}
proto.Unmarshal(dst, my1)
fmt.Println("my1 = ", my1)
fmt.Println("my1.Data = ", my1.Data)
proto.Unmarshal(dst[0:4], my1)
fmt.Println("my1 = ", my1)
fm := FrameMgr{}
fm.recvid = 4
fm.windowsize = 100
lr := &Frame{}
rr := &Frame{}
lr.Id = 1
rr.Id = 4
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id)))
lr.Id = 99
rr.Id = 8
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id)))
fm.recvid = 9000
lr.Id = 9998
rr.Id = 9999
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id)))
fm.recvid = 9000
lr.Id = 9998
rr.Id = 8
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id)))
fm.recvid = 0
lr.Id = 9998
rr.Id = 8
fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id)))
fm.recvid = 0
fm.windowsize = 5
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(4, 10))
fm.recvid = 0
fm.windowsize = 5
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(5, 10))
fm.recvid = 4
fm.windowsize = 5
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(1, 10))
fm.recvid = 7
fm.windowsize = 5
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(1, 10))
fm.recvid = 7
fm.windowsize = 5
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(2, 10))
fm.recvid = 7
fm.windowsize = 5
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(9, 10))
fm.recvid = 10
fm.windowsize = 10000
fmt.Println("fm.isIdInRange = ", fm.isIdInRange(0, FRAME_MAX_ID))
fm.recvid = 7
fm.windowsize = 5
fmt.Println("fm.isIdOld = ", fm.isIdOld(2, 10))
fm.recvid = 7
fm.windowsize = 5
fmt.Println("fm.isIdOld = ", fm.isIdOld(1, 10))
fm.recvid = 3
fm.windowsize = 5
fmt.Println("fm.isIdOld = ", fm.isIdOld(1, 10))
fm.recvid = 13
fm.windowsize = 10000
fmt.Println("fm.isIdOld = ", fm.isIdOld(9, FRAME_MAX_ID))
dd := fm.compressData(([]byte)("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))
fmt.Println("fm.compressData = ", len(dd))
_, ddd := fm.deCompressData(dd)
fmt.Println("fm.deCompressData = ", (string)(ddd))
mm := make(map[int32]int)
mm[1] = 1
mm[2] = 1
mm[3] = 1
mm[4] = 2
mm[6] = 7
mms := fm.printStatMap(&mm)
fmt.Println("fm.printStatMap = ", mms)
fm.openstat = 1
fm.resetStat()
fm.printStat()
}

526
server.go
View File

@ -1,526 +0,0 @@
package pingtunnel
import (
"github.com/esrrhs/go-engine/src/common"
"github.com/esrrhs/go-engine/src/loggo"
"github.com/esrrhs/go-engine/src/threadpool"
"github.com/golang/protobuf/proto"
"golang.org/x/net/icmp"
"net"
"sync"
"time"
)
func NewServer(key int, maxconn int, maxprocessthread int, maxprocessbuffer int) (*Server, error) {
s := &Server{
exit: false,
key: key,
maxconn: maxconn,
}
s.processtp = threadpool.NewThreadPool(maxprocessthread, maxprocessbuffer, func(v interface{}) {
packet := v.(*Packet)
s.processDataPacket(packet)
})
return s, nil
}
type Server struct {
exit bool
key int
interval *time.Ticker
workResultLock sync.WaitGroup
maxconn int
conn *icmp.PacketConn
localConnMap sync.Map
remoteConnErrorMap sync.Map
sendPacket uint64
recvPacket uint64
sendPacketSize uint64
recvPacketSize uint64
localConnMapSize int
echoId int
echoSeq int
processtp *threadpool.ThreadPool
}
type ServerConn struct {
exit bool
timeout int
ipaddrTarget *net.UDPAddr
conn *net.UDPConn
tcpaddrTarget *net.TCPAddr
tcpconn *net.TCPConn
id string
activeRecvTime time.Time
activeSendTime time.Time
close bool
rproto int
fm *FrameMgr
tcpmode int
}
func (p *Server) Run() error {
conn, err := icmp.ListenPacket("ip4:icmp", "")
if err != nil {
loggo.Error("Error listening for ICMP packets: %s", err.Error())
return err
}
p.conn = conn
recv := make(chan *Packet, 10000)
go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv)
p.interval = time.NewTicker(time.Second)
go func() {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
for !p.exit {
select {
case <-p.interval.C:
p.checkTimeoutConn()
p.showNet()
case r := <-recv:
p.processPacket(r)
}
}
}()
return nil
}
func (p *Server) Stop() {
p.exit = true
p.workResultLock.Wait()
p.conn.Close()
p.interval.Stop()
}
func (p *Server) processPacket(packet *Packet) {
if packet.my.Key != (int32)(p.key) {
return
}
p.echoId = packet.echoId
p.echoSeq = packet.echoSeq
if packet.my.Type == (int32)(MyMsg_PING) {
t := time.Time{}
t.UnmarshalBinary(packet.my.Data)
loggo.Info("ping from %s %s %d %d %d", packet.src.String(), t.String(), packet.my.Rproto, packet.echoId, packet.echoSeq)
sendICMP(packet.echoId, packet.echoSeq, *p.conn, packet.src, "", "", (uint32)(MyMsg_PING), packet.my.Data,
(int)(packet.my.Rproto), -1, p.key,
0, 0, 0, 0, 0, 0,
0)
return
}
if packet.my.Type == (int32)(MyMsg_KICK) {
localConn := p.getServerConnById(packet.my.Id)
if localConn != nil {
p.close(localConn)
loggo.Info("remote kick local %s", packet.my.Id)
}
return
}
p.processtp.AddJob((int)(common.HashString(packet.my.Id)), packet)
}
func (p *Server) processDataPacket(packet *Packet) {
loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data))
now := time.Now()
id := packet.my.Id
localConn := p.getServerConnById(id)
if localConn == nil {
if p.maxconn > 0 && p.localConnMapSize >= p.maxconn {
loggo.Info("too many connections %d, server connected target fail %s", p.localConnMapSize, packet.my.Target)
p.remoteError(id, packet)
return
}
if packet.my.Tcpmode > 0 {
addr := packet.my.Target
c, err := net.DialTimeout("tcp", addr, time.Second)
if err != nil {
loggo.Error("Error listening for tcp packets: %s", err.Error())
p.remoteError(id, packet)
return
}
targetConn := c.(*net.TCPConn)
ipaddrTarget := targetConn.RemoteAddr().(*net.TCPAddr)
fm := NewFrameMgr((int)(packet.my.TcpmodeBuffersize), (int)(packet.my.TcpmodeMaxwin), (int)(packet.my.TcpmodeResendTimems), (int)(packet.my.TcpmodeCompress),
(int)(packet.my.TcpmodeStat))
localConn = &ServerConn{exit: false, timeout: (int)(packet.my.Timeout), tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false,
rproto: (int)(packet.my.Rproto), fm: fm, tcpmode: (int)(packet.my.Tcpmode)}
p.addServerConn(id, localConn)
go p.RecvTCP(localConn, id, packet.src)
} else {
addr := packet.my.Target
c, err := net.DialTimeout("udp", addr, time.Second)
if err != nil {
loggo.Error("Error listening for tcp packets: %s", err.Error())
p.remoteError(id, packet)
return
}
targetConn := c.(*net.UDPConn)
ipaddrTarget := targetConn.RemoteAddr().(*net.UDPAddr)
localConn = &ServerConn{exit: false, timeout: (int)(packet.my.Timeout), conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false,
rproto: (int)(packet.my.Rproto), tcpmode: (int)(packet.my.Tcpmode)}
p.addServerConn(id, localConn)
go p.Recv(localConn, id, packet.src)
}
}
localConn.activeRecvTime = now
if packet.my.Type == (int32)(MyMsg_DATA) {
if packet.my.Tcpmode > 0 {
f := &Frame{}
err := proto.Unmarshal(packet.my.Data, f)
if err != nil {
loggo.Error("Unmarshal tcp Error %s", err)
return
}
localConn.fm.OnRecvFrame(f)
} else {
_, err := localConn.conn.Write(packet.my.Data)
if err != nil {
loggo.Info("WriteToUDP Error %s", err)
localConn.close = true
return
}
}
p.recvPacket++
p.recvPacketSize += (uint64)(len(packet.my.Data))
}
}
func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
loggo.Info("server waiting target response %s -> %s %s", conn.tcpaddrTarget.String(), conn.id, conn.tcpconn.LocalAddr().String())
loggo.Info("start wait remote connect tcp %s %s", conn.id, conn.tcpaddrTarget.String())
startConnectTime := time.Now()
for !p.exit && !conn.exit {
if conn.fm.IsConnected() {
break
}
conn.fm.Update()
sendlist := conn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb,
conn.rproto, -1, p.key,
0, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
time.Sleep(time.Millisecond * 10)
now := time.Now()
diffclose := now.Sub(startConnectTime)
if diffclose > time.Second*(time.Duration(conn.timeout)) {
loggo.Info("can not connect remote tcp %s %s", conn.id, conn.tcpaddrTarget.String())
p.close(conn)
return
}
}
if !conn.exit {
loggo.Info("remote connected tcp %s %s", conn.id, conn.tcpaddrTarget.String())
}
bytes := make([]byte, 10240)
tcpActiveRecvTime := time.Now()
tcpActiveSendTime := time.Now()
for !p.exit && !conn.exit {
now := time.Now()
sleep := true
left := common.MinOfInt(conn.fm.GetSendBufferLeft(), len(bytes))
if left > 0 {
conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 1))
n, err := conn.tcpconn.Read(bytes[0:left])
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
conn.fm.Close()
break
}
}
if n > 0 {
sleep = false
conn.fm.WriteSendBuffer(bytes[:n])
tcpActiveRecvTime = now
}
}
conn.fm.Update()
sendlist := conn.fm.getSendList()
if sendlist.Len() > 0 {
sleep = false
conn.activeSendTime = now
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, err := proto.Marshal(f)
if err != nil {
loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
continue
}
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb,
conn.rproto, -1, p.key,
0, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
}
if conn.fm.GetRecvBufferSize() > 0 {
sleep = false
rr := conn.fm.GetRecvReadLineBuffer()
conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 1))
n, err := conn.tcpconn.Write(rr)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err)
conn.fm.Close()
break
}
}
if n > 0 {
conn.fm.SkipRecvBuffer(n)
tcpActiveSendTime = now
}
}
if sleep {
time.Sleep(time.Millisecond * 10)
}
diffrecv := now.Sub(conn.activeRecvTime)
diffsend := now.Sub(conn.activeSendTime)
tcpdiffrecv := now.Sub(tcpActiveRecvTime)
tcpdiffsend := now.Sub(tcpActiveSendTime)
if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) ||
tcpdiffrecv > time.Second*(time.Duration(conn.timeout)) || tcpdiffsend > time.Second*(time.Duration(conn.timeout)) {
loggo.Info("close inactive conn %s %s", conn.id, conn.tcpaddrTarget.String())
conn.fm.Close()
break
}
if conn.fm.IsRemoteClosed() {
loggo.Info("closed by remote conn %s %s", conn.id, conn.tcpaddrTarget.String())
conn.fm.Close()
break
}
}
startCloseTime := time.Now()
for !p.exit && !conn.exit {
now := time.Now()
conn.fm.Update()
sendlist := conn.fm.getSendList()
for e := sendlist.Front(); e != nil; e = e.Next() {
f := e.Value.(*Frame)
mb, _ := proto.Marshal(f)
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb,
conn.rproto, -1, p.key,
0, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(len(mb))
}
nodatarecv := true
if conn.fm.GetRecvBufferSize() > 0 {
rr := conn.fm.GetRecvReadLineBuffer()
conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100))
n, _ := conn.tcpconn.Write(rr)
if n > 0 {
conn.fm.SkipRecvBuffer(n)
nodatarecv = false
}
}
diffclose := now.Sub(startCloseTime)
timeout := diffclose > time.Second*(time.Duration(conn.timeout))
remoteclosed := conn.fm.IsRemoteClosed()
if timeout {
loggo.Info("close conn had timeout %s %s", conn.id, conn.tcpaddrTarget.String())
break
}
if remoteclosed && nodatarecv {
loggo.Info("remote conn had closed %s %s", conn.id, conn.tcpaddrTarget.String())
break
}
time.Sleep(time.Millisecond * 100)
}
time.Sleep(time.Second)
loggo.Info("close tcp conn %s %s", conn.id, conn.tcpaddrTarget.String())
p.close(conn)
}
func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) {
p.workResultLock.Add(1)
defer p.workResultLock.Done()
loggo.Info("server waiting target response %s -> %s %s", conn.ipaddrTarget.String(), conn.id, conn.conn.LocalAddr().String())
for !p.exit {
bytes := make([]byte, 2000)
conn.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
n, _, err := conn.conn.ReadFromUDP(bytes)
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
loggo.Info("ReadFromUDP Error read udp %s", err)
conn.close = true
return
}
}
now := time.Now()
conn.activeSendTime = now
sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), bytes[:n],
conn.rproto, -1, p.key,
0, 0, 0, 0, 0, 0,
0)
p.sendPacket++
p.sendPacketSize += (uint64)(n)
}
}
func (p *Server) close(conn *ServerConn) {
if p.getServerConnById(conn.id) != nil {
conn.exit = true
if conn.conn != nil {
conn.conn.Close()
}
if conn.tcpconn != nil {
conn.tcpconn.Close()
}
p.deleteServerConn(conn.id)
}
}
func (p *Server) checkTimeoutConn() {
tmp := make(map[string]*ServerConn)
p.localConnMap.Range(func(key, value interface{}) bool {
id := key.(string)
serverConn := value.(*ServerConn)
tmp[id] = serverConn
return true
})
now := time.Now()
for _, conn := range tmp {
if conn.tcpmode > 0 {
continue
}
diffrecv := now.Sub(conn.activeRecvTime)
diffsend := now.Sub(conn.activeSendTime)
if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) {
conn.close = true
}
}
for id, conn := range tmp {
if conn.tcpmode > 0 {
continue
}
if conn.close {
loggo.Info("close inactive conn %s %s", id, conn.ipaddrTarget.String())
p.close(conn)
}
}
}
func (p *Server) showNet() {
p.localConnMapSize = 0
p.localConnMap.Range(func(key, value interface{}) bool {
p.localConnMapSize++
return true
})
loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %dConnections",
p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localConnMapSize)
p.sendPacket = 0
p.recvPacket = 0
p.sendPacketSize = 0
p.recvPacketSize = 0
}
func (p *Server) addServerConn(uuid string, serverConn *ServerConn) {
p.localConnMap.Store(uuid, serverConn)
}
func (p *Server) getServerConnById(uuid string) *ServerConn {
ret, ok := p.localConnMap.Load(uuid)
if !ok {
return nil
}
return ret.(*ServerConn)
}
func (p *Server) deleteServerConn(uuid string) {
p.localConnMap.Delete(uuid)
}
func (p *Server) remoteError(uuid string, packet *Packet) {
sendICMP(packet.echoId, packet.echoSeq, *p.conn, packet.src, "", uuid, (uint32)(MyMsg_KICK), []byte{},
(int)(packet.my.Rproto), -1, p.key,
0, 0, 0, 0, 0, 0,
0)
}

136
sock5.go
View File

@ -1,136 +0,0 @@
package pingtunnel
import (
"encoding/binary"
"errors"
"io"
"net"
"strconv"
"time"
)
var (
errAddrType = errors.New("socks addr type not supported")
errVer = errors.New("socks version not supported")
errMethod = errors.New("socks only support 1 method now")
errAuthExtraData = errors.New("socks authentication get extra data")
errReqExtraData = errors.New("socks request get extra data")
errCmd = errors.New("socks command not supported")
)
const (
socksVer5 = 5
socksCmdConnect = 1
)
func sock5Handshake(conn net.Conn) (err error) {
const (
idVer = 0
idNmethod = 1
)
// version identification and method selection message in theory can have
// at most 256 methods, plus version and nmethod field in total 258 bytes
// the current rfc defines only 3 authentication methods (plus 2 reserved),
// so it won't be such long in practice
buf := make([]byte, 258)
var n int
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
// make sure we get the nmethod field
if n, err = io.ReadAtLeast(conn, buf, idNmethod+1); err != nil {
return
}
if buf[idVer] != socksVer5 {
return errVer
}
nmethod := int(buf[idNmethod])
msgLen := nmethod + 2
if n == msgLen { // handshake done, common case
// do nothing, jump directly to send confirmation
} else if n < msgLen { // has more methods to read, rare case
if _, err = io.ReadFull(conn, buf[n:msgLen]); err != nil {
return
}
} else { // error, should not get extra data
return errAuthExtraData
}
// send confirmation: version 5, no authentication required
_, err = conn.Write([]byte{socksVer5, 0})
return
}
func sock5GetRequest(conn net.Conn) (rawaddr []byte, host string, err error) {
const (
idVer = 0
idCmd = 1
idType = 3 // address type index
idIP0 = 4 // ip address start index
idDmLen = 4 // domain address length index
idDm0 = 5 // domain address start index
typeIPv4 = 1 // type is ipv4 address
typeDm = 3 // type is domain address
typeIPv6 = 4 // type is ipv6 address
lenIPv4 = 3 + 1 + net.IPv4len + 2 // 3(ver+cmd+rsv) + 1addrType + ipv4 + 2port
lenIPv6 = 3 + 1 + net.IPv6len + 2 // 3(ver+cmd+rsv) + 1addrType + ipv6 + 2port
lenDmBase = 3 + 1 + 1 + 2 // 3 + 1addrType + 1addrLen + 2port, plus addrLen
)
// refer to getRequest in server.go for why set buffer size to 263
buf := make([]byte, 263)
var n int
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100))
// read till we get possible domain length field
if n, err = io.ReadAtLeast(conn, buf, idDmLen+1); err != nil {
return
}
// check version and cmd
if buf[idVer] != socksVer5 {
err = errVer
return
}
if buf[idCmd] != socksCmdConnect {
err = errCmd
return
}
reqLen := -1
switch buf[idType] {
case typeIPv4:
reqLen = lenIPv4
case typeIPv6:
reqLen = lenIPv6
case typeDm:
reqLen = int(buf[idDmLen]) + lenDmBase
default:
err = errAddrType
return
}
if n == reqLen {
// common case, do nothing
} else if n < reqLen { // rare case
if _, err = io.ReadFull(conn, buf[n:reqLen]); err != nil {
return
}
} else {
err = errReqExtraData
return
}
rawaddr = buf[idType:reqLen]
switch buf[idType] {
case typeIPv4:
host = net.IP(buf[idIP0 : idIP0+net.IPv4len]).String()
case typeIPv6:
host = net.IP(buf[idIP0 : idIP0+net.IPv6len]).String()
case typeDm:
host = string(buf[idDm0 : idDm0+buf[idDmLen]])
}
port := binary.BigEndian.Uint16(buf[reqLen-2 : reqLen])
host = net.JoinHostPort(host, strconv.Itoa(int(port)))
return
}