proxy_manager.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/models/config"
  6. "github.com/fatedier/frp/models/msg"
  7. "github.com/fatedier/frp/utils/log"
  8. frpNet "github.com/fatedier/frp/utils/net"
  9. "github.com/fatedier/golib/errors"
  10. )
  11. type ProxyManager struct {
  12. sendCh chan (msg.Message)
  13. proxies map[string]*ProxyWrapper
  14. closed bool
  15. mu sync.RWMutex
  16. logPrefix string
  17. log.Logger
  18. }
  19. func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  20. return &ProxyManager{
  21. proxies: make(map[string]*ProxyWrapper),
  22. sendCh: msgSendCh,
  23. closed: false,
  24. logPrefix: logPrefix,
  25. Logger: log.NewPrefixLogger(logPrefix),
  26. }
  27. }
  28. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  29. pm.mu.RLock()
  30. pxy, ok := pm.proxies[name]
  31. pm.mu.RUnlock()
  32. if !ok {
  33. return fmt.Errorf("proxy [%s] not found", name)
  34. }
  35. err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
  36. if err != nil {
  37. return err
  38. }
  39. return nil
  40. }
  41. func (pm *ProxyManager) Close() {
  42. pm.mu.RLock()
  43. defer pm.mu.RUnlock()
  44. for _, pxy := range pm.proxies {
  45. pxy.Stop()
  46. }
  47. }
  48. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  49. pm.mu.RLock()
  50. pw, ok := pm.proxies[name]
  51. pm.mu.RUnlock()
  52. if ok {
  53. pw.InWorkConn(workConn)
  54. } else {
  55. workConn.Close()
  56. }
  57. }
  58. func (pm *ProxyManager) HandleEvent(evType EventType, payload interface{}) error {
  59. var m msg.Message
  60. switch event := payload.(type) {
  61. case *StartProxyPayload:
  62. m = event.NewProxyMsg
  63. case *CloseProxyPayload:
  64. m = event.CloseProxyMsg
  65. default:
  66. return ErrPayloadType
  67. }
  68. err := errors.PanicToError(func() {
  69. pm.sendCh <- m
  70. })
  71. return err
  72. }
  73. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  74. ps := make([]*ProxyStatus, 0)
  75. pm.mu.RLock()
  76. defer pm.mu.RUnlock()
  77. for _, pxy := range pm.proxies {
  78. ps = append(ps, pxy.GetStatus())
  79. }
  80. return ps
  81. }
  82. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
  83. pm.mu.Lock()
  84. defer pm.mu.Unlock()
  85. delPxyNames := make([]string, 0)
  86. for name, pxy := range pm.proxies {
  87. del := false
  88. cfg, ok := pxyCfgs[name]
  89. if !ok {
  90. del = true
  91. } else {
  92. if !pxy.Cfg.Compare(cfg) {
  93. del = true
  94. }
  95. }
  96. if del {
  97. delPxyNames = append(delPxyNames, name)
  98. delete(pm.proxies, name)
  99. pxy.Stop()
  100. }
  101. }
  102. if len(delPxyNames) > 0 {
  103. pm.Info("proxy removed: %v", delPxyNames)
  104. }
  105. addPxyNames := make([]string, 0)
  106. for name, cfg := range pxyCfgs {
  107. if _, ok := pm.proxies[name]; !ok {
  108. pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix)
  109. pm.proxies[name] = pxy
  110. addPxyNames = append(addPxyNames, name)
  111. pxy.Start()
  112. }
  113. }
  114. if len(addPxyNames) > 0 {
  115. pm.Info("proxy added: %v", addPxyNames)
  116. }
  117. }