control.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package main
  2. import (
  3. "encoding/json"
  4. "io"
  5. "sync"
  6. "time"
  7. "frp/pkg/models"
  8. "frp/pkg/utils/conn"
  9. "frp/pkg/utils/log"
  10. )
  // heartbeatDuration is the interval between heartbeats, in seconds. It is
  // left untyped so it can be multiplied directly by time.Second.
  const heartbeatDuration = 2
  // isHeartBeatContinue tells the heartbeat loop whether it should keep sending.
  // ControlProcess clears it when the server closes the control connection, and
  // startHeartBeat sets it again when a new heartbeat loop begins.
  // NOTE(review): this single flag is read and written from several goroutines
  // without synchronization, and is shared by every control connection.
  var isHeartBeatContinue = true
  15. func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) {
  16. defer wait.Done()
  17. c := loginToServer(cli)
  18. if c == nil {
  19. log.Error("ProxyName [%s], connect to server failed!", cli.Name)
  20. return
  21. }
  22. defer c.Close()
  23. for {
  24. // ignore response content now
  25. _, err := c.ReadLine()
  26. if err == io.EOF {
  27. isHeartBeatContinue = false
  28. log.Debug("ProxyName [%s], server close this control conn", cli.Name)
  29. var sleepTime time.Duration = 1
  30. for {
  31. log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, ServerAddr, ServerPort)
  32. tmpConn := loginToServer(cli)
  33. if tmpConn != nil {
  34. c.Close()
  35. c = tmpConn
  36. break
  37. }
  38. if sleepTime < 60 {
  39. sleepTime++
  40. }
  41. time.Sleep(sleepTime * time.Second)
  42. }
  43. continue
  44. } else if err != nil {
  45. log.Warn("ProxyName [%s], read from server error, %v", cli.Name, err)
  46. continue
  47. }
  48. cli.StartTunnel(ServerAddr, ServerPort)
  49. }
  50. }
  51. func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) {
  52. c := &conn.Conn{}
  53. connection = nil
  54. for i := 0; i < 1; i++ { // ZWF: 此处的for作为控制流使用
  55. err := c.ConnectServer(ServerAddr, ServerPort)
  56. if err != nil {
  57. log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, ServerAddr, ServerPort, err)
  58. break
  59. }
  60. req := &models.ClientCtlReq{
  61. Type: models.ControlConn,
  62. ProxyName: cli.Name,
  63. Passwd: cli.Passwd,
  64. }
  65. buf, _ := json.Marshal(req)
  66. err = c.Write(string(buf) + "\n")
  67. if err != nil {
  68. log.Error("ProxyName [%s], write to server error, %v", cli.Name, err)
  69. break
  70. }
  71. res, err := c.ReadLine()
  72. if err != nil {
  73. log.Error("ProxyName [%s], read from server error, %v", cli.Name, err)
  74. break
  75. }
  76. log.Debug("ProxyName [%s], read [%s]", cli.Name, res)
  77. clientCtlRes := &models.ClientCtlRes{}
  78. if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil {
  79. log.Error("ProxyName [%s], format server response error, %v", cli.Name, err)
  80. break
  81. }
  82. if clientCtlRes.Code != 0 {
  83. log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg)
  84. break
  85. }
  86. connection = c
  87. go startHeartBeat(connection)
  88. log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, ServerAddr, ServerPort)
  89. }
  90. if connection == nil {
  91. c.Close()
  92. }
  93. return
  94. }
  95. func startHeartBeat(con *conn.Conn) {
  96. isHeartBeatContinue = true
  97. for {
  98. time.Sleep(heartbeatDuration * time.Second)
  99. if isHeartBeatContinue { // 把isHeartBeatContinue放在这里是为了防止SIGPIPE
  100. err := con.Write("\r\n")
  101. //log.Debug("send heart beat to server!")
  102. if err != nil {
  103. log.Error("Send hearbeat to server failed! Err:%s", err.Error())
  104. }
  105. } else {
  106. break
  107. }
  108. }
  109. }