server.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package models
  2. import (
  3. "container/list"
  4. "sync"
  5. "github.com/fatedier/frp/pkg/utils/conn"
  6. "github.com/fatedier/frp/pkg/utils/log"
  7. )
  // Lifecycle states stored in ProxyServer.Status.
  const (
  	// Idle is the state set by Init and restored by Close; the accept loop
  	// started by Start stops as soon as it observes a non-Working status.
  	Idle = iota
  	// Working is set by Start once the listener has been opened.
  	Working
  )
  12. type ProxyServer struct {
  13. Name string
  14. Passwd string
  15. BindAddr string
  16. ListenPort int64
  17. Status int64
  18. Listener *conn.Listener // accept new connection from remote users
  19. CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel
  20. StopBlockChan chan int64 // put any number to the channel, if you want to stop wait user conn
  21. CliConnChan chan *conn.Conn // get client conns from control goroutine
  22. UserConnList *list.List // store user conns
  23. Mutex sync.Mutex
  24. }
  25. func (p *ProxyServer) Init() {
  26. p.Status = Idle
  27. p.CtlMsgChan = make(chan int64)
  28. p.StopBlockChan = make(chan int64)
  29. p.CliConnChan = make(chan *conn.Conn)
  30. p.UserConnList = list.New()
  31. }
  32. func (p *ProxyServer) Lock() {
  33. p.Mutex.Lock()
  34. }
  35. func (p *ProxyServer) Unlock() {
  36. p.Mutex.Unlock()
  37. }
  38. // start listening for user conns
  39. func (p *ProxyServer) Start() (err error) {
  40. p.Listener, err = conn.Listen(p.BindAddr, p.ListenPort)
  41. if err != nil {
  42. return err
  43. }
  44. p.Status = Working
  45. // start a goroutine for listener
  46. go func() {
  47. for {
  48. // block
  49. c := p.Listener.GetConn()
  50. log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr())
  51. // put to list
  52. p.Lock()
  53. if p.Status != Working {
  54. log.Debug("ProxyName [%s] is not working, new user conn close", p.Name)
  55. c.Close()
  56. p.Unlock()
  57. return
  58. }
  59. p.UserConnList.PushBack(c)
  60. p.Unlock()
  61. // put msg to control conn
  62. p.CtlMsgChan <- 1
  63. }
  64. }()
  65. // start another goroutine for join two conns from client and user
  66. go func() {
  67. for {
  68. cliConn := <-p.CliConnChan
  69. p.Lock()
  70. element := p.UserConnList.Front()
  71. var userConn *conn.Conn
  72. if element != nil {
  73. userConn = element.Value.(*conn.Conn)
  74. p.UserConnList.Remove(element)
  75. } else {
  76. cliConn.Close()
  77. p.Unlock()
  78. continue
  79. }
  80. p.Unlock()
  81. // msg will transfer to another without modifying
  82. // l means local, r means remote
  83. log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(),
  84. userConn.GetLocalAddr(), userConn.GetRemoteAddr())
  85. go conn.Join(cliConn, userConn)
  86. }
  87. }()
  88. return nil
  89. }
  90. func (p *ProxyServer) Close() {
  91. p.Lock()
  92. p.Status = Idle
  93. p.CtlMsgChan = make(chan int64)
  94. p.CliConnChan = make(chan *conn.Conn)
  95. p.UserConnList = list.New()
  96. p.Unlock()
  97. }
  98. func (p *ProxyServer) WaitUserConn() (res int64, isStop bool) {
  99. select {
  100. case res = <-p.CtlMsgChan:
  101. return res, false
  102. case <-p.StopBlockChan:
  103. return 0, true
  104. }
  105. }
  106. func (p *ProxyServer) StopWaitUserConn() {
  107. p.StopBlockChan <- 1
  108. }