server/proxy: simplify the code (#3488)

This commit is contained in:
fatedier
2023-06-16 00:14:19 +08:00
committed by GitHub
parent 9ba6a06470
commit e1cef053be
10 changed files with 235 additions and 173 deletions

View File

@ -19,6 +19,7 @@ import (
"fmt"
"io"
"net"
"reflect"
"strconv"
"sync"
"time"
@ -36,6 +37,12 @@ import (
"github.com/fatedier/frp/server/metrics"
)
var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{}
func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) {
proxyFactoryRegistry[proxyConfType] = factory
}
type GetWorkConnFn func() (net.Conn, error)
type Proxy interface {
@ -63,6 +70,7 @@ type BaseProxy struct {
limiter *rate.Limiter
userInfo plugin.UserInfo
loginMsg *msg.Login
pxyConf config.ProxyConf
mu sync.RWMutex
xl *xlog.Logger
@ -93,6 +101,10 @@ func (pxy *BaseProxy) GetLoginMsg() *msg.Login {
return pxy.loginMsg
}
func (pxy *BaseProxy) GetLimiter() *rate.Limiter {
return pxy.limiter
}
func (pxy *BaseProxy) Close() {
xl := xlog.FromContextSafe(pxy.ctx)
xl.Info("proxy closing")
@ -155,10 +167,8 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
return
}
// startListenHandler start a goroutine handler for each listener.
// p: p will just be passed to handler(Proxy, utilnet.Conn).
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
// startCommonTCPListenersHandler start a goroutine handler for each listener.
func (pxy *BaseProxy) startCommonTCPListenersHandler() {
xl := xlog.FromContextSafe(pxy.ctx)
for _, listener := range pxy.listeners {
go func(l net.Listener) {
@ -187,12 +197,72 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn,
return
}
xl.Info("get a user connection [%s]", c.RemoteAddr().String())
go handler(p, c, pxy.serverCfg)
go pxy.handleUserTCPConnection(c)
}
}(listener)
}
}
// HandleUserTCPConnection is used for incoming user TCP connections.
func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
xl := xlog.FromContextSafe(pxy.Context())
defer userConn.Close()
serverCfg := pxy.serverCfg
cfg := pxy.pxyConf.GetBaseConfig()
// server plugin hook
rc := pxy.GetResourceController()
content := &plugin.NewUserConnContent{
User: pxy.GetUserInfo(),
ProxyName: pxy.GetName(),
ProxyType: cfg.ProxyType,
RemoteAddr: userConn.RemoteAddr().String(),
}
_, err := rc.PluginManager.NewUserConn(content)
if err != nil {
xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
return
}
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
if cfg.UseEncryption {
local, err = libio.WithEncryption(local, []byte(serverCfg.Token))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = libio.WithCompression(local)
}
if pxy.GetLimiter() != nil {
local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
return local.Close()
})
}
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
name := pxy.GetName()
proxyType := cfg.ProxyType
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount, _ := libio.Join(local, userConn)
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)
xl.Debug("join connections closed")
}
func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, loginMsg *msg.Login,
) (pxy Proxy, err error) {
@ -216,114 +286,18 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso
ctx: xlog.NewContext(ctx, xl),
userInfo: userInfo,
loginMsg: loginMsg,
pxyConf: pxyConf,
}
switch cfg := pxyConf.(type) {
case *config.TCPProxyConf:
basePxy.usedPortsNum = 1
pxy = &TCPProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.TCPMuxProxyConf:
pxy = &TCPMuxProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.HTTPProxyConf:
pxy = &HTTPProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.HTTPSProxyConf:
pxy = &HTTPSProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.UDPProxyConf:
basePxy.usedPortsNum = 1
pxy = &UDPProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.STCPProxyConf:
pxy = &STCPProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.XTCPProxyConf:
pxy = &XTCPProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
case *config.SUDPProxyConf:
pxy = &SUDPProxy{
BaseProxy: &basePxy,
cfg: cfg,
}
default:
factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
if factory == nil {
return pxy, fmt.Errorf("proxy type not support")
}
return
}
// HandleUserTCPConnection is used for incoming user TCP connections.
// It can be used for tcp, http, https type.
func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
xl := xlog.FromContextSafe(pxy.Context())
defer userConn.Close()
// server plugin hook
rc := pxy.GetResourceController()
content := &plugin.NewUserConnContent{
User: pxy.GetUserInfo(),
ProxyName: pxy.GetName(),
ProxyType: pxy.GetConf().GetBaseConfig().ProxyType,
RemoteAddr: userConn.RemoteAddr().String(),
pxy = factory(&basePxy, pxyConf)
if pxy == nil {
return nil, fmt.Errorf("proxy not created")
}
_, err := rc.PluginManager.NewUserConn(content)
if err != nil {
xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
return
}
// try all connections from the pool
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
if err != nil {
return
}
defer workConn.Close()
var local io.ReadWriteCloser = workConn
cfg := pxy.GetConf().GetBaseConfig()
xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
if cfg.UseEncryption {
local, err = libio.WithEncryption(local, []byte(serverCfg.Token))
if err != nil {
xl.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = libio.WithCompression(local)
}
if pxy.GetLimiter() != nil {
local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
return local.Close()
})
}
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
name := pxy.GetName()
proxyType := pxy.GetConf().GetBaseConfig().ProxyType
metrics.Server.OpenConnection(name, proxyType)
inCount, outCount, _ := libio.Join(local, userConn)
metrics.Server.CloseConnection(name, proxyType)
metrics.Server.AddTrafficIn(name, proxyType, inCount)
metrics.Server.AddTrafficOut(name, proxyType, outCount)
xl.Debug("join connections closed")
return pxy, nil
}
type Manager struct {