From 5c366db84701ba35964fc87b47da3da8742e34ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Fri, 9 Apr 2021 05:20:30 +0800 Subject: [PATCH] Add observatory / latestPing balancing strategy Co-authored-by: Shelikhoo --- app/dispatcher/default.go | 17 +- app/observatory/config.pb.go | 530 ++++++++++++++++++ app/observatory/config.proto | 73 +++ app/observatory/errors.generated.go | 9 + app/observatory/explainErrors.go | 26 + app/observatory/observatory.go | 3 + app/observatory/observer.go | 216 +++++++ app/router/balancing.go | 8 + app/router/config.pb.go | 62 +- app/router/config.proto | 1 + app/router/strategy_leastping.go | 57 ++ common/session/context.go | 31 + features/extension/contextreceiver.go | 7 + features/extension/observatory.go | 19 + infra/conf/observatory.go | 14 + infra/conf/xray.go | 13 +- main/distro/all/all.go | 3 + transport/internet/tagged/tagged.go | 11 + .../tagged/taggedimpl/errors.generated.go | 9 + transport/internet/tagged/taggedimpl/impl.go | 46 ++ .../internet/tagged/taggedimpl/taggedimpl.go | 3 + 21 files changed, 1127 insertions(+), 31 deletions(-) create mode 100644 app/observatory/config.pb.go create mode 100644 app/observatory/config.proto create mode 100644 app/observatory/errors.generated.go create mode 100644 app/observatory/explainErrors.go create mode 100644 app/observatory/observatory.go create mode 100644 app/observatory/observer.go create mode 100644 app/router/strategy_leastping.go create mode 100644 features/extension/contextreceiver.go create mode 100644 features/extension/observatory.go create mode 100644 infra/conf/observatory.go create mode 100644 transport/internet/tagged/tagged.go create mode 100644 transport/internet/tagged/taggedimpl/errors.generated.go create mode 100644 transport/internet/tagged/taggedimpl/impl.go create mode 100644 transport/internet/tagged/taggedimpl/taggedimpl.go diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index 17010759..3c23cd67 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -199,8 +199,8 @@ func shouldOverride(ctx context.Context, result SniffResult, request session.Sni } if fkr0, ok := fakeDNSEngine.(dns.FakeDNSEngineRev0); ok && protocolString != "bittorrent" && p == "fakedns" && destination.Address.Family().IsIP() && fkr0.IsIPInIPPool(destination.Address) { - newError("Using sniffer ", protocolString, " since the fake DNS missed").WriteToLog(session.ExportIDToError(ctx)) - return true + newError("Using sniffer ", protocolString, " since the fake DNS missed").WriteToLog(session.ExportIDToError(ctx)) + return true } if resultSubset, ok := result.(SnifferIsProtoSubsetOf); ok { if resultSubset.IsProtoSubsetOf(p) { @@ -399,7 +399,18 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport. var handler outbound.Handler - if d.router != nil { + if forcedOutboundTag := session.GetForcedOutboundTagFromContext(ctx); forcedOutboundTag != "" { + ctx = session.SetForcedOutboundTagToContext(ctx, "") + if h := d.ohm.GetHandler(forcedOutboundTag); h != nil { + newError("taking platform initialized detour [", forcedOutboundTag, "] for [", destination, "]").WriteToLog(session.ExportIDToError(ctx)) + handler = h + } else { + newError("non existing tag for platform initialized detour: ", forcedOutboundTag).AtError().WriteToLog(session.ExportIDToError(ctx)) + common.Close(link.Writer) + common.Interrupt(link.Reader) + return + } + } else if d.router != nil { if route, err := d.router.PickRoute(routing_session.AsRoutingContext(ctx)); err == nil { tag := route.GetOutboundTag() if h := d.ohm.GetHandler(tag); h != nil { diff --git a/app/observatory/config.pb.go b/app/observatory/config.pb.go new file mode 100644 index 00000000..8613f389 --- /dev/null +++ b/app/observatory/config.pb.go @@ -0,0 +1,530 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.18.0 +// source: app/observatory/config.proto + +package observatory + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ObservationResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status []*OutboundStatus `protobuf:"bytes,1,rep,name=status,proto3" json:"status,omitempty"` +} + +func (x *ObservationResult) Reset() { + *x = ObservationResult{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ObservationResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ObservationResult) ProtoMessage() {} + +func (x *ObservationResult) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ObservationResult.ProtoReflect.Descriptor instead. +func (*ObservationResult) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{0} +} + +func (x *ObservationResult) GetStatus() []*OutboundStatus { + if x != nil { + return x.Status + } + return nil +} + +type OutboundStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document Whether this outbound is usable + //@Restriction ReadOnlyForUser + Alive bool `protobuf:"varint,1,opt,name=alive,proto3" json:"alive,omitempty"` + // @Document The time for probe request to finish. + //@Type time.ms + //@Restriction ReadOnlyForUser + Delay int64 `protobuf:"varint,2,opt,name=delay,proto3" json:"delay,omitempty"` + // @Document The last error caused this outbound failed to relay probe request + //@Restriction NotMachineReadable + LastErrorReason string `protobuf:"bytes,3,opt,name=last_error_reason,json=lastErrorReason,proto3" json:"last_error_reason,omitempty"` + // @Document The outbound tag for this Server + //@Type id.outboundTag + OutboundTag string `protobuf:"bytes,4,opt,name=outbound_tag,json=outboundTag,proto3" json:"outbound_tag,omitempty"` + // @Document The time this outbound is known to be alive + //@Type id.outboundTag + LastSeenTime int64 `protobuf:"varint,5,opt,name=last_seen_time,json=lastSeenTime,proto3" json:"last_seen_time,omitempty"` + // @Document The time this outbound is tried + //@Type id.outboundTag + LastTryTime int64 `protobuf:"varint,6,opt,name=last_try_time,json=lastTryTime,proto3" json:"last_try_time,omitempty"` +} + +func (x *OutboundStatus) Reset() { + *x = OutboundStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *OutboundStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*OutboundStatus) ProtoMessage() {} + +func (x *OutboundStatus) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use OutboundStatus.ProtoReflect.Descriptor instead. +func (*OutboundStatus) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{1} +} + +func (x *OutboundStatus) GetAlive() bool { + if x != nil { + return x.Alive + } + return false +} + +func (x *OutboundStatus) GetDelay() int64 { + if x != nil { + return x.Delay + } + return 0 +} + +func (x *OutboundStatus) GetLastErrorReason() string { + if x != nil { + return x.LastErrorReason + } + return "" +} + +func (x *OutboundStatus) GetOutboundTag() string { + if x != nil { + return x.OutboundTag + } + return "" +} + +func (x *OutboundStatus) GetLastSeenTime() int64 { + if x != nil { + return x.LastSeenTime + } + return 0 +} + +func (x *OutboundStatus) GetLastTryTime() int64 { + if x != nil { + return x.LastTryTime + } + return 0 +} + +type ProbeResult struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document Whether this outbound is usable + //@Restriction ReadOnlyForUser + Alive bool `protobuf:"varint,1,opt,name=alive,proto3" json:"alive,omitempty"` + // @Document The time for probe request to finish. + //@Type time.ms + //@Restriction ReadOnlyForUser + Delay int64 `protobuf:"varint,2,opt,name=delay,proto3" json:"delay,omitempty"` + // @Document The error caused this outbound failed to relay probe request + //@Restriction NotMachineReadable + LastErrorReason string `protobuf:"bytes,3,opt,name=last_error_reason,json=lastErrorReason,proto3" json:"last_error_reason,omitempty"` +} + +func (x *ProbeResult) Reset() { + *x = ProbeResult{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ProbeResult) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProbeResult) ProtoMessage() {} + +func (x *ProbeResult) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProbeResult.ProtoReflect.Descriptor instead. +func (*ProbeResult) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{2} +} + +func (x *ProbeResult) GetAlive() bool { + if x != nil { + return x.Alive + } + return false +} + +func (x *ProbeResult) GetDelay() int64 { + if x != nil { + return x.Delay + } + return 0 +} + +func (x *ProbeResult) GetLastErrorReason() string { + if x != nil { + return x.LastErrorReason + } + return "" +} + +type Intensity struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document The time interval for a probe request in ms. + //@Type time.ms + ProbeInterval uint32 `protobuf:"varint,1,opt,name=probe_interval,json=probeInterval,proto3" json:"probe_interval,omitempty"` +} + +func (x *Intensity) Reset() { + *x = Intensity{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Intensity) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Intensity) ProtoMessage() {} + +func (x *Intensity) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Intensity.ProtoReflect.Descriptor instead. +func (*Intensity) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{3} +} + +func (x *Intensity) GetProbeInterval() uint32 { + if x != nil { + return x.ProbeInterval + } + return 0 +} + +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // @Document The selectors for outbound under observation + SubjectSelector []string `protobuf:"bytes,2,rep,name=subject_selector,json=subjectSelector,proto3" json:"subject_selector,omitempty"` + ProbeUrl string `protobuf:"bytes,3,opt,name=probe_url,json=probeUrl,proto3" json:"probe_url,omitempty"` + ProbeInterval int64 `protobuf:"varint,4,opt,name=probe_interval,json=probeInterval,proto3" json:"probe_interval,omitempty"` + EnableConcurrency bool `protobuf:"varint,5,opt,name=enable_concurrency,json=enableConcurrency,proto3" json:"enable_concurrency,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_app_observatory_config_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_app_observatory_config_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_app_observatory_config_proto_rawDescGZIP(), []int{4} +} + +func (x *Config) GetSubjectSelector() []string { + if x != nil { + return x.SubjectSelector + } + return nil +} + +func (x *Config) GetProbeUrl() string { + if x != nil { + return x.ProbeUrl + } + return "" +} + +func (x *Config) GetProbeInterval() int64 { + if x != nil { + return x.ProbeInterval + } + return 0 +} + +func (x *Config) GetEnableConcurrency() bool { + if x != nil { + return x.EnableConcurrency + } + return false +} + +var File_app_observatory_config_proto protoreflect.FileDescriptor + +var file_app_observatory_config_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, + 0x79, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x19, + 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62, + 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x22, 0x56, 0x0a, 0x11, 0x4f, 0x62, 0x73, + 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x41, + 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, + 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, + 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x4f, 0x75, 0x74, 0x62, 0x6f, + 0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x22, 0xd5, 0x01, 0x0a, 0x0e, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65, + 0x6c, 0x61, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, + 0x12, 0x2a, 0x0a, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x72, + 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x6c, 0x61, 0x73, + 0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c, + 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x74, 0x61, 0x67, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0b, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x12, + 0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x65, 0x65, 0x6e, 0x5f, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x65, + 0x6e, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x74, 0x72, + 0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x6c, 0x61, + 0x73, 0x74, 0x54, 0x72, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x65, 0x0a, 0x0b, 0x50, 0x72, 0x6f, + 0x62, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x6c, 0x69, 0x76, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, + 0x65, 0x6c, 0x61, 0x79, 0x12, 0x2a, 0x0a, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0f, 0x6c, 0x61, 0x73, 0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, + 0x22, 0x32, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x74, 0x79, 0x12, 0x25, 0x0a, + 0x0e, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x49, 0x6e, 0x74, 0x65, + 0x72, 0x76, 0x61, 0x6c, 0x22, 0xa6, 0x01, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, + 0x29, 0x0a, 0x10, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x73, 0x65, 0x6c, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x75, 0x62, 0x6a, 0x65, + 0x63, 0x74, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x72, + 0x6f, 0x62, 0x65, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, + 0x72, 0x6f, 0x62, 0x65, 0x55, 0x72, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x72, 0x6f, 0x62, 0x65, + 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x0d, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x2d, + 0x0a, 0x12, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, + 0x65, 0x6e, 0x63, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x65, 0x6e, 0x61, 0x62, + 0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x42, 0x5e, 0x0a, + 0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62, + 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x50, 0x01, 0x5a, 0x29, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, + 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, + 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0xaa, 0x02, 0x14, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x41, 0x70, + 0x70, 0x2e, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_app_observatory_config_proto_rawDescOnce sync.Once + file_app_observatory_config_proto_rawDescData = file_app_observatory_config_proto_rawDesc +) + +func file_app_observatory_config_proto_rawDescGZIP() []byte { + file_app_observatory_config_proto_rawDescOnce.Do(func() { + file_app_observatory_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_observatory_config_proto_rawDescData) + }) + return file_app_observatory_config_proto_rawDescData +} + +var file_app_observatory_config_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_app_observatory_config_proto_goTypes = []interface{}{ + (*ObservationResult)(nil), // 0: xray.core.app.observatory.ObservationResult + (*OutboundStatus)(nil), // 1: xray.core.app.observatory.OutboundStatus + (*ProbeResult)(nil), // 2: xray.core.app.observatory.ProbeResult + (*Intensity)(nil), // 3: xray.core.app.observatory.Intensity + (*Config)(nil), // 4: xray.core.app.observatory.Config +} +var file_app_observatory_config_proto_depIdxs = []int32{ + 1, // 0: xray.core.app.observatory.ObservationResult.status:type_name -> xray.core.app.observatory.OutboundStatus + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_app_observatory_config_proto_init() } +func file_app_observatory_config_proto_init() { + if File_app_observatory_config_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_app_observatory_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ObservationResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*OutboundStatus); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_config_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ProbeResult); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_config_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Intensity); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_app_observatory_config_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_app_observatory_config_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_app_observatory_config_proto_goTypes, + DependencyIndexes: file_app_observatory_config_proto_depIdxs, + MessageInfos: file_app_observatory_config_proto_msgTypes, + }.Build() + File_app_observatory_config_proto = out.File + file_app_observatory_config_proto_rawDesc = nil + file_app_observatory_config_proto_goTypes = nil + file_app_observatory_config_proto_depIdxs = nil +} diff --git a/app/observatory/config.proto b/app/observatory/config.proto new file mode 100644 index 00000000..9ac9c549 --- /dev/null +++ b/app/observatory/config.proto @@ -0,0 +1,73 @@ +syntax = "proto3"; + +package xray.core.app.observatory; +option csharp_namespace = "Xray.App.Observatory"; +option go_package = "github.com/xtls/xray-core/app/observatory"; +option java_package = "com.xray.app.observatory"; +option java_multiple_files = true; + +message ObservationResult { + repeated OutboundStatus status = 1; +} + +message OutboundStatus{ + /* @Document Whether this outbound is usable + @Restriction ReadOnlyForUser + */ + bool alive = 1; + /* @Document The time for probe request to finish. + @Type time.ms + @Restriction ReadOnlyForUser + */ + int64 delay = 2; + /* @Document The last error caused this outbound failed to relay probe request + @Restriction NotMachineReadable + */ + string last_error_reason = 3; + /* @Document The outbound tag for this Server + @Type id.outboundTag + */ + string outbound_tag = 4; + /* @Document The time this outbound is known to be alive + @Type id.outboundTag +*/ + int64 last_seen_time = 5; + /* @Document The time this outbound is tried + @Type id.outboundTag +*/ + int64 last_try_time = 6; +} + +message ProbeResult{ + /* @Document Whether this outbound is usable + @Restriction ReadOnlyForUser + */ + bool alive = 1; + /* @Document The time for probe request to finish. + @Type time.ms + @Restriction ReadOnlyForUser + */ + int64 delay = 2; + /* @Document The error caused this outbound failed to relay probe request + @Restriction NotMachineReadable +*/ + string last_error_reason = 3; +} + +message Intensity{ + /* @Document The time interval for a probe request in ms. + @Type time.ms + */ + uint32 probe_interval = 1; +} +message Config { + /* @Document The selectors for outbound under observation + */ + repeated string subject_selector = 2; + + string probe_url = 3; + + int64 probe_interval = 4; + + bool enable_concurrency = 5; +} \ No newline at end of file diff --git a/app/observatory/errors.generated.go b/app/observatory/errors.generated.go new file mode 100644 index 00000000..5cd4408a --- /dev/null +++ b/app/observatory/errors.generated.go @@ -0,0 +1,9 @@ +package observatory + +import "github.com/xtls/xray-core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/app/observatory/explainErrors.go b/app/observatory/explainErrors.go new file mode 100644 index 00000000..9aba7923 --- /dev/null +++ b/app/observatory/explainErrors.go @@ -0,0 +1,26 @@ +package observatory + +import "github.com/xtls/xray-core/common/errors" + +type errorCollector struct { + errors *errors.Error +} + +func (e *errorCollector) SubmitError(err error) { + if e.errors == nil { + e.errors = newError("underlying connection error").Base(err) + return + } + e.errors = e.errors.Base(newError("underlying connection error").Base(err)) +} + +func newErrorCollector() *errorCollector { + return &errorCollector{} +} + +func (e *errorCollector) UnderlyingError() error { + if e.errors == nil { + return newError("failed to produce report") + } + return e.errors +} diff --git a/app/observatory/observatory.go b/app/observatory/observatory.go new file mode 100644 index 00000000..64726885 --- /dev/null +++ b/app/observatory/observatory.go @@ -0,0 +1,3 @@ +package observatory + +//go:generate go run github.com/xtls/xray-core/common/errors/errorgen diff --git a/app/observatory/observer.go b/app/observatory/observer.go new file mode 100644 index 00000000..618ecccb --- /dev/null +++ b/app/observatory/observer.go @@ -0,0 +1,216 @@ +package observatory + +import ( + "context" + "github.com/xtls/xray-core/core" + "net" + "net/http" + "net/url" + "sort" + "sync" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/xtls/xray-core/common" + v2net "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/common/signal/done" + "github.com/xtls/xray-core/common/task" + "github.com/xtls/xray-core/features/extension" + "github.com/xtls/xray-core/features/outbound" + "github.com/xtls/xray-core/transport/internet/tagged" +) + +type Observer struct { + config *Config + ctx context.Context + + statusLock sync.Mutex + status []*OutboundStatus + + finished *done.Instance + + ohm outbound.Manager +} + +func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) { + return &ObservationResult{Status: o.status}, nil +} + +func (o *Observer) Type() interface{} { + return extension.ObservatoryType() +} + +func (o *Observer) Start() error { + if o.config != nil && len(o.config.SubjectSelector) != 0 { + o.finished = done.New() + go o.background() + } + return nil +} + +func (o *Observer) Close() error { + if o.finished != nil { + return o.finished.Close() + } + return nil +} + +func (o *Observer) background() { + for !o.finished.Done() { + hs, ok := o.ohm.(outbound.HandlerSelector) + if !ok { + newError("outbound.Manager is not a HandlerSelector").WriteToLog() + return + } + + outbounds := hs.Select(o.config.SubjectSelector) + sort.Strings(outbounds) + + o.updateStatus(outbounds) + + for _, v := range outbounds { + result := o.probe(v) + o.updateStatusForResult(v, &result) + if o.finished.Done() { + return + } + sleepTime := time.Second * 10 + if o.config.ProbeInterval != 0 { + sleepTime = time.Duration(o.config.ProbeInterval) + } + time.Sleep(sleepTime) + } + } +} + +func (o *Observer) updateStatus(outbounds []string) { + o.statusLock.Lock() + defer o.statusLock.Unlock() + // TODO should remove old inbound that is removed + _ = outbounds +} + +func (o *Observer) probe(outbound string) ProbeResult { + errorCollectorForRequest := newErrorCollector() + + httpTransport := http.Transport{ + Proxy: func(*http.Request) (*url.URL, error) { + return nil, nil + }, + DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) { + var connection net.Conn + taskErr := task.Run(ctx, func() error { + // MUST use V2Fly's built in context system + dest, err := v2net.ParseDestination(network + ":" + addr) + if err != nil { + return newError("cannot understand address").Base(err) + } + trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest) + conn, err := tagged.Dialer(trackedCtx, dest, outbound) + if err != nil { + return newError("cannot dial remote address ", dest).Base(err) + } + connection = conn + return nil + }) + if taskErr != nil { + return nil, newError("cannot finish connection").Base(taskErr) + } + return connection, nil + }, + TLSHandshakeTimeout: time.Second * 5, + } + httpClient := &http.Client{ + Transport: &httpTransport, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + Jar: nil, + Timeout: time.Second * 5, + } + var GETTime time.Duration + err := task.Run(o.ctx, func() error { + startTime := time.Now() + probeURL := "https://api.v2fly.org/checkConnection.svgz" + if o.config.ProbeUrl != "" { + probeURL = o.config.ProbeUrl + } + response, err := httpClient.Get(probeURL) + if err != nil { + return newError("outbound failed to relay connection").Base(err) + } + if response.Body != nil { + response.Body.Close() + } + endTime := time.Now() + GETTime = endTime.Sub(startTime) + return nil + }) + if err != nil { + fullerr := newError("underlying connection failed").Base(errorCollectorForRequest.UnderlyingError()) + fullerr = newError("with outbound handler report").Base(fullerr) + fullerr = newError("GET request failed:", err).Base(fullerr) + fullerr = newError("the outbound ", outbound, " is dead:").Base(fullerr) + fullerr = fullerr.AtInfo() + fullerr.WriteToLog() + return ProbeResult{Alive: false, LastErrorReason: fullerr.Error()} + } + newError("the outbound ", outbound, " is alive:", GETTime.Seconds()).AtInfo().WriteToLog() + return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()} +} + +func (o *Observer) updateStatusForResult(outbound string, result *ProbeResult) { + o.statusLock.Lock() + defer o.statusLock.Unlock() + var status *OutboundStatus + if location := o.findStatusLocationLockHolderOnly(outbound); location != -1 { + status = o.status[location] + } else { + status = &OutboundStatus{} + o.status = append(o.status, status) + } + + status.LastTryTime = time.Now().Unix() + status.OutboundTag = outbound + status.Alive = result.Alive + if result.Alive { + status.Delay = result.Delay + status.LastSeenTime = status.LastTryTime + status.LastErrorReason = "" + } else { + status.LastErrorReason = result.LastErrorReason + status.Delay = 99999999 + } +} + +func (o *Observer) findStatusLocationLockHolderOnly(outbound string) int { + for i, v := range o.status { + if v.OutboundTag == outbound { + return i + } + } + return -1 +} + +func New(ctx context.Context, config *Config) (*Observer, error) { + var outboundManager outbound.Manager + err := core.RequireFeatures(ctx, func(om outbound.Manager) { + outboundManager = om + }) + if err != nil { + return nil, newError("Cannot get depended features").Base(err) + } + return &Observer{ + config: config, + ctx: ctx, + ohm: outboundManager, + }, nil +} + +func init() { + common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { + return New(ctx, config.(*Config)) + })) +} diff --git a/app/router/balancing.go b/app/router/balancing.go index d4642bd9..b7c43f32 100644 --- a/app/router/balancing.go +++ b/app/router/balancing.go @@ -1,7 +1,10 @@ package router import ( + "context" + "github.com/xtls/xray-core/common/dice" + "github.com/xtls/xray-core/features/extension" "github.com/xtls/xray-core/features/outbound" ) @@ -41,3 +44,8 @@ func (b *Balancer) PickOutbound() (string, error) { } return tag, nil } +func (b *Balancer) InjectContext(ctx context.Context) { + if contextReceiver, ok := b.strategy.(extension.ContextReceiver); ok { + contextReceiver.InjectContext(ctx) + } +} diff --git a/app/router/config.pb.go b/app/router/config.pb.go index 842e30d8..36f389f6 100644 --- a/app/router/config.pb.go +++ b/app/router/config.pb.go @@ -708,6 +708,7 @@ type BalancingRule struct { Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"` OutboundSelector []string `protobuf:"bytes,2,rep,name=outbound_selector,json=outboundSelector,proto3" json:"outbound_selector,omitempty"` + Strategy string `protobuf:"bytes,3,opt,name=strategy,proto3" json:"strategy,omitempty"` } func (x *BalancingRule) Reset() { @@ -756,6 +757,13 @@ func (x *BalancingRule) GetOutboundSelector() []string { return nil } +func (x *BalancingRule) GetStrategy() string { + if x != nil { + return x.Strategy + } + return "" +} + type Config struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1010,36 +1018,38 @@ var file_app_router_config_proto_rawDesc = []byte{ 0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x5f, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x18, 0x11, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, - 0x42, 0x0c, 0x0a, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x22, 0x4e, + 0x42, 0x0c, 0x0a, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x22, 0x6a, 0x0a, 0x0d, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x74, 0x61, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61, 0x67, 0x12, 0x2b, 0x0a, 0x11, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x6f, 0x75, - 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x9b, - 0x02, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4f, 0x0a, 0x0f, 0x64, 0x6f, 0x6d, - 0x61, 0x69, 0x6e, 0x5f, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x26, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, - 0x75, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x44, 0x6f, 0x6d, 0x61, - 0x69, 0x6e, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x0e, 0x64, 0x6f, 0x6d, 0x61, - 0x69, 0x6e, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x30, 0x0a, 0x04, 0x72, 0x75, - 0x6c, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, - 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x69, - 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, 0x52, 0x04, 0x72, 0x75, 0x6c, 0x65, 0x12, 0x45, 0x0a, 0x0e, - 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x18, 0x03, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, - 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, - 0x52, 0x75, 0x6c, 0x65, 0x52, 0x0d, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, - 0x75, 0x6c, 0x65, 0x22, 0x47, 0x0a, 0x0e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, 0x72, - 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x08, 0x0a, 0x04, 0x41, 0x73, 0x49, 0x73, 0x10, 0x00, 0x12, - 0x09, 0x0a, 0x05, 0x55, 0x73, 0x65, 0x49, 0x70, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x70, - 0x49, 0x66, 0x4e, 0x6f, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, - 0x49, 0x70, 0x4f, 0x6e, 0x44, 0x65, 0x6d, 0x61, 0x6e, 0x64, 0x10, 0x03, 0x42, 0x4f, 0x0a, 0x13, - 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, - 0x74, 0x65, 0x72, 0x50, 0x01, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, - 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0xaa, 0x02, 0x0f, 0x58, 0x72, - 0x61, 0x79, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x1a, + 0x0a, 0x08, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x08, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x22, 0x9b, 0x02, 0x0a, 0x06, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4f, 0x0a, 0x0f, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x5f, + 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x26, + 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, + 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x0e, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, + 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x30, 0x0a, 0x04, 0x72, 0x75, 0x6c, 0x65, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, + 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x52, 0x75, + 0x6c, 0x65, 0x52, 0x04, 0x72, 0x75, 0x6c, 0x65, 0x12, 0x45, 0x0a, 0x0e, 0x62, 0x61, 0x6c, 0x61, + 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x1e, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, + 0x65, 0x72, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, + 0x52, 0x0d, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, 0x22, + 0x47, 0x0a, 0x0e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, + 0x79, 0x12, 0x08, 0x0a, 0x04, 0x41, 0x73, 0x49, 0x73, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x55, + 0x73, 0x65, 0x49, 0x70, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x70, 0x49, 0x66, 0x4e, 0x6f, + 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x49, 0x70, 0x4f, 0x6e, + 0x44, 0x65, 0x6d, 0x61, 0x6e, 0x64, 0x10, 0x03, 0x42, 0x4f, 0x0a, 0x13, 0x63, 0x6f, 0x6d, 0x2e, + 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x50, + 0x01, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, + 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, + 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0xaa, 0x02, 0x0f, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x41, + 0x70, 0x70, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/app/router/config.proto b/app/router/config.proto index 324899d5..2886077a 100644 --- a/app/router/config.proto +++ b/app/router/config.proto @@ -127,6 +127,7 @@ message RoutingRule { message BalancingRule { string tag = 1; repeated string outbound_selector = 2; + string strategy = 3; } message Config { diff --git a/app/router/strategy_leastping.go b/app/router/strategy_leastping.go new file mode 100644 index 00000000..e970e7f8 --- /dev/null +++ b/app/router/strategy_leastping.go @@ -0,0 +1,57 @@ +package router + +import ( + "context" + "github.com/xtls/xray-core/core" + + "github.com/xtls/xray-core/app/observatory" + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/features/extension" +) + +type LeastPingStrategy struct { + ctx context.Context + observatory extension.Observatory +} + +func (l *LeastPingStrategy) InjectContext(ctx context.Context) { + common.Must(core.RequireFeatures(ctx, func(observatory extension.Observatory) error { + l.observatory = observatory + return nil + })) + l.ctx = ctx +} + +func (l *LeastPingStrategy) PickOutbound(strings []string) string { + observeReport, err := l.observatory.GetObservation(l.ctx) + if err != nil { + newError("cannot get observe report").Base(err).WriteToLog() + return "" + } + outboundsList := outboundList(strings) + if result, ok := observeReport.(*observatory.ObservationResult); ok { + status := result.Status + leastPing := int64(99999999) + selectedOutboundName := "" + for _, v := range status { + if outboundsList.contains(v.OutboundTag) && v.Alive && v.Delay < leastPing { + selectedOutboundName = v.OutboundTag + } + } + return selectedOutboundName + } + + //No way to understand observeReport + return "" +} + +type outboundList []string + +func (o outboundList) contains(name string) bool { + for _, v := range o { + if v == name { + return true + } + } + return false +} diff --git a/common/session/context.go b/common/session/context.go index 7f1d7df9..0961daf3 100644 --- a/common/session/context.go +++ b/common/session/context.go @@ -11,6 +11,7 @@ const ( contentSessionKey muxPreferedSessionKey sockoptSessionKey + trackedConnectionErrorKey ) // ContextWithID returns a new context with the given ID. @@ -84,3 +85,33 @@ func SockoptFromContext(ctx context.Context) *Sockopt { } return nil } + +func GetForcedOutboundTagFromContext(ctx context.Context) string { + if ContentFromContext(ctx) == nil { + return "" + } + return ContentFromContext(ctx).Attribute("forcedOutboundTag") +} + +func SetForcedOutboundTagToContext(ctx context.Context, tag string) context.Context { + if contentFromContext := ContentFromContext(ctx); contentFromContext == nil { + ctx = ContextWithContent(ctx, &Content{}) + } + ContentFromContext(ctx).SetAttribute("forcedOutboundTag", tag) + return ctx +} + +type TrackedRequestErrorFeedback interface { + SubmitError(err error) +} + +func SubmitOutboundErrorToOriginator(ctx context.Context, err error) { + if errorTracker := ctx.Value(trackedConnectionErrorKey); errorTracker != nil { + errorTracker := errorTracker.(TrackedRequestErrorFeedback) + errorTracker.SubmitError(err) + } +} + +func TrackedConnectionError(ctx context.Context, tracker TrackedRequestErrorFeedback) context.Context { + return context.WithValue(ctx, trackedConnectionErrorKey, tracker) +} diff --git a/features/extension/contextreceiver.go b/features/extension/contextreceiver.go new file mode 100644 index 00000000..2d339479 --- /dev/null +++ b/features/extension/contextreceiver.go @@ -0,0 +1,7 @@ +package extension + +import "context" + +type ContextReceiver interface { + InjectContext(ctx context.Context) +} diff --git a/features/extension/observatory.go b/features/extension/observatory.go new file mode 100644 index 00000000..27252782 --- /dev/null +++ b/features/extension/observatory.go @@ -0,0 +1,19 @@ +package extension + +import ( + "context" + + "github.com/golang/protobuf/proto" + + "github.com/xtls/xray-core/features" +) + +type Observatory interface { + features.Feature + + GetObservation(ctx context.Context) (proto.Message, error) +} + +func ObservatoryType() interface{} { + return (*Observatory)(nil) +} diff --git a/infra/conf/observatory.go b/infra/conf/observatory.go new file mode 100644 index 00000000..70624808 --- /dev/null +++ b/infra/conf/observatory.go @@ -0,0 +1,14 @@ +package conf + +import ( + "github.com/golang/protobuf/proto" + "github.com/xtls/xray-core/app/observatory" +) + +type ObservatoryConfig struct { + SubjectSelector []string `json:"subjectSelector"` +} + +func (o ObservatoryConfig) Build() (proto.Message, error) { + return &observatory.Config{SubjectSelector: o.SubjectSelector}, nil +} diff --git a/infra/conf/xray.go b/infra/conf/xray.go index 8471d618..28a155c9 100644 --- a/infra/conf/xray.go +++ b/infra/conf/xray.go @@ -6,12 +6,12 @@ import ( "os" "strings" - "github.com/xtls/xray-core/transport/internet" - "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/app/proxyman" "github.com/xtls/xray-core/app/stats" "github.com/xtls/xray-core/common/serial" + "github.com/xtls/xray-core/transport/internet" + core "github.com/xtls/xray-core/core" "github.com/xtls/xray-core/transport/internet/xtls" ) @@ -405,6 +405,7 @@ type Config struct { Stats *StatsConfig `json:"stats"` Reverse *ReverseConfig `json:"reverse"` FakeDNS *FakeDNSConfig `json:"fakeDns"` + Observatory *ObservatoryConfig `json:"observatory"` } func (c *Config) findInboundTag(tag string) int { @@ -611,6 +612,14 @@ func (c *Config) Build() (*core.Config, error) { config.App = append(config.App, serial.ToTypedMessage(r)) } + if c.Observatory != nil { + r, err := c.Observatory.Build() + if err != nil { + return nil, err + } + config.App = append(config.App, serial.ToTypedMessage(r)) + } + var inbounds []InboundDetourConfig if c.InboundConfig != nil { diff --git a/main/distro/all/all.go b/main/distro/all/all.go index ca45123d..166b2b9b 100644 --- a/main/distro/all/all.go +++ b/main/distro/all/all.go @@ -23,6 +23,9 @@ import ( _ "github.com/xtls/xray-core/app/router" _ "github.com/xtls/xray-core/app/stats" + // Fix dependency cycle caused by core import in internet package + _ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl" + // Inbound and outbound proxies. _ "github.com/xtls/xray-core/proxy/blackhole" _ "github.com/xtls/xray-core/proxy/dns" diff --git a/transport/internet/tagged/tagged.go b/transport/internet/tagged/tagged.go new file mode 100644 index 00000000..430f9640 --- /dev/null +++ b/transport/internet/tagged/tagged.go @@ -0,0 +1,11 @@ +package tagged + +import ( + "context" + + "github.com/xtls/xray-core/common/net" +) + +type DialFunc func(ctx context.Context, dest net.Destination, tag string) (net.Conn, error) + +var Dialer DialFunc diff --git a/transport/internet/tagged/taggedimpl/errors.generated.go b/transport/internet/tagged/taggedimpl/errors.generated.go new file mode 100644 index 00000000..178794cc --- /dev/null +++ b/transport/internet/tagged/taggedimpl/errors.generated.go @@ -0,0 +1,9 @@ +package taggedimpl + +import "github.com/xtls/xray-core/common/errors" + +type errPathObjHolder struct{} + +func newError(values ...interface{}) *errors.Error { + return errors.New(values...).WithPathObj(errPathObjHolder{}) +} diff --git a/transport/internet/tagged/taggedimpl/impl.go b/transport/internet/tagged/taggedimpl/impl.go new file mode 100644 index 00000000..f54116b0 --- /dev/null +++ b/transport/internet/tagged/taggedimpl/impl.go @@ -0,0 +1,46 @@ +package taggedimpl + +import ( + "context" + "github.com/xtls/xray-core/common/net/cnc" + "github.com/xtls/xray-core/core" + + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/features/routing" + "github.com/xtls/xray-core/transport/internet/tagged" +) + +func DialTaggedOutbound(ctx context.Context, dest net.Destination, tag string) (net.Conn, error) { + var dispatcher routing.Dispatcher + if core.FromContext(ctx) == nil { + return nil, newError("Instance context variable is not in context, dial denied. ") + } + if err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) { + dispatcher = dispatcherInstance + }); err != nil { + return nil, newError("Required Feature dispatcher not resolved").Base(err) + } + + content := new(session.Content) + content.SkipDNSResolve = true + + ctx = session.ContextWithContent(ctx, content) + ctx = session.SetForcedOutboundTagToContext(ctx, tag) + + r, err := dispatcher.Dispatch(ctx, dest) + if err != nil { + return nil, err + } + var readerOpt cnc.ConnectionOption + if dest.Network == net.Network_TCP { + readerOpt = cnc.ConnectionOutputMulti(r.Reader) + } else { + readerOpt = cnc.ConnectionOutputMultiUDP(r.Reader) + } + return cnc.NewConnection(cnc.ConnectionInputMulti(r.Writer), readerOpt), nil +} + +func init() { + tagged.Dialer = DialTaggedOutbound +} diff --git a/transport/internet/tagged/taggedimpl/taggedimpl.go b/transport/internet/tagged/taggedimpl/taggedimpl.go new file mode 100644 index 00000000..ef2f3c02 --- /dev/null +++ b/transport/internet/tagged/taggedimpl/taggedimpl.go @@ -0,0 +1,3 @@ +package taggedimpl + +//go:generate go run github.com/xtls/xray-core/common/errors/errorgen