chore: stash

This commit is contained in:
mzz2017 2023-08-27 19:55:26 +08:00
parent 4fee719ff2
commit 532edc8a69

View File

@ -1,45 +1,86 @@
package control
import (
"container/list"
"context"
"errors"
"math"
"net"
"net/netip"
"os"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/daeuniverse/dae/component/outbound/dialer"
"golang.org/x/sys/unix"
"github.com/daeuniverse/softwind/pool"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
)
const (
EnableBatchThreshold = 10
)
type packets struct {
mu sync.Mutex
list *list.List
}
type packetsPair struct {
*net.UDPAddr
*packets
}
type Anyfrom struct {
writeTimes uint32
*net.UDPConn
deadlineTimer *time.Timer
ttl time.Duration
// GSO support is modified from quic-go with many thanks.
gso bool
gotGSOError bool
is4 bool
pc4 *ipv4.PacketConn
pc6 *ipv6.PacketConn
deadlineTimer *time.Timer
ttl time.Duration
muMAddrPackets sync.Mutex
mAddrPackets map[netip.AddrPort]*packets
ticker *time.Ticker
}
func (a *Anyfrom) handleBatch() {
buffer := pool.GetBuffer()
defer pool.PutBuffer(buffer)
for range a.ticker.C {
a.muMAddrPackets.Lock()
var lists = make([]*packetsPair, 0, len(a.mAddrPackets))
for addr, packets := range a.mAddrPackets {
if packets == nil || packets.list.Len() == 0 {
delete(a.mAddrPackets, addr)
continue
}
lists = append(lists, &packetsPair{
AddrPort: addr,
packets: packets,
})
}
a.muMAddrPackets.Unlock()
for _, pair := range lists {
pc := ipv6.NewPacketConn(a.UDPConn)
pc.WriteBatch([]ipv6.Message{{
Buffers: [][]byte{},
Addr: pair.UDPAddr,
}})
ipv6.Message
}
}
}
func (a *Anyfrom) afterWrite(err error) {
if !a.gotGSOError && isGSOError(err) {
a.gotGSOError = true
times := atomic.AddUint32(&a.writeTimes, 1)
if times == EnableBatchThreshold {
go a.handleBatch()
}
a.RefreshTtl()
}
func (a *Anyfrom) RefreshTtl() {
a.deadlineTimer.Reset(a.ttl)
}
func (a *Anyfrom) SupportGso(size int) bool {
if size > math.MaxUint16 {
return false
}
return a.gso && !a.gotGSOError
func (a *Anyfrom) ShouldUseBatch() bool {
return a.writeTimes >= EnableBatchThreshold
}
func (a *Anyfrom) ReadFrom(b []byte) (int, net.Addr, error) {
defer a.RefreshTtl()
@ -67,7 +108,7 @@ func (a *Anyfrom) SyscallConn() (syscall.RawConn, error) {
}
func (a *Anyfrom) WriteMsgUDP(b []byte, oob []byte, addr *net.UDPAddr) (n int, oobn int, err error) {
defer a.afterWrite(err)
if a.SupportGso(len(b)) {
if a.ShouldUseBatch() {
return a.UDPConn.WriteMsgUDP(b, appendUDPSegmentSizeMsg(oob, uint16(len(b))), addr)
}
return a.UDPConn.WriteMsgUDP(b, oob, addr)
@ -104,53 +145,6 @@ func (a *Anyfrom) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (n int, err
return a.UDPConn.WriteToUDPAddrPort(b, addr)
}
// isGSOSupported tests if the kernel supports GSO.
// Sending with GSO might still fail later on, if the interface doesn't support it (see isGSOError).
func isGSOSupported(uc *net.UDPConn) bool {
// We disable GSO because we haven't thought through how to design to use larger packets.
return false
conn, err := uc.SyscallConn()
if err != nil {
return false
}
disabled, err := strconv.ParseBool(os.Getenv("DAE_DISABLE_GSO"))
if err == nil && disabled {
return false
}
var serr error
if err := conn.Control(func(fd uintptr) {
_, serr = unix.GetsockoptInt(int(fd), unix.IPPROTO_UDP, unix.UDP_SEGMENT)
}); err != nil {
return false
}
return serr == nil
}
func isGSOError(err error) bool {
var serr *os.SyscallError
if errors.As(err, &serr) {
// EIO is returned by udp_send_skb() if the device driver does not have tx checksums enabled,
// which is a hard requirement of UDP_SEGMENT. See:
// https://git.kernel.org/pub/scm/docs/man-pages/man-pages.git/tree/man7/udp.7?id=806eabd74910447f21005160e90957bde4db0183#n228
// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/net/ipv4/udp.c?h=v6.2&id=c9c3395d5e3dcc6daee66c6908354d47bf98cb0c#n942
return serr.Err == unix.EIO
}
return false
}
func appendUDPSegmentSizeMsg(b []byte, size uint16) []byte {
startLen := len(b)
const dataLen = 2 // payload is a uint16
b = append(b, make([]byte, unix.CmsgSpace(dataLen))...)
h := (*unix.Cmsghdr)(unsafe.Pointer(&b[startLen]))
h.Level = syscall.IPPROTO_UDP
h.Type = unix.UDP_SEGMENT
h.SetLen(unix.CmsgLen(dataLen))
// UnixRights uses the private `data` method, but I *think* this achieves the same goal.
offset := startLen + unix.CmsgSpace(0)
*(*uint16)(unsafe.Pointer(&b[offset])) = size
return b
}
// AnyfromPool is a full-cone udp listener pool
type AnyfromPool struct {
pool map[string]*Anyfrom
@ -166,14 +160,15 @@ func NewAnyfromPool() *AnyfromPool {
}
}
func (p *AnyfromPool) GetOrCreate(lAddr string, ttl time.Duration) (conn *Anyfrom, isNew bool, err error) {
func (p *AnyfromPool) GetOrCreate(lAddr netip.AddrPort, ttl time.Duration) (conn *Anyfrom, isNew bool, err error) {
strLAddr := lAddr.String()
p.mu.RLock()
af, ok := p.pool[lAddr]
af, ok := p.pool[strLAddr]
if !ok {
p.mu.RUnlock()
p.mu.Lock()
defer p.mu.Unlock()
if af, ok = p.pool[lAddr]; ok {
if af, ok = p.pool[strLAddr]; ok {
return af, false, nil
}
// Create an Anyfrom.
@ -184,28 +179,33 @@ func (p *AnyfromPool) GetOrCreate(lAddr string, ttl time.Duration) (conn *Anyfro
},
KeepAlive: 0,
}
pc, err := d.ListenPacket(context.Background(), "udp", lAddr)
pc, err := d.ListenPacket(context.Background(), "udp", strLAddr)
if err != nil {
return nil, true, err
}
uConn := pc.(*net.UDPConn)
af = &Anyfrom{
UDPConn: uConn,
deadlineTimer: nil,
ttl: ttl,
gotGSOError: false,
gso: isGSOSupported(uConn),
writeTimes: 0,
UDPConn: uConn,
is4: lAddr.Addr().Is4(),
pc4: nil,
pc6: nil,
deadlineTimer: nil,
ttl: ttl,
muMAddrPackets: sync.Mutex{},
mAddrPackets: make(map[netip.AddrPort]*packets, 4),
ticker: &time.Ticker{},
}
af.deadlineTimer = time.AfterFunc(ttl, func() {
p.mu.Lock()
defer p.mu.Unlock()
_af := p.pool[lAddr]
_af := p.pool[strLAddr]
if _af == af {
delete(p.pool, lAddr)
delete(p.pool, strLAddr)
af.Close()
}
})
p.pool[lAddr] = af
p.pool[strLAddr] = af
return af, true, nil
} else {
af.RefreshTtl()