proxy.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "io"
  18. "net"
  19. "reflect"
  20. "strconv"
  21. "sync"
  22. "time"
  23. libio "github.com/fatedier/golib/io"
  24. libnet "github.com/fatedier/golib/net"
  25. "golang.org/x/time/rate"
  26. "github.com/fatedier/frp/pkg/config/types"
  27. v1 "github.com/fatedier/frp/pkg/config/v1"
  28. "github.com/fatedier/frp/pkg/msg"
  29. plugin "github.com/fatedier/frp/pkg/plugin/client"
  30. "github.com/fatedier/frp/pkg/transport"
  31. "github.com/fatedier/frp/pkg/util/limit"
  32. netpkg "github.com/fatedier/frp/pkg/util/net"
  33. "github.com/fatedier/frp/pkg/util/xlog"
  34. "github.com/fatedier/frp/pkg/vnet"
  35. )
  36. var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, v1.ProxyConfigurer) Proxy{}
  37. func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, v1.ProxyConfigurer) Proxy) {
  38. proxyFactoryRegistry[proxyConfType] = factory
  39. }
  40. // Proxy defines how to handle work connections for different proxy type.
  41. type Proxy interface {
  42. Run() error
  43. // InWorkConn accept work connections registered to server.
  44. InWorkConn(net.Conn, *msg.StartWorkConn)
  45. SetInWorkConnCallback(func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) /* continue */ bool)
  46. Close()
  47. }
  48. func NewProxy(
  49. ctx context.Context,
  50. pxyConf v1.ProxyConfigurer,
  51. clientCfg *v1.ClientCommonConfig,
  52. encryptionKey []byte,
  53. msgTransporter transport.MessageTransporter,
  54. vnetController *vnet.Controller,
  55. ) (pxy Proxy) {
  56. var limiter *rate.Limiter
  57. limitBytes := pxyConf.GetBaseConfig().Transport.BandwidthLimit.Bytes()
  58. if limitBytes > 0 && pxyConf.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeClient {
  59. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  60. }
  61. baseProxy := BaseProxy{
  62. baseCfg: pxyConf.GetBaseConfig(),
  63. clientCfg: clientCfg,
  64. encryptionKey: encryptionKey,
  65. limiter: limiter,
  66. msgTransporter: msgTransporter,
  67. vnetController: vnetController,
  68. xl: xlog.FromContextSafe(ctx),
  69. ctx: ctx,
  70. }
  71. factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
  72. if factory == nil {
  73. return nil
  74. }
  75. return factory(&baseProxy, pxyConf)
  76. }
  77. type BaseProxy struct {
  78. baseCfg *v1.ProxyBaseConfig
  79. clientCfg *v1.ClientCommonConfig
  80. encryptionKey []byte
  81. msgTransporter transport.MessageTransporter
  82. vnetController *vnet.Controller
  83. limiter *rate.Limiter
  84. // proxyPlugin is used to handle connections instead of dialing to local service.
  85. // It's only validate for TCP protocol now.
  86. proxyPlugin plugin.Plugin
  87. inWorkConnCallback func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) /* continue */ bool
  88. mu sync.RWMutex
  89. xl *xlog.Logger
  90. ctx context.Context
  91. }
  92. func (pxy *BaseProxy) Run() error {
  93. if pxy.baseCfg.Plugin.Type != "" {
  94. p, err := plugin.Create(pxy.baseCfg.Plugin.Type, plugin.PluginContext{
  95. Name: pxy.baseCfg.Name,
  96. VnetController: pxy.vnetController,
  97. }, pxy.baseCfg.Plugin.ClientPluginOptions)
  98. if err != nil {
  99. return err
  100. }
  101. pxy.proxyPlugin = p
  102. }
  103. return nil
  104. }
  105. func (pxy *BaseProxy) Close() {
  106. if pxy.proxyPlugin != nil {
  107. pxy.proxyPlugin.Close()
  108. }
  109. }
  110. func (pxy *BaseProxy) SetInWorkConnCallback(cb func(*v1.ProxyBaseConfig, net.Conn, *msg.StartWorkConn) bool) {
  111. pxy.inWorkConnCallback = cb
  112. }
  113. func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  114. if pxy.inWorkConnCallback != nil {
  115. if !pxy.inWorkConnCallback(pxy.baseCfg, conn, m) {
  116. return
  117. }
  118. }
  119. pxy.HandleTCPWorkConnection(conn, m, pxy.encryptionKey)
  120. }
  121. // Common handler for tcp work connections.
  122. func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
  123. xl := pxy.xl
  124. baseCfg := pxy.baseCfg
  125. var (
  126. remote io.ReadWriteCloser
  127. err error
  128. )
  129. remote = workConn
  130. if pxy.limiter != nil {
  131. remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error {
  132. return workConn.Close()
  133. })
  134. }
  135. xl.Tracef("handle tcp work connection, useEncryption: %t, useCompression: %t",
  136. baseCfg.Transport.UseEncryption, baseCfg.Transport.UseCompression)
  137. if baseCfg.Transport.UseEncryption {
  138. remote, err = libio.WithEncryption(remote, encKey)
  139. if err != nil {
  140. workConn.Close()
  141. xl.Errorf("create encryption stream error: %v", err)
  142. return
  143. }
  144. }
  145. var compressionResourceRecycleFn func()
  146. if baseCfg.Transport.UseCompression {
  147. remote, compressionResourceRecycleFn = libio.WithCompressionFromPool(remote)
  148. }
  149. // check if we need to send proxy protocol info
  150. var connInfo plugin.ConnectionInfo
  151. if m.SrcAddr != "" && m.SrcPort != 0 {
  152. if m.DstAddr == "" {
  153. m.DstAddr = "127.0.0.1"
  154. }
  155. srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort))))
  156. dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort))))
  157. connInfo.SrcAddr = srcAddr
  158. connInfo.DstAddr = dstAddr
  159. }
  160. if baseCfg.Transport.ProxyProtocolVersion != "" && m.SrcAddr != "" && m.SrcPort != 0 {
  161. // Use the common proxy protocol builder function
  162. header := netpkg.BuildProxyProtocolHeaderStruct(connInfo.SrcAddr, connInfo.DstAddr, baseCfg.Transport.ProxyProtocolVersion)
  163. connInfo.ProxyProtocolHeader = header
  164. }
  165. connInfo.Conn = remote
  166. connInfo.UnderlyingConn = workConn
  167. if pxy.proxyPlugin != nil {
  168. // if plugin is set, let plugin handle connection first
  169. xl.Debugf("handle by plugin: %s", pxy.proxyPlugin.Name())
  170. pxy.proxyPlugin.Handle(pxy.ctx, &connInfo)
  171. xl.Debugf("handle by plugin finished")
  172. return
  173. }
  174. localConn, err := libnet.Dial(
  175. net.JoinHostPort(baseCfg.LocalIP, strconv.Itoa(baseCfg.LocalPort)),
  176. libnet.WithTimeout(10*time.Second),
  177. )
  178. if err != nil {
  179. workConn.Close()
  180. xl.Errorf("connect to local service [%s:%d] error: %v", baseCfg.LocalIP, baseCfg.LocalPort, err)
  181. return
  182. }
  183. xl.Debugf("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  184. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  185. if connInfo.ProxyProtocolHeader != nil {
  186. if _, err := connInfo.ProxyProtocolHeader.WriteTo(localConn); err != nil {
  187. workConn.Close()
  188. xl.Errorf("write proxy protocol header to local conn error: %v", err)
  189. return
  190. }
  191. }
  192. _, _, errs := libio.Join(localConn, remote)
  193. xl.Debugf("join connections closed")
  194. if len(errs) > 0 {
  195. xl.Tracef("join connections errors: %v", errs)
  196. }
  197. if compressionResourceRecycleFn != nil {
  198. compressionResourceRecycleFn()
  199. }
  200. }