server.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package server
  2. import (
  3. "container/list"
  4. "sync"
  5. "time"
  6. "github.com/fatedier/frp/models/consts"
  7. "github.com/fatedier/frp/utils/conn"
  8. "github.com/fatedier/frp/utils/log"
  9. )
  10. type ProxyServer struct {
  11. Name string
  12. Passwd string
  13. BindAddr string
  14. ListenPort int64
  15. Status int64
  16. listener *conn.Listener // accept new connection from remote users
  17. ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
  18. cliConnChan chan *conn.Conn // get client conns from control goroutine
  19. userConnList *list.List // store user conns
  20. mutex sync.Mutex
  21. }
  22. func (p *ProxyServer) Init() {
  23. p.Status = consts.Idle
  24. p.cliConnChan = make(chan *conn.Conn)
  25. p.ctlMsgChan = make(chan int64)
  26. p.userConnList = list.New()
  27. }
  28. func (p *ProxyServer) Lock() {
  29. p.mutex.Lock()
  30. }
  31. func (p *ProxyServer) Unlock() {
  32. p.mutex.Unlock()
  33. }
  34. // start listening for user conns
  35. func (p *ProxyServer) Start() (err error) {
  36. p.Init()
  37. p.listener, err = conn.Listen(p.BindAddr, p.ListenPort)
  38. if err != nil {
  39. return err
  40. }
  41. p.Status = consts.Working
  42. // start a goroutine for listener to accept user connection
  43. go func() {
  44. for {
  45. // block
  46. // if listener is closed, err returned
  47. c, err := p.listener.GetConn()
  48. if err != nil {
  49. log.Info("ProxyName [%s], listener is closed", p.Name)
  50. return
  51. }
  52. log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
  53. // insert into list
  54. p.Lock()
  55. if p.Status != consts.Working {
  56. log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
  57. c.Close()
  58. p.Unlock()
  59. return
  60. }
  61. p.userConnList.PushBack(c)
  62. p.Unlock()
  63. // put msg to control conn
  64. p.ctlMsgChan <- 1
  65. // set timeout
  66. time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() {
  67. p.Lock()
  68. defer p.Unlock()
  69. element := p.userConnList.Front()
  70. if element == nil {
  71. return
  72. }
  73. userConn := element.Value.(*conn.Conn)
  74. if userConn == c {
  75. log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr())
  76. }
  77. })
  78. }
  79. }()
  80. // start another goroutine for join two conns from client and user
  81. go func() {
  82. for {
  83. cliConn, ok := <-p.cliConnChan
  84. if !ok {
  85. return
  86. }
  87. p.Lock()
  88. element := p.userConnList.Front()
  89. var userConn *conn.Conn
  90. if element != nil {
  91. userConn = element.Value.(*conn.Conn)
  92. p.userConnList.Remove(element)
  93. } else {
  94. cliConn.Close()
  95. p.Unlock()
  96. continue
  97. }
  98. p.Unlock()
  99. // msg will transfer to another without modifying
  100. // l means local, r means remote
  101. log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(),
  102. userConn.GetLocalAddr(), userConn.GetRemoteAddr())
  103. go conn.Join(cliConn, userConn)
  104. }
  105. }()
  106. return nil
  107. }
  108. func (p *ProxyServer) Close() {
  109. p.Lock()
  110. p.Status = consts.Idle
  111. p.listener.Close()
  112. close(p.ctlMsgChan)
  113. close(p.cliConnChan)
  114. p.userConnList = list.New()
  115. p.Unlock()
  116. }
  117. func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
  118. closeFlag = false
  119. _, ok := <-p.ctlMsgChan
  120. if !ok {
  121. closeFlag = true
  122. }
  123. return
  124. }
  125. func (p *ProxyServer) GetNewCliConn(c *conn.Conn) {
  126. p.cliConnChan <- c
  127. }