visitor_manager.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // Copyright 2018 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package visitor
  15. import (
  16. "context"
  17. "fmt"
  18. "net"
  19. "reflect"
  20. "sync"
  21. "time"
  22. "github.com/samber/lo"
  23. v1 "github.com/fatedier/frp/pkg/config/v1"
  24. "github.com/fatedier/frp/pkg/transport"
  25. "github.com/fatedier/frp/pkg/util/xlog"
  26. )
  27. type Manager struct {
  28. clientCfg *v1.ClientCommonConfig
  29. cfgs map[string]v1.VisitorConfigurer
  30. visitors map[string]Visitor
  31. helper Helper
  32. checkInterval time.Duration
  33. mu sync.RWMutex
  34. ctx context.Context
  35. stopCh chan struct{}
  36. }
  37. func NewManager(
  38. ctx context.Context,
  39. runID string,
  40. clientCfg *v1.ClientCommonConfig,
  41. connectServer func() (net.Conn, error),
  42. msgTransporter transport.MessageTransporter,
  43. ) *Manager {
  44. m := &Manager{
  45. clientCfg: clientCfg,
  46. cfgs: make(map[string]v1.VisitorConfigurer),
  47. visitors: make(map[string]Visitor),
  48. checkInterval: 10 * time.Second,
  49. ctx: ctx,
  50. stopCh: make(chan struct{}),
  51. }
  52. m.helper = &visitorHelperImpl{
  53. connectServerFn: connectServer,
  54. msgTransporter: msgTransporter,
  55. transferConnFn: m.TransferConn,
  56. runID: runID,
  57. }
  58. return m
  59. }
  60. func (vm *Manager) Run() {
  61. xl := xlog.FromContextSafe(vm.ctx)
  62. ticker := time.NewTicker(vm.checkInterval)
  63. defer ticker.Stop()
  64. for {
  65. select {
  66. case <-vm.stopCh:
  67. xl.Info("gracefully shutdown visitor manager")
  68. return
  69. case <-ticker.C:
  70. vm.mu.Lock()
  71. for _, cfg := range vm.cfgs {
  72. name := cfg.GetBaseConfig().Name
  73. if _, exist := vm.visitors[name]; !exist {
  74. xl.Info("try to start visitor [%s]", name)
  75. _ = vm.startVisitor(cfg)
  76. }
  77. }
  78. vm.mu.Unlock()
  79. }
  80. }
  81. }
  82. func (vm *Manager) Close() {
  83. vm.mu.Lock()
  84. defer vm.mu.Unlock()
  85. for _, v := range vm.visitors {
  86. v.Close()
  87. }
  88. select {
  89. case <-vm.stopCh:
  90. default:
  91. close(vm.stopCh)
  92. }
  93. }
  94. // Hold lock before calling this function.
  95. func (vm *Manager) startVisitor(cfg v1.VisitorConfigurer) (err error) {
  96. xl := xlog.FromContextSafe(vm.ctx)
  97. name := cfg.GetBaseConfig().Name
  98. visitor := NewVisitor(vm.ctx, cfg, vm.clientCfg, vm.helper)
  99. err = visitor.Run()
  100. if err != nil {
  101. xl.Warn("start error: %v", err)
  102. } else {
  103. vm.visitors[name] = visitor
  104. xl.Info("start visitor success")
  105. }
  106. return
  107. }
  108. func (vm *Manager) Reload(cfgs []v1.VisitorConfigurer) {
  109. xl := xlog.FromContextSafe(vm.ctx)
  110. cfgsMap := lo.KeyBy(cfgs, func(c v1.VisitorConfigurer) string {
  111. return c.GetBaseConfig().Name
  112. })
  113. vm.mu.Lock()
  114. defer vm.mu.Unlock()
  115. delNames := make([]string, 0)
  116. for name, oldCfg := range vm.cfgs {
  117. del := false
  118. cfg, ok := cfgsMap[name]
  119. if !ok || !reflect.DeepEqual(oldCfg, cfg) {
  120. del = true
  121. }
  122. if del {
  123. delNames = append(delNames, name)
  124. delete(vm.cfgs, name)
  125. if visitor, ok := vm.visitors[name]; ok {
  126. visitor.Close()
  127. }
  128. delete(vm.visitors, name)
  129. }
  130. }
  131. if len(delNames) > 0 {
  132. xl.Info("visitor removed: %v", delNames)
  133. }
  134. addNames := make([]string, 0)
  135. for _, cfg := range cfgs {
  136. name := cfg.GetBaseConfig().Name
  137. if _, ok := vm.cfgs[name]; !ok {
  138. vm.cfgs[name] = cfg
  139. addNames = append(addNames, name)
  140. _ = vm.startVisitor(cfg)
  141. }
  142. }
  143. if len(addNames) > 0 {
  144. xl.Info("visitor added: %v", addNames)
  145. }
  146. }
  147. // TransferConn transfers a connection to a visitor.
  148. func (vm *Manager) TransferConn(name string, conn net.Conn) error {
  149. vm.mu.RLock()
  150. defer vm.mu.RUnlock()
  151. v, ok := vm.visitors[name]
  152. if !ok {
  153. return fmt.Errorf("visitor [%s] not found", name)
  154. }
  155. return v.AcceptConn(conn)
  156. }
  157. type visitorHelperImpl struct {
  158. connectServerFn func() (net.Conn, error)
  159. msgTransporter transport.MessageTransporter
  160. transferConnFn func(name string, conn net.Conn) error
  161. runID string
  162. }
  163. func (v *visitorHelperImpl) ConnectServer() (net.Conn, error) {
  164. return v.connectServerFn()
  165. }
  166. func (v *visitorHelperImpl) TransferConn(name string, conn net.Conn) error {
  167. return v.transferConnFn(name, conn)
  168. }
  169. func (v *visitorHelperImpl) MsgTransporter() transport.MessageTransporter {
  170. return v.msgTransporter
  171. }
  172. func (v *visitorHelperImpl) RunID() string {
  173. return v.runID
  174. }