1
0

virtual_net.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. // Copyright 2025 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build !frps
  15. package visitor
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "sync"
  22. "time"
  23. v1 "github.com/fatedier/frp/pkg/config/v1"
  24. netutil "github.com/fatedier/frp/pkg/util/net"
  25. "github.com/fatedier/frp/pkg/util/xlog"
  26. )
  27. func init() {
  28. Register(v1.VisitorPluginVirtualNet, NewVirtualNetPlugin)
  29. }
  30. type VirtualNetPlugin struct {
  31. pluginCtx PluginContext
  32. routes []net.IPNet
  33. mu sync.Mutex
  34. controllerConn net.Conn
  35. closeSignal chan struct{}
  36. consecutiveErrors int // Tracks consecutive connection errors for exponential backoff
  37. ctx context.Context
  38. cancel context.CancelFunc
  39. }
  40. func NewVirtualNetPlugin(pluginCtx PluginContext, options v1.VisitorPluginOptions) (Plugin, error) {
  41. opts := options.(*v1.VirtualNetVisitorPluginOptions)
  42. p := &VirtualNetPlugin{
  43. pluginCtx: pluginCtx,
  44. routes: make([]net.IPNet, 0),
  45. }
  46. p.ctx, p.cancel = context.WithCancel(pluginCtx.Ctx)
  47. if opts.DestinationIP == "" {
  48. return nil, errors.New("destinationIP is required")
  49. }
  50. // Parse DestinationIP and create a host route.
  51. ip := net.ParseIP(opts.DestinationIP)
  52. if ip == nil {
  53. return nil, fmt.Errorf("invalid destination IP address [%s]", opts.DestinationIP)
  54. }
  55. var mask net.IPMask
  56. if ip.To4() != nil {
  57. mask = net.CIDRMask(32, 32) // /32 for IPv4
  58. } else {
  59. mask = net.CIDRMask(128, 128) // /128 for IPv6
  60. }
  61. p.routes = append(p.routes, net.IPNet{IP: ip, Mask: mask})
  62. return p, nil
  63. }
  64. func (p *VirtualNetPlugin) Name() string {
  65. return v1.VisitorPluginVirtualNet
  66. }
  67. func (p *VirtualNetPlugin) Start() {
  68. xl := xlog.FromContextSafe(p.pluginCtx.Ctx)
  69. if p.pluginCtx.VnetController == nil {
  70. return
  71. }
  72. routeStr := "unknown"
  73. if len(p.routes) > 0 {
  74. routeStr = p.routes[0].String()
  75. }
  76. xl.Infof("starting VirtualNetPlugin for visitor [%s], attempting to register routes for %s", p.pluginCtx.Name, routeStr)
  77. go p.run()
  78. }
  79. func (p *VirtualNetPlugin) run() {
  80. xl := xlog.FromContextSafe(p.ctx)
  81. for {
  82. currentCloseSignal := make(chan struct{})
  83. p.mu.Lock()
  84. p.closeSignal = currentCloseSignal
  85. p.mu.Unlock()
  86. select {
  87. case <-p.ctx.Done():
  88. xl.Infof("VirtualNetPlugin run loop for visitor [%s] stopping (context cancelled before pipe creation).", p.pluginCtx.Name)
  89. p.cleanupControllerConn(xl)
  90. return
  91. default:
  92. }
  93. controllerConn, pluginConn := net.Pipe()
  94. p.mu.Lock()
  95. p.controllerConn = controllerConn
  96. p.mu.Unlock()
  97. // Wrap with CloseNotifyConn which supports both close notification and error recording
  98. var closeErr error
  99. pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func(err error) {
  100. closeErr = err
  101. close(currentCloseSignal) // Signal the run loop on close.
  102. })
  103. xl.Infof("attempting to register client route for visitor [%s]", p.pluginCtx.Name)
  104. p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
  105. xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
  106. // Pass the CloseNotifyConn to the visitor for handling.
  107. // The visitor can call CloseWithError to record the failure reason.
  108. p.pluginCtx.SendConnToVisitor(pluginNotifyConn)
  109. // Wait for context cancellation or connection close.
  110. select {
  111. case <-p.ctx.Done():
  112. xl.Infof("VirtualNetPlugin run loop stopping for visitor [%s] (context cancelled while waiting).", p.pluginCtx.Name)
  113. p.cleanupControllerConn(xl)
  114. return
  115. case <-currentCloseSignal:
  116. // Determine reconnect delay based on error with exponential backoff
  117. var reconnectDelay time.Duration
  118. if closeErr != nil {
  119. p.consecutiveErrors++
  120. xl.Warnf("connection closed with error for visitor [%s] (consecutive errors: %d): %v",
  121. p.pluginCtx.Name, p.consecutiveErrors, closeErr)
  122. // Exponential backoff: 60s, 120s, 240s, 300s (capped)
  123. baseDelay := 60 * time.Second
  124. reconnectDelay = baseDelay * time.Duration(1<<uint(p.consecutiveErrors-1))
  125. if reconnectDelay > 300*time.Second {
  126. reconnectDelay = 300 * time.Second
  127. }
  128. } else {
  129. // Reset consecutive errors on successful connection
  130. if p.consecutiveErrors > 0 {
  131. xl.Infof("connection closed normally for visitor [%s], resetting error counter (was %d)",
  132. p.pluginCtx.Name, p.consecutiveErrors)
  133. p.consecutiveErrors = 0
  134. } else {
  135. xl.Infof("connection closed normally for visitor [%s]", p.pluginCtx.Name)
  136. }
  137. reconnectDelay = 10 * time.Second
  138. }
  139. // The visitor closed the plugin side. Close the controller side.
  140. p.cleanupControllerConn(xl)
  141. xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
  142. select {
  143. case <-time.After(reconnectDelay):
  144. case <-p.ctx.Done():
  145. xl.Infof("VirtualNetPlugin reconnection delay interrupted for visitor [%s]", p.pluginCtx.Name)
  146. return
  147. }
  148. }
  149. xl.Infof("re-establishing virtual connection for visitor [%s]...", p.pluginCtx.Name)
  150. }
  151. }
  152. // cleanupControllerConn closes the current controllerConn (if it exists) under lock.
  153. func (p *VirtualNetPlugin) cleanupControllerConn(xl *xlog.Logger) {
  154. p.mu.Lock()
  155. defer p.mu.Unlock()
  156. if p.controllerConn != nil {
  157. xl.Debugf("cleaning up controllerConn for visitor [%s]", p.pluginCtx.Name)
  158. p.controllerConn.Close()
  159. p.controllerConn = nil
  160. }
  161. p.closeSignal = nil
  162. }
  163. // Close initiates the plugin shutdown.
  164. func (p *VirtualNetPlugin) Close() error {
  165. xl := xlog.FromContextSafe(p.pluginCtx.Ctx)
  166. xl.Infof("closing VirtualNetPlugin for visitor [%s]", p.pluginCtx.Name)
  167. // Signal the run loop goroutine to stop.
  168. p.cancel()
  169. // Unregister the route from the controller.
  170. if p.pluginCtx.VnetController != nil {
  171. p.pluginCtx.VnetController.UnregisterClientRoute(p.pluginCtx.Name)
  172. xl.Infof("unregistered client route for visitor [%s]", p.pluginCtx.Name)
  173. }
  174. // Explicitly close the controller side of the pipe.
  175. // This ensures the pipe is broken even if the run loop is stuck or the visitor hasn't closed its end.
  176. p.cleanupControllerConn(xl)
  177. xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
  178. return nil
  179. }