support http load balancing

This commit is contained in:
fatedier
2019-07-31 00:41:58 +08:00
parent 5796c27ed5
commit b3ed863021
11 changed files with 329 additions and 110 deletions

View File

@ -24,46 +24,47 @@ import (
gerr "github.com/fatedier/golib/errors"
)
type TcpGroupListener struct {
groupName string
group *TcpGroup
// TcpGroupCtl manage all TcpGroups
type TcpGroupCtl struct {
groups map[string]*TcpGroup
addr net.Addr
closeCh chan struct{}
// portManager is used to manage port
portManager *ports.PortManager
mu sync.Mutex
}
func newTcpGroupListener(name string, group *TcpGroup, addr net.Addr) *TcpGroupListener {
return &TcpGroupListener{
groupName: name,
group: group,
addr: addr,
closeCh: make(chan struct{}),
// NewTcpGroupCtl return a new TcpGroupCtl
func NewTcpGroupCtl(portManager *ports.PortManager) *TcpGroupCtl {
return &TcpGroupCtl{
groups: make(map[string]*TcpGroup),
portManager: portManager,
}
}
func (ln *TcpGroupListener) Accept() (c net.Conn, err error) {
var ok bool
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok = <-ln.group.Accept():
if !ok {
return nil, ErrListenerClosed
}
return c, nil
// Listen is the wrapper for TcpGroup's Listen
// If there are no group, we will create one here
func (tgc *TcpGroupCtl) Listen(proxyName string, group string, groupKey string,
addr string, port int) (l net.Listener, realPort int, err error) {
tgc.mu.Lock()
tcpGroup, ok := tgc.groups[group]
if !ok {
tcpGroup = NewTcpGroup(tgc)
tgc.groups[group] = tcpGroup
}
tgc.mu.Unlock()
return tcpGroup.Listen(proxyName, group, groupKey, addr, port)
}
func (ln *TcpGroupListener) Addr() net.Addr {
return ln.addr
}
func (ln *TcpGroupListener) Close() (err error) {
close(ln.closeCh)
ln.group.CloseListener(ln)
return
// RemoveGroup remove TcpGroup from controller
func (tgc *TcpGroupCtl) RemoveGroup(group string) {
tgc.mu.Lock()
defer tgc.mu.Unlock()
delete(tgc.groups, group)
}
// TcpGroup route connections to different proxies
type TcpGroup struct {
group string
groupKey string
@ -79,6 +80,7 @@ type TcpGroup struct {
mu sync.Mutex
}
// NewTcpGroup return a new TcpGroup
func NewTcpGroup(ctl *TcpGroupCtl) *TcpGroup {
return &TcpGroup{
lns: make([]*TcpGroupListener, 0),
@ -87,10 +89,14 @@ func NewTcpGroup(ctl *TcpGroupCtl) *TcpGroup {
}
}
// Listen will return a new TcpGroupListener
// if TcpGroup already has a listener, just add a new TcpGroupListener to the queues
// otherwise, listen on the real address
func (tg *TcpGroup) Listen(proxyName string, group string, groupKey string, addr string, port int) (ln *TcpGroupListener, realPort int, err error) {
tg.mu.Lock()
defer tg.mu.Unlock()
if len(tg.lns) == 0 {
// the first listener, listen on the real address
realPort, err = tg.ctl.portManager.Acquire(proxyName, port)
if err != nil {
return
@ -114,6 +120,7 @@ func (tg *TcpGroup) Listen(proxyName string, group string, groupKey string, addr
}
go tg.worker()
} else {
// address and port in the same group must be equal
if tg.group != group || tg.addr != addr {
err = ErrGroupParamsInvalid
return
@ -133,6 +140,7 @@ func (tg *TcpGroup) Listen(proxyName string, group string, groupKey string, addr
return
}
// worker is called when the real tcp listener has been created
func (tg *TcpGroup) worker() {
for {
c, err := tg.tcpLn.Accept()
@ -152,6 +160,7 @@ func (tg *TcpGroup) Accept() <-chan net.Conn {
return tg.acceptCh
}
// CloseListener remove the TcpGroupListener from the TcpGroup
func (tg *TcpGroup) CloseListener(ln *TcpGroupListener) {
tg.mu.Lock()
defer tg.mu.Unlock()
@ -169,36 +178,47 @@ func (tg *TcpGroup) CloseListener(ln *TcpGroupListener) {
}
}
type TcpGroupCtl struct {
groups map[string]*TcpGroup
// TcpGroupListener
type TcpGroupListener struct {
groupName string
group *TcpGroup
portManager *ports.PortManager
mu sync.Mutex
addr net.Addr
closeCh chan struct{}
}
func NewTcpGroupCtl(portManager *ports.PortManager) *TcpGroupCtl {
return &TcpGroupCtl{
groups: make(map[string]*TcpGroup),
portManager: portManager,
func newTcpGroupListener(name string, group *TcpGroup, addr net.Addr) *TcpGroupListener {
return &TcpGroupListener{
groupName: name,
group: group,
addr: addr,
closeCh: make(chan struct{}),
}
}
func (tgc *TcpGroupCtl) Listen(proxyNanme string, group string, groupKey string,
addr string, port int) (l net.Listener, realPort int, err error) {
tgc.mu.Lock()
defer tgc.mu.Unlock()
if tcpGroup, ok := tgc.groups[group]; ok {
return tcpGroup.Listen(proxyNanme, group, groupKey, addr, port)
} else {
tcpGroup = NewTcpGroup(tgc)
tgc.groups[group] = tcpGroup
return tcpGroup.Listen(proxyNanme, group, groupKey, addr, port)
// Accept will accept connections from TcpGroup
func (ln *TcpGroupListener) Accept() (c net.Conn, err error) {
var ok bool
select {
case <-ln.closeCh:
return nil, ErrListenerClosed
case c, ok = <-ln.group.Accept():
if !ok {
return nil, ErrListenerClosed
}
return c, nil
}
}
func (tgc *TcpGroupCtl) RemoveGroup(group string) {
tgc.mu.Lock()
defer tgc.mu.Unlock()
delete(tgc.groups, group)
func (ln *TcpGroupListener) Addr() net.Addr {
return ln.addr
}
// Close close the listener
func (ln *TcpGroupListener) Close() (err error) {
close(ln.closeCh)
// remove self from TcpGroup
ln.group.CloseListener(ln)
return
}