feat: drop frame in kernel to take advance of happy eyeballs if outbound shoud fail

This commit is contained in:
mzz2017 2023-02-08 20:54:28 +08:00 committed by mzz
parent 5e7b68822a
commit 2ef332b018
10 changed files with 110 additions and 23 deletions

View File

@ -60,7 +60,8 @@ const (
OutboundLogicalOr OutboundIndex = 0xFE
OutboundLogicalAnd OutboundIndex = 0xFF
OutboundLogicalMax = OutboundLogicalAnd
OutboundMax = OutboundLogicalAnd
OutboundUserDefinedMax = OutboundControlPlaneDirect - 1
)
func (i OutboundIndex) String() string {

View File

@ -27,6 +27,8 @@ type AliveDialerSet struct {
l4proto consts.L4ProtoStr
ipversion consts.IpVersionStr
aliveChangeCallback func(alive bool)
mu sync.Mutex
dialerToIndex map[*Dialer]int // *Dialer -> index of inorderedAliveDialerSet
dialerToLatency map[*Dialer]time.Duration
@ -43,6 +45,7 @@ func NewAliveDialerSet(
ipversion consts.IpVersionStr,
selectionPolicy consts.DialerSelectionPolicy,
dialers []*Dialer,
aliveChangeCallback func(alive bool),
setAlive bool,
) *AliveDialerSet {
a := &AliveDialerSet{
@ -50,6 +53,7 @@ func NewAliveDialerSet(
dialerGroupName: dialerGroupName,
l4proto: l4proto,
ipversion: ipversion,
aliveChangeCallback: aliveChangeCallback,
dialerToIndex: make(map[*Dialer]int),
dialerToLatency: make(map[*Dialer]time.Duration),
inorderedAliveDialerSet: make([]*Dialer, 0, len(dialers)),
@ -108,6 +112,7 @@ func (a *AliveDialerSet) SetAlive(dialer *Dialer, alive bool) {
// This dialer is already alive.
} else {
// Not alive -> alive.
defer a.aliveChangeCallback(true)
a.dialerToIndex[dialer] = len(a.inorderedAliveDialerSet)
a.inorderedAliveDialerSet = append(a.inorderedAliveDialerSet, dialer)
}
@ -115,6 +120,7 @@ func (a *AliveDialerSet) SetAlive(dialer *Dialer, alive bool) {
index := a.dialerToIndex[dialer]
if index >= 0 {
// Alive -> not alive.
defer a.aliveChangeCallback(false)
// Remove the dialer from inorderedAliveDialerSet.
if index >= len(a.inorderedAliveDialerSet) {
a.log.Panicf("index:%v >= len(a.inorderedAliveDialerSet):%v", index, len(a.inorderedAliveDialerSet))

View File

@ -284,7 +284,7 @@ func (d *Dialer) Check(timeout time.Duration,
"network": string(opts.ResultLogger.L4proto) + string(opts.ResultLogger.IpVersion),
"node": d.name,
"err": err.Error(),
}).Debugln("Connectivity Check")
}).Debugln("Connectivity Check Failed")
}
opts.ResultLogger.LatencyN.AppendLatency(timeout)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/v2rayA/dae/common/consts"
"github.com/v2rayA/dae/component/outbound/dialer"
"golang.org/x/net/proxy"
"golang.org/x/sys/unix"
"net"
"net/netip"
"strings"
@ -34,13 +35,13 @@ type DialerGroup struct {
selectionPolicy *DialerSelectionPolicy
}
func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.Dialer, p DialerSelectionPolicy) *DialerGroup {
func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.Dialer, p DialerSelectionPolicy, aliveChangeCallback func(alive bool, l4proto uint8, ipversion uint8)) *DialerGroup {
log := option.Log
var registeredAliveDialerSet bool
aliveTcp4DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_TCP, consts.IpVersionStr_4, p.Policy, dialers, true)
aliveTcp6DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_TCP, consts.IpVersionStr_6, p.Policy, dialers, true)
aliveUdp4DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_UDP, consts.IpVersionStr_4, p.Policy, dialers, true)
aliveUdp6DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_UDP, consts.IpVersionStr_6, p.Policy, dialers, true)
aliveTcp4DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_TCP, consts.IpVersionStr_4, p.Policy, dialers, func(alive bool) { aliveChangeCallback(alive, unix.IPPROTO_TCP, 4) }, true)
aliveTcp6DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_TCP, consts.IpVersionStr_6, p.Policy, dialers, func(alive bool) { aliveChangeCallback(alive, unix.IPPROTO_TCP, 6) }, true)
aliveUdp4DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_UDP, consts.IpVersionStr_4, p.Policy, dialers, func(alive bool) { aliveChangeCallback(alive, unix.IPPROTO_UDP, 4) }, true)
aliveUdp6DialerSet := dialer.NewAliveDialerSet(log, name, consts.L4ProtoStr_UDP, consts.IpVersionStr_6, p.Policy, dialers, func(alive bool) { aliveChangeCallback(alive, unix.IPPROTO_UDP, 6) }, true)
switch p.Policy {
case consts.DialerSelectionPolicy_Random,

22
control/connectivity.go Normal file
View File

@ -0,0 +1,22 @@
/*
* SPDX-License-Identifier: AGPL-3.0-only
* Copyright (c) since 2023, mzz2017 <mzz@tuta.io>
*/
package control
import "github.com/cilium/ebpf"
func (c *ControlPlaneCore) OutboundAliveChangeCallback(outbound uint8) func(alive bool, l4proto uint8, ipversion uint8) {
return func(alive bool, l4proto uint8, ipversion uint8) {
value := uint32(0)
if alive {
value = 1
}
_ = c.bpf.OutboundConnectivityMap.Update(bpfOutboundConnectivityQuery{
Outbound: outbound,
L4proto: l4proto,
Ipversion: ipversion,
}, value, ebpf.UpdateAny)
}
}

View File

@ -33,7 +33,9 @@ import (
)
type ControlPlane struct {
*ControlPlaneCore
log *logrus.Logger
core *ControlPlaneCore
deferFuncs []func() error
listenIp string
@ -81,7 +83,7 @@ func NewControlPlane(
}
if kernelVersion.Less(consts.BasicFeatureVersion) {
return nil, fmt.Errorf("your kernel version %v does not satisfy basic requirement; expect >=%v",
c.kernelVersion.String(),
kernelVersion.String(),
consts.BasicFeatureVersion.String())
}
@ -209,13 +211,13 @@ func NewControlPlane(
outbound.DialerSelectionPolicy{
Policy: consts.DialerSelectionPolicy_Fixed,
FixedIndex: 0,
}),
}, core.OutboundAliveChangeCallback(0)),
outbound.NewDialerGroup(option, consts.OutboundBlock.String(),
[]*dialer.Dialer{dialer.NewBlockDialer(option)},
outbound.DialerSelectionPolicy{
Policy: consts.DialerSelectionPolicy_Fixed,
FixedIndex: 0,
}),
}, core.OutboundAliveChangeCallback(1)),
}
// Filter out groups.
@ -242,17 +244,20 @@ func NewControlPlane(
log.Infoln("\t<Empty>")
}
// Create dialer group and append it to outbounds.
dialerGroup := outbound.NewDialerGroup(option, group.Name, dialers, *policy)
dialerGroup := outbound.NewDialerGroup(option, group.Name, dialers, *policy, core.OutboundAliveChangeCallback(uint8(len(outbounds))))
outbounds = append(outbounds, dialerGroup)
}
/// Routing.
// Generate outboundName2Id from outbounds.
if len(outbounds) > 0xff {
if len(outbounds) > int(consts.OutboundUserDefinedMax) {
return nil, fmt.Errorf("too many outbounds")
}
outboundName2Id := make(map[string]uint8)
for i, o := range outbounds {
if _, exist := outboundName2Id[o.Name]; exist {
return nil, fmt.Errorf("duplicated outbound name: %v", o.Name)
}
outboundName2Id[o.Name] = uint8(i)
}
builder := NewRoutingMatcherBuilder(outboundName2Id, &bpf)
@ -309,7 +314,8 @@ func NewControlPlane(
listenIp = "0.0.0.0"
}
return &ControlPlane{
ControlPlaneCore: core,
log: log,
core: core,
deferFuncs: nil,
listenIp: listenIp,
outbounds: outbounds,
@ -352,7 +358,7 @@ func (c *ControlPlane) ListenAndServe(port uint16) (err error) {
c.deferFuncs = append(c.deferFuncs, func() error {
return tcpFile.Close()
})
if err := c.bpf.ListenSocketMap.Update(consts.ZeroKey, uint64(tcpFile.Fd()), ebpf.UpdateAny); err != nil {
if err := c.core.bpf.ListenSocketMap.Update(consts.ZeroKey, uint64(tcpFile.Fd()), ebpf.UpdateAny); err != nil {
return err
}
// UDP socket.
@ -363,11 +369,11 @@ func (c *ControlPlane) ListenAndServe(port uint16) (err error) {
c.deferFuncs = append(c.deferFuncs, func() error {
return udpFile.Close()
})
if err := c.bpf.ListenSocketMap.Update(consts.OneKey, uint64(udpFile.Fd()), ebpf.UpdateAny); err != nil {
if err := c.core.bpf.ListenSocketMap.Update(consts.OneKey, uint64(udpFile.Fd()), ebpf.UpdateAny); err != nil {
return err
}
// Port.
if err := c.bpf.ParamMap.Update(consts.BigEndianTproxyPortKey, uint32(internal.Htons(port)), ebpf.UpdateAny); err != nil {
if err := c.core.bpf.ParamMap.Update(consts.BigEndianTproxyPortKey, uint32(internal.Htons(port)), ebpf.UpdateAny); err != nil {
return err
}
@ -407,7 +413,7 @@ func (c *ControlPlane) ListenAndServe(port uint16) (err error) {
}
dst := RetrieveOriginalDest(oob[:oobn])
var newBuf []byte
outboundIndex, err := c.RetrieveOutboundIndex(src, dst, unix.IPPROTO_UDP)
outboundIndex, err := c.core.RetrieveOutboundIndex(src, dst, unix.IPPROTO_UDP)
if err != nil {
// WAN. Old method.
addrHdr, dataOffset, err := ParseAddrHdr(buf[:n])
@ -448,5 +454,5 @@ func (c *ControlPlane) Close() (err error) {
}
}
}
return c.ControlPlaneCore.Close()
return c.core.Close()
}

View File

@ -74,7 +74,7 @@ func (c *ControlPlane) BatchUpdateDomainRouting(cache *dnsCache) error {
Bitmap: cache.DomainBitmap,
})
}
if _, err := BatchUpdate(c.bpf.DomainRoutingMap, keys, vals, &ebpf.BatchOptions{
if _, err := BatchUpdate(c.core.bpf.DomainRoutingMap, keys, vals, &ebpf.BatchOptions{
ElemFlags: uint64(ebpf.UpdateAny),
}); err != nil {
return err

View File

@ -74,6 +74,21 @@ enum {
DisableL4ChecksumPolicy_SetZero,
};
// Outbound Connectivity Map:
struct outbound_connectivity_query {
__u8 outbound;
__u8 l4proto;
__u8 ipversion;
};
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct outbound_connectivity_query);
__type(value, __u32); // true, false
__uint(max_entries, 256 * 2 * 2); // outbound * l4proto * ipversion
} outbound_connectivity_map SEC(".maps");
// Sockmap:
struct {
__uint(type, BPF_MAP_TYPE_SOCKMAP);
@ -1286,6 +1301,18 @@ new_connection:
goto block;
}
// Check outbound connectivity in specific ipversion and l4proto.
struct outbound_connectivity_query q = {0};
q.outbound = outbound;
q.ipversion = ipversion;
q.l4proto = l4proto;
__u32 *alive;
alive = bpf_map_lookup_elem(&outbound_connectivity_map, &q);
if (alive && *alive == 0) {
// Outbound is not alive.
goto block;
}
// Save routing result.
if ((ret = bpf_map_update_elem(&routing_tuples_map, &tuples, &outbound,
BPF_ANY))) {
@ -1591,6 +1618,18 @@ int tproxy_wan_egress(struct __sk_buff *skb) {
}
// Rewrite to control plane.
// Check outbound connectivity in specific ipversion and l4proto.
struct outbound_connectivity_query q = {0};
q.outbound = outbound;
q.ipversion = ipversion;
q.l4proto = l4proto;
__u32 *alive;
alive = bpf_map_lookup_elem(&outbound_connectivity_map, &q);
if (alive && *alive == 0) {
// Outbound is not alive.
return TC_ACT_SHOT;
}
if (unlikely(tcp_state_syn)) {
struct ip_port_outbound value_dst;
__builtin_memset(&value_dst, 0, sizeof(value_dst));
@ -1660,6 +1699,18 @@ int tproxy_wan_egress(struct __sk_buff *skb) {
// Rewrite to control plane.
// Check outbound connectivity in specific ipversion and l4proto.
struct outbound_connectivity_query q = {0};
q.outbound = new_hdr.outbound;
q.ipversion = ipversion;
q.l4proto = l4proto;
__u32 *alive;
alive = bpf_map_lookup_elem(&outbound_connectivity_map, &q);
if (alive && *alive == 0) {
// Outbound is not alive.
return TC_ACT_SHOT;
}
// Write mac.
if ((ret =
bpf_skb_store_bytes(skb, offsetof(struct ethhdr, h_dest),

View File

@ -23,12 +23,12 @@ func (c *ControlPlane) handleConn(lConn net.Conn) (err error) {
defer lConn.Close()
src := lConn.RemoteAddr().(*net.TCPAddr).AddrPort()
dst := lConn.LocalAddr().(*net.TCPAddr).AddrPort()
outboundIndex, err := c.RetrieveOutboundIndex(src, dst, unix.IPPROTO_TCP)
outboundIndex, err := c.core.RetrieveOutboundIndex(src, dst, unix.IPPROTO_TCP)
if err != nil {
// WAN. Old method.
var value bpfIpPortOutbound
ip6 := src.Addr().As16()
if e := c.bpf.TcpDstMap.Lookup(bpfIpPort{
if e := c.core.bpf.TcpDstMap.Lookup(bpfIpPort{
Ip: common.Ipv6ByteSliceToUint32Array(ip6[:]),
Port: internal.Htons(src.Port()),
}, &value); e != nil {

View File

@ -36,7 +36,7 @@ func (c *ControlPlaneCore) RetrieveOutboundIndex(src, dst netip.AddrPort, l4prot
if err := c.bpf.RoutingTuplesMap.Lookup(tuples, &_outboundIndex); err != nil {
return 0, fmt.Errorf("reading map: key [%v, %v, %v]: %w", src.String(), l4proto, dst.String(), err)
}
if _outboundIndex > uint32(consts.OutboundLogicalMax) {
if _outboundIndex > uint32(consts.OutboundMax) {
return 0, fmt.Errorf("bad outbound index")
}
return consts.OutboundIndex(_outboundIndex), nil