proxy_manager.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/models/config"
  6. "github.com/fatedier/frp/models/msg"
  7. "github.com/fatedier/frp/utils/errors"
  8. "github.com/fatedier/frp/utils/log"
  9. frpNet "github.com/fatedier/frp/utils/net"
  10. )
  // Status strings stored in ProxyWrapper.Status and reported via ProxyStatus.
  const (
  	// ProxyStatusNew is the status NewProxyWrapper gives a fresh wrapper.
  	ProxyStatusNew = "new"
  	// ProxyStatusStartErr is recorded by ProxyWrapper.Start when the server
  	// reported an error or the local proxy failed to run.
  	ProxyStatusStartErr = "start error"
  	// ProxyStatusRunning is recorded by ProxyWrapper.Start on success.
  	ProxyStatusRunning = "running"
  	// ProxyStatusClosed is recorded by ProxyWrapper.Close.
  	ProxyStatusClosed = "closed"
  )
  17. type ProxyManager struct {
  18. ctl *Control
  19. proxies map[string]*ProxyWrapper
  20. visitorCfgs map[string]config.ProxyConf
  21. visitors map[string]Visitor
  22. sendCh chan (msg.Message)
  23. closed bool
  24. mu sync.RWMutex
  25. log.Logger
  26. }
  27. type ProxyWrapper struct {
  28. Name string
  29. Type string
  30. Status string
  31. Err string
  32. Cfg config.ProxyConf
  33. RemoteAddr string
  34. pxy Proxy
  35. mu sync.RWMutex
  36. }
  37. type ProxyStatus struct {
  38. Name string `json:"name"`
  39. Type string `json:"type"`
  40. Status string `json:"status"`
  41. Err string `json:"err"`
  42. Cfg config.ProxyConf `json:"cfg"`
  43. // Got from server.
  44. RemoteAddr string `json:"remote_addr"`
  45. }
  46. func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
  47. return &ProxyWrapper{
  48. Name: cfg.GetName(),
  49. Type: cfg.GetType(),
  50. Status: ProxyStatusNew,
  51. Cfg: cfg,
  52. pxy: nil,
  53. }
  54. }
  55. func (pw *ProxyWrapper) GetStatusStr() string {
  56. pw.mu.RLock()
  57. defer pw.mu.RUnlock()
  58. return pw.Status
  59. }
  60. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  61. pw.mu.RLock()
  62. defer pw.mu.RUnlock()
  63. ps := &ProxyStatus{
  64. Name: pw.Name,
  65. Type: pw.Type,
  66. Status: pw.Status,
  67. Err: pw.Err,
  68. Cfg: pw.Cfg,
  69. RemoteAddr: pw.RemoteAddr,
  70. }
  71. return ps
  72. }
  73. func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error {
  74. if pw.pxy != nil {
  75. pw.pxy.Close()
  76. pw.pxy = nil
  77. }
  78. if serverRespErr != "" {
  79. pw.mu.Lock()
  80. pw.Status = ProxyStatusStartErr
  81. pw.RemoteAddr = remoteAddr
  82. pw.Err = serverRespErr
  83. pw.mu.Unlock()
  84. return fmt.Errorf(serverRespErr)
  85. }
  86. pxy := NewProxy(pw.Cfg)
  87. pw.mu.Lock()
  88. defer pw.mu.Unlock()
  89. pw.RemoteAddr = remoteAddr
  90. if err := pxy.Run(); err != nil {
  91. pw.Status = ProxyStatusStartErr
  92. pw.Err = err.Error()
  93. return err
  94. }
  95. pw.Status = ProxyStatusRunning
  96. pw.Err = ""
  97. pw.pxy = pxy
  98. return nil
  99. }
  100. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  101. pw.mu.RLock()
  102. pxy := pw.pxy
  103. pw.mu.RUnlock()
  104. if pxy != nil {
  105. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  106. go pxy.InWorkConn(workConn)
  107. } else {
  108. workConn.Close()
  109. }
  110. }
  111. func (pw *ProxyWrapper) Close() {
  112. pw.mu.Lock()
  113. defer pw.mu.Unlock()
  114. if pw.pxy != nil {
  115. pw.pxy.Close()
  116. pw.pxy = nil
  117. }
  118. pw.Status = ProxyStatusClosed
  119. }
  120. func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  121. return &ProxyManager{
  122. ctl: ctl,
  123. proxies: make(map[string]*ProxyWrapper),
  124. visitorCfgs: make(map[string]config.ProxyConf),
  125. visitors: make(map[string]Visitor),
  126. sendCh: msgSendCh,
  127. closed: false,
  128. Logger: log.NewPrefixLogger(logPrefix),
  129. }
  130. }
  131. func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
  132. pm.mu.Lock()
  133. defer pm.mu.Unlock()
  134. pm.closed = false
  135. pm.sendCh = msgSendCh
  136. pm.ClearLogPrefix()
  137. pm.AddLogPrefix(logPrefix)
  138. }
  139. // Must hold the lock before calling this function.
  140. func (pm *ProxyManager) sendMsg(m msg.Message) error {
  141. err := errors.PanicToError(func() {
  142. pm.sendCh <- m
  143. })
  144. if err != nil {
  145. pm.closed = true
  146. }
  147. return err
  148. }
  149. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  150. pm.mu.Lock()
  151. defer pm.mu.Unlock()
  152. if pm.closed {
  153. return fmt.Errorf("ProxyManager is closed now")
  154. }
  155. pxy, ok := pm.proxies[name]
  156. if !ok {
  157. return fmt.Errorf("no proxy found")
  158. }
  159. if err := pxy.Start(remoteAddr, serverRespErr); err != nil {
  160. errRet := err
  161. err = pm.sendMsg(&msg.CloseProxy{
  162. ProxyName: name,
  163. })
  164. if err != nil {
  165. errRet = fmt.Errorf("send CloseProxy message error")
  166. }
  167. return errRet
  168. }
  169. return nil
  170. }
  171. func (pm *ProxyManager) CloseProxies() {
  172. pm.mu.RLock()
  173. defer pm.mu.RUnlock()
  174. for _, pxy := range pm.proxies {
  175. pxy.Close()
  176. }
  177. }
  178. // pxyStatus: check and start proxies in which status
  179. func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
  180. pm.mu.RLock()
  181. defer pm.mu.RUnlock()
  182. if pm.closed {
  183. pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
  184. return
  185. }
  186. for _, pxy := range pm.proxies {
  187. status := pxy.GetStatusStr()
  188. for _, s := range pxyStatus {
  189. if status == s {
  190. var newProxyMsg msg.NewProxy
  191. pxy.Cfg.UnMarshalToMsg(&newProxyMsg)
  192. err := pm.sendMsg(&newProxyMsg)
  193. if err != nil {
  194. pm.Warn("[%s] proxy send NewProxy message error")
  195. return
  196. }
  197. break
  198. }
  199. }
  200. }
  201. for _, cfg := range pm.visitorCfgs {
  202. if _, exist := pm.visitors[cfg.GetName()]; !exist {
  203. pm.Info("try to start visitor [%s]", cfg.GetName())
  204. visitor := NewVisitor(pm.ctl, cfg)
  205. err := visitor.Run()
  206. if err != nil {
  207. visitor.Warn("start error: %v", err)
  208. continue
  209. }
  210. pm.visitors[cfg.GetName()] = visitor
  211. visitor.Info("start visitor success")
  212. }
  213. }
  214. }
  215. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.ProxyConf, startNow bool) error {
  216. pm.mu.Lock()
  217. defer func() {
  218. pm.mu.Unlock()
  219. if startNow {
  220. go pm.CheckAndStartProxy([]string{ProxyStatusNew})
  221. }
  222. }()
  223. if pm.closed {
  224. err := fmt.Errorf("Reload error: ProxyManager is closed now")
  225. pm.Warn(err.Error())
  226. return err
  227. }
  228. delPxyNames := make([]string, 0)
  229. for name, pxy := range pm.proxies {
  230. del := false
  231. cfg, ok := pxyCfgs[name]
  232. if !ok {
  233. del = true
  234. } else {
  235. if !pxy.Cfg.Compare(cfg) {
  236. del = true
  237. }
  238. }
  239. if del {
  240. delPxyNames = append(delPxyNames, name)
  241. delete(pm.proxies, name)
  242. pxy.Close()
  243. err := pm.sendMsg(&msg.CloseProxy{
  244. ProxyName: name,
  245. })
  246. if err != nil {
  247. err = fmt.Errorf("Reload error: ProxyManager is closed now")
  248. pm.Warn(err.Error())
  249. return err
  250. }
  251. }
  252. }
  253. pm.Info("proxy removed: %v", delPxyNames)
  254. addPxyNames := make([]string, 0)
  255. for name, cfg := range pxyCfgs {
  256. if _, ok := pm.proxies[name]; !ok {
  257. pxy := NewProxyWrapper(cfg)
  258. pm.proxies[name] = pxy
  259. addPxyNames = append(addPxyNames, name)
  260. }
  261. }
  262. pm.Info("proxy added: %v", addPxyNames)
  263. delVisitorName := make([]string, 0)
  264. for name, oldVisitorCfg := range pm.visitorCfgs {
  265. del := false
  266. cfg, ok := visitorCfgs[name]
  267. if !ok {
  268. del = true
  269. } else {
  270. if !oldVisitorCfg.Compare(cfg) {
  271. del = true
  272. }
  273. }
  274. if del {
  275. delVisitorName = append(delVisitorName, name)
  276. delete(pm.visitorCfgs, name)
  277. if visitor, ok := pm.visitors[name]; ok {
  278. visitor.Close()
  279. }
  280. delete(pm.visitors, name)
  281. }
  282. }
  283. pm.Info("visitor removed: %v", delVisitorName)
  284. addVisitorName := make([]string, 0)
  285. for name, visitorCfg := range visitorCfgs {
  286. if _, ok := pm.visitorCfgs[name]; !ok {
  287. pm.visitorCfgs[name] = visitorCfg
  288. addVisitorName = append(addVisitorName, name)
  289. }
  290. }
  291. pm.Info("visitor added: %v", addVisitorName)
  292. return nil
  293. }
  294. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  295. pm.mu.RLock()
  296. pw, ok := pm.proxies[name]
  297. pm.mu.RUnlock()
  298. if ok {
  299. pw.InWorkConn(workConn)
  300. } else {
  301. workConn.Close()
  302. }
  303. }
  304. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  305. ps := make([]*ProxyStatus, 0)
  306. pm.mu.RLock()
  307. defer pm.mu.RUnlock()
  308. for _, pxy := range pm.proxies {
  309. ps = append(ps, pxy.GetStatus())
  310. }
  311. return ps
  312. }