1
0

proxy.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "net"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/fatedier/frp/models/config"
  27. "github.com/fatedier/frp/models/msg"
  28. "github.com/fatedier/frp/models/plugin"
  29. "github.com/fatedier/frp/models/proto/udp"
  30. "github.com/fatedier/frp/utils/limit"
  31. frpNet "github.com/fatedier/frp/utils/net"
  32. "github.com/fatedier/frp/utils/xlog"
  33. "github.com/fatedier/golib/errors"
  34. frpIo "github.com/fatedier/golib/io"
  35. "github.com/fatedier/golib/pool"
  36. fmux "github.com/hashicorp/yamux"
  37. pp "github.com/pires/go-proxyproto"
  38. "golang.org/x/time/rate"
  39. )
// Proxy defines how to handle work connections for different proxy types.
// Instances are created by NewProxy, one per configured proxy.
type Proxy interface {
	// Run performs the proxy's setup. The implementations in this file
	// either create the configured plugin or resolve the local UDP address.
	Run() error

	// InWorkConn accepts a work connection registered to the server,
	// together with the StartWorkConn message received for it.
	InWorkConn(net.Conn, *msg.StartWorkConn)

	// Close releases what the proxy holds. The implementations in this file
	// close the plugin, or the current UDP work connection and its channels.
	Close()
}
  47. func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
  48. var limiter *rate.Limiter
  49. limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
  50. if limitBytes > 0 {
  51. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  52. }
  53. baseProxy := BaseProxy{
  54. clientCfg: clientCfg,
  55. serverUDPPort: serverUDPPort,
  56. limiter: limiter,
  57. xl: xlog.FromContextSafe(ctx),
  58. ctx: ctx,
  59. }
  60. switch cfg := pxyConf.(type) {
  61. case *config.TcpProxyConf:
  62. pxy = &TcpProxy{
  63. BaseProxy: &baseProxy,
  64. cfg: cfg,
  65. }
  66. case *config.UdpProxyConf:
  67. pxy = &UdpProxy{
  68. BaseProxy: &baseProxy,
  69. cfg: cfg,
  70. }
  71. case *config.HttpProxyConf:
  72. pxy = &HttpProxy{
  73. BaseProxy: &baseProxy,
  74. cfg: cfg,
  75. }
  76. case *config.HttpsProxyConf:
  77. pxy = &HttpsProxy{
  78. BaseProxy: &baseProxy,
  79. cfg: cfg,
  80. }
  81. case *config.StcpProxyConf:
  82. pxy = &StcpProxy{
  83. BaseProxy: &baseProxy,
  84. cfg: cfg,
  85. }
  86. case *config.XtcpProxyConf:
  87. pxy = &XtcpProxy{
  88. BaseProxy: &baseProxy,
  89. cfg: cfg,
  90. }
  91. }
  92. return
  93. }
// BaseProxy holds the state shared by every proxy implementation in this
// file. NewProxy fills it in and each concrete proxy embeds a pointer to it.
type BaseProxy struct {
	// closed is toggled by UdpProxy.Close and UdpProxy.InWorkConn while
	// holding mu.
	closed bool
	// clientCfg is the client-wide configuration; the proxies read Token and
	// ServerAddr from it.
	clientCfg config.ClientCommonConf
	// serverUDPPort is the server port XtcpProxy dials for NAT hole punching.
	serverUDPPort int
	// limiter is nil unless the proxy has a positive bandwidth limit.
	limiter *rate.Limiter
	// mu guards closed and the UdpProxy work connection and channels.
	mu sync.RWMutex
	// xl is the logger obtained from ctx by NewProxy.
	xl *xlog.Logger
	// ctx is the context given to NewProxy; it is forwarded to
	// HandleTcpWorkConnection.
	ctx context.Context
}
// TCP

// TcpProxy is the Proxy implementation created for *config.TcpProxyConf.
type TcpProxy struct {
	*BaseProxy

	// cfg is the proxy's configuration as passed to NewProxy.
	cfg *config.TcpProxyConf
	// proxyPlugin is created by Run when cfg.Plugin is set; otherwise nil.
	proxyPlugin plugin.Plugin
}
  109. func (pxy *TcpProxy) Run() (err error) {
  110. if pxy.cfg.Plugin != "" {
  111. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  112. if err != nil {
  113. return
  114. }
  115. }
  116. return
  117. }
  118. func (pxy *TcpProxy) Close() {
  119. if pxy.proxyPlugin != nil {
  120. pxy.proxyPlugin.Close()
  121. }
  122. }
  123. func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  124. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  125. conn, []byte(pxy.clientCfg.Token), m)
  126. }
// HTTP

// HttpProxy is the Proxy implementation created for *config.HttpProxyConf.
type HttpProxy struct {
	*BaseProxy

	// cfg is the proxy's configuration as passed to NewProxy.
	cfg *config.HttpProxyConf
	// proxyPlugin is created by Run when cfg.Plugin is set; otherwise nil.
	proxyPlugin plugin.Plugin
}
  133. func (pxy *HttpProxy) Run() (err error) {
  134. if pxy.cfg.Plugin != "" {
  135. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  136. if err != nil {
  137. return
  138. }
  139. }
  140. return
  141. }
  142. func (pxy *HttpProxy) Close() {
  143. if pxy.proxyPlugin != nil {
  144. pxy.proxyPlugin.Close()
  145. }
  146. }
  147. func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  148. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  149. conn, []byte(pxy.clientCfg.Token), m)
  150. }
// HTTPS

// HttpsProxy is the Proxy implementation created for *config.HttpsProxyConf.
type HttpsProxy struct {
	*BaseProxy

	// cfg is the proxy's configuration as passed to NewProxy.
	cfg *config.HttpsProxyConf
	// proxyPlugin is created by Run when cfg.Plugin is set; otherwise nil.
	proxyPlugin plugin.Plugin
}
  157. func (pxy *HttpsProxy) Run() (err error) {
  158. if pxy.cfg.Plugin != "" {
  159. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  160. if err != nil {
  161. return
  162. }
  163. }
  164. return
  165. }
  166. func (pxy *HttpsProxy) Close() {
  167. if pxy.proxyPlugin != nil {
  168. pxy.proxyPlugin.Close()
  169. }
  170. }
  171. func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  172. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  173. conn, []byte(pxy.clientCfg.Token), m)
  174. }
// STCP

// StcpProxy is the Proxy implementation created for *config.StcpProxyConf.
type StcpProxy struct {
	*BaseProxy

	// cfg is the proxy's configuration as passed to NewProxy.
	cfg *config.StcpProxyConf
	// proxyPlugin is created by Run when cfg.Plugin is set; otherwise nil.
	proxyPlugin plugin.Plugin
}
  181. func (pxy *StcpProxy) Run() (err error) {
  182. if pxy.cfg.Plugin != "" {
  183. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  184. if err != nil {
  185. return
  186. }
  187. }
  188. return
  189. }
  190. func (pxy *StcpProxy) Close() {
  191. if pxy.proxyPlugin != nil {
  192. pxy.proxyPlugin.Close()
  193. }
  194. }
  195. func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  196. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  197. conn, []byte(pxy.clientCfg.Token), m)
  198. }
// XTCP

// XtcpProxy is the Proxy implementation created for *config.XtcpProxyConf.
// Its InWorkConn exchanges NAT hole messages with the server over UDP before
// serving traffic.
type XtcpProxy struct {
	*BaseProxy

	// cfg is the proxy's configuration as passed to NewProxy; cfg.Sk is used
	// as the encryption key for the tunnelled connection.
	cfg *config.XtcpProxyConf
	// proxyPlugin is created by Run when cfg.Plugin is set; otherwise nil.
	proxyPlugin plugin.Plugin
}
  205. func (pxy *XtcpProxy) Run() (err error) {
  206. if pxy.cfg.Plugin != "" {
  207. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  208. if err != nil {
  209. return
  210. }
  211. }
  212. return
  213. }
  214. func (pxy *XtcpProxy) Close() {
  215. if pxy.proxyPlugin != nil {
  216. pxy.proxyPlugin.Close()
  217. }
  218. }
  219. func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  220. xl := pxy.xl
  221. defer conn.Close()
  222. var natHoleSidMsg msg.NatHoleSid
  223. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  224. if err != nil {
  225. xl.Error("xtcp read from workConn error: %v", err)
  226. return
  227. }
  228. natHoleClientMsg := &msg.NatHoleClient{
  229. ProxyName: pxy.cfg.ProxyName,
  230. Sid: natHoleSidMsg.Sid,
  231. }
  232. raddr, _ := net.ResolveUDPAddr("udp",
  233. fmt.Sprintf("%s:%d", pxy.clientCfg.ServerAddr, pxy.serverUDPPort))
  234. clientConn, err := net.DialUDP("udp", nil, raddr)
  235. defer clientConn.Close()
  236. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  237. if err != nil {
  238. xl.Error("send natHoleClientMsg to server error: %v", err)
  239. return
  240. }
  241. // Wait for client address at most 5 seconds.
  242. var natHoleRespMsg msg.NatHoleResp
  243. clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  244. buf := pool.GetBuf(1024)
  245. n, err := clientConn.Read(buf)
  246. if err != nil {
  247. xl.Error("get natHoleRespMsg error: %v", err)
  248. return
  249. }
  250. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  251. if err != nil {
  252. xl.Error("get natHoleRespMsg error: %v", err)
  253. return
  254. }
  255. clientConn.SetReadDeadline(time.Time{})
  256. clientConn.Close()
  257. if natHoleRespMsg.Error != "" {
  258. xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  259. return
  260. }
  261. xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
  262. // Send detect message
  263. array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
  264. if len(array) <= 1 {
  265. xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  266. }
  267. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  268. /*
  269. for i := 1000; i < 65000; i++ {
  270. pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
  271. }
  272. */
  273. port, err := strconv.ParseInt(array[1], 10, 64)
  274. if err != nil {
  275. xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  276. return
  277. }
  278. pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
  279. xl.Trace("send all detect msg done")
  280. msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
  281. // Listen for clientConn's address and wait for visitor connection
  282. lConn, err := net.ListenUDP("udp", laddr)
  283. if err != nil {
  284. xl.Error("listen on visitorConn's local adress error: %v", err)
  285. return
  286. }
  287. defer lConn.Close()
  288. lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
  289. sidBuf := pool.GetBuf(1024)
  290. var uAddr *net.UDPAddr
  291. n, uAddr, err = lConn.ReadFromUDP(sidBuf)
  292. if err != nil {
  293. xl.Warn("get sid from visitor error: %v", err)
  294. return
  295. }
  296. lConn.SetReadDeadline(time.Time{})
  297. if string(sidBuf[:n]) != natHoleRespMsg.Sid {
  298. xl.Warn("incorrect sid from visitor")
  299. return
  300. }
  301. pool.PutBuf(sidBuf)
  302. xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
  303. lConn.WriteToUDP(sidBuf[:n], uAddr)
  304. kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.VisitorAddr)
  305. if err != nil {
  306. xl.Error("create kcp connection from udp connection error: %v", err)
  307. return
  308. }
  309. fmuxCfg := fmux.DefaultConfig()
  310. fmuxCfg.KeepAliveInterval = 5 * time.Second
  311. fmuxCfg.LogOutput = ioutil.Discard
  312. sess, err := fmux.Server(kcpConn, fmuxCfg)
  313. if err != nil {
  314. xl.Error("create yamux server from kcp connection error: %v", err)
  315. return
  316. }
  317. defer sess.Close()
  318. muxConn, err := sess.Accept()
  319. if err != nil {
  320. xl.Error("accept for yamux connection error: %v", err)
  321. return
  322. }
  323. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, pxy.limiter,
  324. muxConn, []byte(pxy.cfg.Sk), m)
  325. }
  326. func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
  327. daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
  328. if err != nil {
  329. return err
  330. }
  331. tConn, err := net.DialUDP("udp", laddr, daddr)
  332. if err != nil {
  333. return err
  334. }
  335. //uConn := ipv4.NewConn(tConn)
  336. //uConn.SetTTL(3)
  337. tConn.Write(content)
  338. tConn.Close()
  339. return nil
  340. }
// UDP

// UdpProxy is the Proxy implementation created for *config.UdpProxyConf. It
// keeps a single work connection at a time; InWorkConn replaces it.
type UdpProxy struct {
	*BaseProxy

	// cfg is the proxy's configuration as passed to NewProxy.
	cfg *config.UdpProxyConf
	// localAddr is the local service address resolved by Run.
	localAddr *net.UDPAddr
	// readCh carries packets read from the work connection to udp.Forwarder.
	readCh chan *msg.UdpPacket

	// sendCh carries messages to be written to the work connection.
	// include msg.UdpPacket and msg.Ping
	sendCh chan msg.Message
	// workConn is the current work connection; it is closed by Close.
	workConn net.Conn
}
  351. func (pxy *UdpProxy) Run() (err error) {
  352. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
  353. if err != nil {
  354. return
  355. }
  356. return
  357. }
  358. func (pxy *UdpProxy) Close() {
  359. pxy.mu.Lock()
  360. defer pxy.mu.Unlock()
  361. if !pxy.closed {
  362. pxy.closed = true
  363. if pxy.workConn != nil {
  364. pxy.workConn.Close()
  365. }
  366. if pxy.readCh != nil {
  367. close(pxy.readCh)
  368. }
  369. if pxy.sendCh != nil {
  370. close(pxy.sendCh)
  371. }
  372. }
  373. }
  374. func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  375. xl := pxy.xl
  376. xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  377. // close resources releated with old workConn
  378. pxy.Close()
  379. if pxy.limiter != nil {
  380. rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
  381. return conn.Close()
  382. })
  383. conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
  384. }
  385. pxy.mu.Lock()
  386. pxy.workConn = conn
  387. pxy.readCh = make(chan *msg.UdpPacket, 1024)
  388. pxy.sendCh = make(chan msg.Message, 1024)
  389. pxy.closed = false
  390. pxy.mu.Unlock()
  391. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
  392. for {
  393. var udpMsg msg.UdpPacket
  394. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  395. xl.Warn("read from workConn for udp error: %v", errRet)
  396. return
  397. }
  398. if errRet := errors.PanicToError(func() {
  399. xl.Trace("get udp package from workConn: %s", udpMsg.Content)
  400. readCh <- &udpMsg
  401. }); errRet != nil {
  402. xl.Info("reader goroutine for udp work connection closed: %v", errRet)
  403. return
  404. }
  405. }
  406. }
  407. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  408. defer func() {
  409. xl.Info("writer goroutine for udp work connection closed")
  410. }()
  411. var errRet error
  412. for rawMsg := range sendCh {
  413. switch m := rawMsg.(type) {
  414. case *msg.UdpPacket:
  415. xl.Trace("send udp package to workConn: %s", m.Content)
  416. case *msg.Ping:
  417. xl.Trace("send ping message to udp workConn")
  418. }
  419. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  420. xl.Error("udp work write error: %v", errRet)
  421. return
  422. }
  423. }
  424. }
  425. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  426. var errRet error
  427. for {
  428. time.Sleep(time.Duration(30) * time.Second)
  429. if errRet = errors.PanicToError(func() {
  430. sendCh <- &msg.Ping{}
  431. }); errRet != nil {
  432. xl.Trace("heartbeat goroutine for udp work connection closed")
  433. break
  434. }
  435. }
  436. }
  437. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  438. go workConnReaderFn(pxy.workConn, pxy.readCh)
  439. go heartbeatFn(pxy.workConn, pxy.sendCh)
  440. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
  441. }
  442. // Common handler for tcp work connections.
  443. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  444. baseInfo *config.BaseProxyConf, limiter *rate.Limiter, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
  445. xl := xlog.FromContextSafe(ctx)
  446. var (
  447. remote io.ReadWriteCloser
  448. err error
  449. )
  450. remote = workConn
  451. if limiter != nil {
  452. remote = frpIo.WrapReadWriteCloser(limit.NewReader(workConn, limiter), limit.NewWriter(workConn, limiter), func() error {
  453. return workConn.Close()
  454. })
  455. }
  456. xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
  457. baseInfo.UseEncryption, baseInfo.UseCompression)
  458. if baseInfo.UseEncryption {
  459. remote, err = frpIo.WithEncryption(remote, encKey)
  460. if err != nil {
  461. workConn.Close()
  462. xl.Error("create encryption stream error: %v", err)
  463. return
  464. }
  465. }
  466. if baseInfo.UseCompression {
  467. remote = frpIo.WithCompression(remote)
  468. }
  469. // check if we need to send proxy protocol info
  470. var extraInfo []byte
  471. if baseInfo.ProxyProtocolVersion != "" {
  472. if m.SrcAddr != "" && m.SrcPort != 0 {
  473. if m.DstAddr == "" {
  474. m.DstAddr = "127.0.0.1"
  475. }
  476. h := &pp.Header{
  477. Command: pp.PROXY,
  478. SourceAddress: net.ParseIP(m.SrcAddr),
  479. SourcePort: m.SrcPort,
  480. DestinationAddress: net.ParseIP(m.DstAddr),
  481. DestinationPort: m.DstPort,
  482. }
  483. if strings.Contains(m.SrcAddr, ".") {
  484. h.TransportProtocol = pp.TCPv4
  485. } else {
  486. h.TransportProtocol = pp.TCPv6
  487. }
  488. if baseInfo.ProxyProtocolVersion == "v1" {
  489. h.Version = 1
  490. } else if baseInfo.ProxyProtocolVersion == "v2" {
  491. h.Version = 2
  492. }
  493. buf := bytes.NewBuffer(nil)
  494. h.WriteTo(buf)
  495. extraInfo = buf.Bytes()
  496. }
  497. }
  498. if proxyPlugin != nil {
  499. // if plugin is set, let plugin handle connections first
  500. xl.Debug("handle by plugin: %s", proxyPlugin.Name())
  501. proxyPlugin.Handle(remote, workConn, extraInfo)
  502. xl.Debug("handle by plugin finished")
  503. return
  504. } else {
  505. localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
  506. if err != nil {
  507. workConn.Close()
  508. xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
  509. return
  510. }
  511. xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  512. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  513. if len(extraInfo) > 0 {
  514. localConn.Write(extraInfo)
  515. }
  516. frpIo.Join(localConn, remote)
  517. xl.Debug("join connections closed")
  518. }
  519. }