/* * SPDX-License-Identifier: AGPL-3.0-only * Copyright (c) 2022-2023, daeuniverse Organization */ package control import ( "context" "errors" "fmt" "net" "net/netip" "os" "regexp" "sync" "github.com/cilium/ebpf" ciliumLink "github.com/cilium/ebpf/link" "github.com/daeuniverse/dae/common" "github.com/daeuniverse/dae/common/consts" internal "github.com/daeuniverse/dae/pkg/ebpf_internal" dnsmessage "github.com/miekg/dns" "github.com/mohae/deepcopy" "github.com/safchain/ethtool" "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" ) // coreFlip should be 0 or 1 var coreFlip = 0 type controlPlaneCore struct { mu sync.Mutex log *logrus.Logger deferFuncs []func() error bpf *bpfObjects outboundId2Name map[uint8]string kernelVersion *internal.Version flip int isReload bool bpfEjected bool closed context.Context close context.CancelFunc } func newControlPlaneCore(log *logrus.Logger, bpf *bpfObjects, outboundId2Name map[uint8]string, kernelVersion *internal.Version, isReload bool, ) *controlPlaneCore { if isReload { coreFlip = coreFlip&1 ^ 1 } var deferFuncs []func() error if !isReload { deferFuncs = append(deferFuncs, bpf.Close) } closed, toClose := context.WithCancel(context.Background()) return &controlPlaneCore{ log: log, deferFuncs: deferFuncs, bpf: bpf, outboundId2Name: outboundId2Name, kernelVersion: kernelVersion, flip: coreFlip, isReload: isReload, bpfEjected: false, closed: closed, close: toClose, } } func (c *controlPlaneCore) Flip() { coreFlip = coreFlip&1 ^ 1 } func (c *controlPlaneCore) Close() (err error) { c.mu.Lock() defer c.mu.Unlock() select { case <-c.closed.Done(): return nil default: } // Invoke defer funcs in reverse order. for i := len(c.deferFuncs) - 1; i >= 0; i-- { if e := c.deferFuncs[i](); e != nil { // Combine errors. if err != nil { err = fmt.Errorf("%w; %v", err, e) } else { err = e } } } c.close() return err } func getIfParamsFromLink(link netlink.Link) (ifParams bpfIfParams, err error) { // Get link offload features. et, err := ethtool.NewEthtool() if err != nil { return bpfIfParams{}, err } defer et.Close() features, err := et.Features(link.Attrs().Name) if err != nil { return bpfIfParams{}, err } if features["tx-checksum-ip-generic"] { ifParams.TxL4CksmIp4Offload = true ifParams.TxL4CksmIp6Offload = true } if features["tx-checksum-ipv4"] { ifParams.TxL4CksmIp4Offload = true } if features["tx-checksum-ipv6"] { ifParams.TxL4CksmIp6Offload = true } if features["rx-checksum"] { ifParams.RxCksmOffload = true } switch { case regexp.MustCompile(`^docker\d+$`).MatchString(link.Attrs().Name): ifParams.UseNonstandardOffloadAlgorithm = true default: } return ifParams, nil } func (c *controlPlaneCore) mapLinkType(ifname string) error { link, err := netlink.LinkByName(ifname) if err != nil { return err } linkType := uint32(0xffff) switch link.Attrs().EncapType { case "none": linkType = consts.LinkType_None case "ether": linkType = consts.LinkType_Ethernet default: } return c.bpf.bpfMaps.LinktypeMap.Update(uint32(link.Attrs().Index), linkType, ebpf.UpdateAny) } func (c *controlPlaneCore) addQdisc(ifname string) error { link, err := netlink.LinkByName(ifname) if err != nil { return err } qdisc := &netlink.GenericQdisc{ QdiscAttrs: netlink.QdiscAttrs{ LinkIndex: link.Attrs().Index, Handle: netlink.MakeHandle(0xffff, 0), Parent: netlink.HANDLE_CLSACT, }, QdiscType: "clsact", } if err := netlink.QdiscAdd(qdisc); err != nil { return fmt.Errorf("cannot add clsact qdisc: %w", err) } return nil } func (c *controlPlaneCore) delQdisc(ifname string) error { link, err := netlink.LinkByName(ifname) if err != nil { return err } qdisc := &netlink.GenericQdisc{ QdiscAttrs: netlink.QdiscAttrs{ LinkIndex: link.Attrs().Index, Handle: netlink.MakeHandle(0xffff, 0), Parent: netlink.HANDLE_CLSACT, }, QdiscType: "clsact", } if err := netlink.QdiscDel(qdisc); err != nil { if !os.IsExist(err) { return fmt.Errorf("cannot add clsact qdisc: %w", err) } } return nil } func (c *controlPlaneCore) setupRoutingPolicy() (err error) { /// Insert ip rule / ip route. var table = 2023 + c.flip /** ip table ip route add local default dev lo table 2023 ip -6 route add local default dev lo table 2023 */ routes := []netlink.Route{{ Scope: unix.RT_SCOPE_HOST, LinkIndex: consts.LoopbackIfIndex, Dst: &net.IPNet{ IP: []byte{0, 0, 0, 0}, Mask: net.CIDRMask(0, 32), }, Table: table, Type: unix.RTN_LOCAL, }, { Scope: unix.RT_SCOPE_HOST, LinkIndex: consts.LoopbackIfIndex, Dst: &net.IPNet{ IP: []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, Mask: net.CIDRMask(0, 128), }, Table: table, Type: unix.RTN_LOCAL, }} var routeBadIpv6 bool cleanRoutes := func() error { var errs error for _, route := range routes { if e := netlink.RouteDel(&route); e != nil { if len(route.Dst.IP) == net.IPv6len && routeBadIpv6 { // Not clean for bad ipv6. continue } if errs != nil { errs = fmt.Errorf("%w; %v", errs, e) } else { errs = e } } } if errs != nil { return fmt.Errorf("IpRouteDel(lo): %w", errs) } return nil } tryRouteAddAgain: for _, route := range routes { if err = netlink.RouteAdd(&route); err != nil { if os.IsExist(err) { _ = cleanRoutes() goto tryRouteAddAgain } if len(route.Dst.IP) == net.IPv6len { // ipv6 c.log.Warnln("IpRouteAdd: Bad IPv6 support. Perhaps your machine disabled IPv6.") routeBadIpv6 = true continue } return fmt.Errorf("IpRouteAdd: %w", err) } } c.deferFuncs = append(c.deferFuncs, cleanRoutes) /** ip rule ip rule add fwmark 0x8000000/0x8000000 table 2023 ip -6 rule add fwmark 0x8000000/0x8000000 table 2023 */ rules := []netlink.Rule{{ SuppressIfgroup: -1, SuppressPrefixlen: -1, Priority: -1, Goto: -1, Flow: -1, Family: unix.AF_INET, Table: table, Mark: int(consts.TproxyMark), Mask: int(consts.TproxyMark), }, { SuppressIfgroup: -1, SuppressPrefixlen: -1, Priority: -1, Goto: -1, Flow: -1, Family: unix.AF_INET6, Table: table, Mark: int(consts.TproxyMark), Mask: int(consts.TproxyMark), }} var ruleBadIpv6 bool cleanRules := func() error { var errs error for _, rule := range rules { if rule.Family == unix.AF_INET6 && ruleBadIpv6 { // Not clean for bad ipv6. continue } if e := netlink.RuleDel(&rule); e != nil { if errs != nil { errs = fmt.Errorf("%w; %v", errs, e) } else { errs = e } } } if errs != nil { return fmt.Errorf("IpRuleDel: %w", errs) } return nil } tryRuleAddAgain: for _, rule := range rules { if err = netlink.RuleAdd(&rule); err != nil { if os.IsExist(err) { _ = cleanRules() goto tryRuleAddAgain } if rule.Family == unix.AF_INET6 { // ipv6 c.log.Warnln("IpRuleAdd: Bad IPv6 support. Perhaps your machine disabled IPv6 (need CONFIG_IPV6_MULTIPLE_TABLES).") ruleBadIpv6 = true continue } return fmt.Errorf("IpRuleAdd: %w", err) } } c.deferFuncs = append(c.deferFuncs, cleanRules) return nil } func (c *controlPlaneCore) addLinkCb(_ifname string, rtmType uint16, cb func()) error { ch := make(chan netlink.LinkUpdate) done := make(chan struct{}) if e := netlink.LinkSubscribeWithOptions(ch, done, netlink.LinkSubscribeOptions{ ErrorCallback: func(err error) { c.log.Debug("LinkSubscribe:", err) }, ListExisting: true, }); e != nil { return e } go func(ctx context.Context, ch <-chan netlink.LinkUpdate, done chan struct{}) { for { select { case <-ctx.Done(): close(done) return case <-done: return case update := <-ch: if update.Header.Type == rtmType { ifname := update.Link.Attrs().Name if ifname == _ifname { cb() close(done) return } } } } }(c.closed, ch, done) return nil } // addNewLinkBindLanCb waits for NEWLINK msg of given `ifname` and invokes `bindLan`. func (c *controlPlaneCore) addNewLinkBindLanCb(ifname string, autoConfigKernelParameter bool) error { return c.addLinkCb(ifname, unix.RTM_NEWLINK, func() { c.log.Warnf("New link creation of '%v' is detected. Bind LAN program to it.", ifname) if err := c.addQdisc(ifname); err != nil { c.log.Errorf("addQdisc: %v", err) return } if err := c.bindLan(ifname, autoConfigKernelParameter); err != nil { c.log.Errorf("bindLan: %v", err) } }) } // bindLan automatically configures kernel parameters and bind to lan interface `ifname`. // bindLan supports lazy-bind if interface `ifname` is not found. // bindLan supports rebinding when the interface `ifname` is deleted in the future. func (c *controlPlaneCore) bindLan(ifname string, autoConfigKernelParameter bool) error { if autoConfigKernelParameter { SetSendRedirects(ifname, "0") SetForwarding(ifname, "1") } if err := c._bindLan(ifname); err != nil { var notFoundErr netlink.LinkNotFoundError if !errors.As(err, ¬FoundErr) { return err } // Not found error. // Listen for `NEWLINK` to bind. c.log.Warnf("Link '%v' is not found. Bind LAN program to it once it is created.", ifname) if e := c.addNewLinkBindLanCb(ifname, autoConfigKernelParameter); e != nil { return fmt.Errorf("%w: %v", err, e) } return nil } // Listen for `DELLINK` and add `NEWLINK` callback to re-bind. if err := c.addLinkCb(ifname, unix.RTM_DELLINK, func() { c.log.Warnf("Link deletion of '%v' is detected. Bind LAN program to it once it is re-created.", ifname) if e := c.addNewLinkBindLanCb(ifname, autoConfigKernelParameter); e != nil { c.log.Errorf("Failed to add callback for re-bind LAN program to '%v': %v", ifname, e) } }); err != nil { return fmt.Errorf("failed to add re-bind callback: %w", err) } return nil } func (c *controlPlaneCore) _bindLan(ifname string) error { c.mu.Lock() defer c.mu.Unlock() select { case <-c.closed.Done(): return nil default: } c.log.Infof("Bind to LAN: %v", ifname) link, err := netlink.LinkByName(ifname) if err != nil { return err } if err = CheckIpforward(ifname); err != nil { return err } if err = CheckSendRedirects(ifname); err != nil { return err } _ = c.addQdisc(ifname) _ = c.mapLinkType(ifname) /// Insert an elem into IfindexParamsMap. ifParams, err := getIfParamsFromLink(link) if err != nil { return err } if err = ifParams.CheckVersionRequirement(c.kernelVersion); err != nil { return err } if err := c.bpf.IfindexParamsMap.Update(uint32(link.Attrs().Index), ifParams, ebpf.UpdateAny); err != nil { return fmt.Errorf("update IfindexIpsMap: %w", err) } // Insert filters. filterIngress := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: link.Attrs().Index, Parent: netlink.HANDLE_MIN_INGRESS, Handle: netlink.MakeHandle(0x2023, 0b100+uint16(c.flip)), Protocol: unix.ETH_P_ALL, // Priority should be behind of WAN's Priority: 2, }, Fd: c.bpf.bpfPrograms.TproxyLanIngress.FD(), Name: consts.AppName + "_lan_ingress", DirectAction: true, } // Remove and add. _ = netlink.FilterDel(filterIngress) if !c.isReload { // Clean up thoroughly. filterIngressFlipped := deepcopy.Copy(filterIngress).(*netlink.BpfFilter) filterIngressFlipped.FilterAttrs.Handle ^= 1 _ = netlink.FilterDel(filterIngressFlipped) } if err := netlink.FilterAdd(filterIngress); err != nil { return fmt.Errorf("cannot attach ebpf object to filter ingress: %w", err) } c.deferFuncs = append(c.deferFuncs, func() error { if err := netlink.FilterDel(filterIngress); err != nil { return fmt.Errorf("FilterDel(%v:%v): %w", ifname, filterIngress.Name, err) } return nil }) // Insert filters. filterEgress := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: link.Attrs().Index, Parent: netlink.HANDLE_MIN_EGRESS, Handle: netlink.MakeHandle(0x2023, 0b010+uint16(c.flip)), Protocol: unix.ETH_P_ALL, // Priority should be front of WAN's Priority: 1, }, Fd: c.bpf.bpfPrograms.TproxyLanEgress.FD(), Name: consts.AppName + "_lan_egress", DirectAction: true, } // Remove and add. _ = netlink.FilterDel(filterEgress) if !c.isReload { // Clean up thoroughly. filterEgressFlipped := deepcopy.Copy(filterEgress).(*netlink.BpfFilter) filterEgressFlipped.FilterAttrs.Handle ^= 1 _ = netlink.FilterDel(filterEgressFlipped) } if err := netlink.FilterAdd(filterEgress); err != nil { return fmt.Errorf("cannot attach ebpf object to filter egress: %w", err) } c.deferFuncs = append(c.deferFuncs, func() error { if err := netlink.FilterDel(filterEgress); err != nil { return fmt.Errorf("FilterDel(%v:%v): %w", ifname, filterEgress.Name, err) } return nil }) return nil } func (c *controlPlaneCore) setupSkPidMonitor() error { /// Set-up SrcPidMapper. /// Attach programs to support pname routing. // Get the first-mounted cgroupv2 path. cgroupPath, err := detectCgroupPath() if err != nil { return err } // Bind cg programs type cgProg struct { Name string Prog *ebpf.Program Attach ebpf.AttachType } cgProgs := []cgProg{ {Prog: c.bpf.TproxyWanCgSockCreate, Attach: ebpf.AttachCGroupInetSockCreate}, {Prog: c.bpf.TproxyWanCgSockRelease, Attach: ebpf.AttachCgroupInetSockRelease}, {Prog: c.bpf.TproxyWanCgConnect4, Attach: ebpf.AttachCGroupInet4Connect}, {Prog: c.bpf.TproxyWanCgConnect6, Attach: ebpf.AttachCGroupInet6Connect}, {Prog: c.bpf.TproxyWanCgSendmsg4, Attach: ebpf.AttachCGroupUDP4Sendmsg}, {Prog: c.bpf.TproxyWanCgSendmsg6, Attach: ebpf.AttachCGroupUDP6Sendmsg}, } for _, prog := range cgProgs { attached, err := ciliumLink.AttachCgroup(ciliumLink.CgroupOptions{ Path: cgroupPath, Attach: prog.Attach, Program: prog.Prog, }) if err != nil { return fmt.Errorf("AttachCgroup: %v: %w", prog.Prog.String(), err) } c.deferFuncs = append(c.deferFuncs, func() error { if err := attached.Close(); err != nil { return fmt.Errorf("inet6Bind.Close(): %w", err) } return nil }) } return nil } func (c *controlPlaneCore) bindWan(ifname string) error { return c._bindWan(ifname) } func (c *controlPlaneCore) _bindWan(ifname string) error { c.mu.Lock() defer c.mu.Unlock() select { case <-c.closed.Done(): return nil default: } c.log.Infof("Bind to WAN: %v", ifname) link, err := netlink.LinkByName(ifname) if err != nil { return err } if link.Attrs().Index == consts.LoopbackIfIndex { return fmt.Errorf("cannot bind to loopback interface") } _ = c.addQdisc(ifname) _ = c.mapLinkType(ifname) /// Insert an elem into IfindexParamsMap. ifParams, err := getIfParamsFromLink(link) if err != nil { return err } if err = ifParams.CheckVersionRequirement(c.kernelVersion); err != nil { return err } if err := c.bpf.IfindexParamsMap.Update(uint32(link.Attrs().Index), ifParams, ebpf.UpdateAny); err != nil { return fmt.Errorf("update IfindexIpsMap: %w", err) } /// Set-up WAN ingress/egress TC programs. // Insert TC filters filterEgress := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: link.Attrs().Index, Parent: netlink.HANDLE_MIN_EGRESS, Handle: netlink.MakeHandle(0x2023, 0b100+uint16(c.flip)), Protocol: unix.ETH_P_ALL, Priority: 2, }, Fd: c.bpf.bpfPrograms.TproxyWanEgress.FD(), Name: consts.AppName + "_wan_egress", DirectAction: true, } _ = netlink.FilterDel(filterEgress) // Remove and add. if !c.isReload { // Clean up thoroughly. filterEgressFlipped := deepcopy.Copy(filterEgress).(*netlink.BpfFilter) filterEgressFlipped.FilterAttrs.Handle ^= 1 _ = netlink.FilterDel(filterEgressFlipped) } if err := netlink.FilterAdd(filterEgress); err != nil { return fmt.Errorf("cannot attach ebpf object to filter egress: %w", err) } c.deferFuncs = append(c.deferFuncs, func() error { if err := netlink.FilterDel(filterEgress); err != nil && !os.IsNotExist(err) { return fmt.Errorf("FilterDel(%v:%v): %w", ifname, filterEgress.Name, err) } return nil }) filterIngress := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: link.Attrs().Index, Parent: netlink.HANDLE_MIN_INGRESS, Handle: netlink.MakeHandle(0x2023, 0b010+uint16(c.flip)), Protocol: unix.ETH_P_ALL, Priority: 1, }, Fd: c.bpf.bpfPrograms.TproxyWanIngress.FD(), Name: consts.AppName + "_wan_ingress", DirectAction: true, } _ = netlink.FilterDel(filterIngress) // Remove and add. if !c.isReload { // Clean up thoroughly. filterIngressFlipped := deepcopy.Copy(filterIngress).(*netlink.BpfFilter) filterIngressFlipped.FilterAttrs.Handle ^= 1 _ = netlink.FilterDel(filterIngressFlipped) } if err := netlink.FilterAdd(filterIngress); err != nil { return fmt.Errorf("cannot attach ebpf object to filter ingress: %w", err) } c.deferFuncs = append(c.deferFuncs, func() error { if err := netlink.FilterDel(filterIngress); err != nil { return fmt.Errorf("FilterDel(%v:%v): %w", ifname, filterIngress.Name, err) } return nil }) return nil } // BatchUpdateDomainRouting update bpf map domain_routing. Since one IP may have multiple domains, this function should // be invoked every A/AAAA-record lookup. func (c *controlPlaneCore) BatchUpdateDomainRouting(cache *DnsCache) error { // Parse ips from DNS resp answers. var ips []netip.Addr for _, ans := range cache.Answer { var ( ip netip.Addr ok bool ) switch body := ans.(type) { case *dnsmessage.A: ip, ok = netip.AddrFromSlice(body.A) case *dnsmessage.AAAA: ip, ok = netip.AddrFromSlice(body.AAAA) } if !ok || ip.IsUnspecified() { continue } ips = append(ips, ip) } if len(ips) == 0 { return nil } // Update bpf map. // Construct keys and vals, and BpfMapBatchUpdate. var keys [][4]uint32 var vals []bpfDomainRouting for _, ip := range ips { ip6 := ip.As16() keys = append(keys, common.Ipv6ByteSliceToUint32Array(ip6[:])) r := bpfDomainRouting{} if len(cache.DomainBitmap) != len(r.Bitmap) { return fmt.Errorf("domain bitmap length not sync with kern program") } copy(r.Bitmap[:], cache.DomainBitmap) vals = append(vals, r) } if _, err := BpfMapBatchUpdate(c.bpf.DomainRoutingMap, keys, vals, &ebpf.BatchOptions{ ElemFlags: uint64(ebpf.UpdateAny), }); err != nil { return err } return nil } // BatchRemoveDomainRouting remove bpf map domain_routing. func (c *controlPlaneCore) BatchRemoveDomainRouting(cache *DnsCache) error { // Parse ips from DNS resp answers. var ips []netip.Addr for _, ans := range cache.Answer { var ( ip netip.Addr ok bool ) switch body := ans.(type) { case *dnsmessage.A: ip, ok = netip.AddrFromSlice(body.A) case *dnsmessage.AAAA: ip, ok = netip.AddrFromSlice(body.AAAA) } if !ok || ip.IsUnspecified() { continue } ips = append(ips, ip) } if len(ips) == 0 { return nil } // Update bpf map. // Construct keys and vals, and BpfMapBatchUpdate. var keys [][4]uint32 for _, ip := range ips { ip6 := ip.As16() keys = append(keys, common.Ipv6ByteSliceToUint32Array(ip6[:])) } if _, err := BpfMapBatchDelete(c.bpf.DomainRoutingMap, keys); err != nil { return err } return nil } // EjectBpf will resect bpf from destroying life-cycle of control plane core. func (c *controlPlaneCore) EjectBpf() *bpfObjects { if !c.bpfEjected && !c.isReload { c.deferFuncs = c.deferFuncs[1:] } c.bpfEjected = true return c.bpf } // InjectBpf will inject bpf back. func (c *controlPlaneCore) InjectBpf(bpf *bpfObjects) { if c.bpfEjected { c.bpfEjected = false c.deferFuncs = append([]func() error{bpf.Close}, c.deferFuncs...) } return }