proxy.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package server
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "net"
  7. "time"
  8. "github.com/fatedier/frp/models/config"
  9. "github.com/fatedier/frp/models/msg"
  10. "github.com/fatedier/frp/models/proto/tcp"
  11. "github.com/fatedier/frp/models/proto/udp"
  12. "github.com/fatedier/frp/utils/errors"
  13. "github.com/fatedier/frp/utils/log"
  14. frpNet "github.com/fatedier/frp/utils/net"
  15. "github.com/fatedier/frp/utils/vhost"
  16. )
  17. type Proxy interface {
  18. Run() error
  19. GetControl() *Control
  20. GetName() string
  21. GetConf() config.ProxyConf
  22. GetWorkConnFromPool() (workConn frpNet.Conn, err error)
  23. Close()
  24. log.Logger
  25. }
  26. type BaseProxy struct {
  27. name string
  28. ctl *Control
  29. listeners []frpNet.Listener
  30. log.Logger
  31. }
  32. func (pxy *BaseProxy) GetName() string {
  33. return pxy.name
  34. }
  35. func (pxy *BaseProxy) GetControl() *Control {
  36. return pxy.ctl
  37. }
  38. func (pxy *BaseProxy) Close() {
  39. pxy.Info("proxy closing")
  40. for _, l := range pxy.listeners {
  41. l.Close()
  42. }
  43. }
  44. func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
  45. ctl := pxy.GetControl()
  46. // try all connections from the pool
  47. for i := 0; i < ctl.poolCount+1; i++ {
  48. if workConn, err = ctl.GetWorkConn(); err != nil {
  49. pxy.Warn("failed to get work connection: %v", err)
  50. return
  51. }
  52. pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  53. workConn.AddLogPrefix(pxy.GetName())
  54. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  55. ProxyName: pxy.GetName(),
  56. })
  57. if err != nil {
  58. workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  59. workConn.Close()
  60. } else {
  61. break
  62. }
  63. }
  64. if err != nil {
  65. pxy.Error("try to get work connection failed in the end")
  66. return
  67. }
  68. return
  69. }
  70. // startListenHandler start a goroutine handler for each listener.
  71. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  72. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  73. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn)) {
  74. for _, listener := range pxy.listeners {
  75. go func(l frpNet.Listener) {
  76. for {
  77. // block
  78. // if listener is closed, err returned
  79. c, err := l.Accept()
  80. if err != nil {
  81. pxy.Info("listener is closed")
  82. return
  83. }
  84. pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
  85. go handler(p, c)
  86. }
  87. }(listener)
  88. }
  89. }
  90. func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
  91. basePxy := BaseProxy{
  92. name: pxyConf.GetName(),
  93. ctl: ctl,
  94. listeners: make([]frpNet.Listener, 0),
  95. Logger: log.NewPrefixLogger(ctl.runId),
  96. }
  97. switch cfg := pxyConf.(type) {
  98. case *config.TcpProxyConf:
  99. pxy = &TcpProxy{
  100. BaseProxy: basePxy,
  101. cfg: cfg,
  102. }
  103. case *config.HttpProxyConf:
  104. pxy = &HttpProxy{
  105. BaseProxy: basePxy,
  106. cfg: cfg,
  107. }
  108. case *config.HttpsProxyConf:
  109. pxy = &HttpsProxy{
  110. BaseProxy: basePxy,
  111. cfg: cfg,
  112. }
  113. case *config.UdpProxyConf:
  114. pxy = &UdpProxy{
  115. BaseProxy: basePxy,
  116. cfg: cfg,
  117. }
  118. default:
  119. return pxy, fmt.Errorf("proxy type not support")
  120. }
  121. pxy.AddLogPrefix(pxy.GetName())
  122. return
  123. }
  124. type TcpProxy struct {
  125. BaseProxy
  126. cfg *config.TcpProxyConf
  127. }
  128. func (pxy *TcpProxy) Run() error {
  129. listener, err := frpNet.ListenTcp(config.ServerCommonCfg.BindAddr, pxy.cfg.RemotePort)
  130. if err != nil {
  131. return err
  132. }
  133. listener.AddLogPrefix(pxy.name)
  134. pxy.listeners = append(pxy.listeners, listener)
  135. pxy.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
  136. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  137. return nil
  138. }
  139. func (pxy *TcpProxy) GetConf() config.ProxyConf {
  140. return pxy.cfg
  141. }
  142. func (pxy *TcpProxy) Close() {
  143. pxy.BaseProxy.Close()
  144. }
  145. type HttpProxy struct {
  146. BaseProxy
  147. cfg *config.HttpProxyConf
  148. }
  149. func (pxy *HttpProxy) Run() (err error) {
  150. routeConfig := &vhost.VhostRouteConfig{
  151. RewriteHost: pxy.cfg.HostHeaderRewrite,
  152. Username: pxy.cfg.HttpUser,
  153. Password: pxy.cfg.HttpPwd,
  154. }
  155. locations := pxy.cfg.Locations
  156. if len(locations) == 0 {
  157. locations = []string{""}
  158. }
  159. for _, domain := range pxy.cfg.CustomDomains {
  160. routeConfig.Domain = domain
  161. for _, location := range locations {
  162. routeConfig.Location = location
  163. l, err := pxy.ctl.svr.VhostHttpMuxer.Listen(routeConfig)
  164. if err != nil {
  165. return err
  166. }
  167. l.AddLogPrefix(pxy.name)
  168. pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
  169. pxy.listeners = append(pxy.listeners, l)
  170. }
  171. }
  172. if pxy.cfg.SubDomain != "" {
  173. routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
  174. for _, location := range locations {
  175. routeConfig.Location = location
  176. l, err := pxy.ctl.svr.VhostHttpMuxer.Listen(routeConfig)
  177. if err != nil {
  178. return err
  179. }
  180. l.AddLogPrefix(pxy.name)
  181. pxy.Info("http proxy listen for host [%s] location [%s]", routeConfig.Domain, routeConfig.Location)
  182. pxy.listeners = append(pxy.listeners, l)
  183. }
  184. }
  185. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  186. return
  187. }
  188. func (pxy *HttpProxy) GetConf() config.ProxyConf {
  189. return pxy.cfg
  190. }
  191. func (pxy *HttpProxy) Close() {
  192. pxy.BaseProxy.Close()
  193. }
  194. type HttpsProxy struct {
  195. BaseProxy
  196. cfg *config.HttpsProxyConf
  197. }
  198. func (pxy *HttpsProxy) Run() (err error) {
  199. routeConfig := &vhost.VhostRouteConfig{}
  200. for _, domain := range pxy.cfg.CustomDomains {
  201. routeConfig.Domain = domain
  202. l, err := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
  203. if err != nil {
  204. return err
  205. }
  206. l.AddLogPrefix(pxy.name)
  207. pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
  208. pxy.listeners = append(pxy.listeners, l)
  209. }
  210. if pxy.cfg.SubDomain != "" {
  211. routeConfig.Domain = pxy.cfg.SubDomain + "." + config.ServerCommonCfg.SubDomainHost
  212. l, err := pxy.ctl.svr.VhostHttpsMuxer.Listen(routeConfig)
  213. if err != nil {
  214. return err
  215. }
  216. l.AddLogPrefix(pxy.name)
  217. pxy.Info("https proxy listen for host [%s]", routeConfig.Domain)
  218. pxy.listeners = append(pxy.listeners, l)
  219. }
  220. pxy.startListenHandler(pxy, HandleUserTcpConnection)
  221. return
  222. }
  223. func (pxy *HttpsProxy) GetConf() config.ProxyConf {
  224. return pxy.cfg
  225. }
  226. func (pxy *HttpsProxy) Close() {
  227. pxy.BaseProxy.Close()
  228. }
  229. type UdpProxy struct {
  230. BaseProxy
  231. cfg *config.UdpProxyConf
  232. udpConn *net.UDPConn
  233. workConn net.Conn
  234. sendCh chan *msg.UdpPacket
  235. readCh chan *msg.UdpPacket
  236. checkCloseCh chan int
  237. }
  238. func (pxy *UdpProxy) Run() (err error) {
  239. addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", config.ServerCommonCfg.BindAddr, pxy.cfg.RemotePort))
  240. if err != nil {
  241. return err
  242. }
  243. udpConn, err := net.ListenUDP("udp", addr)
  244. if err != nil {
  245. pxy.Warn("listen udp port error: %v", err)
  246. return err
  247. }
  248. pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
  249. pxy.udpConn = udpConn
  250. pxy.sendCh = make(chan *msg.UdpPacket, 64)
  251. pxy.readCh = make(chan *msg.UdpPacket, 64)
  252. pxy.checkCloseCh = make(chan int)
  253. workConnReaderFn := func(conn net.Conn) {
  254. for {
  255. var udpMsg msg.UdpPacket
  256. if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
  257. pxy.Warn("read from workConn for udp error: %v", errRet)
  258. conn.Close()
  259. // notity proxy to start a new work connection
  260. errors.PanicToError(func() {
  261. pxy.checkCloseCh <- 1
  262. })
  263. return
  264. }
  265. if errRet := errors.PanicToError(func() {
  266. pxy.readCh <- &udpMsg
  267. }); errRet != nil {
  268. pxy.Info("reader goroutine for udp work connection closed")
  269. return
  270. }
  271. }
  272. }
  273. workConnSenderFn := func(conn net.Conn, ctx context.Context) {
  274. var errRet error
  275. for {
  276. select {
  277. case udpMsg, ok := <-pxy.sendCh:
  278. if !ok {
  279. return
  280. }
  281. if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
  282. pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
  283. return
  284. } else {
  285. continue
  286. }
  287. case <-ctx.Done():
  288. pxy.Info("sender goroutine for udp work connection closed")
  289. return
  290. }
  291. }
  292. }
  293. go func() {
  294. for {
  295. // Sleep a while for waiting control send the NewProxyResp to client.
  296. time.Sleep(500 * time.Millisecond)
  297. workConn, err := pxy.GetWorkConnFromPool()
  298. if err != nil {
  299. time.Sleep(5 * time.Second)
  300. // check if proxy is closed
  301. select {
  302. case _, ok := <-pxy.checkCloseCh:
  303. if !ok {
  304. return
  305. }
  306. default:
  307. }
  308. continue
  309. }
  310. pxy.workConn = workConn
  311. ctx, cancel := context.WithCancel(context.Background())
  312. go workConnReaderFn(workConn)
  313. go workConnSenderFn(workConn, ctx)
  314. _, ok := <-pxy.checkCloseCh
  315. cancel()
  316. if !ok {
  317. return
  318. }
  319. }
  320. }()
  321. // Read from user connections and send wrapped udp message to sendCh.
  322. // Client will transfor udp message to local udp service and waiting for response for a while.
  323. // Response will be wrapped to be transfored in work connection to server.
  324. udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
  325. return nil
  326. }
  327. func (pxy *UdpProxy) GetConf() config.ProxyConf {
  328. return pxy.cfg
  329. }
  330. func (pxy *UdpProxy) Close() {
  331. pxy.BaseProxy.Close()
  332. pxy.workConn.Close()
  333. pxy.udpConn.Close()
  334. close(pxy.checkCloseCh)
  335. close(pxy.readCh)
  336. close(pxy.sendCh)
  337. }
  338. // HandleUserTcpConnection is used for incoming tcp user connections.
  339. // It can be used for tcp, http, https type.
  340. func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) {
  341. defer userConn.Close()
  342. // try all connections from the pool
  343. workConn, err := pxy.GetWorkConnFromPool()
  344. if err != nil {
  345. return
  346. }
  347. defer workConn.Close()
  348. var local io.ReadWriteCloser = workConn
  349. cfg := pxy.GetConf().GetBaseInfo()
  350. if cfg.UseEncryption {
  351. local, err = tcp.WithEncryption(local, []byte(config.ServerCommonCfg.PrivilegeToken))
  352. if err != nil {
  353. pxy.Error("create encryption stream error: %v", err)
  354. return
  355. }
  356. }
  357. if cfg.UseCompression {
  358. local = tcp.WithCompression(local)
  359. }
  360. pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  361. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  362. tcp.Join(local, userConn)
  363. pxy.Debug("join connections closed")
  364. }