123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 |
- package server
- import (
- "context"
- "fmt"
- "net"
- "runtime/debug"
- "sync"
- "sync/atomic"
- "time"
- "github.com/samber/lo"
- "github.com/fatedier/frp/pkg/auth"
- "github.com/fatedier/frp/pkg/config"
- v1 "github.com/fatedier/frp/pkg/config/v1"
- pkgerr "github.com/fatedier/frp/pkg/errors"
- "github.com/fatedier/frp/pkg/msg"
- plugin "github.com/fatedier/frp/pkg/plugin/server"
- "github.com/fatedier/frp/pkg/transport"
- netpkg "github.com/fatedier/frp/pkg/util/net"
- "github.com/fatedier/frp/pkg/util/util"
- "github.com/fatedier/frp/pkg/util/version"
- "github.com/fatedier/frp/pkg/util/wait"
- "github.com/fatedier/frp/pkg/util/xlog"
- "github.com/fatedier/frp/server/controller"
- "github.com/fatedier/frp/server/metrics"
- "github.com/fatedier/frp/server/proxy"
- )
- type ControlManager struct {
-
- ctlsByRunID map[string]*Control
- mu sync.RWMutex
- }
- func NewControlManager() *ControlManager {
- return &ControlManager{
- ctlsByRunID: make(map[string]*Control),
- }
- }
- func (cm *ControlManager) Add(runID string, ctl *Control) (old *Control) {
- cm.mu.Lock()
- defer cm.mu.Unlock()
- var ok bool
- old, ok = cm.ctlsByRunID[runID]
- if ok {
- old.Replaced(ctl)
- }
- cm.ctlsByRunID[runID] = ctl
- return
- }
- func (cm *ControlManager) Del(runID string, ctl *Control) {
- cm.mu.Lock()
- defer cm.mu.Unlock()
- if c, ok := cm.ctlsByRunID[runID]; ok && c == ctl {
- delete(cm.ctlsByRunID, runID)
- }
- }
- func (cm *ControlManager) GetByID(runID string) (ctl *Control, ok bool) {
- cm.mu.RLock()
- defer cm.mu.RUnlock()
- ctl, ok = cm.ctlsByRunID[runID]
- return
- }
- func (cm *ControlManager) Close() error {
- cm.mu.Lock()
- defer cm.mu.Unlock()
- for _, ctl := range cm.ctlsByRunID {
- ctl.Close()
- }
- cm.ctlsByRunID = make(map[string]*Control)
- return nil
- }
- type Control struct {
-
- rc *controller.ResourceController
-
- pxyManager *proxy.Manager
-
- pluginManager *plugin.Manager
-
- authVerifier auth.Verifier
-
- msgTransporter transport.MessageTransporter
-
-
- msgDispatcher *msg.Dispatcher
-
- loginMsg *msg.Login
-
- conn net.Conn
-
- workConnCh chan net.Conn
-
- proxies map[string]proxy.Proxy
-
- poolCount int
-
- portsUsedNum int
-
- lastPing atomic.Value
-
-
-
- runID string
- mu sync.RWMutex
-
- serverCfg *v1.ServerConfig
- xl *xlog.Logger
- ctx context.Context
- doneCh chan struct{}
- }
- func NewControl(
- ctx context.Context,
- rc *controller.ResourceController,
- pxyManager *proxy.Manager,
- pluginManager *plugin.Manager,
- authVerifier auth.Verifier,
- ctlConn net.Conn,
- ctlConnEncrypted bool,
- loginMsg *msg.Login,
- serverCfg *v1.ServerConfig,
- ) (*Control, error) {
- poolCount := loginMsg.PoolCount
- if poolCount > int(serverCfg.Transport.MaxPoolCount) {
- poolCount = int(serverCfg.Transport.MaxPoolCount)
- }
- ctl := &Control{
- rc: rc,
- pxyManager: pxyManager,
- pluginManager: pluginManager,
- authVerifier: authVerifier,
- conn: ctlConn,
- loginMsg: loginMsg,
- workConnCh: make(chan net.Conn, poolCount+10),
- proxies: make(map[string]proxy.Proxy),
- poolCount: poolCount,
- portsUsedNum: 0,
- runID: loginMsg.RunID,
- serverCfg: serverCfg,
- xl: xlog.FromContextSafe(ctx),
- ctx: ctx,
- doneCh: make(chan struct{}),
- }
- ctl.lastPing.Store(time.Now())
- if ctlConnEncrypted {
- cryptoRW, err := netpkg.NewCryptoReadWriter(ctl.conn, []byte(ctl.serverCfg.Auth.Token))
- if err != nil {
- return nil, err
- }
- ctl.msgDispatcher = msg.NewDispatcher(cryptoRW)
- } else {
- ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
- }
- ctl.registerMsgHandlers()
- ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
- return ctl, nil
- }
- func (ctl *Control) Start() {
- loginRespMsg := &msg.LoginResp{
- Version: version.Full(),
- RunID: ctl.runID,
- Error: "",
- }
- _ = msg.WriteMsg(ctl.conn, loginRespMsg)
- go func() {
- for i := 0; i < ctl.poolCount; i++ {
-
- _ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
- }
- }()
- go ctl.worker()
- }
- func (ctl *Control) Close() error {
- ctl.conn.Close()
- return nil
- }
- func (ctl *Control) Replaced(newCtl *Control) {
- xl := ctl.xl
- xl.Infof("Replaced by client [%s]", newCtl.runID)
- ctl.runID = ""
- ctl.conn.Close()
- }
- func (ctl *Control) RegisterWorkConn(conn net.Conn) error {
- xl := ctl.xl
- defer func() {
- if err := recover(); err != nil {
- xl.Errorf("panic error: %v", err)
- xl.Errorf(string(debug.Stack()))
- }
- }()
- select {
- case ctl.workConnCh <- conn:
- xl.Debugf("new work connection registered")
- return nil
- default:
- xl.Debugf("work connection pool is full, discarding")
- return fmt.Errorf("work connection pool is full, discarding")
- }
- }
- func (ctl *Control) GetWorkConn() (workConn net.Conn, err error) {
- xl := ctl.xl
- defer func() {
- if err := recover(); err != nil {
- xl.Errorf("panic error: %v", err)
- xl.Errorf(string(debug.Stack()))
- }
- }()
- var ok bool
-
- select {
- case workConn, ok = <-ctl.workConnCh:
- if !ok {
- err = pkgerr.ErrCtlClosed
- return
- }
- xl.Debugf("get work connection from pool")
- default:
-
- if err := ctl.msgDispatcher.Send(&msg.ReqWorkConn{}); err != nil {
- return nil, fmt.Errorf("control is already closed")
- }
- select {
- case workConn, ok = <-ctl.workConnCh:
- if !ok {
- err = pkgerr.ErrCtlClosed
- xl.Warnf("no work connections available, %v", err)
- return
- }
- case <-time.After(time.Duration(ctl.serverCfg.UserConnTimeout) * time.Second):
- err = fmt.Errorf("timeout trying to get work connection")
- xl.Warnf("%v", err)
- return
- }
- }
-
- _ = ctl.msgDispatcher.Send(&msg.ReqWorkConn{})
- return
- }
- func (ctl *Control) heartbeatWorker() {
- if ctl.serverCfg.Transport.HeartbeatTimeout <= 0 {
- return
- }
- xl := ctl.xl
- go wait.Until(func() {
- if time.Since(ctl.lastPing.Load().(time.Time)) > time.Duration(ctl.serverCfg.Transport.HeartbeatTimeout)*time.Second {
- xl.Warnf("heartbeat timeout")
- ctl.conn.Close()
- return
- }
- }, time.Second, ctl.doneCh)
- }
- func (ctl *Control) WaitClosed() {
- <-ctl.doneCh
- }
- func (ctl *Control) worker() {
- xl := ctl.xl
- go ctl.heartbeatWorker()
- go ctl.msgDispatcher.Run()
- <-ctl.msgDispatcher.Done()
- ctl.conn.Close()
- ctl.mu.Lock()
- defer ctl.mu.Unlock()
- close(ctl.workConnCh)
- for workConn := range ctl.workConnCh {
- workConn.Close()
- }
- for _, pxy := range ctl.proxies {
- pxy.Close()
- ctl.pxyManager.Del(pxy.GetName())
- metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
- notifyContent := &plugin.CloseProxyContent{
- User: plugin.UserInfo{
- User: ctl.loginMsg.User,
- Metas: ctl.loginMsg.Metas,
- RunID: ctl.loginMsg.RunID,
- },
- CloseProxy: msg.CloseProxy{
- ProxyName: pxy.GetName(),
- },
- }
- go func() {
- _ = ctl.pluginManager.CloseProxy(notifyContent)
- }()
- }
- metrics.Server.CloseClient()
- xl.Infof("client exit success")
- close(ctl.doneCh)
- }
- func (ctl *Control) registerMsgHandlers() {
- ctl.msgDispatcher.RegisterHandler(&msg.NewProxy{}, ctl.handleNewProxy)
- ctl.msgDispatcher.RegisterHandler(&msg.Ping{}, ctl.handlePing)
- ctl.msgDispatcher.RegisterHandler(&msg.NatHoleVisitor{}, msg.AsyncHandler(ctl.handleNatHoleVisitor))
- ctl.msgDispatcher.RegisterHandler(&msg.NatHoleClient{}, msg.AsyncHandler(ctl.handleNatHoleClient))
- ctl.msgDispatcher.RegisterHandler(&msg.NatHoleReport{}, msg.AsyncHandler(ctl.handleNatHoleReport))
- ctl.msgDispatcher.RegisterHandler(&msg.CloseProxy{}, ctl.handleCloseProxy)
- }
- func (ctl *Control) handleNewProxy(m msg.Message) {
- xl := ctl.xl
- inMsg := m.(*msg.NewProxy)
- content := &plugin.NewProxyContent{
- User: plugin.UserInfo{
- User: ctl.loginMsg.User,
- Metas: ctl.loginMsg.Metas,
- RunID: ctl.loginMsg.RunID,
- },
- NewProxy: *inMsg,
- }
- var remoteAddr string
- retContent, err := ctl.pluginManager.NewProxy(content)
- if err == nil {
- inMsg = &retContent.NewProxy
- remoteAddr, err = ctl.RegisterProxy(inMsg)
- }
-
- resp := &msg.NewProxyResp{
- ProxyName: inMsg.ProxyName,
- }
- if err != nil {
- xl.Warnf("new proxy [%s] type [%s] error: %v", inMsg.ProxyName, inMsg.ProxyType, err)
- resp.Error = util.GenerateResponseErrorString(fmt.Sprintf("new proxy [%s] error", inMsg.ProxyName),
- err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient))
- } else {
- resp.RemoteAddr = remoteAddr
- xl.Infof("new proxy [%s] type [%s] success", inMsg.ProxyName, inMsg.ProxyType)
- metrics.Server.NewProxy(inMsg.ProxyName, inMsg.ProxyType)
- }
- _ = ctl.msgDispatcher.Send(resp)
- }
- func (ctl *Control) handlePing(m msg.Message) {
- xl := ctl.xl
- inMsg := m.(*msg.Ping)
- content := &plugin.PingContent{
- User: plugin.UserInfo{
- User: ctl.loginMsg.User,
- Metas: ctl.loginMsg.Metas,
- RunID: ctl.loginMsg.RunID,
- },
- Ping: *inMsg,
- }
- retContent, err := ctl.pluginManager.Ping(content)
- if err == nil {
- inMsg = &retContent.Ping
- err = ctl.authVerifier.VerifyPing(inMsg)
- }
- if err != nil {
- xl.Warnf("received invalid ping: %v", err)
- _ = ctl.msgDispatcher.Send(&msg.Pong{
- Error: util.GenerateResponseErrorString("invalid ping", err, lo.FromPtr(ctl.serverCfg.DetailedErrorsToClient)),
- })
- return
- }
- ctl.lastPing.Store(time.Now())
- xl.Debugf("receive heartbeat")
- _ = ctl.msgDispatcher.Send(&msg.Pong{})
- }
- func (ctl *Control) handleNatHoleVisitor(m msg.Message) {
- inMsg := m.(*msg.NatHoleVisitor)
- ctl.rc.NatHoleController.HandleVisitor(inMsg, ctl.msgTransporter, ctl.loginMsg.User)
- }
- func (ctl *Control) handleNatHoleClient(m msg.Message) {
- inMsg := m.(*msg.NatHoleClient)
- ctl.rc.NatHoleController.HandleClient(inMsg, ctl.msgTransporter)
- }
- func (ctl *Control) handleNatHoleReport(m msg.Message) {
- inMsg := m.(*msg.NatHoleReport)
- ctl.rc.NatHoleController.HandleReport(inMsg)
- }
- func (ctl *Control) handleCloseProxy(m msg.Message) {
- xl := ctl.xl
- inMsg := m.(*msg.CloseProxy)
- _ = ctl.CloseProxy(inMsg)
- xl.Infof("close proxy [%s] success", inMsg.ProxyName)
- }
- func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
- var pxyConf v1.ProxyConfigurer
-
- pxyConf, err = config.NewProxyConfigurerFromMsg(pxyMsg, ctl.serverCfg)
- if err != nil {
- return
- }
-
- userInfo := plugin.UserInfo{
- User: ctl.loginMsg.User,
- Metas: ctl.loginMsg.Metas,
- RunID: ctl.runID,
- }
-
-
- pxy, err := proxy.NewProxy(ctl.ctx, &proxy.Options{
- UserInfo: userInfo,
- LoginMsg: ctl.loginMsg,
- PoolCount: ctl.poolCount,
- ResourceController: ctl.rc,
- GetWorkConnFn: ctl.GetWorkConn,
- Configurer: pxyConf,
- ServerCfg: ctl.serverCfg,
- })
- if err != nil {
- return remoteAddr, err
- }
-
- if ctl.serverCfg.MaxPortsPerClient > 0 {
- ctl.mu.Lock()
- if ctl.portsUsedNum+pxy.GetUsedPortsNum() > int(ctl.serverCfg.MaxPortsPerClient) {
- ctl.mu.Unlock()
- err = fmt.Errorf("exceed the max_ports_per_client")
- return
- }
- ctl.portsUsedNum += pxy.GetUsedPortsNum()
- ctl.mu.Unlock()
- defer func() {
- if err != nil {
- ctl.mu.Lock()
- ctl.portsUsedNum -= pxy.GetUsedPortsNum()
- ctl.mu.Unlock()
- }
- }()
- }
- if ctl.pxyManager.Exist(pxyMsg.ProxyName) {
- err = fmt.Errorf("proxy [%s] already exists", pxyMsg.ProxyName)
- return
- }
- remoteAddr, err = pxy.Run()
- if err != nil {
- return
- }
- defer func() {
- if err != nil {
- pxy.Close()
- }
- }()
- err = ctl.pxyManager.Add(pxyMsg.ProxyName, pxy)
- if err != nil {
- return
- }
- ctl.mu.Lock()
- ctl.proxies[pxy.GetName()] = pxy
- ctl.mu.Unlock()
- return
- }
- func (ctl *Control) CloseProxy(closeMsg *msg.CloseProxy) (err error) {
- ctl.mu.Lock()
- pxy, ok := ctl.proxies[closeMsg.ProxyName]
- if !ok {
- ctl.mu.Unlock()
- return
- }
- if ctl.serverCfg.MaxPortsPerClient > 0 {
- ctl.portsUsedNum -= pxy.GetUsedPortsNum()
- }
- pxy.Close()
- ctl.pxyManager.Del(pxy.GetName())
- delete(ctl.proxies, closeMsg.ProxyName)
- ctl.mu.Unlock()
- metrics.Server.CloseProxy(pxy.GetName(), pxy.GetConfigurer().GetBaseConfig().Type)
- notifyContent := &plugin.CloseProxyContent{
- User: plugin.UserInfo{
- User: ctl.loginMsg.User,
- Metas: ctl.loginMsg.Metas,
- RunID: ctl.loginMsg.RunID,
- },
- CloseProxy: msg.CloseProxy{
- ProxyName: pxy.GetName(),
- },
- }
- go func() {
- _ = ctl.pluginManager.CloseProxy(notifyContent)
- }()
- return
- }
|