fix(udp_task_pool): panic: close of closed channel (#570)

This commit is contained in:
mzz 2024-07-15 13:29:58 +08:00 committed by GitHub
parent ffdfadba8a
commit 5810244109
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -10,6 +10,7 @@ const UdpTaskQueueLength = 128
type UdpTask = func() type UdpTask = func()
type UdpTaskQueue struct { type UdpTaskQueue struct {
key string
ch chan UdpTask ch chan UdpTask
timer *time.Timer timer *time.Timer
agingTime time.Duration agingTime time.Duration
@ -47,7 +48,9 @@ func (p *UdpTaskPool) convoy(q *UdpTaskQueue) {
clearloop: clearloop:
for { for {
select { select {
case <-q.ch: case t := <-q.ch:
// Emit it back due to closed q.
p.EmitTask(q.key, t)
default: default:
break clearloop break clearloop
} }
@ -66,6 +69,7 @@ func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
if !ok { if !ok {
ch := p.queueChPool.Get().(chan UdpTask) ch := p.queueChPool.Get().(chan UdpTask)
q = &UdpTaskQueue{ q = &UdpTaskQueue{
key: key,
ch: ch, ch: ch,
timer: nil, timer: nil,
agingTime: DefaultNatTimeout, agingTime: DefaultNatTimeout,
@ -73,6 +77,12 @@ func (p *UdpTaskPool) EmitTask(key string, task UdpTask) {
freed: make(chan struct{}), freed: make(chan struct{}),
} }
q.timer = time.AfterFunc(q.agingTime, func() { q.timer = time.AfterFunc(q.agingTime, func() {
// This func may be invoked twice due to concurrent Reset.
select {
case <-q.closed:
return
default:
}
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if p.m[key] == q { if p.m[key] == q {