feat: latency-based failover (#119)

This commit is contained in:
mzz
2023-07-10 19:44:56 +08:00
committed by GitHub
parent 36dfdbac8b
commit 11d2ea945f
17 changed files with 257 additions and 64 deletions

View File

@ -7,6 +7,7 @@ package dialer
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@ -40,6 +41,7 @@ type AliveDialerSet struct {
mu sync.Mutex
dialerToIndex map[*Dialer]int // *Dialer -> index of inorderedAliveDialerSet
dialerToLatency map[*Dialer]time.Duration
dialerToLatencyOffset map[*Dialer]time.Duration
inorderedAliveDialerSet []*Dialer
selectionPolicy consts.DialerSelectionPolicy
@ -53,9 +55,18 @@ func NewAliveDialerSet(
tolerance time.Duration,
selectionPolicy consts.DialerSelectionPolicy,
dialers []*Dialer,
dialersAnnotations []*Annotation,
aliveChangeCallback func(alive bool),
setAlive bool,
) *AliveDialerSet {
if len(dialers) != len(dialersAnnotations) {
panic(fmt.Sprintf("unmatched annotations length: %v dialers and %v annotations", len(dialers), len(dialersAnnotations)))
}
dialerToLatencyOffset := make(map[*Dialer]time.Duration)
for i := range dialers {
d, a := dialers[i], dialersAnnotations[i]
dialerToLatencyOffset[d] = a.AddLatency
}
a := &AliveDialerSet{
log: log,
dialerGroupName: dialerGroupName,
@ -64,6 +75,7 @@ func NewAliveDialerSet(
aliveChangeCallback: aliveChangeCallback,
dialerToIndex: make(map[*Dialer]int),
dialerToLatency: make(map[*Dialer]time.Duration),
dialerToLatencyOffset: dialerToLatencyOffset,
inorderedAliveDialerSet: make([]*Dialer, 0, len(dialers)),
selectionPolicy: selectionPolicy,
minLatency: minLatency{
@ -98,14 +110,49 @@ func (a *AliveDialerSet) GetMinLatency() (d *Dialer, latency time.Duration) {
func (a *AliveDialerSet) printLatencies() {
var builder strings.Builder
builder.WriteString(fmt.Sprintf("Group '%v' [%v]:\n", a.dialerGroupName, a.CheckTyp.String()))
var alive []*struct {
d *Dialer
l time.Duration
}
for _, d := range a.inorderedAliveDialerSet {
latency, ok := a.dialerToLatency[d]
if !ok {
continue
}
builder.WriteString(fmt.Sprintf("%v: %v\n", d.property.Name, latency.String()))
alive = append(alive, &struct {
d *Dialer
l time.Duration
}{d, latency})
}
a.log.Traceln(builder.String())
sort.SliceStable(alive, func(i, j int) bool {
return alive[i].l < alive[j].l
})
for i, dl := range alive {
builder.WriteString(fmt.Sprintf("%4d. %v: %v\n", i+1, dl.d.property.Name, a.latencyString(dl.d, dl.l)))
}
a.log.Infoln(strings.TrimSuffix(builder.String(), "\n"))
}
func (a *AliveDialerSet) offsetLatency(d *Dialer, latency time.Duration, reverse bool) time.Duration {
offset := a.dialerToLatencyOffset[d]
var result time.Duration
if !reverse {
result = latency + offset
} else {
result = latency - offset
}
epsilon := 1 * time.Nanosecond
if result < +epsilon {
return +epsilon
}
if result > Timeout-epsilon {
result = Timeout - epsilon
}
return result
}
func (a *AliveDialerSet) latencyString(d *Dialer, afterLatency time.Duration) string {
return latencyString(afterLatency, a.offsetLatency(d, afterLatency, true))
}
// NotifyLatencyChange should be invoked when dialer every time latency and alive state changes.
@ -113,6 +160,7 @@ func (a *AliveDialerSet) NotifyLatencyChange(dialer *Dialer, alive bool) {
a.mu.Lock()
defer a.mu.Unlock()
var (
rawLatency time.Duration
latency time.Duration
hasLatency bool
minPolicy bool
@ -120,16 +168,21 @@ func (a *AliveDialerSet) NotifyLatencyChange(dialer *Dialer, alive bool) {
switch a.selectionPolicy {
case consts.DialerSelectionPolicy_MinLastLatency:
latency, hasLatency = dialer.mustGetCollection(a.CheckTyp).Latencies10.LastLatency()
rawLatency, hasLatency = dialer.mustGetCollection(a.CheckTyp).Latencies10.LastLatency()
minPolicy = true
case consts.DialerSelectionPolicy_MinAverage10Latencies:
latency, hasLatency = dialer.mustGetCollection(a.CheckTyp).Latencies10.AvgLatency()
rawLatency, hasLatency = dialer.mustGetCollection(a.CheckTyp).Latencies10.AvgLatency()
minPolicy = true
case consts.DialerSelectionPolicy_MinMovingAverageLatencies:
latency = dialer.mustGetCollection(a.CheckTyp).MovingAverage
hasLatency = latency > 0
rawLatency = dialer.mustGetCollection(a.CheckTyp).MovingAverage
hasLatency = rawLatency > 0
minPolicy = true
}
if hasLatency {
latency = a.offsetLatency(dialer, rawLatency, false)
} else {
latency = rawLatency
}
if alive {
index := a.dialerToIndex[dialer]
@ -212,16 +265,14 @@ func (a *AliveDialerSet) NotifyLatencyChange(dialer *Dialer, alive bool) {
oldDialerName = bakOldBestDialer.property.Name
}
a.log.WithFields(logrus.Fields{
string(a.selectionPolicy): a.minLatency.latency,
string(a.selectionPolicy): a.latencyString(a.minLatency.dialer, a.minLatency.latency),
"_new_dialer": a.minLatency.dialer.property.Name,
"_old_dialer": oldDialerName,
"group": a.dialerGroupName,
"network": a.CheckTyp.String(),
"new_dialer": a.minLatency.dialer.property.Name,
"old_dialer": oldDialerName,
}).Infof("Group %vselects dialer", re)
if a.log.IsLevelEnabled(logrus.TraceLevel) {
a.printLatencies()
}
a.printLatencies()
} else {
// Alive -> not alive
defer a.aliveChangeCallback(false)

View File

@ -0,0 +1,36 @@
package dialer
import (
"fmt"
"time"
"github.com/daeuniverse/dae/pkg/config_parser"
)
const (
AnnotationKey_AddLatency = "add_latency"
)
type Annotation struct {
AddLatency time.Duration
}
func NewAnnotation(annotation []*config_parser.Param) (*Annotation, error) {
var anno Annotation
for _, param := range annotation {
switch param.Key {
case AnnotationKey_AddLatency:
latency, err := time.ParseDuration(param.Val)
if err != nil {
return nil, fmt.Errorf("incorrect latency format: %w", err)
}
// Only the first setting is valid.
if anno.AddLatency == 0 {
anno.AddLatency = latency
}
default:
return nil, fmt.Errorf("unknown filter annotation: %v", param.Key)
}
}
return &anno, nil
}

View File

@ -32,6 +32,8 @@ import (
"github.com/sirupsen/logrus"
)
const Timeout = 10 * time.Second
type NetworkType struct {
L4Proto consts.L4ProtoStr
IpVersion consts.IpVersionStr
@ -220,7 +222,7 @@ func (c *TcpCheckOptionRaw) Option() (opt *TcpCheckOption, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.opt == nil {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), Timeout)
defer cancel()
ctx = context.WithValue(ctx, "logger", c.Log)
tcpCheckOption, err := ParseTcpCheckOption(ctx, c.Raw, c.Method, c.ResolverNetwork)
@ -244,7 +246,7 @@ func (c *CheckDnsOptionRaw) Option() (opt *CheckDnsOption, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.opt == nil {
ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.TODO(), Timeout)
defer cancel()
udpCheckOption, err := ParseCheckDnsOption(ctx, c.Raw, c.ResolverNetwork)
if err != nil {
@ -263,15 +265,15 @@ type CheckOption struct {
func (d *Dialer) ActivateCheck() {
d.tickerMu.Lock()
defer d.tickerMu.Unlock()
if d.instanceOption.CheckEnabled {
if d.InstanceOption.CheckEnabled {
return
}
d.instanceOption.CheckEnabled = true
d.InstanceOption.CheckEnabled = true
go d.aliveBackground()
}
func (d *Dialer) aliveBackground() {
timeout := 10 * time.Second
timeout := Timeout
cycle := d.CheckInterval
var tcpSomark uint32
if network, err := netproxy.ParseMagicNetwork(d.TcpCheckOptionRaw.ResolverNetwork); err == nil {
@ -444,15 +446,17 @@ func (d *Dialer) aliveBackground() {
}()
var wg sync.WaitGroup
for range d.checkCh {
// No need to test if there is no dialer selection policy using its latency.
for _, opt := range CheckOpts {
if len(d.mustGetCollection(opt.networkType).AliveDialerSetSet) > 0 {
wg.Add(1)
go func(opt *CheckOption) {
d.Check(timeout, opt)
wg.Done()
}(opt)
// No need to test if there is no dialer selection policy using its latency.
if len(d.mustGetCollection(opt.networkType).AliveDialerSetSet) == 0 {
continue
}
wg.Add(1)
go func(opt *CheckOption) {
_, _ = d.Check(timeout, opt)
wg.Done()
}(opt)
}
// Wait to block the loop.
wg.Wait()
@ -521,7 +525,7 @@ func (d *Dialer) Check(timeout time.Duration,
d.Log.WithFields(logrus.Fields{
"network": opts.networkType.String(),
"node": d.property.Name,
"last": latency.Truncate(time.Millisecond),
"last": latency.Truncate(time.Millisecond).String(),
"avg_10": avg.Truncate(time.Millisecond),
"mov_avg": collection.MovingAverage.Truncate(time.Millisecond),
}).Debugln("Connectivity Check")

View File

@ -17,7 +17,7 @@ var (
type Dialer struct {
*GlobalOption
instanceOption InstanceOption
InstanceOption InstanceOption
netproxy.Dialer
property Property
@ -65,7 +65,7 @@ func NewDialer(dialer netproxy.Dialer, option *GlobalOption, iOption InstanceOpt
ctx, cancel := context.WithCancel(context.Background())
d := &Dialer{
GlobalOption: option,
instanceOption: iOption,
InstanceOption: iOption,
Dialer: dialer,
property: property,
collectionFineMu: sync.Mutex{},

View File

@ -0,0 +1,10 @@
package dialer
import "time"
func latencyString(latencyAfterOffset, latencyBeforeOffset time.Duration) string {
if latencyBeforeOffset == latencyAfterOffset {
return latencyAfterOffset.Truncate(time.Millisecond).String()
}
return latencyAfterOffset.Truncate(time.Millisecond).String() + "(" + latencyBeforeOffset.Truncate(time.Millisecond).String() + ")"
}

View File

@ -31,7 +31,14 @@ type DialerGroup struct {
selectionPolicy *DialerSelectionPolicy
}
func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.Dialer, p DialerSelectionPolicy, aliveChangeCallback func(alive bool, networkType *dialer.NetworkType, isInit bool)) *DialerGroup {
func NewDialerGroup(
option *dialer.GlobalOption,
name string,
dialers []*dialer.Dialer,
dialersAnnotations []*dialer.Annotation,
p DialerSelectionPolicy,
aliveChangeCallback func(alive bool, networkType *dialer.NetworkType, isInit bool),
) *DialerGroup {
log := option.Log
var aliveDnsTcp4DialerSet *dialer.AliveDialerSet
var aliveDnsTcp6DialerSet *dialer.AliveDialerSet
@ -65,7 +72,7 @@ func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.
}
if needAliveState {
aliveTcp4DialerSet = dialer.NewAliveDialerSet(
log, name, networkType, option.CheckTolerance, p.Policy, dialers,
log, name, networkType, option.CheckTolerance, p.Policy, dialers, dialersAnnotations,
func(networkType *dialer.NetworkType) func(alive bool) {
// Use the trick to copy a pointer of *dialer.NetworkType.
return func(alive bool) { aliveChangeCallback(alive, networkType, false) }
@ -80,7 +87,7 @@ func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.
}
if needAliveState {
aliveTcp6DialerSet = dialer.NewAliveDialerSet(
log, name, networkType, option.CheckTolerance, p.Policy, dialers,
log, name, networkType, option.CheckTolerance, p.Policy, dialers, dialersAnnotations,
func(networkType *dialer.NetworkType) func(alive bool) {
// Use the trick to copy a pointer of *dialer.NetworkType.
return func(alive bool) { aliveChangeCallback(alive, networkType, false) }
@ -95,7 +102,7 @@ func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.
}
if needAliveState {
aliveDnsUdp4DialerSet = dialer.NewAliveDialerSet(
log, name, networkType, option.CheckTolerance, p.Policy, dialers,
log, name, networkType, option.CheckTolerance, p.Policy, dialers, dialersAnnotations,
func(networkType *dialer.NetworkType) func(alive bool) {
// Use the trick to copy a pointer of *dialer.NetworkType.
return func(alive bool) { aliveChangeCallback(alive, networkType, false) }
@ -110,7 +117,7 @@ func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.
}
if needAliveState {
aliveDnsUdp6DialerSet = dialer.NewAliveDialerSet(
log, name, networkType, option.CheckTolerance, p.Policy, dialers,
log, name, networkType, option.CheckTolerance, p.Policy, dialers, dialersAnnotations,
func(networkType *dialer.NetworkType) func(alive bool) {
// Use the trick to copy a pointer of *dialer.NetworkType.
return func(alive bool) { aliveChangeCallback(alive, networkType, false) }
@ -123,13 +130,13 @@ func NewDialerGroup(option *dialer.GlobalOption, name string, dialers []*dialer.
L4Proto: consts.L4ProtoStr_TCP,
IpVersion: consts.IpVersionStr_4,
IsDns: true,
}, option.CheckTolerance, p.Policy, dialers, func(alive bool) {}, true)
}, option.CheckTolerance, p.Policy, dialers, dialersAnnotations, func(alive bool) {}, true)
aliveDnsTcp6DialerSet = dialer.NewAliveDialerSet(log, name, &dialer.NetworkType{
L4Proto: consts.L4ProtoStr_TCP,
IpVersion: consts.IpVersionStr_6,
IsDns: true,
}, option.CheckTolerance, p.Policy, dialers, func(alive bool) {}, true)
}, option.CheckTolerance, p.Policy, dialers, dialersAnnotations, func(alive bool) {}, true)
}
for _, d := range dialers {

View File

@ -125,17 +125,37 @@ func (s *DialerSet) filterHit(dialer *dialer.Dialer, filters []*config_parser.Fu
return true, nil
}
func (s *DialerSet) Filter(filters []*config_parser.Function) (dialers []*dialer.Dialer, err error) {
for _, d := range s.dialers {
hit, err := s.filterHit(d, filters)
if err != nil {
return nil, err
func (s *DialerSet) FilterAndAnnotate(filters [][]*config_parser.Function, annotations [][]*config_parser.Param) (dialers []*dialer.Dialer, filterAnnotations []*dialer.Annotation, err error) {
if len(filters) != len(annotations) {
return nil, nil, fmt.Errorf("[CODE BUG]: unmatched annotations length: %v filters and %v annotations", len(filters), len(annotations))
}
if len(filters) == 0 {
anno := make([]*dialer.Annotation, len(s.dialers))
for i := range anno {
anno[i] = &dialer.Annotation{}
}
if hit {
dialers = append(dialers, d)
return s.dialers, anno, nil
}
nextDialerLoop:
for _, d := range s.dialers {
// Hit any.
for j, f := range filters {
hit, err := s.filterHit(d, f)
if err != nil {
return nil, nil, err
}
if hit {
anno, err := dialer.NewAnnotation(annotations[j])
if err != nil {
return nil, nil, fmt.Errorf("apply filter annotation: %w", err)
}
dialers = append(dialers, d)
filterAnnotations = append(filterAnnotations, anno)
continue nextDialerLoop
}
}
}
return dialers, nil
return dialers, filterAnnotations, nil
}
func (s *DialerSet) Close() error {