proxy_manager.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. package proxy
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/client/event"
  6. "github.com/fatedier/frp/models/config"
  7. "github.com/fatedier/frp/models/msg"
  8. "github.com/fatedier/frp/utils/log"
  9. frpNet "github.com/fatedier/frp/utils/net"
  10. "github.com/fatedier/golib/errors"
  11. )
  12. type ProxyManager struct {
  13. sendCh chan (msg.Message)
  14. proxies map[string]*ProxyWrapper
  15. closed bool
  16. mu sync.RWMutex
  17. logPrefix string
  18. log.Logger
  19. }
  20. func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  21. return &ProxyManager{
  22. proxies: make(map[string]*ProxyWrapper),
  23. sendCh: msgSendCh,
  24. closed: false,
  25. logPrefix: logPrefix,
  26. Logger: log.NewPrefixLogger(logPrefix),
  27. }
  28. }
  29. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  30. pm.mu.RLock()
  31. pxy, ok := pm.proxies[name]
  32. pm.mu.RUnlock()
  33. if !ok {
  34. return fmt.Errorf("proxy [%s] not found", name)
  35. }
  36. err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
  37. if err != nil {
  38. return err
  39. }
  40. return nil
  41. }
  42. func (pm *ProxyManager) Close() {
  43. pm.mu.RLock()
  44. defer pm.mu.RUnlock()
  45. for _, pxy := range pm.proxies {
  46. pxy.Stop()
  47. }
  48. }
  49. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  50. pm.mu.RLock()
  51. pw, ok := pm.proxies[name]
  52. pm.mu.RUnlock()
  53. if ok {
  54. pw.InWorkConn(workConn)
  55. } else {
  56. workConn.Close()
  57. }
  58. }
  59. func (pm *ProxyManager) HandleEvent(evType event.EventType, payload interface{}) error {
  60. var m msg.Message
  61. switch e := payload.(type) {
  62. case *event.StartProxyPayload:
  63. m = e.NewProxyMsg
  64. case *event.CloseProxyPayload:
  65. m = e.CloseProxyMsg
  66. default:
  67. return event.ErrPayloadType
  68. }
  69. err := errors.PanicToError(func() {
  70. pm.sendCh <- m
  71. })
  72. return err
  73. }
  74. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  75. ps := make([]*ProxyStatus, 0)
  76. pm.mu.RLock()
  77. defer pm.mu.RUnlock()
  78. for _, pxy := range pm.proxies {
  79. ps = append(ps, pxy.GetStatus())
  80. }
  81. return ps
  82. }
  83. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
  84. pm.mu.Lock()
  85. defer pm.mu.Unlock()
  86. delPxyNames := make([]string, 0)
  87. for name, pxy := range pm.proxies {
  88. del := false
  89. cfg, ok := pxyCfgs[name]
  90. if !ok {
  91. del = true
  92. } else {
  93. if !pxy.Cfg.Compare(cfg) {
  94. del = true
  95. }
  96. }
  97. if del {
  98. delPxyNames = append(delPxyNames, name)
  99. delete(pm.proxies, name)
  100. pxy.Stop()
  101. }
  102. }
  103. if len(delPxyNames) > 0 {
  104. pm.Info("proxy removed: %v", delPxyNames)
  105. }
  106. addPxyNames := make([]string, 0)
  107. for name, cfg := range pxyCfgs {
  108. if _, ok := pm.proxies[name]; !ok {
  109. pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix)
  110. pm.proxies[name] = pxy
  111. addPxyNames = append(addPxyNames, name)
  112. pxy.Start()
  113. }
  114. }
  115. if len(addPxyNames) > 0 {
  116. pm.Info("proxy added: %v", addPxyNames)
  117. }
  118. }