control.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package main
  2. import (
  3. "encoding/json"
  4. "io"
  5. "sync"
  6. "time"
  7. "frp/pkg/models"
  8. "frp/pkg/utils/conn"
  9. "frp/pkg/utils/log"
  10. )
  11. const (
  12. heartbeatDuration = 2 //心跳检测时间间隔,单位秒
  13. )
  14. // client与server之间连接的保护锁
  15. var connProtect sync.Mutex
  16. func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) {
  17. defer wait.Done()
  18. c := loginToServer(cli)
  19. if c == nil {
  20. log.Error("ProxyName [%s], connect to server failed!", cli.Name)
  21. return
  22. }
  23. defer c.Close()
  24. go startHeartBeat(c)
  25. for {
  26. // ignore response content now
  27. _, err := c.ReadLine()
  28. if err == io.EOF {
  29. connProtect.Lock() // 除了这里,其他地方禁止对连接进行任何操作
  30. log.Debug("ProxyName [%s], server close this control conn", cli.Name)
  31. var sleepTime time.Duration = 1
  32. for {
  33. log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, ServerAddr, ServerPort)
  34. tmpConn := loginToServer(cli)
  35. if tmpConn != nil {
  36. c.Close()
  37. c = tmpConn
  38. break
  39. }
  40. if sleepTime < 60 {
  41. sleepTime++
  42. }
  43. time.Sleep(sleepTime * time.Second)
  44. }
  45. connProtect.Unlock()
  46. continue
  47. } else if err != nil {
  48. log.Warn("ProxyName [%s], read from server error, %v", cli.Name, err)
  49. continue
  50. }
  51. cli.StartTunnel(ServerAddr, ServerPort)
  52. }
  53. }
  54. func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) {
  55. c := &conn.Conn{}
  56. connection = nil
  57. for i := 0; i < 1; i++ { // ZWF: 此处的for作为控制流使用
  58. err := c.ConnectServer(ServerAddr, ServerPort)
  59. if err != nil {
  60. log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, ServerAddr, ServerPort, err)
  61. break
  62. }
  63. req := &models.ClientCtlReq{
  64. Type: models.ControlConn,
  65. ProxyName: cli.Name,
  66. Passwd: cli.Passwd,
  67. }
  68. buf, _ := json.Marshal(req)
  69. err = c.Write(string(buf) + "\n")
  70. if err != nil {
  71. log.Error("ProxyName [%s], write to server error, %v", cli.Name, err)
  72. break
  73. }
  74. res, err := c.ReadLine()
  75. if err != nil {
  76. log.Error("ProxyName [%s], read from server error, %v", cli.Name, err)
  77. break
  78. }
  79. log.Debug("ProxyName [%s], read [%s]", cli.Name, res)
  80. clientCtlRes := &models.ClientCtlRes{}
  81. if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil {
  82. log.Error("ProxyName [%s], format server response error, %v", cli.Name, err)
  83. break
  84. }
  85. if clientCtlRes.Code != 0 {
  86. log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg)
  87. break
  88. }
  89. connection = c
  90. }
  91. if connection == nil {
  92. c.Close()
  93. }
  94. return
  95. }
  96. func startHeartBeat(con *conn.Conn) {
  97. for {
  98. time.Sleep(heartbeatDuration * time.Second)
  99. connProtect.Lock()
  100. err := con.Write("\r\n")
  101. connProtect.Unlock()
  102. if err != nil {
  103. log.Error("Send hearbeat to server failed! Err:%s", err.Error())
  104. }
  105. }
  106. }