proxy_manager.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package proxy
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/client/event"
  6. "github.com/fatedier/frp/models/config"
  7. "github.com/fatedier/frp/models/msg"
  8. "github.com/fatedier/frp/utils/log"
  9. frpNet "github.com/fatedier/frp/utils/net"
  10. "github.com/fatedier/golib/errors"
  11. )
  12. type ProxyManager struct {
  13. sendCh chan (msg.Message)
  14. proxies map[string]*ProxyWrapper
  15. closed bool
  16. mu sync.RWMutex
  17. clientCfg config.ClientCommonConf
  18. // The UDP port that the server is listening on
  19. serverUDPPort int
  20. logPrefix string
  21. log.Logger
  22. }
  23. func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string, clientCfg config.ClientCommonConf, serverUDPPort int) *ProxyManager {
  24. return &ProxyManager{
  25. proxies: make(map[string]*ProxyWrapper),
  26. sendCh: msgSendCh,
  27. closed: false,
  28. clientCfg: clientCfg,
  29. serverUDPPort: serverUDPPort,
  30. logPrefix: logPrefix,
  31. Logger: log.NewPrefixLogger(logPrefix),
  32. }
  33. }
  34. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  35. pm.mu.RLock()
  36. pxy, ok := pm.proxies[name]
  37. pm.mu.RUnlock()
  38. if !ok {
  39. return fmt.Errorf("proxy [%s] not found", name)
  40. }
  41. err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
  42. if err != nil {
  43. return err
  44. }
  45. return nil
  46. }
  47. func (pm *ProxyManager) Close() {
  48. pm.mu.Lock()
  49. defer pm.mu.Unlock()
  50. for _, pxy := range pm.proxies {
  51. pxy.Stop()
  52. }
  53. pm.proxies = make(map[string]*ProxyWrapper)
  54. }
  55. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn, m *msg.StartWorkConn) {
  56. pm.mu.RLock()
  57. pw, ok := pm.proxies[name]
  58. pm.mu.RUnlock()
  59. if ok {
  60. pw.InWorkConn(workConn, m)
  61. } else {
  62. workConn.Close()
  63. }
  64. }
  65. func (pm *ProxyManager) HandleEvent(evType event.EventType, payload interface{}) error {
  66. var m msg.Message
  67. switch e := payload.(type) {
  68. case *event.StartProxyPayload:
  69. m = e.NewProxyMsg
  70. case *event.CloseProxyPayload:
  71. m = e.CloseProxyMsg
  72. default:
  73. return event.ErrPayloadType
  74. }
  75. err := errors.PanicToError(func() {
  76. pm.sendCh <- m
  77. })
  78. return err
  79. }
  80. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  81. ps := make([]*ProxyStatus, 0)
  82. pm.mu.RLock()
  83. defer pm.mu.RUnlock()
  84. for _, pxy := range pm.proxies {
  85. ps = append(ps, pxy.GetStatus())
  86. }
  87. return ps
  88. }
  89. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
  90. pm.mu.Lock()
  91. defer pm.mu.Unlock()
  92. delPxyNames := make([]string, 0)
  93. for name, pxy := range pm.proxies {
  94. del := false
  95. cfg, ok := pxyCfgs[name]
  96. if !ok {
  97. del = true
  98. } else {
  99. if !pxy.Cfg.Compare(cfg) {
  100. del = true
  101. }
  102. }
  103. if del {
  104. delPxyNames = append(delPxyNames, name)
  105. delete(pm.proxies, name)
  106. pxy.Stop()
  107. }
  108. }
  109. if len(delPxyNames) > 0 {
  110. pm.Info("proxy removed: %v", delPxyNames)
  111. }
  112. addPxyNames := make([]string, 0)
  113. for name, cfg := range pxyCfgs {
  114. if _, ok := pm.proxies[name]; !ok {
  115. pxy := NewProxyWrapper(cfg, pm.clientCfg, pm.HandleEvent, pm.logPrefix, pm.serverUDPPort)
  116. pm.proxies[name] = pxy
  117. addPxyNames = append(addPxyNames, name)
  118. pxy.Start()
  119. }
  120. }
  121. if len(addPxyNames) > 0 {
  122. pm.Info("proxy added: %v", addPxyNames)
  123. }
  124. }