Browse Source

refactor: use MessageSender interface for message transporter (#5083)

fatedier 6 days ago
parent
commit
2e2802ea13
5 changed files with 14 additions and 12 deletions
  1. 4 0
      Release.md
  2. 1 1
      client/control.go
  3. 0 4
      pkg/msg/handler.go
  4. 8 6
      pkg/transport/message.go
  5. 1 1
      server/control.go

+ 4 - 0
Release.md

@@ -6,3 +6,7 @@
 ## Improvements
 
 * **VirtualNet**: Implemented intelligent reconnection with exponential backoff. When connection errors occur repeatedly, the reconnect interval increases from 60s to 300s (max), reducing unnecessary reconnection attempts. Normal disconnections still reconnect quickly at 10s intervals.
+
+## Fixes
+
+* Fix deadlock issue when TCP connection is closed. Previously, sending messages could block forever if the connection handler had already stopped.

+ 1 - 1
client/control.go

@@ -100,7 +100,7 @@ func NewControl(ctx context.Context, sessionCtx *SessionContext) (*Control, erro
 		ctl.msgDispatcher = msg.NewDispatcher(sessionCtx.Conn)
 	}
 	ctl.registerMsgHandlers()
-	ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
+	ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher)
 
 	ctl.pm = proxy.NewManager(ctl.ctx, sessionCtx.Common, ctl.msgTransporter, sessionCtx.VnetController)
 	ctl.vm = visitor.NewManager(ctl.ctx, sessionCtx.RunID, sessionCtx.Common,

+ 0 - 4
pkg/msg/handler.go

@@ -86,10 +86,6 @@ func (d *Dispatcher) Send(m Message) error {
 	}
 }
 
-func (d *Dispatcher) SendChannel() chan Message {
-	return d.sendCh
-}
-
 func (d *Dispatcher) RegisterHandler(msg Message, handler func(Message)) {
 	d.msgHandlers[reflect.TypeOf(msg)] = handler
 }

+ 8 - 6
pkg/transport/message.go

@@ -35,15 +35,19 @@ type MessageTransporter interface {
 	DispatchWithType(m msg.Message, msgType, laneKey string) bool
 }
 
-func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter {
+type MessageSender interface {
+	Send(msg.Message) error
+}
+
+func NewMessageTransporter(sender MessageSender) MessageTransporter {
 	return &transporterImpl{
-		sendCh:   sendCh,
+		sender:   sender,
 		registry: make(map[string]map[string]chan msg.Message),
 	}
 }
 
 type transporterImpl struct {
-	sendCh chan msg.Message
+	sender MessageSender
 
 	// First key is message type and second key is lane key.
 	// Dispatch will dispatch message to related channel by its message type
@@ -53,9 +57,7 @@ type transporterImpl struct {
 }
 
 func (impl *transporterImpl) Send(m msg.Message) error {
-	return errors.PanicToError(func() {
-		impl.sendCh <- m
-	})
+	return impl.sender.Send(m)
 }
 
 func (impl *transporterImpl) Do(ctx context.Context, req msg.Message, laneKey, recvMsgType string) (msg.Message, error) {

+ 1 - 1
server/control.go

@@ -195,7 +195,7 @@ func NewControl(
 		ctl.msgDispatcher = msg.NewDispatcher(ctl.conn)
 	}
 	ctl.registerMsgHandlers()
-	ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher.SendChannel())
+	ctl.msgTransporter = transport.NewMessageTransporter(ctl.msgDispatcher)
 	return ctl, nil
 }