From 532edc8a69fa7073afe9edbb767d855d42a97fa2 Mon Sep 17 00:00:00 2001 From: mzz2017 <2017@duck.com> Date: Sun, 27 Aug 2023 19:55:26 +0800 Subject: [PATCH] chore: stash --- control/anyfrom_pool.go | 156 ++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 78 deletions(-) diff --git a/control/anyfrom_pool.go b/control/anyfrom_pool.go index fc5bbc6..c44e101 100644 --- a/control/anyfrom_pool.go +++ b/control/anyfrom_pool.go @@ -1,45 +1,86 @@ package control import ( + "container/list" "context" - "errors" - "math" "net" "net/netip" - "os" - "strconv" "sync" + "sync/atomic" "syscall" "time" - "unsafe" "github.com/daeuniverse/dae/component/outbound/dialer" - "golang.org/x/sys/unix" + "github.com/daeuniverse/softwind/pool" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" ) +const ( + EnableBatchThreshold = 10 +) + +type packets struct { + mu sync.Mutex + list *list.List +} +type packetsPair struct { + *net.UDPAddr + *packets +} type Anyfrom struct { + writeTimes uint32 + *net.UDPConn - deadlineTimer *time.Timer - ttl time.Duration - // GSO support is modified from quic-go with many thanks. - gso bool - gotGSOError bool + is4 bool + pc4 *ipv4.PacketConn + pc6 *ipv6.PacketConn + deadlineTimer *time.Timer + ttl time.Duration + muMAddrPackets sync.Mutex + mAddrPackets map[netip.AddrPort]*packets + ticker *time.Ticker } +func (a *Anyfrom) handleBatch() { + buffer := pool.GetBuffer() + defer pool.PutBuffer(buffer) + for range a.ticker.C { + a.muMAddrPackets.Lock() + var lists = make([]*packetsPair, 0, len(a.mAddrPackets)) + for addr, packets := range a.mAddrPackets { + if packets == nil || packets.list.Len() == 0 { + delete(a.mAddrPackets, addr) + continue + } + lists = append(lists, &packetsPair{ + AddrPort: addr, + packets: packets, + }) + } + a.muMAddrPackets.Unlock() + for _, pair := range lists { + pc := ipv6.NewPacketConn(a.UDPConn) + pc.WriteBatch([]ipv6.Message{{ + Buffers: [][]byte{}, + Addr: pair.UDPAddr, + }}) + ipv6.Message + } + } +} func (a *Anyfrom) afterWrite(err error) { - if !a.gotGSOError && isGSOError(err) { - a.gotGSOError = true + times := atomic.AddUint32(&a.writeTimes, 1) + if times == EnableBatchThreshold { + go a.handleBatch() } a.RefreshTtl() } func (a *Anyfrom) RefreshTtl() { a.deadlineTimer.Reset(a.ttl) } -func (a *Anyfrom) SupportGso(size int) bool { - if size > math.MaxUint16 { - return false - } - return a.gso && !a.gotGSOError +func (a *Anyfrom) ShouldUseBatch() bool { + return a.writeTimes >= EnableBatchThreshold } func (a *Anyfrom) ReadFrom(b []byte) (int, net.Addr, error) { defer a.RefreshTtl() @@ -67,7 +108,7 @@ func (a *Anyfrom) SyscallConn() (syscall.RawConn, error) { } func (a *Anyfrom) WriteMsgUDP(b []byte, oob []byte, addr *net.UDPAddr) (n int, oobn int, err error) { defer a.afterWrite(err) - if a.SupportGso(len(b)) { + if a.ShouldUseBatch() { return a.UDPConn.WriteMsgUDP(b, appendUDPSegmentSizeMsg(oob, uint16(len(b))), addr) } return a.UDPConn.WriteMsgUDP(b, oob, addr) @@ -104,53 +145,6 @@ func (a *Anyfrom) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (n int, err return a.UDPConn.WriteToUDPAddrPort(b, addr) } -// isGSOSupported tests if the kernel supports GSO. -// Sending with GSO might still fail later on, if the interface doesn't support it (see isGSOError). -func isGSOSupported(uc *net.UDPConn) bool { - // We disable GSO because we haven't thought through how to design to use larger packets. - return false - conn, err := uc.SyscallConn() - if err != nil { - return false - } - disabled, err := strconv.ParseBool(os.Getenv("DAE_DISABLE_GSO")) - if err == nil && disabled { - return false - } - var serr error - if err := conn.Control(func(fd uintptr) { - _, serr = unix.GetsockoptInt(int(fd), unix.IPPROTO_UDP, unix.UDP_SEGMENT) - }); err != nil { - return false - } - return serr == nil -} -func isGSOError(err error) bool { - var serr *os.SyscallError - if errors.As(err, &serr) { - // EIO is returned by udp_send_skb() if the device driver does not have tx checksums enabled, - // which is a hard requirement of UDP_SEGMENT. See: - // https://git.kernel.org/pub/scm/docs/man-pages/man-pages.git/tree/man7/udp.7?id=806eabd74910447f21005160e90957bde4db0183#n228 - // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/net/ipv4/udp.c?h=v6.2&id=c9c3395d5e3dcc6daee66c6908354d47bf98cb0c#n942 - return serr.Err == unix.EIO - } - return false -} -func appendUDPSegmentSizeMsg(b []byte, size uint16) []byte { - startLen := len(b) - const dataLen = 2 // payload is a uint16 - b = append(b, make([]byte, unix.CmsgSpace(dataLen))...) - h := (*unix.Cmsghdr)(unsafe.Pointer(&b[startLen])) - h.Level = syscall.IPPROTO_UDP - h.Type = unix.UDP_SEGMENT - h.SetLen(unix.CmsgLen(dataLen)) - - // UnixRights uses the private `data` method, but I *think* this achieves the same goal. - offset := startLen + unix.CmsgSpace(0) - *(*uint16)(unsafe.Pointer(&b[offset])) = size - return b -} - // AnyfromPool is a full-cone udp listener pool type AnyfromPool struct { pool map[string]*Anyfrom @@ -166,14 +160,15 @@ func NewAnyfromPool() *AnyfromPool { } } -func (p *AnyfromPool) GetOrCreate(lAddr string, ttl time.Duration) (conn *Anyfrom, isNew bool, err error) { +func (p *AnyfromPool) GetOrCreate(lAddr netip.AddrPort, ttl time.Duration) (conn *Anyfrom, isNew bool, err error) { + strLAddr := lAddr.String() p.mu.RLock() - af, ok := p.pool[lAddr] + af, ok := p.pool[strLAddr] if !ok { p.mu.RUnlock() p.mu.Lock() defer p.mu.Unlock() - if af, ok = p.pool[lAddr]; ok { + if af, ok = p.pool[strLAddr]; ok { return af, false, nil } // Create an Anyfrom. @@ -184,28 +179,33 @@ func (p *AnyfromPool) GetOrCreate(lAddr string, ttl time.Duration) (conn *Anyfro }, KeepAlive: 0, } - pc, err := d.ListenPacket(context.Background(), "udp", lAddr) + pc, err := d.ListenPacket(context.Background(), "udp", strLAddr) if err != nil { return nil, true, err } uConn := pc.(*net.UDPConn) af = &Anyfrom{ - UDPConn: uConn, - deadlineTimer: nil, - ttl: ttl, - gotGSOError: false, - gso: isGSOSupported(uConn), + writeTimes: 0, + UDPConn: uConn, + is4: lAddr.Addr().Is4(), + pc4: nil, + pc6: nil, + deadlineTimer: nil, + ttl: ttl, + muMAddrPackets: sync.Mutex{}, + mAddrPackets: make(map[netip.AddrPort]*packets, 4), + ticker: &time.Ticker{}, } af.deadlineTimer = time.AfterFunc(ttl, func() { p.mu.Lock() defer p.mu.Unlock() - _af := p.pool[lAddr] + _af := p.pool[strLAddr] if _af == af { - delete(p.pool, lAddr) + delete(p.pool, strLAddr) af.Close() } }) - p.pool[lAddr] = af + p.pool[strLAddr] = af return af, true, nil } else { af.RefreshTtl()