server.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. // Copyright 2016 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "fmt"
  17. "net"
  18. "sync"
  19. "time"
  20. "github.com/fatedier/frp/src/models/config"
  21. "github.com/fatedier/frp/src/models/consts"
  22. "github.com/fatedier/frp/src/models/metric"
  23. "github.com/fatedier/frp/src/models/msg"
  24. "github.com/fatedier/frp/src/utils/conn"
  25. "github.com/fatedier/frp/src/utils/log"
  26. "github.com/fatedier/frp/src/utils/pool"
  27. )
  28. type Listener interface {
  29. Accept() (*conn.Conn, error)
  30. Close() error
  31. }
  32. type ProxyServer struct {
  33. config.BaseConf
  34. BindAddr string
  35. ListenPort int64
  36. CustomDomains []string
  37. Locations []string
  38. Status int64
  39. CtlConn *conn.Conn // control connection with frpc
  40. WorkConnUdp *conn.Conn // work connection for udp
  41. udpConn *net.UDPConn
  42. listeners []Listener // accept new connection from remote users
  43. ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
  44. workConnChan chan *conn.Conn // get new work conns from control goroutine
  45. udpSenderChan chan *msg.UdpPacket
  46. mutex sync.RWMutex
  47. closeChan chan struct{} // close this channel for notifying other goroutines that the proxy is closed
  48. }
  49. func NewProxyServer() (p *ProxyServer) {
  50. p = &ProxyServer{
  51. CustomDomains: make([]string, 0),
  52. Locations: make([]string, 0),
  53. }
  54. return p
  55. }
  56. func NewProxyServerFromCtlMsg(req *msg.ControlReq) (p *ProxyServer) {
  57. p = &ProxyServer{}
  58. p.Name = req.ProxyName
  59. p.Type = req.ProxyType
  60. p.UseEncryption = req.UseEncryption
  61. p.UseGzip = req.UseGzip
  62. p.PrivilegeMode = req.PrivilegeMode
  63. p.PrivilegeToken = PrivilegeToken
  64. p.BindAddr = BindAddr
  65. if p.Type == "tcp" || p.Type == "udp" {
  66. p.ListenPort = req.RemotePort
  67. } else if p.Type == "http" {
  68. p.ListenPort = VhostHttpPort
  69. } else if p.Type == "https" {
  70. p.ListenPort = VhostHttpsPort
  71. }
  72. p.CustomDomains = req.CustomDomains
  73. p.Locations = req.Locations
  74. p.HostHeaderRewrite = req.HostHeaderRewrite
  75. p.HttpUserName = req.HttpUserName
  76. p.HttpPassWord = req.HttpPassWord
  77. return
  78. }
  79. func (p *ProxyServer) Init() {
  80. p.Lock()
  81. p.Status = consts.Idle
  82. metric.SetStatus(p.Name, p.Status)
  83. p.workConnChan = make(chan *conn.Conn, p.PoolCount+10)
  84. p.ctlMsgChan = make(chan int64, p.PoolCount+10)
  85. p.udpSenderChan = make(chan *msg.UdpPacket, 1024)
  86. p.listeners = make([]Listener, 0)
  87. p.closeChan = make(chan struct{})
  88. p.Unlock()
  89. }
  90. func (p *ProxyServer) Compare(p2 *ProxyServer) bool {
  91. if p.Name != p2.Name || p.AuthToken != p2.AuthToken || p.Type != p2.Type ||
  92. p.BindAddr != p2.BindAddr || p.ListenPort != p2.ListenPort || p.HostHeaderRewrite != p2.HostHeaderRewrite {
  93. return false
  94. }
  95. if len(p.CustomDomains) != len(p2.CustomDomains) {
  96. return false
  97. }
  98. for i, _ := range p.CustomDomains {
  99. if p.CustomDomains[i] != p2.CustomDomains[i] {
  100. return false
  101. }
  102. }
  103. if len(p.Locations) != len(p2.Locations) {
  104. return false
  105. }
  106. for i, _ := range p.Locations {
  107. if p.Locations[i] != p2.Locations[i] {
  108. return false
  109. }
  110. }
  111. return true
  112. }
  113. func (p *ProxyServer) Lock() {
  114. p.mutex.Lock()
  115. }
  116. func (p *ProxyServer) Unlock() {
  117. p.mutex.Unlock()
  118. }
  119. // start listening for user conns
  120. func (p *ProxyServer) Start(c *conn.Conn) (err error) {
  121. p.CtlConn = c
  122. p.Init()
  123. if p.Type == "tcp" {
  124. l, err := conn.Listen(p.BindAddr, p.ListenPort)
  125. if err != nil {
  126. return err
  127. }
  128. p.listeners = append(p.listeners, l)
  129. } else if p.Type == "http" {
  130. for _, domain := range p.CustomDomains {
  131. if len(p.Locations) == 0 {
  132. l, err := VhostHttpMuxer.Listen(domain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  133. if err != nil {
  134. return err
  135. }
  136. log.Info("ProxyName [%s], type http listen for host [%s] location [%s]", p.Name, domain, "")
  137. p.listeners = append(p.listeners, l)
  138. } else {
  139. for _, location := range p.Locations {
  140. l, err := VhostHttpMuxer.Listen(domain, location, p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  141. if err != nil {
  142. return err
  143. }
  144. log.Info("ProxyName [%s], type http listen for host [%s] location [%s]", p.Name, domain, location)
  145. p.listeners = append(p.listeners, l)
  146. }
  147. }
  148. }
  149. if p.SubDomain != "" {
  150. if len(p.Locations) == 0 {
  151. l, err := VhostHttpMuxer.Listen(p.SubDomain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  152. if err != nil {
  153. return err
  154. }
  155. log.Info("ProxyName [%s], type http listen for host [%s] location [%s]", p.Name, p.SubDomain, "")
  156. p.listeners = append(p.listeners, l)
  157. } else {
  158. for _, location := range p.Locations {
  159. l, err := VhostHttpMuxer.Listen(p.SubDomain, location, p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  160. if err != nil {
  161. return err
  162. }
  163. log.Info("ProxyName [%s], type http listen for host [%s] location [%s]", p.Name, p.SubDomain, location)
  164. p.listeners = append(p.listeners, l)
  165. }
  166. }
  167. }
  168. } else if p.Type == "https" {
  169. for _, domain := range p.CustomDomains {
  170. l, err := VhostHttpsMuxer.Listen(domain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  171. if err != nil {
  172. return err
  173. }
  174. log.Info("ProxyName [%s], type https listen for host [%s]", p.Name, domain)
  175. p.listeners = append(p.listeners, l)
  176. }
  177. if p.SubDomain != "" {
  178. l, err := VhostHttpsMuxer.Listen(p.SubDomain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  179. if err != nil {
  180. return err
  181. }
  182. log.Info("ProxyName [%s], type https listen for host [%s]", p.Name, p.SubDomain)
  183. p.listeners = append(p.listeners, l)
  184. }
  185. }
  186. p.Lock()
  187. p.Status = consts.Working
  188. p.Unlock()
  189. metric.SetStatus(p.Name, p.Status)
  190. if p.Type == "udp" {
  191. // udp is special
  192. p.udpConn, err = conn.ListenUDP(p.BindAddr, p.ListenPort)
  193. if err != nil {
  194. log.Warn("ProxyName [%s], listen udp port error: %v", p.Name, err)
  195. return err
  196. }
  197. go func() {
  198. for {
  199. buf := pool.GetBuf(2048)
  200. n, remoteAddr, err := p.udpConn.ReadFromUDP(buf)
  201. if err != nil {
  202. log.Info("ProxyName [%s], udp listener is closed", p.Name)
  203. return
  204. }
  205. localAddr, _ := net.ResolveUDPAddr("udp", p.udpConn.LocalAddr().String())
  206. udpPacket := msg.NewUdpPacket(buf[0:n], remoteAddr, localAddr)
  207. select {
  208. case p.udpSenderChan <- udpPacket:
  209. default:
  210. log.Warn("ProxyName [%s], udp sender channel is full", p.Name)
  211. }
  212. pool.PutBuf(buf)
  213. }
  214. }()
  215. } else {
  216. // create connection pool if needed
  217. if p.PoolCount > 0 {
  218. go p.connectionPoolManager(p.closeChan)
  219. }
  220. // start a goroutine for every listener to accept user connection
  221. for _, listener := range p.listeners {
  222. go func(l Listener) {
  223. for {
  224. // block
  225. // if listener is closed, err returned
  226. c, err := l.Accept()
  227. if err != nil {
  228. log.Info("ProxyName [%s], listener is closed", p.Name)
  229. return
  230. }
  231. log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
  232. if p.Status != consts.Working {
  233. log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
  234. c.Close()
  235. return
  236. }
  237. go func(userConn *conn.Conn) {
  238. workConn, err := p.getWorkConn()
  239. if err != nil {
  240. return
  241. }
  242. // message will be transferred to another without modifying
  243. // l means local, r means remote
  244. log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
  245. userConn.GetLocalAddr(), userConn.GetRemoteAddr())
  246. needRecord := true
  247. go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord)
  248. }(c)
  249. }
  250. }(listener)
  251. }
  252. }
  253. return nil
  254. }
  255. func (p *ProxyServer) Close() {
  256. p.Release()
  257. // if the proxy created by PrivilegeMode, delete it when closed
  258. if p.PrivilegeMode {
  259. // NOTE: this will take the global ProxyServerMap's lock
  260. // if we only want to release resources, use Release() instead
  261. DeleteProxy(p.Name)
  262. }
  263. }
  264. func (p *ProxyServer) Release() {
  265. p.Lock()
  266. defer p.Unlock()
  267. if p.Status != consts.Closed {
  268. p.Status = consts.Closed
  269. for _, l := range p.listeners {
  270. if l != nil {
  271. l.Close()
  272. }
  273. }
  274. close(p.ctlMsgChan)
  275. close(p.workConnChan)
  276. close(p.udpSenderChan)
  277. close(p.closeChan)
  278. if p.CtlConn != nil {
  279. p.CtlConn.Close()
  280. }
  281. if p.WorkConnUdp != nil {
  282. p.WorkConnUdp.Close()
  283. }
  284. if p.udpConn != nil {
  285. p.udpConn.Close()
  286. p.udpConn = nil
  287. }
  288. }
  289. metric.SetStatus(p.Name, p.Status)
  290. }
  291. func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
  292. closeFlag = false
  293. _, ok := <-p.ctlMsgChan
  294. if !ok {
  295. closeFlag = true
  296. }
  297. return
  298. }
  299. func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) {
  300. select {
  301. case p.workConnChan <- c:
  302. default:
  303. log.Debug("ProxyName [%s], workConnChan is full, so close this work connection", p.Name)
  304. c.Close()
  305. }
  306. }
  307. // create a tcp connection for forwarding udp packages
  308. func (p *ProxyServer) RegisterNewWorkConnUdp(c *conn.Conn) {
  309. if p.WorkConnUdp != nil && !p.WorkConnUdp.IsClosed() {
  310. p.WorkConnUdp.Close()
  311. }
  312. p.WorkConnUdp = c
  313. // read
  314. go func() {
  315. var (
  316. buf string
  317. err error
  318. )
  319. for {
  320. buf, err = c.ReadLine()
  321. if err != nil {
  322. log.Warn("ProxyName [%s], work connection for udp closed", p.Name)
  323. return
  324. }
  325. udpPacket := &msg.UdpPacket{}
  326. err = udpPacket.UnPack([]byte(buf))
  327. if err != nil {
  328. log.Warn("ProxyName [%s], unpack udp packet error: %v", p.Name, err)
  329. continue
  330. }
  331. // send to user
  332. _, err = p.udpConn.WriteToUDP(udpPacket.Content, udpPacket.Dst)
  333. if err != nil {
  334. continue
  335. }
  336. }
  337. }()
  338. // write
  339. go func() {
  340. for {
  341. udpPacket, ok := <-p.udpSenderChan
  342. if !ok {
  343. return
  344. }
  345. err := c.WriteString(string(udpPacket.Pack()) + "\n")
  346. if err != nil {
  347. log.Debug("ProxyName [%s], write to work connection for udp error: %v", p.Name, err)
  348. return
  349. }
  350. }
  351. }()
  352. }
  353. // When frps get one user connection, we get one work connection from the pool and return it.
  354. // If no workConn available in the pool, send message to frpc to get one or more
  355. // and wait until it is available.
  356. // return an error if wait timeout
  357. func (p *ProxyServer) getWorkConn() (workConn *conn.Conn, err error) {
  358. var ok bool
  359. // get a work connection from the pool
  360. for {
  361. select {
  362. case workConn, ok = <-p.workConnChan:
  363. if !ok {
  364. err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name)
  365. return
  366. }
  367. log.Debug("ProxyName [%s], get work connection from pool", p.Name)
  368. default:
  369. // no work connections available in the poll, send message to frpc to get more
  370. p.ctlMsgChan <- 1
  371. select {
  372. case workConn, ok = <-p.workConnChan:
  373. if !ok {
  374. err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name)
  375. return
  376. }
  377. case <-time.After(time.Duration(UserConnTimeout) * time.Second):
  378. log.Warn("ProxyName [%s], timeout trying to get work connection", p.Name)
  379. err = fmt.Errorf("ProxyName [%s], timeout trying to get work connection", p.Name)
  380. return
  381. }
  382. }
  383. // if connection pool is not used, we don't check the status
  384. // function CheckClosed will consume at least 1 millisecond if the connection isn't closed
  385. if p.PoolCount == 0 || !workConn.CheckClosed() {
  386. break
  387. } else {
  388. log.Warn("ProxyName [%s], connection got from pool, but it's already closed", p.Name)
  389. }
  390. }
  391. return
  392. }
  393. func (p *ProxyServer) connectionPoolManager(closeCh <-chan struct{}) {
  394. for {
  395. // check if we need more work connections and send messages to frpc to get more
  396. time.Sleep(time.Duration(2) * time.Second)
  397. select {
  398. // if the channel closed, it means the proxy is closed, so just return
  399. case <-closeCh:
  400. log.Info("ProxyName [%s], connectionPoolManager exit", p.Name)
  401. return
  402. default:
  403. curWorkConnNum := int64(len(p.workConnChan))
  404. diff := p.PoolCount - curWorkConnNum
  405. if diff > 0 {
  406. if diff < p.PoolCount/5 {
  407. diff = p.PoolCount*4/5 + 1
  408. } else if diff < p.PoolCount/2 {
  409. diff = p.PoolCount/4 + 1
  410. } else if diff < p.PoolCount*4/5 {
  411. diff = p.PoolCount/5 + 1
  412. } else {
  413. diff = p.PoolCount/10 + 1
  414. }
  415. if diff+curWorkConnNum > p.PoolCount {
  416. diff = p.PoolCount - curWorkConnNum
  417. }
  418. for i := 0; i < int(diff); i++ {
  419. p.ctlMsgChan <- 1
  420. }
  421. }
  422. }
  423. }
  424. }