Browse Source

[Feature] Server Plugin - Ping and NewWorkConn RPC (#1702)

Guy Lewin 5 years ago
parent
commit
a4b105dedb

+ 39 - 2
doc/server_plugin.md

@@ -70,7 +70,7 @@ The response can look like any of the following:
 
 ### Operation
 
-Currently `Login` and `NewProxy` operations are supported.
+Currently `Login`, `NewProxy`, `Ping` and `NewWorkConn` operations are supported.
 
 #### Login
 
@@ -135,6 +135,43 @@ Create new proxy
 }
 ```
 
+#### Ping
+
+Heartbeat from frpc
+
+```
+{
+    "content": {
+        "user": {
+            "user": <string>,
+            "metas": map<string>string
+            "run_id": <string>
+        },
+        "timestamp": <int64>,
+        "privilege_key": <string>
+    }
+}
+```
+
+#### NewWorkConn
+
+New work connection received from frpc (RPC sent after `run_id` is matched with an existing frp connection)
+
+```
+{
+    "content": {
+        "user": {
+            "user": <string>,
+            "metas": map<string>string
+            "run_id": <string>
+        },
+        "run_id": <string>
+        "timestamp": <int64>,
+        "privilege_key": <string>
+    }
+}
+```
+
 ### Server Plugin Configuration
 
 ```ini
@@ -155,7 +192,7 @@ ops = NewProxy
 
 addr: the address where the external RPC service listens on.
 path: http request url path for the POST request.
-ops: operations plugin needs to handle (e.g. "Login", "NewProxy").
+ops: operations plugin needs to handle (e.g. "Login", "NewProxy", ...).
 
 ### Metadata
 

+ 74 - 4
models/plugin/server/manager.go

@@ -24,14 +24,18 @@ import (
 )
 
 type Manager struct {
-	loginPlugins    []Plugin
-	newProxyPlugins []Plugin
+	loginPlugins       []Plugin
+	newProxyPlugins    []Plugin
+	pingPlugins        []Plugin
+	newWorkConnPlugins []Plugin
 }
 
 func NewManager() *Manager {
 	return &Manager{
-		loginPlugins:    make([]Plugin, 0),
-		newProxyPlugins: make([]Plugin, 0),
+		loginPlugins:       make([]Plugin, 0),
+		newProxyPlugins:    make([]Plugin, 0),
+		pingPlugins:        make([]Plugin, 0),
+		newWorkConnPlugins: make([]Plugin, 0),
 	}
 }
 
@@ -42,6 +46,12 @@ func (m *Manager) Register(p Plugin) {
 	if p.IsSupport(OpNewProxy) {
 		m.newProxyPlugins = append(m.newProxyPlugins, p)
 	}
+	if p.IsSupport(OpPing) {
+		m.pingPlugins = append(m.pingPlugins, p)
+	}
+	if p.IsSupport(OpNewWorkConn) {
+		m.pingPlugins = append(m.pingPlugins, p)
+	}
 }
 
 func (m *Manager) Login(content *LoginContent) (*LoginContent, error) {
@@ -103,3 +113,63 @@ func (m *Manager) NewProxy(content *NewProxyContent) (*NewProxyContent, error) {
 	}
 	return content, nil
 }
+
+func (m *Manager) Ping(content *PingContent) (*PingContent, error) {
+	var (
+		res = &Response{
+			Reject:   false,
+			Unchange: true,
+		}
+		retContent interface{}
+		err        error
+	)
+	reqid, _ := util.RandId()
+	xl := xlog.New().AppendPrefix("reqid: " + reqid)
+	ctx := xlog.NewContext(context.Background(), xl)
+	ctx = NewReqidContext(ctx, reqid)
+
+	for _, p := range m.pingPlugins {
+		res, retContent, err = p.Handle(ctx, OpPing, *content)
+		if err != nil {
+			xl.Warn("send Ping request to plugin [%s] error: %v", p.Name(), err)
+			return nil, errors.New("send Ping request to plugin error")
+		}
+		if res.Reject {
+			return nil, fmt.Errorf("%s", res.RejectReason)
+		}
+		if !res.Unchange {
+			content = retContent.(*PingContent)
+		}
+	}
+	return content, nil
+}
+
+func (m *Manager) NewWorkConn(content *NewWorkConnContent) (*NewWorkConnContent, error) {
+	var (
+		res = &Response{
+			Reject:   false,
+			Unchange: true,
+		}
+		retContent interface{}
+		err        error
+	)
+	reqid, _ := util.RandId()
+	xl := xlog.New().AppendPrefix("reqid: " + reqid)
+	ctx := xlog.NewContext(context.Background(), xl)
+	ctx = NewReqidContext(ctx, reqid)
+
+	for _, p := range m.pingPlugins {
+		res, retContent, err = p.Handle(ctx, OpPing, *content)
+		if err != nil {
+			xl.Warn("send NewWorkConn request to plugin [%s] error: %v", p.Name(), err)
+			return nil, errors.New("send NewWorkConn request to plugin error")
+		}
+		if res.Reject {
+			return nil, fmt.Errorf("%s", res.RejectReason)
+		}
+		if !res.Unchange {
+			content = retContent.(*NewWorkConnContent)
+		}
+	}
+	return content, nil
+}

+ 4 - 2
models/plugin/server/plugin.go

@@ -21,8 +21,10 @@ import (
 const (
 	APIVersion = "0.1.0"
 
-	OpLogin    = "Login"
-	OpNewProxy = "NewProxy"
+	OpLogin       = "Login"
+	OpNewProxy    = "NewProxy"
+	OpPing        = "Ping"
+	OpNewWorkConn = "NewWorkConn"
 )
 
 type Plugin interface {

+ 10 - 0
models/plugin/server/types.go

@@ -45,3 +45,13 @@ type NewProxyContent struct {
 	User UserInfo `json:"user"`
 	msg.NewProxy
 }
+
+type PingContent struct {
+	User UserInfo `json:"user"`
+	msg.Ping
+}
+
+type NewWorkConnContent struct {
+	User UserInfo `json:"user"`
+	msg.NewWorkConn
+}

+ 15 - 2
server/control.go

@@ -450,10 +450,23 @@ func (ctl *Control) manager() {
 				ctl.CloseProxy(m)
 				xl.Info("close proxy [%s] success", m.ProxyName)
 			case *msg.Ping:
-				if err := ctl.authVerifier.VerifyPing(m); err != nil {
+				content := &plugin.PingContent{
+					User: plugin.UserInfo{
+						User:  ctl.loginMsg.User,
+						Metas: ctl.loginMsg.Metas,
+						RunId: ctl.loginMsg.RunId,
+					},
+					Ping: *m,
+				}
+				retContent, err := ctl.pluginManager.Ping(content)
+				if err == nil {
+					m = &retContent.Ping
+					err = ctl.authVerifier.VerifyPing(m)
+				}
+				if err != nil {
 					xl.Warn("received invalid ping: %v", err)
 					ctl.sendCh <- &msg.Pong{
-						Error: "invalid authentication in ping",
+						Error: util.GenerateResponseErrorString("invalid ping", err, ctl.serverCfg.DetailedErrorsToClient),
 					}
 					return
 				}

+ 19 - 5
server/service.go

@@ -457,13 +457,27 @@ func (svr *Service) RegisterWorkConn(workConn net.Conn, newMsg *msg.NewWorkConn)
 		xl.Warn("No client control found for run id [%s]", newMsg.RunId)
 		return fmt.Errorf("no client control found for run id [%s]", newMsg.RunId)
 	}
-	// Check auth.
-	if err := svr.authVerifier.VerifyNewWorkConn(newMsg); err != nil {
-		xl.Warn("Invalid authentication in NewWorkConn message on run id [%s]", newMsg.RunId)
+	// server plugin hook
+	content := &plugin.NewWorkConnContent{
+		User: plugin.UserInfo{
+			User:  ctl.loginMsg.User,
+			Metas: ctl.loginMsg.Metas,
+			RunId: ctl.loginMsg.RunId,
+		},
+		NewWorkConn: *newMsg,
+	}
+	retContent, err := svr.pluginManager.NewWorkConn(content)
+	if err == nil {
+		newMsg = &retContent.NewWorkConn
+		// Check auth.
+		err = svr.authVerifier.VerifyNewWorkConn(newMsg)
+	}
+	if err != nil {
+		xl.Warn("invalid NewWorkConn with run id [%s]", newMsg.RunId)
 		msg.WriteMsg(workConn, &msg.StartWorkConn{
-			Error: "invalid authentication in NewWorkConn",
+			Error: util.GenerateResponseErrorString("invalid NewWorkConn", err, ctl.serverCfg.DetailedErrorsToClient),
 		})
-		return fmt.Errorf("invalid authentication in NewWorkConn message on run id [%s]", newMsg.RunId)
+		return fmt.Errorf("invalid NewWorkConn with run id [%s]", newMsg.RunId)
 	}
 	return ctl.RegisterWorkConn(workConn)
 }