support bandwidth_limit set by server plugin (#3271)

* support bandwidth_limit set by server plugin

* limiter at proxy level

* bandwidth_limit_mode

* updates tests for bandwidth_limit_mode default

* bandwidth_limit_mode as string

* add checkForSrv for bandwidth_limit_mode

* bandwidth_limit flags for sub cmds

* gci write
This commit is contained in:
Craig O'Donnell
2023-02-08 11:38:36 -05:00
committed by GitHub
parent 113e3b0b0d
commit aa31d7ad0b
27 changed files with 284 additions and 41 deletions

View File

@ -20,8 +20,10 @@ import (
"strings"
frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost"
@ -135,6 +137,10 @@ func (pxy *HTTPProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HTTPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) {
xl := pxy.xl
rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr)
@ -160,6 +166,13 @@ func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err err
if pxy.cfg.UseCompression {
rwc = frpIo.WithCompression(rwc)
}
if pxy.GetLimiter() != nil {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
return rwc.Close()
})
}
workConn = frpNet.WrapReadWriteCloserToConn(rwc, tmpConn)
workConn = frpNet.WrapStatsConn(workConn, pxy.updateStatsAfterClosedConn)
metrics.Server.OpenConnection(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)

View File

@ -17,6 +17,8 @@ package proxy
import (
"strings"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/util/util"
"github.com/fatedier/frp/pkg/util/vhost"
@ -74,6 +76,10 @@ func (pxy *HTTPSProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HTTPSProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *HTTPSProxy) Close() {
pxy.BaseProxy.Close()
}

View File

@ -24,10 +24,12 @@ import (
"time"
frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
plugin "github.com/fatedier/frp/pkg/plugin/server"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/pkg/util/xlog"
"github.com/fatedier/frp/server/controller"
@ -45,6 +47,7 @@ type Proxy interface {
GetUsedPortsNum() int
GetResourceController() *controller.ResourceController
GetUserInfo() plugin.UserInfo
GetLimiter() *rate.Limiter
Close()
}
@ -56,6 +59,7 @@ type BaseProxy struct {
poolCount int
getWorkConnFn GetWorkConnFn
serverCfg config.ServerCommonConf
limiter *rate.Limiter
userInfo plugin.UserInfo
mu sync.RWMutex
@ -187,6 +191,13 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf,
) (pxy Proxy, err error) {
xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
var limiter *rate.Limiter
limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeServer {
limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
}
basePxy := BaseProxy{
name: pxyConf.GetBaseInfo().ProxyName,
rc: rc,
@ -194,6 +205,7 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso
poolCount: poolCount,
getWorkConnFn: getWorkConnFn,
serverCfg: serverCfg,
limiter: limiter,
xl: xl,
ctx: xlog.NewContext(ctx, xl),
userInfo: userInfo,
@ -287,6 +299,13 @@ func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv
if cfg.UseCompression {
local = frpIo.WithCompression(local)
}
if pxy.GetLimiter() != nil {
local = frpIo.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
return local.Close()
})
}
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())

View File

@ -15,6 +15,8 @@
package proxy
import (
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
)
@ -41,6 +43,10 @@ func (pxy *STCPProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *STCPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *STCPProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName())

View File

@ -15,6 +15,8 @@
package proxy
import (
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
)
@ -42,6 +44,10 @@ func (pxy *SUDPProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *SUDPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *SUDPProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.VisitorManager.CloseListener(pxy.GetName())

View File

@ -19,6 +19,8 @@ import (
"net"
"strconv"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
)
@ -74,6 +76,10 @@ func (pxy *TCPProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TCPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *TCPProxy) Close() {
pxy.BaseProxy.Close()
if pxy.cfg.Group == "" {

View File

@ -19,6 +19,8 @@ import (
"net"
"strings"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/consts"
"github.com/fatedier/frp/pkg/util/util"
@ -94,6 +96,10 @@ func (pxy *TCPMuxProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TCPMuxProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *TCPMuxProxy) Close() {
pxy.BaseProxy.Close()
}

View File

@ -24,10 +24,12 @@ import (
"github.com/fatedier/golib/errors"
frpIo "github.com/fatedier/golib/io"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
"github.com/fatedier/frp/pkg/proto/udp"
"github.com/fatedier/frp/pkg/util/limit"
frpNet "github.com/fatedier/frp/pkg/util/net"
"github.com/fatedier/frp/server/metrics"
)
@ -198,6 +200,12 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
rwc = frpIo.WithCompression(rwc)
}
if pxy.GetLimiter() != nil {
rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
return rwc.Close()
})
}
pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn)
ctx, cancel := context.WithCancel(context.Background())
go workConnReaderFn(pxy.workConn)
@ -225,6 +233,10 @@ func (pxy *UDPProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *UDPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *UDPProxy) Close() {
pxy.mu.Lock()
defer pxy.mu.Unlock()

View File

@ -18,6 +18,7 @@ import (
"fmt"
"github.com/fatedier/golib/errors"
"golang.org/x/time/rate"
"github.com/fatedier/frp/pkg/config"
"github.com/fatedier/frp/pkg/msg"
@ -88,6 +89,10 @@ func (pxy *XTCPProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *XTCPProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *XTCPProxy) Close() {
pxy.BaseProxy.Close()
pxy.rc.NatHoleController.CloseClient(pxy.GetName())