Browse Source

Server manager support the NewUserConn operation (#1740)

support NewUserConn operation
zhang-wei 4 years ago
parent
commit
ad0c449a75

+ 20 - 1
doc/server_plugin.md

@@ -70,7 +70,7 @@ The response can look like any of the following:
 
 ### Operation
 
-Currently `Login`, `NewProxy`, `Ping` and `NewWorkConn` operations are supported.
+Currently `Login`, `NewProxy`, `Ping`, `NewWorkConn` and `NewUserConn` operations are supported.
 
 #### Login
 
@@ -172,6 +172,25 @@ New work connection received from frpc (RPC sent after `run_id` is matched with
 }
 ```
 
+#### NewUserConn
+
+New user connection received from proxy (support `tcp`, `stcp`, `https` and `tcpmux`) .
+
+```
+{
+    "content": {
+        "user": {
+            "user": <string>,
+            "metas": map<string>string
+            "run_id": <string>
+        },
+        "proxy_name": <string>,
+        "proxy_type": <string>,
+        "remote_addr": <string>
+    }
+}
+```
+
 ### Server Plugin Configuration
 
 ```ini

+ 58 - 1
doc/server_plugin_zh.md

@@ -69,7 +69,7 @@ Response
 
 ### 操作类型
 
-目前插件支持管理的操作类型有 `Login` 和 `NewProxy`。
+目前插件支持管理的操作类型有 `Login`、`NewProxy`、`Ping`、`NewWorkConn` 和 `NewUserConn`。
 
 #### Login
 
@@ -127,6 +127,63 @@ Response
 }
 ```
 
+#### Ping
+
+心跳相关信息
+
+```
+{
+    "content": {
+        "user": {
+            "user": <string>,
+            "metas": map<string>string
+            "run_id": <string>
+        },
+        "timestamp": <int64>,
+        "privilege_key": <string>
+    }
+}
+```
+
+#### NewWorkConn
+
+新增 `frpc` 连接相关信息
+
+```
+{
+    "content": {
+        "user": {
+            "user": <string>,
+            "metas": map<string>string
+            "run_id": <string>
+        },
+        "run_id": <string>
+        "timestamp": <int64>,
+        "privilege_key": <string>
+    }
+}
+```
+
+#### NewUserConn
+
+新增 `proxy` 连接相关信息 (支持 `tcp`、`stcp`、`https` 和 `tcpmux` 协议)。
+
+```
+{
+    "content": {
+        "user": {
+            "user": <string>,
+            "metas": map<string>string
+            "run_id": <string>
+        },
+        "proxy_name": <string>,
+        "proxy_type": <string>,
+        "remote_addr": <string>
+    }
+}
+```
+
+
 ### frps 中插件配置
 
 ```ini

+ 39 - 0
models/plugin/server/manager.go

@@ -28,6 +28,7 @@ type Manager struct {
 	newProxyPlugins    []Plugin
 	pingPlugins        []Plugin
 	newWorkConnPlugins []Plugin
+	newUserConnPlugins []Plugin
 }
 
 func NewManager() *Manager {
@@ -36,6 +37,7 @@ func NewManager() *Manager {
 		newProxyPlugins:    make([]Plugin, 0),
 		pingPlugins:        make([]Plugin, 0),
 		newWorkConnPlugins: make([]Plugin, 0),
+		newUserConnPlugins: make([]Plugin, 0),
 	}
 }
 
@@ -52,6 +54,9 @@ func (m *Manager) Register(p Plugin) {
 	if p.IsSupport(OpNewWorkConn) {
 		m.pingPlugins = append(m.pingPlugins, p)
 	}
+	if p.IsSupport(OpNewUserConn) {
+		m.newUserConnPlugins = append(m.newUserConnPlugins, p)
+	}
 }
 
 func (m *Manager) Login(content *LoginContent) (*LoginContent, error) {
@@ -189,3 +194,37 @@ func (m *Manager) NewWorkConn(content *NewWorkConnContent) (*NewWorkConnContent,
 	}
 	return content, nil
 }
+
+func (m *Manager) NewUserConn(content *NewUserConnContent) (*NewUserConnContent, error) {
+	if len(m.newUserConnPlugins) == 0 {
+		return content, nil
+	}
+
+	var (
+		res = &Response{
+			Reject:   false,
+			Unchange: true,
+		}
+		retContent interface{}
+		err        error
+	)
+	reqid, _ := util.RandId()
+	xl := xlog.New().AppendPrefix("reqid: " + reqid)
+	ctx := xlog.NewContext(context.Background(), xl)
+	ctx = NewReqidContext(ctx, reqid)
+
+	for _, p := range m.newUserConnPlugins {
+		res, retContent, err = p.Handle(ctx, OpNewUserConn, *content)
+		if err != nil {
+			xl.Info("send NewUserConn request to plugin [%s] error: %v", p.Name(), err)
+			return nil, errors.New("send NewUserConn request to plugin error")
+		}
+		if res.Reject {
+			return nil, fmt.Errorf("%s", res.RejectReason)
+		}
+		if !res.Unchange {
+			content = retContent.(*NewUserConnContent)
+		}
+	}
+	return content, nil
+}

+ 1 - 0
models/plugin/server/plugin.go

@@ -25,6 +25,7 @@ const (
 	OpNewProxy    = "NewProxy"
 	OpPing        = "Ping"
 	OpNewWorkConn = "NewWorkConn"
+	OpNewUserConn = "NewUserConn"
 )
 
 type Plugin interface {

+ 7 - 0
models/plugin/server/types.go

@@ -55,3 +55,10 @@ type NewWorkConnContent struct {
 	User UserInfo `json:"user"`
 	msg.NewWorkConn
 }
+
+type NewUserConnContent struct {
+	User       UserInfo `json:"user"`
+	ProxyName  string   `json:"proxy_name"`
+	ProxyType  string   `json:"proxy_type"`
+	RemoteAddr string   `json:"remote_addr"`
+}

+ 8 - 1
server/control.go

@@ -486,9 +486,16 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
 		return
 	}
 
+	// User info
+	userInfo := plugin.UserInfo{
+		User:  ctl.loginMsg.User,
+		Metas: ctl.loginMsg.Metas,
+		RunId: ctl.runId,
+	}
+
 	// NewProxy will return a interface Proxy.
 	// In fact it create different proxies by different proxy type, we just call run() here.
-	pxy, err := proxy.NewProxy(ctl.ctx, ctl.runId, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
+	pxy, err := proxy.NewProxy(ctl.ctx, userInfo, ctl.rc, ctl.poolCount, ctl.GetWorkConn, pxyConf, ctl.serverCfg)
 	if err != nil {
 		return remoteAddr, err
 	}

+ 4 - 0
server/controller/resource.go

@@ -16,6 +16,7 @@ package controller
 
 import (
 	"github.com/fatedier/frp/models/nathole"
+	plugin "github.com/fatedier/frp/models/plugin/server"
 	"github.com/fatedier/frp/server/group"
 	"github.com/fatedier/frp/server/ports"
 	"github.com/fatedier/frp/utils/tcpmux"
@@ -50,4 +51,7 @@ type ResourceController struct {
 
 	// TcpMux HTTP CONNECT multiplexer
 	TcpMuxHttpConnectMuxer *tcpmux.HttpConnectTcpMuxer
+
+	// All server manager plugin
+	PluginManager *plugin.Manager
 }

+ 28 - 1
server/proxy/proxy.go

@@ -24,6 +24,7 @@ import (
 
 	"github.com/fatedier/frp/models/config"
 	"github.com/fatedier/frp/models/msg"
+	plugin "github.com/fatedier/frp/models/plugin/server"
 	"github.com/fatedier/frp/server/controller"
 	"github.com/fatedier/frp/server/metrics"
 	frpNet "github.com/fatedier/frp/utils/net"
@@ -41,6 +42,8 @@ type Proxy interface {
 	GetConf() config.ProxyConf
 	GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
 	GetUsedPortsNum() int
+	GetResourceController() *controller.ResourceController
+	GetUserInfo() plugin.UserInfo
 	Close()
 }
 
@@ -52,6 +55,7 @@ type BaseProxy struct {
 	poolCount     int
 	getWorkConnFn GetWorkConnFn
 	serverCfg     config.ServerCommonConf
+	userInfo      plugin.UserInfo
 
 	mu  sync.RWMutex
 	xl  *xlog.Logger
@@ -70,6 +74,14 @@ func (pxy *BaseProxy) GetUsedPortsNum() int {
 	return pxy.usedPortsNum
 }
 
+func (pxy *BaseProxy) GetResourceController() *controller.ResourceController {
+	return pxy.rc
+}
+
+func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo {
+	return pxy.userInfo
+}
+
 func (pxy *BaseProxy) Close() {
 	xl := xlog.FromContextSafe(pxy.ctx)
 	xl.Info("proxy closing")
@@ -154,7 +166,7 @@ func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn,
 	}
 }
 
-func NewProxy(ctx context.Context, runId string, rc *controller.ResourceController, poolCount int,
+func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
 	getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
 
 	xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
@@ -167,6 +179,7 @@ func NewProxy(ctx context.Context, runId string, rc *controller.ResourceControll
 		serverCfg:     serverCfg,
 		xl:            xl,
 		ctx:           xlog.NewContext(ctx, xl),
+		userInfo:      userInfo,
 	}
 	switch cfg := pxyConf.(type) {
 	case *config.TcpProxyConf:
@@ -218,6 +231,20 @@ func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, serverCfg config.Serv
 	xl := xlog.FromContextSafe(pxy.Context())
 	defer userConn.Close()
 
+	// server plugin hook
+	rc := pxy.GetResourceController()
+	content := &plugin.NewUserConnContent{
+		User:       pxy.GetUserInfo(),
+		ProxyName:  pxy.GetName(),
+		ProxyType:  pxy.GetConf().GetBaseInfo().ProxyType,
+		RemoteAddr: userConn.RemoteAddr().String(),
+	}
+	_, err := rc.PluginManager.NewUserConn(content)
+	if err != nil {
+		xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
+		return
+	}
+
 	// try all connections from the pool
 	workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
 	if err != nil {

+ 1 - 0
server/service.go

@@ -119,6 +119,7 @@ func NewService(cfg config.ServerCommonConf) (svr *Service, err error) {
 		svr.pluginManager.Register(plugin.NewHTTPPluginOptions(options))
 		log.Info("plugin [%s] has been registered", name)
 	}
+	svr.rc.PluginManager = svr.pluginManager
 
 	// Init group controller
 	svr.rc.TcpGroupCtl = group.NewTcpGroupCtl(svr.rc.TcpPortManager)