server.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. // Copyright 2016 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package server
  15. import (
  16. "fmt"
  17. "net"
  18. "sync"
  19. "time"
  20. "github.com/fatedier/frp/src/models/config"
  21. "github.com/fatedier/frp/src/models/consts"
  22. "github.com/fatedier/frp/src/models/metric"
  23. "github.com/fatedier/frp/src/models/msg"
  24. "github.com/fatedier/frp/src/utils/conn"
  25. "github.com/fatedier/frp/src/utils/log"
  26. "github.com/fatedier/frp/src/utils/pool"
  27. )
  28. type Listener interface {
  29. Accept() (*conn.Conn, error)
  30. Close() error
  31. }
  32. type ProxyServer struct {
  33. config.BaseConf
  34. BindAddr string
  35. ListenPort int64
  36. CustomDomains []string
  37. Locations []string
  38. Status int64
  39. CtlConn *conn.Conn // control connection with frpc
  40. WorkConnUdp *conn.Conn // work connection for udp
  41. udpConn *net.UDPConn
  42. listeners []Listener // accept new connection from remote users
  43. ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
  44. workConnChan chan *conn.Conn // get new work conns from control goroutine
  45. udpSenderChan chan *msg.UdpPacket
  46. mutex sync.RWMutex
  47. closeChan chan struct{} // close this channel for notifying other goroutines that the proxy is closed
  48. }
  49. func NewProxyServer() (p *ProxyServer) {
  50. p = &ProxyServer{
  51. CustomDomains: make([]string, 0),
  52. Locations: make([]string, 0),
  53. }
  54. return p
  55. }
  56. func NewProxyServerFromCtlMsg(req *msg.ControlReq) (p *ProxyServer) {
  57. p = &ProxyServer{}
  58. p.Name = req.ProxyName
  59. p.Type = req.ProxyType
  60. p.UseEncryption = req.UseEncryption
  61. p.UseGzip = req.UseGzip
  62. p.PrivilegeMode = req.PrivilegeMode
  63. p.PrivilegeToken = PrivilegeToken
  64. p.BindAddr = BindAddr
  65. if p.Type == "tcp" || p.Type == "udp" {
  66. p.ListenPort = req.RemotePort
  67. } else if p.Type == "http" {
  68. p.ListenPort = VhostHttpPort
  69. } else if p.Type == "https" {
  70. p.ListenPort = VhostHttpsPort
  71. }
  72. p.CustomDomains = req.CustomDomains
  73. p.Locations = req.Locations
  74. p.HostHeaderRewrite = req.HostHeaderRewrite
  75. p.HttpUserName = req.HttpUserName
  76. p.HttpPassWord = req.HttpPassWord
  77. return
  78. }
  79. func (p *ProxyServer) Init() {
  80. p.Lock()
  81. p.Status = consts.Idle
  82. metric.SetStatus(p.Name, p.Status)
  83. p.workConnChan = make(chan *conn.Conn, p.PoolCount+10)
  84. p.ctlMsgChan = make(chan int64, p.PoolCount+10)
  85. p.udpSenderChan = make(chan *msg.UdpPacket, 1024)
  86. p.listeners = make([]Listener, 0)
  87. p.closeChan = make(chan struct{})
  88. p.Unlock()
  89. }
  90. func (p *ProxyServer) Compare(p2 *ProxyServer) bool {
  91. if p.Name != p2.Name || p.AuthToken != p2.AuthToken || p.Type != p2.Type ||
  92. p.BindAddr != p2.BindAddr || p.ListenPort != p2.ListenPort || p.HostHeaderRewrite != p2.HostHeaderRewrite {
  93. return false
  94. }
  95. if len(p.CustomDomains) != len(p2.CustomDomains) {
  96. return false
  97. }
  98. for i, _ := range p.CustomDomains {
  99. if p.CustomDomains[i] != p2.CustomDomains[i] {
  100. return false
  101. }
  102. }
  103. if len(p.Locations) != len(p2.Locations) {
  104. return false
  105. }
  106. for i, _ := range p.Locations {
  107. if p.Locations[i] != p2.Locations[i] {
  108. return false
  109. }
  110. }
  111. return true
  112. }
  113. func (p *ProxyServer) Lock() {
  114. p.mutex.Lock()
  115. }
  116. func (p *ProxyServer) Unlock() {
  117. p.mutex.Unlock()
  118. }
  119. // start listening for user conns
  120. func (p *ProxyServer) Start(c *conn.Conn) (err error) {
  121. p.CtlConn = c
  122. p.Init()
  123. if p.Type == "tcp" {
  124. l, err := conn.Listen(p.BindAddr, p.ListenPort)
  125. if err != nil {
  126. return err
  127. }
  128. p.listeners = append(p.listeners, l)
  129. } else if p.Type == "http" {
  130. for _, domain := range p.CustomDomains {
  131. if len(p.Locations) == 0 {
  132. l, err := VhostHttpMuxer.Listen(domain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  133. if err != nil {
  134. return err
  135. }
  136. p.listeners = append(p.listeners, l)
  137. } else {
  138. for _, location := range p.Locations {
  139. l, err := VhostHttpMuxer.Listen(domain, location, p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  140. if err != nil {
  141. return err
  142. }
  143. p.listeners = append(p.listeners, l)
  144. }
  145. }
  146. }
  147. if p.SubDomain != "" {
  148. if len(p.Locations) == 0 {
  149. l, err := VhostHttpMuxer.Listen(p.SubDomain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  150. if err != nil {
  151. return err
  152. }
  153. p.listeners = append(p.listeners, l)
  154. } else {
  155. for _, location := range p.Locations {
  156. l, err := VhostHttpMuxer.Listen(p.SubDomain, location, p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  157. if err != nil {
  158. return err
  159. }
  160. p.listeners = append(p.listeners, l)
  161. }
  162. }
  163. }
  164. } else if p.Type == "https" {
  165. for _, domain := range p.CustomDomains {
  166. l, err := VhostHttpsMuxer.Listen(domain, "", p.HostHeaderRewrite, p.HttpUserName, p.HttpPassWord)
  167. if err != nil {
  168. return err
  169. }
  170. p.listeners = append(p.listeners, l)
  171. }
  172. }
  173. p.Lock()
  174. p.Status = consts.Working
  175. p.Unlock()
  176. metric.SetStatus(p.Name, p.Status)
  177. if p.Type == "udp" {
  178. // udp is special
  179. p.udpConn, err = conn.ListenUDP(p.BindAddr, p.ListenPort)
  180. if err != nil {
  181. log.Warn("ProxyName [%s], listen udp port error: %v", p.Name, err)
  182. return err
  183. }
  184. go func() {
  185. for {
  186. buf := pool.GetBuf(2048)
  187. n, remoteAddr, err := p.udpConn.ReadFromUDP(buf)
  188. if err != nil {
  189. log.Info("ProxyName [%s], udp listener is closed", p.Name)
  190. return
  191. }
  192. localAddr, _ := net.ResolveUDPAddr("udp", p.udpConn.LocalAddr().String())
  193. udpPacket := msg.NewUdpPacket(buf[0:n], remoteAddr, localAddr)
  194. select {
  195. case p.udpSenderChan <- udpPacket:
  196. default:
  197. log.Warn("ProxyName [%s], udp sender channel is full", p.Name)
  198. }
  199. pool.PutBuf(buf)
  200. }
  201. }()
  202. } else {
  203. // create connection pool if needed
  204. if p.PoolCount > 0 {
  205. go p.connectionPoolManager(p.closeChan)
  206. }
  207. // start a goroutine for every listener to accept user connection
  208. for _, listener := range p.listeners {
  209. go func(l Listener) {
  210. for {
  211. // block
  212. // if listener is closed, err returned
  213. c, err := l.Accept()
  214. if err != nil {
  215. log.Info("ProxyName [%s], listener is closed", p.Name)
  216. return
  217. }
  218. log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
  219. if p.Status != consts.Working {
  220. log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
  221. c.Close()
  222. return
  223. }
  224. go func(userConn *conn.Conn) {
  225. workConn, err := p.getWorkConn()
  226. if err != nil {
  227. return
  228. }
  229. // message will be transferred to another without modifying
  230. // l means local, r means remote
  231. log.Debug("Join two connections, (l[%s] r[%s]) (l[%s] r[%s])", workConn.GetLocalAddr(), workConn.GetRemoteAddr(),
  232. userConn.GetLocalAddr(), userConn.GetRemoteAddr())
  233. needRecord := true
  234. go msg.JoinMore(userConn, workConn, p.BaseConf, needRecord)
  235. }(c)
  236. }
  237. }(listener)
  238. }
  239. }
  240. return nil
  241. }
  242. func (p *ProxyServer) Close() {
  243. p.Lock()
  244. if p.Status != consts.Closed {
  245. p.Status = consts.Closed
  246. for _, l := range p.listeners {
  247. if l != nil {
  248. l.Close()
  249. }
  250. }
  251. close(p.ctlMsgChan)
  252. close(p.workConnChan)
  253. close(p.udpSenderChan)
  254. close(p.closeChan)
  255. if p.CtlConn != nil {
  256. p.CtlConn.Close()
  257. }
  258. if p.WorkConnUdp != nil {
  259. p.WorkConnUdp.Close()
  260. }
  261. if p.udpConn != nil {
  262. p.udpConn.Close()
  263. p.udpConn = nil
  264. }
  265. }
  266. metric.SetStatus(p.Name, p.Status)
  267. // if the proxy created by PrivilegeMode, delete it when closed
  268. if p.PrivilegeMode {
  269. DeleteProxy(p.Name)
  270. }
  271. p.Unlock()
  272. }
  273. func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
  274. closeFlag = false
  275. _, ok := <-p.ctlMsgChan
  276. if !ok {
  277. closeFlag = true
  278. }
  279. return
  280. }
  281. func (p *ProxyServer) RegisterNewWorkConn(c *conn.Conn) {
  282. select {
  283. case p.workConnChan <- c:
  284. default:
  285. log.Debug("ProxyName [%s], workConnChan is full, so close this work connection", p.Name)
  286. c.Close()
  287. }
  288. }
  289. // create a tcp connection for forwarding udp packages
  290. func (p *ProxyServer) RegisterNewWorkConnUdp(c *conn.Conn) {
  291. if p.WorkConnUdp != nil && !p.WorkConnUdp.IsClosed() {
  292. p.WorkConnUdp.Close()
  293. }
  294. p.WorkConnUdp = c
  295. // read
  296. go func() {
  297. var (
  298. buf string
  299. err error
  300. )
  301. for {
  302. buf, err = c.ReadLine()
  303. if err != nil {
  304. log.Warn("ProxyName [%s], work connection for udp closed", p.Name)
  305. return
  306. }
  307. udpPacket := &msg.UdpPacket{}
  308. err = udpPacket.UnPack([]byte(buf))
  309. if err != nil {
  310. log.Warn("ProxyName [%s], unpack udp packet error: %v", p.Name, err)
  311. continue
  312. }
  313. // send to user
  314. _, err = p.udpConn.WriteToUDP(udpPacket.Content, udpPacket.Dst)
  315. if err != nil {
  316. continue
  317. }
  318. }
  319. }()
  320. // write
  321. go func() {
  322. for {
  323. udpPacket, ok := <-p.udpSenderChan
  324. if !ok {
  325. return
  326. }
  327. err := c.WriteString(string(udpPacket.Pack()) + "\n")
  328. if err != nil {
  329. log.Debug("ProxyName [%s], write to work connection for udp error: %v", p.Name, err)
  330. return
  331. }
  332. }
  333. }()
  334. }
  335. // When frps get one user connection, we get one work connection from the pool and return it.
  336. // If no workConn available in the pool, send message to frpc to get one or more
  337. // and wait until it is available.
  338. // return an error if wait timeout
  339. func (p *ProxyServer) getWorkConn() (workConn *conn.Conn, err error) {
  340. var ok bool
  341. // get a work connection from the pool
  342. for {
  343. select {
  344. case workConn, ok = <-p.workConnChan:
  345. if !ok {
  346. err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name)
  347. return
  348. }
  349. default:
  350. // no work connections available in the poll, send message to frpc to get more
  351. p.ctlMsgChan <- 1
  352. select {
  353. case workConn, ok = <-p.workConnChan:
  354. if !ok {
  355. err = fmt.Errorf("ProxyName [%s], no work connections available, control is closing", p.Name)
  356. return
  357. }
  358. case <-time.After(time.Duration(UserConnTimeout) * time.Second):
  359. log.Warn("ProxyName [%s], timeout trying to get work connection", p.Name)
  360. err = fmt.Errorf("ProxyName [%s], timeout trying to get work connection", p.Name)
  361. return
  362. }
  363. }
  364. // if connection pool is not used, we don't check the status
  365. // function CheckClosed will consume at least 1 millisecond if the connection isn't closed
  366. if p.PoolCount == 0 || !workConn.CheckClosed() {
  367. break
  368. } else {
  369. log.Warn("ProxyName [%s], connection got from pool, but it's already closed", p.Name)
  370. }
  371. }
  372. return
  373. }
  374. func (p *ProxyServer) connectionPoolManager(closeCh <-chan struct{}) {
  375. for {
  376. // check if we need more work connections and send messages to frpc to get more
  377. time.Sleep(time.Duration(2) * time.Second)
  378. select {
  379. // if the channel closed, it means the proxy is closed, so just return
  380. case <-closeCh:
  381. log.Info("ProxyName [%s], connectionPoolManager exit", p.Name)
  382. return
  383. default:
  384. curWorkConnNum := int64(len(p.workConnChan))
  385. diff := p.PoolCount - curWorkConnNum
  386. if diff > 0 {
  387. if diff < p.PoolCount/5 {
  388. diff = p.PoolCount*4/5 + 1
  389. } else if diff < p.PoolCount/2 {
  390. diff = p.PoolCount/4 + 1
  391. } else if diff < p.PoolCount*4/5 {
  392. diff = p.PoolCount/5 + 1
  393. } else {
  394. diff = p.PoolCount/10 + 1
  395. }
  396. if diff+curWorkConnNum > p.PoolCount {
  397. diff = p.PoolCount - curWorkConnNum
  398. }
  399. for i := 0; i < int(diff); i++ {
  400. p.ctlMsgChan <- 1
  401. }
  402. }
  403. }
  404. }
  405. }