control.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/fatedier/frp/models/consts"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/models/server"
  10. "github.com/fatedier/frp/utils/conn"
  11. "github.com/fatedier/frp/utils/log"
  12. )
  13. func ProcessControlConn(l *conn.Listener) {
  14. for {
  15. c, err := l.GetConn()
  16. if err != nil {
  17. return
  18. }
  19. log.Debug("Get one new conn, %v", c.GetRemoteAddr())
  20. go controlWorker(c)
  21. }
  22. }
  23. // connection from every client and server
  24. func controlWorker(c *conn.Conn) {
  25. // the first message is from client to server
  26. // if error, close connection
  27. res, err := c.ReadLine()
  28. if err != nil {
  29. log.Warn("Read error, %v", err)
  30. return
  31. }
  32. log.Debug("get: %s", res)
  33. clientCtlReq := &msg.ClientCtlReq{}
  34. clientCtlRes := &msg.ClientCtlRes{}
  35. if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
  36. log.Warn("Parse err: %v : %s", err, res)
  37. return
  38. }
  39. // check
  40. succ, info, needRes := checkProxy(clientCtlReq, c)
  41. if !succ {
  42. clientCtlRes.Code = 1
  43. clientCtlRes.Msg = info
  44. }
  45. if needRes {
  46. defer c.Close()
  47. buf, _ := json.Marshal(clientCtlRes)
  48. err = c.Write(string(buf) + "\n")
  49. if err != nil {
  50. log.Warn("Write error, %v", err)
  51. time.Sleep(1 * time.Second)
  52. return
  53. }
  54. } else {
  55. // work conn, just return
  56. return
  57. }
  58. // other messages is from server to client
  59. s, ok := server.ProxyServers[clientCtlReq.ProxyName]
  60. if !ok {
  61. log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
  62. return
  63. }
  64. // read control msg from client
  65. go readControlMsgFromClient(s, c)
  66. serverCtlReq := &msg.ClientCtlReq{}
  67. serverCtlReq.Type = consts.WorkConn
  68. for {
  69. closeFlag := s.WaitUserConn()
  70. if closeFlag {
  71. log.Debug("ProxyName [%s], goroutine for dealing user conn is closed", s.Name)
  72. break
  73. }
  74. buf, _ := json.Marshal(serverCtlReq)
  75. err = c.Write(string(buf) + "\n")
  76. if err != nil {
  77. log.Warn("ProxyName [%s], write to client error, proxy exit", s.Name)
  78. s.Close()
  79. return
  80. }
  81. log.Debug("ProxyName [%s], write to client to add work conn success", s.Name)
  82. }
  83. log.Info("ProxyName [%s], I'm dead!", s.Name)
  84. return
  85. }
  86. func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, needRes bool) {
  87. succ = false
  88. needRes = true
  89. // check if proxy name exist
  90. s, ok := server.ProxyServers[req.ProxyName]
  91. if !ok {
  92. info = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName)
  93. log.Warn(info)
  94. return
  95. }
  96. // check password
  97. if req.Passwd != s.Passwd {
  98. info = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName)
  99. log.Warn(info)
  100. return
  101. }
  102. // control conn
  103. if req.Type == consts.CtlConn {
  104. if s.Status != consts.Idle {
  105. info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
  106. log.Warn(info)
  107. return
  108. }
  109. // start proxy and listen for user conn, no block
  110. err := s.Start()
  111. if err != nil {
  112. info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
  113. log.Warn(info)
  114. return
  115. }
  116. log.Info("ProxyName [%s], start proxy success", req.ProxyName)
  117. } else if req.Type == consts.WorkConn {
  118. // work conn
  119. needRes = false
  120. if s.Status != consts.Working {
  121. log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName)
  122. return
  123. }
  124. s.GetNewCliConn(c)
  125. } else {
  126. info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
  127. log.Warn(info)
  128. return
  129. }
  130. succ = true
  131. return
  132. }
  133. func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
  134. isContinueRead := true
  135. f := func() {
  136. isContinueRead = false
  137. s.Close()
  138. log.Error("ProxyName [%s], client heartbeat timeout", s.Name)
  139. }
  140. timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f)
  141. defer timer.Stop()
  142. for isContinueRead {
  143. _, err := c.ReadLine()
  144. if err != nil {
  145. if err == io.EOF {
  146. log.Warn("ProxyName [%s], client is dead!", s.Name)
  147. s.Close()
  148. break
  149. } else if c.IsClosed() {
  150. log.Warn("ProxyName [%s], client connection is closed", s.Name)
  151. break
  152. }
  153. log.Error("ProxyName [%s], read error: %v", s.Name, err)
  154. continue
  155. }
  156. log.Debug("ProxyName [%s], get heartbeat", s.Name)
  157. timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
  158. }
  159. }