visitor_manager.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. // Copyright 2018 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package visitor
  15. import (
  16. "context"
  17. "fmt"
  18. "net"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/samber/lo"
  23. v1 "github.com/fatedier/frp/pkg/config/v1"
  24. "github.com/fatedier/frp/pkg/transport"
  25. "github.com/fatedier/frp/pkg/util/xlog"
  26. "github.com/fatedier/frp/pkg/vnet"
  27. )
  28. type Manager struct {
  29. clientCfg *v1.ClientCommonConfig
  30. cfgs map[string]v1.VisitorConfigurer
  31. visitors map[string]Visitor
  32. helper Helper
  33. checkInterval time.Duration
  34. keepVisitorsRunningOnce sync.Once
  35. mu sync.RWMutex
  36. ctx context.Context
  37. stopCh chan struct{}
  38. }
  39. func NewManager(
  40. ctx context.Context,
  41. runID string,
  42. clientCfg *v1.ClientCommonConfig,
  43. connectServer func() (net.Conn, error),
  44. msgTransporter transport.MessageTransporter,
  45. vnetController *vnet.Controller,
  46. ) *Manager {
  47. m := &Manager{
  48. clientCfg: clientCfg,
  49. cfgs: make(map[string]v1.VisitorConfigurer),
  50. visitors: make(map[string]Visitor),
  51. checkInterval: 10 * time.Second,
  52. ctx: ctx,
  53. stopCh: make(chan struct{}),
  54. }
  55. m.helper = &visitorHelperImpl{
  56. connectServerFn: connectServer,
  57. msgTransporter: msgTransporter,
  58. vnetController: vnetController,
  59. transferConnFn: m.TransferConn,
  60. runID: runID,
  61. }
  62. return m
  63. }
  64. // keepVisitorsRunning checks all visitors' status periodically, if some visitor is not running, start it.
  65. // It will only start after Reload is called and a new visitor is added.
  66. func (vm *Manager) keepVisitorsRunning() {
  67. xl := xlog.FromContextSafe(vm.ctx)
  68. ticker := time.NewTicker(vm.checkInterval)
  69. defer ticker.Stop()
  70. for {
  71. select {
  72. case <-vm.stopCh:
  73. xl.Tracef("gracefully shutdown visitor manager")
  74. return
  75. case <-ticker.C:
  76. vm.mu.Lock()
  77. for _, cfg := range vm.cfgs {
  78. name := cfg.GetBaseConfig().Name
  79. if _, exist := vm.visitors[name]; !exist {
  80. xl.Infof("try to start visitor [%s]", name)
  81. _ = vm.startVisitor(cfg)
  82. }
  83. }
  84. vm.mu.Unlock()
  85. }
  86. }
  87. }
  88. func (vm *Manager) Close() {
  89. vm.mu.Lock()
  90. defer vm.mu.Unlock()
  91. for _, v := range vm.visitors {
  92. v.Close()
  93. }
  94. select {
  95. case <-vm.stopCh:
  96. default:
  97. close(vm.stopCh)
  98. }
  99. }
  100. // Hold lock before calling this function.
  101. func (vm *Manager) startVisitor(cfg v1.VisitorConfigurer) (err error) {
  102. xl := xlog.FromContextSafe(vm.ctx)
  103. name := cfg.GetBaseConfig().Name
  104. visitor, err := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
  105. if err != nil {
  106. xl.Warnf("new visitor error: %v", err)
  107. return
  108. }
  109. err = visitor.Run()
  110. if err != nil {
  111. xl.Warnf("start error: %v", err)
  112. } else {
  113. vm.visitors[name] = visitor
  114. xl.Infof("start visitor success")
  115. }
  116. return
  117. }
  118. func (vm *Manager) UpdateAll(cfgs []v1.VisitorConfigurer) {
  119. if len(cfgs) > 0 {
  120. // Only start keepVisitorsRunning goroutine once and only when there is at least one visitor.
  121. vm.keepVisitorsRunningOnce.Do(func() {
  122. go vm.keepVisitorsRunning()
  123. })
  124. }
  125. xl := xlog.FromContextSafe(vm.ctx)
  126. cfgsMap := lo.KeyBy(cfgs, func(c v1.VisitorConfigurer) string {
  127. return c.GetBaseConfig().Name
  128. })
  129. vm.mu.Lock()
  130. defer vm.mu.Unlock()
  131. delNames := make([]string, 0)
  132. for name, oldCfg := range vm.cfgs {
  133. del := false
  134. cfg, ok := cfgsMap[name]
  135. if !ok || !reflect.DeepEqual(oldCfg, cfg) {
  136. del = true
  137. }
  138. if del {
  139. delNames = append(delNames, name)
  140. delete(vm.cfgs, name)
  141. if visitor, ok := vm.visitors[name]; ok {
  142. visitor.Close()
  143. }
  144. delete(vm.visitors, name)
  145. }
  146. }
  147. if len(delNames) > 0 {
  148. xl.Infof("visitor removed: %v", delNames)
  149. }
  150. addNames := make([]string, 0)
  151. for _, cfg := range cfgs {
  152. name := cfg.GetBaseConfig().Name
  153. if _, ok := vm.cfgs[name]; !ok {
  154. vm.cfgs[name] = cfg
  155. addNames = append(addNames, name)
  156. _ = vm.startVisitor(cfg)
  157. }
  158. }
  159. if len(addNames) > 0 {
  160. xl.Infof("visitor added: %v", addNames)
  161. }
  162. }
  163. // TransferConn transfers a connection to a visitor.
  164. func (vm *Manager) TransferConn(name string, conn net.Conn) error {
  165. vm.mu.RLock()
  166. defer vm.mu.RUnlock()
  167. v, ok := vm.visitors[name]
  168. if !ok {
  169. return fmt.Errorf("visitor [%s] not found", name)
  170. }
  171. return v.AcceptConn(conn)
  172. }
  173. type visitorHelperImpl struct {
  174. connectServerFn func() (net.Conn, error)
  175. msgTransporter transport.MessageTransporter
  176. vnetController *vnet.Controller
  177. transferConnFn func(name string, conn net.Conn) error
  178. runID string
  179. }
  180. func (v *visitorHelperImpl) ConnectServer() (net.Conn, error) {
  181. return v.connectServerFn()
  182. }
  183. func (v *visitorHelperImpl) TransferConn(name string, conn net.Conn) error {
  184. return v.transferConnFn(name, conn)
  185. }
  186. func (v *visitorHelperImpl) MsgTransporter() transport.MessageTransporter {
  187. return v.msgTransporter
  188. }
  189. func (v *visitorHelperImpl) VNetController() *vnet.Controller {
  190. return v.vnetController
  191. }
  192. func (v *visitorHelperImpl) RunID() string {
  193. return v.runID
  194. }