1
0

udp.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. // Copyright 2019 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strconv"
  21. "time"
  22. "github.com/fatedier/golib/errors"
  23. frpIo "github.com/fatedier/golib/io"
  24. "golang.org/x/time/rate"
  25. "github.com/fatedier/frp/pkg/config"
  26. "github.com/fatedier/frp/pkg/msg"
  27. "github.com/fatedier/frp/pkg/proto/udp"
  28. "github.com/fatedier/frp/pkg/util/limit"
  29. frpNet "github.com/fatedier/frp/pkg/util/net"
  30. "github.com/fatedier/frp/server/metrics"
  31. )
  32. type UDPProxy struct {
  33. *BaseProxy
  34. cfg *config.UDPProxyConf
  35. realPort int
  36. // udpConn is the listener of udp packages
  37. udpConn *net.UDPConn
  38. // there are always only one workConn at the same time
  39. // get another one if it closed
  40. workConn net.Conn
  41. // sendCh is used for sending packages to workConn
  42. sendCh chan *msg.UDPPacket
  43. // readCh is used for reading packages from workConn
  44. readCh chan *msg.UDPPacket
  45. // checkCloseCh is used for watching if workConn is closed
  46. checkCloseCh chan int
  47. isClosed bool
  48. }
  49. func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
  50. xl := pxy.xl
  51. pxy.realPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
  52. if err != nil {
  53. return "", fmt.Errorf("acquire port %d error: %v", pxy.cfg.RemotePort, err)
  54. }
  55. defer func() {
  56. if err != nil {
  57. pxy.rc.UDPPortManager.Release(pxy.realPort)
  58. }
  59. }()
  60. remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
  61. pxy.cfg.RemotePort = pxy.realPort
  62. addr, errRet := net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realPort)))
  63. if errRet != nil {
  64. err = errRet
  65. return
  66. }
  67. udpConn, errRet := net.ListenUDP("udp", addr)
  68. if errRet != nil {
  69. err = errRet
  70. xl.Warn("listen udp port error: %v", err)
  71. return
  72. }
  73. xl.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
  74. pxy.udpConn = udpConn
  75. pxy.sendCh = make(chan *msg.UDPPacket, 1024)
  76. pxy.readCh = make(chan *msg.UDPPacket, 1024)
  77. pxy.checkCloseCh = make(chan int)
  78. // read message from workConn, if it returns any error, notify proxy to start a new workConn
  79. workConnReaderFn := func(conn net.Conn) {
  80. for {
  81. var (
  82. rawMsg msg.Message
  83. errRet error
  84. )
  85. xl.Trace("loop waiting message from udp workConn")
  86. // client will send heartbeat in workConn for keeping alive
  87. _ = conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
  88. if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
  89. xl.Warn("read from workConn for udp error: %v", errRet)
  90. _ = conn.Close()
  91. // notify proxy to start a new work connection
  92. // ignore error here, it means the proxy is closed
  93. _ = errors.PanicToError(func() {
  94. pxy.checkCloseCh <- 1
  95. })
  96. return
  97. }
  98. if err := conn.SetReadDeadline(time.Time{}); err != nil {
  99. xl.Warn("set read deadline error: %v", err)
  100. }
  101. switch m := rawMsg.(type) {
  102. case *msg.Ping:
  103. xl.Trace("udp work conn get ping message")
  104. continue
  105. case *msg.UDPPacket:
  106. if errRet := errors.PanicToError(func() {
  107. xl.Trace("get udp message from workConn: %s", m.Content)
  108. pxy.readCh <- m
  109. metrics.Server.AddTrafficOut(
  110. pxy.GetName(),
  111. pxy.GetConf().GetBaseInfo().ProxyType,
  112. int64(len(m.Content)),
  113. )
  114. }); errRet != nil {
  115. conn.Close()
  116. xl.Info("reader goroutine for udp work connection closed")
  117. return
  118. }
  119. }
  120. }
  121. }
  122. // send message to workConn
  123. workConnSenderFn := func(conn net.Conn, ctx context.Context) {
  124. var errRet error
  125. for {
  126. select {
  127. case udpMsg, ok := <-pxy.sendCh:
  128. if !ok {
  129. xl.Info("sender goroutine for udp work connection closed")
  130. return
  131. }
  132. if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
  133. xl.Info("sender goroutine for udp work connection closed: %v", errRet)
  134. conn.Close()
  135. return
  136. }
  137. xl.Trace("send message to udp workConn: %s", udpMsg.Content)
  138. metrics.Server.AddTrafficIn(
  139. pxy.GetName(),
  140. pxy.GetConf().GetBaseInfo().ProxyType,
  141. int64(len(udpMsg.Content)),
  142. )
  143. continue
  144. case <-ctx.Done():
  145. xl.Info("sender goroutine for udp work connection closed")
  146. return
  147. }
  148. }
  149. }
  150. go func() {
  151. // Sleep a while for waiting control send the NewProxyResp to client.
  152. time.Sleep(500 * time.Millisecond)
  153. for {
  154. workConn, err := pxy.GetWorkConnFromPool(nil, nil)
  155. if err != nil {
  156. time.Sleep(1 * time.Second)
  157. // check if proxy is closed
  158. select {
  159. case _, ok := <-pxy.checkCloseCh:
  160. if !ok {
  161. return
  162. }
  163. default:
  164. }
  165. continue
  166. }
  167. // close the old workConn and replace it with a new one
  168. if pxy.workConn != nil {
  169. pxy.workConn.Close()
  170. }
  171. var rwc io.ReadWriteCloser = workConn
  172. if pxy.cfg.UseEncryption {
  173. rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
  174. if err != nil {
  175. xl.Error("create encryption stream error: %v", err)
  176. workConn.Close()
  177. continue
  178. }
  179. }
  180. if pxy.cfg.UseCompression {
  181. rwc = frpIo.WithCompression(rwc)
  182. }
  183. if pxy.GetLimiter() != nil {
  184. rwc = frpIo.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
  185. return rwc.Close()
  186. })
  187. }
  188. pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn)
  189. ctx, cancel := context.WithCancel(context.Background())
  190. go workConnReaderFn(pxy.workConn)
  191. go workConnSenderFn(pxy.workConn, ctx)
  192. _, ok := <-pxy.checkCloseCh
  193. cancel()
  194. if !ok {
  195. return
  196. }
  197. }
  198. }()
  199. // Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
  200. // Client will transfor udp message to local udp service and waiting for response for a while.
  201. // Response will be wrapped to be forwarded by work connection to server.
  202. // Close readCh and sendCh at the end.
  203. go func() {
  204. udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh, int(pxy.serverCfg.UDPPacketSize))
  205. pxy.Close()
  206. }()
  207. return remoteAddr, nil
  208. }
  209. func (pxy *UDPProxy) GetConf() config.ProxyConf {
  210. return pxy.cfg
  211. }
  212. func (pxy *UDPProxy) GetLimiter() *rate.Limiter {
  213. return pxy.limiter
  214. }
  215. func (pxy *UDPProxy) Close() {
  216. pxy.mu.Lock()
  217. defer pxy.mu.Unlock()
  218. if !pxy.isClosed {
  219. pxy.isClosed = true
  220. pxy.BaseProxy.Close()
  221. if pxy.workConn != nil {
  222. pxy.workConn.Close()
  223. }
  224. pxy.udpConn.Close()
  225. // all channels only closed here
  226. close(pxy.checkCloseCh)
  227. close(pxy.readCh)
  228. close(pxy.sendCh)
  229. }
  230. pxy.rc.UDPPortManager.Release(pxy.realPort)
  231. }