1
0

virtual_net.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. // Copyright 2025 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //go:build !frps
  15. package visitor
  16. import (
  17. "context"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "sync"
  22. "time"
  23. v1 "github.com/fatedier/frp/pkg/config/v1"
  24. netutil "github.com/fatedier/frp/pkg/util/net"
  25. "github.com/fatedier/frp/pkg/util/xlog"
  26. )
  27. func init() {
  28. Register(v1.VisitorPluginVirtualNet, NewVirtualNetPlugin)
  29. }
  30. type VirtualNetPlugin struct {
  31. pluginCtx PluginContext
  32. routes []net.IPNet
  33. mu sync.Mutex
  34. controllerConn net.Conn
  35. closeSignal chan struct{}
  36. ctx context.Context
  37. cancel context.CancelFunc
  38. }
  39. func NewVirtualNetPlugin(pluginCtx PluginContext, options v1.VisitorPluginOptions) (Plugin, error) {
  40. opts := options.(*v1.VirtualNetVisitorPluginOptions)
  41. p := &VirtualNetPlugin{
  42. pluginCtx: pluginCtx,
  43. routes: make([]net.IPNet, 0),
  44. }
  45. p.ctx, p.cancel = context.WithCancel(pluginCtx.Ctx)
  46. if opts.DestinationIP == "" {
  47. return nil, errors.New("destinationIP is required")
  48. }
  49. // Parse DestinationIP as a single IP and create a host route
  50. ip := net.ParseIP(opts.DestinationIP)
  51. if ip == nil {
  52. return nil, fmt.Errorf("invalid destination IP address [%s]", opts.DestinationIP)
  53. }
  54. var mask net.IPMask
  55. if ip.To4() != nil {
  56. mask = net.CIDRMask(32, 32) // /32 for IPv4
  57. } else {
  58. mask = net.CIDRMask(128, 128) // /128 for IPv6
  59. }
  60. p.routes = append(p.routes, net.IPNet{IP: ip, Mask: mask})
  61. return p, nil
  62. }
  63. func (p *VirtualNetPlugin) Name() string {
  64. return v1.VisitorPluginVirtualNet
  65. }
  66. func (p *VirtualNetPlugin) Start() {
  67. xl := xlog.FromContextSafe(p.pluginCtx.Ctx)
  68. if p.pluginCtx.VnetController == nil {
  69. return
  70. }
  71. routeStr := "unknown"
  72. if len(p.routes) > 0 {
  73. routeStr = p.routes[0].String()
  74. }
  75. xl.Infof("Starting VirtualNetPlugin for visitor [%s], attempting to register routes for %s", p.pluginCtx.Name, routeStr)
  76. go p.run()
  77. }
  78. func (p *VirtualNetPlugin) run() {
  79. xl := xlog.FromContextSafe(p.ctx)
  80. reconnectDelay := 10 * time.Second
  81. for {
  82. // Create a signal channel for this connection attempt
  83. currentCloseSignal := make(chan struct{})
  84. // Store the signal channel under lock
  85. p.mu.Lock()
  86. p.closeSignal = currentCloseSignal
  87. p.mu.Unlock()
  88. select {
  89. case <-p.ctx.Done():
  90. xl.Infof("VirtualNetPlugin run loop for visitor [%s] stopping (context cancelled before pipe creation).", p.pluginCtx.Name)
  91. // Ensure controllerConn from previous loop is cleaned up if necessary
  92. p.cleanupControllerConn(xl)
  93. return
  94. default:
  95. }
  96. controllerConn, pluginConn := net.Pipe()
  97. // Store controllerConn under lock for cleanup purposes
  98. p.mu.Lock()
  99. p.controllerConn = controllerConn
  100. p.mu.Unlock()
  101. // Wrap pluginConn using CloseNotifyConn
  102. pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() {
  103. close(currentCloseSignal) // Signal the run loop
  104. })
  105. xl.Infof("Attempting to register client route for visitor [%s]", p.pluginCtx.Name)
  106. err := p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
  107. if err != nil {
  108. xl.Errorf("Failed to register client route for visitor [%s]: %v. Retrying after %v", p.pluginCtx.Name, err, reconnectDelay)
  109. p.cleanupPipePair(xl, controllerConn, pluginConn) // Close both ends on registration failure
  110. // Wait before retrying registration, unless context is cancelled
  111. select {
  112. case <-time.After(reconnectDelay):
  113. continue // Retry the loop
  114. case <-p.ctx.Done():
  115. xl.Infof("VirtualNetPlugin registration retry wait interrupted for visitor [%s]", p.pluginCtx.Name)
  116. return // Exit loop if context is cancelled during wait
  117. }
  118. }
  119. xl.Infof("Successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
  120. // Pass the CloseNotifyConn to HandleConn.
  121. // HandleConn is responsible for calling Close() on pluginNotifyConn.
  122. p.pluginCtx.HandleConn(pluginNotifyConn)
  123. // Wait for either the plugin context to be cancelled or the wrapper's Close() to be called via the signal channel.
  124. select {
  125. case <-p.ctx.Done():
  126. xl.Infof("VirtualNetPlugin run loop stopping for visitor [%s] (context cancelled while waiting).", p.pluginCtx.Name)
  127. // Context cancelled, ensure controller side is closed if HandleConn didn't close its side yet.
  128. p.cleanupControllerConn(xl)
  129. return
  130. case <-currentCloseSignal:
  131. xl.Infof("Detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name)
  132. // HandleConn closed the plugin side (pluginNotifyConn). The closeFn was called, closing currentCloseSignal.
  133. // We still need to close the controller side.
  134. p.cleanupControllerConn(xl)
  135. // Add a delay before attempting to reconnect, respecting context cancellation.
  136. xl.Infof("Waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
  137. select {
  138. case <-time.After(reconnectDelay):
  139. // Delay completed, loop will continue.
  140. case <-p.ctx.Done():
  141. xl.Infof("VirtualNetPlugin reconnection delay interrupted for visitor [%s]", p.pluginCtx.Name)
  142. return // Exit loop if context is cancelled during wait
  143. }
  144. // Loop will continue to reconnect.
  145. }
  146. // Loop will restart, context check at the beginning of the loop is sufficient.
  147. xl.Infof("Re-establishing virtual connection for visitor [%s]...", p.pluginCtx.Name)
  148. }
  149. }
  150. // cleanupControllerConn closes the current controllerConn (if it exists) under lock.
  151. func (p *VirtualNetPlugin) cleanupControllerConn(xl *xlog.Logger) {
  152. p.mu.Lock()
  153. defer p.mu.Unlock()
  154. if p.controllerConn != nil {
  155. xl.Debugf("Cleaning up controllerConn for visitor [%s]", p.pluginCtx.Name)
  156. p.controllerConn.Close()
  157. p.controllerConn = nil
  158. }
  159. // Also clear the closeSignal reference for the completed/cancelled connection attempt
  160. p.closeSignal = nil
  161. }
  162. // cleanupPipePair closes both ends of a pipe, used typically when registration fails.
  163. func (p *VirtualNetPlugin) cleanupPipePair(xl *xlog.Logger, controllerConn, pluginConn net.Conn) {
  164. xl.Debugf("Cleaning up pipe pair for visitor [%s] after registration failure", p.pluginCtx.Name)
  165. controllerConn.Close()
  166. pluginConn.Close()
  167. p.mu.Lock()
  168. p.controllerConn = nil // Ensure field is nil if it was briefly set
  169. p.closeSignal = nil // Ensure field is nil if it was briefly set
  170. p.mu.Unlock()
  171. }
  172. // Close initiates the plugin shutdown.
  173. func (p *VirtualNetPlugin) Close() error {
  174. xl := xlog.FromContextSafe(p.pluginCtx.Ctx) // Use base context for close logging
  175. xl.Infof("Closing VirtualNetPlugin for visitor [%s]", p.pluginCtx.Name)
  176. // 1. Signal the run loop goroutine to stop via context cancellation.
  177. p.cancel()
  178. // 2. Unregister the route from the controller.
  179. // This might implicitly cause the VnetController to close its end of the pipe (controllerConn).
  180. if p.pluginCtx.VnetController != nil {
  181. p.pluginCtx.VnetController.UnregisterClientRoute(p.pluginCtx.Name)
  182. xl.Infof("Unregistered client route for visitor [%s]", p.pluginCtx.Name)
  183. } else {
  184. xl.Warnf("VnetController is nil during close for visitor [%s], cannot unregister route", p.pluginCtx.Name)
  185. }
  186. // 3. Explicitly close the controller side of the pipe managed by this plugin.
  187. // This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end.
  188. p.cleanupControllerConn(xl)
  189. xl.Infof("Finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
  190. return nil
  191. }