瀏覽代碼

vnet: add exponential backoff for failed reconnections (#5035)

fatedier 3 周之前
父節點
當前提交
e025843d3c

+ 4 - 0
Release.md

@@ -1,3 +1,7 @@
 ## Features
 ## Features
 
 
 * HTTPS proxies now support load balancing groups. Multiple HTTPS proxies can be configured with the same `loadBalancer.group` and `loadBalancer.groupKey` to share the same custom domain and distribute traffic across multiple backend services, similar to the existing TCP and HTTP load balancing capabilities.
 * HTTPS proxies now support load balancing groups. Multiple HTTPS proxies can be configured with the same `loadBalancer.group` and `loadBalancer.groupKey` to share the same custom domain and distribute traffic across multiple backend services, similar to the existing TCP and HTTP load balancing capabilities.
+
+## Improvements
+
+* **VirtualNet**: Implemented intelligent reconnection with exponential backoff. When connection errors occur repeatedly, the reconnect interval increases from 60s to 300s (max), reducing unnecessary reconnection attempts. Normal disconnections still reconnect quickly at 10s intervals.

+ 17 - 1
client/visitor/stcp.go

@@ -15,6 +15,7 @@
 package visitor
 package visitor
 
 
 import (
 import (
+	"fmt"
 	"io"
 	"io"
 	"net"
 	"net"
 	"strconv"
 	"strconv"
@@ -81,11 +82,22 @@ func (sv *STCPVisitor) internalConnWorker() {
 
 
 func (sv *STCPVisitor) handleConn(userConn net.Conn) {
 func (sv *STCPVisitor) handleConn(userConn net.Conn) {
 	xl := xlog.FromContextSafe(sv.ctx)
 	xl := xlog.FromContextSafe(sv.ctx)
-	defer userConn.Close()
+	var tunnelErr error
+	defer func() {
+		// If there was an error and connection supports CloseWithError, use it
+		if tunnelErr != nil {
+			if eConn, ok := userConn.(interface{ CloseWithError(error) error }); ok {
+				_ = eConn.CloseWithError(tunnelErr)
+				return
+			}
+		}
+		userConn.Close()
+	}()
 
 
 	xl.Debugf("get a new stcp user connection")
 	xl.Debugf("get a new stcp user connection")
 	visitorConn, err := sv.helper.ConnectServer()
 	visitorConn, err := sv.helper.ConnectServer()
 	if err != nil {
 	if err != nil {
+		tunnelErr = err
 		return
 		return
 	}
 	}
 	defer visitorConn.Close()
 	defer visitorConn.Close()
@@ -102,6 +114,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
 	err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
 	err = msg.WriteMsg(visitorConn, newVisitorConnMsg)
 	if err != nil {
 	if err != nil {
 		xl.Warnf("send newVisitorConnMsg to server error: %v", err)
 		xl.Warnf("send newVisitorConnMsg to server error: %v", err)
+		tunnelErr = err
 		return
 		return
 	}
 	}
 
 
@@ -110,12 +123,14 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
 	err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
 	err = msg.ReadMsgInto(visitorConn, &newVisitorConnRespMsg)
 	if err != nil {
 	if err != nil {
 		xl.Warnf("get newVisitorConnRespMsg error: %v", err)
 		xl.Warnf("get newVisitorConnRespMsg error: %v", err)
+		tunnelErr = err
 		return
 		return
 	}
 	}
 	_ = visitorConn.SetReadDeadline(time.Time{})
 	_ = visitorConn.SetReadDeadline(time.Time{})
 
 
 	if newVisitorConnRespMsg.Error != "" {
 	if newVisitorConnRespMsg.Error != "" {
 		xl.Warnf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
 		xl.Warnf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
+		tunnelErr = fmt.Errorf("%s", newVisitorConnRespMsg.Error)
 		return
 		return
 	}
 	}
 
 
@@ -125,6 +140,7 @@ func (sv *STCPVisitor) handleConn(userConn net.Conn) {
 		remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
 		remote, err = libio.WithEncryption(remote, []byte(sv.cfg.SecretKey))
 		if err != nil {
 		if err != nil {
 			xl.Errorf("create encryption stream error: %v", err)
 			xl.Errorf("create encryption stream error: %v", err)
+			tunnelErr = err
 			return
 			return
 		}
 		}
 	}
 	}

+ 1 - 1
client/visitor/visitor.go

@@ -71,7 +71,7 @@ func NewVisitor(
 				Name:           cfg.GetBaseConfig().Name,
 				Name:           cfg.GetBaseConfig().Name,
 				Ctx:            ctx,
 				Ctx:            ctx,
 				VnetController: helper.VNetController(),
 				VnetController: helper.VNetController(),
-				HandleConn: func(conn net.Conn) {
+				SendConnToVisitor: func(conn net.Conn) {
 					_ = baseVisitor.AcceptConn(conn)
 					_ = baseVisitor.AcceptConn(conn)
 				},
 				},
 			},
 			},

+ 11 - 0
client/visitor/xtcp.go

@@ -162,8 +162,16 @@ func (sv *XTCPVisitor) keepTunnelOpenWorker() {
 func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
 func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
 	xl := xlog.FromContextSafe(sv.ctx)
 	xl := xlog.FromContextSafe(sv.ctx)
 	isConnTransferred := false
 	isConnTransferred := false
+	var tunnelErr error
 	defer func() {
 	defer func() {
 		if !isConnTransferred {
 		if !isConnTransferred {
+			// If there was an error and connection supports CloseWithError, use it
+			if tunnelErr != nil {
+				if eConn, ok := userConn.(interface{ CloseWithError(error) error }); ok {
+					_ = eConn.CloseWithError(tunnelErr)
+					return
+				}
+			}
 			userConn.Close()
 			userConn.Close()
 		}
 		}
 	}()
 	}()
@@ -181,6 +189,8 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
 	tunnelConn, err := sv.openTunnel(ctx)
 	tunnelConn, err := sv.openTunnel(ctx)
 	if err != nil {
 	if err != nil {
 		xl.Errorf("open tunnel error: %v", err)
 		xl.Errorf("open tunnel error: %v", err)
+		tunnelErr = err
+
 		// no fallback, just return
 		// no fallback, just return
 		if sv.cfg.FallbackTo == "" {
 		if sv.cfg.FallbackTo == "" {
 			return
 			return
@@ -200,6 +210,7 @@ func (sv *XTCPVisitor) handleConn(userConn net.Conn) {
 		muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey))
 		muxConnRWCloser, err = libio.WithEncryption(muxConnRWCloser, []byte(sv.cfg.SecretKey))
 		if err != nil {
 		if err != nil {
 			xl.Errorf("create encryption stream error: %v", err)
 			xl.Errorf("create encryption stream error: %v", err)
+			tunnelErr = err
 			return
 			return
 		}
 		}
 	}
 	}

+ 12 - 3
pkg/plugin/visitor/plugin.go

@@ -23,11 +23,20 @@ import (
 	"github.com/fatedier/frp/pkg/vnet"
 	"github.com/fatedier/frp/pkg/vnet"
 )
 )
 
 
+// PluginContext provides the necessary context and callbacks for visitor plugins.
 type PluginContext struct {
 type PluginContext struct {
-	Name           string
-	Ctx            context.Context
+	// Name is the unique identifier for this visitor, used for logging and routing.
+	Name string
+
+	// Ctx manages the plugin's lifecycle and carries the logger for structured logging.
+	Ctx context.Context
+
+	// VnetController manages TUN device routing. May be nil if virtual networking is disabled.
 	VnetController *vnet.Controller
 	VnetController *vnet.Controller
-	HandleConn     func(net.Conn)
+
+	// SendConnToVisitor sends a connection to the visitor's internal processing queue.
+	// Does not return error; failures are handled by closing the connection.
+	SendConnToVisitor func(net.Conn)
 }
 }
 
 
 // Creators is used for create plugins to handle connections.
 // Creators is used for create plugins to handle connections.

+ 36 - 8
pkg/plugin/visitor/virtual_net.go

@@ -42,6 +42,8 @@ type VirtualNetPlugin struct {
 	controllerConn net.Conn
 	controllerConn net.Conn
 	closeSignal    chan struct{}
 	closeSignal    chan struct{}
 
 
+	consecutiveErrors int // Tracks consecutive connection errors for exponential backoff
+
 	ctx    context.Context
 	ctx    context.Context
 	cancel context.CancelFunc
 	cancel context.CancelFunc
 }
 }
@@ -98,7 +100,6 @@ func (p *VirtualNetPlugin) Start() {
 
 
 func (p *VirtualNetPlugin) run() {
 func (p *VirtualNetPlugin) run() {
 	xl := xlog.FromContextSafe(p.ctx)
 	xl := xlog.FromContextSafe(p.ctx)
-	reconnectDelay := 10 * time.Second
 
 
 	for {
 	for {
 		currentCloseSignal := make(chan struct{})
 		currentCloseSignal := make(chan struct{})
@@ -121,7 +122,10 @@ func (p *VirtualNetPlugin) run() {
 		p.controllerConn = controllerConn
 		p.controllerConn = controllerConn
 		p.mu.Unlock()
 		p.mu.Unlock()
 
 
-		pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func() {
+		// Wrap with CloseNotifyConn which supports both close notification and error recording
+		var closeErr error
+		pluginNotifyConn := netutil.WrapCloseNotifyConn(pluginConn, func(err error) {
+			closeErr = err
 			close(currentCloseSignal) // Signal the run loop on close.
 			close(currentCloseSignal) // Signal the run loop on close.
 		})
 		})
 
 
@@ -129,9 +133,9 @@ func (p *VirtualNetPlugin) run() {
 		p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
 		p.pluginCtx.VnetController.RegisterClientRoute(p.ctx, p.pluginCtx.Name, p.routes, controllerConn)
 		xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
 		xl.Infof("successfully registered client route for visitor [%s]. Starting connection handler with CloseNotifyConn.", p.pluginCtx.Name)
 
 
-		// Pass the CloseNotifyConn to HandleConn.
-		// HandleConn is responsible for calling Close() on pluginNotifyConn.
-		p.pluginCtx.HandleConn(pluginNotifyConn)
+		// Pass the CloseNotifyConn to the visitor for handling.
+		// The visitor can call CloseWithError to record the failure reason.
+		p.pluginCtx.SendConnToVisitor(pluginNotifyConn)
 
 
 		// Wait for context cancellation or connection close.
 		// Wait for context cancellation or connection close.
 		select {
 		select {
@@ -140,8 +144,32 @@ func (p *VirtualNetPlugin) run() {
 			p.cleanupControllerConn(xl)
 			p.cleanupControllerConn(xl)
 			return
 			return
 		case <-currentCloseSignal:
 		case <-currentCloseSignal:
-			xl.Infof("detected connection closed via CloseNotifyConn for visitor [%s].", p.pluginCtx.Name)
-			// HandleConn closed the plugin side. Close the controller side.
+			// Determine reconnect delay based on error with exponential backoff
+			var reconnectDelay time.Duration
+			if closeErr != nil {
+				p.consecutiveErrors++
+				xl.Warnf("connection closed with error for visitor [%s] (consecutive errors: %d): %v",
+					p.pluginCtx.Name, p.consecutiveErrors, closeErr)
+
+				// Exponential backoff: 60s, 120s, 240s, 300s (capped)
+				baseDelay := 60 * time.Second
+				reconnectDelay = baseDelay * time.Duration(1<<uint(p.consecutiveErrors-1))
+				if reconnectDelay > 300*time.Second {
+					reconnectDelay = 300 * time.Second
+				}
+			} else {
+				// Reset consecutive errors on successful connection
+				if p.consecutiveErrors > 0 {
+					xl.Infof("connection closed normally for visitor [%s], resetting error counter (was %d)",
+						p.pluginCtx.Name, p.consecutiveErrors)
+					p.consecutiveErrors = 0
+				} else {
+					xl.Infof("connection closed normally for visitor [%s]", p.pluginCtx.Name)
+				}
+				reconnectDelay = 10 * time.Second
+			}
+
+			// The visitor closed the plugin side. Close the controller side.
 			p.cleanupControllerConn(xl)
 			p.cleanupControllerConn(xl)
 
 
 			xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
 			xl.Infof("waiting %v before attempting reconnection for visitor [%s]...", reconnectDelay, p.pluginCtx.Name)
@@ -184,7 +212,7 @@ func (p *VirtualNetPlugin) Close() error {
 	}
 	}
 
 
 	// Explicitly close the controller side of the pipe.
 	// Explicitly close the controller side of the pipe.
-	// This ensures the pipe is broken even if the run loop is stuck or HandleConn hasn't closed its end.
+	// This ensures the pipe is broken even if the run loop is stuck or the visitor hasn't closed its end.
 	p.cleanupControllerConn(xl)
 	p.cleanupControllerConn(xl)
 	xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
 	xl.Infof("finished cleaning up connections during close for visitor [%s]", p.pluginCtx.Name)
 
 

+ 17 - 4
pkg/util/net/conn.go

@@ -135,11 +135,11 @@ type CloseNotifyConn struct {
 	// 1 means closed
 	// 1 means closed
 	closeFlag int32
 	closeFlag int32
 
 
-	closeFn func()
+	closeFn func(error)
 }
 }
 
 
-// closeFn will be only called once
-func WrapCloseNotifyConn(c net.Conn, closeFn func()) net.Conn {
+// closeFn will be only called once with the error (nil if Close() was called, non-nil if CloseWithError() was called)
+func WrapCloseNotifyConn(c net.Conn, closeFn func(error)) *CloseNotifyConn {
 	return &CloseNotifyConn{
 	return &CloseNotifyConn{
 		Conn:    c,
 		Conn:    c,
 		closeFn: closeFn,
 		closeFn: closeFn,
@@ -151,12 +151,25 @@ func (cc *CloseNotifyConn) Close() (err error) {
 	if pflag == 0 {
 	if pflag == 0 {
 		err = cc.Conn.Close()
 		err = cc.Conn.Close()
 		if cc.closeFn != nil {
 		if cc.closeFn != nil {
-			cc.closeFn()
+			cc.closeFn(nil)
 		}
 		}
 	}
 	}
 	return
 	return
 }
 }
 
 
+// CloseWithError closes the connection and passes the error to the close callback.
+func (cc *CloseNotifyConn) CloseWithError(err error) error {
+	pflag := atomic.SwapInt32(&cc.closeFlag, 1)
+	if pflag == 0 {
+		closeErr := cc.Conn.Close()
+		if cc.closeFn != nil {
+			cc.closeFn(err)
+		}
+		return closeErr
+	}
+	return nil
+}
+
 type StatsConn struct {
 type StatsConn struct {
 	net.Conn
 	net.Conn
 
 

+ 1 - 1
pkg/util/net/websocket.go

@@ -32,7 +32,7 @@ func NewWebsocketListener(ln net.Listener) (wl *WebsocketListener) {
 	muxer := http.NewServeMux()
 	muxer := http.NewServeMux()
 	muxer.Handle(FrpWebsocketPath, websocket.Handler(func(c *websocket.Conn) {
 	muxer.Handle(FrpWebsocketPath, websocket.Handler(func(c *websocket.Conn) {
 		notifyCh := make(chan struct{})
 		notifyCh := make(chan struct{})
-		conn := WrapCloseNotifyConn(c, func() {
+		conn := WrapCloseNotifyConn(c, func(_ error) {
 			close(notifyCh)
 			close(notifyCh)
 		})
 		})
 		wl.acceptCh <- conn
 		wl.acceptCh <- conn