dae/component/outbound/dialer/alive_dialer_set.go

196 lines
5.6 KiB
Go
Raw Normal View History

2023-01-23 18:54:21 +07:00
/*
 * SPDX-License-Identifier: AGPL-3.0-only
 * Copyright (c) since 2023, v2rayA Organization <team@v2raya.org>
 */
package dialer
import (
"github.com/mzz2017/softwind/pkg/fastrand"
"github.com/sirupsen/logrus"
2023-01-28 14:47:43 +07:00
"github.com/v2rayA/dae/common/consts"
2023-01-23 18:54:21 +07:00
"sync"
"time"
)
// minLatency pairs the dialer currently regarded as the best choice with the
// latency value that is compared against when a better candidate shows up.
type minLatency struct {
	// latency is the value new measurements are compared against.
	latency time.Duration
	// dialer is the current best dialer; nil when none has been chosen.
	dialer *Dialer
}
// AliveDialerSet assumes mapping between index and dialer MUST remain unchanged.
//
// It is thread-safe.
type AliveDialerSet struct {
2023-01-28 14:47:43 +07:00
log *logrus.Logger
dialerGroupName string
l4proto consts.L4ProtoStr
ipversion consts.IpVersionStr
2023-01-23 18:54:21 +07:00
aliveChangeCallback func(alive bool)
2023-01-23 18:54:21 +07:00
mu sync.Mutex
dialerToIndex map[*Dialer]int // *Dialer -> index of inorderedAliveDialerSet
dialerToLatency map[*Dialer]time.Duration
inorderedAliveDialerSet []*Dialer
selectionPolicy consts.DialerSelectionPolicy
minLatency minLatency
}
func NewAliveDialerSet(
log *logrus.Logger,
2023-01-28 14:47:43 +07:00
dialerGroupName string,
l4proto consts.L4ProtoStr,
ipversion consts.IpVersionStr,
2023-01-23 18:54:21 +07:00
selectionPolicy consts.DialerSelectionPolicy,
dialers []*Dialer,
aliveChangeCallback func(alive bool),
2023-01-23 18:54:21 +07:00
setAlive bool,
) *AliveDialerSet {
a := &AliveDialerSet{
log: log,
2023-01-28 14:47:43 +07:00
dialerGroupName: dialerGroupName,
l4proto: l4proto,
ipversion: ipversion,
aliveChangeCallback: aliveChangeCallback,
2023-01-23 18:54:21 +07:00
dialerToIndex: make(map[*Dialer]int),
dialerToLatency: make(map[*Dialer]time.Duration),
inorderedAliveDialerSet: make([]*Dialer, 0, len(dialers)),
selectionPolicy: selectionPolicy,
minLatency: minLatency{
// Initiate the latency with a very big value.
latency: time.Hour,
},
}
for _, d := range dialers {
a.dialerToIndex[d] = -1
}
for _, d := range dialers {
a.SetAlive(d, setAlive)
}
return a
}
// GetRand returns a randomly chosen alive dialer, or nil when no dialer is
// alive.
func (a *AliveDialerSet) GetRand() *Dialer {
	a.mu.Lock()
	defer a.mu.Unlock()

	candidates := a.inorderedAliveDialerSet
	if n := len(candidates); n > 0 {
		return candidates[fastrand.Intn(n)]
	}
	return nil
}
// GetMinLatency returns the dialer currently selected as having the minimum
// latency, or nil if no dialer has been selected.
//
// It requires a latency-based selectionPolicy: SetAlive only maintains the
// selection for DialerSelectionPolicy_MinLastLatency and
// DialerSelectionPolicy_MinAverage10Latencies.
//
// The selection is written by SetAlive under mu, so it is read under mu as
// well to avoid a data race. Because SetAlive invokes aliveChangeCallback
// while still holding mu, the callback must not call GetMinLatency.
func (a *AliveDialerSet) GetMinLatency() *Dialer {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.minLatency.dialer
}
// SetAlive should be invoked when dialer every time latency and alive state changes.
func (a *AliveDialerSet) SetAlive(dialer *Dialer, alive bool) {
a.mu.Lock()
defer a.mu.Unlock()
var (
latency time.Duration
hasLatency bool
2023-01-28 14:59:53 +07:00
minPolicy bool
2023-01-23 18:54:21 +07:00
)
switch a.selectionPolicy {
case consts.DialerSelectionPolicy_MinLastLatency:
latency, hasLatency = dialer.MustGetLatencies10(a.l4proto, a.ipversion).LastLatency()
2023-01-28 14:59:53 +07:00
minPolicy = true
2023-01-23 18:54:21 +07:00
case consts.DialerSelectionPolicy_MinAverage10Latencies:
latency, hasLatency = dialer.MustGetLatencies10(a.l4proto, a.ipversion).AvgLatency()
2023-01-28 14:59:53 +07:00
minPolicy = true
2023-01-23 18:54:21 +07:00
}
if alive {
index := a.dialerToIndex[dialer]
if index >= 0 {
// This dialer is already alive.
} else {
// Not alive -> alive.
defer a.aliveChangeCallback(true)
2023-01-23 18:54:21 +07:00
a.dialerToIndex[dialer] = len(a.inorderedAliveDialerSet)
a.inorderedAliveDialerSet = append(a.inorderedAliveDialerSet, dialer)
}
} else {
index := a.dialerToIndex[dialer]
if index >= 0 {
// Alive -> not alive.
defer a.aliveChangeCallback(false)
2023-01-23 18:54:21 +07:00
// Remove the dialer from inorderedAliveDialerSet.
if index >= len(a.inorderedAliveDialerSet) {
a.log.Panicf("index:%v >= len(a.inorderedAliveDialerSet):%v", index, len(a.inorderedAliveDialerSet))
}
a.dialerToIndex[dialer] = -1
if index < len(a.inorderedAliveDialerSet)-1 {
// Swap this element with the last element.
dialerToSwap := a.inorderedAliveDialerSet[len(a.inorderedAliveDialerSet)-1]
if dialer == dialerToSwap {
a.log.Panicf("dialer[%p] == dialerToSwap[%p]", dialer, dialerToSwap)
}
a.dialerToIndex[dialerToSwap] = index
a.inorderedAliveDialerSet[index], a.inorderedAliveDialerSet[len(a.inorderedAliveDialerSet)-1] =
a.inorderedAliveDialerSet[len(a.inorderedAliveDialerSet)-1], a.inorderedAliveDialerSet[index]
}
// Pop the last element.
a.inorderedAliveDialerSet = a.inorderedAliveDialerSet[:len(a.inorderedAliveDialerSet)-1]
} else {
// This dialer is already not alive.
}
}
2023-01-28 14:59:53 +07:00
2023-01-23 18:54:21 +07:00
if hasLatency {
2023-01-28 14:59:53 +07:00
oldBestDialer := a.minLatency.dialer
2023-01-28 14:47:43 +07:00
// Calc minLatency.
2023-01-23 18:54:21 +07:00
a.dialerToLatency[dialer] = latency
if latency < a.minLatency.latency {
a.minLatency.latency = latency
a.minLatency.dialer = dialer
} else if a.minLatency.dialer == dialer {
a.minLatency.latency = time.Hour
a.minLatency.dialer = nil
a.calcMinLatency()
}
2023-01-28 14:47:43 +07:00
if a.minLatency.dialer != oldBestDialer {
2023-01-29 10:19:58 +07:00
if a.minLatency.dialer != nil {
2023-02-04 14:02:44 +07:00
a.log.WithFields(logrus.Fields{
string(a.selectionPolicy): a.minLatency.latency,
"group": a.dialerGroupName,
"l4proto": a.l4proto,
2023-02-04 14:02:44 +07:00
"dialer": a.minLatency.dialer.Name(),
}).Infof("Group re-selects dialer")
2023-01-30 14:50:55 +07:00
} else {
2023-02-04 14:02:44 +07:00
a.log.WithFields(logrus.Fields{
"group": a.dialerGroupName,
"l4proto": a.l4proto,
2023-02-04 14:02:44 +07:00
}).Infof("Group has no dialer alive")
2023-01-29 10:19:58 +07:00
}
2023-01-28 14:47:43 +07:00
}
} else {
2023-01-28 14:59:53 +07:00
if alive && minPolicy && a.minLatency.dialer == nil {
2023-02-04 14:02:44 +07:00
// Use first dialer if no dialer has alive state (usually happen at the very beginning).
2023-01-28 14:47:43 +07:00
a.minLatency.dialer = dialer
2023-02-04 14:02:44 +07:00
a.log.WithFields(logrus.Fields{
"group": a.dialerGroupName,
"l4proto": a.l4proto,
"dialer": a.minLatency.dialer.Name(),
}).Infof("Group selects dialer")
2023-01-28 14:47:43 +07:00
}
2023-01-23 18:54:21 +07:00
}
}
// calcMinLatency scans the alive dialers and stores in a.minLatency the one
// whose recorded latency is lower than the current a.minLatency.latency.
//
// It does not reset a.minLatency and does not lock; SetAlive calls it with mu
// held after resetting a.minLatency.
//
// Alive dialers without a recorded latency are skipped: a missing map entry
// would otherwise read as a zero latency and always win the election.
func (a *AliveDialerSet) calcMinLatency() {
	for _, d := range a.inorderedAliveDialerSet {
		latency, ok := a.dialerToLatency[d]
		if !ok {
			continue
		}
		if latency < a.minLatency.latency {
			a.minLatency.latency = latency
			a.minLatency.dialer = d
		}
	}
}