proxy.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "net"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. "github.com/fatedier/frp/pkg/config"
  26. "github.com/fatedier/frp/pkg/msg"
  27. plugin "github.com/fatedier/frp/pkg/plugin/client"
  28. "github.com/fatedier/frp/pkg/proto/udp"
  29. "github.com/fatedier/frp/pkg/util/limit"
  30. frpNet "github.com/fatedier/frp/pkg/util/net"
  31. "github.com/fatedier/frp/pkg/util/xlog"
  32. "github.com/fatedier/golib/errors"
  33. frpIo "github.com/fatedier/golib/io"
  34. libdial "github.com/fatedier/golib/net/dial"
  35. "github.com/fatedier/golib/pool"
  36. fmux "github.com/hashicorp/yamux"
  37. pp "github.com/pires/go-proxyproto"
  38. "golang.org/x/time/rate"
  39. )
  40. // Proxy defines how to handle work connections for different proxy type.
  41. type Proxy interface {
  42. Run() error
  43. // InWorkConn accept work connections registered to server.
  44. InWorkConn(net.Conn, *msg.StartWorkConn)
  45. Close()
  46. }
  47. func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
  48. var limiter *rate.Limiter
  49. limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
  50. if limitBytes > 0 {
  51. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  52. }
  53. baseProxy := BaseProxy{
  54. clientCfg: clientCfg,
  55. serverUDPPort: serverUDPPort,
  56. limiter: limiter,
  57. xl: xlog.FromContextSafe(ctx),
  58. ctx: ctx,
  59. }
  60. switch cfg := pxyConf.(type) {
  61. case *config.TCPProxyConf:
  62. pxy = &TCPProxy{
  63. BaseProxy: &baseProxy,
  64. cfg: cfg,
  65. }
  66. case *config.TCPMuxProxyConf:
  67. pxy = &TCPMuxProxy{
  68. BaseProxy: &baseProxy,
  69. cfg: cfg,
  70. }
  71. case *config.UDPProxyConf:
  72. pxy = &UDPProxy{
  73. BaseProxy: &baseProxy,
  74. cfg: cfg,
  75. }
  76. case *config.HTTPProxyConf:
  77. pxy = &HTTPProxy{
  78. BaseProxy: &baseProxy,
  79. cfg: cfg,
  80. }
  81. case *config.HTTPSProxyConf:
  82. pxy = &HTTPSProxy{
  83. BaseProxy: &baseProxy,
  84. cfg: cfg,
  85. }
  86. case *config.STCPProxyConf:
  87. pxy = &STCPProxy{
  88. BaseProxy: &baseProxy,
  89. cfg: cfg,
  90. }
  91. case *config.XTCPProxyConf:
  92. pxy = &XTCPProxy{
  93. BaseProxy: &baseProxy,
  94. cfg: cfg,
  95. }
  96. case *config.SUDPProxyConf:
  97. pxy = &SUDPProxy{
  98. BaseProxy: &baseProxy,
  99. cfg: cfg,
  100. closeCh: make(chan struct{}),
  101. }
  102. }
  103. return
  104. }
  105. type BaseProxy struct {
  106. closed bool
  107. clientCfg config.ClientCommonConf
  108. serverUDPPort int
  109. limiter *rate.Limiter
  110. mu sync.RWMutex
  111. xl *xlog.Logger
  112. ctx context.Context
  113. }
  114. // TCP
  115. type TCPProxy struct {
  116. *BaseProxy
  117. cfg *config.TCPProxyConf
  118. proxyPlugin plugin.Plugin
  119. }
  120. func (pxy *TCPProxy) Run() (err error) {
  121. if pxy.cfg.Plugin != "" {
  122. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  123. if err != nil {
  124. return
  125. }
  126. }
  127. return
  128. }
  129. func (pxy *TCPProxy) Close() {
  130. if pxy.proxyPlugin != nil {
  131. pxy.proxyPlugin.Close()
  132. }
  133. }
  134. func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  135. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  136. conn, []byte(pxy.clientCfg.Token), m)
  137. }
  138. // TCP Multiplexer
  139. type TCPMuxProxy struct {
  140. *BaseProxy
  141. cfg *config.TCPMuxProxyConf
  142. proxyPlugin plugin.Plugin
  143. }
  144. func (pxy *TCPMuxProxy) Run() (err error) {
  145. if pxy.cfg.Plugin != "" {
  146. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  147. if err != nil {
  148. return
  149. }
  150. }
  151. return
  152. }
  153. func (pxy *TCPMuxProxy) Close() {
  154. if pxy.proxyPlugin != nil {
  155. pxy.proxyPlugin.Close()
  156. }
  157. }
  158. func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  159. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  160. conn, []byte(pxy.clientCfg.Token), m)
  161. }
  162. // HTTP
  163. type HTTPProxy struct {
  164. *BaseProxy
  165. cfg *config.HTTPProxyConf
  166. proxyPlugin plugin.Plugin
  167. }
  168. func (pxy *HTTPProxy) Run() (err error) {
  169. if pxy.cfg.Plugin != "" {
  170. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  171. if err != nil {
  172. return
  173. }
  174. }
  175. return
  176. }
  177. func (pxy *HTTPProxy) Close() {
  178. if pxy.proxyPlugin != nil {
  179. pxy.proxyPlugin.Close()
  180. }
  181. }
  182. func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  183. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  184. conn, []byte(pxy.clientCfg.Token), m)
  185. }
  186. // HTTPS
  187. type HTTPSProxy struct {
  188. *BaseProxy
  189. cfg *config.HTTPSProxyConf
  190. proxyPlugin plugin.Plugin
  191. }
  192. func (pxy *HTTPSProxy) Run() (err error) {
  193. if pxy.cfg.Plugin != "" {
  194. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  195. if err != nil {
  196. return
  197. }
  198. }
  199. return
  200. }
  201. func (pxy *HTTPSProxy) Close() {
  202. if pxy.proxyPlugin != nil {
  203. pxy.proxyPlugin.Close()
  204. }
  205. }
  206. func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  207. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  208. conn, []byte(pxy.clientCfg.Token), m)
  209. }
  210. // STCP
  211. type STCPProxy struct {
  212. *BaseProxy
  213. cfg *config.STCPProxyConf
  214. proxyPlugin plugin.Plugin
  215. }
  216. func (pxy *STCPProxy) Run() (err error) {
  217. if pxy.cfg.Plugin != "" {
  218. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  219. if err != nil {
  220. return
  221. }
  222. }
  223. return
  224. }
  225. func (pxy *STCPProxy) Close() {
  226. if pxy.proxyPlugin != nil {
  227. pxy.proxyPlugin.Close()
  228. }
  229. }
  230. func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  231. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  232. conn, []byte(pxy.clientCfg.Token), m)
  233. }
  234. // XTCP
  235. type XTCPProxy struct {
  236. *BaseProxy
  237. cfg *config.XTCPProxyConf
  238. proxyPlugin plugin.Plugin
  239. }
  240. func (pxy *XTCPProxy) Run() (err error) {
  241. if pxy.cfg.Plugin != "" {
  242. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  243. if err != nil {
  244. return
  245. }
  246. }
  247. return
  248. }
  249. func (pxy *XTCPProxy) Close() {
  250. if pxy.proxyPlugin != nil {
  251. pxy.proxyPlugin.Close()
  252. }
  253. }
  254. func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  255. xl := pxy.xl
  256. defer conn.Close()
  257. var natHoleSidMsg msg.NatHoleSid
  258. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  259. if err != nil {
  260. xl.Error("xtcp read from workConn error: %v", err)
  261. return
  262. }
  263. natHoleClientMsg := &msg.NatHoleClient{
  264. ProxyName: pxy.cfg.ProxyName,
  265. Sid: natHoleSidMsg.Sid,
  266. }
  267. raddr, _ := net.ResolveUDPAddr("udp",
  268. fmt.Sprintf("%s:%d", pxy.clientCfg.ServerAddr, pxy.serverUDPPort))
  269. clientConn, err := net.DialUDP("udp", nil, raddr)
  270. if err != nil {
  271. xl.Error("dial server udp addr error: %v", err)
  272. return
  273. }
  274. defer clientConn.Close()
  275. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  276. if err != nil {
  277. xl.Error("send natHoleClientMsg to server error: %v", err)
  278. return
  279. }
  280. // Wait for client address at most 5 seconds.
  281. var natHoleRespMsg msg.NatHoleResp
  282. clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  283. buf := pool.GetBuf(1024)
  284. n, err := clientConn.Read(buf)
  285. if err != nil {
  286. xl.Error("get natHoleRespMsg error: %v", err)
  287. return
  288. }
  289. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  290. if err != nil {
  291. xl.Error("get natHoleRespMsg error: %v", err)
  292. return
  293. }
  294. clientConn.SetReadDeadline(time.Time{})
  295. clientConn.Close()
  296. if natHoleRespMsg.Error != "" {
  297. xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  298. return
  299. }
  300. xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
  301. // Send detect message
  302. array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
  303. if len(array) <= 1 {
  304. xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  305. }
  306. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  307. /*
  308. for i := 1000; i < 65000; i++ {
  309. pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
  310. }
  311. */
  312. port, err := strconv.ParseInt(array[1], 10, 64)
  313. if err != nil {
  314. xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  315. return
  316. }
  317. pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
  318. xl.Trace("send all detect msg done")
  319. msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
  320. // Listen for clientConn's address and wait for visitor connection
  321. lConn, err := net.ListenUDP("udp", laddr)
  322. if err != nil {
  323. xl.Error("listen on visitorConn's local adress error: %v", err)
  324. return
  325. }
  326. defer lConn.Close()
  327. lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
  328. sidBuf := pool.GetBuf(1024)
  329. var uAddr *net.UDPAddr
  330. n, uAddr, err = lConn.ReadFromUDP(sidBuf)
  331. if err != nil {
  332. xl.Warn("get sid from visitor error: %v", err)
  333. return
  334. }
  335. lConn.SetReadDeadline(time.Time{})
  336. if string(sidBuf[:n]) != natHoleRespMsg.Sid {
  337. xl.Warn("incorrect sid from visitor")
  338. return
  339. }
  340. pool.PutBuf(sidBuf)
  341. xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
  342. lConn.WriteToUDP(sidBuf[:n], uAddr)
  343. kcpConn, err := frpNet.NewKCPConnFromUDP(lConn, false, uAddr.String())
  344. if err != nil {
  345. xl.Error("create kcp connection from udp connection error: %v", err)
  346. return
  347. }
  348. fmuxCfg := fmux.DefaultConfig()
  349. fmuxCfg.KeepAliveInterval = 5 * time.Second
  350. fmuxCfg.LogOutput = io.Discard
  351. sess, err := fmux.Server(kcpConn, fmuxCfg)
  352. if err != nil {
  353. xl.Error("create yamux server from kcp connection error: %v", err)
  354. return
  355. }
  356. defer sess.Close()
  357. muxConn, err := sess.Accept()
  358. if err != nil {
  359. xl.Error("accept for yamux connection error: %v", err)
  360. return
  361. }
  362. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  363. muxConn, []byte(pxy.cfg.Sk), m)
  364. }
  365. func (pxy *XTCPProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
  366. daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
  367. if err != nil {
  368. return err
  369. }
  370. tConn, err := net.DialUDP("udp", laddr, daddr)
  371. if err != nil {
  372. return err
  373. }
  374. //uConn := ipv4.NewConn(tConn)
  375. //uConn.SetTTL(3)
  376. tConn.Write(content)
  377. tConn.Close()
  378. return nil
  379. }
  380. // UDP
  381. type UDPProxy struct {
  382. *BaseProxy
  383. cfg *config.UDPProxyConf
  384. localAddr *net.UDPAddr
  385. readCh chan *msg.UDPPacket
  386. // include msg.UDPPacket and msg.Ping
  387. sendCh chan msg.Message
  388. workConn net.Conn
  389. }
  390. func (pxy *UDPProxy) Run() (err error) {
  391. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIP, pxy.cfg.LocalPort))
  392. if err != nil {
  393. return
  394. }
  395. return
  396. }
  397. func (pxy *UDPProxy) Close() {
  398. pxy.mu.Lock()
  399. defer pxy.mu.Unlock()
  400. if !pxy.closed {
  401. pxy.closed = true
  402. if pxy.workConn != nil {
  403. pxy.workConn.Close()
  404. }
  405. if pxy.readCh != nil {
  406. close(pxy.readCh)
  407. }
  408. if pxy.sendCh != nil {
  409. close(pxy.sendCh)
  410. }
  411. }
  412. }
  413. func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  414. xl := pxy.xl
  415. xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  416. // close resources releated with old workConn
  417. pxy.Close()
  418. var rwc io.ReadWriteCloser = conn
  419. var err error
  420. if pxy.limiter != nil {
  421. rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
  422. return conn.Close()
  423. })
  424. }
  425. if pxy.cfg.UseEncryption {
  426. rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
  427. if err != nil {
  428. conn.Close()
  429. xl.Error("create encryption stream error: %v", err)
  430. return
  431. }
  432. }
  433. if pxy.cfg.UseCompression {
  434. rwc = frpIo.WithCompression(rwc)
  435. }
  436. conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
  437. pxy.mu.Lock()
  438. pxy.workConn = conn
  439. pxy.readCh = make(chan *msg.UDPPacket, 1024)
  440. pxy.sendCh = make(chan msg.Message, 1024)
  441. pxy.closed = false
  442. pxy.mu.Unlock()
  443. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
  444. for {
  445. var udpMsg msg.UDPPacket
  446. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  447. xl.Warn("read from workConn for udp error: %v", errRet)
  448. return
  449. }
  450. if errRet := errors.PanicToError(func() {
  451. xl.Trace("get udp package from workConn: %s", udpMsg.Content)
  452. readCh <- &udpMsg
  453. }); errRet != nil {
  454. xl.Info("reader goroutine for udp work connection closed: %v", errRet)
  455. return
  456. }
  457. }
  458. }
  459. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  460. defer func() {
  461. xl.Info("writer goroutine for udp work connection closed")
  462. }()
  463. var errRet error
  464. for rawMsg := range sendCh {
  465. switch m := rawMsg.(type) {
  466. case *msg.UDPPacket:
  467. xl.Trace("send udp package to workConn: %s", m.Content)
  468. case *msg.Ping:
  469. xl.Trace("send ping message to udp workConn")
  470. }
  471. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  472. xl.Error("udp work write error: %v", errRet)
  473. return
  474. }
  475. }
  476. }
  477. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  478. var errRet error
  479. for {
  480. time.Sleep(time.Duration(30) * time.Second)
  481. if errRet = errors.PanicToError(func() {
  482. sendCh <- &msg.Ping{}
  483. }); errRet != nil {
  484. xl.Trace("heartbeat goroutine for udp work connection closed")
  485. break
  486. }
  487. }
  488. }
  489. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  490. go workConnReaderFn(pxy.workConn, pxy.readCh)
  491. go heartbeatFn(pxy.workConn, pxy.sendCh)
  492. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh, int(pxy.clientCfg.UDPPacketSize))
  493. }
  494. type SUDPProxy struct {
  495. *BaseProxy
  496. cfg *config.SUDPProxyConf
  497. localAddr *net.UDPAddr
  498. closeCh chan struct{}
  499. }
  500. func (pxy *SUDPProxy) Run() (err error) {
  501. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIP, pxy.cfg.LocalPort))
  502. if err != nil {
  503. return
  504. }
  505. return
  506. }
  507. func (pxy *SUDPProxy) Close() {
  508. pxy.mu.Lock()
  509. defer pxy.mu.Unlock()
  510. select {
  511. case <-pxy.closeCh:
  512. return
  513. default:
  514. close(pxy.closeCh)
  515. }
  516. }
  517. func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  518. xl := pxy.xl
  519. xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
  520. var rwc io.ReadWriteCloser = conn
  521. var err error
  522. if pxy.limiter != nil {
  523. rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
  524. return conn.Close()
  525. })
  526. }
  527. if pxy.cfg.UseEncryption {
  528. rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
  529. if err != nil {
  530. conn.Close()
  531. xl.Error("create encryption stream error: %v", err)
  532. return
  533. }
  534. }
  535. if pxy.cfg.UseCompression {
  536. rwc = frpIo.WithCompression(rwc)
  537. }
  538. conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
  539. workConn := conn
  540. readCh := make(chan *msg.UDPPacket, 1024)
  541. sendCh := make(chan msg.Message, 1024)
  542. isClose := false
  543. mu := &sync.Mutex{}
  544. closeFn := func() {
  545. mu.Lock()
  546. defer mu.Unlock()
  547. if isClose {
  548. return
  549. }
  550. isClose = true
  551. if workConn != nil {
  552. workConn.Close()
  553. }
  554. close(readCh)
  555. close(sendCh)
  556. }
  557. // udp service <- frpc <- frps <- frpc visitor <- user
  558. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
  559. defer closeFn()
  560. for {
  561. // first to check sudp proxy is closed or not
  562. select {
  563. case <-pxy.closeCh:
  564. xl.Trace("frpc sudp proxy is closed")
  565. return
  566. default:
  567. }
  568. var udpMsg msg.UDPPacket
  569. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  570. xl.Warn("read from workConn for sudp error: %v", errRet)
  571. return
  572. }
  573. if errRet := errors.PanicToError(func() {
  574. readCh <- &udpMsg
  575. }); errRet != nil {
  576. xl.Warn("reader goroutine for sudp work connection closed: %v", errRet)
  577. return
  578. }
  579. }
  580. }
  581. // udp service -> frpc -> frps -> frpc visitor -> user
  582. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  583. defer func() {
  584. closeFn()
  585. xl.Info("writer goroutine for sudp work connection closed")
  586. }()
  587. var errRet error
  588. for rawMsg := range sendCh {
  589. switch m := rawMsg.(type) {
  590. case *msg.UDPPacket:
  591. xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
  592. m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
  593. case *msg.Ping:
  594. xl.Trace("frpc send ping message to frpc visitor")
  595. }
  596. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  597. xl.Error("sudp work write error: %v", errRet)
  598. return
  599. }
  600. }
  601. }
  602. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  603. ticker := time.NewTicker(30 * time.Second)
  604. defer func() {
  605. ticker.Stop()
  606. closeFn()
  607. }()
  608. var errRet error
  609. for {
  610. select {
  611. case <-ticker.C:
  612. if errRet = errors.PanicToError(func() {
  613. sendCh <- &msg.Ping{}
  614. }); errRet != nil {
  615. xl.Warn("heartbeat goroutine for sudp work connection closed")
  616. return
  617. }
  618. case <-pxy.closeCh:
  619. xl.Trace("frpc sudp proxy is closed")
  620. return
  621. }
  622. }
  623. }
  624. go workConnSenderFn(workConn, sendCh)
  625. go workConnReaderFn(workConn, readCh)
  626. go heartbeatFn(workConn, sendCh)
  627. udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize))
  628. }
  629. // Common handler for tcp work connections.
  630. func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  631. baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
  632. xl := xlog.FromContextSafe(ctx)
  633. var (
  634. remote io.ReadWriteCloser
  635. err error
  636. )
  637. remote = workConn
  638. if limiter != nil {
  639. remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
  640. return workConn.Close()
  641. })
  642. }
  643. xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
  644. baseInfo.UseEncryption, baseInfo.UseCompression)
  645. if baseInfo.UseEncryption {
  646. remote, err = frpIo.WithEncryption(remote, encKey)
  647. if err != nil {
  648. workConn.Close()
  649. xl.Error("create encryption stream error: %v", err)
  650. return
  651. }
  652. }
  653. if baseInfo.UseCompression {
  654. remote = frpIo.WithCompression(remote)
  655. }
  656. // check if we need to send proxy protocol info
  657. var extraInfo []byte
  658. if baseInfo.ProxyProtocolVersion != "" {
  659. if m.SrcAddr != "" && m.SrcPort != 0 {
  660. if m.DstAddr == "" {
  661. m.DstAddr = "127.0.0.1"
  662. }
  663. srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort))))
  664. dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort))))
  665. h := &pp.Header{
  666. Command: pp.PROXY,
  667. SourceAddr: srcAddr,
  668. DestinationAddr: dstAddr,
  669. }
  670. if strings.Contains(m.SrcAddr, ".") {
  671. h.TransportProtocol = pp.TCPv4
  672. } else {
  673. h.TransportProtocol = pp.TCPv6
  674. }
  675. if baseInfo.ProxyProtocolVersion == "v1" {
  676. h.Version = 1
  677. } else if baseInfo.ProxyProtocolVersion == "v2" {
  678. h.Version = 2
  679. }
  680. buf := bytes.NewBuffer(nil)
  681. h.WriteTo(buf)
  682. extraInfo = buf.Bytes()
  683. }
  684. }
  685. if proxyPlugin != nil {
  686. // if plugin is set, let plugin handle connections first
  687. xl.Debug("handle by plugin: %s", proxyPlugin.Name())
  688. proxyPlugin.Handle(remote, workConn, extraInfo)
  689. xl.Debug("handle by plugin finished")
  690. return
  691. }
  692. localConn, err := libdial.Dial(net.JoinHostPort(localInfo.LocalIP, strconv.Itoa(localInfo.LocalPort)))
  693. if err != nil {
  694. workConn.Close()
  695. xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIP, localInfo.LocalPort, err)
  696. return
  697. }
  698. xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  699. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  700. if len(extraInfo) > 0 {
  701. localConn.Write(extraInfo)
  702. }
  703. frpIo.Join(localConn, remote)
  704. xl.Debug("join connections closed")
  705. }