control.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/fatedier/frp/models/consts"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/models/server"
  10. "github.com/fatedier/frp/utils/conn"
  11. "github.com/fatedier/frp/utils/log"
  12. )
  13. func ProcessControlConn(l *conn.Listener) {
  14. for {
  15. c := l.GetConn()
  16. log.Debug("Get one new conn, %v", c.GetRemoteAddr())
  17. go controlWorker(c)
  18. }
  19. }
  20. // connection from every client and server
  21. func controlWorker(c *conn.Conn) {
  22. // the first message is from client to server
  23. // if error, close connection
  24. res, err := c.ReadLine()
  25. if err != nil {
  26. log.Warn("Read error, %v", err)
  27. return
  28. }
  29. log.Debug("get: %s", res)
  30. clientCtlReq := &msg.ClientCtlReq{}
  31. clientCtlRes := &msg.ClientCtlRes{}
  32. if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
  33. log.Warn("Parse err: %v : %s", err, res)
  34. return
  35. }
  36. // check
  37. succ, info, needRes := checkProxy(clientCtlReq, c)
  38. if !succ {
  39. clientCtlRes.Code = 1
  40. clientCtlRes.Msg = info
  41. }
  42. if needRes {
  43. // control conn
  44. defer c.Close()
  45. buf, _ := json.Marshal(clientCtlRes)
  46. err = c.Write(string(buf) + "\n")
  47. if err != nil {
  48. log.Warn("Write error, %v", err)
  49. time.Sleep(1 * time.Second)
  50. return
  51. }
  52. } else {
  53. // work conn, just return
  54. return
  55. }
  56. // others is from server to client
  57. s, ok := server.ProxyServers[clientCtlReq.ProxyName]
  58. if !ok {
  59. log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
  60. return
  61. }
  62. // read control msg from client
  63. go readControlMsgFromClient(s, c)
  64. serverCtlReq := &msg.ClientCtlReq{}
  65. serverCtlReq.Type = consts.WorkConn
  66. for {
  67. closeFlag := s.WaitUserConn()
  68. if closeFlag {
  69. log.Debug("ProxyName [%s], goroutine for dealing user conn is closed", s.Name)
  70. break
  71. }
  72. buf, _ := json.Marshal(serverCtlReq)
  73. err = c.Write(string(buf) + "\n")
  74. if err != nil {
  75. log.Warn("ProxyName [%s], write to client error, proxy exit", s.Name)
  76. s.Close()
  77. return
  78. }
  79. log.Debug("ProxyName [%s], write to client to add work conn success", s.Name)
  80. }
  81. log.Info("ProxyName [%s], I'm dead!", s.Name)
  82. return
  83. }
  84. func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, needRes bool) {
  85. succ = false
  86. needRes = true
  87. // check if proxy name exist
  88. s, ok := server.ProxyServers[req.ProxyName]
  89. if !ok {
  90. info = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName)
  91. log.Warn(info)
  92. return
  93. }
  94. // check password
  95. if req.Passwd != s.Passwd {
  96. info = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName)
  97. log.Warn(info)
  98. return
  99. }
  100. // control conn
  101. if req.Type == consts.CtlConn {
  102. if s.Status != consts.Idle {
  103. info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
  104. log.Warn(info)
  105. return
  106. }
  107. // start proxy and listen for user conn, no block
  108. err := s.Start()
  109. if err != nil {
  110. info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
  111. log.Warn(info)
  112. return
  113. }
  114. log.Info("ProxyName [%s], start proxy success", req.ProxyName)
  115. } else if req.Type == consts.WorkConn {
  116. // work conn
  117. needRes = false
  118. if s.Status != consts.Working {
  119. log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName)
  120. return
  121. }
  122. s.CliConnChan <- c
  123. } else {
  124. info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
  125. log.Warn(info)
  126. return
  127. }
  128. succ = true
  129. return
  130. }
  131. func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
  132. isContinueRead := true
  133. f := func() {
  134. isContinueRead = false
  135. c.Close()
  136. s.Close()
  137. }
  138. timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f)
  139. defer timer.Stop()
  140. for isContinueRead {
  141. _, err := c.ReadLine()
  142. if err != nil {
  143. if err == io.EOF {
  144. log.Warn("ProxyName [%s], client is dead!", s.Name)
  145. c.Close()
  146. s.Close()
  147. break
  148. }
  149. log.Error("ProxyName [%s], read error: %v", s.Name, err)
  150. continue
  151. }
  152. timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
  153. }
  154. }