proxy_manager.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "github.com/fatedier/frp/models/config"
  6. "github.com/fatedier/frp/models/msg"
  7. "github.com/fatedier/frp/utils/log"
  8. frpNet "github.com/fatedier/frp/utils/net"
  9. "github.com/fatedier/golib/errors"
  10. )
  // Status strings stored in ProxyWrapper.Status and reported through
  // ProxyStatus.Status.
  const (
  	// ProxyStatusNew is the status a wrapper is created with.
  	ProxyStatusNew = "new"
  	// ProxyStatusStartErr is set when the server reported an error or the
  	// local proxy failed to run.
  	ProxyStatusStartErr = "start error"
  	// ProxyStatusWaitStart is set by ProxyWrapper.WaitStart.
  	ProxyStatusWaitStart = "wait start"
  	// ProxyStatusRunning is set once the local proxy is running.
  	ProxyStatusRunning = "running"
  	// ProxyStatusCheckFailed is not assigned anywhere in this file.
  	// NOTE(review): presumably set by a health checker elsewhere; confirm.
  	ProxyStatusCheckFailed = "check failed"
  	// ProxyStatusClosed is set by ProxyWrapper.Close.
  	ProxyStatusClosed = "closed"
  )
  19. type ProxyManager struct {
  20. ctl *Control
  21. sendCh chan (msg.Message)
  22. proxies map[string]*ProxyWrapper
  23. closed bool
  24. mu sync.RWMutex
  25. log.Logger
  26. }
  27. type ProxyWrapper struct {
  28. Name string
  29. Type string
  30. Status string
  31. Err string
  32. Cfg config.ProxyConf
  33. RemoteAddr string
  34. pxy Proxy
  35. mu sync.RWMutex
  36. }
  37. type ProxyStatus struct {
  38. Name string `json:"name"`
  39. Type string `json:"type"`
  40. Status string `json:"status"`
  41. Err string `json:"err"`
  42. Cfg config.ProxyConf `json:"cfg"`
  43. // Got from server.
  44. RemoteAddr string `json:"remote_addr"`
  45. }
  46. func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
  47. return &ProxyWrapper{
  48. Name: cfg.GetBaseInfo().ProxyName,
  49. Type: cfg.GetBaseInfo().ProxyType,
  50. Status: ProxyStatusNew,
  51. Cfg: cfg,
  52. pxy: nil,
  53. }
  54. }
  55. func (pw *ProxyWrapper) GetStatusStr() string {
  56. pw.mu.RLock()
  57. defer pw.mu.RUnlock()
  58. return pw.Status
  59. }
  60. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  61. pw.mu.RLock()
  62. defer pw.mu.RUnlock()
  63. ps := &ProxyStatus{
  64. Name: pw.Name,
  65. Type: pw.Type,
  66. Status: pw.Status,
  67. Err: pw.Err,
  68. Cfg: pw.Cfg,
  69. RemoteAddr: pw.RemoteAddr,
  70. }
  71. return ps
  72. }
  73. func (pw *ProxyWrapper) WaitStart() {
  74. pw.mu.Lock()
  75. defer pw.mu.Unlock()
  76. pw.Status = ProxyStatusWaitStart
  77. }
  78. func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error {
  79. if pw.pxy != nil {
  80. pw.pxy.Close()
  81. pw.pxy = nil
  82. }
  83. if serverRespErr != "" {
  84. pw.mu.Lock()
  85. pw.Status = ProxyStatusStartErr
  86. pw.RemoteAddr = remoteAddr
  87. pw.Err = serverRespErr
  88. pw.mu.Unlock()
  89. return fmt.Errorf(serverRespErr)
  90. }
  91. pxy := NewProxy(pw.Cfg)
  92. pw.mu.Lock()
  93. defer pw.mu.Unlock()
  94. pw.RemoteAddr = remoteAddr
  95. if err := pxy.Run(); err != nil {
  96. pw.Status = ProxyStatusStartErr
  97. pw.Err = err.Error()
  98. return err
  99. }
  100. pw.Status = ProxyStatusRunning
  101. pw.Err = ""
  102. pw.pxy = pxy
  103. return nil
  104. }
  105. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  106. pw.mu.RLock()
  107. pxy := pw.pxy
  108. pw.mu.RUnlock()
  109. if pxy != nil {
  110. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  111. go pxy.InWorkConn(workConn)
  112. } else {
  113. workConn.Close()
  114. }
  115. }
  116. func (pw *ProxyWrapper) Close() {
  117. pw.mu.Lock()
  118. defer pw.mu.Unlock()
  119. if pw.pxy != nil {
  120. pw.pxy.Close()
  121. pw.pxy = nil
  122. }
  123. pw.Status = ProxyStatusClosed
  124. }
  125. func NewProxyManager(ctl *Control, msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
  126. return &ProxyManager{
  127. ctl: ctl,
  128. proxies: make(map[string]*ProxyWrapper),
  129. sendCh: msgSendCh,
  130. closed: false,
  131. Logger: log.NewPrefixLogger(logPrefix),
  132. }
  133. }
  134. func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
  135. pm.mu.Lock()
  136. defer pm.mu.Unlock()
  137. pm.closed = false
  138. pm.sendCh = msgSendCh
  139. pm.ClearLogPrefix()
  140. pm.AddLogPrefix(logPrefix)
  141. }
  142. // Must hold the lock before calling this function.
  143. func (pm *ProxyManager) sendMsg(m msg.Message) error {
  144. err := errors.PanicToError(func() {
  145. pm.sendCh <- m
  146. })
  147. if err != nil {
  148. pm.closed = true
  149. }
  150. return err
  151. }
  152. func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  153. pm.mu.Lock()
  154. defer pm.mu.Unlock()
  155. if pm.closed {
  156. return fmt.Errorf("ProxyManager is closed now")
  157. }
  158. pxy, ok := pm.proxies[name]
  159. if !ok {
  160. return fmt.Errorf("no proxy found")
  161. }
  162. if err := pxy.Start(remoteAddr, serverRespErr); err != nil {
  163. errRet := err
  164. err = pm.sendMsg(&msg.CloseProxy{
  165. ProxyName: name,
  166. })
  167. if err != nil {
  168. errRet = fmt.Errorf("send CloseProxy message error")
  169. }
  170. return errRet
  171. }
  172. return nil
  173. }
  174. func (pm *ProxyManager) CloseProxies() {
  175. pm.mu.RLock()
  176. defer pm.mu.RUnlock()
  177. for _, pxy := range pm.proxies {
  178. pxy.Close()
  179. }
  180. }
  181. // pxyStatus: check and start proxies in which status
  182. func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
  183. pm.mu.RLock()
  184. defer pm.mu.RUnlock()
  185. if pm.closed {
  186. pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
  187. return
  188. }
  189. for _, pxy := range pm.proxies {
  190. status := pxy.GetStatusStr()
  191. for _, s := range pxyStatus {
  192. if status == s {
  193. var newProxyMsg msg.NewProxy
  194. pxy.Cfg.MarshalToMsg(&newProxyMsg)
  195. err := pm.sendMsg(&newProxyMsg)
  196. if err != nil {
  197. pm.Warn("[%s] proxy send NewProxy message error")
  198. return
  199. }
  200. pxy.WaitStart()
  201. break
  202. }
  203. }
  204. }
  205. }
  206. func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow bool) error {
  207. pm.mu.Lock()
  208. defer func() {
  209. pm.mu.Unlock()
  210. if startNow {
  211. go pm.CheckAndStartProxy([]string{ProxyStatusNew})
  212. }
  213. }()
  214. if pm.closed {
  215. err := fmt.Errorf("Reload error: ProxyManager is closed now")
  216. pm.Warn(err.Error())
  217. return err
  218. }
  219. delPxyNames := make([]string, 0)
  220. for name, pxy := range pm.proxies {
  221. del := false
  222. cfg, ok := pxyCfgs[name]
  223. if !ok {
  224. del = true
  225. } else {
  226. if !pxy.Cfg.Compare(cfg) {
  227. del = true
  228. }
  229. }
  230. if del {
  231. delPxyNames = append(delPxyNames, name)
  232. delete(pm.proxies, name)
  233. pxy.Close()
  234. err := pm.sendMsg(&msg.CloseProxy{
  235. ProxyName: name,
  236. })
  237. if err != nil {
  238. err = fmt.Errorf("Reload error: ProxyManager is closed now")
  239. pm.Warn(err.Error())
  240. return err
  241. }
  242. }
  243. }
  244. pm.Info("proxy removed: %v", delPxyNames)
  245. addPxyNames := make([]string, 0)
  246. for name, cfg := range pxyCfgs {
  247. if _, ok := pm.proxies[name]; !ok {
  248. pxy := NewProxyWrapper(cfg)
  249. pm.proxies[name] = pxy
  250. addPxyNames = append(addPxyNames, name)
  251. }
  252. }
  253. pm.Info("proxy added: %v", addPxyNames)
  254. return nil
  255. }
  256. func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
  257. pm.mu.RLock()
  258. pw, ok := pm.proxies[name]
  259. pm.mu.RUnlock()
  260. if ok {
  261. pw.InWorkConn(workConn)
  262. } else {
  263. workConn.Close()
  264. }
  265. }
  266. func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
  267. ps := make([]*ProxyStatus, 0)
  268. pm.mu.RLock()
  269. defer pm.mu.RUnlock()
  270. for _, pxy := range pm.proxies {
  271. ps = append(ps, pxy.GetStatus())
  272. }
  273. return ps
  274. }