nathole.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/utils/log"
  10. "github.com/fatedier/frp/utils/util"
  11. "github.com/fatedier/golib/errors"
  12. "github.com/fatedier/golib/pool"
  13. )
  // NatHoleTimeout is a timeout expressed in seconds. HandleVisitor converts it
  // to a time.Duration and uses it to bound how long a visitor waits for the
  // client side of its session to show up.
  var NatHoleTimeout = int64(10)
  16. type NatHoleController struct {
  17. listener *net.UDPConn
  18. clientCfgs map[string]*NatHoleClientCfg
  19. sessions map[string]*NatHoleSession
  20. mu sync.RWMutex
  21. }
  22. func NewNatHoleController(udpBindAddr string) (nc *NatHoleController, err error) {
  23. addr, err := net.ResolveUDPAddr("udp", udpBindAddr)
  24. if err != nil {
  25. return nil, err
  26. }
  27. lconn, err := net.ListenUDP("udp", addr)
  28. if err != nil {
  29. return nil, err
  30. }
  31. nc = &NatHoleController{
  32. listener: lconn,
  33. clientCfgs: make(map[string]*NatHoleClientCfg),
  34. sessions: make(map[string]*NatHoleSession),
  35. }
  36. return nc, nil
  37. }
  38. func (nc *NatHoleController) ListenClient(name string, sk string) (sidCh chan string) {
  39. clientCfg := &NatHoleClientCfg{
  40. Name: name,
  41. Sk: sk,
  42. SidCh: make(chan string),
  43. }
  44. nc.mu.Lock()
  45. nc.clientCfgs[name] = clientCfg
  46. nc.mu.Unlock()
  47. return clientCfg.SidCh
  48. }
  49. func (nc *NatHoleController) CloseClient(name string) {
  50. nc.mu.Lock()
  51. defer nc.mu.Unlock()
  52. delete(nc.clientCfgs, name)
  53. }
  54. func (nc *NatHoleController) Run() {
  55. for {
  56. buf := pool.GetBuf(1024)
  57. n, raddr, err := nc.listener.ReadFromUDP(buf)
  58. if err != nil {
  59. log.Trace("nat hole listener read from udp error: %v", err)
  60. return
  61. }
  62. rd := bytes.NewReader(buf[:n])
  63. rawMsg, err := msg.ReadMsg(rd)
  64. if err != nil {
  65. log.Trace("read nat hole message error: %v", err)
  66. continue
  67. }
  68. switch m := rawMsg.(type) {
  69. case *msg.NatHoleVisitor:
  70. go nc.HandleVisitor(m, raddr)
  71. case *msg.NatHoleClient:
  72. go nc.HandleClient(m, raddr)
  73. default:
  74. log.Trace("error nat hole message type")
  75. continue
  76. }
  77. pool.PutBuf(buf)
  78. }
  79. }
  80. func (nc *NatHoleController) GenSid() string {
  81. t := time.Now().Unix()
  82. id, _ := util.RandId()
  83. return fmt.Sprintf("%d%s", t, id)
  84. }
  85. func (nc *NatHoleController) HandleVisitor(m *msg.NatHoleVisitor, raddr *net.UDPAddr) {
  86. sid := nc.GenSid()
  87. session := &NatHoleSession{
  88. Sid: sid,
  89. VisitorAddr: raddr,
  90. NotifyCh: make(chan struct{}, 0),
  91. }
  92. nc.mu.Lock()
  93. clientCfg, ok := nc.clientCfgs[m.ProxyName]
  94. if !ok {
  95. nc.mu.Unlock()
  96. errInfo := fmt.Sprintf("xtcp server for [%s] doesn't exist", m.ProxyName)
  97. log.Debug(errInfo)
  98. nc.listener.WriteToUDP(nc.GenNatHoleResponse(nil, errInfo), raddr)
  99. return
  100. }
  101. if m.SignKey != util.GetAuthKey(clientCfg.Sk, m.Timestamp) {
  102. nc.mu.Unlock()
  103. errInfo := fmt.Sprintf("xtcp connection of [%s] auth failed", m.ProxyName)
  104. log.Debug(errInfo)
  105. nc.listener.WriteToUDP(nc.GenNatHoleResponse(nil, errInfo), raddr)
  106. return
  107. }
  108. nc.sessions[sid] = session
  109. nc.mu.Unlock()
  110. log.Trace("handle visitor message, sid [%s]", sid)
  111. defer func() {
  112. nc.mu.Lock()
  113. delete(nc.sessions, sid)
  114. nc.mu.Unlock()
  115. }()
  116. err := errors.PanicToError(func() {
  117. clientCfg.SidCh <- sid
  118. })
  119. if err != nil {
  120. return
  121. }
  122. // Wait client connections.
  123. select {
  124. case <-session.NotifyCh:
  125. resp := nc.GenNatHoleResponse(session, "")
  126. log.Trace("send nat hole response to visitor")
  127. nc.listener.WriteToUDP(resp, raddr)
  128. case <-time.After(time.Duration(NatHoleTimeout) * time.Second):
  129. return
  130. }
  131. }
  132. func (nc *NatHoleController) HandleClient(m *msg.NatHoleClient, raddr *net.UDPAddr) {
  133. nc.mu.RLock()
  134. session, ok := nc.sessions[m.Sid]
  135. nc.mu.RUnlock()
  136. if !ok {
  137. return
  138. }
  139. log.Trace("handle client message, sid [%s]", session.Sid)
  140. session.ClientAddr = raddr
  141. session.NotifyCh <- struct{}{}
  142. resp := nc.GenNatHoleResponse(session, "")
  143. log.Trace("send nat hole response to client")
  144. nc.listener.WriteToUDP(resp, raddr)
  145. }
  146. func (nc *NatHoleController) GenNatHoleResponse(session *NatHoleSession, errInfo string) []byte {
  147. var (
  148. sid string
  149. visitorAddr string
  150. clientAddr string
  151. )
  152. if session != nil {
  153. sid = session.Sid
  154. visitorAddr = session.VisitorAddr.String()
  155. clientAddr = session.ClientAddr.String()
  156. }
  157. m := &msg.NatHoleResp{
  158. Sid: sid,
  159. VisitorAddr: visitorAddr,
  160. ClientAddr: clientAddr,
  161. Error: errInfo,
  162. }
  163. b := bytes.NewBuffer(nil)
  164. err := msg.WriteMsg(b, m)
  165. if err != nil {
  166. return []byte("")
  167. }
  168. return b.Bytes()
  169. }
  // NatHoleSession is the state of one visitor-to-client hole-punching attempt.
  type NatHoleSession struct {
  	// Sid is the session id, generated by GenSid.
  	Sid string

  	// VisitorAddr is the UDP address the visitor's request came from.
  	VisitorAddr *net.UDPAddr

  	// ClientAddr is the UDP address the client's announcement came from. It is
  	// nil until HandleClient fills it in.
  	ClientAddr *net.UDPAddr

  	// NotifyCh is signalled by HandleClient once ClientAddr is set, and
  	// received from by HandleVisitor.
  	NotifyCh chan struct{}
  }
  // NatHoleClientCfg is the registration record of one client, created by
  // ListenClient.
  type NatHoleClientCfg struct {
  	// Name is the name the client is registered under; visitors address it
  	// through the ProxyName of their request.
  	Name string

  	// Sk is the secret key that HandleVisitor passes to util.GetAuthKey to
  	// check a visitor's SignKey.
  	Sk string

  	// SidCh is the channel on which HandleVisitor delivers the ids of new
  	// sessions for this client.
  	SidCh chan string
  }