package observatory import ( "context" "net" "net/http" "net/url" "sort" "sync" "time" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/errors" v2net "github.com/xtls/xray-core/common/net" "github.com/xtls/xray-core/common/session" "github.com/xtls/xray-core/common/signal/done" "github.com/xtls/xray-core/common/task" "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/features/extension" "github.com/xtls/xray-core/features/outbound" "github.com/xtls/xray-core/features/routing" "github.com/xtls/xray-core/transport/internet/tagged" "google.golang.org/protobuf/proto" ) type Observer struct { config *Config ctx context.Context statusLock sync.Mutex status []*OutboundStatus finished *done.Instance ohm outbound.Manager dispatcher routing.Dispatcher } func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) { return &ObservationResult{Status: o.status}, nil } func (o *Observer) Type() interface{} { return extension.ObservatoryType() } func (o *Observer) Start() error { if o.config != nil && len(o.config.SubjectSelector) != 0 { o.finished = done.New() go o.background() } return nil } func (o *Observer) Close() error { if o.finished != nil { return o.finished.Close() } return nil } func (o *Observer) background() { for !o.finished.Done() { hs, ok := o.ohm.(outbound.HandlerSelector) if !ok { errors.LogInfo(o.ctx, "outbound.Manager is not a HandlerSelector") return } outbounds := hs.Select(o.config.SubjectSelector) o.updateStatus(outbounds) sleepTime := time.Second * 10 if o.config.ProbeInterval != 0 { sleepTime = time.Duration(o.config.ProbeInterval) } if !o.config.EnableConcurrency { sort.Strings(outbounds) for _, v := range outbounds { result := o.probe(v) o.updateStatusForResult(v, &result) if o.finished.Done() { return } time.Sleep(sleepTime) } continue } ch := make(chan struct{}, len(outbounds)) for _, v := range outbounds { go func(v string) { result := o.probe(v) o.updateStatusForResult(v, &result) ch <- struct{}{} }(v) } for range outbounds { select { case <-ch: case <-o.finished.Wait(): return } } time.Sleep(sleepTime) } } func (o *Observer) updateStatus(outbounds []string) { o.statusLock.Lock() defer o.statusLock.Unlock() // TODO should remove old inbound that is removed _ = outbounds } func (o *Observer) probe(outbound string) ProbeResult { errorCollectorForRequest := newErrorCollector() httpTransport := http.Transport{ Proxy: func(*http.Request) (*url.URL, error) { return nil, nil }, DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) { var connection net.Conn taskErr := task.Run(ctx, func() error { // MUST use Xray's built in context system dest, err := v2net.ParseDestination(network + ":" + addr) if err != nil { return errors.New("cannot understand address").Base(err) } trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest) conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound) if err != nil { return errors.New("cannot dial remote address ", dest).Base(err) } connection = conn return nil }) if taskErr != nil { return nil, errors.New("cannot finish connection").Base(taskErr) } return connection, nil }, TLSHandshakeTimeout: time.Second * 5, } httpClient := &http.Client{ Transport: &httpTransport, CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, Jar: nil, Timeout: time.Second * 5, } var GETTime time.Duration err := task.Run(o.ctx, func() error { startTime := time.Now() probeURL := "https://www.google.com/generate_204" if o.config.ProbeUrl != "" { probeURL = o.config.ProbeUrl } response, err := httpClient.Get(probeURL) if err != nil { return errors.New("outbound failed to relay connection").Base(err) } if response.Body != nil { response.Body.Close() } endTime := time.Now() GETTime = endTime.Sub(startTime) return nil }) if err != nil { var errorMessage = "the outbound " + outbound + " is dead: GET request failed:" + err.Error() + "with outbound handler report underlying connection failed" errors.LogInfoInner(o.ctx, errorCollectorForRequest.UnderlyingError(), errorMessage) return ProbeResult{Alive: false, LastErrorReason: errorMessage} } errors.LogInfo(o.ctx, "the outbound ", outbound, " is alive:", GETTime.Seconds()) return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()} } func (o *Observer) updateStatusForResult(outbound string, result *ProbeResult) { o.statusLock.Lock() defer o.statusLock.Unlock() var status *OutboundStatus if location := o.findStatusLocationLockHolderOnly(outbound); location != -1 { status = o.status[location] } else { status = &OutboundStatus{} o.status = append(o.status, status) } status.LastTryTime = time.Now().Unix() status.OutboundTag = outbound status.Alive = result.Alive if result.Alive { status.Delay = result.Delay status.LastSeenTime = status.LastTryTime status.LastErrorReason = "" } else { status.LastErrorReason = result.LastErrorReason status.Delay = 99999999 } } func (o *Observer) findStatusLocationLockHolderOnly(outbound string) int { for i, v := range o.status { if v.OutboundTag == outbound { return i } } return -1 } func New(ctx context.Context, config *Config) (*Observer, error) { var outboundManager outbound.Manager var dispatcher routing.Dispatcher err := core.RequireFeatures(ctx, func(om outbound.Manager, rd routing.Dispatcher) { outboundManager = om dispatcher = rd }) if err != nil { return nil, errors.New("Cannot get depended features").Base(err) } return &Observer{ config: config, ctx: ctx, ohm: outboundManager, dispatcher: dispatcher, }, nil } func init() { common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { return New(ctx, config.(*Config)) })) }