fix: make udp packets be sent in order

This commit is contained in:
mzz2017 2024-06-15 14:36:11 +08:00
parent 78da4a20b7
commit 5de32db41b
6 changed files with 124 additions and 13 deletions

View File

@ -738,7 +738,15 @@ func (c *ControlPlane) Serve(readyChan chan<- bool, listener *Listener) (err err
copy(newBuf, buf[:n]) copy(newBuf, buf[:n])
newOob := pool.Get(oobn) newOob := pool.Get(oobn)
copy(newOob, oob[:oobn]) copy(newOob, oob[:oobn])
go func(data pool.PB, oob pool.PB, src netip.AddrPort) { newSrc := src
convergeSrc := common.ConvergeAddrPort(src)
// Debug:
// t := time.Now()
DefaultUdpTaskPool.EmitTask(convergeSrc.String(), func() {
data := newBuf
oob := newOob
src := newSrc
defer data.Put() defer data.Put()
defer oob.Put() defer oob.Put()
var realDst netip.AddrPort var realDst netip.AddrPort
@ -781,7 +789,10 @@ func (c *ControlPlane) Serve(readyChan chan<- bool, listener *Listener) (err err
if e := c.handlePkt(udpConn, data, common.ConvergeAddrPort(src), common.ConvergeAddrPort(pktDst), common.ConvergeAddrPort(realDst), routingResult, false); e != nil { if e := c.handlePkt(udpConn, data, common.ConvergeAddrPort(src), common.ConvergeAddrPort(pktDst), common.ConvergeAddrPort(realDst), routingResult, false); e != nil {
c.log.Warnln("handlePkt:", e) c.log.Warnln("handlePkt:", e)
} }
}(newBuf, newOob, src) })
// if d := time.Since(t); d > 100*time.Millisecond {
// logrus.Println(d)
// }
} }
}() }()
<-c.ctx.Done() <-c.ctx.Done()

View File

@ -33,10 +33,11 @@ const (
) )
type DialOption struct { type DialOption struct {
Target string Target string
Dialer *dialer.Dialer Dialer *dialer.Dialer
Outbound *ob.DialerGroup Outbound *ob.DialerGroup
Network string Network string
SniffedDomain string
} }
func ChooseNatTimeout(data []byte, sniffDns bool) (dmsg *dnsmessage.Msg, timeout time.Duration) { func ChooseNatTimeout(data []byte, sniffDns bool) (dmsg *dnsmessage.Msg, timeout time.Duration) {
@ -127,12 +128,13 @@ func (c *ControlPlane) handlePkt(lConn *net.UDPConn, data []byte, src, pktDst, r
// From localhost, so dst IP is src IP. // From localhost, so dst IP is src IP.
realSrc = netip.AddrPortFrom(pktDst.Addr(), src.Port()) realSrc = netip.AddrPortFrom(pktDst.Addr(), src.Port())
} }
ue, ueExists := DefaultUdpEndpointPool.Get(realSrc)
// To keep consistency with kernel program, we only sniff DNS request sent to 53. // To keep consistency with kernel program, we only sniff DNS request sent to 53.
dnsMessage, natTimeout := ChooseNatTimeout(data, realDst.Port() == 53) dnsMessage, natTimeout := ChooseNatTimeout(data, realDst.Port() == 53)
// We should cache DNS records and set record TTL to 0, in order to monitor the dns req and resp in real time. // We should cache DNS records and set record TTL to 0, in order to monitor the dns req and resp in real time.
isDns := dnsMessage != nil isDns := dnsMessage != nil
if !isDns && !skipSniffing && !DefaultUdpEndpointPool.Exists(realSrc) { if !isDns && !skipSniffing && !ueExists {
// Sniff Quic, ... // Sniff Quic, ...
key := PacketSnifferKey{ key := PacketSnifferKey{
LAddr: realSrc, LAddr: realSrc,
@ -202,7 +204,6 @@ func (c *ControlPlane) handlePkt(lConn *net.UDPConn, data []byte, src, pktDst, r
// However, games may not use QUIC for communication, thus we cannot use domain to dial, which is fine. // However, games may not use QUIC for communication, thus we cannot use domain to dial, which is fine.
// Get udp endpoint. // Get udp endpoint.
var ue *UdpEndpoint
retry := 0 retry := 0
networkType := &dialer.NetworkType{ networkType := &dialer.NetworkType{
L4Proto: consts.L4ProtoStr_UDP, L4Proto: consts.L4ProtoStr_UDP,

View File

@ -31,6 +31,10 @@ type UdpEndpoint struct {
Dialer *dialer.Dialer Dialer *dialer.Dialer
Outbound *outbound.DialerGroup Outbound *outbound.DialerGroup
// Non-empty indicates this UDP Endpoint is related with a sniffed domain.
SniffedDomain string
DialTarget string
} }
func (ue *UdpEndpoint) start() { func (ue *UdpEndpoint) start() {
@ -95,9 +99,12 @@ func (p *UdpEndpointPool) Remove(lAddr netip.AddrPort, udpEndpoint *UdpEndpoint)
return nil return nil
} }
func (p *UdpEndpointPool) Exists(lAddr netip.AddrPort) (ok bool) { func (p *UdpEndpointPool) Get(lAddr netip.AddrPort) (udpEndpoint *UdpEndpoint, ok bool) {
_, ok = p.pool.Load(lAddr) _ue, ok := p.pool.Load(lAddr)
return ok if !ok {
return nil, ok
}
return _ue.(*UdpEndpoint), ok
} }
func (p *UdpEndpointPool) GetOrCreate(lAddr netip.AddrPort, createOption *UdpEndpointOptions) (udpEndpoint *UdpEndpoint, isNew bool, err error) { func (p *UdpEndpointPool) GetOrCreate(lAddr netip.AddrPort, createOption *UdpEndpointOptions) (udpEndpoint *UdpEndpoint, isNew bool, err error) {
@ -146,6 +153,8 @@ begin:
NatTimeout: createOption.NatTimeout, NatTimeout: createOption.NatTimeout,
Dialer: dialOption.Dialer, Dialer: dialOption.Dialer,
Outbound: dialOption.Outbound, Outbound: dialOption.Outbound,
SniffedDomain: dialOption.SniffedDomain,
DialTarget: dialOption.Target,
} }
ue.deadlineTimer = time.AfterFunc(createOption.NatTimeout, func() { ue.deadlineTimer = time.AfterFunc(createOption.NatTimeout, func() {
if _ue, ok := p.pool.LoadAndDelete(lAddr); ok { if _ue, ok := p.pool.LoadAndDelete(lAddr); ok {

84
control/udp_task_pool.go Normal file
View File

@ -0,0 +1,84 @@
package control
import (
"sync"
"time"
)
const UdpTaskQueueLength = 128
type UdpTask = func()
type UdpTaskQueue struct {
ch chan UdpTask
timer *time.Timer
agingTime time.Duration
closed chan struct{}
freed chan struct{}
}
func (q *UdpTaskQueue) Push(task UdpTask) {
q.timer.Reset(q.agingTime)
q.ch <- task
}
type UdpTaskPool struct {
queueChPool sync.Pool
// mu protects m
mu sync.Mutex
m map[string]*UdpTaskQueue
}
func NewUdpTaskPool() *UdpTaskPool {
p := &UdpTaskPool{
queueChPool: sync.Pool{New: func() any {
return make(chan UdpTask, UdpTaskQueueLength)
}},
mu: sync.Mutex{},
m: map[string]*UdpTaskQueue{},
}
return p
}
func (p *UdpTaskPool) convoy(q *UdpTaskQueue) {
for {
select {
case <-q.closed:
close(q.freed)
return
case t := <-q.ch:
t()
}
}
}
func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
p.mu.Lock()
q, ok := p.m[key]
if !ok {
ch := p.queueChPool.Get().(chan UdpTask)
q = &UdpTaskQueue{
ch: ch,
timer: nil,
agingTime: DefaultNatTimeout,
closed: make(chan struct{}),
freed: make(chan struct{}),
}
q.timer = time.AfterFunc(q.agingTime, func() {
p.mu.Lock()
defer p.mu.Unlock()
if p.m[key] == q {
delete(p.m, key)
}
close(q.closed)
<-q.freed
p.queueChPool.Put(ch)
})
p.m[key] = q
go p.convoy(q)
}
p.mu.Unlock()
q.Push(task)
}
var DefaultUdpTaskPool = NewUdpTaskPool()

3
go.mod
View File

@ -36,7 +36,7 @@ require (
github.com/gorilla/websocket v1.5.0 github.com/gorilla/websocket v1.5.0
github.com/klauspost/compress v1.17.4 // indirect github.com/klauspost/compress v1.17.4 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect github.com/stretchr/testify v1.8.1 // indirect
go.uber.org/mock v0.4.0 // indirect go.uber.org/mock v0.4.0 // indirect
golang.org/x/mod v0.12.0 // indirect golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.20.0 // indirect golang.org/x/net v0.20.0 // indirect
@ -64,6 +64,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mzz2017/disk-bloom v1.0.1 // indirect github.com/mzz2017/disk-bloom v1.0.1 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo v1.16.5 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb // indirect github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb // indirect
github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/pflag v1.0.5 // indirect
github.com/vishvananda/netns v0.0.4 // indirect github.com/vishvananda/netns v0.0.4 // indirect

7
go.sum
View File

@ -133,12 +133,17 @@ github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRM
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/v2rayA/ahocorasick-domain v0.0.0-20230218160829-122a074c48c8 h1:2Liq3JvM/acVQZ7Gq9U5PpznMzlFRPYMPQxC2yXSi74= github.com/v2rayA/ahocorasick-domain v0.0.0-20230218160829-122a074c48c8 h1:2Liq3JvM/acVQZ7Gq9U5PpznMzlFRPYMPQxC2yXSi74=