diff --git a/cmd/main.go b/cmd/main.go index 077b97c..02b53ae 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -74,6 +74,12 @@ Usage: -maxconn 最大连接数,默认1000 the max num of connections, default 1000 + + -maxprt server最大处理线程数,默认100 + max process thread in server, default 100 + + -maxprb server最大处理线程buffer数,默认1000 + max process thread's buffer in server, default 1000 ` func main() { @@ -94,6 +100,8 @@ func main() { loglevel := flag.String("loglevel", "info", "log level") open_sock5 := flag.Int("sock5", 0, "sock5 mode") maxconn := flag.Int("maxconn", 0, "max num of connections") + max_process_thread := flag.Int("maxprt", 0, "max process thread in server") + max_process_buffer := flag.Int("maxprb", 0, "max process thread's buffer in server") flag.Usage = func() { fmt.Printf(usage) } @@ -136,7 +144,7 @@ func main() { loggo.Info("key %d", *key) if *t == "server" { - s, err := pingtunnel.NewServer(*key, *maxconn) + s, err := pingtunnel.NewServer(*key, *maxconn, *max_process_thread, *max_process_buffer) if err != nil { loggo.Error("ERROR: %s", err.Error()) return diff --git a/server.go b/server.go index 6bebd3b..48d226b 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package pingtunnel import ( "github.com/esrrhs/go-engine/src/common" "github.com/esrrhs/go-engine/src/loggo" + "github.com/esrrhs/go-engine/src/threadpool" "github.com/golang/protobuf/proto" "golang.org/x/net/icmp" "net" @@ -10,12 +11,19 @@ import ( "time" ) -func NewServer(key int, maxconn int) (*Server, error) { - return &Server{ +func NewServer(key int, maxconn int, maxprocessthread int, maxprocessbuffer int) (*Server, error) { + s := &Server{ exit: false, key: key, maxconn: maxconn, - }, nil + } + + s.processtp = threadpool.NewThreadPool(maxprocessthread, maxprocessbuffer, func(v interface{}) { + packet := v.(*Packet) + s.processDataPacket(packet) + }) + + return s, nil } type Server struct { @@ -38,6 +46,8 @@ type Server struct { echoId int echoSeq int + + processtp *threadpool.ThreadPool } type ServerConn struct { @@ -124,6 +134,11 @@ func (p *Server) processPacket(packet *Packet) { return } + p.processtp.AddJob((int)(common.HashString(packet.my.Id)), packet) +} + +func (p *Server) processDataPacket(packet *Packet) { + loggo.Debug("processPacket %s %s %d", packet.my.Id, packet.src.String(), len(packet.my.Data)) now := time.Now()