proxy.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "net"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. "github.com/fatedier/frp/g"
  26. "github.com/fatedier/frp/models/config"
  27. "github.com/fatedier/frp/models/msg"
  28. "github.com/fatedier/frp/models/plugin"
  29. "github.com/fatedier/frp/models/proto/udp"
  30. "github.com/fatedier/frp/utils/log"
  31. frpNet "github.com/fatedier/frp/utils/net"
  32. "github.com/fatedier/golib/errors"
  33. frpIo "github.com/fatedier/golib/io"
  34. "github.com/fatedier/golib/pool"
  35. fmux "github.com/hashicorp/yamux"
  36. pp "github.com/pires/go-proxyproto"
  37. )
  38. // Proxy defines how to handle work connections for different proxy type.
  39. type Proxy interface {
  40. Run() error
  41. // InWorkConn accept work connections registered to server.
  42. InWorkConn(frpNet.Conn, *msg.StartWorkConn)
  43. Close()
  44. log.Logger
  45. }
  46. func NewProxy(pxyConf config.ProxyConf) (pxy Proxy) {
  47. baseProxy := BaseProxy{
  48. Logger: log.NewPrefixLogger(pxyConf.GetBaseInfo().ProxyName),
  49. }
  50. switch cfg := pxyConf.(type) {
  51. case *config.TcpProxyConf:
  52. pxy = &TcpProxy{
  53. BaseProxy: &baseProxy,
  54. cfg: cfg,
  55. }
  56. case *config.UdpProxyConf:
  57. pxy = &UdpProxy{
  58. BaseProxy: &baseProxy,
  59. cfg: cfg,
  60. }
  61. case *config.HttpProxyConf:
  62. pxy = &HttpProxy{
  63. BaseProxy: &baseProxy,
  64. cfg: cfg,
  65. }
  66. case *config.HttpsProxyConf:
  67. pxy = &HttpsProxy{
  68. BaseProxy: &baseProxy,
  69. cfg: cfg,
  70. }
  71. case *config.StcpProxyConf:
  72. pxy = &StcpProxy{
  73. BaseProxy: &baseProxy,
  74. cfg: cfg,
  75. }
  76. case *config.XtcpProxyConf:
  77. pxy = &XtcpProxy{
  78. BaseProxy: &baseProxy,
  79. cfg: cfg,
  80. }
  81. }
  82. return
  83. }
  84. type BaseProxy struct {
  85. closed bool
  86. mu sync.RWMutex
  87. log.Logger
  88. }
  89. // TCP
  90. type TcpProxy struct {
  91. *BaseProxy
  92. cfg *config.TcpProxyConf
  93. proxyPlugin plugin.Plugin
  94. }
  95. func (pxy *TcpProxy) Run() (err error) {
  96. if pxy.cfg.Plugin != "" {
  97. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  98. if err != nil {
  99. return
  100. }
  101. }
  102. return
  103. }
  104. func (pxy *TcpProxy) Close() {
  105. if pxy.proxyPlugin != nil {
  106. pxy.proxyPlugin.Close()
  107. }
  108. }
  109. func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
  110. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  111. []byte(g.GlbClientCfg.Token), m)
  112. }
  113. // HTTP
  114. type HttpProxy struct {
  115. *BaseProxy
  116. cfg *config.HttpProxyConf
  117. proxyPlugin plugin.Plugin
  118. }
  119. func (pxy *HttpProxy) Run() (err error) {
  120. if pxy.cfg.Plugin != "" {
  121. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  122. if err != nil {
  123. return
  124. }
  125. }
  126. return
  127. }
  128. func (pxy *HttpProxy) Close() {
  129. if pxy.proxyPlugin != nil {
  130. pxy.proxyPlugin.Close()
  131. }
  132. }
  133. func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
  134. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  135. []byte(g.GlbClientCfg.Token), m)
  136. }
  137. // HTTPS
  138. type HttpsProxy struct {
  139. *BaseProxy
  140. cfg *config.HttpsProxyConf
  141. proxyPlugin plugin.Plugin
  142. }
  143. func (pxy *HttpsProxy) Run() (err error) {
  144. if pxy.cfg.Plugin != "" {
  145. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  146. if err != nil {
  147. return
  148. }
  149. }
  150. return
  151. }
  152. func (pxy *HttpsProxy) Close() {
  153. if pxy.proxyPlugin != nil {
  154. pxy.proxyPlugin.Close()
  155. }
  156. }
  157. func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
  158. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  159. []byte(g.GlbClientCfg.Token), m)
  160. }
  161. // STCP
  162. type StcpProxy struct {
  163. *BaseProxy
  164. cfg *config.StcpProxyConf
  165. proxyPlugin plugin.Plugin
  166. }
  167. func (pxy *StcpProxy) Run() (err error) {
  168. if pxy.cfg.Plugin != "" {
  169. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  170. if err != nil {
  171. return
  172. }
  173. }
  174. return
  175. }
  176. func (pxy *StcpProxy) Close() {
  177. if pxy.proxyPlugin != nil {
  178. pxy.proxyPlugin.Close()
  179. }
  180. }
  181. func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
  182. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  183. []byte(g.GlbClientCfg.Token), m)
  184. }
  185. // XTCP
  186. type XtcpProxy struct {
  187. *BaseProxy
  188. cfg *config.XtcpProxyConf
  189. proxyPlugin plugin.Plugin
  190. }
  191. func (pxy *XtcpProxy) Run() (err error) {
  192. if pxy.cfg.Plugin != "" {
  193. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  194. if err != nil {
  195. return
  196. }
  197. }
  198. return
  199. }
  200. func (pxy *XtcpProxy) Close() {
  201. if pxy.proxyPlugin != nil {
  202. pxy.proxyPlugin.Close()
  203. }
  204. }
  205. func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
  206. defer conn.Close()
  207. var natHoleSidMsg msg.NatHoleSid
  208. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  209. if err != nil {
  210. pxy.Error("xtcp read from workConn error: %v", err)
  211. return
  212. }
  213. natHoleClientMsg := &msg.NatHoleClient{
  214. ProxyName: pxy.cfg.ProxyName,
  215. Sid: natHoleSidMsg.Sid,
  216. }
  217. raddr, _ := net.ResolveUDPAddr("udp",
  218. fmt.Sprintf("%s:%d", g.GlbClientCfg.ServerAddr, g.GlbClientCfg.ServerUdpPort))
  219. clientConn, err := net.DialUDP("udp", nil, raddr)
  220. defer clientConn.Close()
  221. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  222. if err != nil {
  223. pxy.Error("send natHoleClientMsg to server error: %v", err)
  224. return
  225. }
  226. // Wait for client address at most 5 seconds.
  227. var natHoleRespMsg msg.NatHoleResp
  228. clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  229. buf := pool.GetBuf(1024)
  230. n, err := clientConn.Read(buf)
  231. if err != nil {
  232. pxy.Error("get natHoleRespMsg error: %v", err)
  233. return
  234. }
  235. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  236. if err != nil {
  237. pxy.Error("get natHoleRespMsg error: %v", err)
  238. return
  239. }
  240. clientConn.SetReadDeadline(time.Time{})
  241. clientConn.Close()
  242. if natHoleRespMsg.Error != "" {
  243. pxy.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  244. return
  245. }
  246. pxy.Trace("get natHoleRespMsg, sid [%s], client address [%s] visitor address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr, natHoleRespMsg.VisitorAddr)
  247. // Send detect message
  248. array := strings.Split(natHoleRespMsg.VisitorAddr, ":")
  249. if len(array) <= 1 {
  250. pxy.Error("get NatHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  251. }
  252. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  253. /*
  254. for i := 1000; i < 65000; i++ {
  255. pxy.sendDetectMsg(array[0], int64(i), laddr, "a")
  256. }
  257. */
  258. port, err := strconv.ParseInt(array[1], 10, 64)
  259. if err != nil {
  260. pxy.Error("get natHoleResp visitor address error: %v", natHoleRespMsg.VisitorAddr)
  261. return
  262. }
  263. pxy.sendDetectMsg(array[0], int(port), laddr, []byte(natHoleRespMsg.Sid))
  264. pxy.Trace("send all detect msg done")
  265. msg.WriteMsg(conn, &msg.NatHoleClientDetectOK{})
  266. // Listen for clientConn's address and wait for visitor connection
  267. lConn, err := net.ListenUDP("udp", laddr)
  268. if err != nil {
  269. pxy.Error("listen on visitorConn's local adress error: %v", err)
  270. return
  271. }
  272. defer lConn.Close()
  273. lConn.SetReadDeadline(time.Now().Add(8 * time.Second))
  274. sidBuf := pool.GetBuf(1024)
  275. var uAddr *net.UDPAddr
  276. n, uAddr, err = lConn.ReadFromUDP(sidBuf)
  277. if err != nil {
  278. pxy.Warn("get sid from visitor error: %v", err)
  279. return
  280. }
  281. lConn.SetReadDeadline(time.Time{})
  282. if string(sidBuf[:n]) != natHoleRespMsg.Sid {
  283. pxy.Warn("incorrect sid from visitor")
  284. return
  285. }
  286. pool.PutBuf(sidBuf)
  287. pxy.Info("nat hole connection make success, sid [%s]", natHoleRespMsg.Sid)
  288. lConn.WriteToUDP(sidBuf[:n], uAddr)
  289. kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, false, natHoleRespMsg.VisitorAddr)
  290. if err != nil {
  291. pxy.Error("create kcp connection from udp connection error: %v", err)
  292. return
  293. }
  294. fmuxCfg := fmux.DefaultConfig()
  295. fmuxCfg.KeepAliveInterval = 5 * time.Second
  296. fmuxCfg.LogOutput = ioutil.Discard
  297. sess, err := fmux.Server(kcpConn, fmuxCfg)
  298. if err != nil {
  299. pxy.Error("create yamux server from kcp connection error: %v", err)
  300. return
  301. }
  302. defer sess.Close()
  303. muxConn, err := sess.Accept()
  304. if err != nil {
  305. pxy.Error("accept for yamux connection error: %v", err)
  306. return
  307. }
  308. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
  309. frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk), m)
  310. }
  311. func (pxy *XtcpProxy) sendDetectMsg(addr string, port int, laddr *net.UDPAddr, content []byte) (err error) {
  312. daddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, port))
  313. if err != nil {
  314. return err
  315. }
  316. tConn, err := net.DialUDP("udp", laddr, daddr)
  317. if err != nil {
  318. return err
  319. }
  320. //uConn := ipv4.NewConn(tConn)
  321. //uConn.SetTTL(3)
  322. tConn.Write(content)
  323. tConn.Close()
  324. return nil
  325. }
  326. // UDP
  327. type UdpProxy struct {
  328. *BaseProxy
  329. cfg *config.UdpProxyConf
  330. localAddr *net.UDPAddr
  331. readCh chan *msg.UdpPacket
  332. // include msg.UdpPacket and msg.Ping
  333. sendCh chan msg.Message
  334. workConn frpNet.Conn
  335. }
  336. func (pxy *UdpProxy) Run() (err error) {
  337. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
  338. if err != nil {
  339. return
  340. }
  341. return
  342. }
  343. func (pxy *UdpProxy) Close() {
  344. pxy.mu.Lock()
  345. defer pxy.mu.Unlock()
  346. if !pxy.closed {
  347. pxy.closed = true
  348. if pxy.workConn != nil {
  349. pxy.workConn.Close()
  350. }
  351. if pxy.readCh != nil {
  352. close(pxy.readCh)
  353. }
  354. if pxy.sendCh != nil {
  355. close(pxy.sendCh)
  356. }
  357. }
  358. }
  359. func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn, m *msg.StartWorkConn) {
  360. pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  361. // close resources releated with old workConn
  362. pxy.Close()
  363. pxy.mu.Lock()
  364. pxy.workConn = conn
  365. pxy.readCh = make(chan *msg.UdpPacket, 1024)
  366. pxy.sendCh = make(chan msg.Message, 1024)
  367. pxy.closed = false
  368. pxy.mu.Unlock()
  369. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
  370. for {
  371. var udpMsg msg.UdpPacket
  372. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  373. pxy.Warn("read from workConn for udp error: %v", errRet)
  374. return
  375. }
  376. if errRet := errors.PanicToError(func() {
  377. pxy.Trace("get udp package from workConn: %s", udpMsg.Content)
  378. readCh <- &udpMsg
  379. }); errRet != nil {
  380. pxy.Info("reader goroutine for udp work connection closed: %v", errRet)
  381. return
  382. }
  383. }
  384. }
  385. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  386. defer func() {
  387. pxy.Info("writer goroutine for udp work connection closed")
  388. }()
  389. var errRet error
  390. for rawMsg := range sendCh {
  391. switch m := rawMsg.(type) {
  392. case *msg.UdpPacket:
  393. pxy.Trace("send udp package to workConn: %s", m.Content)
  394. case *msg.Ping:
  395. pxy.Trace("send ping message to udp workConn")
  396. }
  397. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  398. pxy.Error("udp work write error: %v", errRet)
  399. return
  400. }
  401. }
  402. }
  403. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  404. var errRet error
  405. for {
  406. time.Sleep(time.Duration(30) * time.Second)
  407. if errRet = errors.PanicToError(func() {
  408. sendCh <- &msg.Ping{}
  409. }); errRet != nil {
  410. pxy.Trace("heartbeat goroutine for udp work connection closed")
  411. break
  412. }
  413. }
  414. }
  415. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  416. go workConnReaderFn(pxy.workConn, pxy.readCh)
  417. go heartbeatFn(pxy.workConn, pxy.sendCh)
  418. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
  419. }
  420. // Common handler for tcp work connections.
  421. func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  422. baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte, m *msg.StartWorkConn) {
  423. var (
  424. remote io.ReadWriteCloser
  425. err error
  426. )
  427. remote = workConn
  428. if baseInfo.UseEncryption {
  429. remote, err = frpIo.WithEncryption(remote, encKey)
  430. if err != nil {
  431. workConn.Close()
  432. workConn.Error("create encryption stream error: %v", err)
  433. return
  434. }
  435. }
  436. if baseInfo.UseCompression {
  437. remote = frpIo.WithCompression(remote)
  438. }
  439. if proxyPlugin != nil {
  440. // if plugin is set, let plugin handle connections first
  441. workConn.Debug("handle by plugin: %s", proxyPlugin.Name())
  442. proxyPlugin.Handle(remote, workConn)
  443. workConn.Debug("handle by plugin finished")
  444. return
  445. } else {
  446. localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
  447. if err != nil {
  448. workConn.Close()
  449. workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
  450. return
  451. }
  452. workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  453. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  454. // check if we need to send proxy protocol info
  455. if baseInfo.ProxyProtocolVersion != "" {
  456. if m.SrcAddr != "" && m.SrcPort != 0 {
  457. h := &pp.Header{
  458. Command: pp.PROXY,
  459. SourceAddress: net.ParseIP(m.SrcAddr),
  460. SourcePort: m.SrcPort,
  461. DestinationAddress: net.ParseIP(m.DstAddr),
  462. DestinationPort: m.DstPort,
  463. }
  464. if h.SourceAddress.To16() == nil {
  465. h.TransportProtocol = pp.TCPv4
  466. } else {
  467. h.TransportProtocol = pp.TCPv6
  468. }
  469. if baseInfo.ProxyProtocolVersion == "v1" {
  470. h.Version = 1
  471. } else if baseInfo.ProxyProtocolVersion == "v2" {
  472. h.Version = 2
  473. }
  474. h.WriteTo(localConn)
  475. }
  476. }
  477. frpIo.Join(localConn, remote)
  478. workConn.Debug("join connections closed")
  479. }
  480. }