proxy.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "time"
  21. "github.com/fatedier/frp/models/config"
  22. "github.com/fatedier/frp/models/msg"
  23. "github.com/fatedier/frp/models/proto/tcp"
  24. "github.com/fatedier/frp/models/proto/udp"
  25. "github.com/fatedier/frp/utils/errors"
  26. "github.com/fatedier/frp/utils/log"
  27. frpNet "github.com/fatedier/frp/utils/net"
  28. "github.com/fatedier/frp/utils/vhost"
  29. )
  30. type Proxy interface {
  31. Run() error
  32. GetControl() *Control
  33. GetName() string
  34. GetConf() config.ProxyConf
  35. GetWorkConnFromPool() (workConn frpNet.Conn, err error)
  36. Close()
  37. log.Logger
  38. }
  39. type BaseProxy struct {
  40. name string
  41. ctl *Control
  42. listeners []frpNet.Listener
  43. log.Logger
  44. }
  45. func (pxy *BaseProxy) GetName() string {
  46. return pxy.name
  47. }
  48. func (pxy *BaseProxy) GetControl() *Control {
  49. return pxy.ctl
  50. }
  51. func (pxy *BaseProxy) Close() {
  52. pxy.Info("proxy closing")
  53. for _, l := range pxy.listeners {
  54. l.Close()
  55. }
  56. }
  57. func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
  58. ctl := pxy.GetControl()
  59. // try all connections from the pool
  60. for i := 0; i < ctl.poolCount+1; i++ {
  61. if workConn, err = ctl.GetWorkConn(); err != nil {
  62. pxy.Warn("failed to get work connection: %v", err)
  63. return
  64. }
  65. pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  66. workConn.AddLogPrefix(pxy.GetName())
  67. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  68. ProxyName: pxy.GetName(),
  69. })
  70. if err != nil {
  71. workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  72. workConn.Close()
  73. } else {
  74. break
  75. }
  76. }
  77. if err != nil {
  78. pxy.Error("try to get work connection failed in the end")
  79. return
  80. }
  81. return
  82. }
  83. // startListenHandler start a goroutine handler for each listener.
  84. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  85. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  86. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) {
  87. for _, listener := range pxy.listeners {
  88. go func(l frpNet.Listener) {
  89. for {
  90. // block
  91. // if listener is closed, err returned
  92. c, err := l.Accept()
  93. if err != nil {
  94. pxy.Info("listener is closed")
  95. return
  96. }
  97. pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
  98. go handler(p, c)
  99. }
  100. }(listener)
  101. }
  102. }
  103. func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
  104. basePxy := BaseProxy{
  105. name: pxyConf.GetName(),
  106. ctl: ctl,
  107. listeners: make([]frpNet.Listener, 0),
  108. Logger: log.NewPrefixLogger(ctl.runId),
  109. }
  110. switch cfg := pxyConf.(type) {
  111. case *config.TcpProxyConf:
  112. pxy = &TcpProxy{
  113. BaseProxy: basePxy,
  114. cfg: cfg,
  115. }
  116. case *config.HttpProxyConf:
  117. pxy = &HttpProxy{
  118. BaseProxy: basePxy,
  119. cfg: cfg,
  120. }
  121. case *config.HttpsProxyConf:
  122. pxy = &HttpsProxy{
  123. BaseProxy: basePxy,
  124. cfg: cfg,
  125. }
  126. case *config.UdpProxyConf:
  127. pxy = &UdpProxy{
  128. BaseProxy: basePxy,
  129. cfg: cfg,
  130. }
  131. default:
  132. return pxy, fmt.Errorf("proxy type not support")
  133. }
  134. pxy.AddLogPrefix(pxy.GetName())
  135. return
  136. }
  137. type TcpProxy struct {
  138. BaseProxy
  139. cfg *config.TcpProxyConf
  140. }
  141. func (pxy *TcpProxy) Run() error {
  142. listener, err := frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, pxy.cfg.RemotePort)
  143. if err != nil {
  144. return err
  145. }
  146. listener.AddLogPrefix(pxy.name)
  147. pxy.listeners = append(pxy.listeners, listener)
  148. pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
  149. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  150. return nil
  151. }
  152. func (pxy *TcpProxy) GetConf() config.ProxyConf {
  153. return pxy.cfg
  154. }
  155. func (pxy *TcpProxy) Close() {
  156. pxy.BaseProxy.Close()
  157. }
  158. type HttpProxy struct {
  159. BaseProxy
  160. cfg *config.HttpProxyConf
  161. }
  162. func (pxy *HttpProxy) Run() (err error) {
  163. routeConfig := &vhost.VhostRouteConfig{
  164. RewriteHost: pxy.cfg.HostHeaderRewrite,
  165. Username: pxy.cfg.HttpUser,
  166. Password: pxy.cfg.HttpPwd,
  167. }
  168. locations := pxy.cfg.Locations
  169. if len(locations) == 0 {
  170. locations = []string{""}
  171. }
  172. for _, domain := range pxy.cfg.CustomDomains {
  173. routeConfig.Domain = domain
  174. for _, location := range locations {
  175. routeConfig.Location = location
  176. l, err := pxy.ctl.svr.VhostHttpMuxer.Listen(routeConfig)
  177. if err != nil {
  178. return err
  179. }
  180. l.AddLogPrefix(pxy.name)
  181. pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
  182. pxy.listeners = append(pxy.listeners, l)
  183. }
  184. }
  185. if pxy.cfg.SubDomain != "" {
  186. routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
  187. for _, location := range locations {
  188. routeConfig.Location = location
  189. l, err := pxy.ctl.svr.VhostHttpMuxer.Listen(routeConfig)
  190. if err != nil {
  191. return err
  192. }
  193. l.AddLogPrefix(pxy.name)
  194. pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
  195. pxy.listeners = append(pxy.listeners, l)
  196. }
  197. }
  198. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  199. return
  200. }
  201. func (pxy *HttpProxy) GetConf() config.ProxyConf {
  202. return pxy.cfg
  203. }
  204. func (pxy *HttpProxy) Close() {
  205. pxy.BaseProxy.Close()
  206. }
  207. type HttpsProxy struct {
  208. BaseProxy
  209. cfg *config.HttpsProxyConf
  210. }
  211. func (pxy *HttpsProxy) Run() (err error) {
  212. routeConfig := &vhost.VhostRouteConfig{}
  213. for _, domain := range pxy.cfg.CustomDomains {
  214. routeConfig.Domain = domain
  215. l, err := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
  216. if err != nil {
  217. return err
  218. }
  219. l.AddLogPrefix(pxy.name)
  220. pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
  221. pxy.listeners = append(pxy.listeners, l)
  222. }
  223. if pxy.cfg.SubDomain != "" {
  224. routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
  225. l, err := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
  226. if err != nil {
  227. return err
  228. }
  229. l.AddLogPrefix(pxy.name)
  230. pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
  231. pxy.listeners = append(pxy.listeners, l)
  232. }
  233. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  234. return
  235. }
  236. func (pxy *HttpsProxy) GetConf() config.ProxyConf {
  237. return pxy.cfg
  238. }
  239. func (pxy *HttpsProxy) Close() {
  240. pxy.BaseProxy.Close()
  241. }
  242. type UdpProxy struct {
  243. BaseProxy
  244. cfg *config.UdpProxyConf
  245. udpConn *net.UDPConn
  246. workConn net.Conn
  247. sendCh chan *msg.UdpPacket
  248. readCh chan *msg.UdpPacket
  249. checkCloseCh chan int
  250. }
  251. func (pxy *UdpProxy) Run() (err error) {
  252. addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", config.ServerCommonCfg.BindAddr, pxy.cfg.RemotePort))
  253. if err != nil {
  254. return err
  255. }
  256. udpConn, err := net.ListenUDP("udp", addr)
  257. if err != nil {
  258. pxy.Warn("listen udp port error: %v", err)
  259. return err
  260. }
  261. pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
  262. pxy.udpConn = udpConn
  263. pxy.sendCh = make(chan *msg.UdpPacket, 64)
  264. pxy.readCh = make(chan *msg.UdpPacket, 64)
  265. pxy.checkCloseCh = make(chan int)
  266. workConnReaderFn := func(conn net.Conn) {
  267. for {
  268. var udpMsg msg.UdpPacket
  269. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  270. pxy.Warn("read from workConn for udp error: %v", errRet)
  271. conn.Close()
  272. // notity proxy to start a new work connection
  273. errors.PanicToError(func() {
  274. pxy.checkCloseCh <- 1
  275. })
  276. return
  277. }
  278. if errRet := errors.PanicToError(func() {
  279. pxy.readCh <- &udpMsg
  280. }); errRet != nil {
  281. pxy.Info("reader goroutine for udp work connection closed")
  282. return
  283. }
  284. }
  285. }
  286. workConnSenderFn := func(conn net.Conn, ctx context.Context) {
  287. var errRet error
  288. for {
  289. select {
  290. case udpMsg, ok := <-pxy.sendCh:
  291. if !ok {
  292. return
  293. }
  294. if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
  295. pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
  296. return
  297. } else {
  298. continue
  299. }
  300. case <-ctx.Done():
  301. pxy.Info("sender goroutine for udp work connection closed")
  302. return
  303. }
  304. }
  305. }
  306. go func() {
  307. for {
  308. // Sleep a while for waiting control send the NewProxyResp to client.
  309. time.Sleep(500 * time.Millisecond)
  310. workConn, err := pxy.GetWorkConnFromPool()
  311. if err != nil {
  312. time.Sleep(5 * time.Second)
  313. // check if proxy is closed
  314. select {
  315. case _, ok := <-pxy.checkCloseCh:
  316. if !ok {
  317. return
  318. }
  319. default:
  320. }
  321. continue
  322. }
  323. pxy.workConn = workConn
  324. ctx, cancel := context.WithCancel(context.Background())
  325. go workConnReaderFn(workConn)
  326. go workConnSenderFn(workConn, ctx)
  327. _, ok := <-pxy.checkCloseCh
  328. cancel()
  329. if !ok {
  330. return
  331. }
  332. }
  333. }()
  334. // Read from user connections and send wrapped udp message to sendCh.
  335. // Client will transfor udp message to local udp service and waiting for response for a while.
  336. // Response will be wrapped to be transfored in work connection to server.
  337. udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
  338. return nil
  339. }
  340. func (pxy *UdpProxy) GetConf() config.ProxyConf {
  341. return pxy.cfg
  342. }
  343. func (pxy *UdpProxy) Close() {
  344. pxy.BaseProxy.Close()
  345. pxy.workConn.Close()
  346. pxy.udpConn.Close()
  347. close(pxy.checkCloseCh)
  348. close(pxy.readCh)
  349. close(pxy.sendCh)
  350. }
  351. // HandleUserTcpConnection is used for incoming tcp user connections.
  352. // It can be used for tcp, http, https type.
  353. func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
  354. defer userConn.Close()
  355. // try all connections from the pool
  356. workConn, err := pxy.GetWorkConnFromPool()
  357. if err != nil {
  358. return
  359. }
  360. defer workConn.Close()
  361. var local io.ReadWriteCloser = workConn
  362. cfg := pxy.GetConf().GetBaseInfo()
  363. if cfg.UseEncryption {
  364. local, err = tcp.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
  365. if err != nil {
  366. pxy.Error("create encryption stream error: %v", err)
  367. return
  368. }
  369. }
  370. if cfg.UseCompression {
  371. local = tcp.WithCompression(local)
  372. }
  373. pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  374. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  375. StatsOpenConnection(pxy.GetName())
  376. inCount, outCount := tcp.Join(local, userConn)
  377. StatsCloseConnection(pxy.GetName())
  378. StatsAddFlowIn(pxy.GetName(), inCount)
  379. StatsAddFlowOut(pxy.GetName(), outCount)
  380. pxy.Debug("join connections closed")
  381. }