mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-01-25 11:51:45 +03:00
017f53b5fc
* Add session context outbounds as slice slice is needed for dialer proxy where two outbounds work on top of each other There are two sets of target addr for example It also enable Xtls to correctly do splice copy by checking both outbounds are ready to do direct copy * Fill outbound tag info * Splice now checks capalibility from all outbounds * Fix unit tests
177 lines
4.3 KiB
Go
177 lines
4.3 KiB
Go
package outbound_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/xtls/xray-core/app/policy"
|
|
"github.com/xtls/xray-core/app/proxyman"
|
|
. "github.com/xtls/xray-core/app/proxyman/outbound"
|
|
"github.com/xtls/xray-core/app/stats"
|
|
"github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/serial"
|
|
"github.com/xtls/xray-core/common/session"
|
|
core "github.com/xtls/xray-core/core"
|
|
"github.com/xtls/xray-core/features/outbound"
|
|
"github.com/xtls/xray-core/proxy/freedom"
|
|
"github.com/xtls/xray-core/transport/internet/stat"
|
|
)
|
|
|
|
func TestInterfaces(t *testing.T) {
|
|
_ = (outbound.Handler)(new(Handler))
|
|
_ = (outbound.Manager)(new(Manager))
|
|
}
|
|
|
|
const xrayKey core.XrayKey = 1
|
|
|
|
func TestOutboundWithoutStatCounter(t *testing.T) {
|
|
config := &core.Config{
|
|
App: []*serial.TypedMessage{
|
|
serial.ToTypedMessage(&stats.Config{}),
|
|
serial.ToTypedMessage(&policy.Config{
|
|
System: &policy.SystemPolicy{
|
|
Stats: &policy.SystemPolicy_Stats{
|
|
InboundUplink: true,
|
|
},
|
|
},
|
|
}),
|
|
},
|
|
}
|
|
|
|
v, _ := core.New(config)
|
|
v.AddFeature((outbound.Manager)(new(Manager)))
|
|
ctx := context.WithValue(context.Background(), xrayKey, v)
|
|
ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{}})
|
|
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
|
|
Tag: "tag",
|
|
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
|
|
})
|
|
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
|
|
_, ok := conn.(*stat.CounterConnection)
|
|
if ok {
|
|
t.Errorf("Expected conn to not be CounterConnection")
|
|
}
|
|
}
|
|
|
|
func TestOutboundWithStatCounter(t *testing.T) {
|
|
config := &core.Config{
|
|
App: []*serial.TypedMessage{
|
|
serial.ToTypedMessage(&stats.Config{}),
|
|
serial.ToTypedMessage(&policy.Config{
|
|
System: &policy.SystemPolicy{
|
|
Stats: &policy.SystemPolicy_Stats{
|
|
OutboundUplink: true,
|
|
OutboundDownlink: true,
|
|
},
|
|
},
|
|
}),
|
|
},
|
|
}
|
|
|
|
v, _ := core.New(config)
|
|
v.AddFeature((outbound.Manager)(new(Manager)))
|
|
ctx := context.WithValue(context.Background(), xrayKey, v)
|
|
ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{}})
|
|
h, _ := NewHandler(ctx, &core.OutboundHandlerConfig{
|
|
Tag: "tag",
|
|
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
|
|
})
|
|
conn, _ := h.(*Handler).Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), 13146))
|
|
_, ok := conn.(*stat.CounterConnection)
|
|
if !ok {
|
|
t.Errorf("Expected conn to be CounterConnection")
|
|
}
|
|
}
|
|
|
|
func TestTagsCache(t *testing.T) {
|
|
|
|
test_duration := 10 * time.Second
|
|
threads_num := 50
|
|
delay := 10 * time.Millisecond
|
|
tags_prefix := "node"
|
|
|
|
tags := sync.Map{}
|
|
counter := atomic.Uint64{}
|
|
|
|
ohm, err := New(context.Background(), &proxyman.OutboundConfig{})
|
|
if err != nil {
|
|
t.Error("failed to create outbound handler manager")
|
|
}
|
|
config := &core.Config{
|
|
App: []*serial.TypedMessage{},
|
|
}
|
|
v, _ := core.New(config)
|
|
v.AddFeature(ohm)
|
|
ctx := context.WithValue(context.Background(), xrayKey, v)
|
|
|
|
stop_add_rm := false
|
|
wg_add_rm := sync.WaitGroup{}
|
|
addHandlers := func() {
|
|
defer wg_add_rm.Done()
|
|
for !stop_add_rm {
|
|
time.Sleep(delay)
|
|
idx := counter.Add(1)
|
|
tag := fmt.Sprintf("%s%d", tags_prefix, idx)
|
|
cfg := &core.OutboundHandlerConfig{
|
|
Tag: tag,
|
|
ProxySettings: serial.ToTypedMessage(&freedom.Config{}),
|
|
}
|
|
if h, err := NewHandler(ctx, cfg); err == nil {
|
|
if err := ohm.AddHandler(ctx, h); err == nil {
|
|
// t.Log("add handler:", tag)
|
|
tags.Store(tag, nil)
|
|
} else {
|
|
t.Error("failed to add handler:", tag)
|
|
}
|
|
} else {
|
|
t.Error("failed to create handler:", tag)
|
|
}
|
|
}
|
|
}
|
|
|
|
rmHandlers := func() {
|
|
defer wg_add_rm.Done()
|
|
for !stop_add_rm {
|
|
time.Sleep(delay)
|
|
tags.Range(func(key interface{}, value interface{}) bool {
|
|
if _, ok := tags.LoadAndDelete(key); ok {
|
|
// t.Log("remove handler:", key)
|
|
ohm.RemoveHandler(ctx, key.(string))
|
|
return false
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
selectors := []string{tags_prefix}
|
|
wg_get := sync.WaitGroup{}
|
|
stop_get := false
|
|
getTags := func() {
|
|
defer wg_get.Done()
|
|
for !stop_get {
|
|
time.Sleep(delay)
|
|
_ = ohm.Select(selectors)
|
|
// t.Logf("get tags: %v", tag)
|
|
}
|
|
}
|
|
|
|
for i := 0; i < threads_num; i++ {
|
|
wg_add_rm.Add(2)
|
|
go rmHandlers()
|
|
go addHandlers()
|
|
wg_get.Add(1)
|
|
go getTags()
|
|
}
|
|
|
|
time.Sleep(test_duration)
|
|
stop_add_rm = true
|
|
wg_add_rm.Wait()
|
|
stop_get = true
|
|
wg_get.Wait()
|
|
}
|