mirror of
https://github.com/fatedier/frp.git
synced 2025-01-07 05:51:23 +07:00
bf635c0e90
* add close proxy op * Move to actual closing routine * Fix e2e tests for CloseProxy * Add warning on resource exhaustion * Add CloseProxy to manual close * retuen errors to `CloseProxy` callers
598 lines
14 KiB
Go
598 lines
14 KiB
Go
// Copyright 2017 fatedier, fatedier@gmail.com
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fatedier/frp/pkg/auth"
|
|
"github.com/fatedier/frp/pkg/config"
|
|
"github.com/fatedier/frp/pkg/consts"
|
|
frpErr "github.com/fatedier/frp/pkg/errors"
|
|
"github.com/fatedier/frp/pkg/msg"
|
|
plugin "github.com/fatedier/frp/pkg/plugin/server"
|
|
"github.com/fatedier/frp/pkg/util/util"
|
|
"github.com/fatedier/frp/pkg/util/version"
|
|
"github.com/fatedier/frp/pkg/util/xlog"
|
|
"github.com/fatedier/frp/server/controller"
|
|
"github.com/fatedier/frp/server/metrics"
|
|
"github.com/fatedier/frp/server/proxy"
|
|
|
|
"github.com/fatedier/golib/control/shutdown"
|
|
"github.com/fatedier/golib/crypto"
|
|
"github.com/fatedier/golib/errors"
|
|
)
|
|
|
|
type ControlManager struct {
|
|
// controls indexed by run id
|
|
ctlsByRunID map[string]*Control
|
|
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func NewControlManager() *ControlManager {
|
|
return &ControlManager{
|
|
ctlsByRunID: make(map[string]*Control),
|
|
}
|
|
}
|
|
|
|
func (cm *ControlManager) Add(runID string, ctl *Control) (oldCtl *Control) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
|
|
oldCtl, ok := cm.ctlsByRunID[runID]
|
|
if ok {
|
|
oldCtl.Replaced(ctl)
|
|
}
|
|
cm.ctlsByRunID[runID] = ctl
|
|
return
|
|
}
|
|
|
|
// we should make sure if it's the same control to prevent delete a new one
|
|
func (cm *ControlManager) Del(runID string, ctl *Control) {
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
if c, ok := cm.ctlsByRunID[runID]; ok && c == ctl {
|
|
delete(cm.ctlsByRunID, runID)
|
|
}
|
|
}
|
|
|
|
func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
|
|
cm.mu.RLock()
|
|
defer cm.mu.RUnlock()
|
|
ctl, ok = cm.ctlsByRunID[runID]
|
|
return
|
|
}
|
|
|
|
type Control struct {
|
|
// all resource managers and controllers
|
|
rc *controller.ResourceController
|
|
|
|
// proxy manager
|
|
pxyManager *proxy.Manager
|
|
|
|
// plugin manager
|
|
pluginManager *plugin.Manager
|
|
|
|
// verifies authentication based on selected method
|
|
authVerifier auth.Verifier
|
|
|
|
// login message
|
|
loginMsg *msg.Login
|
|
|
|
// control connection
|
|
conn net.Conn
|
|
|
|
// put a message in this channel to send it over control connection to client
|
|
sendCh chan (msg.Message)
|
|
|
|
// read from this channel to get the next message sent by client
|
|
readCh chan (msg.Message)
|
|
|
|
// work connections
|
|
workConnCh chan net.Conn
|
|
|
|
// proxies in one client
|
|
proxies map[string]proxy.Proxy
|
|
|
|
// pool count
|
|
poolCount int
|
|
|
|
// ports used, for limitations
|
|
portsUsedNum int
|
|
|
|
// last time got the Ping message
|
|
lastPing time.Time
|
|
|
|
// A new run id will be generated when a new client login.
|
|
// If run id got from login message has same run id, it means it's the same client, so we can
|
|
// replace old controller instantly.
|
|
runID string
|
|
|
|
// control status
|
|
status string
|
|
|
|
readerShutdown *shutdown.Shutdown
|
|
writerShutdown *shutdown.Shutdown
|
|
managerShutdown *shutdown.Shutdown
|
|
allShutdown *shutdown.Shutdown
|
|
|
|
mu sync.RWMutex
|
|
|
|
// Server configuration information
|
|
serverCfg config.ServerCommonConf
|
|
|
|
xl *xlog.Logger
|
|
ctx context.Context
|
|
}
|
|
|
|
func NewControl(
|
|
ctx context.Context,
|
|
rc *controller.ResourceController,
|
|
pxyManager *proxy.Manager,
|
|
pluginManager *plugin.Manager,
|
|
authVerifier auth.Verifier,
|
|
ctlConn net.Conn,
|
|
loginMsg *msg.Login,
|
|
serverCfg config.ServerCommonConf,
|
|
) *Control {
|
|
|
|
poolCount := loginMsg.PoolCount
|
|
if poolCount > int(serverCfg.MaxPoolCount) {
|
|
poolCount = int(serverCfg.MaxPoolCount)
|
|
}
|
|
return &Control{
|
|
rc: rc,
|
|
pxyManager: pxyManager,
|
|
pluginManager: pluginManager,
|
|
authVerifier: authVerifier,
|
|
conn: ctlConn,
|
|
loginMsg: loginMsg,
|
|
sendCh: make(chan msg.Message, 10),
|
|
readCh: make(chan msg.Message, 10),
|
|
workConnCh: make(chan net.Conn, poolCount+10),
|
|
proxies: make(map[string]proxy.Proxy),
|
|
poolCount: poolCount,
|
|
portsUsedNum: 0,
|
|
lastPing: time.Now(),
|
|
runID: loginMsg.RunID,
|
|
status: consts.Working,
|
|
readerShutdown: shutdown.New(),
|
|
writerShutdown: shutdown.New(),
|
|
managerShutdown: shutdown.New(),
|
|
allShutdown: shutdown.New(),
|
|
serverCfg: serverCfg,
|
|
xl: xlog.FromContextSafe(ctx),
|
|
ctx: ctx,
|
|
}
|
|
}
|
|
|
|
// Start send a login success message to client and start working.
|
|
func (ctl *Control) Start() {
|
|
loginRespMsg := &msg.LoginResp{
|
|
Version: version.Full(),
|
|
RunID: ctl.runID,
|
|
ServerUDPPort: ctl.serverCfg.BindUDPPort,
|
|
Error: "",
|
|
}
|
|
msg.WriteMsg(ctl.conn, loginRespMsg)
|
|
|
|
go ctl.writer()
|
|
for i := 0; i < ctl.poolCount; i++ {
|
|
ctl.sendCh <- &msg.ReqWorkConn{}
|
|
}
|
|
|
|
go ctl.manager()
|
|
go ctl.reader()
|
|
go ctl.stoper()
|
|
}
|
|
|
|
func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
|
|
xl := ctl.xl
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
xl.Error("panic error: %v", err)
|
|
xl.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case ctl.workConnCh <- conn:
|
|
xl.Debug("new work connection registered")
|
|
return nil
|
|
default:
|
|
xl.Debug("work connection pool is full, discarding")
|
|
return fmt.Errorf("work connection pool is full, discarding")
|
|
}
|
|
}
|
|
|
|
// When frps get one user connection, we get one work connection from the pool and return it.
|
|
// If no workConn available in the pool, send message to frpc to get one or more
|
|
// and wait until it is available.
|
|
// return an error if wait timeout
|
|
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
|
|
xl := ctl.xl
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
xl.Error("panic error: %v", err)
|
|
xl.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
var ok bool
|
|
// get a work connection from the pool
|
|
select {
|
|
case workConn, ok = <-ctl.workConnCh:
|
|
if !ok {
|
|
err = frpErr.ErrCtlClosed
|
|
return
|
|
}
|
|
xl.Debug("get work connection from pool")
|
|
default:
|
|
// no work connections available in the poll, send message to frpc to get more
|
|
if err = errors.PanicToError(func() {
|
|
ctl.sendCh <- &msg.ReqWorkConn{}
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("control is already closed")
|
|
}
|
|
|
|
select {
|
|
case workConn, ok = <-ctl.workConnCh:
|
|
if !ok {
|
|
err = frpErr.ErrCtlClosed
|
|
xl.Warn("no work connections avaiable, %v", err)
|
|
return
|
|
}
|
|
|
|
case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
|
|
err = fmt.Errorf("timeout trying to get work connection")
|
|
xl.Warn("%v", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// When we get a work connection from pool, replace it with a new one.
|
|
errors.PanicToError(func() {
|
|
ctl.sendCh <- &msg.ReqWorkConn{}
|
|
})
|
|
return
|
|
}
|
|
|
|
func (ctl *Control) Replaced(newCtl *Control) {
|
|
xl := ctl.xl
|
|
xl.Info("Replaced by client [%s]", newCtl.runID)
|
|
ctl.runID = ""
|
|
ctl.allShutdown.Start()
|
|
}
|
|
|
|
func (ctl *Control) writer() {
|
|
xl := ctl.xl
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
xl.Error("panic error: %v", err)
|
|
xl.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
defer ctl.allShutdown.Start()
|
|
defer ctl.writerShutdown.Done()
|
|
|
|
encWriter, err := crypto.NewWriter(ctl.conn, []byte(ctl.serverCfg.Token))
|
|
if err != nil {
|
|
xl.Error("crypto new writer error: %v", err)
|
|
ctl.allShutdown.Start()
|
|
return
|
|
}
|
|
for {
|
|
m, ok := <-ctl.sendCh
|
|
if !ok {
|
|
xl.Info("control writer is closing")
|
|
return
|
|
}
|
|
|
|
if err := msg.WriteMsg(encWriter, m); err != nil {
|
|
xl.Warn("write message to control connection error: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ctl *Control) reader() {
|
|
xl := ctl.xl
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
xl.Error("panic error: %v", err)
|
|
xl.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
defer ctl.allShutdown.Start()
|
|
defer ctl.readerShutdown.Done()
|
|
|
|
encReader := crypto.NewReader(ctl.conn, []byte(ctl.serverCfg.Token))
|
|
for {
|
|
m, err := msg.ReadMsg(encReader)
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
xl.Debug("control connection closed")
|
|
return
|
|
}
|
|
xl.Warn("read error: %v", err)
|
|
ctl.conn.Close()
|
|
return
|
|
}
|
|
|
|
ctl.readCh <- m
|
|
}
|
|
}
|
|
|
|
func (ctl *Control) stoper() {
|
|
xl := ctl.xl
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
xl.Error("panic error: %v", err)
|
|
xl.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
ctl.allShutdown.WaitStart()
|
|
|
|
ctl.conn.Close()
|
|
ctl.readerShutdown.WaitDone()
|
|
|
|
close(ctl.readCh)
|
|
ctl.managerShutdown.WaitDone()
|
|
|
|
close(ctl.sendCh)
|
|
ctl.writerShutdown.WaitDone()
|
|
|
|
ctl.mu.Lock()
|
|
defer ctl.mu.Unlock()
|
|
|
|
close(ctl.workConnCh)
|
|
for workConn := range ctl.workConnCh {
|
|
workConn.Close()
|
|
}
|
|
|
|
for _, pxy := range ctl.proxies {
|
|
pxy.Close()
|
|
ctl.pxyManager.Del(pxy.GetName())
|
|
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
|
|
|
|
notifyContent := &plugin.CloseProxyContent{
|
|
User: plugin.UserInfo{
|
|
User: ctl.loginMsg.User,
|
|
Metas: ctl.loginMsg.Metas,
|
|
RunID: ctl.loginMsg.RunID,
|
|
},
|
|
CloseProxy: msg.CloseProxy{
|
|
ProxyName: pxy.GetName(),
|
|
},
|
|
}
|
|
go func() {
|
|
ctl.pluginManager.CloseProxy(notifyContent)
|
|
}()
|
|
}
|
|
|
|
ctl.allShutdown.Done()
|
|
xl.Info("client exit success")
|
|
metrics.Server.CloseClient()
|
|
}
|
|
|
|
// block until Control closed
|
|
func (ctl *Control) WaitClosed() {
|
|
ctl.allShutdown.WaitDone()
|
|
}
|
|
|
|
func (ctl *Control) manager() {
|
|
xl := ctl.xl
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
xl.Error("panic error: %v", err)
|
|
xl.Error(string(debug.Stack()))
|
|
}
|
|
}()
|
|
|
|
defer ctl.allShutdown.Start()
|
|
defer ctl.managerShutdown.Done()
|
|
|
|
var heartbeatCh <-chan time.Time
|
|
if ctl.serverCfg.TCPMux || ctl.serverCfg.HeartbeatTimeout <= 0 {
|
|
// Don't need application heartbeat here.
|
|
// yamux will do same thing.
|
|
} else {
|
|
heartbeat := time.NewTicker(time.Second)
|
|
defer heartbeat.Stop()
|
|
heartbeatCh = heartbeat.C
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-heartbeatCh:
|
|
if time.Since(ctl.lastPing) > time.Duration(ctl.serverCfg.HeartbeatTimeout)*time.Second {
|
|
xl.Warn("heartbeat timeout")
|
|
return
|
|
}
|
|
case rawMsg, ok := <-ctl.readCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
switch m := rawMsg.(type) {
|
|
case *msg.NewProxy:
|
|
content := &plugin.NewProxyContent{
|
|
User: plugin.UserInfo{
|
|
User: ctl.loginMsg.User,
|
|
Metas: ctl.loginMsg.Metas,
|
|
RunID: ctl.loginMsg.RunID,
|
|
},
|
|
NewProxy: *m,
|
|
}
|
|
var remoteAddr string
|
|
retContent, err := ctl.pluginManager.NewProxy(content)
|
|
if err == nil {
|
|
m = &retContent.NewProxy
|
|
remoteAddr, err = ctl.RegisterProxy(m)
|
|
}
|
|
|
|
// register proxy in this control
|
|
resp := &msg.NewProxyResp{
|
|
ProxyName: m.ProxyName,
|
|
}
|
|
if err != nil {
|
|
xl.Warn("new proxy [%s] error: %v", m.ProxyName, err)
|
|
resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", m.ProxyName), err, ctl.serverCfg.DetailedErrorsToClient)
|
|
} else {
|
|
resp.RemoteAddr = remoteAddr
|
|
xl.Info("new proxy [%s] success", m.ProxyName)
|
|
metrics.Server.NewProxy(m.ProxyName, m.ProxyType)
|
|
}
|
|
ctl.sendCh <- resp
|
|
case *msg.CloseProxy:
|
|
ctl.CloseProxy(m)
|
|
xl.Info("close proxy [%s] success", m.ProxyName)
|
|
case *msg.Ping:
|
|
content := &plugin.PingContent{
|
|
User: plugin.UserInfo{
|
|
User: ctl.loginMsg.User,
|
|
Metas: ctl.loginMsg.Metas,
|
|
RunID: ctl.loginMsg.RunID,
|
|
},
|
|
Ping: *m,
|
|
}
|
|
retContent, err := ctl.pluginManager.Ping(content)
|
|
if err == nil {
|
|
m = &retContent.Ping
|
|
err = ctl.authVerifier.VerifyPing(m)
|
|
}
|
|
if err != nil {
|
|
xl.Warn("received invalid ping: %v", err)
|
|
ctl.sendCh <- &msg.Pong{
|
|
Error: util.GenerateResponseErrorString("invalid ping", err, ctl.serverCfg.DetailedErrorsToClient),
|
|
}
|
|
return
|
|
}
|
|
ctl.lastPing = time.Now()
|
|
xl.Debug("receive heartbeat")
|
|
ctl.sendCh <- &msg.Pong{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
|
|
var pxyConf config.ProxyConf
|
|
// Load configures from NewProxy message and check.
|
|
pxyConf, err = config.NewProxyConfFromMsg(pxyMsg, ctl.serverCfg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// User info
|
|
userInfo := plugin.UserInfo{
|
|
User: ctl.loginMsg.User,
|
|
Metas: ctl.loginMsg.Metas,
|
|
RunID: ctl.runID,
|
|
}
|
|
|
|
// NewProxy will return a interface Proxy.
|
|
// In fact it create different proxies by different proxy type, we just call run() here.
|
|
pxy, err := proxy.NewProxy(ctl.ctx, userInfo, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
|
|
if err != nil {
|
|
return remoteAddr, err
|
|
}
|
|
|
|
// Check ports used number in each client
|
|
if ctl.serverCfg.MaxPortsPerClient > 0 {
|
|
ctl.mu.Lock()
|
|
if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
|
|
ctl.mu.Unlock()
|
|
err = fmt.Errorf("exceed the max_ports_per_client")
|
|
return
|
|
}
|
|
ctl.portsUsedNum = ctl.portsUsedNum + pxy.GetUsedPortsNum()
|
|
ctl.mu.Unlock()
|
|
|
|
defer func() {
|
|
if err != nil {
|
|
ctl.mu.Lock()
|
|
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
|
|
ctl.mu.Unlock()
|
|
}
|
|
}()
|
|
}
|
|
|
|
remoteAddr, err = pxy.Run()
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
pxy.Close()
|
|
}
|
|
}()
|
|
|
|
err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
ctl.mu.Lock()
|
|
ctl.proxies[pxy.GetName()] = pxy
|
|
ctl.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
|
|
ctl.mu.Lock()
|
|
pxy, ok := ctl.proxies[closeMsg.ProxyName]
|
|
if !ok {
|
|
ctl.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
if ctl.serverCfg.MaxPortsPerClient > 0 {
|
|
ctl.portsUsedNum = ctl.portsUsedNum - pxy.GetUsedPortsNum()
|
|
}
|
|
pxy.Close()
|
|
ctl.pxyManager.Del(pxy.GetName())
|
|
delete(ctl.proxies, closeMsg.ProxyName)
|
|
ctl.mu.Unlock()
|
|
|
|
metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConf().GetBaseInfo().ProxyType)
|
|
|
|
notifyContent := &plugin.CloseProxyContent{
|
|
User: plugin.UserInfo{
|
|
User: ctl.loginMsg.User,
|
|
Metas: ctl.loginMsg.Metas,
|
|
RunID: ctl.loginMsg.RunID,
|
|
},
|
|
CloseProxy: msg.CloseProxy{
|
|
ProxyName: pxy.GetName(),
|
|
},
|
|
}
|
|
go func() {
|
|
ctl.pluginManager.CloseProxy(notifyContent)
|
|
}()
|
|
|
|
return
|
|
}
|