proxy_wrapper.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/fatedier/frp/models/config"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/utils/log"
  10. frpNet "github.com/fatedier/frp/utils/net"
  11. "github.com/fatedier/golib/errors"
  12. )
  // Values stored in ProxyStatus.Status over the lifetime of a proxy wrapper.
  const (
  	// ProxyStatusNew is the status a wrapper is created with.
  	ProxyStatusNew = "new"

  	// ProxyStatusWaitStart is set by the check worker once it has emitted a
  	// start-proxy event and is waiting for the result.
  	ProxyStatusWaitStart = "wait start"

  	// ProxyStatusStartErr is set by SetRunningStatus when the start response
  	// carries an error or the underlying proxy fails to run.
  	ProxyStatusStartErr = "start error"

  	// ProxyStatusRunning is set by SetRunningStatus after the underlying
  	// proxy has been started successfully.
  	ProxyStatusRunning = "running"

  	// ProxyStatusCheckFailed is set by the check worker when the health flag
  	// reports a failure while the proxy is running or waiting to start.
  	ProxyStatusCheckFailed = "check failed"

  	// ProxyStatusClosed is set by Stop.
  	ProxyStatusClosed = "closed"
  )
  // Timing knobs used by the check worker. They are variables rather than
  // constants, so code in this package can assign to them.
  var (
  	// statusCheckInterval is the longest the check worker sleeps between two
  	// status evaluations when no health notification arrives.
  	statusCheckInterval = 3 * time.Second

  	// waitResponseTimeout is how long a proxy may stay in "wait start" before
  	// the start message is sent again.
  	waitResponseTimeout = 20 * time.Second

  	// startErrTimeout is how long a proxy stays in "start error" before a new
  	// start attempt is made.
  	startErrTimeout = 30 * time.Second
  )
  26. type ProxyStatus struct {
  27. Name string `json:"name"`
  28. Type string `json:"type"`
  29. Status string `json:"status"`
  30. Err string `json:"err"`
  31. Cfg config.ProxyConf `json:"cfg"`
  32. // Got from server.
  33. RemoteAddr string `json:"remote_addr"`
  34. }
  35. type ProxyWrapper struct {
  36. ProxyStatus
  37. // underlying proxy
  38. pxy Proxy
  39. // if ProxyConf has healcheck config
  40. // monitor will watch if it is alive
  41. monitor *HealthCheckMonitor
  42. // event handler
  43. handler EventHandler
  44. health uint32
  45. lastSendStartMsg time.Time
  46. lastStartErr time.Time
  47. closeCh chan struct{}
  48. healthNotifyCh chan struct{}
  49. mu sync.RWMutex
  50. log.Logger
  51. }
  52. func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix string) *ProxyWrapper {
  53. baseInfo := cfg.GetBaseInfo()
  54. pw := &ProxyWrapper{
  55. ProxyStatus: ProxyStatus{
  56. Name: baseInfo.ProxyName,
  57. Type: baseInfo.ProxyType,
  58. Status: ProxyStatusNew,
  59. Cfg: cfg,
  60. },
  61. closeCh: make(chan struct{}),
  62. healthNotifyCh: make(chan struct{}),
  63. handler: eventHandler,
  64. Logger: log.NewPrefixLogger(logPrefix),
  65. }
  66. pw.AddLogPrefix(pw.Name)
  67. if baseInfo.HealthCheckType != "" {
  68. pw.health = 1 // means failed
  69. pw.monitor = NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
  70. baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
  71. baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback)
  72. pw.monitor.SetLogger(pw.Logger)
  73. pw.Trace("enable health check monitor")
  74. }
  75. pw.pxy = NewProxy(pw.Cfg)
  76. return pw
  77. }
  78. func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error {
  79. pw.mu.Lock()
  80. defer pw.mu.Unlock()
  81. if pw.Status != ProxyStatusWaitStart {
  82. return fmt.Errorf("status not wait start, ignore start message")
  83. }
  84. pw.RemoteAddr = remoteAddr
  85. if respErr != "" {
  86. pw.Status = ProxyStatusStartErr
  87. pw.Err = respErr
  88. pw.lastStartErr = time.Now()
  89. return fmt.Errorf(pw.Err)
  90. }
  91. if err := pw.pxy.Run(); err != nil {
  92. pw.Status = ProxyStatusStartErr
  93. pw.Err = err.Error()
  94. pw.lastStartErr = time.Now()
  95. return err
  96. }
  97. pw.Status = ProxyStatusRunning
  98. pw.Err = ""
  99. return nil
  100. }
  101. func (pw *ProxyWrapper) Start() {
  102. go pw.checkWorker()
  103. if pw.monitor != nil {
  104. go pw.monitor.Start()
  105. }
  106. }
  107. func (pw *ProxyWrapper) Stop() {
  108. pw.mu.Lock()
  109. defer pw.mu.Unlock()
  110. close(pw.closeCh)
  111. close(pw.healthNotifyCh)
  112. pw.pxy.Close()
  113. if pw.monitor != nil {
  114. pw.monitor.Stop()
  115. }
  116. pw.Status = ProxyStatusClosed
  117. pw.handler(EvCloseProxy, &CloseProxyPayload{
  118. CloseProxyMsg: &msg.CloseProxy{
  119. ProxyName: pw.Name,
  120. },
  121. })
  122. }
  123. func (pw *ProxyWrapper) checkWorker() {
  124. if pw.monitor != nil {
  125. // let monitor do check request first
  126. time.Sleep(500 * time.Millisecond)
  127. }
  128. for {
  129. // check proxy status
  130. now := time.Now()
  131. if atomic.LoadUint32(&pw.health) == 0 {
  132. pw.mu.Lock()
  133. if pw.Status == ProxyStatusNew ||
  134. pw.Status == ProxyStatusCheckFailed ||
  135. (pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
  136. (pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
  137. pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
  138. pw.Status = ProxyStatusWaitStart
  139. var newProxyMsg msg.NewProxy
  140. pw.Cfg.MarshalToMsg(&newProxyMsg)
  141. pw.lastSendStartMsg = now
  142. pw.handler(EvStartProxy, &StartProxyPayload{
  143. NewProxyMsg: &newProxyMsg,
  144. })
  145. }
  146. pw.mu.Unlock()
  147. } else {
  148. pw.mu.Lock()
  149. if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart {
  150. pw.handler(EvCloseProxy, &CloseProxyPayload{
  151. CloseProxyMsg: &msg.CloseProxy{
  152. ProxyName: pw.Name,
  153. },
  154. })
  155. pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
  156. pw.Status = ProxyStatusCheckFailed
  157. }
  158. pw.mu.Unlock()
  159. }
  160. select {
  161. case <-pw.closeCh:
  162. return
  163. case <-time.After(statusCheckInterval):
  164. case <-pw.healthNotifyCh:
  165. }
  166. }
  167. }
  168. func (pw *ProxyWrapper) statusNormalCallback() {
  169. atomic.StoreUint32(&pw.health, 0)
  170. errors.PanicToError(func() {
  171. select {
  172. case pw.healthNotifyCh <- struct{}{}:
  173. default:
  174. }
  175. })
  176. pw.Info("health check success")
  177. }
  178. func (pw *ProxyWrapper) statusFailedCallback() {
  179. atomic.StoreUint32(&pw.health, 1)
  180. errors.PanicToError(func() {
  181. select {
  182. case pw.healthNotifyCh <- struct{}{}:
  183. default:
  184. }
  185. })
  186. pw.Info("health check failed")
  187. }
  188. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  189. pw.mu.RLock()
  190. pxy := pw.pxy
  191. pw.mu.RUnlock()
  192. if pxy != nil {
  193. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  194. go pxy.InWorkConn(workConn)
  195. } else {
  196. workConn.Close()
  197. }
  198. }
  199. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  200. pw.mu.RLock()
  201. defer pw.mu.RUnlock()
  202. ps := &ProxyStatus{
  203. Name: pw.Name,
  204. Type: pw.Type,
  205. Status: pw.Status,
  206. Err: pw.Err,
  207. Cfg: pw.Cfg,
  208. RemoteAddr: pw.RemoteAddr,
  209. }
  210. return ps
  211. }