proxy.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "context"
  18. "fmt"
  19. "io"
  20. "io/ioutil"
  21. "net"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "github.com/fatedier/frp/models/config"
  27. "github.com/fatedier/frp/models/msg"
  28. "github.com/fatedier/frp/models/plugin"
  29. "github.com/fatedier/frp/models/proto/udp"
  30. frpNet "github.com/fatedier/frp/utils/net"
  31. "github.com/fatedier/frp/utils/xlog"
  32. "github.com/fatedier/golib/errors"
  33. frpIo "github.com/fatedier/golib/io"
  34. "github.com/fatedier/golib/pool"
  35. fmux "github.com/hashicorp/yamux"
  36. pp "github.com/pires/go-proxyproto"
  37. )
  38. // Proxy defines how to handle work connections for different proxy type.
  39. type Proxy interface {
  40. Run() error
  41. // InWorkConn accept work connections registered to server.
  42. InWorkConn(net.Conn, *msg.StartWorkConn)
  43. Close()
  44. }
  45. func NewProxy(ctx context.Context, pxyConf config.ProxyConf, clientCfg config.ClientCommonConf, serverUDPPort int) (pxy Proxy) {
  46. baseProxy := BaseProxy{
  47. clientCfg: clientCfg,
  48. serverUDPPort: serverUDPPort,
  49. xl: xlog.FromContextSafe(ctx),
  50. ctx: ctx,
  51. }
  52. switch cfg := pxyConf.(type) {
  53. case *config.TcpProxyConf:
  54. pxy = &TcpProxy{
  55. BaseProxy: &baseProxy,
  56. cfg: cfg,
  57. }
  58. case *config.UdpProxyConf:
  59. pxy = &UdpProxy{
  60. BaseProxy: &baseProxy,
  61. cfg: cfg,
  62. }
  63. case *config.HttpProxyConf:
  64. pxy = &HttpProxy{
  65. BaseProxy: &baseProxy,
  66. cfg: cfg,
  67. }
  68. case *config.HttpsProxyConf:
  69. pxy = &HttpsProxy{
  70. BaseProxy: &baseProxy,
  71. cfg: cfg,
  72. }
  73. case *config.StcpProxyConf:
  74. pxy = &StcpProxy{
  75. BaseProxy: &baseProxy,
  76. cfg: cfg,
  77. }
  78. case *config.XtcpProxyConf:
  79. pxy = &XtcpProxy{
  80. BaseProxy: &baseProxy,
  81. cfg: cfg,
  82. }
  83. }
  84. return
  85. }
  86. type BaseProxy struct {
  87. closed bool
  88. clientCfg config.ClientCommonConf
  89. serverUDPPort int
  90. mu sync.RWMutex
  91. xl *xlog.Logger
  92. ctx context.Context
  93. }
  94. // TCP
  95. type TcpProxy struct {
  96. *BaseProxy
  97. cfg *config.TcpProxyConf
  98. proxyPlugin plugin.Plugin
  99. }
  100. func (pxy *TcpProxy) Run() (err error) {
  101. if pxy.cfg.Plugin != "" {
  102. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  103. if err != nil {
  104. return
  105. }
  106. }
  107. return
  108. }
  109. func (pxy *TcpProxy) Close() {
  110. if pxy.proxyPlugin != nil {
  111. pxy.proxyPlugin.Close()
  112. }
  113. }
  114. func (pxy *TcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  115. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  116. []byte(pxy.clientCfg.Token), m)
  117. }
  118. // HTTP
  119. type HttpProxy struct {
  120. *BaseProxy
  121. cfg *config.HttpProxyConf
  122. proxyPlugin plugin.Plugin
  123. }
  124. func (pxy *HttpProxy) Run() (err error) {
  125. if pxy.cfg.Plugin != "" {
  126. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  127. if err != nil {
  128. return
  129. }
  130. }
  131. return
  132. }
  133. func (pxy *HttpProxy) Close() {
  134. if pxy.proxyPlugin != nil {
  135. pxy.proxyPlugin.Close()
  136. }
  137. }
  138. func (pxy *HttpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  139. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  140. []byte(pxy.clientCfg.Token), m)
  141. }
  142. // HTTPS
  143. type HttpsProxy struct {
  144. *BaseProxy
  145. cfg *config.HttpsProxyConf
  146. proxyPlugin plugin.Plugin
  147. }
  148. func (pxy *HttpsProxy) Run() (err error) {
  149. if pxy.cfg.Plugin != "" {
  150. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  151. if err != nil {
  152. return
  153. }
  154. }
  155. return
  156. }
  157. func (pxy *HttpsProxy) Close() {
  158. if pxy.proxyPlugin != nil {
  159. pxy.proxyPlugin.Close()
  160. }
  161. }
  162. func (pxy *HttpsProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  163. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  164. []byte(pxy.clientCfg.Token), m)
  165. }
  166. // STCP
  167. type StcpProxy struct {
  168. *BaseProxy
  169. cfg *config.StcpProxyConf
  170. proxyPlugin plugin.Plugin
  171. }
  172. func (pxy *StcpProxy) Run() (err error) {
  173. if pxy.cfg.Plugin != "" {
  174. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  175. if err != nil {
  176. return
  177. }
  178. }
  179. return
  180. }
  181. func (pxy *StcpProxy) Close() {
  182. if pxy.proxyPlugin != nil {
  183. pxy.proxyPlugin.Close()
  184. }
  185. }
  186. func (pxy *StcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  187. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  188. []byte(pxy.clientCfg.Token), m)
  189. }
  190. // XTCP
  191. type XtcpProxy struct {
  192. *BaseProxy
  193. cfg *config.XtcpProxyConf
  194. proxyPlugin plugin.Plugin
  195. }
  196. func (pxy *XtcpProxy) Run() (err error) {
  197. if pxy.cfg.Plugin != "" {
  198. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  199. if err != nil {
  200. return
  201. }
  202. }
  203. return
  204. }
  205. func (pxy *XtcpProxy) Close() {
  206. if pxy.proxyPlugin != nil {
  207. pxy.proxyPlugin.Close()
  208. }
  209. }
  210. func (pxy *XtcpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  211. xl := pxy.xl
  212. defer conn.Close()
  213. var natHoleSidMsg msg.NatHoleSid
  214. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  215. if err != nil {
  216. xl.Error("xtcp read from workConn error: %v", err)
  217. return
  218. }
  219. natHoleClientMsg := &msg.NatHoleClient{
  220. ProxyName: pxy.cfg.ProxyName,
  221. Sid: natHoleSidMsg.Sid,
  222. }
  223. raddr, _ := net.ResolveUDPAddr("udp",
  224. fmt.Sprintf("%s:%d", pxy.clientCfg.ServerAddr, pxy.serverUDPPort))
  225. clientConn, err := net.DialUDP("udp", nil, raddr)
  226. defer clientConn.Close()
  227. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  228. if err != nil {
  229. xl.Error("send natHoleClientMsg to server error: %v", err)
  230. return
  231. }
  232. // Wait for client address at most 5 seconds.
  233. var natHoleRespMsg msg.NatHoleResp
  234. clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  235. buf := pool.GetBuf(1024)
  236. n, err := clientConn.Read(buf)
  237. if err != nil {
  238. xl.Error("get natHoleRespMsg error: %v", err)
  239. return
  240. }
  241. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  242. if err != nil {
  243. xl.Error("get natHoleRespMsg error: %v", err)
  244. return
  245. }
  246. clientConn.SetReadDeadline(time.Time{})
  247. clientConn.Close()
  248. if natHoleRespMsg.Error != "" {
  249. xl.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  250. return
  251. }
  252. xl.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
  253. // Send detect message
  254. array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
  255. if len(array) <= 1 {
  256. xl.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  257. }
  258. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  259. /*
  260. for i := 1000; i < 65000; i++ {
  261. pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
  262. }
  263. */
  264. port, err := strconv.ParseInt(array[1], 10, 64)
  265. if err != nil {
  266. xl.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  267. return
  268. }
  269. pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
  270. xl.Trace("send all detect msg done")
  271. msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
  272. // Listen for clientConn's address and wait for visitor connection
  273. lConn, err := net.ListenUDP("udp", laddr)
  274. if err != nil {
  275. xl.Error("listen on visitorConn's local adress error: %v", err)
  276. return
  277. }
  278. defer lConn.Close()
  279. lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
  280. sidBuf := pool.GetBuf(1024)
  281. var uAddr *net.UDPAddr
  282. n, uAddr, err = lConn.ReadFromUDP(sidBuf)
  283. if err != nil {
  284. xl.Warn("get sid from visitor error: %v", err)
  285. return
  286. }
  287. lConn.SetReadDeadline(time.Time{})
  288. if string(sidBuf[:n]) != natHoleRespMsg.Sid {
  289. xl.Warn("incorrect sid from visitor")
  290. return
  291. }
  292. pool.PutBuf(sidBuf)
  293. xl.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
  294. lConn.WriteToUDP(sidBuf[:n], uAddr)
  295. kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.VisitorAddr)
  296. if err != nil {
  297. xl.Error("create kcp connection from udp connection error: %v", err)
  298. return
  299. }
  300. fmuxCfg := fmux.DefaultConfig()
  301. fmuxCfg.KeepAliveInterval = 5 * time.Second
  302. fmuxCfg.LogOutput = ioutil.Discard
  303. sess, err := fmux.Server(kcpConn, fmuxCfg)
  304. if err != nil {
  305. xl.Error("create yamux server from kcp connection error: %v", err)
  306. return
  307. }
  308. defer sess.Close()
  309. muxConn, err := sess.Accept()
  310. if err != nil {
  311. xl.Error("accept for yamux connection error: %v", err)
  312. return
  313. }
  314. HandleTcpWorkConnection(pxy.ctx, &pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
  315. muxConn, []byte(pxy.cfg.Sk), m)
  316. }
  317. func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
  318. daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
  319. if err != nil {
  320. return err
  321. }
  322. tConn, err := net.DialUDP("udp", laddr, daddr)
  323. if err != nil {
  324. return err
  325. }
  326. //uConn := ipv4.NewConn(tConn)
  327. //uConn.SetTTL(3)
  328. tConn.Write(content)
  329. tConn.Close()
  330. return nil
  331. }
  332. // UDP
  333. type UdpProxy struct {
  334. *BaseProxy
  335. cfg *config.UdpProxyConf
  336. localAddr *net.UDPAddr
  337. readCh chan *msg.UdpPacket
  338. // include msg.UdpPacket and msg.Ping
  339. sendCh chan msg.Message
  340. workConn net.Conn
  341. }
  342. func (pxy *UdpProxy) Run() (err error) {
  343. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
  344. if err != nil {
  345. return
  346. }
  347. return
  348. }
  349. func (pxy *UdpProxy) Close() {
  350. pxy.mu.Lock()
  351. defer pxy.mu.Unlock()
  352. if !pxy.closed {
  353. pxy.closed = true
  354. if pxy.workConn != nil {
  355. pxy.workConn.Close()
  356. }
  357. if pxy.readCh != nil {
  358. close(pxy.readCh)
  359. }
  360. if pxy.sendCh != nil {
  361. close(pxy.sendCh)
  362. }
  363. }
  364. }
  365. func (pxy *UdpProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  366. xl := pxy.xl
  367. xl.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  368. // close resources releated with old workConn
  369. pxy.Close()
  370. pxy.mu.Lock()
  371. pxy.workConn = conn
  372. pxy.readCh = make(chan *msg.UdpPacket, 1024)
  373. pxy.sendCh = make(chan msg.Message, 1024)
  374. pxy.closed = false
  375. pxy.mu.Unlock()
  376. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
  377. for {
  378. var udpMsg msg.UdpPacket
  379. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  380. xl.Warn("read from workConn for udp error: %v", errRet)
  381. return
  382. }
  383. if errRet := errors.PanicToError(func() {
  384. xl.Trace("get udp package from workConn: %s", udpMsg.Content)
  385. readCh <- &udpMsg
  386. }); errRet != nil {
  387. xl.Info("reader goroutine for udp work connection closed: %v", errRet)
  388. return
  389. }
  390. }
  391. }
  392. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  393. defer func() {
  394. xl.Info("writer goroutine for udp work connection closed")
  395. }()
  396. var errRet error
  397. for rawMsg := range sendCh {
  398. switch m := rawMsg.(type) {
  399. case *msg.UdpPacket:
  400. xl.Trace("send udp package to workConn: %s", m.Content)
  401. case *msg.Ping:
  402. xl.Trace("send ping message to udp workConn")
  403. }
  404. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  405. xl.Error("udp work write error: %v", errRet)
  406. return
  407. }
  408. }
  409. }
  410. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  411. var errRet error
  412. for {
  413. time.Sleep(time.Duration(30) * time.Second)
  414. if errRet = errors.PanicToError(func() {
  415. sendCh <- &msg.Ping{}
  416. }); errRet != nil {
  417. xl.Trace("heartbeat goroutine for udp work connection closed")
  418. break
  419. }
  420. }
  421. }
  422. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  423. go workConnReaderFn(pxy.workConn, pxy.readCh)
  424. go heartbeatFn(pxy.workConn, pxy.sendCh)
  425. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
  426. }
  427. // Common handler for tcp work connections.
  428. func HandleTcpWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  429. baseInfo *config.BaseProxyConf, workConn net.Conn, encKey []byte, m *msg.StartWorkConn) {
  430. xl := xlog.FromContextSafe(ctx)
  431. var (
  432. remote io.ReadWriteCloser
  433. err error
  434. )
  435. remote = workConn
  436. xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
  437. baseInfo.UseEncryption, baseInfo.UseCompression)
  438. if baseInfo.UseEncryption {
  439. remote, err = frpIo.WithEncryption(remote, encKey)
  440. if err != nil {
  441. workConn.Close()
  442. xl.Error("create encryption stream error: %v", err)
  443. return
  444. }
  445. }
  446. if baseInfo.UseCompression {
  447. remote = frpIo.WithCompression(remote)
  448. }
  449. // check if we need to send proxy protocol info
  450. var extraInfo []byte
  451. if baseInfo.ProxyProtocolVersion != "" {
  452. if m.SrcAddr != "" && m.SrcPort != 0 {
  453. if m.DstAddr == "" {
  454. m.DstAddr = "127.0.0.1"
  455. }
  456. h := &pp.Header{
  457. Command: pp.PROXY,
  458. SourceAddress: net.ParseIP(m.SrcAddr),
  459. SourcePort: m.SrcPort,
  460. DestinationAddress: net.ParseIP(m.DstAddr),
  461. DestinationPort: m.DstPort,
  462. }
  463. if strings.Contains(m.SrcAddr, ".") {
  464. h.TransportProtocol = pp.TCPv4
  465. } else {
  466. h.TransportProtocol = pp.TCPv6
  467. }
  468. if baseInfo.ProxyProtocolVersion == "v1" {
  469. h.Version = 1
  470. } else if baseInfo.ProxyProtocolVersion == "v2" {
  471. h.Version = 2
  472. }
  473. buf := bytes.NewBuffer(nil)
  474. h.WriteTo(buf)
  475. extraInfo = buf.Bytes()
  476. }
  477. }
  478. if proxyPlugin != nil {
  479. // if plugin is set, let plugin handle connections first
  480. xl.Debug("handle by plugin: %s", proxyPlugin.Name())
  481. proxyPlugin.Handle(remote, workConn, extraInfo)
  482. xl.Debug("handle by plugin finished")
  483. return
  484. } else {
  485. localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
  486. if err != nil {
  487. workConn.Close()
  488. xl.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
  489. return
  490. }
  491. xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  492. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  493. if len(extraInfo) > 0 {
  494. localConn.Write(extraInfo)
  495. }
  496. frpIo.Join(localConn, remote)
  497. xl.Debug("join connections closed")
  498. }
  499. }