type http/tcpmux proxy support route_by_http_user, tcpmux support passthourgh mode (#2932)

This commit is contained in:
fatedier
2022-05-26 23:57:30 +08:00
committed by GitHub
parent bd89eaba2f
commit 4af85da0c2
22 changed files with 606 additions and 283 deletions

View File

@ -458,11 +458,11 @@ func (ctl *Control) manager() {
ProxyName: m.ProxyName,
}
if err != nil {
xl.Warn("new proxy [%s] error: %v", m.ProxyName, err)
xl.Warn("new proxy [%s] type [%s] error: %v", m.ProxyName, m.ProxyType, err)
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", m.ProxyName), err, ctl.serverCfg.DetailedErrorsToClient)
} else {
resp.RemoteAddr = remoteAddr
xl.Info("new proxy [%s] success", m.ProxyName)
xl.Info("new proxy [%s] type [%s] success", m.ProxyName, m.ProxyType)
metrics.Server.NewProxy(m.ProxyName, m.ProxyType)
}
ctl.sendCh <- resp

View File

@ -10,8 +10,11 @@ import (
)
type HTTPGroupController struct {
// groups by indexKey
groups map[string]*HTTPGroup
// register createConn for each group to vhostRouter.
// createConn will get a connection from one proxy of the group
vhostRouter *vhost.Routers
mu sync.Mutex
@ -24,10 +27,12 @@ func NewHTTPGroupController(vhostRouter *vhost.Routers) *HTTPGroupController {
}
}
func (ctl *HTTPGroupController) Register(proxyName, group, groupKey string,
routeConfig vhost.RouteConfig) (err error) {
func (ctl *HTTPGroupController) Register(
proxyName, group, groupKey string,
routeConfig vhost.RouteConfig,
) (err error) {
indexKey := httpGroupIndex(group, routeConfig.Domain, routeConfig.Location)
indexKey := group
ctl.mu.Lock()
g, ok := ctl.groups[indexKey]
if !ok {
@ -39,8 +44,8 @@ func (ctl *HTTPGroupController) Register(proxyName, group, groupKey string,
return g.Register(proxyName, group, groupKey, routeConfig)
}
func (ctl *HTTPGroupController) UnRegister(proxyName, group, domain, location string) {
indexKey := httpGroupIndex(group, domain, location)
func (ctl *HTTPGroupController) UnRegister(proxyName, group string, routeConfig vhost.RouteConfig) {
indexKey := group
ctl.mu.Lock()
defer ctl.mu.Unlock()
g, ok := ctl.groups[indexKey]
@ -55,11 +60,13 @@ func (ctl *HTTPGroupController) UnRegister(proxyName, group, domain, location st
}
type HTTPGroup struct {
group string
groupKey string
domain string
location string
group string
groupKey string
domain string
location string
routeByHTTPUser string
// CreateConnFuncs indexed by echo proxy name
createFuncs map[string]vhost.CreateConnFunc
pxyNames []string
index uint64
@ -75,8 +82,10 @@ func NewHTTPGroup(ctl *HTTPGroupController) *HTTPGroup {
}
}
func (g *HTTPGroup) Register(proxyName, group, groupKey string,
routeConfig vhost.RouteConfig) (err error) {
func (g *HTTPGroup) Register(
proxyName, group, groupKey string,
routeConfig vhost.RouteConfig,
) (err error) {
g.mu.Lock()
defer g.mu.Unlock()
@ -84,7 +93,7 @@ func (g *HTTPGroup) Register(proxyName, group, groupKey string,
// the first proxy in this group
tmp := routeConfig // copy object
tmp.CreateConnFn = g.createConn
err = g.ctl.vhostRouter.Add(routeConfig.Domain, routeConfig.Location, &tmp)
err = g.ctl.vhostRouter.Add(routeConfig.Domain, routeConfig.Location, routeConfig.RouteByHTTPUser, &tmp)
if err != nil {
return
}
@ -93,8 +102,10 @@ func (g *HTTPGroup) Register(proxyName, group, groupKey string,
g.groupKey = groupKey
g.domain = routeConfig.Domain
g.location = routeConfig.Location
g.routeByHTTPUser = routeConfig.RouteByHTTPUser
} else {
if g.group != group || g.domain != routeConfig.Domain || g.location != routeConfig.Location {
if g.group != group || g.domain != routeConfig.Domain ||
g.location != routeConfig.Location || g.routeByHTTPUser != routeConfig.RouteByHTTPUser {
err = ErrGroupParamsInvalid
return
}
@ -125,7 +136,7 @@ func (g *HTTPGroup) UnRegister(proxyName string) (isEmpty bool) {
if len(g.createFuncs) == 0 {
isEmpty = true
g.ctl.vhostRouter.Del(g.domain, g.location)
g.ctl.vhostRouter.Del(g.domain, g.location, g.routeByHTTPUser)
}
return
}
@ -138,6 +149,7 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
group := g.group
domain := g.domain
location := g.location
routeByHTTPUser := g.routeByHTTPUser
if len(g.pxyNames) > 0 {
name := g.pxyNames[int(newIndex)%len(g.pxyNames)]
f, _ = g.createFuncs[name]
@ -145,12 +157,9 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
g.mu.RUnlock()
if f == nil {
return nil, fmt.Errorf("no CreateConnFunc for http group [%s], domain [%s], location [%s]", group, domain, location)
return nil, fmt.Errorf("no CreateConnFunc for http group [%s], domain [%s], location [%s], routeByHTTPUser [%s]",
group, domain, location, routeByHTTPUser)
}
return f(remoteAddr)
}
func httpGroupIndex(group, domain, location string) string {
return fmt.Sprintf("%s_%s_%s", group, domain, location)
}

View File

@ -46,8 +46,11 @@ func NewTCPMuxGroupCtl(tcpMuxHTTPConnectMuxer *tcpmux.HTTPConnectTCPMuxer) *TCPM
// Listen is the wrapper for TCPMuxGroup's Listen
// If there are no group, we will create one here
func (tmgc *TCPMuxGroupCtl) Listen(ctx context.Context, multiplexer string, group string, groupKey string,
domain string) (l net.Listener, err error) {
func (tmgc *TCPMuxGroupCtl) Listen(
ctx context.Context,
multiplexer, group, groupKey string,
routeConfig vhost.RouteConfig,
) (l net.Listener, err error) {
tmgc.mu.Lock()
tcpMuxGroup, ok := tmgc.groups[group]
@ -59,7 +62,7 @@ func (tmgc *TCPMuxGroupCtl) Listen(ctx context.Context, multiplexer string, grou
switch multiplexer {
case consts.HTTPConnectTCPMultiplexer:
return tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, domain)
return tcpMuxGroup.HTTPConnectListen(ctx, group, groupKey, routeConfig)
default:
err = fmt.Errorf("unknown multiplexer [%s]", multiplexer)
return
@ -75,9 +78,10 @@ func (tmgc *TCPMuxGroupCtl) RemoveGroup(group string) {
// TCPMuxGroup route connections to different proxies
type TCPMuxGroup struct {
group string
groupKey string
domain string
group string
groupKey string
domain string
routeByHTTPUser string
acceptCh chan net.Conn
index uint64
@ -99,15 +103,17 @@ func NewTCPMuxGroup(ctl *TCPMuxGroupCtl) *TCPMuxGroup {
// Listen will return a new TCPMuxGroupListener
// if TCPMuxGroup already has a listener, just add a new TCPMuxGroupListener to the queues
// otherwise, listen on the real address
func (tmg *TCPMuxGroup) HTTPConnectListen(ctx context.Context, group string, groupKey string, domain string) (ln *TCPMuxGroupListener, err error) {
func (tmg *TCPMuxGroup) HTTPConnectListen(
ctx context.Context,
group, groupKey string,
routeConfig vhost.RouteConfig,
) (ln *TCPMuxGroupListener, err error) {
tmg.mu.Lock()
defer tmg.mu.Unlock()
if len(tmg.lns) == 0 {
// the first listener, listen on the real address
routeConfig := &vhost.RouteConfig{
Domain: domain,
}
tcpMuxLn, errRet := tmg.ctl.tcpMuxHTTPConnectMuxer.Listen(ctx, routeConfig)
tcpMuxLn, errRet := tmg.ctl.tcpMuxHTTPConnectMuxer.Listen(ctx, &routeConfig)
if errRet != nil {
return nil, errRet
}
@ -115,7 +121,8 @@ func (tmg *TCPMuxGroup) HTTPConnectListen(ctx context.Context, group string, gro
tmg.group = group
tmg.groupKey = groupKey
tmg.domain = domain
tmg.domain = routeConfig.Domain
tmg.routeByHTTPUser = routeConfig.RouteByHTTPUser
tmg.tcpMuxLn = tcpMuxLn
tmg.lns = append(tmg.lns, ln)
if tmg.acceptCh == nil {
@ -123,8 +130,8 @@ func (tmg *TCPMuxGroup) HTTPConnectListen(ctx context.Context, group string, gro
}
go tmg.worker()
} else {
// domain in the same group must be equal
if tmg.group != group || tmg.domain != domain {
// route config in the same group must be equal
if tmg.group != group || tmg.domain != routeConfig.Domain || tmg.routeByHTTPUser != routeConfig.RouteByHTTPUser {
return nil, ErrGroupParamsInvalid
}
if tmg.groupKey != groupKey {

View File

@ -38,11 +38,12 @@ type HTTPProxy struct {
func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
xl := pxy.xl
routeConfig := vhost.RouteConfig{
RewriteHost: pxy.cfg.HostHeaderRewrite,
Headers: pxy.cfg.Headers,
Username: pxy.cfg.HTTPUser,
Password: pxy.cfg.HTTPPwd,
CreateConnFn: pxy.GetRealConn,
RewriteHost: pxy.cfg.HostHeaderRewrite,
RouteByHTTPUser: pxy.cfg.RouteByHTTPUser,
Headers: pxy.cfg.Headers,
Username: pxy.cfg.HTTPUser,
Password: pxy.cfg.HTTPPwd,
CreateConnFn: pxy.GetRealConn,
}
locations := pxy.cfg.Locations
@ -65,8 +66,8 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
routeConfig.Domain = domain
for _, location := range locations {
routeConfig.Location = location
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
tmpRouteConfig := routeConfig
// handle group
if pxy.cfg.Group != "" {
@ -76,7 +77,7 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.Group, tmpDomain, tmpLocation)
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.Group, tmpRouteConfig)
})
} else {
// no group
@ -85,11 +86,12 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
return
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPReverseProxy.UnRegister(tmpDomain, tmpLocation)
pxy.rc.HTTPReverseProxy.UnRegister(tmpRouteConfig)
})
}
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, int(pxy.serverCfg.VhostHTTPPort)))
xl.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group)
xl.Info("http proxy listen for host [%s] location [%s] group [%s], routeByHTTPUser [%s]",
routeConfig.Domain, routeConfig.Location, pxy.cfg.Group, pxy.cfg.RouteByHTTPUser)
}
}
@ -97,8 +99,8 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
routeConfig.Domain = pxy.cfg.SubDomain + "." + pxy.serverCfg.SubDomainHost
for _, location := range locations {
routeConfig.Location = location
tmpDomain := routeConfig.Domain
tmpLocation := routeConfig.Location
tmpRouteConfig := routeConfig
// handle group
if pxy.cfg.Group != "" {
@ -108,7 +110,7 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.Group, tmpDomain, tmpLocation)
pxy.rc.HTTPGroupCtl.UnRegister(pxy.name, pxy.cfg.Group, tmpRouteConfig)
})
} else {
err = pxy.rc.HTTPReverseProxy.Register(routeConfig)
@ -116,12 +118,13 @@ func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
return
}
pxy.closeFuncs = append(pxy.closeFuncs, func() {
pxy.rc.HTTPReverseProxy.UnRegister(tmpDomain, tmpLocation)
pxy.rc.HTTPReverseProxy.UnRegister(tmpRouteConfig)
})
}
addrs = append(addrs, util.CanonicalAddr(tmpDomain, pxy.serverCfg.VhostHTTPPort))
addrs = append(addrs, util.CanonicalAddr(tmpRouteConfig.Domain, pxy.serverCfg.VhostHTTPPort))
xl.Info("http proxy listen for host [%s] location [%s] group [%s]", routeConfig.Domain, routeConfig.Location, pxy.cfg.Group)
xl.Info("http proxy listen for host [%s] location [%s] group [%s], routeByHTTPUser [%s]",
routeConfig.Domain, routeConfig.Location, pxy.cfg.Group, pxy.cfg.RouteByHTTPUser)
}
}
remoteAddr = strings.Join(addrs, ",")

View File

@ -30,20 +30,23 @@ type TCPMuxProxy struct {
cfg *config.TCPMuxProxyConf
}
func (pxy *TCPMuxProxy) httpConnectListen(domain string, addrs []string) (_ []string, err error) {
func (pxy *TCPMuxProxy) httpConnectListen(domain, routeByHTTPUser string, addrs []string) ([]string, error) {
var l net.Listener
var err error
routeConfig := &vhost.RouteConfig{
Domain: domain,
RouteByHTTPUser: routeByHTTPUser,
}
if pxy.cfg.Group != "" {
l, err = pxy.rc.TCPMuxGroupCtl.Listen(pxy.ctx, pxy.cfg.Multiplexer, pxy.cfg.Group, pxy.cfg.GroupKey, domain)
l, err = pxy.rc.TCPMuxGroupCtl.Listen(pxy.ctx, pxy.cfg.Multiplexer, pxy.cfg.Group, pxy.cfg.GroupKey, *routeConfig)
} else {
routeConfig := &vhost.RouteConfig{
Domain: domain,
}
l, err = pxy.rc.TCPMuxHTTPConnectMuxer.Listen(pxy.ctx, routeConfig)
}
if err != nil {
return nil, err
}
pxy.xl.Info("tcpmux httpconnect multiplexer listens for host [%s]", domain)
pxy.xl.Info("tcpmux httpconnect multiplexer listens for host [%s], group [%s] routeByHTTPUser [%s]",
domain, pxy.cfg.Group, pxy.cfg.RouteByHTTPUser)
pxy.listeners = append(pxy.listeners, l)
return append(addrs, util.CanonicalAddr(domain, pxy.serverCfg.TCPMuxHTTPConnectPort)), nil
}
@ -55,14 +58,14 @@ func (pxy *TCPMuxProxy) httpConnectRun() (remoteAddr string, err error) {
continue
}
addrs, err = pxy.httpConnectListen(domain, addrs)
addrs, err = pxy.httpConnectListen(domain, pxy.cfg.RouteByHTTPUser, addrs)
if err != nil {
return "", err
}
}
if pxy.cfg.SubDomain != "" {
addrs, err = pxy.httpConnectListen(pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost, addrs)
addrs, err = pxy.httpConnectListen(pxy.cfg.SubDomain+"."+pxy.serverCfg.SubDomainHost, pxy.cfg.RouteByHTTPUser, addrs)
if err != nil {
return "", err
}

View File

@ -131,12 +131,12 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
return
}
svr.rc.TCPMuxHTTPConnectMuxer, err = tcpmux.NewHTTPConnectTCPMuxer(l, vhostReadWriteTimeout)
svr.rc.TCPMuxHTTPConnectMuxer, err = tcpmux.NewHTTPConnectTCPMuxer(l, cfg.TCPMuxPassthrough, vhostReadWriteTimeout)
if err != nil {
err = fmt.Errorf("Create vhost tcpMuxer error, %v", err)
return
}
log.Info("tcpmux httpconnect multiplexer listen on %s", address)
log.Info("tcpmux httpconnect multiplexer listen on %s, passthough: %v", address, cfg.TCPMuxPassthrough)
}
// Init all plugins