proxy_manager.go 6.3 KB


  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/models/config"
  6. "github.com/fatedier/frp/models/msg"
  7. "github.com/fatedier/frp/utils/log"
  8. frpNet "github.com/fatedier/frp/utils/net"
  9. "github.com/fatedier/golib/errors"
  10. )
  // Status strings stored in ProxyWrapper.Status and reported via ProxyStatus.Status.
  const (
  	// ProxyStatusNew is the initial status assigned by NewProxyWrapper.
  	ProxyStatusNew = "new"
  	// ProxyStatusStartErr is set by ProxyWrapper.Start when the server
  	// reported an error or the local proxy failed to run.
  	ProxyStatusStartErr = "start error"
  	// ProxyStatusWaitStart is set by ProxyWrapper.WaitStart.
  	ProxyStatusWaitStart = "wait start"
  	// ProxyStatusRunning is set by ProxyWrapper.Start after Run succeeds.
  	ProxyStatusRunning = "running"
  	// ProxyStatusCheckFailed is not assigned in the code visible here;
  	// presumably set by a health check elsewhere (TODO confirm).
  	ProxyStatusCheckFailed = "check failed"
  	// ProxyStatusClosed is set by ProxyWrapper.Close.
  	ProxyStatusClosed = "closed"
  )
  19. type ProxyManager struct {
  20. sendCh chan (msg.Message)
  21. proxies map[string]*ProxyWrapper
  22. closed bool
  23. mu sync.RWMutex
  24. log.Logger
  25. }
  26. func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  27. return &ProxyManager{
  28. proxies: make(map[string]*ProxyWrapper),
  29. sendCh: msgSendCh,
  30. closed: false,
  31. Logger: log.NewPrefixLogger(logPrefix),
  32. }
  33. }
  34. func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
  35. pm.mu.Lock()
  36. defer pm.mu.Unlock()
  37. pm.closed = false
  38. pm.sendCh = msgSendCh
  39. pm.ClearLogPrefix()
  40. pm.AddLogPrefix(logPrefix)
  41. }
  42. // Must hold the lock before calling this function.
  43. func (pm *ProxyManager) sendMsg(m msg.Message) error {
  44. err := errors.PanicToError(func() {
  45. pm.sendCh <- m
  46. })
  47. if err != nil {
  48. pm.closed = true
  49. }
  50. return err
  51. }
  52. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  53. pm.mu.Lock()
  54. defer pm.mu.Unlock()
  55. if pm.closed {
  56. return fmt.Errorf("ProxyManager is closed now")
  57. }
  58. pxy, ok := pm.proxies[name]
  59. if !ok {
  60. return fmt.Errorf("no proxy found")
  61. }
  62. if err := pxy.Start(remoteAddr, serverRespErr); err != nil {
  63. errRet := err
  64. err = pm.sendMsg(&msg.CloseProxy{
  65. ProxyName: name,
  66. })
  67. if err != nil {
  68. errRet = fmt.Errorf("send CloseProxy message error")
  69. }
  70. return errRet
  71. }
  72. return nil
  73. }
  74. func (pm *ProxyManager) CloseProxies() {
  75. pm.mu.RLock()
  76. defer pm.mu.RUnlock()
  77. for _, pxy := range pm.proxies {
  78. pxy.Close()
  79. }
  80. }
  81. // pxyStatus: check and start proxies in which status
  82. func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
  83. pm.mu.RLock()
  84. defer pm.mu.RUnlock()
  85. if pm.closed {
  86. pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
  87. return
  88. }
  89. for _, pxy := range pm.proxies {
  90. status := pxy.GetStatusStr()
  91. for _, s := range pxyStatus {
  92. if status == s {
  93. var newProxyMsg msg.NewProxy
  94. pxy.Cfg.MarshalToMsg(&newProxyMsg)
  95. err := pm.sendMsg(&newProxyMsg)
  96. if err != nil {
  97. pm.Warn("[%s] proxy send NewProxy message error")
  98. return
  99. }
  100. pxy.WaitStart()
  101. break
  102. }
  103. }
  104. }
  105. }
  106. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow bool) error {
  107. pm.mu.Lock()
  108. defer func() {
  109. pm.mu.Unlock()
  110. if startNow {
  111. go pm.CheckAndStartProxy([]string{ProxyStatusNew})
  112. }
  113. }()
  114. if pm.closed {
  115. err := fmt.Errorf("Reload error: ProxyManager is closed now")
  116. pm.Warn(err.Error())
  117. return err
  118. }
  119. delPxyNames := make([]string, 0)
  120. for name, pxy := range pm.proxies {
  121. del := false
  122. cfg, ok := pxyCfgs[name]
  123. if !ok {
  124. del = true
  125. } else {
  126. if !pxy.Cfg.Compare(cfg) {
  127. del = true
  128. }
  129. }
  130. if del {
  131. delPxyNames = append(delPxyNames, name)
  132. delete(pm.proxies, name)
  133. pxy.Close()
  134. err := pm.sendMsg(&msg.CloseProxy{
  135. ProxyName: name,
  136. })
  137. if err != nil {
  138. err = fmt.Errorf("Reload error: ProxyManager is closed now")
  139. pm.Warn(err.Error())
  140. return err
  141. }
  142. }
  143. }
  144. pm.Info("proxy removed: %v", delPxyNames)
  145. addPxyNames := make([]string, 0)
  146. for name, cfg := range pxyCfgs {
  147. if _, ok := pm.proxies[name]; !ok {
  148. pxy := NewProxyWrapper(cfg)
  149. pm.proxies[name] = pxy
  150. addPxyNames = append(addPxyNames, name)
  151. }
  152. }
  153. pm.Info("proxy added: %v", addPxyNames)
  154. return nil
  155. }
  156. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  157. pm.mu.RLock()
  158. pw, ok := pm.proxies[name]
  159. pm.mu.RUnlock()
  160. if ok {
  161. pw.InWorkConn(workConn)
  162. } else {
  163. workConn.Close()
  164. }
  165. }
  166. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  167. ps := make([]*ProxyStatus, 0)
  168. pm.mu.RLock()
  169. defer pm.mu.RUnlock()
  170. for _, pxy := range pm.proxies {
  171. ps = append(ps, pxy.GetStatus())
  172. }
  173. return ps
  174. }
  175. type ProxyStatus struct {
  176. Name string `json:"name"`
  177. Type string `json:"type"`
  178. Status string `json:"status"`
  179. Err string `json:"err"`
  180. Cfg config.ProxyConf `json:"cfg"`
  181. // Got from server.
  182. RemoteAddr string `json:"remote_addr"`
  183. }
  184. // ProxyWrapper is a wrapper of Proxy interface only used in ProxyManager
  185. // Add additional proxy status info
  186. type ProxyWrapper struct {
  187. Name string
  188. Type string
  189. Status string
  190. Err string
  191. Cfg config.ProxyConf
  192. RemoteAddr string
  193. pxy Proxy
  194. mu sync.RWMutex
  195. }
  196. func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
  197. return &ProxyWrapper{
  198. Name: cfg.GetBaseInfo().ProxyName,
  199. Type: cfg.GetBaseInfo().ProxyType,
  200. Status: ProxyStatusNew,
  201. Cfg: cfg,
  202. pxy: nil,
  203. }
  204. }
  205. func (pw *ProxyWrapper) GetStatusStr() string {
  206. pw.mu.RLock()
  207. defer pw.mu.RUnlock()
  208. return pw.Status
  209. }
  210. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  211. pw.mu.RLock()
  212. defer pw.mu.RUnlock()
  213. ps := &ProxyStatus{
  214. Name: pw.Name,
  215. Type: pw.Type,
  216. Status: pw.Status,
  217. Err: pw.Err,
  218. Cfg: pw.Cfg,
  219. RemoteAddr: pw.RemoteAddr,
  220. }
  221. return ps
  222. }
  223. func (pw *ProxyWrapper) WaitStart() {
  224. pw.mu.Lock()
  225. defer pw.mu.Unlock()
  226. pw.Status = ProxyStatusWaitStart
  227. }
  228. func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error {
  229. if pw.pxy != nil {
  230. pw.pxy.Close()
  231. pw.pxy = nil
  232. }
  233. if serverRespErr != "" {
  234. pw.mu.Lock()
  235. pw.Status = ProxyStatusStartErr
  236. pw.RemoteAddr = remoteAddr
  237. pw.Err = serverRespErr
  238. pw.mu.Unlock()
  239. return fmt.Errorf(serverRespErr)
  240. }
  241. pxy := NewProxy(pw.Cfg)
  242. pw.mu.Lock()
  243. defer pw.mu.Unlock()
  244. pw.RemoteAddr = remoteAddr
  245. if err := pxy.Run(); err != nil {
  246. pw.Status = ProxyStatusStartErr
  247. pw.Err = err.Error()
  248. return err
  249. }
  250. pw.Status = ProxyStatusRunning
  251. pw.Err = ""
  252. pw.pxy = pxy
  253. return nil
  254. }
  255. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  256. pw.mu.RLock()
  257. pxy := pw.pxy
  258. pw.mu.RUnlock()
  259. if pxy != nil {
  260. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  261. go pxy.InWorkConn(workConn)
  262. } else {
  263. workConn.Close()
  264. }
  265. }
  266. func (pw *ProxyWrapper) Close() {
  267. pw.mu.Lock()
  268. defer pw.mu.Unlock()
  269. if pw.pxy != nil {
  270. pw.pxy.Close()
  271. pw.pxy = nil
  272. }
  273. pw.Status = ProxyStatusClosed
  274. }