1
0

control.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "time"
  7. "frp/pkg/models"
  8. "frp/pkg/utils/conn"
  9. "frp/pkg/utils/log"
  10. )
  11. func ProcessControlConn(l *conn.Listener) {
  12. for {
  13. c := l.GetConn()
  14. log.Debug("Get one new conn, %v", c.GetRemoteAddr())
  15. go controlWorker(c)
  16. }
  17. }
  18. // control connection from every client and server
  19. func controlWorker(c *conn.Conn) {
  20. defer c.Close()
  21. // the first message is from client to server
  22. // if error, close connection
  23. res, err := c.ReadLine()
  24. if err != nil {
  25. log.Warn("Read error, %v", err)
  26. return
  27. }
  28. log.Debug("get: %s", res)
  29. clientCtlReq := &models.ClientCtlReq{}
  30. clientCtlRes := &models.ClientCtlRes{}
  31. if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
  32. log.Warn("Parse err: %v : %s", err, res)
  33. return
  34. }
  35. // check
  36. succ, msg, needRes := checkProxy(clientCtlReq, c)
  37. if !succ {
  38. clientCtlRes.Code = 1
  39. clientCtlRes.Msg = msg
  40. }
  41. if needRes {
  42. buf, _ := json.Marshal(clientCtlRes)
  43. err = c.Write(string(buf) + "\n")
  44. if err != nil {
  45. log.Warn("Write error, %v", err)
  46. time.Sleep(1 * time.Second)
  47. return
  48. }
  49. } else {
  50. // work conn, just return
  51. return
  52. }
  53. // others is from server to client
  54. server, ok := ProxyServers[clientCtlReq.ProxyName]
  55. if !ok {
  56. log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
  57. return
  58. }
  59. // read control msg from client
  60. go readControlMsgFromClient(server, c)
  61. serverCtlReq := &models.ClientCtlReq{}
  62. serverCtlReq.Type = models.WorkConn
  63. for {
  64. _, isStop := server.WaitUserConn()
  65. if isStop {
  66. break
  67. }
  68. buf, _ := json.Marshal(serverCtlReq)
  69. err = c.Write(string(buf) + "\n")
  70. if err != nil {
  71. log.Warn("ProxyName [%s], write to client error, proxy exit", server.Name)
  72. server.Close()
  73. return
  74. }
  75. log.Debug("ProxyName [%s], write to client to add work conn success", server.Name)
  76. }
  77. log.Error("ProxyName [%s], I'm dead!", server.Name)
  78. return
  79. }
  80. func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, needRes bool) {
  81. succ = false
  82. needRes = true
  83. // check if proxy name exist
  84. server, ok := ProxyServers[req.ProxyName]
  85. if !ok {
  86. msg = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName)
  87. log.Warn(msg)
  88. return
  89. }
  90. // check password
  91. if req.Passwd != server.Passwd {
  92. msg = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName)
  93. log.Warn(msg)
  94. return
  95. }
  96. // control conn
  97. if req.Type == models.ControlConn {
  98. if server.Status != models.Idle {
  99. msg = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
  100. log.Warn(msg)
  101. return
  102. }
  103. // start proxy and listen for user conn, no block
  104. err := server.Start()
  105. if err != nil {
  106. msg = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
  107. log.Warn(msg)
  108. return
  109. }
  110. log.Info("ProxyName [%s], start proxy success", req.ProxyName)
  111. } else if req.Type == models.WorkConn {
  112. // work conn
  113. needRes = false
  114. if server.Status != models.Working {
  115. log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName)
  116. return
  117. }
  118. server.CliConnChan <- c
  119. } else {
  120. msg = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName)
  121. log.Warn(msg)
  122. return
  123. }
  124. succ = true
  125. return
  126. }
  127. func readControlMsgFromClient(server *models.ProxyServer, c *conn.Conn) {
  128. isContinueRead := true
  129. f := func() {
  130. isContinueRead = false
  131. server.StopWaitUserConn()
  132. }
  133. timer := time.AfterFunc(time.Duration(HeartBeatTimeout)*time.Second, f)
  134. defer timer.Stop()
  135. for isContinueRead {
  136. content, err := c.ReadLine()
  137. //log.Debug("Receive msg from client! content:%s", content)
  138. if err != nil {
  139. if err == io.EOF {
  140. log.Warn("Server detect client[%s] is dead!", server.Name)
  141. server.StopWaitUserConn()
  142. break
  143. }
  144. log.Error("ProxyName [%s], read error:%s", server.Name, err.Error())
  145. continue
  146. }
  147. if content == "\r\n" {
  148. log.Debug("receive hearbeat:%s", content)
  149. timer.Reset(time.Duration(HeartBeatTimeout) * time.Second)
  150. }
  151. }
  152. }