Explorar el Código

Merge pull request #4 from Hurricanezwf/fix_bug

fix heartbeat bug
fatedier hace 9 años
padre
commit
c7b0687b2c

+ 47 - 9
src/frp/cmd/frpc/control.go

@@ -14,6 +14,9 @@ import (
 	"frp/utils/log"
 )
 
+var connection *conn.Conn = nil
+var heartBeatTimer *time.Timer = nil
+
 func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
 	defer wait.Done()
 
@@ -22,12 +25,13 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
 		log.Error("ProxyName [%s], connect to server failed!", cli.Name)
 		return
 	}
-	defer c.Close()
+	connection = c
+	defer connection.Close()
 
 	for {
 		// ignore response content now
-		_, err := c.ReadLine()
-		if err == io.EOF {
+		content, err := connection.ReadLine()
+		if err == io.EOF || nil == connection || connection.IsClosed() {
 			log.Debug("ProxyName [%s], server close this control conn", cli.Name)
 			var sleepTime time.Duration = 1
 
@@ -36,8 +40,8 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
 				log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort)
 				tmpConn, err := loginToServer(cli)
 				if err == nil {
-					c.Close()
-					c = tmpConn
+					connection.Close()
+					connection = tmpConn
 					break
 				}
 
@@ -52,6 +56,21 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
 			continue
 		}
 
+		clientCtlRes := &msg.ClientCtlRes{}
+		if err := json.Unmarshal([]byte(content), clientCtlRes); err != nil {
+			log.Warn("Parse err: %v : %s", err, content)
+			continue
+		}
+		if consts.SCHeartBeatRes == clientCtlRes.GeneralRes.Code {
+			if heartBeatTimer != nil {
+				log.Debug("Client rcv heartbeat response")
+				heartBeatTimer.Reset(time.Duration(client.HeartBeatTimeout) * time.Second)
+			} else {
+				log.Error("heartBeatTimer is nil")
+			}
+			continue
+		}
+
 		cli.StartTunnel(client.ServerAddr, client.ServerPort)
 	}
 }
@@ -100,18 +119,37 @@ func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) {
 }
 
 func startHeartBeat(c *conn.Conn) {
+	f := func() {
+		log.Error("HeartBeat timeout!")
+		if c != nil {
+			c.Close()
+		}
+	}
+	heartBeatTimer = time.AfterFunc(time.Duration(client.HeartBeatTimeout)*time.Second, f)
+	defer heartBeatTimer.Stop()
+
+	clientCtlReq := &msg.ClientCtlReq{
+		Type:      consts.CSHeartBeatReq,
+		ProxyName: "",
+		Passwd:    "",
+	}
+	request, err := json.Marshal(clientCtlReq)
+	if err != nil {
+		log.Warn("Serialize clientCtlReq err! Err: %v", err)
+	}
+
 	log.Debug("Start to send heartbeat")
 	for {
 		time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second)
-		if !c.IsClosed() {
-			err := c.Write("\n")
+		if c != nil && !c.IsClosed() {
+			err = c.Write(string(request) + "\n")
 			if err != nil {
-				log.Error("Send hearbeat to server failed! Err:%s", err.Error())
+				log.Error("Send hearbeat to server failed! Err:%v", err)
 				continue
 			}
 		} else {
 			break
 		}
 	}
-	log.Debug("heartbeat exit")
+	log.Debug("Heartbeat exit")
 }

+ 25 - 4
src/frp/cmd/frps/control.go

@@ -162,13 +162,13 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
 	defer timer.Stop()
 
 	for isContinueRead {
-		_, err := c.ReadLine()
+		content, err := c.ReadLine()
 		if err != nil {
 			if err == io.EOF {
 				log.Warn("ProxyName [%s], client is dead!", s.Name)
 				s.Close()
 				break
-			} else if c.IsClosed() {
+			} else if nil == c || c.IsClosed() {
 				log.Warn("ProxyName [%s], client connection is closed", s.Name)
 				break
 			}
@@ -176,8 +176,29 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
 			log.Error("ProxyName [%s], read error: %v", s.Name, err)
 			continue
 		}
-		log.Debug("ProxyName [%s], get heartbeat", s.Name)
 
-		timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
+		clientCtlReq := &msg.ClientCtlReq{}
+		if err := json.Unmarshal([]byte(content), clientCtlReq); err != nil {
+			log.Warn("Parse err: %v : %s", err, content)
+			continue
+		}
+		if consts.CSHeartBeatReq == clientCtlReq.Type {
+			log.Debug("ProxyName [%s], get heartbeat", s.Name)
+			timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
+
+			clientCtlRes := &msg.ClientCtlRes{}
+			clientCtlRes.GeneralRes.Code = consts.SCHeartBeatRes
+			response, err := json.Marshal(clientCtlRes)
+			if err != nil {
+				log.Warn("Serialize ClientCtlRes err! err: %v", err)
+				continue
+			}
+
+			err = c.Write(string(response) + "\n")
+			if err != nil {
+				log.Error("Send heartbeat response to client failed! Err:%v", err)
+				continue
+			}
+		}
 	}
 }

+ 1 - 0
src/frp/models/client/config.go

@@ -15,6 +15,7 @@ var (
 	LogLevel          string = "warn"
 	LogWay            string = "file"
 	HeartBeatInterval int64  = 5
+	HeartBeatTimeout  int64  = 30
 )
 
 var ProxyClients map[string]*ProxyClient = make(map[string]*ProxyClient)

+ 10 - 0
src/frp/models/consts/consts.go

@@ -11,3 +11,13 @@ const (
 	CtlConn = iota
 	WorkConn
 )
+
+// msg from client to server
+const (
+	CSHeartBeatReq = 1
+)
+
+// msg from server to client
+const (
+	SCHeartBeatRes = 100
+)