mirror of
https://github.com/daeuniverse/dae.git
synced 2024-12-22 15:54:42 +07:00
fix: crash on openwrt (#640)
Co-authored-by: dae-prow[bot] <136105375+dae-prow[bot]@users.noreply.github.com>
This commit is contained in:
parent
ff62fae5c5
commit
cec5e71d4e
@ -22,11 +22,14 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
var (
|
||||
DefaultNatTimeout = 3 * time.Minute
|
||||
DnsNatTimeout = 17 * time.Second // RFC 5452
|
||||
AnyfromTimeout = 5 * time.Second // Do not cache too long.
|
||||
MaxRetry = 2
|
||||
)
|
||||
|
||||
const (
|
||||
DnsNatTimeout = 17 * time.Second // RFC 5452
|
||||
AnyfromTimeout = 5 * time.Second // Do not cache too long.
|
||||
MaxRetry = 2
|
||||
)
|
||||
|
||||
type DialOption struct {
|
||||
|
@ -2,19 +2,26 @@ package control
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/panjf2000/ants"
|
||||
)
|
||||
|
||||
var isTest = false
|
||||
|
||||
const UdpTaskQueueLength = 128
|
||||
|
||||
type UdpTask = func()
|
||||
|
||||
// UdpTaskQueue make sure packets with the same key (4 tuples) will be sent in order.
|
||||
type UdpTaskQueue struct {
|
||||
key string
|
||||
p *UdpTaskPool
|
||||
ch chan UdpTask
|
||||
timer *time.Timer
|
||||
agingTime time.Duration
|
||||
closed chan struct{}
|
||||
closed atomic.Bool
|
||||
freed chan struct{}
|
||||
}
|
||||
|
||||
@ -23,6 +30,30 @@ func (q *UdpTaskQueue) Push(task UdpTask) {
|
||||
q.ch <- task
|
||||
}
|
||||
|
||||
func (q *UdpTaskQueue) convoy() {
|
||||
for {
|
||||
if q.closed.Load() {
|
||||
clearloop:
|
||||
for {
|
||||
select {
|
||||
case t := <-q.ch:
|
||||
// Emit it back due to closed q.
|
||||
ReemitWorkers.Submit(func() {
|
||||
q.p.EmitTask(q.key, t)
|
||||
})
|
||||
default:
|
||||
break clearloop
|
||||
}
|
||||
}
|
||||
close(q.freed)
|
||||
return
|
||||
} else {
|
||||
t := <-q.ch
|
||||
t()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type UdpTaskPool struct {
|
||||
queueChPool sync.Pool
|
||||
// mu protects m
|
||||
@ -41,28 +72,7 @@ func NewUdpTaskPool() *UdpTaskPool {
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *UdpTaskPool) convoy(q *UdpTaskQueue) {
|
||||
for {
|
||||
select {
|
||||
case <-q.closed:
|
||||
clearloop:
|
||||
for {
|
||||
select {
|
||||
case t := <-q.ch:
|
||||
// Emit it back due to closed q.
|
||||
p.EmitTask(q.key, t)
|
||||
default:
|
||||
break clearloop
|
||||
}
|
||||
}
|
||||
close(q.freed)
|
||||
return
|
||||
case t := <-q.ch:
|
||||
t()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// EmitTask: Make sure packets with the same key (4 tuples) will be sent in order.
|
||||
func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
|
||||
p.mu.Lock()
|
||||
q, ok := p.m[key]
|
||||
@ -70,33 +80,47 @@ func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
|
||||
ch := p.queueChPool.Get().(chan UdpTask)
|
||||
q = &UdpTaskQueue{
|
||||
key: key,
|
||||
p: p,
|
||||
ch: ch,
|
||||
timer: nil,
|
||||
agingTime: DefaultNatTimeout,
|
||||
closed: make(chan struct{}),
|
||||
closed: atomic.Bool{},
|
||||
freed: make(chan struct{}),
|
||||
}
|
||||
q.timer = time.AfterFunc(q.agingTime, func() {
|
||||
// This func may be invoked twice due to concurrent Reset.
|
||||
select {
|
||||
case <-q.closed:
|
||||
if !q.closed.CompareAndSwap(false, true) {
|
||||
return
|
||||
default:
|
||||
}
|
||||
if isTest {
|
||||
time.Sleep(3 * time.Microsecond)
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if p.m[key] == q {
|
||||
delete(p.m, key)
|
||||
}
|
||||
close(q.closed)
|
||||
// Trigger next loop in func convoy
|
||||
q.ch <- func() {}
|
||||
<-q.freed
|
||||
p.queueChPool.Put(ch)
|
||||
})
|
||||
p.m[key] = q
|
||||
go p.convoy(q)
|
||||
go q.convoy()
|
||||
}
|
||||
p.mu.Unlock()
|
||||
q.Push(task)
|
||||
}
|
||||
|
||||
var DefaultUdpTaskPool = NewUdpTaskPool()
|
||||
var (
|
||||
DefaultUdpTaskPool = NewUdpTaskPool()
|
||||
ReemitWorkers *ants.Pool
|
||||
)
|
||||
|
||||
func init() {
|
||||
var err error
|
||||
ReemitWorkers, err = ants.NewPool(UdpTaskQueueLength/2, ants.WithExpiryDuration(AnyfromTimeout))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
31
control/udp_task_pool_test.go
Normal file
31
control/udp_task_pool_test.go
Normal file
@ -0,0 +1,31 @@
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
* Copyright (c) 2022-2024, daeuniverse Organization <dae@v2raya.org>
|
||||
*/
|
||||
|
||||
package control
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/shirou/gopsutil/v4/cpu"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestUdpTaskPool(t *testing.T) {
|
||||
isTest = true
|
||||
c, err := cpu.Times(false)
|
||||
require.NoError(t, err)
|
||||
t.Log(c)
|
||||
DefaultNatTimeout = 1000 * time.Microsecond
|
||||
for i := 0; i < 100; i++ {
|
||||
DefaultUdpTaskPool.EmitTask("testkey", func() {
|
||||
})
|
||||
time.Sleep(99 * time.Microsecond)
|
||||
}
|
||||
time.Sleep(5 * time.Second)
|
||||
c, err = cpu.Times(false)
|
||||
require.NoError(t, err)
|
||||
t.Log(c)
|
||||
}
|
9
go.mod
9
go.mod
@ -19,6 +19,7 @@ require (
|
||||
github.com/shirou/gopsutil/v4 v4.24.5
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
github.com/spf13/cobra v1.7.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
github.com/v2rayA/ahocorasick-domain v0.0.0-20231231085011-99ceb8ef3208
|
||||
github.com/vishvananda/netlink v1.1.0
|
||||
github.com/vishvananda/netns v0.0.4
|
||||
@ -37,6 +38,7 @@ require (
|
||||
github.com/awnumar/memguard v0.19.1 // indirect
|
||||
github.com/cloudflare/circl v1.3.7 // indirect
|
||||
github.com/daeuniverse/quic-go v0.0.0-20240413031024-943f218e0810 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/dsnet/compress v0.0.2-0.20210315054119-f66993602bf5 // indirect
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||
@ -45,11 +47,17 @@ require (
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/klauspost/compress v1.17.4 // indirect
|
||||
github.com/klauspost/pgzip v1.2.5 // indirect
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
|
||||
github.com/nwaples/rardecode v1.1.0 // indirect
|
||||
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
|
||||
github.com/panjf2000/ants v1.3.0 // indirect
|
||||
github.com/pierrec/lz4/v4 v4.1.2 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
|
||||
github.com/quic-go/qpack v0.4.0 // indirect
|
||||
github.com/shoenig/go-m1cpu v0.1.6 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.12 // indirect
|
||||
github.com/tklauser/numcpus v0.6.1 // indirect
|
||||
github.com/ulikunitz/xz v0.5.9 // indirect
|
||||
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
@ -58,6 +66,7 @@ require (
|
||||
golang.org/x/net v0.22.0 // indirect
|
||||
golang.org/x/tools v0.18.0 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
|
17
go.sum
17
go.sum
@ -77,6 +77,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
@ -102,6 +103,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
@ -139,6 +142,8 @@ github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7J
|
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
|
||||
github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc=
|
||||
github.com/onsi/gomega v1.27.8/go.mod h1:2J8vzI/s+2shY9XHRApDkdgPo1TKT7P2u6fXeJKFnNQ=
|
||||
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
|
||||
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
|
||||
github.com/pierrec/lz4/v4 v4.1.2 h1:qvY3YFXRQE/XB8MlLzJH7mSzBs74eA2gg52YTk6jUPM=
|
||||
github.com/pierrec/lz4/v4 v4.1.2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
@ -158,6 +163,10 @@ github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb h1:XfLJSPIO
|
||||
github.com/seiflotfy/cuckoofilter v0.0.0-20220411075957-e3b120b3f5fb/go.mod h1:bR6DqgcAl1zTcOX8/pE2Qkj9XO00eCNqmKb7lXP8EAg=
|
||||
github.com/shirou/gopsutil/v4 v4.24.5 h1:gGsArG5K6vmsh5hcFOHaPm87UD003CaDMkAOweSQjhM=
|
||||
github.com/shirou/gopsutil/v4 v4.24.5/go.mod h1:aoebb2vxetJ/yIDZISmduFvVNPHqXQ9SEJwRXxkf0RA=
|
||||
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
|
||||
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
|
||||
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
|
||||
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
|
||||
@ -172,6 +181,10 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
|
||||
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
||||
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
|
||||
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
|
||||
github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg=
|
||||
github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
|
||||
github.com/ulikunitz/xz v0.5.8/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
|
||||
@ -237,6 +250,8 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
|
||||
@ -271,6 +286,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
|
||||
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
|
||||
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
|
Loading…
Reference in New Issue
Block a user