proxy_manager.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package proxy
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/client/event"
  6. "github.com/fatedier/frp/models/config"
  7. "github.com/fatedier/frp/models/msg"
  8. "github.com/fatedier/frp/utils/log"
  9. frpNet "github.com/fatedier/frp/utils/net"
  10. "github.com/fatedier/golib/errors"
  11. )
  12. type ProxyManager struct {
  13. sendCh chan (msg.Message)
  14. proxies map[string]*ProxyWrapper
  15. closed bool
  16. mu sync.RWMutex
  17. logPrefix string
  18. log.Logger
  19. }
  20. func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  21. return &ProxyManager{
  22. proxies: make(map[string]*ProxyWrapper),
  23. sendCh: msgSendCh,
  24. closed: false,
  25. logPrefix: logPrefix,
  26. Logger: log.NewPrefixLogger(logPrefix),
  27. }
  28. }
  29. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  30. pm.mu.RLock()
  31. pxy, ok := pm.proxies[name]
  32. pm.mu.RUnlock()
  33. if !ok {
  34. return fmt.Errorf("proxy [%s] not found", name)
  35. }
  36. err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
  37. if err != nil {
  38. return err
  39. }
  40. return nil
  41. }
  42. func (pm *ProxyManager) Close() {
  43. pm.mu.Lock()
  44. defer pm.mu.Unlock()
  45. for _, pxy := range pm.proxies {
  46. pxy.Stop()
  47. }
  48. pm.proxies = make(map[string]*ProxyWrapper)
  49. }
  50. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  51. pm.mu.RLock()
  52. pw, ok := pm.proxies[name]
  53. pm.mu.RUnlock()
  54. if ok {
  55. pw.InWorkConn(workConn)
  56. } else {
  57. workConn.Close()
  58. }
  59. }
  60. func (pm *ProxyManager) HandleEvent(evType event.EventType, payload interface{}) error {
  61. var m msg.Message
  62. switch e := payload.(type) {
  63. case *event.StartProxyPayload:
  64. m = e.NewProxyMsg
  65. case *event.CloseProxyPayload:
  66. m = e.CloseProxyMsg
  67. default:
  68. return event.ErrPayloadType
  69. }
  70. err := errors.PanicToError(func() {
  71. pm.sendCh <- m
  72. })
  73. return err
  74. }
  75. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  76. ps := make([]*ProxyStatus, 0)
  77. pm.mu.RLock()
  78. defer pm.mu.RUnlock()
  79. for _, pxy := range pm.proxies {
  80. ps = append(ps, pxy.GetStatus())
  81. }
  82. return ps
  83. }
  84. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
  85. pm.mu.Lock()
  86. defer pm.mu.Unlock()
  87. delPxyNames := make([]string, 0)
  88. for name, pxy := range pm.proxies {
  89. del := false
  90. cfg, ok := pxyCfgs[name]
  91. if !ok {
  92. del = true
  93. } else {
  94. if !pxy.Cfg.Compare(cfg) {
  95. del = true
  96. }
  97. }
  98. if del {
  99. delPxyNames = append(delPxyNames, name)
  100. delete(pm.proxies, name)
  101. pxy.Stop()
  102. }
  103. }
  104. if len(delPxyNames) > 0 {
  105. pm.Info("proxy removed: %v", delPxyNames)
  106. }
  107. addPxyNames := make([]string, 0)
  108. for name, cfg := range pxyCfgs {
  109. if _, ok := pm.proxies[name]; !ok {
  110. pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix)
  111. pm.proxies[name] = pxy
  112. addPxyNames = append(addPxyNames, name)
  113. pxy.Start()
  114. }
  115. }
  116. if len(addPxyNames) > 0 {
  117. pm.Info("proxy added: %v", addPxyNames)
  118. }
  119. }