mirror of
https://github.com/daeuniverse/dae.git
synced 2025-01-03 13:31:00 +07:00
feat: support assist connectivity check (udp-dns) by dns module (#543)
Co-authored-by: Sumire (菫) <151038614+sumire88@users.noreply.github.com>
This commit is contained in:
parent
93e47ffe88
commit
76a4fe9be7
@ -280,7 +280,6 @@ func (d *Dialer) ActivateCheck() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *Dialer) aliveBackground() {
|
func (d *Dialer) aliveBackground() {
|
||||||
timeout := Timeout
|
|
||||||
cycle := d.CheckInterval
|
cycle := d.CheckInterval
|
||||||
var tcpSomark uint32
|
var tcpSomark uint32
|
||||||
if network, err := netproxy.ParseMagicNetwork(d.TcpCheckOptionRaw.ResolverNetwork); err == nil {
|
if network, err := netproxy.ParseMagicNetwork(d.TcpCheckOptionRaw.ResolverNetwork); err == nil {
|
||||||
@ -461,7 +460,7 @@ func (d *Dialer) aliveBackground() {
|
|||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(opt *CheckOption) {
|
go func(opt *CheckOption) {
|
||||||
_, _ = d.Check(timeout, opt)
|
_, _ = d.Check(opt)
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}(opt)
|
}(opt)
|
||||||
}
|
}
|
||||||
@ -513,10 +512,48 @@ func (d *Dialer) UnregisterAliveDialerSet(a *AliveDialerSet) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Dialer) Check(timeout time.Duration,
|
func (d *Dialer) logUnavailable(
|
||||||
opts *CheckOption,
|
collection *collection,
|
||||||
) (ok bool, err error) {
|
network *NetworkType,
|
||||||
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
|
err error,
|
||||||
|
) {
|
||||||
|
// Append timeout if there is any error or unexpected status code.
|
||||||
|
if err != nil {
|
||||||
|
if strings.HasSuffix(err.Error(), "network is unreachable") {
|
||||||
|
err = fmt.Errorf("network is unreachable")
|
||||||
|
} else if strings.HasSuffix(err.Error(), "no suitable address found") ||
|
||||||
|
strings.HasSuffix(err.Error(), "non-IPv4 address") {
|
||||||
|
err = fmt.Errorf("IPv%v is not supported", network.IpVersion)
|
||||||
|
}
|
||||||
|
d.Log.WithFields(logrus.Fields{
|
||||||
|
"network": network.String(),
|
||||||
|
"node": d.property.Name,
|
||||||
|
"err": err.Error(),
|
||||||
|
}).Debugln("Connectivity Check Failed")
|
||||||
|
}
|
||||||
|
collection.Latencies10.AppendLatency(Timeout)
|
||||||
|
collection.MovingAverage = (collection.MovingAverage + Timeout) / 2
|
||||||
|
collection.Alive = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dialer) informDialerGroupUpdate(collection *collection) {
|
||||||
|
// Inform DialerGroups to update state.
|
||||||
|
// We use lock because AliveDialerSetSet is a reference of that in collection.
|
||||||
|
d.collectionFineMu.Lock()
|
||||||
|
for a := range collection.AliveDialerSetSet {
|
||||||
|
a.NotifyLatencyChange(d, collection.Alive)
|
||||||
|
}
|
||||||
|
d.collectionFineMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dialer) ReportUnavailable(typ *NetworkType, err error) {
|
||||||
|
collection := d.mustGetCollection(typ)
|
||||||
|
d.logUnavailable(collection, typ, err)
|
||||||
|
d.informDialerGroupUpdate(collection)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Dialer) Check(opts *CheckOption) (ok bool, err error) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.TODO(), Timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
// Calc latency.
|
// Calc latency.
|
||||||
@ -537,31 +574,9 @@ func (d *Dialer) Check(timeout time.Duration,
|
|||||||
"mov_avg": collection.MovingAverage.Truncate(time.Millisecond),
|
"mov_avg": collection.MovingAverage.Truncate(time.Millisecond),
|
||||||
}).Debugln("Connectivity Check")
|
}).Debugln("Connectivity Check")
|
||||||
} else {
|
} else {
|
||||||
// Append timeout if there is any error or unexpected status code.
|
d.logUnavailable(collection, opts.networkType, err)
|
||||||
if err != nil {
|
|
||||||
if strings.HasSuffix(err.Error(), "network is unreachable") {
|
|
||||||
err = fmt.Errorf("network is unreachable")
|
|
||||||
} else if strings.HasSuffix(err.Error(), "no suitable address found") ||
|
|
||||||
strings.HasSuffix(err.Error(), "non-IPv4 address") {
|
|
||||||
err = fmt.Errorf("IPv%v is not supported", opts.networkType.IpVersion)
|
|
||||||
}
|
}
|
||||||
d.Log.WithFields(logrus.Fields{
|
d.informDialerGroupUpdate(collection)
|
||||||
"network": opts.networkType.String(),
|
|
||||||
"node": d.property.Name,
|
|
||||||
"err": err.Error(),
|
|
||||||
}).Debugln("Connectivity Check Failed")
|
|
||||||
}
|
|
||||||
collection.Latencies10.AppendLatency(timeout)
|
|
||||||
collection.MovingAverage = (collection.MovingAverage + timeout) / 2
|
|
||||||
collection.Alive = false
|
|
||||||
}
|
|
||||||
// Inform DialerGroups to update state.
|
|
||||||
// We use lock because AliveDialerSetSet is a reference of that in collection.
|
|
||||||
d.collectionFineMu.Lock()
|
|
||||||
for a := range collection.AliveDialerSetSet {
|
|
||||||
a.NotifyLatencyChange(d, collection.Alive)
|
|
||||||
}
|
|
||||||
d.collectionFineMu.Unlock()
|
|
||||||
return ok, err
|
return ok, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -444,6 +444,13 @@ func NewControlPlane(
|
|||||||
}, nil
|
}, nil
|
||||||
},
|
},
|
||||||
BestDialerChooser: plane.chooseBestDnsDialer,
|
BestDialerChooser: plane.chooseBestDnsDialer,
|
||||||
|
TimeoutExceedCallback: func(dialArgument *dialArgument, err error) {
|
||||||
|
dialArgument.bestDialer.ReportUnavailable(&dialer.NetworkType{
|
||||||
|
L4Proto: dialArgument.l4proto,
|
||||||
|
IpVersion: dialArgument.ipversion,
|
||||||
|
IsDns: true,
|
||||||
|
}, err)
|
||||||
|
},
|
||||||
IpVersionPrefer: dnsConfig.IpVersionPrefer,
|
IpVersionPrefer: dnsConfig.IpVersionPrefer,
|
||||||
FixedDomainTtl: fixedDomainTtl,
|
FixedDomainTtl: fixedDomainTtl,
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
|
@ -61,6 +61,7 @@ type DnsControllerOption struct {
|
|||||||
CacheRemoveCallback func(cache *DnsCache) (err error)
|
CacheRemoveCallback func(cache *DnsCache) (err error)
|
||||||
NewCache func(fqdn string, answers []dnsmessage.RR, deadline time.Time, originalDeadline time.Time) (cache *DnsCache, err error)
|
NewCache func(fqdn string, answers []dnsmessage.RR, deadline time.Time, originalDeadline time.Time) (cache *DnsCache, err error)
|
||||||
BestDialerChooser func(req *udpRequest, upstream *dns.Upstream) (*dialArgument, error)
|
BestDialerChooser func(req *udpRequest, upstream *dns.Upstream) (*dialArgument, error)
|
||||||
|
TimeoutExceedCallback func(dialArgument *dialArgument, err error)
|
||||||
IpVersionPrefer int
|
IpVersionPrefer int
|
||||||
FixedDomainTtl map[string]int
|
FixedDomainTtl map[string]int
|
||||||
}
|
}
|
||||||
@ -76,6 +77,8 @@ type DnsController struct {
|
|||||||
cacheRemoveCallback func(cache *DnsCache) (err error)
|
cacheRemoveCallback func(cache *DnsCache) (err error)
|
||||||
newCache func(fqdn string, answers []dnsmessage.RR, deadline time.Time, originalDeadline time.Time) (cache *DnsCache, err error)
|
newCache func(fqdn string, answers []dnsmessage.RR, deadline time.Time, originalDeadline time.Time) (cache *DnsCache, err error)
|
||||||
bestDialerChooser func(req *udpRequest, upstream *dns.Upstream) (*dialArgument, error)
|
bestDialerChooser func(req *udpRequest, upstream *dns.Upstream) (*dialArgument, error)
|
||||||
|
// timeoutExceedCallback is used to report this dialer is broken for the NetworkType
|
||||||
|
timeoutExceedCallback func(dialArgument *dialArgument, err error)
|
||||||
|
|
||||||
fixedDomainTtl map[string]int
|
fixedDomainTtl map[string]int
|
||||||
// mutex protects the dnsCache.
|
// mutex protects the dnsCache.
|
||||||
@ -112,6 +115,7 @@ func NewDnsController(routing *dns.Dns, option *DnsControllerOption) (c *DnsCont
|
|||||||
cacheRemoveCallback: option.CacheRemoveCallback,
|
cacheRemoveCallback: option.CacheRemoveCallback,
|
||||||
newCache: option.NewCache,
|
newCache: option.NewCache,
|
||||||
bestDialerChooser: option.BestDialerChooser,
|
bestDialerChooser: option.BestDialerChooser,
|
||||||
|
timeoutExceedCallback: option.TimeoutExceedCallback,
|
||||||
|
|
||||||
fixedDomainTtl: option.FixedDomainTtl,
|
fixedDomainTtl: option.FixedDomainTtl,
|
||||||
dnsCacheMu: sync.Mutex{},
|
dnsCacheMu: sync.Mutex{},
|
||||||
@ -578,8 +582,9 @@ func (c *DnsController) dialSend(invokingDepth int, req *udpRequest, data []byte
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_ = conn.SetDeadline(time.Now().Add(5 * time.Second))
|
timeout := 5 * time.Second
|
||||||
dnsReqCtx, cancelDnsReqCtx := context.WithTimeout(context.TODO(), 5*time.Second)
|
_ = conn.SetDeadline(time.Now().Add(timeout))
|
||||||
|
dnsReqCtx, cancelDnsReqCtx := context.WithTimeout(context.TODO(), timeout)
|
||||||
defer cancelDnsReqCtx()
|
defer cancelDnsReqCtx()
|
||||||
go func() {
|
go func() {
|
||||||
// Send DNS request every seconds.
|
// Send DNS request every seconds.
|
||||||
@ -613,6 +618,9 @@ func (c *DnsController) dialSend(invokingDepth int, req *udpRequest, data []byte
|
|||||||
// Wait for response.
|
// Wait for response.
|
||||||
n, err := conn.Read(respBuf)
|
n, err := conn.Read(respBuf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if c.timeoutExceedCallback != nil {
|
||||||
|
c.timeoutExceedCallback(dialArgument, err)
|
||||||
|
}
|
||||||
return fmt.Errorf("failed to read from: %v (dialer: %v): %w", dialArgument.bestTarget, dialArgument.bestDialer.Property().Name, err)
|
return fmt.Errorf("failed to read from: %v (dialer: %v): %w", dialArgument.bestTarget, dialArgument.bestDialer.Property().Name, err)
|
||||||
}
|
}
|
||||||
var msg dnsmessage.Msg
|
var msg dnsmessage.Msg
|
||||||
|
Loading…
Reference in New Issue
Block a user