proxy_wrapper.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package proxy
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/fatedier/frp/client/event"
  8. "github.com/fatedier/frp/client/health"
  9. "github.com/fatedier/frp/models/config"
  10. "github.com/fatedier/frp/models/msg"
  11. "github.com/fatedier/frp/utils/log"
  12. frpNet "github.com/fatedier/frp/utils/net"
  13. "github.com/fatedier/golib/errors"
  14. )
  15. const (
  16. ProxyStatusNew = "new"
  17. ProxyStatusWaitStart = "wait start"
  18. ProxyStatusStartErr = "start error"
  19. ProxyStatusRunning = "running"
  20. ProxyStatusCheckFailed = "check failed"
  21. ProxyStatusClosed = "closed"
  22. )
  23. var (
  24. statusCheckInterval time.Duration = 3 * time.Second
  25. waitResponseTimeout = 20 * time.Second
  26. startErrTimeout = 30 * time.Second
  27. )
  28. type ProxyStatus struct {
  29. Name string `json:"name"`
  30. Type string `json:"type"`
  31. Status string `json:"status"`
  32. Err string `json:"err"`
  33. Cfg config.ProxyConf `json:"cfg"`
  34. // Got from server.
  35. RemoteAddr string `json:"remote_addr"`
  36. }
  37. type ProxyWrapper struct {
  38. ProxyStatus
  39. // underlying proxy
  40. pxy Proxy
  41. // if ProxyConf has healcheck config
  42. // monitor will watch if it is alive
  43. monitor *health.HealthCheckMonitor
  44. // event handler
  45. handler event.EventHandler
  46. health uint32
  47. lastSendStartMsg time.Time
  48. lastStartErr time.Time
  49. closeCh chan struct{}
  50. healthNotifyCh chan struct{}
  51. mu sync.RWMutex
  52. log.Logger
  53. }
  54. func NewProxyWrapper(cfg config.ProxyConf, eventHandler event.EventHandler, logPrefix string) *ProxyWrapper {
  55. baseInfo := cfg.GetBaseInfo()
  56. pw := &ProxyWrapper{
  57. ProxyStatus: ProxyStatus{
  58. Name: baseInfo.ProxyName,
  59. Type: baseInfo.ProxyType,
  60. Status: ProxyStatusNew,
  61. Cfg: cfg,
  62. },
  63. closeCh: make(chan struct{}),
  64. healthNotifyCh: make(chan struct{}),
  65. handler: eventHandler,
  66. Logger: log.NewPrefixLogger(logPrefix),
  67. }
  68. pw.AddLogPrefix(pw.Name)
  69. if baseInfo.HealthCheckType != "" {
  70. pw.health = 1 // means failed
  71. pw.monitor = health.NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
  72. baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
  73. baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback)
  74. pw.monitor.SetLogger(pw.Logger)
  75. pw.Trace("enable health check monitor")
  76. }
  77. pw.pxy = NewProxy(pw.Cfg)
  78. return pw
  79. }
  80. func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error {
  81. pw.mu.Lock()
  82. defer pw.mu.Unlock()
  83. if pw.Status != ProxyStatusWaitStart {
  84. return fmt.Errorf("status not wait start, ignore start message")
  85. }
  86. pw.RemoteAddr = remoteAddr
  87. if respErr != "" {
  88. pw.Status = ProxyStatusStartErr
  89. pw.Err = respErr
  90. pw.lastStartErr = time.Now()
  91. return fmt.Errorf(pw.Err)
  92. }
  93. if err := pw.pxy.Run(); err != nil {
  94. pw.Status = ProxyStatusStartErr
  95. pw.Err = err.Error()
  96. pw.lastStartErr = time.Now()
  97. return err
  98. }
  99. pw.Status = ProxyStatusRunning
  100. pw.Err = ""
  101. return nil
  102. }
  103. func (pw *ProxyWrapper) Start() {
  104. go pw.checkWorker()
  105. if pw.monitor != nil {
  106. go pw.monitor.Start()
  107. }
  108. }
  109. func (pw *ProxyWrapper) Stop() {
  110. pw.mu.Lock()
  111. defer pw.mu.Unlock()
  112. close(pw.closeCh)
  113. close(pw.healthNotifyCh)
  114. pw.pxy.Close()
  115. if pw.monitor != nil {
  116. pw.monitor.Stop()
  117. }
  118. pw.Status = ProxyStatusClosed
  119. pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
  120. CloseProxyMsg: &msg.CloseProxy{
  121. ProxyName: pw.Name,
  122. },
  123. })
  124. }
  125. func (pw *ProxyWrapper) checkWorker() {
  126. if pw.monitor != nil {
  127. // let monitor do check request first
  128. time.Sleep(500 * time.Millisecond)
  129. }
  130. for {
  131. // check proxy status
  132. now := time.Now()
  133. if atomic.LoadUint32(&pw.health) == 0 {
  134. pw.mu.Lock()
  135. if pw.Status == ProxyStatusNew ||
  136. pw.Status == ProxyStatusCheckFailed ||
  137. (pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
  138. (pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
  139. pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
  140. pw.Status = ProxyStatusWaitStart
  141. var newProxyMsg msg.NewProxy
  142. pw.Cfg.MarshalToMsg(&newProxyMsg)
  143. pw.lastSendStartMsg = now
  144. pw.handler(event.EvStartProxy, &event.StartProxyPayload{
  145. NewProxyMsg: &newProxyMsg,
  146. })
  147. }
  148. pw.mu.Unlock()
  149. } else {
  150. pw.mu.Lock()
  151. if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart {
  152. pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
  153. CloseProxyMsg: &msg.CloseProxy{
  154. ProxyName: pw.Name,
  155. },
  156. })
  157. pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
  158. pw.Status = ProxyStatusCheckFailed
  159. }
  160. pw.mu.Unlock()
  161. }
  162. select {
  163. case <-pw.closeCh:
  164. return
  165. case <-time.After(statusCheckInterval):
  166. case <-pw.healthNotifyCh:
  167. }
  168. }
  169. }
  170. func (pw *ProxyWrapper) statusNormalCallback() {
  171. atomic.StoreUint32(&pw.health, 0)
  172. errors.PanicToError(func() {
  173. select {
  174. case pw.healthNotifyCh <- struct{}{}:
  175. default:
  176. }
  177. })
  178. pw.Info("health check success")
  179. }
  180. func (pw *ProxyWrapper) statusFailedCallback() {
  181. atomic.StoreUint32(&pw.health, 1)
  182. errors.PanicToError(func() {
  183. select {
  184. case pw.healthNotifyCh <- struct{}{}:
  185. default:
  186. }
  187. })
  188. pw.Info("health check failed")
  189. }
  190. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  191. pw.mu.RLock()
  192. pxy := pw.pxy
  193. pw.mu.RUnlock()
  194. if pxy != nil {
  195. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  196. go pxy.InWorkConn(workConn)
  197. } else {
  198. workConn.Close()
  199. }
  200. }
  201. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  202. pw.mu.RLock()
  203. defer pw.mu.RUnlock()
  204. ps := &ProxyStatus{
  205. Name: pw.Name,
  206. Type: pw.Type,
  207. Status: pw.Status,
  208. Err: pw.Err,
  209. Cfg: pw.Cfg,
  210. RemoteAddr: pw.RemoteAddr,
  211. }
  212. return ps
  213. }