proxy.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "context"
  18. "io"
  19. "net"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "github.com/fatedier/golib/errors"
  25. frpIo "github.com/fatedier/golib/io"
  26. libdial "github.com/fatedier/golib/net/dial"
  27. "github.com/fatedier/golib/pool"
  28. fmux "github.com/hashicorp/yamux"
  29. pp "github.com/pires/go-proxyproto"
  30. "golang.org/x/time/rate"
  31. "github.com/fatedier/frp/pkg/config"
  32. "github.com/fatedier/frp/pkg/msg"
  33. plugin "github.com/fatedier/frp/pkg/plugin/client"
  34. "github.com/fatedier/frp/pkg/proto/udp"
  35. "github.com/fatedier/frp/pkg/util/limit"
  36. frpNet "github.com/fatedier/frp/pkg/util/net"
  37. "github.com/fatedier/frp/pkg/util/xlog"
  38. )
  39. // Proxy defines how to handle work connections for different proxy type.
  40. type Proxy interface {
  41. Run() error
  42. // InWorkConn accept work connections registered to server.
  43. InWorkConn(net.Conn, *msg.StartWorkConn)
  44. Close()
  45. }
  46. func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
  47. var limiter *rate.Limiter
  48. limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
  49. if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeClient {
  50. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  51. }
  52. baseProxy := BaseProxy{
  53. clientCfg: clientCfg,
  54. serverUDPPort: serverUDPPort,
  55. limiter: limiter,
  56. xl: xlog.FromContextSafe(ctx),
  57. ctx: ctx,
  58. }
  59. switch cfg := pxyConf.(type) {
  60. case *config.TCPProxyConf:
  61. pxy = &TCPProxy{
  62. BaseProxy: &baseProxy,
  63. cfg: cfg,
  64. }
  65. case *config.TCPMuxProxyConf:
  66. pxy = &TCPMuxProxy{
  67. BaseProxy: &baseProxy,
  68. cfg: cfg,
  69. }
  70. case *config.UDPProxyConf:
  71. pxy = &UDPProxy{
  72. BaseProxy: &baseProxy,
  73. cfg: cfg,
  74. }
  75. case *config.HTTPProxyConf:
  76. pxy = &HTTPProxy{
  77. BaseProxy: &baseProxy,
  78. cfg: cfg,
  79. }
  80. case *config.HTTPSProxyConf:
  81. pxy = &HTTPSProxy{
  82. BaseProxy: &baseProxy,
  83. cfg: cfg,
  84. }
  85. case *config.STCPProxyConf:
  86. pxy = &STCPProxy{
  87. BaseProxy: &baseProxy,
  88. cfg: cfg,
  89. }
  90. case *config.XTCPProxyConf:
  91. pxy = &XTCPProxy{
  92. BaseProxy: &baseProxy,
  93. cfg: cfg,
  94. }
  95. case *config.SUDPProxyConf:
  96. pxy = &SUDPProxy{
  97. BaseProxy: &baseProxy,
  98. cfg: cfg,
  99. closeCh: make(chan struct{}),
  100. }
  101. }
  102. return
  103. }
  104. type BaseProxy struct {
  105. closed bool
  106. clientCfg config.ClientCommonConf
  107. serverUDPPort int
  108. limiter *rate.Limiter
  109. mu sync.RWMutex
  110. xl *xlog.Logger
  111. ctx context.Context
  112. }
  113. // TCP
  114. type TCPProxy struct {
  115. *BaseProxy
  116. cfg *config.TCPProxyConf
  117. proxyPlugin plugin.Plugin
  118. }
  119. func (pxy *TCPProxy) Run() (err error) {
  120. if pxy.cfg.Plugin != "" {
  121. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  122. if err != nil {
  123. return
  124. }
  125. }
  126. return
  127. }
  128. func (pxy *TCPProxy) Close() {
  129. if pxy.proxyPlugin != nil {
  130. pxy.proxyPlugin.Close()
  131. }
  132. }
  133. func (pxy *TCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  134. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  135. conn, []byte(pxy.clientCfg.Token), m)
  136. }
  137. // TCP Multiplexer
  138. type TCPMuxProxy struct {
  139. *BaseProxy
  140. cfg *config.TCPMuxProxyConf
  141. proxyPlugin plugin.Plugin
  142. }
  143. func (pxy *TCPMuxProxy) Run() (err error) {
  144. if pxy.cfg.Plugin != "" {
  145. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  146. if err != nil {
  147. return
  148. }
  149. }
  150. return
  151. }
  152. func (pxy *TCPMuxProxy) Close() {
  153. if pxy.proxyPlugin != nil {
  154. pxy.proxyPlugin.Close()
  155. }
  156. }
  157. func (pxy *TCPMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  158. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  159. conn, []byte(pxy.clientCfg.Token), m)
  160. }
  161. // HTTP
  162. type HTTPProxy struct {
  163. *BaseProxy
  164. cfg *config.HTTPProxyConf
  165. proxyPlugin plugin.Plugin
  166. }
  167. func (pxy *HTTPProxy) Run() (err error) {
  168. if pxy.cfg.Plugin != "" {
  169. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  170. if err != nil {
  171. return
  172. }
  173. }
  174. return
  175. }
  176. func (pxy *HTTPProxy) Close() {
  177. if pxy.proxyPlugin != nil {
  178. pxy.proxyPlugin.Close()
  179. }
  180. }
  181. func (pxy *HTTPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  182. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  183. conn, []byte(pxy.clientCfg.Token), m)
  184. }
  185. // HTTPS
  186. type HTTPSProxy struct {
  187. *BaseProxy
  188. cfg *config.HTTPSProxyConf
  189. proxyPlugin plugin.Plugin
  190. }
  191. func (pxy *HTTPSProxy) Run() (err error) {
  192. if pxy.cfg.Plugin != "" {
  193. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  194. if err != nil {
  195. return
  196. }
  197. }
  198. return
  199. }
  200. func (pxy *HTTPSProxy) Close() {
  201. if pxy.proxyPlugin != nil {
  202. pxy.proxyPlugin.Close()
  203. }
  204. }
  205. func (pxy *HTTPSProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  206. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  207. conn, []byte(pxy.clientCfg.Token), m)
  208. }
  209. // STCP
  210. type STCPProxy struct {
  211. *BaseProxy
  212. cfg *config.STCPProxyConf
  213. proxyPlugin plugin.Plugin
  214. }
  215. func (pxy *STCPProxy) Run() (err error) {
  216. if pxy.cfg.Plugin != "" {
  217. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  218. if err != nil {
  219. return
  220. }
  221. }
  222. return
  223. }
  224. func (pxy *STCPProxy) Close() {
  225. if pxy.proxyPlugin != nil {
  226. pxy.proxyPlugin.Close()
  227. }
  228. }
  229. func (pxy *STCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  230. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  231. conn, []byte(pxy.clientCfg.Token), m)
  232. }
  233. // XTCP
  234. type XTCPProxy struct {
  235. *BaseProxy
  236. cfg *config.XTCPProxyConf
  237. proxyPlugin plugin.Plugin
  238. }
  239. func (pxy *XTCPProxy) Run() (err error) {
  240. if pxy.cfg.Plugin != "" {
  241. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  242. if err != nil {
  243. return
  244. }
  245. }
  246. return
  247. }
  248. func (pxy *XTCPProxy) Close() {
  249. if pxy.proxyPlugin != nil {
  250. pxy.proxyPlugin.Close()
  251. }
  252. }
  253. func (pxy *XTCPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  254. xl := pxy.xl
  255. defer conn.Close()
  256. var natHoleSidMsg msg.NatHoleSid
  257. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  258. if err != nil {
  259. xl.Error("xtcp read from workConn error: %v", err)
  260. return
  261. }
  262. natHoleClientMsg := &msg.NatHoleClient{
  263. ProxyName: pxy.cfg.ProxyName,
  264. Sid: natHoleSidMsg.Sid,
  265. }
  266. serverAddr := pxy.clientCfg.NatHoleServerAddr
  267. if serverAddr == "" {
  268. serverAddr = pxy.clientCfg.ServerAddr
  269. }
  270. raddr, _ := net.ResolveUDPAddr("udp",
  271. net.JoinHostPort(serverAddr, strconv.Itoa(pxy.serverUDPPort)))
  272. clientConn, err := net.DialUDP("udp", nil, raddr)
  273. if err != nil {
  274. xl.Error("dial server udp addr error: %v", err)
  275. return
  276. }
  277. defer clientConn.Close()
  278. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  279. if err != nil {
  280. xl.Error("send natHoleClientMsg to server error: %v", err)
  281. return
  282. }
  283. // Wait for client address at most 5 seconds.
  284. var natHoleRespMsg msg.NatHoleResp
  285. _ = clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  286. buf := pool.GetBuf(1024)
  287. n, err := clientConn.Read(buf)
  288. if err != nil {
  289. xl.Error("get natHoleRespMsg error: %v", err)
  290. return
  291. }
  292. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  293. if err != nil {
  294. xl.Error("get natHoleRespMsg error: %v", err)
  295. return
  296. }
  297. _ = clientConn.SetReadDeadline(time.Time{})
  298. _ = clientConn.Close()
  299. if natHoleRespMsg.Error != "" {
  300. xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  301. return
  302. }
  303. xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
  304. // Send detect message
  305. host, portStr, err := net.SplitHostPort(natHoleRespMsg.VisitorAddr)
  306. if err != nil {
  307. xl.Error("get NatHoleResp visitor address [%s] error: %v", natHoleRespMsg.VisitorAddr, err)
  308. }
  309. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  310. port, err := strconv.ParseInt(portStr, 10, 64)
  311. if err != nil {
  312. xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  313. return
  314. }
  315. _ = pxy.sendDetectMsg(host, int(port), laddr, []byte(natHoleRespMsg.Sid))
  316. xl.Trace("send all detect msg done")
  317. if err := msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{}); err != nil {
  318. xl.Error("write message error: %v", err)
  319. return
  320. }
  321. // Listen for clientConn's address and wait for visitor connection
  322. lConn, err := net.ListenUDP("udp", laddr)
  323. if err != nil {
  324. xl.Error("listen on visitorConn's local address error: %v", err)
  325. return
  326. }
  327. defer lConn.Close()
  328. _ = lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
  329. sidBuf := pool.GetBuf(1024)
  330. var uAddr *net.UDPAddr
  331. n, uAddr, err = lConn.ReadFromUDP(sidBuf)
  332. if err != nil {
  333. xl.Warn("get sid from visitor error: %v", err)
  334. return
  335. }
  336. _ = lConn.SetReadDeadline(time.Time{})
  337. if string(sidBuf[:n]) != natHoleRespMsg.Sid {
  338. xl.Warn("incorrect sid from visitor")
  339. return
  340. }
  341. pool.PutBuf(sidBuf)
  342. xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
  343. if _, err := lConn.WriteToUDP(sidBuf[:n], uAddr); err != nil {
  344. xl.Error("write uaddr error: %v", err)
  345. return
  346. }
  347. kcpConn, err := frpNet.NewKCPConnFromUDP(lConn, false, uAddr.String())
  348. if err != nil {
  349. xl.Error("create kcp connection from udp connection error: %v", err)
  350. return
  351. }
  352. fmuxCfg := fmux.DefaultConfig()
  353. fmuxCfg.KeepAliveInterval = 5 * time.Second
  354. fmuxCfg.LogOutput = io.Discard
  355. sess, err := fmux.Server(kcpConn, fmuxCfg)
  356. if err != nil {
  357. xl.Error("create yamux server from kcp connection error: %v", err)
  358. return
  359. }
  360. defer sess.Close()
  361. muxConn, err := sess.Accept()
  362. if err != nil {
  363. xl.Error("accept for yamux connection error: %v", err)
  364. return
  365. }
  366. HandleTCPWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, pxy.cfg.GetBaseInfo(), pxy.limiter,
  367. muxConn, []byte(pxy.cfg.Sk), m)
  368. }
  369. func (pxy *XTCPProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
  370. daddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(addr, strconv.Itoa(port)))
  371. if err != nil {
  372. return err
  373. }
  374. tConn, err := net.DialUDP("udp", laddr, daddr)
  375. if err != nil {
  376. return err
  377. }
  378. // uConn := ipv4.NewConn(tConn)
  379. // uConn.SetTTL(3)
  380. if _, err := tConn.Write(content); err != nil {
  381. return err
  382. }
  383. return tConn.Close()
  384. }
  385. // UDP
  386. type UDPProxy struct {
  387. *BaseProxy
  388. cfg *config.UDPProxyConf
  389. localAddr *net.UDPAddr
  390. readCh chan *msg.UDPPacket
  391. // include msg.UDPPacket and msg.Ping
  392. sendCh chan msg.Message
  393. workConn net.Conn
  394. }
  395. func (pxy *UDPProxy) Run() (err error) {
  396. pxy.localAddr, err = net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.cfg.LocalIP, strconv.Itoa(pxy.cfg.LocalPort)))
  397. if err != nil {
  398. return
  399. }
  400. return
  401. }
  402. func (pxy *UDPProxy) Close() {
  403. pxy.mu.Lock()
  404. defer pxy.mu.Unlock()
  405. if !pxy.closed {
  406. pxy.closed = true
  407. if pxy.workConn != nil {
  408. pxy.workConn.Close()
  409. }
  410. if pxy.readCh != nil {
  411. close(pxy.readCh)
  412. }
  413. if pxy.sendCh != nil {
  414. close(pxy.sendCh)
  415. }
  416. }
  417. }
  418. func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  419. xl := pxy.xl
  420. xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  421. // close resources releated with old workConn
  422. pxy.Close()
  423. var rwc io.ReadWriteCloser = conn
  424. var err error
  425. if pxy.limiter != nil {
  426. rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
  427. return conn.Close()
  428. })
  429. }
  430. if pxy.cfg.UseEncryption {
  431. rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
  432. if err != nil {
  433. conn.Close()
  434. xl.Error("create encryption stream error: %v", err)
  435. return
  436. }
  437. }
  438. if pxy.cfg.UseCompression {
  439. rwc = frpIo.WithCompression(rwc)
  440. }
  441. conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
  442. pxy.mu.Lock()
  443. pxy.workConn = conn
  444. pxy.readCh = make(chan *msg.UDPPacket, 1024)
  445. pxy.sendCh = make(chan msg.Message, 1024)
  446. pxy.closed = false
  447. pxy.mu.Unlock()
  448. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
  449. for {
  450. var udpMsg msg.UDPPacket
  451. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  452. xl.Warn("read from workConn for udp error: %v", errRet)
  453. return
  454. }
  455. if errRet := errors.PanicToError(func() {
  456. xl.Trace("get udp package from workConn: %s", udpMsg.Content)
  457. readCh <- &udpMsg
  458. }); errRet != nil {
  459. xl.Info("reader goroutine for udp work connection closed: %v", errRet)
  460. return
  461. }
  462. }
  463. }
  464. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  465. defer func() {
  466. xl.Info("writer goroutine for udp work connection closed")
  467. }()
  468. var errRet error
  469. for rawMsg := range sendCh {
  470. switch m := rawMsg.(type) {
  471. case *msg.UDPPacket:
  472. xl.Trace("send udp package to workConn: %s", m.Content)
  473. case *msg.Ping:
  474. xl.Trace("send ping message to udp workConn")
  475. }
  476. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  477. xl.Error("udp work write error: %v", errRet)
  478. return
  479. }
  480. }
  481. }
  482. heartbeatFn := func(sendCh chan msg.Message) {
  483. var errRet error
  484. for {
  485. time.Sleep(time.Duration(30) * time.Second)
  486. if errRet = errors.PanicToError(func() {
  487. sendCh <- &msg.Ping{}
  488. }); errRet != nil {
  489. xl.Trace("heartbeat goroutine for udp work connection closed")
  490. break
  491. }
  492. }
  493. }
  494. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  495. go workConnReaderFn(pxy.workConn, pxy.readCh)
  496. go heartbeatFn(pxy.sendCh)
  497. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh, int(pxy.clientCfg.UDPPacketSize))
  498. }
  499. type SUDPProxy struct {
  500. *BaseProxy
  501. cfg *config.SUDPProxyConf
  502. localAddr *net.UDPAddr
  503. closeCh chan struct{}
  504. }
  505. func (pxy *SUDPProxy) Run() (err error) {
  506. pxy.localAddr, err = net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.cfg.LocalIP, strconv.Itoa(pxy.cfg.LocalPort)))
  507. if err != nil {
  508. return
  509. }
  510. return
  511. }
  512. func (pxy *SUDPProxy) Close() {
  513. pxy.mu.Lock()
  514. defer pxy.mu.Unlock()
  515. select {
  516. case <-pxy.closeCh:
  517. return
  518. default:
  519. close(pxy.closeCh)
  520. }
  521. }
  522. func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  523. xl := pxy.xl
  524. xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
  525. var rwc io.ReadWriteCloser = conn
  526. var err error
  527. if pxy.limiter != nil {
  528. rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
  529. return conn.Close()
  530. })
  531. }
  532. if pxy.cfg.UseEncryption {
  533. rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
  534. if err != nil {
  535. conn.Close()
  536. xl.Error("create encryption stream error: %v", err)
  537. return
  538. }
  539. }
  540. if pxy.cfg.UseCompression {
  541. rwc = frpIo.WithCompression(rwc)
  542. }
  543. conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
  544. workConn := conn
  545. readCh := make(chan *msg.UDPPacket, 1024)
  546. sendCh := make(chan msg.Message, 1024)
  547. isClose := false
  548. mu := &sync.Mutex{}
  549. closeFn := func() {
  550. mu.Lock()
  551. defer mu.Unlock()
  552. if isClose {
  553. return
  554. }
  555. isClose = true
  556. if workConn != nil {
  557. workConn.Close()
  558. }
  559. close(readCh)
  560. close(sendCh)
  561. }
  562. // udp service <- frpc <- frps <- frpc visitor <- user
  563. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UDPPacket) {
  564. defer closeFn()
  565. for {
  566. // first to check sudp proxy is closed or not
  567. select {
  568. case <-pxy.closeCh:
  569. xl.Trace("frpc sudp proxy is closed")
  570. return
  571. default:
  572. }
  573. var udpMsg msg.UDPPacket
  574. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  575. xl.Warn("read from workConn for sudp error: %v", errRet)
  576. return
  577. }
  578. if errRet := errors.PanicToError(func() {
  579. readCh <- &udpMsg
  580. }); errRet != nil {
  581. xl.Warn("reader goroutine for sudp work connection closed: %v", errRet)
  582. return
  583. }
  584. }
  585. }
  586. // udp service -> frpc -> frps -> frpc visitor -> user
  587. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  588. defer func() {
  589. closeFn()
  590. xl.Info("writer goroutine for sudp work connection closed")
  591. }()
  592. var errRet error
  593. for rawMsg := range sendCh {
  594. switch m := rawMsg.(type) {
  595. case *msg.UDPPacket:
  596. xl.Trace("frpc send udp package to frpc visitor, [udp local: %v, remote: %v], [tcp work conn local: %v, remote: %v]",
  597. m.LocalAddr.String(), m.RemoteAddr.String(), conn.LocalAddr().String(), conn.RemoteAddr().String())
  598. case *msg.Ping:
  599. xl.Trace("frpc send ping message to frpc visitor")
  600. }
  601. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  602. xl.Error("sudp work write error: %v", errRet)
  603. return
  604. }
  605. }
  606. }
  607. heartbeatFn := func(sendCh chan msg.Message) {
  608. ticker := time.NewTicker(30 * time.Second)
  609. defer func() {
  610. ticker.Stop()
  611. closeFn()
  612. }()
  613. var errRet error
  614. for {
  615. select {
  616. case <-ticker.C:
  617. if errRet = errors.PanicToError(func() {
  618. sendCh <- &msg.Ping{}
  619. }); errRet != nil {
  620. xl.Warn("heartbeat goroutine for sudp work connection closed")
  621. return
  622. }
  623. case <-pxy.closeCh:
  624. xl.Trace("frpc sudp proxy is closed")
  625. return
  626. }
  627. }
  628. }
  629. go workConnSenderFn(workConn, sendCh)
  630. go workConnReaderFn(workConn, readCh)
  631. go heartbeatFn(sendCh)
  632. udp.Forwarder(pxy.localAddr, readCh, sendCh, int(pxy.clientCfg.UDPPacketSize))
  633. }
  634. // Common handler for tcp work connections.
  635. func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  636. baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn,
  637. ) {
  638. xl := xlog.FromContextSafe(ctx)
  639. var (
  640. remote io.ReadWriteCloser
  641. err error
  642. )
  643. remote = workConn
  644. if limiter != nil {
  645. remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
  646. return workConn.Close()
  647. })
  648. }
  649. xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
  650. baseInfo.UseEncryption, baseInfo.UseCompression)
  651. if baseInfo.UseEncryption {
  652. remote, err = frpIo.WithEncryption(remote, encKey)
  653. if err != nil {
  654. workConn.Close()
  655. xl.Error("create encryption stream error: %v", err)
  656. return
  657. }
  658. }
  659. if baseInfo.UseCompression {
  660. remote = frpIo.WithCompression(remote)
  661. }
  662. // check if we need to send proxy protocol info
  663. var extraInfo []byte
  664. if baseInfo.ProxyProtocolVersion != "" {
  665. if m.SrcAddr != "" && m.SrcPort != 0 {
  666. if m.DstAddr == "" {
  667. m.DstAddr = "127.0.0.1"
  668. }
  669. srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort))))
  670. dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort))))
  671. h := &pp.Header{
  672. Command: pp.PROXY,
  673. SourceAddr: srcAddr,
  674. DestinationAddr: dstAddr,
  675. }
  676. if strings.Contains(m.SrcAddr, ".") {
  677. h.TransportProtocol = pp.TCPv4
  678. } else {
  679. h.TransportProtocol = pp.TCPv6
  680. }
  681. if baseInfo.ProxyProtocolVersion == "v1" {
  682. h.Version = 1
  683. } else if baseInfo.ProxyProtocolVersion == "v2" {
  684. h.Version = 2
  685. }
  686. buf := bytes.NewBuffer(nil)
  687. _, _ = h.WriteTo(buf)
  688. extraInfo = buf.Bytes()
  689. }
  690. }
  691. if proxyPlugin != nil {
  692. // if plugin is set, let plugin handle connections first
  693. xl.Debug("handle by plugin: %s", proxyPlugin.Name())
  694. proxyPlugin.Handle(remote, workConn, extraInfo)
  695. xl.Debug("handle by plugin finished")
  696. return
  697. }
  698. localConn, err := libdial.Dial(
  699. net.JoinHostPort(localInfo.LocalIP, strconv.Itoa(localInfo.LocalPort)),
  700. libdial.WithTimeout(10*time.Second),
  701. )
  702. if err != nil {
  703. workConn.Close()
  704. xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIP, localInfo.LocalPort, err)
  705. return
  706. }
  707. xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  708. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  709. if len(extraInfo) > 0 {
  710. if _, err := localConn.Write(extraInfo); err != nil {
  711. workConn.Close()
  712. xl.Error("write extraInfo to local conn error: %v", err)
  713. return
  714. }
  715. }
  716. _, _, errs := frpIo.Join(localConn, remote)
  717. xl.Debug("join connections closed")
  718. if len(errs) > 0 {
  719. xl.Trace("join connections errors: %v", errs)
  720. }
  721. }