proxy.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io"
  19. "io/ioutil"
  20. "net"
  21. "sync"
  22. "time"
  23. "github.com/fatedier/frp/g"
  24. "github.com/fatedier/frp/models/config"
  25. "github.com/fatedier/frp/models/msg"
  26. "github.com/fatedier/frp/models/plugin"
  27. "github.com/fatedier/frp/models/proto/udp"
  28. "github.com/fatedier/frp/utils/log"
  29. frpNet "github.com/fatedier/frp/utils/net"
  30. "github.com/fatedier/golib/errors"
  31. frpIo "github.com/fatedier/golib/io"
  32. "github.com/fatedier/golib/pool"
  33. fmux "github.com/hashicorp/yamux"
  34. )
  35. // Proxy defines how to handle work connections for different proxy type.
  36. type Proxy interface {
  37. Run() error
  38. // InWorkConn accept work connections registered to server.
  39. InWorkConn(conn frpNet.Conn)
  40. Close()
  41. log.Logger
  42. }
  43. func NewProxy(pxyConf config.ProxyConf) (pxy Proxy) {
  44. baseProxy := BaseProxy{
  45. Logger: log.NewPrefixLogger(pxyConf.GetBaseInfo().ProxyName),
  46. }
  47. switch cfg := pxyConf.(type) {
  48. case *config.TcpProxyConf:
  49. pxy = &TcpProxy{
  50. BaseProxy: &baseProxy,
  51. cfg: cfg,
  52. }
  53. case *config.UdpProxyConf:
  54. pxy = &UdpProxy{
  55. BaseProxy: &baseProxy,
  56. cfg: cfg,
  57. }
  58. case *config.HttpProxyConf:
  59. pxy = &HttpProxy{
  60. BaseProxy: &baseProxy,
  61. cfg: cfg,
  62. }
  63. case *config.HttpsProxyConf:
  64. pxy = &HttpsProxy{
  65. BaseProxy: &baseProxy,
  66. cfg: cfg,
  67. }
  68. case *config.StcpProxyConf:
  69. pxy = &StcpProxy{
  70. BaseProxy: &baseProxy,
  71. cfg: cfg,
  72. }
  73. case *config.XtcpProxyConf:
  74. pxy = &XtcpProxy{
  75. BaseProxy: &baseProxy,
  76. cfg: cfg,
  77. }
  78. }
  79. return
  80. }
  81. type BaseProxy struct {
  82. closed bool
  83. mu sync.RWMutex
  84. log.Logger
  85. }
  86. // TCP
  87. type TcpProxy struct {
  88. *BaseProxy
  89. cfg *config.TcpProxyConf
  90. proxyPlugin plugin.Plugin
  91. }
  92. func (pxy *TcpProxy) Run() (err error) {
  93. if pxy.cfg.Plugin != "" {
  94. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  95. if err != nil {
  96. return
  97. }
  98. }
  99. return
  100. }
  101. func (pxy *TcpProxy) Close() {
  102. if pxy.proxyPlugin != nil {
  103. pxy.proxyPlugin.Close()
  104. }
  105. }
  106. func (pxy *TcpProxy) InWorkConn(conn frpNet.Conn) {
  107. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  108. []byte(g.GlbClientCfg.Token))
  109. }
  110. // HTTP
  111. type HttpProxy struct {
  112. *BaseProxy
  113. cfg *config.HttpProxyConf
  114. proxyPlugin plugin.Plugin
  115. }
  116. func (pxy *HttpProxy) Run() (err error) {
  117. if pxy.cfg.Plugin != "" {
  118. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  119. if err != nil {
  120. return
  121. }
  122. }
  123. return
  124. }
  125. func (pxy *HttpProxy) Close() {
  126. if pxy.proxyPlugin != nil {
  127. pxy.proxyPlugin.Close()
  128. }
  129. }
  130. func (pxy *HttpProxy) InWorkConn(conn frpNet.Conn) {
  131. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  132. []byte(g.GlbClientCfg.Token))
  133. }
  134. // HTTPS
  135. type HttpsProxy struct {
  136. *BaseProxy
  137. cfg *config.HttpsProxyConf
  138. proxyPlugin plugin.Plugin
  139. }
  140. func (pxy *HttpsProxy) Run() (err error) {
  141. if pxy.cfg.Plugin != "" {
  142. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  143. if err != nil {
  144. return
  145. }
  146. }
  147. return
  148. }
  149. func (pxy *HttpsProxy) Close() {
  150. if pxy.proxyPlugin != nil {
  151. pxy.proxyPlugin.Close()
  152. }
  153. }
  154. func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) {
  155. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  156. []byte(g.GlbClientCfg.Token))
  157. }
  158. // STCP
  159. type StcpProxy struct {
  160. *BaseProxy
  161. cfg *config.StcpProxyConf
  162. proxyPlugin plugin.Plugin
  163. }
  164. func (pxy *StcpProxy) Run() (err error) {
  165. if pxy.cfg.Plugin != "" {
  166. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  167. if err != nil {
  168. return
  169. }
  170. }
  171. return
  172. }
  173. func (pxy *StcpProxy) Close() {
  174. if pxy.proxyPlugin != nil {
  175. pxy.proxyPlugin.Close()
  176. }
  177. }
  178. func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) {
  179. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn,
  180. []byte(g.GlbClientCfg.Token))
  181. }
  182. // XTCP
  183. type XtcpProxy struct {
  184. *BaseProxy
  185. cfg *config.XtcpProxyConf
  186. proxyPlugin plugin.Plugin
  187. }
  188. func (pxy *XtcpProxy) Run() (err error) {
  189. if pxy.cfg.Plugin != "" {
  190. pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
  191. if err != nil {
  192. return
  193. }
  194. }
  195. return
  196. }
  197. func (pxy *XtcpProxy) Close() {
  198. if pxy.proxyPlugin != nil {
  199. pxy.proxyPlugin.Close()
  200. }
  201. }
  202. func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) {
  203. defer conn.Close()
  204. var natHoleSidMsg msg.NatHoleSid
  205. err := msg.ReadMsgInto(conn, &natHoleSidMsg)
  206. if err != nil {
  207. pxy.Error("xtcp read from workConn error: %v", err)
  208. return
  209. }
  210. natHoleClientMsg := &msg.NatHoleClient{
  211. ProxyName: pxy.cfg.ProxyName,
  212. Sid: natHoleSidMsg.Sid,
  213. }
  214. raddr, _ := net.ResolveUDPAddr("udp",
  215. fmt.Sprintf("%s:%d", g.GlbClientCfg.ServerAddr, g.GlbClientCfg.ServerUdpPort))
  216. clientConn, err := net.DialUDP("udp", nil, raddr)
  217. defer clientConn.Close()
  218. err = msg.WriteMsg(clientConn, natHoleClientMsg)
  219. if err != nil {
  220. pxy.Error("send natHoleClientMsg to server error: %v", err)
  221. return
  222. }
  223. // Wait for client address at most 5 seconds.
  224. var natHoleRespMsg msg.NatHoleResp
  225. clientConn.SetReadDeadline(time.Now().Add(5 * time.Second))
  226. buf := pool.GetBuf(1024)
  227. n, err := clientConn.Read(buf)
  228. if err != nil {
  229. pxy.Error("get natHoleRespMsg error: %v", err)
  230. return
  231. }
  232. err = msg.ReadMsgInto(bytes.NewReader(buf[:n]), &natHoleRespMsg)
  233. if err != nil {
  234. pxy.Error("get natHoleRespMsg error: %v", err)
  235. return
  236. }
  237. clientConn.SetReadDeadline(time.Time{})
  238. clientConn.Close()
  239. if natHoleRespMsg.Error != "" {
  240. pxy.Error("natHoleRespMsg get error info: %s", natHoleRespMsg.Error)
  241. return
  242. }
  243. pxy.Trace("get natHoleRespMsg, sid [%s], client address [%s]", natHoleRespMsg.Sid, natHoleRespMsg.ClientAddr)
  244. // Send sid to visitor udp address.
  245. time.Sleep(time.Second)
  246. laddr, _ := net.ResolveUDPAddr("udp", clientConn.LocalAddr().String())
  247. daddr, err := net.ResolveUDPAddr("udp", natHoleRespMsg.VisitorAddr)
  248. if err != nil {
  249. pxy.Error("resolve visitor udp address error: %v", err)
  250. return
  251. }
  252. lConn, err := net.DialUDP("udp", laddr, daddr)
  253. if err != nil {
  254. pxy.Error("dial visitor udp address error: %v", err)
  255. return
  256. }
  257. lConn.Write([]byte(natHoleRespMsg.Sid))
  258. kcpConn, err := frpNet.NewKcpConnFromUdp(lConn, true, natHoleRespMsg.VisitorAddr)
  259. if err != nil {
  260. pxy.Error("create kcp connection from udp connection error: %v", err)
  261. return
  262. }
  263. fmuxCfg := fmux.DefaultConfig()
  264. fmuxCfg.KeepAliveInterval = 5 * time.Second
  265. fmuxCfg.LogOutput = ioutil.Discard
  266. sess, err := fmux.Server(kcpConn, fmuxCfg)
  267. if err != nil {
  268. pxy.Error("create yamux server from kcp connection error: %v", err)
  269. return
  270. }
  271. defer sess.Close()
  272. muxConn, err := sess.Accept()
  273. if err != nil {
  274. pxy.Error("accept for yamux connection error: %v", err)
  275. return
  276. }
  277. HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
  278. frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk))
  279. }
  280. // UDP
  281. type UdpProxy struct {
  282. *BaseProxy
  283. cfg *config.UdpProxyConf
  284. localAddr *net.UDPAddr
  285. readCh chan *msg.UdpPacket
  286. // include msg.UdpPacket and msg.Ping
  287. sendCh chan msg.Message
  288. workConn frpNet.Conn
  289. }
  290. func (pxy *UdpProxy) Run() (err error) {
  291. pxy.localAddr, err = net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pxy.cfg.LocalIp, pxy.cfg.LocalPort))
  292. if err != nil {
  293. return
  294. }
  295. return
  296. }
  297. func (pxy *UdpProxy) Close() {
  298. pxy.mu.Lock()
  299. defer pxy.mu.Unlock()
  300. if !pxy.closed {
  301. pxy.closed = true
  302. if pxy.workConn != nil {
  303. pxy.workConn.Close()
  304. }
  305. if pxy.readCh != nil {
  306. close(pxy.readCh)
  307. }
  308. if pxy.sendCh != nil {
  309. close(pxy.sendCh)
  310. }
  311. }
  312. }
  313. func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
  314. pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
  315. // close resources releated with old workConn
  316. pxy.Close()
  317. pxy.mu.Lock()
  318. pxy.workConn = conn
  319. pxy.readCh = make(chan *msg.UdpPacket, 1024)
  320. pxy.sendCh = make(chan msg.Message, 1024)
  321. pxy.closed = false
  322. pxy.mu.Unlock()
  323. workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
  324. for {
  325. var udpMsg msg.UdpPacket
  326. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  327. pxy.Warn("read from workConn for udp error: %v", errRet)
  328. return
  329. }
  330. if errRet := errors.PanicToError(func() {
  331. pxy.Trace("get udp package from workConn: %s", udpMsg.Content)
  332. readCh <- &udpMsg
  333. }); errRet != nil {
  334. pxy.Info("reader goroutine for udp work connection closed: %v", errRet)
  335. return
  336. }
  337. }
  338. }
  339. workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
  340. defer func() {
  341. pxy.Info("writer goroutine for udp work connection closed")
  342. }()
  343. var errRet error
  344. for rawMsg := range sendCh {
  345. switch m := rawMsg.(type) {
  346. case *msg.UdpPacket:
  347. pxy.Trace("send udp package to workConn: %s", m.Content)
  348. case *msg.Ping:
  349. pxy.Trace("send ping message to udp workConn")
  350. }
  351. if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
  352. pxy.Error("udp work write error: %v", errRet)
  353. return
  354. }
  355. }
  356. }
  357. heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
  358. var errRet error
  359. for {
  360. time.Sleep(time.Duration(30) * time.Second)
  361. if errRet = errors.PanicToError(func() {
  362. sendCh <- &msg.Ping{}
  363. }); errRet != nil {
  364. pxy.Trace("heartbeat goroutine for udp work connection closed")
  365. break
  366. }
  367. }
  368. }
  369. go workConnSenderFn(pxy.workConn, pxy.sendCh)
  370. go workConnReaderFn(pxy.workConn, pxy.readCh)
  371. go heartbeatFn(pxy.workConn, pxy.sendCh)
  372. udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
  373. }
  374. // Common handler for tcp work connections.
  375. func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin.Plugin,
  376. baseInfo *config.BaseProxyConf, workConn frpNet.Conn, encKey []byte) {
  377. var (
  378. remote io.ReadWriteCloser
  379. err error
  380. )
  381. remote = workConn
  382. if baseInfo.UseEncryption {
  383. remote, err = frpIo.WithEncryption(remote, encKey)
  384. if err != nil {
  385. workConn.Close()
  386. workConn.Error("create encryption stream error: %v", err)
  387. return
  388. }
  389. }
  390. if baseInfo.UseCompression {
  391. remote = frpIo.WithCompression(remote)
  392. }
  393. if proxyPlugin != nil {
  394. // if plugin is set, let plugin handle connections first
  395. workConn.Debug("handle by plugin: %s", proxyPlugin.Name())
  396. proxyPlugin.Handle(remote, workConn)
  397. workConn.Debug("handle by plugin finished")
  398. return
  399. } else {
  400. localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort))
  401. if err != nil {
  402. workConn.Close()
  403. workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err)
  404. return
  405. }
  406. workConn.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  407. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  408. frpIo.Join(localConn, remote)
  409. workConn.Debug("join connections closed")
  410. }
  411. }