1
0

message.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. // Copyright 2023 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package transport
  15. import (
  16. "context"
  17. "reflect"
  18. "sync"
  19. "github.com/fatedier/golib/errors"
  20. "github.com/fatedier/frp/pkg/msg"
  21. )
  22. type MessageTransporter interface {
  23. Send(msg.Message) error
  24. // Recv(ctx context.Context, laneKey string, msgType string) (Message, error)
  25. // Do will first send msg, then recv msg with the same laneKey and specified msgType.
  26. Do(ctx context.Context, req msg.Message, laneKey, recvMsgType string) (msg.Message, error)
  27. Dispatch(m msg.Message, laneKey string) bool
  28. DispatchWithType(m msg.Message, msgType, laneKey string) bool
  29. }
  30. func NewMessageTransporter(sendCh chan msg.Message) MessageTransporter {
  31. return &transporterImpl{
  32. sendCh: sendCh,
  33. registry: make(map[string]map[string]chan msg.Message),
  34. }
  35. }
  36. type transporterImpl struct {
  37. sendCh chan msg.Message
  38. // First key is message type and second key is lane key.
  39. // Dispatch will dispatch message to releated channel by its message type
  40. // and lane key.
  41. registry map[string]map[string]chan msg.Message
  42. mu sync.RWMutex
  43. }
  44. func (impl *transporterImpl) Send(m msg.Message) error {
  45. return errors.PanicToError(func() {
  46. impl.sendCh <- m
  47. })
  48. }
  49. func (impl *transporterImpl) Do(ctx context.Context, req msg.Message, laneKey, recvMsgType string) (msg.Message, error) {
  50. ch := make(chan msg.Message, 1)
  51. defer close(ch)
  52. unregisterFn := impl.registerMsgChan(ch, laneKey, recvMsgType)
  53. defer unregisterFn()
  54. if err := impl.Send(req); err != nil {
  55. return nil, err
  56. }
  57. select {
  58. case <-ctx.Done():
  59. return nil, ctx.Err()
  60. case resp := <-ch:
  61. return resp, nil
  62. }
  63. }
  64. func (impl *transporterImpl) DispatchWithType(m msg.Message, msgType, laneKey string) bool {
  65. var ch chan msg.Message
  66. impl.mu.RLock()
  67. byLaneKey, ok := impl.registry[msgType]
  68. if ok {
  69. ch = byLaneKey[laneKey]
  70. }
  71. impl.mu.RUnlock()
  72. if ch == nil {
  73. return false
  74. }
  75. if err := errors.PanicToError(func() {
  76. ch <- m
  77. }); err != nil {
  78. return false
  79. }
  80. return true
  81. }
  82. func (impl *transporterImpl) Dispatch(m msg.Message, laneKey string) bool {
  83. msgType := reflect.TypeOf(m).Elem().Name()
  84. return impl.DispatchWithType(m, msgType, laneKey)
  85. }
  86. func (impl *transporterImpl) registerMsgChan(recvCh chan msg.Message, laneKey string, msgType string) (unregister func()) {
  87. impl.mu.Lock()
  88. byLaneKey, ok := impl.registry[msgType]
  89. if !ok {
  90. byLaneKey = make(map[string]chan msg.Message)
  91. impl.registry[msgType] = byLaneKey
  92. }
  93. byLaneKey[laneKey] = recvCh
  94. impl.mu.Unlock()
  95. unregister = func() {
  96. impl.mu.Lock()
  97. delete(byLaneKey, laneKey)
  98. impl.mu.Unlock()
  99. }
  100. return
  101. }