2024-06-18 07:36:36 +02:00
package splithttp
import (
"context"
"crypto/tls"
"io"
gonet "net"
"net/http"
"strconv"
2024-06-21 01:30:51 +02:00
"strings"
2024-06-18 07:36:36 +02:00
"sync"
"time"
2024-07-19 17:53:47 +00:00
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
2024-10-30 03:31:05 +01:00
goreality "github.com/xtls/reality"
2024-06-18 07:36:36 +02:00
"github.com/xtls/xray-core/common"
2024-06-29 14:32:57 -04:00
"github.com/xtls/xray-core/common/errors"
2024-06-18 07:36:36 +02:00
"github.com/xtls/xray-core/common/net"
http_proto "github.com/xtls/xray-core/common/protocol/http"
"github.com/xtls/xray-core/common/signal/done"
"github.com/xtls/xray-core/transport/internet"
2024-10-30 03:31:05 +01:00
"github.com/xtls/xray-core/transport/internet/reality"
2024-06-18 07:36:36 +02:00
"github.com/xtls/xray-core/transport/internet/stat"
v2tls "github.com/xtls/xray-core/transport/internet/tls"
2024-06-23 19:05:37 +02:00
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
2024-06-18 07:36:36 +02:00
)
type requestHandler struct {
2024-07-29 06:32:04 +00:00
config * Config
2024-06-18 07:36:36 +02:00
host string
path string
ln * Listener
2024-07-17 07:41:17 -04:00
sessionMu * sync . Mutex
2024-06-18 07:36:36 +02:00
sessions sync . Map
localAddr gonet . TCPAddr
}
2024-06-21 01:30:51 +02:00
type httpSession struct {
2024-07-11 09:56:20 +02:00
uploadQueue * uploadQueue
2024-06-21 01:30:51 +02:00
// for as long as the GET request is not opened by the client, this will be
// open ("undone"), and the session may be expired within a certain TTL.
// after the client connects, this becomes "done" and the session lives as
// long as the GET request.
isFullyConnected * done . Instance
}
// maybeReapSession removes sessionId from h.sessions unless isFullyConnected
// is signalled within 30 seconds. It blocks until one of the two happens, so
// it is meant to be run in its own goroutine.
func (h *requestHandler) maybeReapSession(isFullyConnected *done.Instance, sessionId string) {
	// A timer is released as soon as this function returns, whereas a helper
	// goroutine that sleeps would linger for the full 30 seconds even when
	// the session became fully connected right away.
	reapTimer := time.NewTimer(30 * time.Second)
	defer reapTimer.Stop()

	select {
	case <-isFullyConnected.Wait():
		// The GET request arrived; its handler owns the session lifetime now.
	case <-reapTimer.C:
		h.sessions.Delete(sessionId)
	}
}
func ( h * requestHandler ) upsertSession ( sessionId string ) * httpSession {
2024-07-17 07:41:17 -04:00
// fast path
2024-06-21 01:30:51 +02:00
currentSessionAny , ok := h . sessions . Load ( sessionId )
if ok {
return currentSessionAny . ( * httpSession )
}
2024-07-17 07:41:17 -04:00
// slow path
h . sessionMu . Lock ( )
defer h . sessionMu . Unlock ( )
currentSessionAny , ok = h . sessions . Load ( sessionId )
if ok {
return currentSessionAny . ( * httpSession )
}
2024-06-21 01:30:51 +02:00
s := & httpSession {
2024-07-29 12:10:29 +02:00
uploadQueue : NewUploadQueue ( int ( h . ln . config . GetNormalizedScMaxConcurrentPosts ( ) . To ) ) ,
2024-06-21 01:30:51 +02:00
isFullyConnected : done . New ( ) ,
}
h . sessions . Store ( sessionId , s )
go h . maybeReapSession ( s . isFullyConnected , sessionId )
return s
}
2024-06-18 07:36:36 +02:00
func ( h * requestHandler ) ServeHTTP ( writer http . ResponseWriter , request * http . Request ) {
2024-07-06 17:12:49 -04:00
if len ( h . host ) > 0 && ! internet . IsValidHTTPHost ( request . Host , h . host ) {
2024-06-29 14:32:57 -04:00
errors . LogInfo ( context . Background ( ) , "failed to validate host, request:" , request . Host , ", config:" , h . host )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusNotFound )
return
}
2024-06-21 01:30:51 +02:00
if ! strings . HasPrefix ( request . URL . Path , h . path ) {
2024-06-29 14:32:57 -04:00
errors . LogInfo ( context . Background ( ) , "failed to validate path, request:" , request . URL . Path , ", config:" , h . path )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusNotFound )
return
}
2024-11-09 11:05:41 +00:00
h . config . WriteResponseHeader ( writer )
2024-06-21 01:30:51 +02:00
sessionId := ""
subpath := strings . Split ( request . URL . Path [ len ( h . path ) : ] , "/" )
if len ( subpath ) > 0 {
sessionId = subpath [ 0 ]
}
2024-06-18 07:36:36 +02:00
if sessionId == "" {
2024-06-29 14:32:57 -04:00
errors . LogInfo ( context . Background ( ) , "no sessionid on request:" , request . URL . Path )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusBadRequest )
return
}
forwardedAddrs := http_proto . ParseXForwardedFor ( request . Header )
remoteAddr , err := gonet . ResolveTCPAddr ( "tcp" , request . RemoteAddr )
if err != nil {
remoteAddr = & gonet . TCPAddr { }
}
if len ( forwardedAddrs ) > 0 && forwardedAddrs [ 0 ] . Family ( ) . IsIP ( ) {
remoteAddr = & net . TCPAddr {
IP : forwardedAddrs [ 0 ] . IP ( ) ,
Port : int ( 0 ) ,
}
}
2024-06-21 01:30:51 +02:00
currentSession := h . upsertSession ( sessionId )
2024-07-29 12:10:29 +02:00
scMaxEachPostBytes := int ( h . ln . config . GetNormalizedScMaxEachPostBytes ( ) . To )
2024-06-21 01:30:51 +02:00
2024-06-18 07:36:36 +02:00
if request . Method == "POST" {
2024-06-21 01:30:51 +02:00
seq := ""
if len ( subpath ) > 1 {
seq = subpath [ 1 ]
2024-06-18 07:36:36 +02:00
}
if seq == "" {
2024-11-09 11:05:41 +00:00
if h . config . Mode == "packet-up" {
errors . LogInfo ( context . Background ( ) , "stream-up mode is not allowed" )
writer . WriteHeader ( http . StatusBadRequest )
return
}
err = currentSession . uploadQueue . Push ( Packet {
Reader : request . Body ,
} )
if err != nil {
errors . LogInfoInner ( context . Background ( ) , err , "failed to upload (PushReader)" )
writer . WriteHeader ( http . StatusConflict )
} else {
writer . WriteHeader ( http . StatusOK )
<- request . Context ( ) . Done ( )
}
return
}
if h . config . Mode == "stream-up" {
errors . LogInfo ( context . Background ( ) , "packet-up mode is not allowed" )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusBadRequest )
return
}
payload , err := io . ReadAll ( request . Body )
2024-07-29 06:35:17 +02:00
2024-07-29 12:10:29 +02:00
if len ( payload ) > scMaxEachPostBytes {
errors . LogInfo ( context . Background ( ) , "Too large upload. scMaxEachPostBytes is set to " , scMaxEachPostBytes , "but request had size " , len ( payload ) , ". Adjust scMaxEachPostBytes on the server to be at least as large as client." )
2024-07-29 06:35:17 +02:00
writer . WriteHeader ( http . StatusRequestEntityTooLarge )
return
}
2024-06-18 07:36:36 +02:00
if err != nil {
2024-11-09 11:05:41 +00:00
errors . LogInfoInner ( context . Background ( ) , err , "failed to upload (ReadAll)" )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusInternalServerError )
return
}
seqInt , err := strconv . ParseUint ( seq , 10 , 64 )
if err != nil {
2024-11-09 11:05:41 +00:00
errors . LogInfoInner ( context . Background ( ) , err , "failed to upload (ParseUint)" )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusInternalServerError )
return
}
2024-06-21 01:30:51 +02:00
err = currentSession . uploadQueue . Push ( Packet {
2024-06-18 07:36:36 +02:00
Payload : payload ,
Seq : seqInt ,
} )
if err != nil {
2024-11-09 11:05:41 +00:00
errors . LogInfoInner ( context . Background ( ) , err , "failed to upload (PushPayload)" )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusInternalServerError )
return
}
writer . WriteHeader ( http . StatusOK )
} else if request . Method == "GET" {
responseFlusher , ok := writer . ( http . Flusher )
if ! ok {
panic ( "expected http.ResponseWriter to be an http.Flusher" )
}
2024-06-21 01:30:51 +02:00
// after GET is done, the connection is finished. disable automatic
// session reaping, and handle it in defer
currentSession . isFullyConnected . Close ( )
2024-06-18 07:36:36 +02:00
defer h . sessions . Delete ( sessionId )
// magic header instructs nginx + apache to not buffer response body
writer . Header ( ) . Set ( "X-Accel-Buffering" , "no" )
2024-08-11 00:59:42 +01:00
// A web-compliant header telling all middleboxes to disable caching.
// Should be able to prevent overloading the cache, or stop CDNs from
// teeing the response stream into their cache, causing slowdowns.
writer . Header ( ) . Set ( "Cache-Control" , "no-store" )
2024-07-29 06:32:04 +00:00
if ! h . config . NoSSEHeader {
// magic header to make the HTTP middle box consider this as SSE to disable buffer
writer . Header ( ) . Set ( "Content-Type" , "text/event-stream" )
}
2024-07-11 09:56:20 +02:00
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusOK )
2024-08-11 01:56:25 +02:00
2024-06-18 07:36:36 +02:00
responseFlusher . Flush ( )
downloadDone := done . New ( )
conn := splitConn {
writer : & httpResponseBodyWriter {
responseWriter : writer ,
downloadDone : downloadDone ,
responseFlusher : responseFlusher ,
} ,
2024-06-21 01:30:51 +02:00
reader : currentSession . uploadQueue ,
2024-06-18 07:36:36 +02:00
remoteAddr : remoteAddr ,
}
h . ln . addConn ( stat . Connection ( & conn ) )
// "A ResponseWriter may not be used after [Handler.ServeHTTP] has returned."
2024-08-22 17:07:57 +02:00
select {
case <- request . Context ( ) . Done ( ) :
case <- downloadDone . Wait ( ) :
}
2024-06-18 07:36:36 +02:00
2024-08-22 17:07:57 +02:00
conn . Close ( )
2024-06-18 07:36:36 +02:00
} else {
2024-11-09 11:05:41 +00:00
errors . LogInfo ( context . Background ( ) , "unsupported method: " , request . Method )
2024-06-18 07:36:36 +02:00
writer . WriteHeader ( http . StatusMethodNotAllowed )
}
}
// httpResponseBodyWriter writes download data into the body of an HTTP
// response. The embedded mutex serializes Write and Close.
type httpResponseBodyWriter struct {
	sync.Mutex
	responseWriter  http.ResponseWriter
	responseFlusher http.Flusher
	// downloadDone is closed by Close; Write refuses to write afterwards.
	downloadDone *done.Instance
}
// Write sends b to the response body and, if that succeeded, flushes it right
// away. After Close it writes nothing and returns io.ErrClosedPipe.
func (c *httpResponseBodyWriter) Write(b []byte) (int, error) {
	c.Lock()
	defer c.Unlock()

	if c.downloadDone.Done() {
		return 0, io.ErrClosedPipe
	}

	written, writeErr := c.responseWriter.Write(b)
	if writeErr != nil {
		return written, writeErr
	}
	c.responseFlusher.Flush()
	return written, nil
}
// Close marks the download as done while holding the writer's lock, so it
// cannot interleave with a Write in progress. It always returns nil.
func (c *httpResponseBodyWriter) Close() error {
	c.Lock()
	defer c.Unlock()

	c.downloadDone.Close()
	return nil
}
type Listener struct {
sync . Mutex
2024-07-19 17:53:47 +00:00
server http . Server
h3server * http3 . Server
listener net . Listener
h3listener * quic . EarlyListener
config * Config
addConn internet . ConnHandler
isH3 bool
2024-06-18 07:36:36 +02:00
}
func ListenSH ( ctx context . Context , address net . Address , port net . Port , streamSettings * internet . MemoryStreamConfig , addConn internet . ConnHandler ) ( internet . Listener , error ) {
l := & Listener {
addConn : addConn ,
}
shSettings := streamSettings . ProtocolSettings . ( * Config )
l . config = shSettings
if l . config != nil {
if streamSettings . SocketSettings == nil {
streamSettings . SocketSettings = & internet . SocketConfig { }
}
}
var listener net . Listener
var err error
var localAddr = gonet . TCPAddr { }
2024-07-19 17:53:47 +00:00
handler := & requestHandler {
2024-07-29 06:32:04 +00:00
config : shSettings ,
2024-07-19 17:53:47 +00:00
host : shSettings . Host ,
2024-08-10 23:47:42 +02:00
path : shSettings . GetNormalizedPath ( ) ,
2024-07-19 17:53:47 +00:00
ln : l ,
sessionMu : & sync . Mutex { } ,
sessions : sync . Map { } ,
localAddr : localAddr ,
}
tlsConfig := getTLSConfig ( streamSettings )
l . isH3 = len ( tlsConfig . NextProtos ) == 1 && tlsConfig . NextProtos [ 0 ] == "h3"
2024-06-18 07:36:36 +02:00
if port == net . Port ( 0 ) { // unix
listener , err = internet . ListenSystem ( ctx , & net . UnixAddr {
Name : address . Domain ( ) ,
Net : "unix" ,
} , streamSettings . SocketSettings )
if err != nil {
2024-06-29 14:32:57 -04:00
return nil , errors . New ( "failed to listen unix domain socket(for SH) on " , address ) . Base ( err )
2024-06-18 07:36:36 +02:00
}
2024-06-29 14:32:57 -04:00
errors . LogInfo ( ctx , "listening unix domain socket(for SH) on " , address )
2024-07-19 17:53:47 +00:00
} else if l . isH3 { // quic
Conn , err := internet . ListenSystemPacket ( context . Background ( ) , & net . UDPAddr {
IP : address . IP ( ) ,
Port : int ( port ) ,
} , streamSettings . SocketSettings )
if err != nil {
2024-07-21 02:29:50 +02:00
return nil , errors . New ( "failed to listen UDP(for SH3) on " , address , ":" , port ) . Base ( err )
2024-07-19 17:53:47 +00:00
}
2024-07-21 02:29:50 +02:00
h3listener , err := quic . ListenEarly ( Conn , tlsConfig , nil )
2024-07-19 17:53:47 +00:00
if err != nil {
return nil , errors . New ( "failed to listen QUIC(for SH3) on " , address , ":" , port ) . Base ( err )
}
l . h3listener = h3listener
errors . LogInfo ( ctx , "listening QUIC(for SH3) on " , address , ":" , port )
l . h3server = & http3 . Server {
Handler : handler ,
}
go func ( ) {
if err := l . h3server . ServeListener ( l . h3listener ) ; err != nil {
errors . LogWarningInner ( ctx , err , "failed to serve http3 for splithttp" )
}
} ( )
2024-06-18 07:36:36 +02:00
} else { // tcp
localAddr = gonet . TCPAddr {
IP : address . IP ( ) ,
Port : int ( port ) ,
}
listener , err = internet . ListenSystem ( ctx , & net . TCPAddr {
IP : address . IP ( ) ,
Port : int ( port ) ,
} , streamSettings . SocketSettings )
if err != nil {
2024-06-29 14:32:57 -04:00
return nil , errors . New ( "failed to listen TCP(for SH) on " , address , ":" , port ) . Base ( err )
2024-06-18 07:36:36 +02:00
}
2024-06-29 14:32:57 -04:00
errors . LogInfo ( ctx , "listening TCP(for SH) on " , address , ":" , port )
2024-07-21 02:29:50 +02:00
}
// tcp/unix (h1/h2)
if listener != nil {
if config := v2tls . ConfigFromStreamSettings ( streamSettings ) ; config != nil {
if tlsConfig := config . GetTLSConfig ( ) ; tlsConfig != nil {
listener = tls . NewListener ( listener , tlsConfig )
}
}
2024-10-30 03:31:05 +01:00
if config := reality . ConfigFromStreamSettings ( streamSettings ) ; config != nil {
listener = goreality . NewListener ( listener , config . GetREALITYConfig ( ) )
}
2024-07-23 04:19:31 +08:00
// h2cHandler can handle both plaintext HTTP/1.1 and h2c
h2cHandler := h2c . NewHandler ( handler , & http2 . Server { } )
2024-07-21 02:29:50 +02:00
l . listener = listener
2024-07-23 04:19:31 +08:00
l . server = http . Server {
Handler : h2cHandler ,
ReadHeaderTimeout : time . Second * 4 ,
MaxHeaderBytes : 8192 ,
}
2024-07-21 02:29:50 +02:00
2024-07-19 17:53:47 +00:00
go func ( ) {
if err := l . server . Serve ( l . listener ) ; err != nil {
errors . LogWarningInner ( ctx , err , "failed to serve http for splithttp" )
}
} ( )
}
2024-06-18 07:36:36 +02:00
return l , err
}
// Addr implements net.Listener.Addr().
func ( ln * Listener ) Addr ( ) net . Addr {
2024-09-25 21:29:41 -04:00
if ln . h3listener != nil {
return ln . h3listener . Addr ( )
}
if ln . listener != nil {
return ln . listener . Addr ( )
}
return nil
2024-06-18 07:36:36 +02:00
}
// Close implements net.Listener.Close().
func ( ln * Listener ) Close ( ) error {
2024-07-19 17:53:47 +00:00
if ln . h3server != nil {
if err := ln . h3server . Close ( ) ; err != nil {
return err
}
} else if ln . listener != nil {
return ln . listener . Close ( )
}
return errors . New ( "listener does not have an HTTP/3 server or a net.listener" )
}
func getTLSConfig ( streamSettings * internet . MemoryStreamConfig ) * tls . Config {
config := v2tls . ConfigFromStreamSettings ( streamSettings )
if config == nil {
return & tls . Config { }
}
return config . GetTLSConfig ( )
2024-06-18 07:36:36 +02:00
}
func init ( ) {
common . Must ( internet . RegisterTransportListener ( protocolName , ListenSH ) )
}