proxy_manager.go 6.9 KB


  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/models/config"
  6. "github.com/fatedier/frp/models/msg"
  7. "github.com/fatedier/frp/utils/errors"
  8. "github.com/fatedier/frp/utils/log"
  9. frpNet "github.com/fatedier/frp/utils/net"
  10. )
  11. const (
  12. ProxyStatusNew = "new"
  13. ProxyStatusStartErr = "start error"
  14. ProxyStatusRunning = "running"
  15. ProxyStatusClosed = "closed"
  16. )
  17. type ProxyManager struct {
  18. ctl *Control
  19. proxies map[string]*ProxyWrapper
  20. visitorCfgs map[string]config.ProxyConf
  21. visitors map[string]Visitor
  22. sendCh chan (msg.Message)
  23. closed bool
  24. mu sync.RWMutex
  25. log.Logger
  26. }
  27. type ProxyWrapper struct {
  28. Name string
  29. Type string
  30. Status string
  31. Err string
  32. Cfg config.ProxyConf
  33. pxy Proxy
  34. mu sync.RWMutex
  35. }
  36. type ProxyStatus struct {
  37. Name string `json:"name"`
  38. Type string `json:"type"`
  39. Status string `json:"status"`
  40. Err string `json:"err"`
  41. Cfg config.ProxyConf `json:"cfg"`
  42. }
  43. func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
  44. return &ProxyWrapper{
  45. Name: cfg.GetName(),
  46. Type: cfg.GetType(),
  47. Status: ProxyStatusNew,
  48. Cfg: cfg,
  49. pxy: nil,
  50. }
  51. }
  52. func (pw *ProxyWrapper) IsRunning() bool {
  53. pw.mu.RLock()
  54. defer pw.mu.RUnlock()
  55. if pw.Status == ProxyStatusRunning {
  56. return true
  57. } else {
  58. return false
  59. }
  60. }
  61. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  62. pw.mu.RLock()
  63. defer pw.mu.RUnlock()
  64. ps := &ProxyStatus{
  65. Name: pw.Name,
  66. Type: pw.Type,
  67. Status: pw.Status,
  68. Err: pw.Err,
  69. Cfg: pw.Cfg,
  70. }
  71. return ps
  72. }
  73. func (pw *ProxyWrapper) Start(serverRespErr string) error {
  74. if pw.pxy != nil {
  75. pw.pxy.Close()
  76. pw.pxy = nil
  77. }
  78. if serverRespErr != "" {
  79. pw.mu.Lock()
  80. pw.Status = ProxyStatusStartErr
  81. pw.Err = serverRespErr
  82. pw.mu.Unlock()
  83. return fmt.Errorf(serverRespErr)
  84. }
  85. pxy := NewProxy(pw.Cfg)
  86. pw.mu.Lock()
  87. defer pw.mu.Unlock()
  88. if err := pxy.Run(); err != nil {
  89. pw.Status = ProxyStatusStartErr
  90. pw.Err = err.Error()
  91. return err
  92. }
  93. pw.Status = ProxyStatusRunning
  94. pw.Err = ""
  95. pw.pxy = pxy
  96. return nil
  97. }
  98. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  99. pw.mu.RLock()
  100. pxy := pw.pxy
  101. pw.mu.RUnlock()
  102. if pxy != nil {
  103. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  104. go pxy.InWorkConn(workConn)
  105. } else {
  106. workConn.Close()
  107. }
  108. }
  109. func (pw *ProxyWrapper) Close() {
  110. pw.mu.Lock()
  111. defer pw.mu.Unlock()
  112. if pw.pxy != nil {
  113. pw.pxy.Close()
  114. pw.pxy = nil
  115. }
  116. pw.Status = ProxyStatusClosed
  117. }
  118. func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  119. return &ProxyManager{
  120. proxies: make(map[string]*ProxyWrapper),
  121. visitorCfgs: make(map[string]config.ProxyConf),
  122. visitors: make(map[string]Visitor),
  123. sendCh: msgSendCh,
  124. closed: false,
  125. Logger: log.NewPrefixLogger(logPrefix),
  126. }
  127. }
  128. func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
  129. pm.mu.Lock()
  130. defer pm.mu.Unlock()
  131. pm.closed = false
  132. pm.sendCh = msgSendCh
  133. pm.ClearLogPrefix()
  134. pm.AddLogPrefix(logPrefix)
  135. }
  136. // Must hold the lock before calling this function.
  137. func (pm *ProxyManager) sendMsg(m msg.Message) error {
  138. err := errors.PanicToError(func() {
  139. pm.sendCh <- m
  140. })
  141. if err != nil {
  142. pm.closed = true
  143. }
  144. return err
  145. }
  146. func (pm *ProxyManager) StartProxy(name string, serverRespErr string) error {
  147. pm.mu.Lock()
  148. defer pm.mu.Unlock()
  149. if pm.closed {
  150. return fmt.Errorf("ProxyManager is closed now")
  151. }
  152. pxy, ok := pm.proxies[name]
  153. if !ok {
  154. return fmt.Errorf("no proxy found")
  155. }
  156. if err := pxy.Start(serverRespErr); err != nil {
  157. errRet := err
  158. err = pm.sendMsg(&msg.CloseProxy{
  159. ProxyName: name,
  160. })
  161. if err != nil {
  162. errRet = fmt.Errorf("send CloseProxy message error")
  163. }
  164. return errRet
  165. }
  166. return nil
  167. }
  168. func (pm *ProxyManager) CloseProxies() {
  169. pm.mu.RLock()
  170. defer pm.mu.RUnlock()
  171. for _, pxy := range pm.proxies {
  172. pxy.Close()
  173. }
  174. }
  175. func (pm *ProxyManager) CheckAndStartProxy() {
  176. pm.mu.RLock()
  177. defer pm.mu.RUnlock()
  178. if pm.closed {
  179. pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
  180. return
  181. }
  182. for _, pxy := range pm.proxies {
  183. if !pxy.IsRunning() {
  184. var newProxyMsg msg.NewProxy
  185. pxy.Cfg.UnMarshalToMsg(&newProxyMsg)
  186. err := pm.sendMsg(&newProxyMsg)
  187. if err != nil {
  188. pm.Warn("[%s] proxy send NewProxy message error")
  189. return
  190. }
  191. }
  192. }
  193. for _, cfg := range pm.visitorCfgs {
  194. if _, exist := pm.visitors[cfg.GetName()]; !exist {
  195. pm.Info("try to start visitor [%s]", cfg.GetName())
  196. visitor := NewVisitor(pm.ctl, cfg)
  197. err := visitor.Run()
  198. if err != nil {
  199. visitor.Warn("start error: %v", err)
  200. continue
  201. }
  202. pm.visitors[cfg.GetName()] = visitor
  203. visitor.Info("start visitor success")
  204. }
  205. }
  206. }
  207. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf) error {
  208. pm.mu.Lock()
  209. defer pm.mu.Unlock()
  210. if pm.closed {
  211. err := fmt.Errorf("Reload error: ProxyManager is closed now")
  212. pm.Warn(err.Error())
  213. return err
  214. }
  215. delPxyNames := make([]string, 0)
  216. for name, pxy := range pm.proxies {
  217. del := false
  218. cfg, ok := pxyCfgs[name]
  219. if !ok {
  220. del = true
  221. } else {
  222. if !pxy.Cfg.Compare(cfg) {
  223. del = true
  224. }
  225. }
  226. if del {
  227. delPxyNames = append(delPxyNames, name)
  228. delete(pm.proxies, name)
  229. pxy.Close()
  230. err := pm.sendMsg(&msg.CloseProxy{
  231. ProxyName: name,
  232. })
  233. if err != nil {
  234. err = fmt.Errorf("Reload error: ProxyManager is closed now")
  235. pm.Warn(err.Error())
  236. return err
  237. }
  238. }
  239. }
  240. pm.Info("proxy removed: %v", delPxyNames)
  241. addPxyNames := make([]string, 0)
  242. for name, cfg := range pxyCfgs {
  243. if _, ok := pm.proxies[name]; !ok {
  244. pxy := NewProxyWrapper(cfg)
  245. pm.proxies[name] = pxy
  246. addPxyNames = append(addPxyNames, name)
  247. }
  248. }
  249. pm.Info("proxy added: %v", addPxyNames)
  250. delVisitorName := make([]string, 0)
  251. for name, oldVisitorCfg := range pm.visitorCfgs {
  252. del := false
  253. cfg, ok := visitorCfgs[name]
  254. if !ok {
  255. del = true
  256. } else {
  257. if !oldVisitorCfg.Compare(cfg) {
  258. del = true
  259. }
  260. }
  261. if del {
  262. delVisitorName = append(delVisitorName, name)
  263. delete(pm.visitorCfgs, name)
  264. if visitor, ok := pm.visitors[name]; ok {
  265. visitor.Close()
  266. }
  267. delete(pm.visitors, name)
  268. }
  269. }
  270. pm.Info("visitor removed: %v", delVisitorName)
  271. addVisitorName := make([]string, 0)
  272. for name, visitorCfg := range visitorCfgs {
  273. if _, ok := pm.visitorCfgs[name]; !ok {
  274. pm.visitorCfgs[name] = visitorCfg
  275. addVisitorName = append(addVisitorName, name)
  276. }
  277. }
  278. pm.Info("visitor added: %v", addVisitorName)
  279. return nil
  280. }
  281. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  282. pm.mu.RLock()
  283. pw, ok := pm.proxies[name]
  284. pm.mu.RUnlock()
  285. if ok {
  286. pw.InWorkConn(workConn)
  287. } else {
  288. workConn.Close()
  289. }
  290. }
  291. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  292. ps := make([]*ProxyStatus, 0)
  293. pm.mu.RLock()
  294. defer pm.mu.RUnlock()
  295. for _, pxy := range pm.proxies {
  296. ps = append(ps, pxy.GetStatus())
  297. }
  298. return ps
  299. }