start refactoring

This commit is contained in:
fatedier
2017-03-09 02:03:47 +08:00
parent b006540141
commit 88083d21e8
115 changed files with 5515 additions and 3491 deletions

332
server/control.go Normal file
View File

@ -0,0 +1,332 @@
package server
import (
"fmt"
"io"
"sync"
"time"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/consts"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/errors"
"github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/shutdown"
"github.com/fatedier/frp/utils/version"
)
type Control struct {
// frps service
svr *Service
// login message
loginMsg *msg.Login
// control connection
conn net.Conn
// put a message in this channel to send it over control connection to client
sendCh chan (msg.Message)
// read from this channel to get the next message sent by client
readCh chan (msg.Message)
// work connections
workConnCh chan net.Conn
// proxies in one client
proxies []Proxy
// pool count
poolCount int
// last time got the Ping message
lastPing time.Time
// A new run id will be generated when a new client login.
// If run id got from login message has same run id, it means it's the same client, so we can
// replace old controller instantly.
runId string
// control status
status string
readerShutdown *shutdown.Shutdown
writerShutdown *shutdown.Shutdown
managerShutdown *shutdown.Shutdown
allShutdown *shutdown.Shutdown
mu sync.RWMutex
}
func NewControl(svr *Service, ctlConn net.Conn, loginMsg *msg.Login) *Control {
return &Control{
svr: svr,
conn: ctlConn,
loginMsg: loginMsg,
sendCh: make(chan msg.Message, 10),
readCh: make(chan msg.Message, 10),
workConnCh: make(chan net.Conn, loginMsg.PoolCount+10),
proxies: make([]Proxy, 0),
poolCount: loginMsg.PoolCount,
lastPing: time.Now(),
runId: loginMsg.RunId,
status: consts.Working,
readerShutdown: shutdown.New(),
writerShutdown: shutdown.New(),
managerShutdown: shutdown.New(),
allShutdown: shutdown.New(),
}
}
// Start send a login success message to client and start working.
func (ctl *Control) Start() {
go ctl.writer()
ctl.sendCh <- &msg.LoginResp{
Version: version.Full(),
RunId: ctl.runId,
Error: "",
}
for i := 0; i < ctl.poolCount; i++ {
ctl.sendCh <- &msg.ReqWorkConn{}
}
go ctl.manager()
go ctl.reader()
go ctl.stoper()
}
func (ctl *Control) RegisterWorkConn(conn net.Conn) {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
select {
case ctl.workConnCh <- conn:
ctl.conn.Debug("new work connection registered.")
default:
ctl.conn.Debug("work connection pool is full, discarding.")
conn.Close()
}
}
// When frps get one user connection, we get one work connection from the pool and return it.
// If no workConn available in the pool, send message to frpc to get one or more
// and wait until it is available.
// return an error if wait timeout
func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
var ok bool
// get a work connection from the pool
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = fmt.Errorf("no work connections available, control is closing")
return
}
ctl.conn.Debug("get work connection from pool")
default:
// no work connections available in the poll, send message to frpc to get more
err = errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
if err != nil {
ctl.conn.Error("%v", err)
return
}
select {
case workConn, ok = <-ctl.workConnCh:
if !ok {
err = fmt.Errorf("no work connections available, control is closing")
ctl.conn.Warn("%v", err)
return
}
case <-time.After(time.Duration(config.ServerCommonCfg.UserConnTimeout) * time.Second):
err = fmt.Errorf("timeout trying to get work connection")
ctl.conn.Warn("%v", err)
return
}
}
// When we get a work connection from pool, replace it with a new one.
errors.PanicToError(func() {
ctl.sendCh <- &msg.ReqWorkConn{}
})
return
}
func (ctl *Control) Replaced(newCtl *Control) {
ctl.conn.Info("Replaced by client [%s]", newCtl.runId)
ctl.runId = ""
ctl.allShutdown.Start()
}
func (ctl *Control) writer() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
defer ctl.allShutdown.Start()
defer ctl.writerShutdown.Done()
for {
if m, ok := <-ctl.sendCh; !ok {
ctl.conn.Info("control writer is closing")
return
} else {
if err := msg.WriteMsg(ctl.conn, m); err != nil {
ctl.conn.Warn("write message to control connection error: %v", err)
return
}
}
}
}
func (ctl *Control) reader() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
defer ctl.allShutdown.Start()
defer ctl.readerShutdown.Done()
for {
if m, err := msg.ReadMsg(ctl.conn); err != nil {
if err == io.EOF {
ctl.conn.Debug("control connection closed")
return
} else {
ctl.conn.Warn("read error: %v", err)
return
}
} else {
ctl.readCh <- m
}
}
}
func (ctl *Control) stoper() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
ctl.allShutdown.WaitStart()
close(ctl.readCh)
ctl.managerShutdown.WaitDown()
close(ctl.sendCh)
ctl.writerShutdown.WaitDown()
ctl.conn.Close()
close(ctl.workConnCh)
for workConn := range ctl.workConnCh {
workConn.Close()
}
for _, pxy := range ctl.proxies {
ctl.svr.DelProxy(pxy.GetName())
pxy.Close()
}
ctl.allShutdown.Done()
ctl.conn.Info("all shutdown success")
}
func (ctl *Control) manager() {
defer func() {
if err := recover(); err != nil {
ctl.conn.Error("panic error: %v", err)
}
}()
defer ctl.allShutdown.Start()
defer ctl.managerShutdown.Done()
heartbeat := time.NewTicker(time.Second)
defer heartbeat.Stop()
for {
select {
case <-heartbeat.C:
if time.Since(ctl.lastPing) > time.Duration(config.ServerCommonCfg.HeartBeatTimeout)*time.Second {
ctl.conn.Warn("heartbeat timeout")
ctl.allShutdown.Start()
}
case rawMsg, ok := <-ctl.readCh:
if !ok {
return
}
switch m := rawMsg.(type) {
case *msg.NewProxy:
// register proxy in this control
err := ctl.RegisterProxy(m)
resp := &msg.NewProxyResp{
ProxyName: m.ProxyName,
}
if err != nil {
resp.Error = err.Error()
ctl.conn.Warn("new proxy [%s] error: %v", m.ProxyName, err)
} else {
ctl.conn.Info("new proxy [%s] success", m.ProxyName)
}
ctl.sendCh <- resp
case *msg.Ping:
ctl.lastPing = time.Now()
ctl.sendCh <- &msg.Pong{}
}
}
}
}
func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (err error) {
var pxyConf config.ProxyConf
// Load configures from NewProxy message and check.
pxyConf, err = config.NewProxyConf(pxyMsg)
if err != nil {
return err
}
// NewProxy will return a interface Proxy.
// In fact it create different proxies by different proxy type, we just call run() here.
pxy, err := NewProxy(ctl, pxyConf)
if err != nil {
return err
}
err = pxy.Run()
if err != nil {
return err
}
defer func() {
if err != nil {
pxy.Close()
}
}()
err = ctl.svr.RegisterProxy(pxyMsg.ProxyName, pxy)
if err != nil {
return err
}
ctl.proxies = append(ctl.proxies, pxy)
return nil
}

103
server/dashboard.go Normal file
View File

@ -0,0 +1,103 @@
// Copyright 2016 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"encoding/base64"
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/models/config"
)
var (
httpServerReadTimeout = 10 * time.Second
httpServerWriteTimeout = 10 * time.Second
)
func RunDashboardServer(addr string, port int64) (err error) {
// url router
mux := http.NewServeMux()
// api, see dashboard_api.go
//mux.HandleFunc("/api/reload", use(apiReload, basicAuth))
//mux.HandleFunc("/api/proxies", apiProxies)
// view, see dashboard_view.go
mux.Handle("/favicon.ico", http.FileServer(assets.FileSystem))
mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(assets.FileSystem)))
//mux.HandleFunc("/", use(viewDashboard, basicAuth))
address := fmt.Sprintf("%s:%d", addr, port)
server := &http.Server{
Addr: address,
Handler: mux,
ReadTimeout: httpServerReadTimeout,
WriteTimeout: httpServerWriteTimeout,
}
if address == "" {
address = ":http"
}
ln, err := net.Listen("tcp", address)
if err != nil {
return err
}
go server.Serve(ln)
return
}
func use(h http.HandlerFunc, middleware ...func(http.HandlerFunc) http.HandlerFunc) http.HandlerFunc {
for _, m := range middleware {
h = m(h)
}
return h
}
func basicAuth(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("WWW-Authenticate", `Basic realm="Restricted"`)
s := strings.SplitN(r.Header.Get("Authorization"), " ", 2)
if len(s) != 2 {
http.Error(w, "Not authorized", 401)
return
}
b, err := base64.StdEncoding.DecodeString(s[1])
if err != nil {
http.Error(w, err.Error(), 401)
return
}
pair := strings.SplitN(string(b), ":", 2)
if len(pair) != 2 {
http.Error(w, "Not authorized", 401)
return
}
if pair[0] != config.ServerCommonCfg.DashboardUser || pair[1] != config.ServerCommonCfg.DashboardPwd {
http.Error(w, "Not authorized", 401)
return
}
h.ServeHTTP(w, r)
}
}

69
server/dashboard_api.go Normal file
View File

@ -0,0 +1,69 @@
// Copyright 2016 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
/*
import (
"encoding/json"
"fmt"
"net/http"
"github.com/fatedier/frp/src/models/metric"
"github.com/fatedier/frp/src/utils/log"
)
type GeneralResponse struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
}
func apiReload(w http.ResponseWriter, r *http.Request) {
var buf []byte
res := &GeneralResponse{}
defer func() {
log.Info("Http response [/api/reload]: %s", string(buf))
}()
log.Info("Http request: [/api/reload]")
err := ReloadConf(ConfigFile)
if err != nil {
res.Code = 2
res.Msg = fmt.Sprintf("%v", err)
log.Error("frps reload error: %v", err)
}
buf, _ = json.Marshal(res)
w.Write(buf)
}
type ProxiesResponse struct {
Code int64 `json:"code"`
Msg string `json:"msg"`
Proxies []*metric.ServerMetric `json:"proxies"`
}
func apiProxies(w http.ResponseWriter, r *http.Request) {
var buf []byte
res := &ProxiesResponse{}
defer func() {
log.Info("Http response [/api/proxies]: code [%d]", res.Code)
}()
log.Info("Http request: [/api/proxies]")
res.Proxies = metric.GetAllProxyMetrics()
buf, _ = json.Marshal(res)
w.Write(buf)
}
*/

42
server/dashboard_view.go Normal file
View File

@ -0,0 +1,42 @@
// Copyright 2016 fatedier, fatedier@gmail.com
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
/*
import (
"html/template"
"net/http"
"github.com/fatedier/frp/src/assets"
"github.com/fatedier/frp/src/models/metric"
"github.com/fatedier/frp/src/utils/log"
)
func viewDashboard(w http.ResponseWriter, r *http.Request) {
metrics := metric.GetAllProxyMetrics()
dashboardTpl, err := assets.ReadFile("index.html")
if err != nil {
http.Error(w, "get dashboard template file error", http.StatusInternalServerError)
return
}
t := template.Must(template.New("index.html").Delims("<<<", ">>>").Parse(dashboardTpl))
err = t.Execute(w, metrics)
if err != nil {
log.Warn("parse template file [index.html] error: %v", err)
http.Error(w, "parse template file error", http.StatusInternalServerError)
}
}
*/

75
server/manager.go Normal file
View File

@ -0,0 +1,75 @@
package server
import (
"fmt"
"sync"
)
type ControlManager struct {
// controls indexed by run id
ctlsByRunId map[string]*Control
mu sync.RWMutex
}
func NewControlManager() *ControlManager {
return &ControlManager{
ctlsByRunId: make(map[string]*Control),
}
}
func (cm *ControlManager) Add(runId string, ctl *Control) (oldCtl *Control) {
cm.mu.Lock()
defer cm.mu.Unlock()
oldCtl, ok := cm.ctlsByRunId[runId]
if ok {
oldCtl.Replaced(ctl)
}
cm.ctlsByRunId[runId] = ctl
return
}
func (cm *ControlManager) GetById(runId string) (ctl *Control, ok bool) {
cm.mu.RLock()
defer cm.mu.RUnlock()
ctl, ok = cm.ctlsByRunId[runId]
return
}
type ProxyManager struct {
// proxies indexed by proxy name
pxys map[string]Proxy
mu sync.RWMutex
}
func NewProxyManager() *ProxyManager {
return &ProxyManager{
pxys: make(map[string]Proxy),
}
}
func (pm *ProxyManager) Add(name string, pxy Proxy) error {
pm.mu.Lock()
defer pm.mu.Unlock()
if _, ok := pm.pxys[name]; ok {
return fmt.Errorf("proxy name [%s] is already in use", name)
}
pm.pxys[name] = pxy
return nil
}
func (pm *ProxyManager) Del(name string) {
pm.mu.Lock()
defer pm.mu.Unlock()
delete(pm.pxys, name)
}
func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
pm.mu.RLock()
defer pm.mu.RUnlock()
pxy, ok = pm.pxys[name]
return
}

219
server/proxy.go Normal file
View File

@ -0,0 +1,219 @@
package server
import (
"fmt"
"io"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/models/proto/tcp"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/net"
)
type Proxy interface {
Run() error
GetControl() *Control
GetName() string
GetConf() config.ProxyConf
Close()
log.Logger
}
type BaseProxy struct {
name string
ctl *Control
listeners []net.Listener
log.Logger
}
func (pxy *BaseProxy) GetName() string {
return pxy.name
}
func (pxy *BaseProxy) GetControl() *Control {
return pxy.ctl
}
func (pxy *BaseProxy) Close() {
pxy.Info("proxy closing")
for _, l := range pxy.listeners {
l.Close()
}
}
func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
basePxy := BaseProxy{
name: pxyConf.GetName(),
ctl: ctl,
listeners: make([]net.Listener, 0),
Logger: log.NewPrefixLogger(ctl.runId),
}
switch cfg := pxyConf.(type) {
case *config.TcpProxyConf:
pxy = &TcpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpProxyConf:
pxy = &HttpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.HttpsProxyConf:
pxy = &HttpsProxy{
BaseProxy: basePxy,
cfg: cfg,
}
case *config.UdpProxyConf:
pxy = &UdpProxy{
BaseProxy: basePxy,
cfg: cfg,
}
default:
return pxy, fmt.Errorf("proxy type not support")
}
pxy.AddLogPrefix(pxy.GetName())
return
}
type TcpProxy struct {
BaseProxy
cfg *config.TcpProxyConf
}
func (pxy *TcpProxy) Run() error {
listener, err := net.ListenTcp(config.ServerCommonCfg.BindAddr, int64(pxy.cfg.RemotePort))
if err != nil {
return err
}
pxy.listeners = append(pxy.listeners, listener)
go func(l net.Listener) {
for {
// block
// if listener is closed, err returned
c, err := l.Accept()
if err != nil {
pxy.Info("listener is closed")
return
}
pxy.Debug("got one user connection [%s]", c.RemoteAddr().String())
go HandleUserTcpConnection(pxy, c)
}
}(listener)
return nil
}
func (pxy *TcpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *TcpProxy) Close() {
pxy.BaseProxy.Close()
}
type HttpProxy struct {
BaseProxy
cfg *config.HttpProxyConf
}
func (pxy *HttpProxy) Run() (err error) {
return
}
func (pxy *HttpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpProxy) Close() {
pxy.BaseProxy.Close()
}
type HttpsProxy struct {
BaseProxy
cfg *config.HttpsProxyConf
}
func (pxy *HttpsProxy) Run() (err error) {
return
}
func (pxy *HttpsProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *HttpsProxy) Close() {
pxy.BaseProxy.Close()
}
type UdpProxy struct {
BaseProxy
cfg *config.UdpProxyConf
}
func (pxy *UdpProxy) Run() (err error) {
return
}
func (pxy *UdpProxy) GetConf() config.ProxyConf {
return pxy.cfg
}
func (pxy *UdpProxy) Close() {
pxy.BaseProxy.Close()
}
// HandleUserTcpConnection is used for incoming tcp user connections.
func HandleUserTcpConnection(pxy Proxy, userConn net.Conn) {
defer userConn.Close()
ctl := pxy.GetControl()
var (
workConn net.Conn
err error
)
// try all connections from the pool
for i := 0; i < ctl.poolCount+1; i++ {
if workConn, err = ctl.GetWorkConn(); err != nil {
pxy.Warn("failed to get work connection: %v", err)
return
}
defer workConn.Close()
pxy.Info("get one new work connection: %s", workConn.RemoteAddr().String())
workConn.AddLogPrefix(pxy.GetName())
err := msg.WriteMsg(workConn, &msg.StartWorkConn{
ProxyName: pxy.GetName(),
})
if err != nil {
workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
workConn.Close()
} else {
break
}
}
if err != nil {
pxy.Error("try to get work connection failed in the end")
return
}
var (
local io.ReadWriteCloser
remote io.ReadWriteCloser
)
local = workConn
remote = userConn
cfg := pxy.GetConf().GetBaseInfo()
if cfg.UseEncryption {
local, err = tcp.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
if err != nil {
pxy.Error("create encryption stream error: %v", err)
return
}
}
if cfg.UseCompression {
local = tcp.WithCompression(local)
}
tcp.Join(local, remote)
}

196
server/service.go Normal file
View File

@ -0,0 +1,196 @@
package server
import (
"fmt"
"time"
"github.com/fatedier/frp/assets"
"github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg"
"github.com/fatedier/frp/utils/log"
"github.com/fatedier/frp/utils/net"
"github.com/fatedier/frp/utils/util"
"github.com/fatedier/frp/utils/version"
"github.com/fatedier/frp/utils/vhost"
)
// Server service.
type Service struct {
// Accept connections from client.
listener net.Listener
// For http proxies, route requests to different clients by hostname and other infomation.
VhostHttpMuxer *vhost.HttpMuxer
// For https proxies, route requests to different clients by hostname and other infomation.
VhostHttpsMuxer *vhost.HttpsMuxer
// Manage all controllers.
ctlManager *ControlManager
// Manage all proxies.
pxyManager *ProxyManager
}
func NewService() (svr *Service, err error) {
svr = &Service{
ctlManager: NewControlManager(),
pxyManager: NewProxyManager(),
}
// Init assets.
err = assets.Load(config.ServerCommonCfg.AssetsDir)
if err != nil {
err = fmt.Errorf("Load assets error: %v", err)
return
}
// Listen for accepting connections from client.
svr.listener, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort)
if err != nil {
err = fmt.Errorf("Create server listener error, %v", err)
return
}
// Create http vhost muxer.
if config.ServerCommonCfg.VhostHttpPort != 0 {
var l net.Listener
l, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort)
if err != nil {
err = fmt.Errorf("Create vhost http listener error, %v", err)
return
}
svr.VhostHttpMuxer, err = vhost.NewHttpMuxer(l, 30*time.Second)
if err != nil {
err = fmt.Errorf("Create vhost httpMuxer error, %v", err)
return
}
}
// Create https vhost muxer.
if config.ServerCommonCfg.VhostHttpsPort != 0 {
var l net.Listener
l, err = net.ListenTcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort)
if err != nil {
err = fmt.Errorf("Create vhost https listener error, %v", err)
return
}
svr.VhostHttpsMuxer, err = vhost.NewHttpsMuxer(l, 30*time.Second)
if err != nil {
err = fmt.Errorf("Create vhost httpsMuxer error, %v", err)
return
}
}
// Create dashboard web server.
if config.ServerCommonCfg.DashboardPort != 0 {
err = RunDashboardServer(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.DashboardPort)
if err != nil {
err = fmt.Errorf("Create dashboard web server error, %v", err)
return
}
}
return
}
func (svr *Service) Run() {
// Listen for incoming connections from client.
for {
c, err := svr.listener.Accept()
if err != nil {
log.Warn("Listener for incoming connections from client closed")
return
}
// Start a new goroutine for dealing connections.
go func(frpConn net.Conn) {
var rawMsg msg.Message
if rawMsg, err = msg.ReadMsg(frpConn); err != nil {
log.Warn("Failed to read message: %v", err)
frpConn.Close()
return
}
switch m := rawMsg.(type) {
case *msg.Login:
err = svr.RegisterControl(frpConn, m)
// If login failed, send error message there.
// Otherwise send success message in control's work goroutine.
if err != nil {
frpConn.Warn("%v", err)
msg.WriteMsg(frpConn, &msg.LoginResp{
Version: version.Full(),
Error: err.Error(),
})
frpConn.Close()
}
case *msg.NewWorkConn:
svr.RegisterWorkConn(frpConn, m)
default:
log.Warn("Error message type for the new connection [%s]", frpConn.RemoteAddr().String())
frpConn.Close()
}
}(c)
}
}
func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err error) {
ctlConn.Info("client login info: ip [%s] version [%s] hostname [%s] os [%s] arch [%s]",
ctlConn.RemoteAddr().String(), loginMsg.Version, loginMsg.Hostname, loginMsg.Os, loginMsg.Arch)
// Check client version.
if ok, msg := version.Compat(loginMsg.Version); !ok {
err = fmt.Errorf("%s", msg)
return
}
// Check auth.
nowTime := time.Now().Unix()
if config.ServerCommonCfg.AuthTimeout != 0 && nowTime-loginMsg.Timestamp > config.ServerCommonCfg.AuthTimeout {
err = fmt.Errorf("authorization timeout")
return
}
if util.GetAuthKey(config.ServerCommonCfg.PrivilegeToken, loginMsg.Timestamp) != loginMsg.PrivilegeKey {
err = fmt.Errorf("authorization failed")
return
}
// If client's RunId is empty, it's a new client, we just create a new controller.
// Otherwise, we check if there is one controller has the same run id. If so, we release previous controller and start new one.
if loginMsg.RunId == "" {
loginMsg.RunId, err = util.RandId()
if err != nil {
return
}
}
ctl := NewControl(svr, ctlConn, loginMsg)
if oldCtl := svr.ctlManager.Add(loginMsg.RunId, ctl); oldCtl != nil {
oldCtl.allShutdown.WaitDown()
}
ctlConn.AddLogPrefix(loginMsg.RunId)
ctl.Start()
return
}
// RegisterWorkConn register a new work connection to control and proxies need it.
func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn) {
ctl, exist := svr.ctlManager.GetById(newMsg.RunId)
if !exist {
workConn.Warn("No client control found for run id [%s]", newMsg.RunId)
return
}
ctl.RegisterWorkConn(workConn)
return
}
func (svr *Service) RegisterProxy(name string, pxy Proxy) error {
err := svr.pxyManager.Add(name, pxy)
return err
}
func (svr *Service) DelProxy(name string) {
svr.pxyManager.Del(name)
}