Xray-core/app/router/balancing.go

120 lines
2.9 KiB
Go
Raw Normal View History

2020-11-25 14:01:53 +03:00
package router
import (
"context"
sync "sync"
"github.com/xtls/xray-core/features/extension"
2020-12-04 04:36:16 +03:00
"github.com/xtls/xray-core/features/outbound"
2020-11-25 14:01:53 +03:00
)
// BalancingStrategy chooses a single outbound tag out of a list of candidates.
type BalancingStrategy interface {
	// PickOutbound returns one tag chosen from tags.
	PickOutbound(tags []string) string
}
// BalancingPrincipleTarget is an optional capability of a balancing strategy:
// given the candidate tags, it reports the strategy's principle targets.
// Router.GetPrincipleTarget type-asserts a balancer's strategy to this
// interface and reports an error when the strategy does not implement it.
type BalancingPrincipleTarget interface {
	// GetPrincipleTarget returns the principle targets for the given
	// candidate tags.
	GetPrincipleTarget(tags []string) []string
}
// RoundRobinStrategy is a BalancingStrategy that hands out the candidate tags
// one after another, wrapping around at the end of the list.
//
// The zero value is ready to use. Because it embeds a sync.Mutex it must not
// be copied after first use; always use it through a pointer.
type RoundRobinStrategy struct {
	mu    sync.Mutex // guards index
	index int        // position of the next tag to return
}

// PickOutbound returns the next tag from tags in round-robin order.
//
// When tags is empty it returns "" instead of panicking. Balancer.PickOutbound
// passes the selector result through unchecked and already treats an empty tag
// as "use the fallback tag, or report an error", so an empty candidate list
// must not crash the process.
func (s *RoundRobinStrategy) PickOutbound(tags []string) string {
	n := len(tags)
	if n == 0 {
		return ""
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// The candidate list may have shrunk since the previous call, so reduce
	// the stored index modulo the current length before using it.
	tag := tags[s.index%n]
	s.index = (s.index + 1) % n
	return tag
}
2020-11-25 14:01:53 +03:00
type Balancer struct {
selectors []string
strategy BalancingStrategy
ohm outbound.Manager
fallbackTag string
override override
2020-11-25 14:01:53 +03:00
}
// PickOutbound picks the tag of a outbound
2020-11-25 14:01:53 +03:00
func (b *Balancer) PickOutbound() (string, error) {
candidates, err := b.SelectOutbounds()
if err != nil {
if b.fallbackTag != "" {
newError("fallback to [", b.fallbackTag, "], due to error: ", err).AtInfo().WriteToLog()
return b.fallbackTag, nil
}
return "", err
2020-11-25 14:01:53 +03:00
}
var tag string
if o := b.override.Get(); o != "" {
tag = o
} else {
tag = b.strategy.PickOutbound(candidates)
2020-11-25 14:01:53 +03:00
}
if tag == "" {
if b.fallbackTag != "" {
newError("fallback to [", b.fallbackTag, "], due to empty tag returned").AtInfo().WriteToLog()
return b.fallbackTag, nil
}
// will use default handler
2020-11-25 14:01:53 +03:00
return "", newError("balancing strategy returns empty tag")
}
return tag, nil
}
2022-05-18 10:29:01 +03:00
// InjectContext forwards ctx to the balancing strategy when the strategy
// implements extension.ContextReceiver; otherwise it does nothing.
func (b *Balancer) InjectContext(ctx context.Context) {
	receiver, accepts := b.strategy.(extension.ContextReceiver)
	if !accepts {
		return
	}
	receiver.InjectContext(ctx)
}
// SelectOutbounds returns the outbound tags matched by the Balancer's
// selectors. It fails when the outbound manager does not implement
// outbound.HandlerSelector.
func (b *Balancer) SelectOutbounds() ([]string, error) {
	selector, isSelector := b.ohm.(outbound.HandlerSelector)
	if isSelector {
		return selector.Select(b.selectors), nil
	}
	return nil, newError("outbound.Manager is not a HandlerSelector")
}
// GetPrincipleTarget implements routing.BalancerPrincipleTarget.
//
// It looks up the balancer registered under tag and, if its strategy
// implements BalancingPrincipleTarget, returns the strategy's principle
// targets for the currently selected outbounds. An error is returned when the
// tag is unknown, the strategy lacks that capability, or outbound selection
// fails.
func (r *Router) GetPrincipleTarget(tag string) ([]string, error) {
	balancer, found := r.balancers[tag]
	if !found {
		return nil, newError("cannot find tag")
	}

	target, supported := balancer.strategy.(BalancingPrincipleTarget)
	if !supported {
		return nil, newError("unsupported GetPrincipleTarget")
	}

	candidates, err := balancer.SelectOutbounds()
	if err != nil {
		return nil, newError("unable to select outbounds").Base(err)
	}
	return target.GetPrincipleTarget(candidates), nil
}
// SetOverrideTarget implements routing.BalancerOverrider.
//
// It stores target as the override of the balancer registered under tag and
// returns an error when no such balancer exists.
func (r *Router) SetOverrideTarget(tag, target string) error {
	balancer, found := r.balancers[tag]
	if !found {
		return newError("cannot find tag")
	}
	balancer.override.Put(target)
	return nil
}
// GetOverrideTarget implements routing.BalancerOverrider.
//
// It returns the override currently stored for the balancer registered under
// tag, or an error when no such balancer exists.
func (r *Router) GetOverrideTarget(tag string) (string, error) {
	balancer, found := r.balancers[tag]
	if !found {
		return "", newError("cannot find tag")
	}
	return balancer.override.Get(), nil
}