fix: problem that disables l4proto+ipversion when a single node is down

This commit is contained in:
mzz2017
2023-02-08 22:43:28 +08:00
committed by mzz
parent 00b8c6079e
commit f3e3ae2ae7
6 changed files with 40 additions and 20 deletions

View File

@ -111,16 +111,14 @@ func (a *AliveDialerSet) SetAlive(dialer *Dialer, alive bool) {
if index >= 0 {
// This dialer is already alive.
} else {
// Not alive -> alive.
defer a.aliveChangeCallback(true)
// Dialer: not alive -> alive.
a.dialerToIndex[dialer] = len(a.inorderedAliveDialerSet)
a.inorderedAliveDialerSet = append(a.inorderedAliveDialerSet, dialer)
}
} else {
index := a.dialerToIndex[dialer]
if index >= 0 {
// Alive -> not alive.
defer a.aliveChangeCallback(false)
// Dialer: alive -> not alive.
// Remove the dialer from inorderedAliveDialerSet.
if index >= len(a.inorderedAliveDialerSet) {
a.log.Panicf("index:%v >= len(a.inorderedAliveDialerSet):%v", index, len(a.inorderedAliveDialerSet))
@ -158,16 +156,22 @@ func (a *AliveDialerSet) SetAlive(dialer *Dialer, alive bool) {
}
if a.minLatency.dialer != oldBestDialer {
if a.minLatency.dialer != nil {
re := "re-"
if oldBestDialer == nil {
defer a.aliveChangeCallback(true)
re = ""
}
a.log.WithFields(logrus.Fields{
string(a.selectionPolicy): a.minLatency.latency,
"group": a.dialerGroupName,
"l4proto": a.l4proto,
"network": string(a.l4proto) + string(a.ipversion),
"dialer": a.minLatency.dialer.Name(),
}).Infof("Group re-selects dialer")
}).Infof("Group %vselects dialer", re)
} else {
defer a.aliveChangeCallback(false)
a.log.WithFields(logrus.Fields{
"group": a.dialerGroupName,
"l4proto": a.l4proto,
"network": string(a.l4proto) + string(a.ipversion),
}).Infof("Group has no dialer alive")
}
}
@ -177,7 +181,7 @@ func (a *AliveDialerSet) SetAlive(dialer *Dialer, alive bool) {
a.minLatency.dialer = dialer
a.log.WithFields(logrus.Fields{
"group": a.dialerGroupName,
"l4proto": a.l4proto,
"network": string(a.l4proto) + string(a.ipversion),
"dialer": a.minLatency.dialer.Name(),
}).Infof("Group selects dialer")
}

View File

@ -261,6 +261,9 @@ func (d *Dialer) Check(timeout time.Duration,
} else {
// Append timeout if there is any error or unexpected status code.
if err != nil {
if strings.Contains(err.Error(), "network is unreachable") {
err = fmt.Errorf("network is unreachable")
}
d.Log.WithFields(logrus.Fields{
// Add a space to ensure alphabetical order is first.
"network": string(opts.L4proto) + string(opts.IpVersion),

View File

@ -125,7 +125,7 @@ func (g *DialerGroup) Select(l4proto consts.L4ProtoStr, ipversion consts.IpVersi
if d == nil {
// No alive dialer.
g.log.WithFields(logrus.Fields{
"l4proto": l4proto,
"network": string(l4proto) + string(ipversion),
"group": g.Name,
}).Warnf("No alive dialer in DialerGroup, use \"block\".")
return g.block, nil
@ -143,7 +143,7 @@ func (g *DialerGroup) Select(l4proto consts.L4ProtoStr, ipversion consts.IpVersi
if d == nil {
// No alive dialer.
g.log.WithFields(logrus.Fields{
"l4proto": l4proto,
"network": string(l4proto) + string(ipversion),
"group": g.Name,
}).Warnf("No alive dialer in DialerGroup, use \"block\".")
return g.block, nil

View File

@ -6,16 +6,28 @@
package control
import (
"fmt"
"github.com/cilium/ebpf"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
"strconv"
)
// FormatL4Proto renders a layer-4 protocol number as text for logging.
// It returns "tcp" for unix.IPPROTO_TCP, "udp" for unix.IPPROTO_UDP, and
// the decimal representation of the number for any other value.
func FormatL4Proto(l4proto uint8) string {
	switch l4proto {
	case unix.IPPROTO_TCP:
		return "tcp"
	case unix.IPPROTO_UDP:
		return "udp"
	default:
		// Unknown protocol: fall back to its numeric value.
		return strconv.Itoa(int(l4proto))
	}
}
func (c *ControlPlaneCore) OutboundAliveChangeCallback(outbound uint8) func(alive bool, l4proto uint8, ipversion uint8) {
return func(alive bool, l4proto uint8, ipversion uint8) {
c.log.WithFields(logrus.Fields{
"alive": alive,
"l4proto": l4proto,
"ipversion": ipversion,
"network": fmt.Sprintf("%v+%v", FormatL4Proto(l4proto), ipversion),
"outbound_id": outbound,
}).Tracef("outbound alive state changed")

View File

@ -58,12 +58,14 @@ func (c *ControlPlane) handleConn(lConn net.Conn) (err error) {
if outboundIndex < 0 || int(outboundIndex) >= len(c.outbounds) {
return fmt.Errorf("outbound id from bpf is out of range: %v not in [0, %v]", outboundIndex, len(c.outbounds)-1)
}
dialer, err := outbound.Select(consts.L4ProtoStr_TCP, consts.IpVersionFromAddr(dst.Addr()))
l4proto := consts.L4ProtoStr_TCP
ipversion := consts.IpVersionFromAddr(dst.Addr())
dialer, err := outbound.Select(l4proto, ipversion)
if err != nil {
return fmt.Errorf("failed to select dialer from group %v: %w", outbound.Name, err)
}
c.log.WithFields(logrus.Fields{
"l4proto": "TCP",
"network": string(l4proto) + string(ipversion),
"outbound": outbound.Name,
"dialer": dialer.Name(),
}).Infof("%v <-> %v", RefineSourceToShow(src, dst.Addr()), RefineAddrPortToShow(dst))

View File

@ -207,10 +207,9 @@ getNew:
// If the udp endpoint has been not alive, remove it from pool and get a new one.
if !isNew && !ue.Dialer.MustGetAlive(l4proto, ipversion) {
c.log.WithFields(logrus.Fields{
"src": src.String(),
"l4proto": l4proto,
"ipversion": ipversion,
"dialer": ue.Dialer.Name(),
"src": src.String(),
"network": string(l4proto) + string(ipversion),
"dialer": ue.Dialer.Name(),
}).Debugln("Old udp endpoint is not alive and removed")
_ = DefaultUdpEndpointPool.Remove(src, ue)
goto getNew
@ -223,7 +222,7 @@ getNew:
if isDns && c.log.IsLevelEnabled(logrus.DebugLevel) && len(dnsMessage.Questions) > 0 {
q := dnsMessage.Questions[0]
c.log.WithFields(logrus.Fields{
"l4proto": "UDP(DNS)",
"network": string(l4proto) + string(ipversion) + "(DNS)",
"outbound": outbound.Name,
"dialer": d.Name(),
"qname": strings.ToLower(q.Name.String()),
@ -234,7 +233,7 @@ getNew:
} else {
// TODO: Set-up ip to domain mapping and show domain if possible.
c.log.WithFields(logrus.Fields{
"l4proto": "UDP",
"network": string(l4proto) + string(ipversion),
"outbound": outbound.Name,
"dialer": d.Name(),
}).Infof("%v <-> %v",