control.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/fatedier/frp/models/consts"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/models/server"
  10. "github.com/fatedier/frp/utils/conn"
  11. "github.com/fatedier/frp/utils/log"
  12. )
  13. func ProcessControlConn(l *conn.Listener) {
  14. for {
  15. c := l.GetConn()
  16. log.Debug("Get one new conn, %v", c.GetRemoteAddr())
  17. go controlWorker(c)
  18. }
  19. }
  20. // connection from every client and server
  21. func controlWorker(c *conn.Conn) {
  22. // the first message is from client to server
  23. // if error, close connection
  24. res, err := c.ReadLine()
  25. if err != nil {
  26. log.Warn("Read error, %v", err)
  27. return
  28. }
  29. log.Debug("get: %s", res)
  30. clientCtlReq := &msg.ClientCtlReq{}
  31. clientCtlRes := &msg.ClientCtlRes{}
  32. if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
  33. log.Warn("Parse err: %v : %s", err, res)
  34. return
  35. }
  36. // check
  37. succ, info, needRes := checkProxy(clientCtlReq, c)
  38. if !succ {
  39. clientCtlRes.Code = 1
  40. clientCtlRes.Msg = info
  41. }
  42. if needRes {
  43. // control conn
  44. defer c.Close()
  45. buf, _ := json.Marshal(clientCtlRes)
  46. err = c.Write(string(buf) + "\n")
  47. if err != nil {
  48. log.Warn("Write error, %v", err)
  49. time.Sleep(1 * time.Second)
  50. return
  51. }
  52. } else {
  53. // work conn, just return
  54. return
  55. }
  56. // others is from server to client
  57. server, ok := ProxyServers[clientCtlReq.ProxyName]
  58. if !ok {
  59. log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
  60. return
  61. }
  62. // read control msg from client
  63. go readControlMsgFromClient(server, c)
  64. serverCtlReq := &msg.ClientCtlReq{}
  65. serverCtlReq.Type = consts.WorkConn
  66. for {
  67. _, isStop := server.WaitUserConn()
  68. if isStop {
  69. break
  70. }
  71. buf, _ := json.Marshal(serverCtlReq)
  72. err = c.Write(string(buf) + "\n")
  73. if err != nil {
  74. log.Warn("ProxyName [%s], write to client error, proxy exit", server.Name)
  75. server.Close()
  76. return
  77. }
  78. log.Debug("ProxyName [%s], write to client to add work conn success", server.Name)
  79. }
  80. log.Error("ProxyName [%s], I'm dead!", server.Name)
  81. return
  82. }
  83. func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, needRes bool) {
  84. succ = false
  85. needRes = true
  86. // check if proxy name exist
  87. server, ok := ProxyServers[req.ProxyName]
  88. if !ok {
  89. info = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName)
  90. log.Warn(info)
  91. return
  92. }
  93. // check password
  94. if req.Passwd != server.Passwd {
  95. info = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName)
  96. log.Warn(info)
  97. return
  98. }
  99. // control conn
  100. if req.Type == consts.CtlConn {
  101. if server.Status != consts.Idle {
  102. info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
  103. log.Warn(info)
  104. return
  105. }
  106. // start proxy and listen for user conn, no block
  107. err := server.Start()
  108. if err != nil {
  109. info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
  110. log.Warn(info)
  111. return
  112. }
  113. log.Info("ProxyName [%s], start proxy success", req.ProxyName)
  114. } else if req.Type == consts.WorkConn {
  115. // work conn
  116. needRes = false
  117. if server.Status != consts.Working {
  118. log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName)
  119. return
  120. }
  121. server.CliConnChan <- c
  122. } else {
  123. info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
  124. log.Warn(info)
  125. return
  126. }
  127. succ = true
  128. return
  129. }
  130. func readControlMsgFromClient(server *server.ProxyServer, c *conn.Conn) {
  131. isContinueRead := true
  132. f := func() {
  133. isContinueRead = false
  134. server.StopWaitUserConn()
  135. }
  136. timer := time.AfterFunc(time.Duration(HeartBeatTimeout)*time.Second, f)
  137. defer timer.Stop()
  138. for isContinueRead {
  139. content, err := c.ReadLine()
  140. //log.Debug("Receive msg from client! content:%s", content)
  141. if err != nil {
  142. if err == io.EOF {
  143. log.Warn("Server detect client[%s] is dead!", server.Name)
  144. server.StopWaitUserConn()
  145. break
  146. }
  147. log.Error("ProxyName [%s], read error:%s", server.Name, err.Error())
  148. continue
  149. }
  150. if content == "\n" {
  151. timer.Reset(time.Duration(HeartBeatTimeout) * time.Second)
  152. }
  153. }
  154. }