control.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/fatedier/frp/pkg/models"
  8. "github.com/fatedier/frp/pkg/utils/conn"
  9. "github.com/fatedier/frp/pkg/utils/log"
  10. )
  11. func ProcessControlConn(l *conn.Listener) {
  12. for {
  13. c := l.GetConn()
  14. log.Debug("Get one new conn, %v", c.GetRemoteAddr())
  15. go controlWorker(c)
  16. }
  17. }
  18. // connection from every client and server
  19. func controlWorker(c *conn.Conn) {
  20. // the first message is from client to server
  21. // if error, close connection
  22. res, err := c.ReadLine()
  23. if err != nil {
  24. log.Warn("Read error, %v", err)
  25. return
  26. }
  27. log.Debug("get: %s", res)
  28. clientCtlReq := &models.ClientCtlReq{}
  29. clientCtlRes := &models.ClientCtlRes{}
  30. if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil {
  31. log.Warn("Parse err: %v : %s", err, res)
  32. return
  33. }
  34. // check
  35. succ, msg, needRes := checkProxy(clientCtlReq, c)
  36. if !succ {
  37. clientCtlRes.Code = 1
  38. clientCtlRes.Msg = msg
  39. }
  40. if needRes {
  41. // control conn
  42. defer c.Close()
  43. buf, _ := json.Marshal(clientCtlRes)
  44. err = c.Write(string(buf) + "\n")
  45. if err != nil {
  46. log.Warn("Write error, %v", err)
  47. time.Sleep(1 * time.Second)
  48. return
  49. }
  50. } else {
  51. // work conn, just return
  52. return
  53. }
  54. // others is from server to client
  55. server, ok := ProxyServers[clientCtlReq.ProxyName]
  56. if !ok {
  57. log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
  58. return
  59. }
  60. // read control msg from client
  61. go readControlMsgFromClient(server, c)
  62. serverCtlReq := &models.ClientCtlReq{}
  63. serverCtlReq.Type = models.WorkConn
  64. for {
  65. _, isStop := server.WaitUserConn()
  66. if isStop {
  67. break
  68. }
  69. buf, _ := json.Marshal(serverCtlReq)
  70. err = c.Write(string(buf) + "\n")
  71. if err != nil {
  72. log.Warn("ProxyName [%s], write to client error, proxy exit", server.Name)
  73. server.Close()
  74. return
  75. }
  76. log.Debug("ProxyName [%s], write to client to add work conn success", server.Name)
  77. }
  78. log.Error("ProxyName [%s], I'm dead!", server.Name)
  79. return
  80. }
  81. func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, needRes bool) {
  82. succ = false
  83. needRes = true
  84. // check if proxy name exist
  85. server, ok := ProxyServers[req.ProxyName]
  86. if !ok {
  87. msg = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName)
  88. log.Warn(msg)
  89. return
  90. }
  91. // check password
  92. if req.Passwd != server.Passwd {
  93. msg = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName)
  94. log.Warn(msg)
  95. return
  96. }
  97. // control conn
  98. if req.Type == models.ControlConn {
  99. if server.Status != models.Idle {
  100. msg = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName)
  101. log.Warn(msg)
  102. return
  103. }
  104. // start proxy and listen for user conn, no block
  105. err := server.Start()
  106. if err != nil {
  107. msg = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error())
  108. log.Warn(msg)
  109. return
  110. }
  111. log.Info("ProxyName [%s], start proxy success", req.ProxyName)
  112. } else if req.Type == models.WorkConn {
  113. // work conn
  114. needRes = false
  115. if server.Status != models.Working {
  116. log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName)
  117. return
  118. }
  119. server.CliConnChan <- c
  120. } else {
  121. log.Warn("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
  122. return
  123. }
  124. succ = true
  125. return
  126. }
  127. func readControlMsgFromClient(server *models.ProxyServer, c *conn.Conn) {
  128. isContinueRead := true
  129. f := func() {
  130. isContinueRead = false
  131. server.StopWaitUserConn()
  132. }
  133. timer := time.AfterFunc(time.Duration(HeartBeatTimeout)*time.Second, f)
  134. defer timer.Stop()
  135. for isContinueRead {
  136. content, err := c.ReadLine()
  137. //log.Debug("Receive msg from client! content:%s", content)
  138. if err != nil {
  139. if err == io.EOF {
  140. log.Warn("Server detect client[%s] is dead!", server.Name)
  141. server.StopWaitUserConn()
  142. break
  143. }
  144. log.Error("ProxyName [%s], read error:%s", server.Name, err.Error())
  145. continue
  146. }
  147. if content == "\n" {
  148. timer.Reset(time.Duration(HeartBeatTimeout) * time.Second)
  149. }
  150. }
  151. }