diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f1c181e --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..aa799de --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM golang AS build-env + +RUN GO111MODULE=off go get -u github.com/esrrhs/pingtunnel +RUN GO111MODULE=off go get -u github.com/esrrhs/pingtunnel/... +RUN GO111MODULE=off go install github.com/esrrhs/pingtunnel + +FROM debian +COPY --from=build-env /go/bin/pingtunnel . +COPY GeoLite2-Country.mmdb . +WORKDIR ./ diff --git a/GeoLite2-Country.mmdb b/GeoLite2-Country.mmdb new file mode 100644 index 0000000..05870f1 Binary files /dev/null and b/GeoLite2-Country.mmdb differ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..b252c99 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2018 zhao xin + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..91f113f --- /dev/null +++ b/README.md @@ -0,0 +1,73 @@ +# Pingtunnel + +[](https://github.com/esrrhs/pingtunnel) +[](https://github.com/esrrhs/pingtunnel) +[![Go Report Card](https://goreportcard.com/badge/github.com/esrrhs/pingtunnel)](https://goreportcard.com/report/github.com/esrrhs/pingtunnel) +[](https://github.com/esrrhs/pingtunnel/releases) +[](https://github.com/esrrhs/pingtunnel/actions) + +Pingtunnel is a tool that send TCP/UDP traffic over ICMP. + +## Note: This tool is only to be used for study and research, do not use it for illegal purposes + +![image](network.jpg) + +## Usage + +### Install server + +- First prepare a server with a public IP, such as EC2 on AWS, assuming the domain name or public IP is www.yourserver.com +- Download the corresponding installation package from [releases](https://github.com/esrrhs/pingtunnel/releases), such as pingtunnel_linux64.zip, then decompress and execute with **root** privileges + +``` +sudo wget (link of latest release) +sudo unzip pingtunnel_linux64.zip +sudo ./pingtunnel -type server +``` + +- (Optional) Disable system default ping + +``` +echo 1 > /proc/sys/net/ipv4/icmp_echo_ignore_all +``` + +### Install the client + +- Download the corresponding installation package from [releases](https://github.com/esrrhs/pingtunnel/releases), such as pingtunnel_windows64.zip, and decompress it +- Then run with **administrator** privileges. The commands corresponding to different forwarding functions are as follows. +- If you see a log of ping pong, the connection is normal + +#### Forward sock5 + +``` +pingtunnel.exe -type client -l: 4455 -s www.yourserver.com -sock5 1 +``` + +#### Forward tcp + +``` +pingtunnel.exe -type client -l: 4455 -s www.yourserver.com -t www.yourserver.com:4455 -tcp 1 +``` + +#### Forward udp + +``` +pingtunnel.exe -type client -l: 4455 -s www.yourserver.com -t www.yourserver.com:4455 +``` + +### Use Docker +It can also be started directly with docker, which is more convenient. Same parameters as above +- server: +``` +docker run --name pingtunnel-server -d --privileged --network host --restart=always esrrhs/pingtunnel ./pingtunnel -type server -key 123456 +``` +- client: +``` +docker run --name pingtunnel-client -d --restart=always -p 1080: 1080 esrrhs/pingtunnel ./pingtunnel -type client -l: 1080 -s www.yourserver.com -sock5 1 -key 123456 +``` + +## Thanks for free JetBrains Open Source license + + + + diff --git a/_config.yml b/_config.yml new file mode 100644 index 0000000..c419263 --- /dev/null +++ b/_config.yml @@ -0,0 +1 @@ +theme: jekyll-theme-cayman \ No newline at end of file diff --git a/client.go b/client.go new file mode 100644 index 0000000..434e49a --- /dev/null +++ b/client.go @@ -0,0 +1,830 @@ +package main + +import ( + "github.com/esrrhs/gohome/common" + "github.com/esrrhs/gohome/frame" + "github.com/esrrhs/gohome/loggo" + "github.com/esrrhs/gohome/network" + "github.com/golang/protobuf/proto" + "golang.org/x/net/icmp" + "io" + "math" + "math/rand" + "net" + "sync" + "time" +) + +const ( + SEND_PROTO int = 8 + RECV_PROTO int = 0 +) + +func NewClient(addr string, server string, target string, timeout int, key int, + tcpmode int, tcpmode_buffersize int, tcpmode_maxwin int, tcpmode_resend_timems int, tcpmode_compress int, + tcpmode_stat int, open_sock5 int, maxconn int, sock5_filter *func(addr string) bool) (*Client, error) { + + var ipaddr *net.UDPAddr + var tcpaddr *net.TCPAddr + var err error + + if tcpmode > 0 { + tcpaddr, err = net.ResolveTCPAddr("tcp", addr) + if err != nil { + return nil, err + } + } else { + ipaddr, err = net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + } + + ipaddrServer, err := net.ResolveIPAddr("ip", server) + if err != nil { + return nil, err + } + + rand.Seed(time.Now().UnixNano()) + return &Client{ + exit: false, + rtt: 0, + id: rand.Intn(math.MaxInt16), + ipaddr: ipaddr, + tcpaddr: tcpaddr, + addr: addr, + ipaddrServer: ipaddrServer, + addrServer: server, + targetAddr: target, + timeout: timeout, + key: key, + tcpmode: tcpmode, + tcpmode_buffersize: tcpmode_buffersize, + tcpmode_maxwin: tcpmode_maxwin, + tcpmode_resend_timems: tcpmode_resend_timems, + tcpmode_compress: tcpmode_compress, + tcpmode_stat: tcpmode_stat, + open_sock5: open_sock5, + maxconn: maxconn, + pongTime: time.Now(), + sock5_filter: sock5_filter, + }, nil +} + +type Client struct { + exit bool + rtt time.Duration + workResultLock sync.WaitGroup + maxconn int + + id int + sequence int + + timeout int + sproto int + rproto int + key int + tcpmode int + tcpmode_buffersize int + tcpmode_maxwin int + tcpmode_resend_timems int + tcpmode_compress int + tcpmode_stat int + + open_sock5 int + sock5_filter *func(addr string) bool + + ipaddr *net.UDPAddr + tcpaddr *net.TCPAddr + addr string + + ipaddrServer *net.IPAddr + addrServer string + + targetAddr string + + conn *icmp.PacketConn + listenConn *net.UDPConn + tcplistenConn *net.TCPListener + + localAddrToConnMap sync.Map + localIdToConnMap sync.Map + + sendPacket uint64 + recvPacket uint64 + sendPacketSize uint64 + recvPacketSize uint64 + localAddrToConnMapSize int + localIdToConnMapSize int + + recvcontrol chan int + + pongTime time.Time +} + +type ClientConn struct { + exit bool + ipaddr *net.UDPAddr + tcpaddr *net.TCPAddr + id string + activeRecvTime time.Time + activeSendTime time.Time + close bool + + fm *frame.FrameMgr +} + +func (p *Client) Addr() string { + return p.addr +} + +func (p *Client) IPAddr() *net.UDPAddr { + return p.ipaddr +} + +func (p *Client) TargetAddr() string { + return p.targetAddr +} + +func (p *Client) ServerIPAddr() *net.IPAddr { + return p.ipaddrServer +} + +func (p *Client) ServerAddr() string { + return p.addrServer +} + +func (p *Client) RTT() time.Duration { + return p.rtt +} + +func (p *Client) RecvPacketSize() uint64 { + return p.recvPacketSize +} + +func (p *Client) SendPacketSize() uint64 { + return p.sendPacketSize +} + +func (p *Client) RecvPacket() uint64 { + return p.recvPacket +} + +func (p *Client) SendPacket() uint64 { + return p.sendPacket +} + +func (p *Client) LocalIdToConnMapSize() int { + return p.localIdToConnMapSize +} + +func (p *Client) LocalAddrToConnMapSize() int { + return p.localAddrToConnMapSize +} + +func (p *Client) Run() error { + + conn, err := icmp.ListenPacket("ip4:icmp", "") + if err != nil { + loggo.Error("Error listening for ICMP packets: %s", err.Error()) + return err + } + p.conn = conn + + if p.tcpmode > 0 { + tcplistenConn, err := net.ListenTCP("tcp", p.tcpaddr) + if err != nil { + loggo.Error("Error listening for tcp packets: %s", err.Error()) + return err + } + p.tcplistenConn = tcplistenConn + } else { + listener, err := net.ListenUDP("udp", p.ipaddr) + if err != nil { + loggo.Error("Error listening for udp packets: %s", err.Error()) + return err + } + p.listenConn = listener + } + + if p.tcpmode > 0 { + go p.AcceptTcp() + } else { + go p.Accept() + } + + recv := make(chan *Packet, 10000) + p.recvcontrol = make(chan int, 1) + go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv) + + go func() { + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + for !p.exit { + p.checkTimeoutConn() + p.ping() + p.showNet() + time.Sleep(time.Second) + } + }() + + go func() { + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + for !p.exit { + p.updateServerAddr() + time.Sleep(time.Second) + } + }() + + go func() { + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + for !p.exit { + select { + case <-p.recvcontrol: + return + case r := <-recv: + p.processPacket(r) + } + } + }() + + return nil +} + +func (p *Client) Stop() { + p.exit = true + p.recvcontrol <- 1 + p.workResultLock.Wait() + p.conn.Close() + if p.tcplistenConn != nil { + p.tcplistenConn.Close() + } + if p.listenConn != nil { + p.listenConn.Close() + } +} + +func (p *Client) AcceptTcp() error { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + loggo.Info("client waiting local accept tcp") + + for !p.exit { + p.tcplistenConn.SetDeadline(time.Now().Add(time.Millisecond * 1000)) + + conn, err := p.tcplistenConn.AcceptTCP() + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error accept tcp %s", err) + continue + } + } + + if conn != nil { + if p.open_sock5 > 0 { + go p.AcceptSock5Conn(conn) + } else { + go p.AcceptTcpConn(conn, p.targetAddr) + } + } + } + return nil +} + +func (p *Client) AcceptTcpConn(conn *net.TCPConn, targetAddr string) { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr) + + if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn { + loggo.Info("too many connections %d, client accept new local tcp fail %s", p.localIdToConnMapSize, tcpsrcaddr.String()) + return + } + + uuid := common.UniqueId() + + fm := frame.NewFrameMgr(FRAME_MAX_SIZE, FRAME_MAX_ID, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat) + + now := time.Now() + clientConn := &ClientConn{exit: false, tcpaddr: tcpsrcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false, + fm: fm} + p.addClientConn(uuid, tcpsrcaddr.String(), clientConn) + loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String()) + + loggo.Info("start connect remote tcp %s %s", uuid, tcpsrcaddr.String()) + clientConn.fm.Connect() + startConnectTime := common.GetNowUpdateInSecond() + for !p.exit && !clientConn.exit { + if clientConn.fm.IsConnected() { + break + } + clientConn.fm.Update() + sendlist := clientConn.fm.GetSendList() + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*frame.Frame) + mb, _ := clientConn.fm.MarshalFrame(f) + p.sequence++ + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, + SEND_PROTO, RECV_PROTO, p.key, + p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat, + p.timeout) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + time.Sleep(time.Millisecond * 10) + now := common.GetNowUpdateInSecond() + diffclose := now.Sub(startConnectTime) + if diffclose > time.Second*5 { + loggo.Info("can not connect remote tcp %s %s", uuid, tcpsrcaddr.String()) + p.close(clientConn) + return + } + } + + if !clientConn.exit { + loggo.Info("connected remote tcp %s %s", uuid, tcpsrcaddr.String()) + } + + bytes := make([]byte, 10240) + + tcpActiveRecvTime := common.GetNowUpdateInSecond() + tcpActiveSendTime := common.GetNowUpdateInSecond() + + for !p.exit && !clientConn.exit { + now := common.GetNowUpdateInSecond() + sleep := true + + left := common.MinOfInt(clientConn.fm.GetSendBufferLeft(), len(bytes)) + if left > 0 { + conn.SetReadDeadline(time.Now().Add(time.Millisecond * 1)) + n, err := conn.Read(bytes[0:left]) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err) + clientConn.fm.Close() + break + } + } + if n > 0 { + sleep = false + clientConn.fm.WriteSendBuffer(bytes[:n]) + tcpActiveRecvTime = now + } + } + + clientConn.fm.Update() + + sendlist := clientConn.fm.GetSendList() + if sendlist.Len() > 0 { + sleep = false + clientConn.activeSendTime = now + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*frame.Frame) + mb, err := clientConn.fm.MarshalFrame(f) + if err != nil { + loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err) + continue + } + p.sequence++ + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, + SEND_PROTO, RECV_PROTO, p.key, + p.tcpmode, 0, 0, 0, 0, 0, + 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + } + + if clientConn.fm.GetRecvBufferSize() > 0 { + sleep = false + rr := clientConn.fm.GetRecvReadLineBuffer() + conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 1)) + n, err := conn.Write(rr) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err) + clientConn.fm.Close() + break + } + } + if n > 0 { + clientConn.fm.SkipRecvBuffer(n) + tcpActiveSendTime = now + } + } + + if sleep { + time.Sleep(time.Millisecond * 10) + } + + diffrecv := now.Sub(clientConn.activeRecvTime) + diffsend := now.Sub(clientConn.activeSendTime) + tcpdiffrecv := now.Sub(tcpActiveRecvTime) + tcpdiffsend := now.Sub(tcpActiveSendTime) + if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) || + (tcpdiffrecv > time.Second*(time.Duration(p.timeout)) && tcpdiffsend > time.Second*(time.Duration(p.timeout))) { + loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String()) + clientConn.fm.Close() + break + } + + if clientConn.fm.IsRemoteClosed() { + loggo.Info("closed by remote conn %s %s", clientConn.id, clientConn.tcpaddr.String()) + clientConn.fm.Close() + break + } + } + + clientConn.fm.Close() + + startCloseTime := common.GetNowUpdateInSecond() + for !p.exit && !clientConn.exit { + now := common.GetNowUpdateInSecond() + + clientConn.fm.Update() + + sendlist := clientConn.fm.GetSendList() + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*frame.Frame) + mb, _ := clientConn.fm.MarshalFrame(f) + p.sequence++ + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, + SEND_PROTO, RECV_PROTO, p.key, + p.tcpmode, 0, 0, 0, 0, 0, + 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + + nodatarecv := true + if clientConn.fm.GetRecvBufferSize() > 0 { + rr := clientConn.fm.GetRecvReadLineBuffer() + conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) + n, _ := conn.Write(rr) + if n > 0 { + clientConn.fm.SkipRecvBuffer(n) + nodatarecv = false + } + } + + diffclose := now.Sub(startCloseTime) + if diffclose > time.Second*60 { + loggo.Info("close conn had timeout %s %s", clientConn.id, clientConn.tcpaddr.String()) + break + } + + remoteclosed := clientConn.fm.IsRemoteClosed() + if remoteclosed && nodatarecv { + loggo.Info("remote conn had closed %s %s", clientConn.id, clientConn.tcpaddr.String()) + break + } + + time.Sleep(time.Millisecond * 100) + } + + loggo.Info("close tcp conn %s %s", clientConn.id, clientConn.tcpaddr.String()) + conn.Close() + p.close(clientConn) +} + +func (p *Client) Accept() error { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + loggo.Info("client waiting local accept udp") + + bytes := make([]byte, 10240) + + for !p.exit { + p.listenConn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) + n, srcaddr, err := p.listenConn.ReadFromUDP(bytes) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error read udp %s", err) + continue + } + } + if n <= 0 { + continue + } + + now := common.GetNowUpdateInSecond() + clientConn := p.getClientConnByAddr(srcaddr.String()) + if clientConn == nil { + if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn { + loggo.Info("too many connections %d, client accept new local udp fail %s", p.localIdToConnMapSize, srcaddr.String()) + continue + } + uuid := common.UniqueId() + clientConn = &ClientConn{exit: false, ipaddr: srcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false} + p.addClientConn(uuid, srcaddr.String(), clientConn) + loggo.Info("client accept new local udp %s %s", uuid, srcaddr.String()) + } + + clientConn.activeSendTime = now + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), bytes[:n], + SEND_PROTO, RECV_PROTO, p.key, + p.tcpmode, 0, 0, 0, 0, 0, + p.timeout) + + p.sequence++ + + p.sendPacket++ + p.sendPacketSize += (uint64)(n) + } + return nil +} + +func (p *Client) processPacket(packet *Packet) { + + if packet.my.Rproto >= 0 { + return + } + + if packet.my.Key != (int32)(p.key) { + return + } + + if packet.echoId != p.id { + return + } + + if packet.my.Type == (int32)(MyMsg_PING) { + t := time.Time{} + t.UnmarshalBinary(packet.my.Data) + now := time.Now() + d := now.Sub(t) + loggo.Info("pong from %s %s", packet.src.String(), d.String()) + p.rtt = d + p.pongTime = now + return + } + + if packet.my.Type == (int32)(MyMsg_KICK) { + clientConn := p.getClientConnById(packet.my.Id) + if clientConn != nil { + p.close(clientConn) + loggo.Info("remote kick local %s", packet.my.Id) + } + return + } + + loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data)) + + clientConn := p.getClientConnById(packet.my.Id) + if clientConn == nil { + loggo.Debug("processPacket no conn %s ", packet.my.Id) + p.remoteError(packet.my.Id) + return + } + + now := common.GetNowUpdateInSecond() + clientConn.activeRecvTime = now + + if p.tcpmode > 0 { + f := &frame.Frame{} + err := proto.Unmarshal(packet.my.Data, f) + if err != nil { + loggo.Error("Unmarshal tcp Error %s", err) + return + } + + clientConn.fm.OnRecvFrame(f) + } else { + if packet.my.Data == nil { + return + } + addr := clientConn.ipaddr + _, err := p.listenConn.WriteToUDP(packet.my.Data, addr) + if err != nil { + loggo.Info("WriteToUDP Error read udp %s", err) + clientConn.close = true + return + } + } + + p.recvPacket++ + p.recvPacketSize += (uint64)(len(packet.my.Data)) +} + +func (p *Client) close(clientConn *ClientConn) { + clientConn.exit = true + p.deleteClientConn(clientConn.id, clientConn.ipaddr.String()) + p.deleteClientConn(clientConn.id, clientConn.tcpaddr.String()) +} + +func (p *Client) checkTimeoutConn() { + + if p.tcpmode > 0 { + return + } + + tmp := make(map[string]*ClientConn) + p.localIdToConnMap.Range(func(key, value interface{}) bool { + id := key.(string) + clientConn := value.(*ClientConn) + tmp[id] = clientConn + return true + }) + + now := common.GetNowUpdateInSecond() + for _, conn := range tmp { + diffrecv := now.Sub(conn.activeRecvTime) + diffsend := now.Sub(conn.activeSendTime) + if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) { + conn.close = true + } + } + + for id, conn := range tmp { + if conn.close { + loggo.Info("close inactive conn %s %s", id, conn.ipaddr.String()) + p.close(conn) + } + } +} + +func (p *Client) ping() { + now := time.Now() + b, _ := now.MarshalBinary() + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", "", (uint32)(MyMsg_PING), b, + SEND_PROTO, RECV_PROTO, p.key, + 0, 0, 0, 0, 0, 0, + 0) + loggo.Info("ping %s %s %d %d %d %d", p.addrServer, now.String(), p.sproto, p.rproto, p.id, p.sequence) + p.sequence++ + if now.Sub(p.pongTime) > time.Second*3 { + p.rtt = 0 + } +} + +func (p *Client) showNet() { + p.localAddrToConnMapSize = 0 + p.localIdToConnMap.Range(func(key, value interface{}) bool { + p.localAddrToConnMapSize++ + return true + }) + p.localIdToConnMapSize = 0 + p.localIdToConnMap.Range(func(key, value interface{}) bool { + p.localIdToConnMapSize++ + return true + }) + loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %d/%dConnections", + p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localAddrToConnMapSize, p.localIdToConnMapSize) + p.sendPacket = 0 + p.recvPacket = 0 + p.sendPacketSize = 0 + p.recvPacketSize = 0 +} + +func (p *Client) AcceptSock5Conn(conn *net.TCPConn) { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + var err error = nil + if err = network.Sock5HandshakeBy(conn, "", ""); err != nil { + loggo.Error("socks handshake: %s", err) + conn.Close() + return + } + _, addr, err := network.Sock5GetRequest(conn) + if err != nil { + loggo.Error("error getting request: %s", err) + conn.Close() + return + } + // Sending connection established message immediately to client. + // This some round trip time for creating socks connection with the client. + // But if connection failed, the client will get connection reset error. + _, err = conn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x08, 0x43}) + if err != nil { + loggo.Error("send connection confirmation: %s", err) + conn.Close() + return + } + + loggo.Info("accept new sock5 conn: %s", addr) + + if p.sock5_filter == nil { + p.AcceptTcpConn(conn, addr) + } else { + if (*p.sock5_filter)(addr) { + p.AcceptTcpConn(conn, addr) + return + } + p.AcceptDirectTcpConn(conn, addr) + } +} + +func (p *Client) addClientConn(uuid string, addr string, clientConn *ClientConn) { + + p.localAddrToConnMap.Store(addr, clientConn) + p.localIdToConnMap.Store(uuid, clientConn) +} + +func (p *Client) getClientConnByAddr(addr string) *ClientConn { + ret, ok := p.localAddrToConnMap.Load(addr) + if !ok { + return nil + } + return ret.(*ClientConn) +} + +func (p *Client) getClientConnById(uuid string) *ClientConn { + ret, ok := p.localIdToConnMap.Load(uuid) + if !ok { + return nil + } + return ret.(*ClientConn) +} + +func (p *Client) deleteClientConn(uuid string, addr string) { + p.localIdToConnMap.Delete(uuid) + p.localAddrToConnMap.Delete(addr) +} + +func (p *Client) remoteError(uuid string) { + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", uuid, (uint32)(MyMsg_KICK), []byte{}, + SEND_PROTO, RECV_PROTO, p.key, + 0, 0, 0, 0, 0, 0, + 0) +} + +func (p *Client) AcceptDirectTcpConn(conn *net.TCPConn, targetAddr string) { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr) + + loggo.Info("client accept new direct local tcp %s %s", tcpsrcaddr.String(), targetAddr) + + tcpaddrTarget, err := net.ResolveTCPAddr("tcp", targetAddr) + if err != nil { + loggo.Info("direct local tcp ResolveTCPAddr fail: %s %s", targetAddr, err.Error()) + return + } + + targetconn, err := net.DialTCP("tcp", nil, tcpaddrTarget) + if err != nil { + loggo.Info("direct local tcp DialTCP fail: %s %s", targetAddr, err.Error()) + return + } + + go p.transfer(conn, targetconn, conn.RemoteAddr().String(), targetconn.RemoteAddr().String()) + go p.transfer(targetconn, conn, targetconn.RemoteAddr().String(), conn.RemoteAddr().String()) + + loggo.Info("client accept new direct local tcp ok %s %s", tcpsrcaddr.String(), targetAddr) +} + +func (p *Client) transfer(destination io.WriteCloser, source io.ReadCloser, dst string, src string) { + + defer common.CrashLog() + + defer destination.Close() + defer source.Close() + loggo.Info("client begin transfer from %s -> %s", src, dst) + io.Copy(destination, source) + loggo.Info("client end transfer from %s -> %s", src, dst) +} + +func (p *Client) updateServerAddr() { + ipaddrServer, err := net.ResolveIPAddr("ip", p.addrServer) + if err != nil { + return + } + if p.ipaddrServer.String() != ipaddrServer.String() { + p.ipaddrServer = ipaddrServer + } +} diff --git a/country.go b/country.go new file mode 100644 index 0000000..752b373 --- /dev/null +++ b/country.go @@ -0,0 +1,52 @@ +package main + +import ( + "errors" + "github.com/esrrhs/gohome/common" + "github.com/oschwald/geoip2-golang" + "net" +) + +var gdb *geoip2.Reader + +func LoadGeoDB(file string) error { + + if len(file) <= 0 { + file = common.GetDataDir() + "/geoip/" + "GeoLite2-Country.mmdb" + } + + db, err := geoip2.Open(file) + if err != nil { + return err + } + gdb = db + return nil +} + +func GetCountryIsoCode(ipaddr string) (string, error) { + + ip := net.ParseIP(ipaddr) + if ip == nil { + return "", errors.New("ip " + ipaddr + " ParseIP nil") + } + record, err := gdb.City(ip) + if err != nil { + return "", err + } + + return record.Country.IsoCode, nil +} + +func GetCountryName(ipaddr string) (string, error) { + + ip := net.ParseIP(ipaddr) + if ip == nil { + return "", errors.New("ip " + ipaddr + "ParseIP nil") + } + record, err := gdb.City(ip) + if err != nil { + return "", err + } + + return record.Country.Names["en"], nil +} diff --git a/country_test.go b/country_test.go new file mode 100644 index 0000000..6b7a819 --- /dev/null +++ b/country_test.go @@ -0,0 +1,16 @@ +package main + +import ( + "fmt" + "testing" +) + +func TestNew(t *testing.T) { + Load("./GeoLite2-Country.mmdb") + + fmt.Println(GetCountryIsoCode("39.106.101.133")) + fmt.Println(GetCountryIsoCode("")) + fmt.Println(GetCountryIsoCode("aa")) + fmt.Println(GetCountryIsoCode("39.106.101.133:14234")) + fmt.Println(GetCountryIsoCode("192.168.1.121")) +} diff --git a/docker-compose/.env b/docker-compose/.env new file mode 100644 index 0000000..87ccc82 --- /dev/null +++ b/docker-compose/.env @@ -0,0 +1,2 @@ +KEY=123456 +SERVER=www.yourserver.com \ No newline at end of file diff --git a/docker-compose/Readme.md b/docker-compose/Readme.md new file mode 100644 index 0000000..834b79e --- /dev/null +++ b/docker-compose/Readme.md @@ -0,0 +1,16 @@ +Deploy with docker-compose +=========================== + **First** edit `.env` file in this directory to your appropriate value. + +**Then** run stack with these commands: + +- in the server +``` +docker-compose -f server.yml up -d +``` +- in client machine +``` +docker-compose -f client.yml up -d +``` + +**Now** use socks5 proxy at port `1080` of your client machine \ No newline at end of file diff --git a/docker-compose/client.yml b/docker-compose/client.yml new file mode 100644 index 0000000..4eced79 --- /dev/null +++ b/docker-compose/client.yml @@ -0,0 +1,9 @@ +version: "3.7" + +services: + pingtunnelServer: + image: esrrhs/pingtunnel:latest + restart: always + ports: + - 1080:1080 + command: "./pingtunnel -type client -l 0.0.0.0:1080 -s ${SERVER} -sock5 1 -key ${KEY}" \ No newline at end of file diff --git a/docker-compose/server.yml b/docker-compose/server.yml new file mode 100644 index 0000000..63e6009 --- /dev/null +++ b/docker-compose/server.yml @@ -0,0 +1,8 @@ +version: "3.7" + +services: + pingtunnelServer: + image: esrrhs/pingtunnel:latest + restart: always + network_mode: host + command: "./pingtunnel -type server -key ${KEY}" \ No newline at end of file diff --git a/gen.bat b/gen.bat new file mode 100644 index 0000000..0c05672 --- /dev/null +++ b/gen.bat @@ -0,0 +1 @@ +protoc --go_out=. *.proto \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2830d07 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module github.com/esrrhs/pingtunnul + +go 1.18 + +require ( + github.com/esrrhs/gohome v0.0.0-20230222132228-8bb1d3e2ecc4 + github.com/golang/protobuf v1.5.2 + github.com/oschwald/geoip2-golang v1.8.0 + golang.org/x/net v0.7.0 +) + +require ( + github.com/OneOfOne/xxhash v1.2.8 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/oschwald/maxminddb-golang v1.10.0 // indirect + golang.org/x/sys v0.5.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/main.go b/main.go new file mode 100644 index 0000000..96f623a --- /dev/null +++ b/main.go @@ -0,0 +1,267 @@ +package main + +import ( + "flag" + "fmt" + "github.com/esrrhs/gohome/common" + "github.com/esrrhs/gohome/loggo" + "net" + "net/http" + _ "net/http/pprof" + "strconv" + "time" +) + +var usage = ` + 通过伪造ping,把tcp/udp/sock5流量通过远程服务器转发到目的服务器上。用于突破某些运营商封锁TCP/UDP流量。 + By forging ping, the tcp/udp/sock5 traffic is forwarded to the destination server through the remote server. Used to break certain operators to block TCP/UDP traffic. + +Usage: + + // server + pingtunnel -type server + + // client, Forward udp + pingtunnel -type client -l LOCAL_IP:4455 -s SERVER_IP -t SERVER_IP:4455 + + // client, Forward tcp + pingtunnel -type client -l LOCAL_IP:4455 -s SERVER_IP -t SERVER_IP:4455 -tcp 1 + + // client, Forward sock5, implicitly open tcp, so no target server is needed + pingtunnel -type client -l LOCAL_IP:4455 -s SERVER_IP -sock5 1 + + -type 服务器或者客户端 + client or server + +服务器参数server param: + + -key 设置的密码,默认0 + Set password, default 0 + + -nolog 不写日志文件,只打印标准输出,默认0 + Do not write log files, only print standard output, default 0 is off + + -noprint 不打印屏幕输出,默认0 + Do not print standard output, default 0 is off + + -loglevel 日志文件等级,默认info + log level, default is info + + -maxconn 最大连接数,默认0,不受限制 + the max num of connections, default 0 is no limit + + -maxprt server最大处理线程数,默认100 + max process thread in server, default 100 + + -maxprb server最大处理线程buffer数,默认1000 + max process thread's buffer in server, default 1000 + + -conntt server发起连接到目标地址的超时时间,默认1000ms + The timeout period for the server to initiate a connection to the destination address. The default is 1000ms. + +客户端参数client param: + + -l 本地的地址,发到这个端口的流量将转发到服务器 + Local address, traffic sent to this port will be forwarded to the server + + -s 服务器的地址,流量将通过隧道转发到这个服务器 + The address of the server, the traffic will be forwarded to this server through the tunnel + + -t 远端服务器转发的目的地址,流量将转发到这个地址 + Destination address forwarded by the remote server, traffic will be forwarded to this address + + -timeout 本地记录连接超时的时间,单位是秒,默认60s + The time when the local record connection timed out, in seconds, 60 seconds by default + + -key 设置的密码,默认0 + Set password, default 0 + + -tcp 设置是否转发tcp,默认0 + Set the switch to forward tcp, the default is 0 + + -tcp_bs tcp的发送接收缓冲区大小,默认1MB + Tcp send and receive buffer size, default 1MB + + -tcp_mw tcp的最大窗口,默认20000 + The maximum window of tcp, the default is 20000 + + -tcp_rst tcp的超时发送时间,默认400ms + Tcp timeout resend time, default 400ms + + -tcp_gz 当数据包超过这个大小,tcp将压缩数据,0表示不压缩,默认0 + Tcp will compress data when the packet exceeds this size, 0 means no compression, default 0 + + -tcp_stat 打印tcp的监控,默认0 + Print tcp connection statistic, default 0 is off + + -nolog 不写日志文件,只打印标准输出,默认0 + Do not write log files, only print standard output, default 0 is off + + -noprint 不打印屏幕输出,默认0 + Do not print standard output, default 0 is off + + -loglevel 日志文件等级,默认info + log level, default is info + + -sock5 开启sock5转发,默认0 + Turn on sock5 forwarding, default 0 is off + + -profile 在指定端口开启性能检测,默认0不开启 + Enable performance detection on the specified port. The default 0 is not enabled. + + -s5filter sock5模式设置转发过滤,默认全转发,设置CN代表CN地区的直连不转发 + Set the forwarding filter in the sock5 mode. The default is full forwarding. For example, setting the CN indicates that the Chinese address is not forwarded. + + -s5ftfile sock5模式转发过滤的数据文件,默认读取当前目录的GeoLite2-Country.mmdb + The data file in sock5 filter mode, the default reading of the current directory GeoLite2-Country.mmdb +` + +func main() { + + defer common.CrashLog() + + t := flag.String("type", "", "client or server") + listen := flag.String("l", "", "listen addr") + target := flag.String("t", "", "target addr") + server := flag.String("s", "", "server addr") + timeout := flag.Int("timeout", 60, "conn timeout") + key := flag.Int("key", 0, "key") + tcpmode := flag.Int("tcp", 0, "tcp mode") + tcpmode_buffersize := flag.Int("tcp_bs", 1*1024*1024, "tcp mode buffer size") + tcpmode_maxwin := flag.Int("tcp_mw", 20000, "tcp mode max win") + tcpmode_resend_timems := flag.Int("tcp_rst", 400, "tcp mode resend time ms") + tcpmode_compress := flag.Int("tcp_gz", 0, "tcp data compress") + nolog := flag.Int("nolog", 0, "write log file") + noprint := flag.Int("noprint", 0, "print stdout") + tcpmode_stat := flag.Int("tcp_stat", 0, "print tcp stat") + loglevel := flag.String("loglevel", "info", "log level") + open_sock5 := flag.Int("sock5", 0, "sock5 mode") + maxconn := flag.Int("maxconn", 0, "max num of connections") + max_process_thread := flag.Int("maxprt", 100, "max process thread in server") + max_process_buffer := flag.Int("maxprb", 1000, "max process thread's buffer in server") + profile := flag.Int("profile", 0, "open profile") + conntt := flag.Int("conntt", 1000, "the connect call's timeout") + s5filter := flag.String("s5filter", "", "sock5 filter") + s5ftfile := flag.String("s5ftfile", "GeoLite2-Country.mmdb", "sock5 filter file") + flag.Usage = func() { + fmt.Printf(usage) + } + + flag.Parse() + + if *t != "client" && *t != "server" { + flag.Usage() + return + } + if *t == "client" { + if len(*listen) == 0 || len(*server) == 0 { + flag.Usage() + return + } + if *open_sock5 == 0 && len(*target) == 0 { + flag.Usage() + return + } + if *open_sock5 != 0 { + *tcpmode = 1 + } + } + if *tcpmode_maxwin*10 > FRAME_MAX_ID { + fmt.Println("set tcp win to big, max = " + strconv.Itoa(FRAME_MAX_ID/10)) + return + } + + level := loggo.LEVEL_INFO + if loggo.NameToLevel(*loglevel) >= 0 { + level = loggo.NameToLevel(*loglevel) + } + loggo.Ini(loggo.Config{ + Level: level, + Prefix: "pingtunnel", + MaxDay: 3, + NoLogFile: *nolog > 0, + NoPrint: *noprint > 0, + }) + loggo.Info("start...") + loggo.Info("key %d", *key) + + if *t == "server" { + s, err := NewServer(*key, *maxconn, *max_process_thread, *max_process_buffer, *conntt) + if err != nil { + loggo.Error("ERROR: %s", err.Error()) + return + } + loggo.Info("Server start") + err = s.Run() + if err != nil { + loggo.Error("Run ERROR: %s", err.Error()) + return + } + } else if *t == "client" { + + loggo.Info("type %s", *t) + loggo.Info("listen %s", *listen) + loggo.Info("server %s", *server) + loggo.Info("target %s", *target) + + if *tcpmode == 0 { + *tcpmode_buffersize = 0 + *tcpmode_maxwin = 0 + *tcpmode_resend_timems = 0 + *tcpmode_compress = 0 + *tcpmode_stat = 0 + } + + if len(*s5filter) > 0 { + err := LoadGeoDB(*s5ftfile) + if err != nil { + loggo.Error("Load Sock5 ip file ERROR: %s", err.Error()) + return + } + } + filter := func(addr string) bool { + if len(*s5filter) <= 0 { + return true + } + + taddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return false + } + + ret, err := GetCountryIsoCode(taddr.IP.String()) + if err != nil { + return false + } + if len(ret) <= 0 { + return false + } + return ret != *s5filter + } + + c, err := NewClient(*listen, *server, *target, *timeout, *key, + *tcpmode, *tcpmode_buffersize, *tcpmode_maxwin, *tcpmode_resend_timems, *tcpmode_compress, + *tcpmode_stat, *open_sock5, *maxconn, &filter) + if err != nil { + loggo.Error("ERROR: %s", err.Error()) + return + } + loggo.Info("Client Listen %s (%s) Server %s (%s) TargetPort %s:", c.Addr(), c.IPAddr(), + c.ServerAddr(), c.ServerIPAddr(), c.TargetAddr()) + err = c.Run() + if err != nil { + loggo.Error("Run ERROR: %s", err.Error()) + return + } + } else { + return + } + + if *profile > 0 { + go http.ListenAndServe("0.0.0.0:"+strconv.Itoa(*profile), nil) + } + + for { + time.Sleep(time.Hour) + } +} diff --git a/msg.pb.go b/msg.pb.go new file mode 100644 index 0000000..181635f --- /dev/null +++ b/msg.pb.go @@ -0,0 +1,228 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: msg.proto + +package main + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type MyMsg_TYPE int32 + +const ( + MyMsg_DATA MyMsg_TYPE = 0 + MyMsg_PING MyMsg_TYPE = 1 + MyMsg_KICK MyMsg_TYPE = 2 + MyMsg_MAGIC MyMsg_TYPE = 57005 +) + +var MyMsg_TYPE_name = map[int32]string{ + 0: "DATA", + 1: "PING", + 2: "KICK", + 57005: "MAGIC", +} + +var MyMsg_TYPE_value = map[string]int32{ + "DATA": 0, + "PING": 1, + "KICK": 2, + "MAGIC": 57005, +} + +func (x MyMsg_TYPE) String() string { + return proto.EnumName(MyMsg_TYPE_name, int32(x)) +} + +func (MyMsg_TYPE) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_c06e4cca6c2cc899, []int{0, 0} +} + +type MyMsg struct { + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` + Target string `protobuf:"bytes,3,opt,name=target,proto3" json:"target,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + Rproto int32 `protobuf:"zigzag32,5,opt,name=rproto,proto3" json:"rproto,omitempty"` + Magic int32 `protobuf:"zigzag32,6,opt,name=magic,proto3" json:"magic,omitempty"` + Key int32 `protobuf:"zigzag32,7,opt,name=key,proto3" json:"key,omitempty"` + Timeout int32 `protobuf:"varint,8,opt,name=timeout,proto3" json:"timeout,omitempty"` + Tcpmode int32 `protobuf:"varint,9,opt,name=tcpmode,proto3" json:"tcpmode,omitempty"` + TcpmodeBuffersize int32 `protobuf:"varint,10,opt,name=tcpmode_buffersize,json=tcpmodeBuffersize,proto3" json:"tcpmode_buffersize,omitempty"` + TcpmodeMaxwin int32 `protobuf:"varint,11,opt,name=tcpmode_maxwin,json=tcpmodeMaxwin,proto3" json:"tcpmode_maxwin,omitempty"` + TcpmodeResendTimems int32 `protobuf:"varint,12,opt,name=tcpmode_resend_timems,json=tcpmodeResendTimems,proto3" json:"tcpmode_resend_timems,omitempty"` + TcpmodeCompress int32 `protobuf:"varint,13,opt,name=tcpmode_compress,json=tcpmodeCompress,proto3" json:"tcpmode_compress,omitempty"` + TcpmodeStat int32 `protobuf:"varint,14,opt,name=tcpmode_stat,json=tcpmodeStat,proto3" json:"tcpmode_stat,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MyMsg) Reset() { *m = MyMsg{} } +func (m *MyMsg) String() string { return proto.CompactTextString(m) } +func (*MyMsg) ProtoMessage() {} +func (*MyMsg) Descriptor() ([]byte, []int) { + return fileDescriptor_c06e4cca6c2cc899, []int{0} +} + +func (m *MyMsg) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_MyMsg.Unmarshal(m, b) +} +func (m *MyMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_MyMsg.Marshal(b, m, deterministic) +} +func (m *MyMsg) XXX_Merge(src proto.Message) { + xxx_messageInfo_MyMsg.Merge(m, src) +} +func (m *MyMsg) XXX_Size() int { + return xxx_messageInfo_MyMsg.Size(m) +} +func (m *MyMsg) XXX_DiscardUnknown() { + xxx_messageInfo_MyMsg.DiscardUnknown(m) +} + +var xxx_messageInfo_MyMsg proto.InternalMessageInfo + +func (m *MyMsg) GetId() string { + if m != nil { + return m.Id + } + return "" +} + +func (m *MyMsg) GetType() int32 { + if m != nil { + return m.Type + } + return 0 +} + +func (m *MyMsg) GetTarget() string { + if m != nil { + return m.Target + } + return "" +} + +func (m *MyMsg) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *MyMsg) GetRproto() int32 { + if m != nil { + return m.Rproto + } + return 0 +} + +func (m *MyMsg) GetMagic() int32 { + if m != nil { + return m.Magic + } + return 0 +} + +func (m *MyMsg) GetKey() int32 { + if m != nil { + return m.Key + } + return 0 +} + +func (m *MyMsg) GetTimeout() int32 { + if m != nil { + return m.Timeout + } + return 0 +} + +func (m *MyMsg) GetTcpmode() int32 { + if m != nil { + return m.Tcpmode + } + return 0 +} + +func (m *MyMsg) GetTcpmodeBuffersize() int32 { + if m != nil { + return m.TcpmodeBuffersize + } + return 0 +} + +func (m *MyMsg) GetTcpmodeMaxwin() int32 { + if m != nil { + return m.TcpmodeMaxwin + } + return 0 +} + +func (m *MyMsg) GetTcpmodeResendTimems() int32 { + if m != nil { + return m.TcpmodeResendTimems + } + return 0 +} + +func (m *MyMsg) GetTcpmodeCompress() int32 { + if m != nil { + return m.TcpmodeCompress + } + return 0 +} + +func (m *MyMsg) GetTcpmodeStat() int32 { + if m != nil { + return m.TcpmodeStat + } + return 0 +} + +func init() { + proto.RegisterEnum("MyMsg_TYPE", MyMsg_TYPE_name, MyMsg_TYPE_value) + proto.RegisterType((*MyMsg)(nil), "MyMsg") +} + +func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) } + +var fileDescriptor_c06e4cca6c2cc899 = []byte{ + // 342 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x3c, 0x90, 0xdb, 0x6a, 0xe2, 0x50, + 0x14, 0x86, 0x27, 0x27, 0x0f, 0xcb, 0xe8, 0xc4, 0x35, 0x07, 0xd6, 0x65, 0x46, 0x18, 0xc8, 0x5c, + 0xcc, 0xc0, 0xb4, 0x4f, 0xa0, 0xb6, 0x88, 0x48, 0x8a, 0xa4, 0xde, 0xb4, 0x37, 0x12, 0xcd, 0x36, + 0x84, 0x36, 0x07, 0xb2, 0xb7, 0xb4, 0xf6, 0x9d, 0xfa, 0x08, 0x7d, 0x8d, 0x3e, 0x4f, 0xc9, 0x72, + 0xa7, 0x77, 0xff, 0xff, 0x7f, 0x5f, 0xc8, 0x62, 0x43, 0x3f, 0x97, 0xe9, 0xbf, 0xaa, 0x2e, 0x55, + 0x39, 0x79, 0xb7, 0xc0, 0x09, 0x4f, 0xa1, 0x4c, 0x71, 0x04, 0x66, 0x96, 0x90, 0xe1, 0x1b, 0x41, + 0x3f, 0x32, 0xb3, 0x04, 0x11, 0x6c, 0x75, 0xaa, 0x04, 0x99, 0xbe, 0x11, 0x38, 0x11, 0x67, 0xfc, + 0x09, 0x1d, 0x15, 0xd7, 0xa9, 0x50, 0x64, 0xb1, 0xa7, 0x5b, 0xe3, 0x26, 0xb1, 0x8a, 0xc9, 0xf6, + 0x8d, 0xc0, 0x8d, 0x38, 0x37, 0x6e, 0xcd, 0xff, 0x20, 0xc7, 0x37, 0x82, 0x71, 0xa4, 0x1b, 0x7e, + 0x07, 0x27, 0x8f, 0xd3, 0x6c, 0x4f, 0x1d, 0x9e, 0xcf, 0x05, 0x3d, 0xb0, 0x1e, 0xc4, 0x89, 0xba, + 0xbc, 0x35, 0x11, 0x09, 0xba, 0x2a, 0xcb, 0x45, 0x79, 0x54, 0xd4, 0xe3, 0x13, 0xda, 0xca, 0x64, + 0x5f, 0xe5, 0x65, 0x22, 0xa8, 0xaf, 0xc9, 0xb9, 0xe2, 0x5f, 0x40, 0x1d, 0xb7, 0xbb, 0xe3, 0xe1, + 0x20, 0x6a, 0x99, 0xbd, 0x08, 0x02, 0x96, 0xc6, 0x9a, 0xcc, 0x3e, 0x01, 0xfe, 0x86, 0x51, 0xab, + 0xe7, 0xf1, 0xf3, 0x53, 0x56, 0xd0, 0x80, 0xd5, 0xa1, 0x5e, 0x43, 0x1e, 0xf1, 0x02, 0x7e, 0xb4, + 0x5a, 0x2d, 0xa4, 0x28, 0x92, 0x6d, 0x73, 0x49, 0x2e, 0xc9, 0x65, 0xfb, 0x9b, 0x86, 0x11, 0xb3, + 0x0d, 0x23, 0xfc, 0x03, 0x5e, 0xfb, 0xcd, 0xbe, 0xcc, 0xab, 0x5a, 0x48, 0x49, 0x43, 0xd6, 0xbf, + 0xea, 0x7d, 0xae, 0x67, 0xfc, 0x05, 0x6e, 0xab, 0x4a, 0x15, 0x2b, 0x1a, 0xb1, 0x36, 0xd0, 0xdb, + 0xad, 0x8a, 0xd5, 0xe4, 0x3f, 0xd8, 0x9b, 0xbb, 0xf5, 0x35, 0xf6, 0xc0, 0xbe, 0x9a, 0x6e, 0xa6, + 0xde, 0x97, 0x26, 0xad, 0x97, 0x37, 0x0b, 0xcf, 0x68, 0xd2, 0x6a, 0x39, 0x5f, 0x79, 0x26, 0x0e, + 0xc0, 0x09, 0xa7, 0x8b, 0xe5, 0xdc, 0x7b, 0x7d, 0xb3, 0x66, 0xee, 0x3d, 0x54, 0x59, 0x91, 0xaa, + 0x63, 0x51, 0x88, 0xc7, 0x5d, 0x87, 0xdf, 0xfe, 0xf2, 0x23, 0x00, 0x00, 0xff, 0xff, 0x59, 0xbc, + 0x55, 0x76, 0xfa, 0x01, 0x00, 0x00, +} diff --git a/msg.proto b/msg.proto new file mode 100644 index 0000000..52f4530 --- /dev/null +++ b/msg.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; +option go_package = "main"; + +message MyMsg { + enum TYPE { + DATA = 0; + PING = 1; + KICK = 2; + MAGIC = 0xdead; + } + + string id = 1; + int32 type = 2; + string target = 3; + bytes data = 4; + sint32 rproto = 5; + sint32 magic = 6; + sint32 key = 7; + int32 timeout = 8; + int32 tcpmode = 9; + int32 tcpmode_buffersize = 10; + int32 tcpmode_maxwin = 11; + int32 tcpmode_resend_timems = 12; + int32 tcpmode_compress = 13; + int32 tcpmode_stat = 14; +} diff --git a/network.jpg b/network.jpg new file mode 100644 index 0000000..de09b5a Binary files /dev/null and b/network.jpg differ diff --git a/pack.sh b/pack.sh new file mode 100644 index 0000000..6b386f1 --- /dev/null +++ b/pack.sh @@ -0,0 +1,64 @@ +#! /bin/bash +#set -x +NAME="pingtunnel" + +export GO111MODULE=off + +#go tool dist list +build_list=$(go tool dist list) + +rm pack -rf +rm pack.zip -f +mkdir pack + +go get -u -v github.com/esrrhs/pingtunnel/... +last=`pwd` +cd $GOPATH/src/golang.org/x +for dir in `ls`; do + cd $dir + git pull + cd .. +done +cd $last + +for line in $build_list; do + os=$(echo "$line" | awk -F"/" '{print $1}') + arch=$(echo "$line" | awk -F"/" '{print $2}') + echo "os="$os" arch="$arch" start build" + if [ $os == "android" ]; then + continue + fi + if [ $os == "ios" ]; then + continue + fi + if [ $arch == "wasm" ]; then + continue + fi + CGO_ENABLED=0 GOOS=$os GOARCH=$arch go build -ldflags="-s -w" + if [ $? -ne 0 ]; then + echo "os="$os" arch="$arch" build fail" + exit 1 + fi + if [ $os = "windows" ]; then + zip ${NAME}_"${os}"_"${arch}"".zip" $NAME".exe" + if [ $? -ne 0 ]; then + echo "os="$os" arch="$arch" zip fail" + exit 1 + fi + mv ${NAME}_"${os}"_"${arch}"".zip" pack/ + rm $NAME".exe" -f + else + zip ${NAME}_"${os}"_"${arch}"".zip" $NAME + if [ $? -ne 0 ]; then + echo "os="$os" arch="$arch" zip fail" + exit 1 + fi + mv ${NAME}_"${os}"_"${arch}"".zip" pack/ + rm $NAME -f + fi + echo "os="$os" arch="$arch" done build" +done + +zip pack.zip pack/ -r + +echo "all done" diff --git a/pingtunnel.go b/pingtunnel.go new file mode 100644 index 0000000..ab6a1d2 --- /dev/null +++ b/pingtunnel.go @@ -0,0 +1,119 @@ +package main + +import ( + "encoding/binary" + "github.com/esrrhs/gohome/common" + "github.com/esrrhs/gohome/loggo" + "github.com/golang/protobuf/proto" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "net" + "sync" + "time" +) + +func sendICMP(id int, sequence int, conn icmp.PacketConn, server *net.IPAddr, target string, + connId string, msgType uint32, data []byte, sproto int, rproto int, key int, + tcpmode int, tcpmode_buffer_size int, tcpmode_maxwin int, tcpmode_resend_time int, tcpmode_compress int, tcpmode_stat int, + timeout int) { + + m := &MyMsg{ + Id: connId, + Type: (int32)(msgType), + Target: target, + Data: data, + Rproto: (int32)(rproto), + Key: (int32)(key), + Tcpmode: (int32)(tcpmode), + TcpmodeBuffersize: (int32)(tcpmode_buffer_size), + TcpmodeMaxwin: (int32)(tcpmode_maxwin), + TcpmodeResendTimems: (int32)(tcpmode_resend_time), + TcpmodeCompress: (int32)(tcpmode_compress), + TcpmodeStat: (int32)(tcpmode_stat), + Timeout: (int32)(timeout), + Magic: (int32)(MyMsg_MAGIC), + } + + mb, err := proto.Marshal(m) + if err != nil { + loggo.Error("sendICMP Marshal MyMsg error %s %s", server.String(), err) + return + } + + body := &icmp.Echo{ + ID: id, + Seq: sequence, + Data: mb, + } + + msg := &icmp.Message{ + Type: (ipv4.ICMPType)(sproto), + Code: 0, + Body: body, + } + + bytes, err := msg.Marshal(nil) + if err != nil { + loggo.Error("sendICMP Marshal error %s %s", server.String(), err) + return + } + + conn.WriteTo(bytes, server) +} + +func recvICMP(workResultLock *sync.WaitGroup, exit *bool, conn icmp.PacketConn, recv chan<- *Packet) { + + defer common.CrashLog() + + (*workResultLock).Add(1) + defer (*workResultLock).Done() + + bytes := make([]byte, 10240) + for !*exit { + conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) + n, srcaddr, err := conn.ReadFrom(bytes) + + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error read icmp message %s", err) + continue + } + } + + if n <= 0 { + continue + } + + echoId := int(binary.BigEndian.Uint16(bytes[4:6])) + echoSeq := int(binary.BigEndian.Uint16(bytes[6:8])) + + my := &MyMsg{} + err = proto.Unmarshal(bytes[8:n], my) + if err != nil { + loggo.Debug("Unmarshal MyMsg error: %s", err) + continue + } + + if my.Magic != (int32)(MyMsg_MAGIC) { + loggo.Debug("processPacket data invalid %s", my.Id) + continue + } + + recv <- &Packet{my: my, + src: srcaddr.(*net.IPAddr), + echoId: echoId, echoSeq: echoSeq} + } +} + +type Packet struct { + my *MyMsg + src *net.IPAddr + echoId int + echoSeq int +} + +const ( + FRAME_MAX_SIZE int = 888 + FRAME_MAX_ID int = 1000000 +) diff --git a/pingtunnel_test.go b/pingtunnel_test.go new file mode 100644 index 0000000..ad2ff3f --- /dev/null +++ b/pingtunnel_test.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "github.com/golang/protobuf/proto" + "testing" +) + +func Test0001(t *testing.T) { + + my := &MyMsg{} + my.Id = "12345" + my.Target = "111:11" + my.Type = 12 + my.Data = make([]byte, 0) + dst, _ := proto.Marshal(my) + fmt.Println("dst = ", dst) + + my1 := &MyMsg{} + proto.Unmarshal(dst, my1) + fmt.Println("my1 = ", my1) + fmt.Println("my1.Data = ", my1.Data) + + proto.Unmarshal(dst[0:4], my1) + fmt.Println("my1 = ", my1) + +} diff --git a/server.go b/server.go new file mode 100644 index 0000000..252c0f5 --- /dev/null +++ b/server.go @@ -0,0 +1,614 @@ +package main + +import ( + "github.com/esrrhs/gohome/common" + "github.com/esrrhs/gohome/frame" + "github.com/esrrhs/gohome/loggo" + "github.com/esrrhs/gohome/threadpool" + "github.com/golang/protobuf/proto" + "golang.org/x/net/icmp" + "net" + "sync" + "time" +) + +func NewServer(key int, maxconn int, maxprocessthread int, maxprocessbuffer int, connecttmeout int) (*Server, error) { + s := &Server{ + exit: false, + key: key, + maxconn: maxconn, + maxprocessthread: maxprocessthread, + maxprocessbuffer: maxprocessbuffer, + connecttmeout: connecttmeout, + } + + if maxprocessthread > 0 { + s.processtp = threadpool.NewThreadPool(maxprocessthread, maxprocessbuffer, func(v interface{}) { + packet := v.(*Packet) + s.processDataPacket(packet) + }) + } + + return s, nil +} + +type Server struct { + exit bool + key int + workResultLock sync.WaitGroup + maxconn int + maxprocessthread int + maxprocessbuffer int + connecttmeout int + + conn *icmp.PacketConn + + localConnMap sync.Map + connErrorMap sync.Map + + sendPacket uint64 + recvPacket uint64 + sendPacketSize uint64 + recvPacketSize uint64 + localConnMapSize int + + processtp *threadpool.ThreadPool + recvcontrol chan int +} + +type ServerConn struct { + exit bool + timeout int + ipaddrTarget *net.UDPAddr + conn *net.UDPConn + tcpaddrTarget *net.TCPAddr + tcpconn *net.TCPConn + id string + activeRecvTime time.Time + activeSendTime time.Time + close bool + rproto int + fm *frame.FrameMgr + tcpmode int + echoId int + echoSeq int +} + +func (p *Server) Run() error { + + conn, err := icmp.ListenPacket("ip4:icmp", "") + if err != nil { + loggo.Error("Error listening for ICMP packets: %s", err.Error()) + return err + } + p.conn = conn + + recv := make(chan *Packet, 10000) + p.recvcontrol = make(chan int, 1) + go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv) + + go func() { + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + for !p.exit { + p.checkTimeoutConn() + p.showNet() + p.updateConnError() + time.Sleep(time.Second) + } + }() + + go func() { + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + for !p.exit { + select { + case <-p.recvcontrol: + return + case r := <-recv: + p.processPacket(r) + } + } + }() + + return nil +} + +func (p *Server) Stop() { + p.exit = true + p.recvcontrol <- 1 + p.workResultLock.Wait() + p.processtp.Stop() + p.conn.Close() +} + +func (p *Server) processPacket(packet *Packet) { + + if packet.my.Key != (int32)(p.key) { + return + } + + if packet.my.Type == (int32)(MyMsg_PING) { + t := time.Time{} + t.UnmarshalBinary(packet.my.Data) + loggo.Info("ping from %s %s %d %d %d", packet.src.String(), t.String(), packet.my.Rproto, packet.echoId, packet.echoSeq) + sendICMP(packet.echoId, packet.echoSeq, *p.conn, packet.src, "", "", (uint32)(MyMsg_PING), packet.my.Data, + (int)(packet.my.Rproto), -1, p.key, + 0, 0, 0, 0, 0, 0, + 0) + return + } + + if packet.my.Type == (int32)(MyMsg_KICK) { + localConn := p.getServerConnById(packet.my.Id) + if localConn != nil { + p.close(localConn) + loggo.Info("remote kick local %s", packet.my.Id) + } + return + } + + if p.maxprocessthread > 0 { + p.processtp.AddJob((int)(common.HashString(packet.my.Id)), packet) + } else { + p.processDataPacket(packet) + } +} + +func (p *Server) processDataPacketNewConn(id string, packet *Packet) *ServerConn { + + now := common.GetNowUpdateInSecond() + + loggo.Info("start add new connect %s %s", id, packet.my.Target) + + if p.maxconn > 0 && p.localConnMapSize >= p.maxconn { + loggo.Info("too many connections %d, server connected target fail %s", p.localConnMapSize, packet.my.Target) + p.remoteError(packet.echoId, packet.echoSeq, id, (int)(packet.my.Rproto), packet.src) + return nil + } + + addr := packet.my.Target + if p.isConnError(addr) { + loggo.Info("addr connect Error before: %s %s", id, addr) + p.remoteError(packet.echoId, packet.echoSeq, id, (int)(packet.my.Rproto), packet.src) + return nil + } + + if packet.my.Tcpmode > 0 { + + c, err := net.DialTimeout("tcp", addr, time.Millisecond*time.Duration(p.connecttmeout)) + if err != nil { + loggo.Error("Error listening for tcp packets: %s %s", id, err.Error()) + p.remoteError(packet.echoId, packet.echoSeq, id, (int)(packet.my.Rproto), packet.src) + p.addConnError(addr) + return nil + } + targetConn := c.(*net.TCPConn) + ipaddrTarget := targetConn.RemoteAddr().(*net.TCPAddr) + + fm := frame.NewFrameMgr(FRAME_MAX_SIZE, FRAME_MAX_ID, (int)(packet.my.TcpmodeBuffersize), (int)(packet.my.TcpmodeMaxwin), (int)(packet.my.TcpmodeResendTimems), (int)(packet.my.TcpmodeCompress), + (int)(packet.my.TcpmodeStat)) + + localConn := &ServerConn{exit: false, timeout: (int)(packet.my.Timeout), tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, + rproto: (int)(packet.my.Rproto), fm: fm, tcpmode: (int)(packet.my.Tcpmode)} + + p.addServerConn(id, localConn) + + go p.RecvTCP(localConn, id, packet.src) + return localConn + + } else { + + c, err := net.DialTimeout("udp", addr, time.Millisecond*time.Duration(p.connecttmeout)) + if err != nil { + loggo.Error("Error listening for udp packets: %s %s", id, err.Error()) + p.remoteError(packet.echoId, packet.echoSeq, id, (int)(packet.my.Rproto), packet.src) + p.addConnError(addr) + return nil + } + targetConn := c.(*net.UDPConn) + ipaddrTarget := targetConn.RemoteAddr().(*net.UDPAddr) + + localConn := &ServerConn{exit: false, timeout: (int)(packet.my.Timeout), conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, + rproto: (int)(packet.my.Rproto), tcpmode: (int)(packet.my.Tcpmode)} + + p.addServerConn(id, localConn) + + go p.Recv(localConn, id, packet.src) + + return localConn + } + + return nil +} + +func (p *Server) processDataPacket(packet *Packet) { + + loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data)) + + now := common.GetNowUpdateInSecond() + + id := packet.my.Id + localConn := p.getServerConnById(id) + if localConn == nil { + localConn = p.processDataPacketNewConn(id, packet) + if localConn == nil { + return + } + } + + localConn.activeRecvTime = now + localConn.echoId = packet.echoId + localConn.echoSeq = packet.echoSeq + + if packet.my.Type == (int32)(MyMsg_DATA) { + + if packet.my.Tcpmode > 0 { + f := &frame.Frame{} + err := proto.Unmarshal(packet.my.Data, f) + if err != nil { + loggo.Error("Unmarshal tcp Error %s", err) + return + } + + localConn.fm.OnRecvFrame(f) + + } else { + if packet.my.Data == nil { + return + } + _, err := localConn.conn.Write(packet.my.Data) + if err != nil { + loggo.Info("WriteToUDP Error %s", err) + localConn.close = true + return + } + } + + p.recvPacket++ + p.recvPacketSize += (uint64)(len(packet.my.Data)) + } +} + +func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + loggo.Info("server waiting target response %s -> %s %s", conn.tcpaddrTarget.String(), conn.id, conn.tcpconn.LocalAddr().String()) + + loggo.Info("start wait remote connect tcp %s %s", conn.id, conn.tcpaddrTarget.String()) + startConnectTime := common.GetNowUpdateInSecond() + for !p.exit && !conn.exit { + if conn.fm.IsConnected() { + break + } + conn.fm.Update() + sendlist := conn.fm.GetSendList() + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*frame.Frame) + mb, _ := conn.fm.MarshalFrame(f) + sendICMP(conn.echoId, conn.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, + conn.rproto, -1, p.key, + 0, 0, 0, 0, 0, 0, + 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + time.Sleep(time.Millisecond * 10) + now := common.GetNowUpdateInSecond() + diffclose := now.Sub(startConnectTime) + if diffclose > time.Second*5 { + loggo.Info("can not connect remote tcp %s %s", conn.id, conn.tcpaddrTarget.String()) + p.close(conn) + p.remoteError(conn.echoId, conn.echoSeq, id, conn.rproto, src) + return + } + } + + if !conn.exit { + loggo.Info("remote connected tcp %s %s", conn.id, conn.tcpaddrTarget.String()) + } + + bytes := make([]byte, 10240) + + tcpActiveRecvTime := common.GetNowUpdateInSecond() + tcpActiveSendTime := common.GetNowUpdateInSecond() + + for !p.exit && !conn.exit { + now := common.GetNowUpdateInSecond() + sleep := true + + left := common.MinOfInt(conn.fm.GetSendBufferLeft(), len(bytes)) + if left > 0 { + conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 1)) + n, err := conn.tcpconn.Read(bytes[0:left]) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + conn.fm.Close() + break + } + } + if n > 0 { + sleep = false + conn.fm.WriteSendBuffer(bytes[:n]) + tcpActiveRecvTime = now + } + } + + conn.fm.Update() + + sendlist := conn.fm.GetSendList() + if sendlist.Len() > 0 { + sleep = false + conn.activeSendTime = now + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*frame.Frame) + mb, err := conn.fm.MarshalFrame(f) + if err != nil { + loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + continue + } + sendICMP(conn.echoId, conn.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, + conn.rproto, -1, p.key, + 0, 0, 0, 0, 0, 0, + 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + } + + if conn.fm.GetRecvBufferSize() > 0 { + sleep = false + rr := conn.fm.GetRecvReadLineBuffer() + conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 1)) + n, err := conn.tcpconn.Write(rr) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + conn.fm.Close() + break + } + } + if n > 0 { + conn.fm.SkipRecvBuffer(n) + tcpActiveSendTime = now + } + } + + if sleep { + time.Sleep(time.Millisecond * 10) + } + + diffrecv := now.Sub(conn.activeRecvTime) + diffsend := now.Sub(conn.activeSendTime) + tcpdiffrecv := now.Sub(tcpActiveRecvTime) + tcpdiffsend := now.Sub(tcpActiveSendTime) + if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) || + (tcpdiffrecv > time.Second*(time.Duration(conn.timeout)) && tcpdiffsend > time.Second*(time.Duration(conn.timeout))) { + loggo.Info("close inactive conn %s %s", conn.id, conn.tcpaddrTarget.String()) + conn.fm.Close() + break + } + + if conn.fm.IsRemoteClosed() { + loggo.Info("closed by remote conn %s %s", conn.id, conn.tcpaddrTarget.String()) + conn.fm.Close() + break + } + } + + conn.fm.Close() + + startCloseTime := common.GetNowUpdateInSecond() + for !p.exit && !conn.exit { + now := common.GetNowUpdateInSecond() + + conn.fm.Update() + + sendlist := conn.fm.GetSendList() + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*frame.Frame) + mb, _ := conn.fm.MarshalFrame(f) + sendICMP(conn.echoId, conn.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, + conn.rproto, -1, p.key, + 0, 0, 0, 0, 0, 0, + 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + + nodatarecv := true + if conn.fm.GetRecvBufferSize() > 0 { + rr := conn.fm.GetRecvReadLineBuffer() + conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) + n, _ := conn.tcpconn.Write(rr) + if n > 0 { + conn.fm.SkipRecvBuffer(n) + nodatarecv = false + } + } + + diffclose := now.Sub(startCloseTime) + if diffclose > time.Second*60 { + loggo.Info("close conn had timeout %s %s", conn.id, conn.tcpaddrTarget.String()) + break + } + + remoteclosed := conn.fm.IsRemoteClosed() + if remoteclosed && nodatarecv { + loggo.Info("remote conn had closed %s %s", conn.id, conn.tcpaddrTarget.String()) + break + } + + time.Sleep(time.Millisecond * 100) + } + + time.Sleep(time.Second) + + loggo.Info("close tcp conn %s %s", conn.id, conn.tcpaddrTarget.String()) + p.close(conn) +} + +func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) { + + defer common.CrashLog() + + p.workResultLock.Add(1) + defer p.workResultLock.Done() + + loggo.Info("server waiting target response %s -> %s %s", conn.ipaddrTarget.String(), conn.id, conn.conn.LocalAddr().String()) + + bytes := make([]byte, 2000) + + for !p.exit { + + conn.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) + n, _, err := conn.conn.ReadFromUDP(bytes) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + loggo.Info("ReadFromUDP Error read udp %s", err) + conn.close = true + return + } + } + + now := common.GetNowUpdateInSecond() + conn.activeSendTime = now + + sendICMP(conn.echoId, conn.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), bytes[:n], + conn.rproto, -1, p.key, + 0, 0, 0, 0, 0, 0, + 0) + + p.sendPacket++ + p.sendPacketSize += (uint64)(n) + } +} + +func (p *Server) close(conn *ServerConn) { + if p.getServerConnById(conn.id) != nil { + conn.exit = true + if conn.conn != nil { + conn.conn.Close() + } + if conn.tcpconn != nil { + conn.tcpconn.Close() + } + p.deleteServerConn(conn.id) + } +} + +func (p *Server) checkTimeoutConn() { + + tmp := make(map[string]*ServerConn) + p.localConnMap.Range(func(key, value interface{}) bool { + id := key.(string) + serverConn := value.(*ServerConn) + tmp[id] = serverConn + return true + }) + + now := common.GetNowUpdateInSecond() + for _, conn := range tmp { + if conn.tcpmode > 0 { + continue + } + diffrecv := now.Sub(conn.activeRecvTime) + diffsend := now.Sub(conn.activeSendTime) + if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) { + conn.close = true + } + } + + for id, conn := range tmp { + if conn.tcpmode > 0 { + continue + } + if conn.close { + loggo.Info("close inactive conn %s %s", id, conn.ipaddrTarget.String()) + p.close(conn) + } + } +} + +func (p *Server) showNet() { + p.localConnMapSize = 0 + p.localConnMap.Range(func(key, value interface{}) bool { + p.localConnMapSize++ + return true + }) + loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %dConnections", + p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localConnMapSize) + p.sendPacket = 0 + p.recvPacket = 0 + p.sendPacketSize = 0 + p.recvPacketSize = 0 +} + +func (p *Server) addServerConn(uuid string, serverConn *ServerConn) { + p.localConnMap.Store(uuid, serverConn) +} + +func (p *Server) getServerConnById(uuid string) *ServerConn { + ret, ok := p.localConnMap.Load(uuid) + if !ok { + return nil + } + return ret.(*ServerConn) +} + +func (p *Server) deleteServerConn(uuid string) { + p.localConnMap.Delete(uuid) +} + +func (p *Server) remoteError(echoId int, echoSeq int, uuid string, rprpto int, src *net.IPAddr) { + sendICMP(echoId, echoSeq, *p.conn, src, "", uuid, (uint32)(MyMsg_KICK), []byte{}, + rprpto, -1, p.key, + 0, 0, 0, 0, 0, 0, + 0) +} + +func (p *Server) addConnError(addr string) { + _, ok := p.connErrorMap.Load(addr) + if !ok { + now := common.GetNowUpdateInSecond() + p.connErrorMap.Store(addr, now) + } +} + +func (p *Server) isConnError(addr string) bool { + _, ok := p.connErrorMap.Load(addr) + return ok +} + +func (p *Server) updateConnError() { + + tmp := make(map[string]time.Time) + p.connErrorMap.Range(func(key, value interface{}) bool { + id := key.(string) + t := value.(time.Time) + tmp[id] = t + return true + }) + + now := common.GetNowUpdateInSecond() + for id, t := range tmp { + diff := now.Sub(t) + if diff > time.Second*5 { + p.connErrorMap.Delete(id) + } + } +}