proxy.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "net"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/fatedier/frp/models/config"
  27. "github.com/fatedier/frp/models/msg"
  28. plugin "github.com/fatedier/frp/models/plugin/client"
  29. "github.com/fatedier/frp/models/proto/udp"
  30. "github.com/fatedier/frp/utils/limit"
  31. frpNet "github.com/fatedier/frp/utils/net"
  32. "github.com/fatedier/frp/utils/xlog"
  33. "github.com/fatedier/golib/errors"
  34. frpIo "github.com/fatedier/golib/io"
  35. "github.com/fatedier/golib/pool"
  36. fmux "github.com/hashicorp/yamux"
  37. pp "github.com/pires/go-proxyproto"
  38. "golang.org/x/time/rate"
  39. )
  40. // Proxy defines how to handle work connections for different proxy type.
  41. type Proxy interface {
  42. Run() error
  43. // InWorkConn accept work connections registered to server.
  44. InWorkConn(net.Conn, *msg.StartWorkConn)
  45. Close()
  46. }
  47. func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
  48. var limiter *rate.Limiter
  49. limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
  50. if limitBytes > 0 {
  51. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  52. }
  53. baseProxy := BaseProxy{
  54. clientCfg: clientCfg,
  55. serverUDPPort: serverUDPPort,
  56. limiter: limiter,
  57. xl: xlog.FromContextSafe(ctx),
  58. ctx: ctx,
  59. }
  60. switch cfg := pxyConf.(type) {
  61. case *config.TcpProxyConf:
  62. pxy = &TcpProxy{
  63. BaseProxy: &baseProxy,
  64. cfg: cfg,
  65. }
  66. case *config.TcpMuxProxyConf:
  67. pxy = &TcpMuxProxy{
  68. BaseProxy: &baseProxy,
  69. cfg: cfg,
  70. }
  71. case *config.UdpProxyConf:
  72. pxy = &UdpProxy{
  73. BaseProxy: &baseProxy,
  74. cfg: cfg,
  75. }
  76. case *config.HttpProxyConf:
  77. pxy = &HttpProxy{
  78. BaseProxy: &baseProxy,
  79. cfg: cfg,
  80. }
  81. case *config.HttpsProxyConf:
  82. pxy = &HttpsProxy{
  83. BaseProxy: &baseProxy,
  84. cfg: cfg,
  85. }
  86. case *config.StcpProxyConf:
  87. pxy = &StcpProxy{
  88. BaseProxy: &baseProxy,
  89. cfg: cfg,
  90. }
  91. case *config.XtcpProxyConf:
  92. pxy = &XtcpProxy{
  93. BaseProxy: &baseProxy,
  94. cfg: cfg,
  95. }
  96. }
  97. return
  98. }
  99. type BaseProxy struct {
  100. closed bool
  101. clientCfg config.ClientCommonConf
  102. serverUDPPort int
  103. limiter *rate.Limiter
  104. mu sync.RWMutex
  105. xl *xlog.Logger
  106. ctx context.Context
  107. }
  108. // TCP
  109. type TcpProxy struct {
  110. *BaseProxy
  111. cfg *config.TcpProxyConf
  112. proxyPlugin plugin.Plugin
  113. }
  114. func (pxy *TcpProxy) Run() (err error) {
  115. if pxy.cfg.Plugin != "" {
  116. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  117. if err != nil {
  118. return
  119. }
  120. }
  121. return
  122. }
  123. func (pxy *TcpProxy) Close() {
  124. if pxy.proxyPlugin != nil {
  125. pxy.proxyPlugin.Close()
  126. }
  127. }
  128. func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  129. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  130. conn, []byte(pxy.clientCfg.Token), m)
  131. }
  132. // TCP Multiplexer
  133. type TcpMuxProxy struct {
  134. *BaseProxy
  135. cfg *config.TcpMuxProxyConf
  136. proxyPlugin plugin.Plugin
  137. }
  138. func (pxy *TcpMuxProxy) Run() (err error) {
  139. if pxy.cfg.Plugin != "" {
  140. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  141. if err != nil {
  142. return
  143. }
  144. }
  145. return
  146. }
  147. func (pxy *TcpMuxProxy) Close() {
  148. if pxy.proxyPlugin != nil {
  149. pxy.proxyPlugin.Close()
  150. }
  151. }
  152. func (pxy *TcpMuxProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  153. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  154. conn, []byte(pxy.clientCfg.Token), m)
  155. }
  156. // HTTP
  157. type HttpProxy struct {
  158. *BaseProxy
  159. cfg *config.HttpProxyConf
  160. proxyPlugin plugin.Plugin
  161. }
  162. func (pxy *HttpProxy) Run() (err error) {
  163. if pxy.cfg.Plugin != "" {
  164. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  165. if err != nil {
  166. return
  167. }
  168. }
  169. return
  170. }
  171. func (pxy *HttpProxy) Close() {
  172. if pxy.proxyPlugin != nil {
  173. pxy.proxyPlugin.Close()
  174. }
  175. }
  176. func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  177. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  178. conn, []byte(pxy.clientCfg.Token), m)
  179. }
  180. // HTTPS
  181. type HttpsProxy struct {
  182. *BaseProxy
  183. cfg *config.HttpsProxyConf
  184. proxyPlugin plugin.Plugin
  185. }
  186. func (pxy *HttpsProxy) Run() (err error) {
  187. if pxy.cfg.Plugin != "" {
  188. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  189. if err != nil {
  190. return
  191. }
  192. }
  193. return
  194. }
  195. func (pxy *HttpsProxy) Close() {
  196. if pxy.proxyPlugin != nil {
  197. pxy.proxyPlugin.Close()
  198. }
  199. }
  200. func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  201. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  202. conn, []byte(pxy.clientCfg.Token), m)
  203. }
  204. // STCP
  205. type StcpProxy struct {
  206. *BaseProxy
  207. cfg *config.StcpProxyConf
  208. proxyPlugin plugin.Plugin
  209. }
  210. func (pxy *StcpProxy) Run() (err error) {
  211. if pxy.cfg.Plugin != "" {
  212. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  213. if err != nil {
  214. return
  215. }
  216. }
  217. return
  218. }
  219. func (pxy *StcpProxy) Close() {
  220. if pxy.proxyPlugin != nil {
  221. pxy.proxyPlugin.Close()
  222. }
  223. }
  224. func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  225. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  226. conn, []byte(pxy.clientCfg.Token), m)
  227. }
  228. // XTCP
  229. type XtcpProxy struct {
  230. *BaseProxy
  231. cfg *config.XtcpProxyConf
  232. proxyPlugin plugin.Plugin
  233. }
  234. func (pxy *XtcpProxy) Run() (err error) {
  235. if pxy.cfg.Plugin != "" {
  236. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  237. if err != nil {
  238. return
  239. }
  240. }
  241. return
  242. }
  243. func (pxy *XtcpProxy) Close() {
  244. if pxy.proxyPlugin != nil {
  245. pxy.proxyPlugin.Close()
  246. }
  247. }
  248. func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  249. xl := pxy.xl
  250. defer conn.Close()
  251. var natHoleSidMsg msg.NatHoleSid
  252. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  253. if err != nil {
  254. xl.Error("xtcp read from workConn error: %v", err)
  255. return
  256. }
  257. natHoleClientMsg := &msg.NatHoleClient{
  258. ProxyName: pxy.cfg.ProxyName,
  259. Sid: natHoleSidMsg.Sid,
  260. }
  261. raddr, _ := net.ResolveUDPAddr("udp",
  262. fmt.Sprintf("%s:%d", pxy.clientCfg.ServerAddr, pxy.serverUDPPort))
  263. clientConn, err := net.DialUDP("udp", nil, raddr)
  264. defer clientConn.Close()
  265. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  266. if err != nil {
  267. xl.Error("send natHoleClientMsg to server error: %v", err)
  268. return
  269. }
  270. // Wait for client address at most 5 seconds.
  271. var natHoleRespMsg msg.NatHoleResp
  272. clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  273. buf := pool.GetBuf(1024)
  274. n, err := clientConn.Read(buf)
  275. if err != nil {
  276. xl.Error("get natHoleRespMsg error: %v", err)
  277. return
  278. }
  279. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  280. if err != nil {
  281. xl.Error("get natHoleRespMsg error: %v", err)
  282. return
  283. }
  284. clientConn.SetReadDeadline(time.Time{})
  285. clientConn.Close()
  286. if natHoleRespMsg.Error != "" {
  287. xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  288. return
  289. }
  290. xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
  291. // Send detect message
  292. array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
  293. if len(array) <= 1 {
  294. xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  295. }
  296. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  297. /*
  298. for i := 1000; i < 65000; i++ {
  299. pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
  300. }
  301. */
  302. port, err := strconv.ParseInt(array[1], 10, 64)
  303. if err != nil {
  304. xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  305. return
  306. }
  307. pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
  308. xl.Trace("send all detect msg done")
  309. msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
  310. // Listen for clientConn's address and wait for visitor connection
  311. lConn, err := net.ListenUDP("udp", laddr)
  312. if err != nil {
  313. xl.Error("listen on visitorConn's local adress error: %v", err)
  314. return
  315. }
  316. defer lConn.Close()
  317. lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
  318. sidBuf := pool.GetBuf(1024)
  319. var uAddr *net.UDPAddr
  320. n, uAddr, err = lConn.ReadFromUDP(sidBuf)
  321. if err != nil {
  322. xl.Warn("get sid from visitor error: %v", err)
  323. return
  324. }
  325. lConn.SetReadDeadline(time.Time{})
  326. if string(sidBuf[:n]) != natHoleRespMsg.Sid {
  327. xl.Warn("incorrect sid from visitor")
  328. return
  329. }
  330. pool.PutBuf(sidBuf)
  331. xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
  332. lConn.WriteToUDP(sidBuf[:n], uAddr)
  333. kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, uAddr.String())
  334. if err != nil {
  335. xl.Error("create kcp connection from udp connection error: %v", err)
  336. return
  337. }
  338. fmuxCfg := fmux.DefaultConfig()
  339. fmuxCfg.KeepAliveInterval = 5 * time.Second
  340. fmuxCfg.LogOutput = ioutil.Discard
  341. sess, err := fmux.Server(kcpConn, fmuxCfg)
  342. if err != nil {
  343. xl.Error("create yamux server from kcp connection error: %v", err)
  344. return
  345. }
  346. defer sess.Close()
  347. muxConn, err := sess.Accept()
  348. if err != nil {
  349. xl.Error("accept for yamux connection error: %v", err)
  350. return
  351. }
  352. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  353. muxConn, []byte(pxy.cfg.Sk), m)
  354. }
  355. func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
  356. daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
  357. if err != nil {
  358. return err
  359. }
  360. tConn, err := net.DialUDP("udp", laddr, daddr)
  361. if err != nil {
  362. return err
  363. }
  364. //uConn := ipv4.NewConn(tConn)
  365. //uConn.SetTTL(3)
  366. tConn.Write(content)
  367. tConn.Close()
  368. return nil
  369. }
  370. // UDP
  371. type UdpProxy struct {
  372. *BaseProxy
  373. cfg *config.UdpProxyConf
  374. localAddr *net.UDPAddr
  375. readCh chan *msg.UdpPacket
  376. // include msg.UdpPacket and msg.Ping
  377. sendCh chan msg.Message
  378. workConn net.Conn
  379. }
  380. func (pxy *UdpProxy) Run() (err error) {
  381. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
  382. if err != nil {
  383. return
  384. }
  385. return
  386. }
  387. func (pxy *UdpProxy) Close() {
  388. pxy.mu.Lock()
  389. defer pxy.mu.Unlock()
  390. if !pxy.closed {
  391. pxy.closed = true
  392. if pxy.workConn != nil {
  393. pxy.workConn.Close()
  394. }
  395. if pxy.readCh != nil {
  396. close(pxy.readCh)
  397. }
  398. if pxy.sendCh != nil {
  399. close(pxy.sendCh)
  400. }
  401. }
  402. }
  403. func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  404. xl := pxy.xl
  405. xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  406. // close resources releated with old workConn
  407. pxy.Close()
  408. if pxy.limiter != nil {
  409. rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
  410. return conn.Close()
  411. })
  412. conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
  413. }
  414. pxy.mu.Lock()
  415. pxy.workConn = conn
  416. pxy.readCh = make(chan *msg.UdpPacket, 1024)
  417. pxy.sendCh = make(chan msg.Message, 1024)
  418. pxy.closed = false
  419. pxy.mu.Unlock()
  420. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
  421. for {
  422. var udpMsg msg.UdpPacket
  423. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  424. xl.Warn("read from workConn for udp error: %v", errRet)
  425. return
  426. }
  427. if errRet := errors.PanicToError(func() {
  428. xl.Trace("get udp package from workConn: %s", udpMsg.Content)
  429. readCh <- &udpMsg
  430. }); errRet != nil {
  431. xl.Info("reader goroutine for udp work connection closed: %v", errRet)
  432. return
  433. }
  434. }
  435. }
  436. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  437. defer func() {
  438. xl.Info("writer goroutine for udp work connection closed")
  439. }()
  440. var errRet error
  441. for rawMsg := range sendCh {
  442. switch m := rawMsg.(type) {
  443. case *msg.UdpPacket:
  444. xl.Trace("send udp package to workConn: %s", m.Content)
  445. case *msg.Ping:
  446. xl.Trace("send ping message to udp workConn")
  447. }
  448. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  449. xl.Error("udp work write error: %v", errRet)
  450. return
  451. }
  452. }
  453. }
  454. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  455. var errRet error
  456. for {
  457. time.Sleep(time.Duration(30) * time.Second)
  458. if errRet = errors.PanicToError(func() {
  459. sendCh <- &msg.Ping{}
  460. }); errRet != nil {
  461. xl.Trace("heartbeat goroutine for udp work connection closed")
  462. break
  463. }
  464. }
  465. }
  466. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  467. go workConnReaderFn(pxy.workConn, pxy.readCh)
  468. go heartbeatFn(pxy.workConn, pxy.sendCh)
  469. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
  470. }
  471. // Common handler for tcp work connections.
  472. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  473. baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
  474. xl := xlog.FromContextSafe(ctx)
  475. var (
  476. remote io.ReadWriteCloser
  477. err error
  478. )
  479. remote = workConn
  480. if limiter != nil {
  481. remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
  482. return workConn.Close()
  483. })
  484. }
  485. xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
  486. baseInfo.UseEncryption, baseInfo.UseCompression)
  487. if baseInfo.UseEncryption {
  488. remote, err = frpIo.WithEncryption(remote, encKey)
  489. if err != nil {
  490. workConn.Close()
  491. xl.Error("create encryption stream error: %v", err)
  492. return
  493. }
  494. }
  495. if baseInfo.UseCompression {
  496. remote = frpIo.WithCompression(remote)
  497. }
  498. // check if we need to send proxy protocol info
  499. var extraInfo []byte
  500. if baseInfo.ProxyProtocolVersion != "" {
  501. if m.SrcAddr != "" && m.SrcPort != 0 {
  502. if m.DstAddr == "" {
  503. m.DstAddr = "127.0.0.1"
  504. }
  505. h := &pp.Header{
  506. Command: pp.PROXY,
  507. SourceAddress: net.ParseIP(m.SrcAddr),
  508. SourcePort: m.SrcPort,
  509. DestinationAddress: net.ParseIP(m.DstAddr),
  510. DestinationPort: m.DstPort,
  511. }
  512. if strings.Contains(m.SrcAddr, ".") {
  513. h.TransportProtocol = pp.TCPv4
  514. } else {
  515. h.TransportProtocol = pp.TCPv6
  516. }
  517. if baseInfo.ProxyProtocolVersion == "v1" {
  518. h.Version = 1
  519. } else if baseInfo.ProxyProtocolVersion == "v2" {
  520. h.Version = 2
  521. }
  522. buf := bytes.NewBuffer(nil)
  523. h.WriteTo(buf)
  524. extraInfo = buf.Bytes()
  525. }
  526. }
  527. if proxyPlugin != nil {
  528. // if plugin is set, let plugin handle connections first
  529. xl.Debug("handle by plugin: %s", proxyPlugin.Name())
  530. proxyPlugin.Handle(remote, workConn, extraInfo)
  531. xl.Debug("handle by plugin finished")
  532. return
  533. } else {
  534. localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
  535. if err != nil {
  536. workConn.Close()
  537. xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
  538. return
  539. }
  540. xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  541. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  542. if len(extraInfo) > 0 {
  543. localConn.Write(extraInfo)
  544. }
  545. frpIo.Join(localConn, remote)
  546. xl.Debug("join connections closed")
  547. }
  548. }