nathole.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package server
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/utils/log"
  10. "github.com/fatedier/frp/utils/util"
  11. "github.com/fatedier/golib/errors"
  12. "github.com/fatedier/golib/pool"
  13. )
  14. // Timeout seconds.
  15. var NatHoleTimeout int64 = 10
  16. type NatHoleController struct {
  17. listener *net.UDPConn
  18. clientCfgs map[string]*NatHoleClientCfg
  19. sessions map[string]*NatHoleSession
  20. mu sync.RWMutex
  21. }
  22. func NewNatHoleController(udpBindAddr string) (nc *NatHoleController, err error) {
  23. addr, err := net.ResolveUDPAddr("udp", udpBindAddr)
  24. if err != nil {
  25. return nil, err
  26. }
  27. lconn, err := net.ListenUDP("udp", addr)
  28. if err != nil {
  29. return nil, err
  30. }
  31. nc = &NatHoleController{
  32. listener: lconn,
  33. clientCfgs: make(map[string]*NatHoleClientCfg),
  34. sessions: make(map[string]*NatHoleSession),
  35. }
  36. return nc, nil
  37. }
  38. func (nc *NatHoleController) ListenClient(name string, sk string) (sidCh chan string) {
  39. clientCfg := &NatHoleClientCfg{
  40. Name: name,
  41. Sk: sk,
  42. SidCh: make(chan string),
  43. }
  44. nc.mu.Lock()
  45. nc.clientCfgs[name] = clientCfg
  46. nc.mu.Unlock()
  47. return clientCfg.SidCh
  48. }
  49. func (nc *NatHoleController) CloseClient(name string) {
  50. nc.mu.Lock()
  51. defer nc.mu.Unlock()
  52. delete(nc.clientCfgs, name)
  53. }
  54. func (nc *NatHoleController) Run() {
  55. for {
  56. buf := pool.GetBuf(1024)
  57. n, raddr, err := nc.listener.ReadFromUDP(buf)
  58. if err != nil {
  59. log.Trace("nat hole listener read from udp error: %v", err)
  60. return
  61. }
  62. rd := bytes.NewReader(buf[:n])
  63. rawMsg, err := msg.ReadMsg(rd)
  64. if err != nil {
  65. log.Trace("read nat hole message error: %v", err)
  66. continue
  67. }
  68. switch m := rawMsg.(type) {
  69. case *msg.NatHoleVisitor:
  70. go nc.HandleVisitor(m, raddr)
  71. case *msg.NatHoleClient:
  72. go nc.HandleClient(m, raddr)
  73. default:
  74. log.Trace("error nat hole message type")
  75. continue
  76. }
  77. pool.PutBuf(buf)
  78. }
  79. }
  80. func (nc *NatHoleController) GenSid() string {
  81. t := time.Now().Unix()
  82. id, _ := util.RandId()
  83. return fmt.Sprintf("%d%s", t, id)
  84. }
  85. func (nc *NatHoleController) HandleVisitor(m *msg.NatHoleVisitor, raddr *net.UDPAddr) {
  86. sid := nc.GenSid()
  87. session := &NatHoleSession{
  88. Sid: sid,
  89. VisitorAddr: raddr,
  90. NotifyCh: make(chan struct{}, 0),
  91. }
  92. nc.mu.Lock()
  93. clientCfg, ok := nc.clientCfgs[m.ProxyName]
  94. if !ok {
  95. nc.mu.Unlock()
  96. log.Debug("xtcp server for [%s] doesn't exist", m.ProxyName)
  97. return
  98. }
  99. if m.SignKey != util.GetAuthKey(clientCfg.Sk, m.Timestamp) {
  100. nc.mu.Unlock()
  101. log.Debug("xtcp connection of [%s] auth failed", m.ProxyName)
  102. return
  103. }
  104. nc.sessions[sid] = session
  105. nc.mu.Unlock()
  106. log.Trace("handle visitor message, sid [%s]", sid)
  107. defer func() {
  108. nc.mu.Lock()
  109. delete(nc.sessions, sid)
  110. nc.mu.Unlock()
  111. }()
  112. err := errors.PanicToError(func() {
  113. clientCfg.SidCh <- sid
  114. })
  115. if err != nil {
  116. return
  117. }
  118. // Wait client connections.
  119. select {
  120. case <-session.NotifyCh:
  121. resp := nc.GenNatHoleResponse(raddr, session)
  122. log.Trace("send nat hole response to visitor")
  123. nc.listener.WriteToUDP(resp, raddr)
  124. case <-time.After(time.Duration(NatHoleTimeout) * time.Second):
  125. return
  126. }
  127. }
  128. func (nc *NatHoleController) HandleClient(m *msg.NatHoleClient, raddr *net.UDPAddr) {
  129. nc.mu.RLock()
  130. session, ok := nc.sessions[m.Sid]
  131. nc.mu.RUnlock()
  132. if !ok {
  133. return
  134. }
  135. log.Trace("handle client message, sid [%s]", session.Sid)
  136. session.ClientAddr = raddr
  137. session.NotifyCh <- struct{}{}
  138. resp := nc.GenNatHoleResponse(raddr, session)
  139. log.Trace("send nat hole response to client")
  140. nc.listener.WriteToUDP(resp, raddr)
  141. }
  142. func (nc *NatHoleController) GenNatHoleResponse(raddr *net.UDPAddr, session *NatHoleSession) []byte {
  143. m := &msg.NatHoleResp{
  144. Sid: session.Sid,
  145. VisitorAddr: session.VisitorAddr.String(),
  146. ClientAddr: session.ClientAddr.String(),
  147. }
  148. b := bytes.NewBuffer(nil)
  149. err := msg.WriteMsg(b, m)
  150. if err != nil {
  151. return []byte("")
  152. }
  153. return b.Bytes()
  154. }
  155. type NatHoleSession struct {
  156. Sid string
  157. VisitorAddr *net.UDPAddr
  158. ClientAddr *net.UDPAddr
  159. NotifyCh chan struct{}
  160. }
  161. type NatHoleClientCfg struct {
  162. Name string
  163. Sk string
  164. SidCh chan string
  165. }