// proxy_wrapper.go (5.1 KB)
  1. package client
  2. import (
  3. "fmt"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. "github.com/fatedier/frp/models/config"
  8. "github.com/fatedier/frp/models/msg"
  9. "github.com/fatedier/frp/utils/log"
  10. frpNet "github.com/fatedier/frp/utils/net"
  11. )
  12. const (
  13. ProxyStatusNew = "new"
  14. ProxyStatusWaitStart = "wait start"
  15. ProxyStatusStartErr = "start error"
  16. ProxyStatusRunning = "running"
  17. ProxyStatusCheckFailed = "check failed"
  18. ProxyStatusClosed = "closed"
  19. )
  20. var (
  21. statusCheckInterval time.Duration = 3 * time.Second
  22. waitResponseTimeout = 20 * time.Second
  23. startErrTimeout = 30 * time.Second
  24. )
  25. type ProxyStatus struct {
  26. Name string `json:"name"`
  27. Type string `json:"type"`
  28. Status string `json:"status"`
  29. Err string `json:"err"`
  30. Cfg config.ProxyConf `json:"cfg"`
  31. // Got from server.
  32. RemoteAddr string `json:"remote_addr"`
  33. }
  34. type ProxyWrapper struct {
  35. ProxyStatus
  36. // underlying proxy
  37. pxy Proxy
  38. // if ProxyConf has healcheck config
  39. // monitor will watch if it is alive
  40. monitor *HealthCheckMonitor
  41. // event handler
  42. handler EventHandler
  43. health uint32
  44. lastSendStartMsg time.Time
  45. lastStartErr time.Time
  46. closeCh chan struct{}
  47. mu sync.RWMutex
  48. log.Logger
  49. }
  50. func NewProxyWrapper(cfg config.ProxyConf, eventHandler EventHandler, logPrefix string) *ProxyWrapper {
  51. baseInfo := cfg.GetBaseInfo()
  52. pw := &ProxyWrapper{
  53. ProxyStatus: ProxyStatus{
  54. Name: baseInfo.ProxyName,
  55. Type: baseInfo.ProxyType,
  56. Status: ProxyStatusNew,
  57. Cfg: cfg,
  58. },
  59. closeCh: make(chan struct{}),
  60. handler: eventHandler,
  61. Logger: log.NewPrefixLogger(logPrefix),
  62. }
  63. pw.AddLogPrefix(pw.Name)
  64. if baseInfo.HealthCheckType != "" {
  65. pw.health = 1 // means failed
  66. pw.monitor = NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
  67. baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
  68. baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback)
  69. pw.monitor.SetLogger(pw.Logger)
  70. pw.Trace("enable health check monitor")
  71. }
  72. pw.pxy = NewProxy(pw.Cfg)
  73. return pw
  74. }
  75. func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error {
  76. pw.mu.Lock()
  77. defer pw.mu.Unlock()
  78. if pw.Status != ProxyStatusWaitStart {
  79. return fmt.Errorf("status not wait start, ignore start message")
  80. }
  81. pw.RemoteAddr = remoteAddr
  82. if respErr != "" {
  83. pw.Status = ProxyStatusStartErr
  84. pw.Err = respErr
  85. pw.lastStartErr = time.Now()
  86. return fmt.Errorf(pw.Err)
  87. }
  88. if err := pw.pxy.Run(); err != nil {
  89. pw.Status = ProxyStatusStartErr
  90. pw.Err = err.Error()
  91. pw.lastStartErr = time.Now()
  92. return err
  93. }
  94. pw.Status = ProxyStatusRunning
  95. pw.Err = ""
  96. return nil
  97. }
  98. func (pw *ProxyWrapper) Start() {
  99. go pw.checkWorker()
  100. if pw.monitor != nil {
  101. go pw.monitor.Start()
  102. }
  103. }
  104. func (pw *ProxyWrapper) Stop() {
  105. pw.mu.Lock()
  106. defer pw.mu.Unlock()
  107. pw.pxy.Close()
  108. if pw.monitor != nil {
  109. pw.monitor.Stop()
  110. }
  111. pw.Status = ProxyStatusClosed
  112. pw.handler(EvCloseProxy, &CloseProxyPayload{
  113. CloseProxyMsg: &msg.CloseProxy{
  114. ProxyName: pw.Name,
  115. },
  116. })
  117. }
  118. func (pw *ProxyWrapper) checkWorker() {
  119. for {
  120. // check proxy status
  121. now := time.Now()
  122. if atomic.LoadUint32(&pw.health) == 0 {
  123. pw.mu.Lock()
  124. if pw.Status == ProxyStatusNew ||
  125. pw.Status == ProxyStatusCheckFailed ||
  126. (pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
  127. (pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
  128. pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
  129. pw.Status = ProxyStatusWaitStart
  130. var newProxyMsg msg.NewProxy
  131. pw.Cfg.MarshalToMsg(&newProxyMsg)
  132. pw.lastSendStartMsg = now
  133. pw.handler(EvStartProxy, &StartProxyPayload{
  134. NewProxyMsg: &newProxyMsg,
  135. })
  136. }
  137. pw.mu.Unlock()
  138. } else {
  139. pw.mu.Lock()
  140. if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart {
  141. pw.handler(EvCloseProxy, &CloseProxyPayload{
  142. CloseProxyMsg: &msg.CloseProxy{
  143. ProxyName: pw.Name,
  144. },
  145. })
  146. pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
  147. pw.Status = ProxyStatusCheckFailed
  148. }
  149. pw.mu.Unlock()
  150. }
  151. select {
  152. case <-pw.closeCh:
  153. return
  154. case <-time.After(statusCheckInterval):
  155. }
  156. }
  157. }
  158. func (pw *ProxyWrapper) statusNormalCallback() {
  159. atomic.StoreUint32(&pw.health, 0)
  160. pw.Info("health check success")
  161. }
  162. func (pw *ProxyWrapper) statusFailedCallback() {
  163. atomic.StoreUint32(&pw.health, 1)
  164. pw.Info("health check failed")
  165. }
  166. func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
  167. pw.mu.RLock()
  168. pxy := pw.pxy
  169. pw.mu.RUnlock()
  170. if pxy != nil {
  171. workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  172. go pxy.InWorkConn(workConn)
  173. } else {
  174. workConn.Close()
  175. }
  176. }
  177. func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
  178. pw.mu.RLock()
  179. defer pw.mu.RUnlock()
  180. ps := &ProxyStatus{
  181. Name: pw.Name,
  182. Type: pw.Type,
  183. Status: pw.Status,
  184. Err: pw.Err,
  185. Cfg: pw.Cfg,
  186. RemoteAddr: pw.RemoteAddr,
  187. }
  188. return ps
  189. }