diff --git a/client.go b/client.go deleted file mode 100644 index 73bb1d6..0000000 --- a/client.go +++ /dev/null @@ -1,721 +0,0 @@ -package pingtunnel - -import ( - "github.com/esrrhs/go-engine/src/common" - "github.com/esrrhs/go-engine/src/loggo" - "github.com/golang/protobuf/proto" - "golang.org/x/net/icmp" - "math" - "math/rand" - "net" - "sync" - "time" -) - -const ( - SEND_PROTO int = 8 - RECV_PROTO int = 0 -) - -func NewClient(addr string, server string, target string, timeout int, key int, - tcpmode int, tcpmode_buffersize int, tcpmode_maxwin int, tcpmode_resend_timems int, tcpmode_compress int, - tcpmode_stat int, open_sock5 int, maxconn int) (*Client, error) { - - var ipaddr *net.UDPAddr - var tcpaddr *net.TCPAddr - var err error - - if tcpmode > 0 { - tcpaddr, err = net.ResolveTCPAddr("tcp", addr) - if err != nil { - return nil, err - } - } else { - ipaddr, err = net.ResolveUDPAddr("udp", addr) - if err != nil { - return nil, err - } - } - - ipaddrServer, err := net.ResolveIPAddr("ip", server) - if err != nil { - return nil, err - } - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - return &Client{ - exit: false, - rtt: 0, - id: r.Intn(math.MaxInt16), - ipaddr: ipaddr, - tcpaddr: tcpaddr, - addr: addr, - ipaddrServer: ipaddrServer, - addrServer: server, - targetAddr: target, - timeout: timeout, - key: key, - tcpmode: tcpmode, - tcpmode_buffersize: tcpmode_buffersize, - tcpmode_maxwin: tcpmode_maxwin, - tcpmode_resend_timems: tcpmode_resend_timems, - tcpmode_compress: tcpmode_compress, - tcpmode_stat: tcpmode_stat, - open_sock5: open_sock5, - maxconn: maxconn, - }, nil -} - -type Client struct { - exit bool - rtt time.Duration - interval *time.Ticker - workResultLock sync.WaitGroup - maxconn int - - id int - sequence int - - timeout int - sproto int - rproto int - key int - tcpmode int - tcpmode_buffersize int - tcpmode_maxwin int - tcpmode_resend_timems int - tcpmode_compress int - tcpmode_stat int - open_sock5 int - - ipaddr *net.UDPAddr - tcpaddr *net.TCPAddr - addr string - - ipaddrServer *net.IPAddr - addrServer string - - targetAddr string - - conn *icmp.PacketConn - listenConn *net.UDPConn - tcplistenConn *net.TCPListener - - localAddrToConnMap sync.Map - localIdToConnMap sync.Map - - sendPacket uint64 - recvPacket uint64 - sendPacketSize uint64 - recvPacketSize uint64 - localAddrToConnMapSize int - localIdToConnMapSize int -} - -type ClientConn struct { - exit bool - ipaddr *net.UDPAddr - tcpaddr *net.TCPAddr - id string - activeRecvTime time.Time - activeSendTime time.Time - close bool - - fm *FrameMgr -} - -func (p *Client) Addr() string { - return p.addr -} - -func (p *Client) IPAddr() *net.UDPAddr { - return p.ipaddr -} - -func (p *Client) TargetAddr() string { - return p.targetAddr -} - -func (p *Client) ServerIPAddr() *net.IPAddr { - return p.ipaddrServer -} - -func (p *Client) ServerAddr() string { - return p.addrServer -} - -func (p *Client) RTT() time.Duration { - return p.rtt -} - -func (p *Client) RecvPacketSize() uint64 { - return p.recvPacketSize -} - -func (p *Client) SendPacketSize() uint64 { - return p.sendPacketSize -} - -func (p *Client) RecvPacket() uint64 { - return p.recvPacket -} - -func (p *Client) SendPacket() uint64 { - return p.sendPacket -} - -func (p *Client) LocalIdToConnMapSize() int { - return p.localIdToConnMapSize -} - -func (p *Client) LocalAddrToConnMapSize() int { - return p.localAddrToConnMapSize -} - -func (p *Client) Run() error { - - conn, err := icmp.ListenPacket("ip4:icmp", "") - if err != nil { - loggo.Error("Error listening for ICMP packets: %s", err.Error()) - return err - } - p.conn = conn - - if p.tcpmode > 0 { - tcplistenConn, err := net.ListenTCP("tcp", p.tcpaddr) - if err != nil { - loggo.Error("Error listening for tcp packets: %s", err.Error()) - return err - } - p.tcplistenConn = tcplistenConn - } else { - listener, err := net.ListenUDP("udp", p.ipaddr) - if err != nil { - loggo.Error("Error listening for udp packets: %s", err.Error()) - return err - } - p.listenConn = listener - } - - if p.tcpmode > 0 { - go p.AcceptTcp() - } else { - go p.Accept() - } - - recv := make(chan *Packet, 10000) - go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv) - - p.interval = time.NewTicker(time.Second) - - go func() { - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - for !p.exit { - select { - case <-p.interval.C: - p.checkTimeoutConn() - p.ping() - p.showNet() - case r := <-recv: - p.processPacket(r) - } - } - }() - - return nil -} - -func (p *Client) Stop() { - p.exit = true - p.workResultLock.Wait() - p.conn.Close() - if p.tcplistenConn != nil { - p.tcplistenConn.Close() - } - if p.listenConn != nil { - p.listenConn.Close() - } - p.interval.Stop() -} - -func (p *Client) AcceptTcp() error { - - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - loggo.Info("client waiting local accept tcp") - - for !p.exit { - p.tcplistenConn.SetDeadline(time.Now().Add(time.Millisecond * 1000)) - - conn, err := p.tcplistenConn.AcceptTCP() - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error accept tcp %s", err) - continue - } - } - - if conn != nil { - if p.open_sock5 > 0 { - go p.AcceptSock5Conn(conn) - } else { - go p.AcceptTcpConn(conn, p.targetAddr) - } - } - } - return nil -} - -func (p *Client) AcceptTcpConn(conn *net.TCPConn, targetAddr string) { - - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - tcpsrcaddr := conn.RemoteAddr().(*net.TCPAddr) - - if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn { - loggo.Info("too many connections %d, client accept new local tcp fail %s", p.localIdToConnMapSize, tcpsrcaddr.String()) - return - } - - uuid := UniqueId() - - fm := NewFrameMgr(p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat) - - now := time.Now() - clientConn := &ClientConn{exit: false, tcpaddr: tcpsrcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false, - fm: fm} - p.addClientConn(uuid, tcpsrcaddr.String(), clientConn) - loggo.Info("client accept new local tcp %s %s", uuid, tcpsrcaddr.String()) - - loggo.Info("start connect remote tcp %s %s", uuid, tcpsrcaddr.String()) - clientConn.fm.Connect() - startConnectTime := time.Now() - for !p.exit && !clientConn.exit { - if clientConn.fm.IsConnected() { - break - } - clientConn.fm.Update() - sendlist := clientConn.fm.getSendList() - for e := sendlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - mb, _ := proto.Marshal(f) - p.sequence++ - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, - SEND_PROTO, RECV_PROTO, p.key, - p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems, p.tcpmode_compress, p.tcpmode_stat, - p.timeout) - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - } - time.Sleep(time.Millisecond * 10) - now := time.Now() - diffclose := now.Sub(startConnectTime) - if diffclose > time.Second*(time.Duration(p.timeout)) { - loggo.Info("can not connect remote tcp %s %s", uuid, tcpsrcaddr.String()) - p.close(clientConn) - return - } - } - - if !clientConn.exit { - loggo.Info("connected remote tcp %s %s", uuid, tcpsrcaddr.String()) - } - - bytes := make([]byte, 10240) - - tcpActiveRecvTime := time.Now() - tcpActiveSendTime := time.Now() - - for !p.exit && !clientConn.exit { - now := time.Now() - sleep := true - - left := common.MinOfInt(clientConn.fm.GetSendBufferLeft(), len(bytes)) - if left > 0 { - conn.SetReadDeadline(time.Now().Add(time.Millisecond * 1)) - n, err := conn.Read(bytes[0:left]) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err) - clientConn.fm.Close() - break - } - } - if n > 0 { - sleep = false - clientConn.fm.WriteSendBuffer(bytes[:n]) - tcpActiveRecvTime = now - } - } - - clientConn.fm.Update() - - sendlist := clientConn.fm.getSendList() - if sendlist.Len() > 0 { - sleep = false - clientConn.activeSendTime = now - for e := sendlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - mb, err := proto.Marshal(f) - if err != nil { - loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err) - continue - } - p.sequence++ - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, - SEND_PROTO, RECV_PROTO, p.key, - p.tcpmode, 0, 0, 0, 0, 0, - 0) - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - } - } - - if clientConn.fm.GetRecvBufferSize() > 0 { - sleep = false - rr := clientConn.fm.GetRecvReadLineBuffer() - conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 1)) - n, err := conn.Write(rr) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err) - clientConn.fm.Close() - break - } - } - if n > 0 { - clientConn.fm.SkipRecvBuffer(n) - tcpActiveSendTime = now - } - } - - if sleep { - time.Sleep(time.Millisecond * 10) - } - - diffrecv := now.Sub(clientConn.activeRecvTime) - diffsend := now.Sub(clientConn.activeSendTime) - tcpdiffrecv := now.Sub(tcpActiveRecvTime) - tcpdiffsend := now.Sub(tcpActiveSendTime) - if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) || - tcpdiffrecv > time.Second*(time.Duration(p.timeout)) || tcpdiffsend > time.Second*(time.Duration(p.timeout)) { - loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String()) - clientConn.fm.Close() - break - } - - if clientConn.fm.IsRemoteClosed() { - loggo.Info("closed by remote conn %s %s", clientConn.id, clientConn.tcpaddr.String()) - clientConn.fm.Close() - break - } - } - - startCloseTime := time.Now() - for !p.exit && !clientConn.exit { - now := time.Now() - - clientConn.fm.Update() - - sendlist := clientConn.fm.getSendList() - for e := sendlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - mb, _ := proto.Marshal(f) - p.sequence++ - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, - SEND_PROTO, RECV_PROTO, p.key, - p.tcpmode, 0, 0, 0, 0, 0, - 0) - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - } - - nodatarecv := true - if clientConn.fm.GetRecvBufferSize() > 0 { - rr := clientConn.fm.GetRecvReadLineBuffer() - conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) - n, _ := conn.Write(rr) - if n > 0 { - clientConn.fm.SkipRecvBuffer(n) - nodatarecv = false - } - } - - diffclose := now.Sub(startCloseTime) - timeout := diffclose > time.Second*(time.Duration(p.timeout)) - remoteclosed := clientConn.fm.IsRemoteClosed() - - if timeout { - loggo.Info("close conn had timeout %s %s", clientConn.id, clientConn.tcpaddr.String()) - break - } - - if remoteclosed && nodatarecv { - loggo.Info("remote conn had closed %s %s", clientConn.id, clientConn.tcpaddr.String()) - break - } - - time.Sleep(time.Millisecond * 100) - } - - loggo.Info("close tcp conn %s %s", clientConn.id, clientConn.tcpaddr.String()) - conn.Close() - p.close(clientConn) -} - -func (p *Client) Accept() error { - - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - loggo.Info("client waiting local accept udp") - - bytes := make([]byte, 10240) - - for !p.exit { - p.listenConn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, srcaddr, err := p.listenConn.ReadFromUDP(bytes) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error read udp %s", err) - continue - } - } - if n <= 0 { - continue - } - - now := time.Now() - clientConn := p.getClientConnByAddr(srcaddr.String()) - if clientConn == nil { - if p.maxconn > 0 && p.localIdToConnMapSize >= p.maxconn { - loggo.Info("too many connections %d, client accept new local udp fail %s", p.localIdToConnMapSize, srcaddr.String()) - continue - } - uuid := UniqueId() - clientConn = &ClientConn{exit: false, ipaddr: srcaddr, id: uuid, activeRecvTime: now, activeSendTime: now, close: false} - p.addClientConn(uuid, srcaddr.String(), clientConn) - loggo.Info("client accept new local udp %s %s", uuid, srcaddr.String()) - } - - clientConn.activeSendTime = now - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), bytes[:n], - SEND_PROTO, RECV_PROTO, p.key, - p.tcpmode, 0, 0, 0, 0, 0, - p.timeout) - - p.sequence++ - - p.sendPacket++ - p.sendPacketSize += (uint64)(n) - } - return nil -} - -func (p *Client) processPacket(packet *Packet) { - - if packet.my.Rproto >= 0 { - return - } - - if packet.my.Key != (int32)(p.key) { - return - } - - if packet.echoId != p.id { - return - } - - if packet.my.Type == (int32)(MyMsg_PING) { - t := time.Time{} - t.UnmarshalBinary(packet.my.Data) - d := time.Now().Sub(t) - loggo.Info("pong from %s %s", packet.src.String(), d.String()) - p.rtt = d - return - } - - if packet.my.Type == (int32)(MyMsg_KICK) { - clientConn := p.getClientConnById(packet.my.Id) - if clientConn != nil { - p.close(clientConn) - loggo.Info("remote kick local %s", packet.my.Id) - } - return - } - - loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data)) - - clientConn := p.getClientConnById(packet.my.Id) - if clientConn == nil { - loggo.Debug("processPacket no conn %s ", packet.my.Id) - p.remoteError(packet.my.Id) - return - } - - now := time.Now() - clientConn.activeRecvTime = now - - if p.tcpmode > 0 { - f := &Frame{} - err := proto.Unmarshal(packet.my.Data, f) - if err != nil { - loggo.Error("Unmarshal tcp Error %s", err) - return - } - - clientConn.fm.OnRecvFrame(f) - } else { - addr := clientConn.ipaddr - _, err := p.listenConn.WriteToUDP(packet.my.Data, addr) - if err != nil { - loggo.Info("WriteToUDP Error read udp %s", err) - clientConn.close = true - return - } - } - - p.recvPacket++ - p.recvPacketSize += (uint64)(len(packet.my.Data)) -} - -func (p *Client) close(clientConn *ClientConn) { - clientConn.exit = true - p.deleteClientConn(clientConn.id, clientConn.ipaddr.String()) - p.deleteClientConn(clientConn.id, clientConn.tcpaddr.String()) -} - -func (p *Client) checkTimeoutConn() { - - if p.tcpmode > 0 { - return - } - - tmp := make(map[string]*ClientConn) - p.localIdToConnMap.Range(func(key, value interface{}) bool { - id := key.(string) - clientConn := value.(*ClientConn) - tmp[id] = clientConn - return true - }) - - now := time.Now() - for _, conn := range tmp { - diffrecv := now.Sub(conn.activeRecvTime) - diffsend := now.Sub(conn.activeSendTime) - if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) { - conn.close = true - } - } - - for id, conn := range tmp { - if conn.close { - loggo.Info("close inactive conn %s %s", id, conn.ipaddr.String()) - p.close(conn) - } - } -} - -func (p *Client) ping() { - now := time.Now() - b, _ := now.MarshalBinary() - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", "", (uint32)(MyMsg_PING), b, - SEND_PROTO, RECV_PROTO, p.key, - 0, 0, 0, 0, 0, 0, - 0) - loggo.Info("ping %s %s %d %d %d %d", p.addrServer, now.String(), p.sproto, p.rproto, p.id, p.sequence) - p.sequence++ -} - -func (p *Client) showNet() { - p.localAddrToConnMapSize = 0 - p.localIdToConnMap.Range(func(key, value interface{}) bool { - p.localAddrToConnMapSize++ - return true - }) - p.localIdToConnMapSize = 0 - p.localIdToConnMap.Range(func(key, value interface{}) bool { - p.localIdToConnMapSize++ - return true - }) - loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %d/%dConnections", - p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localAddrToConnMapSize, p.localIdToConnMapSize) - p.sendPacket = 0 - p.recvPacket = 0 - p.sendPacketSize = 0 - p.recvPacketSize = 0 -} - -func (p *Client) AcceptSock5Conn(conn *net.TCPConn) { - - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - var err error = nil - if err = sock5Handshake(conn); err != nil { - loggo.Error("socks handshake: %s", err) - conn.Close() - return - } - _, addr, err := sock5GetRequest(conn) - if err != nil { - loggo.Error("error getting request: %s", err) - conn.Close() - return - } - // Sending connection established message immediately to client. - // This some round trip time for creating socks connection with the client. - // But if connection failed, the client will get connection reset error. - _, err = conn.Write([]byte{0x05, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x08, 0x43}) - if err != nil { - loggo.Error("send connection confirmation:", err) - conn.Close() - return - } - - loggo.Info("accept new sock5 conn: %s", addr) - - p.AcceptTcpConn(conn, addr) -} - -func (p *Client) addClientConn(uuid string, addr string, clientConn *ClientConn) { - - p.localAddrToConnMap.Store(addr, clientConn) - p.localIdToConnMap.Store(uuid, clientConn) -} - -func (p *Client) getClientConnByAddr(addr string) *ClientConn { - ret, ok := p.localAddrToConnMap.Load(addr) - if !ok { - return nil - } - return ret.(*ClientConn) -} - -func (p *Client) getClientConnById(uuid string) *ClientConn { - ret, ok := p.localIdToConnMap.Load(uuid) - if !ok { - return nil - } - return ret.(*ClientConn) -} - -func (p *Client) deleteClientConn(uuid string, addr string) { - p.localIdToConnMap.Delete(uuid) - p.localAddrToConnMap.Delete(addr) -} - -func (p *Client) remoteError(uuid string) { - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, "", uuid, (uint32)(MyMsg_KICK), []byte{}, - SEND_PROTO, RECV_PROTO, p.key, - 0, 0, 0, 0, 0, 0, - 0) -} diff --git a/framemgr.go b/framemgr.go deleted file mode 100644 index b92f450..0000000 --- a/framemgr.go +++ /dev/null @@ -1,686 +0,0 @@ -package pingtunnel - -import ( - "bytes" - "compress/zlib" - "container/list" - "github.com/esrrhs/go-engine/src/common" - "github.com/esrrhs/go-engine/src/loggo" - "github.com/esrrhs/go-engine/src/rbuffergo" - "io" - "strconv" - "sync" - "time" -) - -type FrameStat struct { - sendDataNum int - recvDataNum int - sendReqNum int - recvReqNum int - sendAckNum int - recvAckNum int - sendDataNumsMap map[int32]int - recvDataNumsMap map[int32]int - sendReqNumsMap map[int32]int - recvReqNumsMap map[int32]int - sendAckNumsMap map[int32]int - recvAckNumsMap map[int32]int - sendping int - sendpong int - recvping int - recvpong int -} - -type FrameMgr struct { - sendb *rbuffergo.RBuffergo - recvb *rbuffergo.RBuffergo - - recvlock sync.Locker - windowsize int - resend_timems int - compress int - - sendwin *list.List - sendlist *list.List - sendid int - - recvwin *list.List - recvlist *list.List - recvid int - - close bool - remoteclosed bool - closesend bool - - lastPingTime int64 - rttns int64 - - reqmap map[int32]int64 - sendmap map[int32]int64 - - connected bool - - fs *FrameStat - openstat int - lastPrintStat int64 -} - -func NewFrameMgr(buffersize int, windowsize int, resend_timems int, compress int, openstat int) *FrameMgr { - - sendb := rbuffergo.New(buffersize, false) - recvb := rbuffergo.New(buffersize, false) - - fm := &FrameMgr{sendb: sendb, recvb: recvb, - recvlock: &sync.Mutex{}, - windowsize: windowsize, resend_timems: resend_timems, compress: compress, - sendwin: list.New(), sendlist: list.New(), sendid: 0, - recvwin: list.New(), recvlist: list.New(), recvid: 0, - close: false, remoteclosed: false, closesend: false, - lastPingTime: time.Now().UnixNano(), rttns: (int64)(resend_timems * 1000), - reqmap: make(map[int32]int64), sendmap: make(map[int32]int64), - connected: false, openstat: openstat, lastPrintStat: time.Now().UnixNano()} - if openstat > 0 { - fm.resetStat() - } - return fm -} - -func (fm *FrameMgr) GetSendBufferLeft() int { - left := fm.sendb.Capacity() - fm.sendb.Size() - return left -} - -func (fm *FrameMgr) WriteSendBuffer(data []byte) { - fm.sendb.Write(data) - loggo.Debug("WriteSendBuffer %d %d", fm.sendb.Size(), len(data)) -} - -func (fm *FrameMgr) Update() { - fm.cutSendBufferToWindow() - - fm.sendlist.Init() - - tmpreq, tmpack, tmpackto := fm.preProcessRecvList() - fm.processRecvList(tmpreq, tmpack, tmpackto) - - fm.combineWindowToRecvBuffer() - - fm.calSendList() - - fm.ping() - - fm.printStat() -} - -func (fm *FrameMgr) cutSendBufferToWindow() { - - sendall := false - - if fm.sendb.Size() < FRAME_MAX_SIZE { - sendall = true - } - - for fm.sendb.Size() >= FRAME_MAX_SIZE && fm.sendwin.Len() < fm.windowsize { - fd := &FrameData{Type: (int32)(FrameData_USER_DATA), - Data: make([]byte, FRAME_MAX_SIZE)} - fm.sendb.Read(fd.Data) - - if fm.compress > 0 && len(fd.Data) > fm.compress { - newb := fm.compressData(fd.Data) - if len(newb) < len(fd.Data) { - fd.Data = newb - fd.Compress = true - } - } - - f := &Frame{Type: (int32)(Frame_DATA), - Id: (int32)(fm.sendid), - Data: fd} - - fm.sendid++ - if fm.sendid >= FRAME_MAX_ID { - fm.sendid = 0 - } - - fm.sendwin.PushBack(f) - loggo.Debug("cut frame push to send win %d %d %d", f.Id, FRAME_MAX_SIZE, fm.sendwin.Len()) - } - - if sendall && fm.sendb.Size() > 0 && fm.sendwin.Len() < fm.windowsize { - fd := &FrameData{Type: (int32)(FrameData_USER_DATA), - Data: make([]byte, fm.sendb.Size())} - fm.sendb.Read(fd.Data) - - if fm.compress > 0 && len(fd.Data) > fm.compress { - newb := fm.compressData(fd.Data) - if len(newb) < len(fd.Data) { - fd.Data = newb - fd.Compress = true - } - } - - f := &Frame{Type: (int32)(Frame_DATA), - Id: (int32)(fm.sendid), - Data: fd} - - fm.sendid++ - if fm.sendid >= FRAME_MAX_ID { - fm.sendid = 0 - } - - fm.sendwin.PushBack(f) - loggo.Debug("cut small frame push to send win %d %d %d", f.Id, len(f.Data.Data), fm.sendwin.Len()) - } - - if fm.sendb.Empty() && fm.close && !fm.closesend && fm.sendwin.Len() < fm.windowsize { - fd := &FrameData{Type: (int32)(FrameData_CLOSE)} - - f := &Frame{Type: (int32)(Frame_DATA), - Id: (int32)(fm.sendid), - Data: fd} - - fm.sendid++ - if fm.sendid >= FRAME_MAX_ID { - fm.sendid = 0 - } - - fm.sendwin.PushBack(f) - fm.closesend = true - loggo.Debug("close frame push to send win %d %d", f.Id, fm.sendwin.Len()) - } -} - -func (fm *FrameMgr) calSendList() { - - cur := time.Now().UnixNano() - for e := fm.sendwin.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - if f.Resend || cur-f.Sendtime > int64(fm.resend_timems*(int)(time.Millisecond)) { - oldsend := fm.sendmap[f.Id] - if cur-oldsend > fm.rttns { - f.Sendtime = cur - fm.sendlist.PushBack(f) - f.Resend = false - fm.sendmap[f.Id] = cur - if fm.openstat > 0 { - fm.fs.sendDataNum++ - fm.fs.sendDataNumsMap[f.Id]++ - } - loggo.Debug("push frame to sendlist %d %d", f.Id, len(f.Data.Data)) - } - } - } -} - -func (fm *FrameMgr) getSendList() *list.List { - return fm.sendlist -} - -func (fm *FrameMgr) OnRecvFrame(f *Frame) { - fm.recvlock.Lock() - defer fm.recvlock.Unlock() - fm.recvlist.PushBack(f) -} - -func (fm *FrameMgr) preProcessRecvList() (map[int32]int, map[int32]int, map[int32]*Frame) { - fm.recvlock.Lock() - defer fm.recvlock.Unlock() - - tmpreq := make(map[int32]int) - tmpack := make(map[int32]int) - tmpackto := make(map[int32]*Frame) - for e := fm.recvlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - if f.Type == (int32)(Frame_REQ) { - for _, id := range f.Dataid { - tmpreq[id]++ - loggo.Debug("recv req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ",")) - } - } else if f.Type == (int32)(Frame_ACK) { - for _, id := range f.Dataid { - tmpack[id]++ - loggo.Debug("recv ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ",")) - } - } else if f.Type == (int32)(Frame_DATA) { - tmpackto[f.Id] = f - if fm.openstat > 0 { - fm.fs.recvDataNum++ - fm.fs.recvDataNumsMap[f.Id]++ - } - loggo.Debug("recv data %d %d", f.Id, len(f.Data.Data)) - } else if f.Type == (int32)(Frame_PING) { - fm.processPing(f) - } else if f.Type == (int32)(Frame_PONG) { - fm.processPong(f) - } else { - loggo.Error("error frame type %d", f.Type) - } - } - fm.recvlist.Init() - return tmpreq, tmpack, tmpackto -} - -func (fm *FrameMgr) processRecvList(tmpreq map[int32]int, tmpack map[int32]int, tmpackto map[int32]*Frame) { - - for id, num := range tmpreq { - for e := fm.sendwin.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - if f.Id == id { - f.Resend = true - loggo.Debug("choose resend win %d %d", f.Id, len(f.Data.Data)) - break - } - } - if fm.openstat > 0 { - fm.fs.recvReqNum += num - fm.fs.recvReqNumsMap[id] += num - } - } - - for id, num := range tmpack { - for e := fm.sendwin.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - if f.Id == id { - fm.sendwin.Remove(e) - delete(fm.sendmap, f.Id) - loggo.Debug("remove send win %d %d", f.Id, len(f.Data.Data)) - break - } - } - if fm.openstat > 0 { - fm.fs.recvAckNum += num - fm.fs.recvAckNumsMap[id] += num - } - } - - if len(tmpackto) > 0 { - tmp := make([]int32, len(tmpackto)) - index := 0 - for id, rf := range tmpackto { - if fm.addToRecvWin(rf) { - tmp[index] = id - index++ - if fm.openstat > 0 { - fm.fs.sendAckNum++ - fm.fs.sendAckNumsMap[id]++ - } - loggo.Debug("add data to win %d %d", rf.Id, len(rf.Data.Data)) - } - } - if index > 0 { - f := &Frame{Type: (int32)(Frame_ACK), Resend: false, Sendtime: 0, - Id: 0, - Dataid: tmp[0:index]} - fm.sendlist.PushBack(f) - loggo.Debug("send ack %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ",")) - } - } -} - -func (fm *FrameMgr) addToRecvWin(rf *Frame) bool { - - if !fm.isIdInRange((int)(rf.Id), FRAME_MAX_ID) { - loggo.Debug("recv frame not in range %d %d", rf.Id, fm.recvid) - if fm.isIdOld((int)(rf.Id), FRAME_MAX_ID) { - return true - } - return false - } - - for e := fm.recvwin.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - if f.Id == rf.Id { - loggo.Debug("recv frame ignore %d %d", f.Id, len(f.Data.Data)) - return true - } - } - - for e := fm.recvwin.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - loggo.Debug("start insert recv win %d %d %d", fm.recvid, rf.Id, f.Id) - if fm.compareId((int)(rf.Id), (int)(f.Id)) < 0 { - fm.recvwin.InsertBefore(rf, e) - loggo.Debug("insert recv win %d %d before %d", rf.Id, len(rf.Data.Data), f.Id) - return true - } - } - - fm.recvwin.PushBack(rf) - loggo.Debug("insert recv win last %d %d", rf.Id, len(rf.Data.Data)) - return true -} - -func (fm *FrameMgr) processRecvFrame(f *Frame) bool { - if f.Data.Type == (int32)(FrameData_USER_DATA) { - left := fm.recvb.Capacity() - fm.recvb.Size() - if left >= len(f.Data.Data) { - src := f.Data.Data - if f.Data.Compress { - err, old := fm.deCompressData(src) - if err != nil { - loggo.Error("recv frame deCompressData error %d", f.Id) - return false - } - if left < len(old) { - return false - } - loggo.Debug("deCompressData recv frame %d %d %d", - f.Id, len(src), len(old)) - src = old - } - - fm.recvb.Write(src) - loggo.Debug("combined recv frame to recv buffer %d %d", - f.Id, len(src)) - return true - } - return false - } else if f.Data.Type == (int32)(FrameData_CLOSE) { - fm.remoteclosed = true - loggo.Debug("recv remote close frame %d", f.Id) - return true - } else if f.Data.Type == (int32)(FrameData_CONN) { - fm.sendConnectRsp() - fm.connected = true - loggo.Debug("recv remote conn frame %d", f.Id) - return true - } else if f.Data.Type == (int32)(FrameData_CONNRSP) { - fm.connected = true - loggo.Debug("recv remote conn rsp frame %d", f.Id) - return true - } else { - loggo.Error("recv frame type error %d", f.Data.Type) - return false - } -} - -func (fm *FrameMgr) combineWindowToRecvBuffer() { - - for { - done := false - for e := fm.recvwin.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - if f.Id == (int32)(fm.recvid) { - delete(fm.reqmap, f.Id) - if fm.processRecvFrame(f) { - fm.recvwin.Remove(e) - done = true - loggo.Debug("process recv frame ok %d %d", - f.Id, len(f.Data.Data)) - break - } - } - } - if !done { - break - } else { - fm.recvid++ - if fm.recvid >= FRAME_MAX_ID { - fm.recvid = 0 - } - loggo.Debug("combined ok add recvid %d ", fm.recvid) - } - } - - cur := time.Now().UnixNano() - reqtmp := make(map[int]int) - e := fm.recvwin.Front() - id := fm.recvid - for len(reqtmp) < fm.windowsize && len(reqtmp)*4 < FRAME_MAX_SIZE/2 && e != nil { - f := e.Value.(*Frame) - loggo.Debug("start add req id %d %d %d", fm.recvid, f.Id, id) - if f.Id != (int32)(id) { - oldReq := fm.reqmap[f.Id] - if cur-oldReq > fm.rttns { - reqtmp[id]++ - fm.reqmap[f.Id] = cur - loggo.Debug("add req id %d ", id) - } - } else { - e = e.Next() - } - - id++ - if id >= FRAME_MAX_ID { - id = 0 - } - } - - if len(reqtmp) > 0 { - f := &Frame{Type: (int32)(Frame_REQ), Resend: false, Sendtime: 0, - Id: 0, - Dataid: make([]int32, len(reqtmp))} - index := 0 - for id, _ := range reqtmp { - f.Dataid[index] = (int32)(id) - index++ - if fm.openstat > 0 { - fm.fs.sendReqNum++ - fm.fs.sendReqNumsMap[(int32)(id)]++ - } - } - fm.sendlist.PushBack(f) - loggo.Debug("send req %d %s", f.Id, common.Int32ArrayToString(f.Dataid, ",")) - } -} - -func (fm *FrameMgr) GetRecvBufferSize() int { - return fm.recvb.Size() -} - -func (fm *FrameMgr) GetRecvReadLineBuffer() []byte { - ret := fm.recvb.GetReadLineBuffer() - loggo.Debug("GetRecvReadLineBuffer %d %d", fm.recvb.Size(), len(ret)) - return ret -} - -func (fm *FrameMgr) SkipRecvBuffer(size int) { - fm.recvb.SkipRead(size) - loggo.Debug("SkipRead %d %d", fm.recvb.Size(), size) -} - -func (fm *FrameMgr) Close() { - fm.recvlock.Lock() - defer fm.recvlock.Unlock() - - fm.close = true -} - -func (fm *FrameMgr) IsRemoteClosed() bool { - return fm.remoteclosed -} - -func (fm *FrameMgr) ping() { - cur := time.Now().UnixNano() - if cur-fm.lastPingTime > (int64)(time.Second) { - fm.lastPingTime = cur - f := &Frame{Type: (int32)(Frame_PING), Resend: false, Sendtime: cur, - Id: 0} - fm.sendlist.PushBack(f) - loggo.Debug("send ping %d", cur) - if fm.openstat > 0 { - fm.fs.sendping++ - } - } -} - -func (fm *FrameMgr) processPing(f *Frame) { - rf := &Frame{Type: (int32)(Frame_PONG), Resend: false, Sendtime: f.Sendtime, - Id: 0} - fm.sendlist.PushBack(rf) - if fm.openstat > 0 { - fm.fs.recvping++ - fm.fs.sendpong++ - } - loggo.Debug("recv ping %d", f.Sendtime) -} - -func (fm *FrameMgr) processPong(f *Frame) { - cur := time.Now().UnixNano() - if cur > f.Sendtime { - rtt := cur - f.Sendtime - fm.rttns = (fm.rttns + rtt) / 2 - if fm.openstat > 0 { - fm.fs.recvpong++ - } - loggo.Debug("recv pong %d %dms", rtt, fm.rttns/1000/1000) - } -} - -func (fm *FrameMgr) isIdInRange(id int, maxid int) bool { - begin := fm.recvid - end := fm.recvid + fm.windowsize - if end >= maxid { - if id >= 0 && id < end-maxid { - return true - } - end = maxid - } - if id >= begin && id < end { - return true - } - return false -} - -func (fm *FrameMgr) compareId(l int, r int) int { - - if l < fm.recvid { - l += FRAME_MAX_ID - } - if r < fm.recvid { - r += FRAME_MAX_ID - } - - return l - r -} - -func (fm *FrameMgr) isIdOld(id int, maxid int) bool { - if id > fm.recvid { - return false - } - - end := fm.recvid + fm.windowsize*2 - if end >= maxid { - if id >= end-maxid && id < fm.recvid { - return true - } - } else { - if id < fm.recvid { - return true - } - } - - return false -} - -func (fm *FrameMgr) IsConnected() bool { - return fm.connected -} - -func (fm *FrameMgr) Connect() { - fd := &FrameData{Type: (int32)(FrameData_CONN)} - - f := &Frame{Type: (int32)(Frame_DATA), - Id: (int32)(fm.sendid), - Data: fd} - - fm.sendid++ - if fm.sendid >= FRAME_MAX_ID { - fm.sendid = 0 - } - - fm.sendwin.PushBack(f) - loggo.Debug("start connect") -} - -func (fm *FrameMgr) sendConnectRsp() { - fd := &FrameData{Type: (int32)(FrameData_CONNRSP)} - - f := &Frame{Type: (int32)(Frame_DATA), - Id: (int32)(fm.sendid), - Data: fd} - - fm.sendid++ - if fm.sendid >= FRAME_MAX_ID { - fm.sendid = 0 - } - - fm.sendwin.PushBack(f) - loggo.Debug("send connect rsp") -} - -func (fm *FrameMgr) compressData(src []byte) []byte { - var b bytes.Buffer - w := zlib.NewWriter(&b) - w.Write(src) - w.Close() - return b.Bytes() -} - -func (fm *FrameMgr) deCompressData(src []byte) (error, []byte) { - b := bytes.NewReader(src) - r, err := zlib.NewReader(b) - if err != nil { - return err, nil - } - var out bytes.Buffer - io.Copy(&out, r) - r.Close() - return nil, out.Bytes() -} - -func (fm *FrameMgr) resetStat() { - fm.fs = &FrameStat{} - fm.fs.sendDataNumsMap = make(map[int32]int) - fm.fs.recvDataNumsMap = make(map[int32]int) - fm.fs.sendReqNumsMap = make(map[int32]int) - fm.fs.recvReqNumsMap = make(map[int32]int) - fm.fs.sendAckNumsMap = make(map[int32]int) - fm.fs.recvAckNumsMap = make(map[int32]int) -} - -func (fm *FrameMgr) printStat() { - if fm.openstat > 0 { - cur := time.Now().UnixNano() - if cur-fm.lastPrintStat > (int64)(time.Second) { - fm.lastPrintStat = cur - fs := fm.fs - loggo.Info("\nsendDataNum %d\nrecvDataNum %d\nsendReqNum %d\nrecvReqNum %d\nsendAckNum %d\nrecvAckNum %d\n"+ - "sendDataNumsMap %s\nrecvDataNumsMap %s\nsendReqNumsMap %s\nrecvReqNumsMap %s\nsendAckNumsMap %s\nrecvAckNumsMap %s\n"+ - "sendping %d\nrecvping %d\nsendpong %d\nrecvpong %d\n"+ - "sendwin %d\nrecvwin %d\n", - fs.sendDataNum, fs.recvDataNum, - fs.sendReqNum, fs.recvReqNum, - fs.sendAckNum, fs.recvAckNum, - fm.printStatMap(&fs.sendDataNumsMap), fm.printStatMap(&fs.recvDataNumsMap), - fm.printStatMap(&fs.sendReqNumsMap), fm.printStatMap(&fs.recvReqNumsMap), - fm.printStatMap(&fs.sendAckNumsMap), fm.printStatMap(&fs.recvAckNumsMap), - fs.sendping, fs.recvping, - fs.sendpong, fs.recvpong, - fm.sendwin.Len(), fm.recvwin.Len()) - fm.resetStat() - } - } -} - -func (fm *FrameMgr) printStatMap(m *map[int32]int) string { - tmp := make(map[int]int) - for _, v := range *m { - tmp[v]++ - } - max := 0 - for k, _ := range tmp { - if k > max { - max = k - } - } - var ret string - for i := 1; i <= max; i++ { - ret += strconv.Itoa(i) + "->" + strconv.Itoa(tmp[i]) + "," - } - if len(ret) <= 0 { - ret = "none" - } - return ret -} diff --git a/gen.bat b/gen.bat deleted file mode 100644 index 0c05672..0000000 --- a/gen.bat +++ /dev/null @@ -1 +0,0 @@ -protoc --go_out=. *.proto \ No newline at end of file diff --git a/cmd/main.go b/main.go similarity index 98% rename from cmd/main.go rename to main.go index 5108ab8..83e7554 100644 --- a/cmd/main.go +++ b/main.go @@ -1,10 +1,10 @@ -package main +package pingtunnel import ( "flag" "fmt" "github.com/esrrhs/go-engine/src/loggo" - "github.com/esrrhs/pingtunnel" + "github.com/esrrhs/go-engine/src/pingtunnel" "strconv" "time" ) diff --git a/msg.pb.go b/msg.pb.go deleted file mode 100644 index 2884b3f..0000000 --- a/msg.pb.go +++ /dev/null @@ -1,441 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: msg.proto - -package pingtunnel - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type MyMsg_TYPE int32 - -const ( - MyMsg_DATA MyMsg_TYPE = 0 - MyMsg_PING MyMsg_TYPE = 1 - MyMsg_KICK MyMsg_TYPE = 2 - MyMsg_MAGIC MyMsg_TYPE = 57005 -) - -var MyMsg_TYPE_name = map[int32]string{ - 0: "DATA", - 1: "PING", - 2: "KICK", - 57005: "MAGIC", -} - -var MyMsg_TYPE_value = map[string]int32{ - "DATA": 0, - "PING": 1, - "KICK": 2, - "MAGIC": 57005, -} - -func (x MyMsg_TYPE) String() string { - return proto.EnumName(MyMsg_TYPE_name, int32(x)) -} - -func (MyMsg_TYPE) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_c06e4cca6c2cc899, []int{0, 0} -} - -type FrameData_TYPE int32 - -const ( - FrameData_USER_DATA FrameData_TYPE = 0 - FrameData_CONN FrameData_TYPE = 1 - FrameData_CONNRSP FrameData_TYPE = 2 - FrameData_CLOSE FrameData_TYPE = 3 -) - -var FrameData_TYPE_name = map[int32]string{ - 0: "USER_DATA", - 1: "CONN", - 2: "CONNRSP", - 3: "CLOSE", -} - -var FrameData_TYPE_value = map[string]int32{ - "USER_DATA": 0, - "CONN": 1, - "CONNRSP": 2, - "CLOSE": 3, -} - -func (x FrameData_TYPE) String() string { - return proto.EnumName(FrameData_TYPE_name, int32(x)) -} - -func (FrameData_TYPE) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_c06e4cca6c2cc899, []int{1, 0} -} - -type Frame_TYPE int32 - -const ( - Frame_DATA Frame_TYPE = 0 - Frame_REQ Frame_TYPE = 1 - Frame_ACK Frame_TYPE = 2 - Frame_PING Frame_TYPE = 3 - Frame_PONG Frame_TYPE = 4 -) - -var Frame_TYPE_name = map[int32]string{ - 0: "DATA", - 1: "REQ", - 2: "ACK", - 3: "PING", - 4: "PONG", -} - -var Frame_TYPE_value = map[string]int32{ - "DATA": 0, - "REQ": 1, - "ACK": 2, - "PING": 3, - "PONG": 4, -} - -func (x Frame_TYPE) String() string { - return proto.EnumName(Frame_TYPE_name, int32(x)) -} - -func (Frame_TYPE) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_c06e4cca6c2cc899, []int{2, 0} -} - -type MyMsg struct { - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Type int32 `protobuf:"varint,2,opt,name=type,proto3" json:"type,omitempty"` - Target string `protobuf:"bytes,3,opt,name=target,proto3" json:"target,omitempty"` - Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` - Rproto int32 `protobuf:"zigzag32,5,opt,name=rproto,proto3" json:"rproto,omitempty"` - Magic int32 `protobuf:"zigzag32,6,opt,name=magic,proto3" json:"magic,omitempty"` - Key int32 `protobuf:"zigzag32,7,opt,name=key,proto3" json:"key,omitempty"` - Timeout int32 `protobuf:"varint,8,opt,name=timeout,proto3" json:"timeout,omitempty"` - Tcpmode int32 `protobuf:"varint,9,opt,name=tcpmode,proto3" json:"tcpmode,omitempty"` - TcpmodeBuffersize int32 `protobuf:"varint,10,opt,name=tcpmode_buffersize,json=tcpmodeBuffersize,proto3" json:"tcpmode_buffersize,omitempty"` - TcpmodeMaxwin int32 `protobuf:"varint,11,opt,name=tcpmode_maxwin,json=tcpmodeMaxwin,proto3" json:"tcpmode_maxwin,omitempty"` - TcpmodeResendTimems int32 `protobuf:"varint,12,opt,name=tcpmode_resend_timems,json=tcpmodeResendTimems,proto3" json:"tcpmode_resend_timems,omitempty"` - TcpmodeCompress int32 `protobuf:"varint,13,opt,name=tcpmode_compress,json=tcpmodeCompress,proto3" json:"tcpmode_compress,omitempty"` - TcpmodeStat int32 `protobuf:"varint,14,opt,name=tcpmode_stat,json=tcpmodeStat,proto3" json:"tcpmode_stat,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *MyMsg) Reset() { *m = MyMsg{} } -func (m *MyMsg) String() string { return proto.CompactTextString(m) } -func (*MyMsg) ProtoMessage() {} -func (*MyMsg) Descriptor() ([]byte, []int) { - return fileDescriptor_c06e4cca6c2cc899, []int{0} -} - -func (m *MyMsg) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_MyMsg.Unmarshal(m, b) -} -func (m *MyMsg) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_MyMsg.Marshal(b, m, deterministic) -} -func (m *MyMsg) XXX_Merge(src proto.Message) { - xxx_messageInfo_MyMsg.Merge(m, src) -} -func (m *MyMsg) XXX_Size() int { - return xxx_messageInfo_MyMsg.Size(m) -} -func (m *MyMsg) XXX_DiscardUnknown() { - xxx_messageInfo_MyMsg.DiscardUnknown(m) -} - -var xxx_messageInfo_MyMsg proto.InternalMessageInfo - -func (m *MyMsg) GetId() string { - if m != nil { - return m.Id - } - return "" -} - -func (m *MyMsg) GetType() int32 { - if m != nil { - return m.Type - } - return 0 -} - -func (m *MyMsg) GetTarget() string { - if m != nil { - return m.Target - } - return "" -} - -func (m *MyMsg) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func (m *MyMsg) GetRproto() int32 { - if m != nil { - return m.Rproto - } - return 0 -} - -func (m *MyMsg) GetMagic() int32 { - if m != nil { - return m.Magic - } - return 0 -} - -func (m *MyMsg) GetKey() int32 { - if m != nil { - return m.Key - } - return 0 -} - -func (m *MyMsg) GetTimeout() int32 { - if m != nil { - return m.Timeout - } - return 0 -} - -func (m *MyMsg) GetTcpmode() int32 { - if m != nil { - return m.Tcpmode - } - return 0 -} - -func (m *MyMsg) GetTcpmodeBuffersize() int32 { - if m != nil { - return m.TcpmodeBuffersize - } - return 0 -} - -func (m *MyMsg) GetTcpmodeMaxwin() int32 { - if m != nil { - return m.TcpmodeMaxwin - } - return 0 -} - -func (m *MyMsg) GetTcpmodeResendTimems() int32 { - if m != nil { - return m.TcpmodeResendTimems - } - return 0 -} - -func (m *MyMsg) GetTcpmodeCompress() int32 { - if m != nil { - return m.TcpmodeCompress - } - return 0 -} - -func (m *MyMsg) GetTcpmodeStat() int32 { - if m != nil { - return m.TcpmodeStat - } - return 0 -} - -type FrameData struct { - Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` - Compress bool `protobuf:"varint,3,opt,name=compress,proto3" json:"compress,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *FrameData) Reset() { *m = FrameData{} } -func (m *FrameData) String() string { return proto.CompactTextString(m) } -func (*FrameData) ProtoMessage() {} -func (*FrameData) Descriptor() ([]byte, []int) { - return fileDescriptor_c06e4cca6c2cc899, []int{1} -} - -func (m *FrameData) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_FrameData.Unmarshal(m, b) -} -func (m *FrameData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_FrameData.Marshal(b, m, deterministic) -} -func (m *FrameData) XXX_Merge(src proto.Message) { - xxx_messageInfo_FrameData.Merge(m, src) -} -func (m *FrameData) XXX_Size() int { - return xxx_messageInfo_FrameData.Size(m) -} -func (m *FrameData) XXX_DiscardUnknown() { - xxx_messageInfo_FrameData.DiscardUnknown(m) -} - -var xxx_messageInfo_FrameData proto.InternalMessageInfo - -func (m *FrameData) GetType() int32 { - if m != nil { - return m.Type - } - return 0 -} - -func (m *FrameData) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func (m *FrameData) GetCompress() bool { - if m != nil { - return m.Compress - } - return false -} - -type Frame struct { - Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"` - Resend bool `protobuf:"varint,2,opt,name=resend,proto3" json:"resend,omitempty"` - Sendtime int64 `protobuf:"varint,3,opt,name=sendtime,proto3" json:"sendtime,omitempty"` - Id int32 `protobuf:"varint,4,opt,name=id,proto3" json:"id,omitempty"` - Data *FrameData `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` - Dataid []int32 `protobuf:"varint,6,rep,packed,name=dataid,proto3" json:"dataid,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Frame) Reset() { *m = Frame{} } -func (m *Frame) String() string { return proto.CompactTextString(m) } -func (*Frame) ProtoMessage() {} -func (*Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_c06e4cca6c2cc899, []int{2} -} - -func (m *Frame) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Frame.Unmarshal(m, b) -} -func (m *Frame) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Frame.Marshal(b, m, deterministic) -} -func (m *Frame) XXX_Merge(src proto.Message) { - xxx_messageInfo_Frame.Merge(m, src) -} -func (m *Frame) XXX_Size() int { - return xxx_messageInfo_Frame.Size(m) -} -func (m *Frame) XXX_DiscardUnknown() { - xxx_messageInfo_Frame.DiscardUnknown(m) -} - -var xxx_messageInfo_Frame proto.InternalMessageInfo - -func (m *Frame) GetType() int32 { - if m != nil { - return m.Type - } - return 0 -} - -func (m *Frame) GetResend() bool { - if m != nil { - return m.Resend - } - return false -} - -func (m *Frame) GetSendtime() int64 { - if m != nil { - return m.Sendtime - } - return 0 -} - -func (m *Frame) GetId() int32 { - if m != nil { - return m.Id - } - return 0 -} - -func (m *Frame) GetData() *FrameData { - if m != nil { - return m.Data - } - return nil -} - -func (m *Frame) GetDataid() []int32 { - if m != nil { - return m.Dataid - } - return nil -} - -func init() { - proto.RegisterEnum("MyMsg_TYPE", MyMsg_TYPE_name, MyMsg_TYPE_value) - proto.RegisterEnum("FrameData_TYPE", FrameData_TYPE_name, FrameData_TYPE_value) - proto.RegisterEnum("Frame_TYPE", Frame_TYPE_name, Frame_TYPE_value) - proto.RegisterType((*MyMsg)(nil), "MyMsg") - proto.RegisterType((*FrameData)(nil), "FrameData") - proto.RegisterType((*Frame)(nil), "Frame") -} - -func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) } - -var fileDescriptor_c06e4cca6c2cc899 = []byte{ - // 499 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x52, 0xcb, 0x6e, 0xd3, 0x4c, - 0x14, 0xfe, 0xc7, 0x97, 0x24, 0x3e, 0xb9, 0xfc, 0xd3, 0xe1, 0xa2, 0x11, 0x0b, 0x14, 0x2c, 0x21, - 0x99, 0x05, 0x95, 0x28, 0x12, 0xac, 0x53, 0x37, 0x44, 0x51, 0xc9, 0x85, 0x49, 0x58, 0xc0, 0x26, - 0x72, 0xe3, 0xa9, 0x65, 0x81, 0x2f, 0xb2, 0x27, 0x82, 0xf0, 0x04, 0xbc, 0x0c, 0x8f, 0xc0, 0x33, - 0xb0, 0xe3, 0x79, 0xd0, 0x9c, 0x8e, 0xdd, 0x4a, 0xb0, 0xf2, 0x77, 0x4b, 0xfc, 0xf9, 0x9c, 0x03, - 0x5e, 0x56, 0x27, 0xa7, 0x65, 0x55, 0xa8, 0xc2, 0xff, 0x6d, 0x83, 0xbb, 0x38, 0x2e, 0xea, 0x84, - 0x8d, 0xc0, 0x4a, 0x63, 0x4e, 0xc6, 0x24, 0xf0, 0x84, 0x95, 0xc6, 0x8c, 0x81, 0xa3, 0x8e, 0xa5, - 0xe4, 0xd6, 0x98, 0x04, 0xae, 0x40, 0xcc, 0x1e, 0x42, 0x47, 0x45, 0x55, 0x22, 0x15, 0xb7, 0x31, - 0x67, 0x98, 0xce, 0xc6, 0x91, 0x8a, 0xb8, 0x33, 0x26, 0xc1, 0x40, 0x20, 0xd6, 0xd9, 0x0a, 0xdf, - 0xc1, 0xdd, 0x31, 0x09, 0x4e, 0x84, 0x61, 0xec, 0x3e, 0xb8, 0x59, 0x94, 0xa4, 0x7b, 0xde, 0x41, - 0xf9, 0x86, 0x30, 0x0a, 0xf6, 0x27, 0x79, 0xe4, 0x5d, 0xd4, 0x34, 0x64, 0x1c, 0xba, 0x2a, 0xcd, - 0x64, 0x71, 0x50, 0xbc, 0x87, 0x15, 0x1a, 0x8a, 0xce, 0xbe, 0xcc, 0x8a, 0x58, 0x72, 0xcf, 0x38, - 0x37, 0x94, 0x3d, 0x07, 0x66, 0xe0, 0xee, 0xea, 0x70, 0x7d, 0x2d, 0xab, 0x3a, 0xfd, 0x26, 0x39, - 0x60, 0xe8, 0xc4, 0x38, 0xe7, 0xad, 0xc1, 0x9e, 0xc2, 0xa8, 0x89, 0x67, 0xd1, 0xd7, 0x2f, 0x69, - 0xce, 0xfb, 0x18, 0x1d, 0x1a, 0x75, 0x81, 0x22, 0x3b, 0x83, 0x07, 0x4d, 0xac, 0x92, 0xb5, 0xcc, - 0xe3, 0x9d, 0x6e, 0x92, 0xd5, 0x7c, 0x80, 0xe9, 0x7b, 0xc6, 0x14, 0xe8, 0x6d, 0xd1, 0x62, 0xcf, - 0x80, 0x36, 0xbf, 0xd9, 0x17, 0x59, 0x59, 0xc9, 0xba, 0xe6, 0x43, 0x8c, 0xff, 0x6f, 0xf4, 0xd0, - 0xc8, 0xec, 0x09, 0x0c, 0x9a, 0x68, 0xad, 0x22, 0xc5, 0x47, 0x18, 0xeb, 0x1b, 0x6d, 0xa3, 0x22, - 0xe5, 0xbf, 0x00, 0x67, 0xfb, 0x61, 0x3d, 0x65, 0x3d, 0x70, 0x2e, 0x26, 0xdb, 0x09, 0xfd, 0x4f, - 0xa3, 0xf5, 0x7c, 0x39, 0xa3, 0x44, 0xa3, 0xcb, 0x79, 0x78, 0x49, 0x2d, 0xd6, 0x07, 0x77, 0x31, - 0x99, 0xcd, 0x43, 0xfa, 0xe3, 0xa7, 0xed, 0x7f, 0x27, 0xe0, 0xbd, 0xa9, 0xa2, 0x4c, 0x5e, 0xe8, - 0x65, 0x34, 0xcb, 0x24, 0x77, 0x96, 0xd9, 0x2c, 0xcd, 0xba, 0xb3, 0xb4, 0x47, 0xd0, 0x6b, 0xeb, - 0xea, 0x15, 0xf7, 0x44, 0xcb, 0xfd, 0xd7, 0xa6, 0xc4, 0x10, 0xbc, 0xf7, 0x9b, 0xa9, 0xd8, 0xdd, - 0x36, 0x09, 0x57, 0xcb, 0x25, 0x25, 0xac, 0x0f, 0x5d, 0x8d, 0xc4, 0x66, 0x4d, 0x2d, 0xe6, 0x81, - 0x1b, 0xbe, 0x5d, 0x6d, 0xa6, 0xd4, 0xf6, 0x7f, 0x11, 0x70, 0xb1, 0xca, 0x3f, 0x6b, 0xe8, 0x3b, - 0xc1, 0xc9, 0x61, 0x91, 0x9e, 0x30, 0x4c, 0x57, 0xd1, 0x4f, 0x3d, 0x6a, 0xac, 0x62, 0x8b, 0x96, - 0x9b, 0x5b, 0x75, 0xf0, 0x5f, 0xf4, 0xad, 0x3e, 0x36, 0x9f, 0xa2, 0x2f, 0xad, 0x7f, 0x06, 0xa7, - 0xed, 0x87, 0xdf, 0xde, 0xa2, 0x7e, 0xa6, 0x31, 0xef, 0x8c, 0xed, 0xc0, 0x15, 0x86, 0xf9, 0xaf, - 0xfe, 0x9a, 0x6b, 0x17, 0x6c, 0x31, 0x7d, 0x47, 0x89, 0x06, 0x13, 0x9c, 0x6a, 0x33, 0x69, 0x1b, - 0xd1, 0x6a, 0x39, 0xa3, 0xce, 0xf9, 0xe0, 0x23, 0x94, 0x69, 0x9e, 0xa8, 0x43, 0x9e, 0xcb, 0xcf, - 0x57, 0x1d, 0x3c, 0xec, 0x97, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x15, 0x89, 0x25, 0x68, 0x57, - 0x03, 0x00, 0x00, -} diff --git a/msg.proto b/msg.proto deleted file mode 100644 index 8571aff..0000000 --- a/msg.proto +++ /dev/null @@ -1,55 +0,0 @@ -syntax = "proto3"; -option go_package = "pingtunnel"; - -message MyMsg { - enum TYPE { - DATA = 0; - PING = 1; - KICK = 2; - MAGIC = 0xdead; - } - - string id = 1; - int32 type = 2; - string target = 3; - bytes data = 4; - sint32 rproto = 5; - sint32 magic = 6; - sint32 key = 7; - int32 timeout = 8; - int32 tcpmode = 9; - int32 tcpmode_buffersize = 10; - int32 tcpmode_maxwin = 11; - int32 tcpmode_resend_timems = 12; - int32 tcpmode_compress = 13; - int32 tcpmode_stat = 14; -} - -message FrameData { - enum TYPE { - USER_DATA = 0; - CONN = 1; - CONNRSP = 2; - CLOSE = 3; - } - int32 type = 1; - bytes data = 2; - bool compress = 3; -} - -message Frame { - enum TYPE { - DATA = 0; - REQ = 1; - ACK = 2; - PING = 3; - PONG = 4; - } - - int32 type = 1; - bool resend = 2; - int64 sendtime = 3; - int32 id = 4; - FrameData data = 5; - repeated int32 dataid = 6; -} diff --git a/pingtunnel.go b/pingtunnel.go deleted file mode 100644 index 4681002..0000000 --- a/pingtunnel.go +++ /dev/null @@ -1,149 +0,0 @@ -package pingtunnel - -import ( - "crypto/md5" - "crypto/rand" - "encoding/base64" - "encoding/binary" - "encoding/hex" - "github.com/esrrhs/go-engine/src/loggo" - "github.com/golang/protobuf/proto" - "golang.org/x/net/icmp" - "golang.org/x/net/ipv4" - "io" - "net" - "sync" - "syscall" - "time" -) - -func sendICMP(id int, sequence int, conn icmp.PacketConn, server *net.IPAddr, target string, - connId string, msgType uint32, data []byte, sproto int, rproto int, key int, - tcpmode int, tcpmode_buffer_size int, tcpmode_maxwin int, tcpmode_resend_time int, tcpmode_compress int, tcpmode_stat int, - timeout int) { - - m := &MyMsg{ - Id: connId, - Type: (int32)(msgType), - Target: target, - Data: data, - Rproto: (int32)(rproto), - Key: (int32)(key), - Tcpmode: (int32)(tcpmode), - TcpmodeBuffersize: (int32)(tcpmode_buffer_size), - TcpmodeMaxwin: (int32)(tcpmode_maxwin), - TcpmodeResendTimems: (int32)(tcpmode_resend_time), - TcpmodeCompress: (int32)(tcpmode_compress), - TcpmodeStat: (int32)(tcpmode_stat), - Timeout: (int32)(timeout), - Magic: (int32)(MyMsg_MAGIC), - } - - mb, err := proto.Marshal(m) - if err != nil { - loggo.Error("sendICMP Marshal MyMsg error %s %s", server.String(), err) - return - } - - body := &icmp.Echo{ - ID: id, - Seq: sequence, - Data: mb, - } - - msg := &icmp.Message{ - Type: (ipv4.ICMPType)(sproto), - Code: 0, - Body: body, - } - - bytes, err := msg.Marshal(nil) - if err != nil { - loggo.Error("sendICMP Marshal error %s %s", server.String(), err) - return - } - - for { - if _, err := conn.WriteTo(bytes, server); err != nil { - if neterr, ok := err.(*net.OpError); ok { - if neterr.Err == syscall.ENOBUFS { - continue - } - } - loggo.Info("sendICMP WriteTo error %s %s", server.String(), err) - } - break - } - - return -} - -func recvICMP(workResultLock *sync.WaitGroup, exit *bool, conn icmp.PacketConn, recv chan<- *Packet) { - - (*workResultLock).Add(1) - defer (*workResultLock).Done() - - bytes := make([]byte, 10240) - for !*exit { - conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, srcaddr, err := conn.ReadFrom(bytes) - - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error read icmp message %s", err) - continue - } - } - - if n <= 0 { - continue - } - - echoId := int(binary.BigEndian.Uint16(bytes[4:6])) - echoSeq := int(binary.BigEndian.Uint16(bytes[6:8])) - - my := &MyMsg{} - err = proto.Unmarshal(bytes[8:n], my) - if err != nil { - loggo.Debug("Unmarshal MyMsg error: %s", err) - continue - } - - if my.Magic != (int32)(MyMsg_MAGIC) { - loggo.Debug("processPacket data invalid %s", my.Id) - continue - } - - recv <- &Packet{my: my, - src: srcaddr.(*net.IPAddr), - echoId: echoId, echoSeq: echoSeq} - } -} - -type Packet struct { - my *MyMsg - src *net.IPAddr - echoId int - echoSeq int -} - -func UniqueId() string { - b := make([]byte, 48) - - if _, err := io.ReadFull(rand.Reader, b); err != nil { - return "" - } - return GetMd5String(base64.URLEncoding.EncodeToString(b)) -} - -func GetMd5String(s string) string { - h := md5.New() - h.Write([]byte(s)) - return hex.EncodeToString(h.Sum(nil)) -} - -const ( - FRAME_MAX_SIZE int = 888 - FRAME_MAX_ID int = 100000 -) diff --git a/pingtunnel_test.go b/pingtunnel_test.go deleted file mode 100644 index e97d2c1..0000000 --- a/pingtunnel_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package pingtunnel - -import ( - "fmt" - "github.com/golang/protobuf/proto" - "testing" -) - -func Test0001(t *testing.T) { - - my := &MyMsg{} - my.Id = "12345" - my.Target = "111:11" - my.Type = 12 - my.Data = make([]byte, 0) - dst, _ := proto.Marshal(my) - fmt.Println("dst = ", dst) - - my1 := &MyMsg{} - proto.Unmarshal(dst, my1) - fmt.Println("my1 = ", my1) - fmt.Println("my1.Data = ", my1.Data) - - proto.Unmarshal(dst[0:4], my1) - fmt.Println("my1 = ", my1) - - fm := FrameMgr{} - fm.recvid = 4 - fm.windowsize = 100 - lr := &Frame{} - rr := &Frame{} - lr.Id = 1 - rr.Id = 4 - fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id))) - - lr.Id = 99 - rr.Id = 8 - fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id))) - - fm.recvid = 9000 - lr.Id = 9998 - rr.Id = 9999 - fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id))) - - fm.recvid = 9000 - lr.Id = 9998 - rr.Id = 8 - fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id))) - - fm.recvid = 0 - lr.Id = 9998 - rr.Id = 8 - fmt.Println("fm.compareId(lr, rr) = ", fm.compareId((int)(lr.Id), (int)(rr.Id))) - - fm.recvid = 0 - fm.windowsize = 5 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(4, 10)) - - fm.recvid = 0 - fm.windowsize = 5 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(5, 10)) - - fm.recvid = 4 - fm.windowsize = 5 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(1, 10)) - - fm.recvid = 7 - fm.windowsize = 5 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(1, 10)) - - fm.recvid = 7 - fm.windowsize = 5 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(2, 10)) - - fm.recvid = 7 - fm.windowsize = 5 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(9, 10)) - - fm.recvid = 10 - fm.windowsize = 10000 - fmt.Println("fm.isIdInRange = ", fm.isIdInRange(0, FRAME_MAX_ID)) - - fm.recvid = 7 - fm.windowsize = 5 - fmt.Println("fm.isIdOld = ", fm.isIdOld(2, 10)) - - fm.recvid = 7 - fm.windowsize = 5 - fmt.Println("fm.isIdOld = ", fm.isIdOld(1, 10)) - - fm.recvid = 3 - fm.windowsize = 5 - fmt.Println("fm.isIdOld = ", fm.isIdOld(1, 10)) - - fm.recvid = 13 - fm.windowsize = 10000 - fmt.Println("fm.isIdOld = ", fm.isIdOld(9, FRAME_MAX_ID)) - - dd := fm.compressData(([]byte)("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")) - fmt.Println("fm.compressData = ", len(dd)) - - _, ddd := fm.deCompressData(dd) - fmt.Println("fm.deCompressData = ", (string)(ddd)) - - mm := make(map[int32]int) - mm[1] = 1 - mm[2] = 1 - mm[3] = 1 - mm[4] = 2 - mm[6] = 7 - mms := fm.printStatMap(&mm) - fmt.Println("fm.printStatMap = ", mms) - fm.openstat = 1 - fm.resetStat() - fm.printStat() -} diff --git a/server.go b/server.go deleted file mode 100644 index 48d226b..0000000 --- a/server.go +++ /dev/null @@ -1,526 +0,0 @@ -package pingtunnel - -import ( - "github.com/esrrhs/go-engine/src/common" - "github.com/esrrhs/go-engine/src/loggo" - "github.com/esrrhs/go-engine/src/threadpool" - "github.com/golang/protobuf/proto" - "golang.org/x/net/icmp" - "net" - "sync" - "time" -) - -func NewServer(key int, maxconn int, maxprocessthread int, maxprocessbuffer int) (*Server, error) { - s := &Server{ - exit: false, - key: key, - maxconn: maxconn, - } - - s.processtp = threadpool.NewThreadPool(maxprocessthread, maxprocessbuffer, func(v interface{}) { - packet := v.(*Packet) - s.processDataPacket(packet) - }) - - return s, nil -} - -type Server struct { - exit bool - key int - interval *time.Ticker - workResultLock sync.WaitGroup - maxconn int - - conn *icmp.PacketConn - - localConnMap sync.Map - remoteConnErrorMap sync.Map - - sendPacket uint64 - recvPacket uint64 - sendPacketSize uint64 - recvPacketSize uint64 - localConnMapSize int - - echoId int - echoSeq int - - processtp *threadpool.ThreadPool -} - -type ServerConn struct { - exit bool - timeout int - ipaddrTarget *net.UDPAddr - conn *net.UDPConn - tcpaddrTarget *net.TCPAddr - tcpconn *net.TCPConn - id string - activeRecvTime time.Time - activeSendTime time.Time - close bool - rproto int - fm *FrameMgr - tcpmode int -} - -func (p *Server) Run() error { - - conn, err := icmp.ListenPacket("ip4:icmp", "") - if err != nil { - loggo.Error("Error listening for ICMP packets: %s", err.Error()) - return err - } - p.conn = conn - - recv := make(chan *Packet, 10000) - go recvICMP(&p.workResultLock, &p.exit, *p.conn, recv) - - p.interval = time.NewTicker(time.Second) - - go func() { - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - for !p.exit { - select { - case <-p.interval.C: - p.checkTimeoutConn() - p.showNet() - case r := <-recv: - p.processPacket(r) - } - } - }() - - return nil -} - -func (p *Server) Stop() { - p.exit = true - p.workResultLock.Wait() - p.conn.Close() - p.interval.Stop() -} - -func (p *Server) processPacket(packet *Packet) { - - if packet.my.Key != (int32)(p.key) { - return - } - - p.echoId = packet.echoId - p.echoSeq = packet.echoSeq - - if packet.my.Type == (int32)(MyMsg_PING) { - t := time.Time{} - t.UnmarshalBinary(packet.my.Data) - loggo.Info("ping from %s %s %d %d %d", packet.src.String(), t.String(), packet.my.Rproto, packet.echoId, packet.echoSeq) - sendICMP(packet.echoId, packet.echoSeq, *p.conn, packet.src, "", "", (uint32)(MyMsg_PING), packet.my.Data, - (int)(packet.my.Rproto), -1, p.key, - 0, 0, 0, 0, 0, 0, - 0) - return - } - - if packet.my.Type == (int32)(MyMsg_KICK) { - localConn := p.getServerConnById(packet.my.Id) - if localConn != nil { - p.close(localConn) - loggo.Info("remote kick local %s", packet.my.Id) - } - return - } - - p.processtp.AddJob((int)(common.HashString(packet.my.Id)), packet) -} - -func (p *Server) processDataPacket(packet *Packet) { - - loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data)) - - now := time.Now() - - id := packet.my.Id - localConn := p.getServerConnById(id) - if localConn == nil { - - if p.maxconn > 0 && p.localConnMapSize >= p.maxconn { - loggo.Info("too many connections %d, server connected target fail %s", p.localConnMapSize, packet.my.Target) - p.remoteError(id, packet) - return - } - - if packet.my.Tcpmode > 0 { - - addr := packet.my.Target - - c, err := net.DialTimeout("tcp", addr, time.Second) - if err != nil { - loggo.Error("Error listening for tcp packets: %s", err.Error()) - p.remoteError(id, packet) - return - } - targetConn := c.(*net.TCPConn) - ipaddrTarget := targetConn.RemoteAddr().(*net.TCPAddr) - - fm := NewFrameMgr((int)(packet.my.TcpmodeBuffersize), (int)(packet.my.TcpmodeMaxwin), (int)(packet.my.TcpmodeResendTimems), (int)(packet.my.TcpmodeCompress), - (int)(packet.my.TcpmodeStat)) - - localConn = &ServerConn{exit: false, timeout: (int)(packet.my.Timeout), tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, - rproto: (int)(packet.my.Rproto), fm: fm, tcpmode: (int)(packet.my.Tcpmode)} - - p.addServerConn(id, localConn) - - go p.RecvTCP(localConn, id, packet.src) - - } else { - - addr := packet.my.Target - - c, err := net.DialTimeout("udp", addr, time.Second) - if err != nil { - loggo.Error("Error listening for tcp packets: %s", err.Error()) - p.remoteError(id, packet) - return - } - targetConn := c.(*net.UDPConn) - ipaddrTarget := targetConn.RemoteAddr().(*net.UDPAddr) - - localConn = &ServerConn{exit: false, timeout: (int)(packet.my.Timeout), conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, - rproto: (int)(packet.my.Rproto), tcpmode: (int)(packet.my.Tcpmode)} - - p.addServerConn(id, localConn) - - go p.Recv(localConn, id, packet.src) - } - } - - localConn.activeRecvTime = now - - if packet.my.Type == (int32)(MyMsg_DATA) { - - if packet.my.Tcpmode > 0 { - f := &Frame{} - err := proto.Unmarshal(packet.my.Data, f) - if err != nil { - loggo.Error("Unmarshal tcp Error %s", err) - return - } - - localConn.fm.OnRecvFrame(f) - - } else { - _, err := localConn.conn.Write(packet.my.Data) - if err != nil { - loggo.Info("WriteToUDP Error %s", err) - localConn.close = true - return - } - } - - p.recvPacket++ - p.recvPacketSize += (uint64)(len(packet.my.Data)) - } -} - -func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) { - - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - loggo.Info("server waiting target response %s -> %s %s", conn.tcpaddrTarget.String(), conn.id, conn.tcpconn.LocalAddr().String()) - - loggo.Info("start wait remote connect tcp %s %s", conn.id, conn.tcpaddrTarget.String()) - startConnectTime := time.Now() - for !p.exit && !conn.exit { - if conn.fm.IsConnected() { - break - } - conn.fm.Update() - sendlist := conn.fm.getSendList() - for e := sendlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - mb, _ := proto.Marshal(f) - sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, - conn.rproto, -1, p.key, - 0, 0, 0, 0, 0, 0, - 0) - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - } - time.Sleep(time.Millisecond * 10) - now := time.Now() - diffclose := now.Sub(startConnectTime) - if diffclose > time.Second*(time.Duration(conn.timeout)) { - loggo.Info("can not connect remote tcp %s %s", conn.id, conn.tcpaddrTarget.String()) - p.close(conn) - return - } - } - - if !conn.exit { - loggo.Info("remote connected tcp %s %s", conn.id, conn.tcpaddrTarget.String()) - } - - bytes := make([]byte, 10240) - - tcpActiveRecvTime := time.Now() - tcpActiveSendTime := time.Now() - - for !p.exit && !conn.exit { - now := time.Now() - sleep := true - - left := common.MinOfInt(conn.fm.GetSendBufferLeft(), len(bytes)) - if left > 0 { - conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 1)) - n, err := conn.tcpconn.Read(bytes[0:left]) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) - conn.fm.Close() - break - } - } - if n > 0 { - sleep = false - conn.fm.WriteSendBuffer(bytes[:n]) - tcpActiveRecvTime = now - } - } - - conn.fm.Update() - - sendlist := conn.fm.getSendList() - if sendlist.Len() > 0 { - sleep = false - conn.activeSendTime = now - for e := sendlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - mb, err := proto.Marshal(f) - if err != nil { - loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) - continue - } - sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, - conn.rproto, -1, p.key, - 0, 0, 0, 0, 0, 0, - 0) - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - } - } - - if conn.fm.GetRecvBufferSize() > 0 { - sleep = false - rr := conn.fm.GetRecvReadLineBuffer() - conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 1)) - n, err := conn.tcpconn.Write(rr) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) - conn.fm.Close() - break - } - } - if n > 0 { - conn.fm.SkipRecvBuffer(n) - tcpActiveSendTime = now - } - } - - if sleep { - time.Sleep(time.Millisecond * 10) - } - - diffrecv := now.Sub(conn.activeRecvTime) - diffsend := now.Sub(conn.activeSendTime) - tcpdiffrecv := now.Sub(tcpActiveRecvTime) - tcpdiffsend := now.Sub(tcpActiveSendTime) - if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) || - tcpdiffrecv > time.Second*(time.Duration(conn.timeout)) || tcpdiffsend > time.Second*(time.Duration(conn.timeout)) { - loggo.Info("close inactive conn %s %s", conn.id, conn.tcpaddrTarget.String()) - conn.fm.Close() - break - } - - if conn.fm.IsRemoteClosed() { - loggo.Info("closed by remote conn %s %s", conn.id, conn.tcpaddrTarget.String()) - conn.fm.Close() - break - } - } - - startCloseTime := time.Now() - for !p.exit && !conn.exit { - now := time.Now() - - conn.fm.Update() - - sendlist := conn.fm.getSendList() - for e := sendlist.Front(); e != nil; e = e.Next() { - f := e.Value.(*Frame) - mb, _ := proto.Marshal(f) - sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, - conn.rproto, -1, p.key, - 0, 0, 0, 0, 0, 0, - 0) - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - } - - nodatarecv := true - if conn.fm.GetRecvBufferSize() > 0 { - rr := conn.fm.GetRecvReadLineBuffer() - conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) - n, _ := conn.tcpconn.Write(rr) - if n > 0 { - conn.fm.SkipRecvBuffer(n) - nodatarecv = false - } - } - - diffclose := now.Sub(startCloseTime) - timeout := diffclose > time.Second*(time.Duration(conn.timeout)) - remoteclosed := conn.fm.IsRemoteClosed() - - if timeout { - loggo.Info("close conn had timeout %s %s", conn.id, conn.tcpaddrTarget.String()) - break - } - - if remoteclosed && nodatarecv { - loggo.Info("remote conn had closed %s %s", conn.id, conn.tcpaddrTarget.String()) - break - } - - time.Sleep(time.Millisecond * 100) - } - - time.Sleep(time.Second) - - loggo.Info("close tcp conn %s %s", conn.id, conn.tcpaddrTarget.String()) - p.close(conn) -} - -func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) { - - p.workResultLock.Add(1) - defer p.workResultLock.Done() - - loggo.Info("server waiting target response %s -> %s %s", conn.ipaddrTarget.String(), conn.id, conn.conn.LocalAddr().String()) - - for !p.exit { - bytes := make([]byte, 2000) - - conn.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, _, err := conn.conn.ReadFromUDP(bytes) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - loggo.Info("ReadFromUDP Error read udp %s", err) - conn.close = true - return - } - } - - now := time.Now() - conn.activeSendTime = now - - sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), bytes[:n], - conn.rproto, -1, p.key, - 0, 0, 0, 0, 0, 0, - 0) - - p.sendPacket++ - p.sendPacketSize += (uint64)(n) - } -} - -func (p *Server) close(conn *ServerConn) { - if p.getServerConnById(conn.id) != nil { - conn.exit = true - if conn.conn != nil { - conn.conn.Close() - } - if conn.tcpconn != nil { - conn.tcpconn.Close() - } - p.deleteServerConn(conn.id) - } -} - -func (p *Server) checkTimeoutConn() { - - tmp := make(map[string]*ServerConn) - p.localConnMap.Range(func(key, value interface{}) bool { - id := key.(string) - serverConn := value.(*ServerConn) - tmp[id] = serverConn - return true - }) - - now := time.Now() - for _, conn := range tmp { - if conn.tcpmode > 0 { - continue - } - diffrecv := now.Sub(conn.activeRecvTime) - diffsend := now.Sub(conn.activeSendTime) - if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) { - conn.close = true - } - } - - for id, conn := range tmp { - if conn.tcpmode > 0 { - continue - } - if conn.close { - loggo.Info("close inactive conn %s %s", id, conn.ipaddrTarget.String()) - p.close(conn) - } - } -} - -func (p *Server) showNet() { - p.localConnMapSize = 0 - p.localConnMap.Range(func(key, value interface{}) bool { - p.localConnMapSize++ - return true - }) - loggo.Info("send %dPacket/s %dKB/s recv %dPacket/s %dKB/s %dConnections", - p.sendPacket, p.sendPacketSize/1024, p.recvPacket, p.recvPacketSize/1024, p.localConnMapSize) - p.sendPacket = 0 - p.recvPacket = 0 - p.sendPacketSize = 0 - p.recvPacketSize = 0 -} - -func (p *Server) addServerConn(uuid string, serverConn *ServerConn) { - p.localConnMap.Store(uuid, serverConn) -} - -func (p *Server) getServerConnById(uuid string) *ServerConn { - ret, ok := p.localConnMap.Load(uuid) - if !ok { - return nil - } - return ret.(*ServerConn) -} - -func (p *Server) deleteServerConn(uuid string) { - p.localConnMap.Delete(uuid) -} - -func (p *Server) remoteError(uuid string, packet *Packet) { - sendICMP(packet.echoId, packet.echoSeq, *p.conn, packet.src, "", uuid, (uint32)(MyMsg_KICK), []byte{}, - (int)(packet.my.Rproto), -1, p.key, - 0, 0, 0, 0, 0, 0, - 0) -} diff --git a/sock5.go b/sock5.go deleted file mode 100644 index dc474e5..0000000 --- a/sock5.go +++ /dev/null @@ -1,136 +0,0 @@ -package pingtunnel - -import ( - "encoding/binary" - "errors" - "io" - "net" - "strconv" - "time" -) - -var ( - errAddrType = errors.New("socks addr type not supported") - errVer = errors.New("socks version not supported") - errMethod = errors.New("socks only support 1 method now") - errAuthExtraData = errors.New("socks authentication get extra data") - errReqExtraData = errors.New("socks request get extra data") - errCmd = errors.New("socks command not supported") -) - -const ( - socksVer5 = 5 - socksCmdConnect = 1 -) - -func sock5Handshake(conn net.Conn) (err error) { - const ( - idVer = 0 - idNmethod = 1 - ) - // version identification and method selection message in theory can have - // at most 256 methods, plus version and nmethod field in total 258 bytes - // the current rfc defines only 3 authentication methods (plus 2 reserved), - // so it won't be such long in practice - - buf := make([]byte, 258) - - var n int - conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - // make sure we get the nmethod field - if n, err = io.ReadAtLeast(conn, buf, idNmethod+1); err != nil { - return - } - if buf[idVer] != socksVer5 { - return errVer - } - nmethod := int(buf[idNmethod]) - msgLen := nmethod + 2 - if n == msgLen { // handshake done, common case - // do nothing, jump directly to send confirmation - } else if n < msgLen { // has more methods to read, rare case - if _, err = io.ReadFull(conn, buf[n:msgLen]); err != nil { - return - } - } else { // error, should not get extra data - return errAuthExtraData - } - // send confirmation: version 5, no authentication required - _, err = conn.Write([]byte{socksVer5, 0}) - return -} - -func sock5GetRequest(conn net.Conn) (rawaddr []byte, host string, err error) { - const ( - idVer = 0 - idCmd = 1 - idType = 3 // address type index - idIP0 = 4 // ip address start index - idDmLen = 4 // domain address length index - idDm0 = 5 // domain address start index - - typeIPv4 = 1 // type is ipv4 address - typeDm = 3 // type is domain address - typeIPv6 = 4 // type is ipv6 address - - lenIPv4 = 3 + 1 + net.IPv4len + 2 // 3(ver+cmd+rsv) + 1addrType + ipv4 + 2port - lenIPv6 = 3 + 1 + net.IPv6len + 2 // 3(ver+cmd+rsv) + 1addrType + ipv6 + 2port - lenDmBase = 3 + 1 + 1 + 2 // 3 + 1addrType + 1addrLen + 2port, plus addrLen - ) - // refer to getRequest in server.go for why set buffer size to 263 - buf := make([]byte, 263) - var n int - conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - // read till we get possible domain length field - if n, err = io.ReadAtLeast(conn, buf, idDmLen+1); err != nil { - return - } - // check version and cmd - if buf[idVer] != socksVer5 { - err = errVer - return - } - if buf[idCmd] != socksCmdConnect { - err = errCmd - return - } - - reqLen := -1 - switch buf[idType] { - case typeIPv4: - reqLen = lenIPv4 - case typeIPv6: - reqLen = lenIPv6 - case typeDm: - reqLen = int(buf[idDmLen]) + lenDmBase - default: - err = errAddrType - return - } - - if n == reqLen { - // common case, do nothing - } else if n < reqLen { // rare case - if _, err = io.ReadFull(conn, buf[n:reqLen]); err != nil { - return - } - } else { - err = errReqExtraData - return - } - - rawaddr = buf[idType:reqLen] - - switch buf[idType] { - case typeIPv4: - host = net.IP(buf[idIP0 : idIP0+net.IPv4len]).String() - case typeIPv6: - host = net.IP(buf[idIP0 : idIP0+net.IPv6len]).String() - case typeDm: - host = string(buf[idDm0 : idDm0+buf[idDmLen]]) - } - port := binary.BigEndian.Uint16(buf[reqLen-2 : reqLen]) - host = net.JoinHostPort(host, strconv.Itoa(int(port))) - - return -}