1
0

proxy.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "bytes"
  17. "context"
  18. "io"
  19. "net"
  20. "reflect"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. libio "github.com/fatedier/golib/io"
  26. libdial "github.com/fatedier/golib/net/dial"
  27. pp "github.com/pires/go-proxyproto"
  28. "golang.org/x/time/rate"
  29. "github.com/fatedier/frp/pkg/config"
  30. "github.com/fatedier/frp/pkg/msg"
  31. plugin "github.com/fatedier/frp/pkg/plugin/client"
  32. "github.com/fatedier/frp/pkg/transport"
  33. "github.com/fatedier/frp/pkg/util/limit"
  34. "github.com/fatedier/frp/pkg/util/xlog"
  35. )
  36. var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{}
  37. func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) {
  38. proxyFactoryRegistry[proxyConfType] = factory
  39. }
  40. // Proxy defines how to handle work connections for different proxy type.
  41. type Proxy interface {
  42. Run() error
  43. // InWorkConn accept work connections registered to server.
  44. InWorkConn(net.Conn, *msg.StartWorkConn)
  45. Close()
  46. }
  47. func NewProxy(
  48. ctx context.Context,
  49. pxyConf config.ProxyConf,
  50. clientCfg config.ClientCommonConf,
  51. msgTransporter transport.MessageTransporter,
  52. ) (pxy Proxy) {
  53. var limiter *rate.Limiter
  54. limitBytes := pxyConf.GetBaseConfig().BandwidthLimit.Bytes()
  55. if limitBytes > 0 && pxyConf.GetBaseConfig().BandwidthLimitMode == config.BandwidthLimitModeClient {
  56. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  57. }
  58. baseProxy := BaseProxy{
  59. baseProxyConfig: pxyConf.GetBaseConfig(),
  60. clientCfg: clientCfg,
  61. limiter: limiter,
  62. msgTransporter: msgTransporter,
  63. xl: xlog.FromContextSafe(ctx),
  64. ctx: ctx,
  65. }
  66. factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
  67. if factory == nil {
  68. return nil
  69. }
  70. return factory(&baseProxy, pxyConf)
  71. }
  72. type BaseProxy struct {
  73. baseProxyConfig *config.BaseProxyConf
  74. clientCfg config.ClientCommonConf
  75. msgTransporter transport.MessageTransporter
  76. limiter *rate.Limiter
  77. // proxyPlugin is used to handle connections instead of dialing to local service.
  78. // It's only validate for TCP protocol now.
  79. proxyPlugin plugin.Plugin
  80. mu sync.RWMutex
  81. xl *xlog.Logger
  82. ctx context.Context
  83. }
  84. func (pxy *BaseProxy) Run() error {
  85. if pxy.baseProxyConfig.Plugin != "" {
  86. p, err := plugin.Create(pxy.baseProxyConfig.Plugin, pxy.baseProxyConfig.PluginParams)
  87. if err != nil {
  88. return err
  89. }
  90. pxy.proxyPlugin = p
  91. }
  92. return nil
  93. }
  94. func (pxy *BaseProxy) Close() {
  95. if pxy.proxyPlugin != nil {
  96. pxy.proxyPlugin.Close()
  97. }
  98. }
  99. func (pxy *BaseProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
  100. pxy.HandleTCPWorkConnection(conn, m, []byte(pxy.clientCfg.Token))
  101. }
  102. // Common handler for tcp work connections.
  103. func (pxy *BaseProxy) HandleTCPWorkConnection(workConn net.Conn, m *msg.StartWorkConn, encKey []byte) {
  104. xl := pxy.xl
  105. baseConfig := pxy.baseProxyConfig
  106. var (
  107. remote io.ReadWriteCloser
  108. err error
  109. )
  110. remote = workConn
  111. if pxy.limiter != nil {
  112. remote = libio.WrapReadWriteCloser(limit.NewReader(workConn, pxy.limiter), limit.NewWriter(workConn, pxy.limiter), func() error {
  113. return workConn.Close()
  114. })
  115. }
  116. xl.Trace("handle tcp work connection, use_encryption: %t, use_compression: %t",
  117. baseConfig.UseEncryption, baseConfig.UseCompression)
  118. if baseConfig.UseEncryption {
  119. remote, err = libio.WithEncryption(remote, encKey)
  120. if err != nil {
  121. workConn.Close()
  122. xl.Error("create encryption stream error: %v", err)
  123. return
  124. }
  125. }
  126. if baseConfig.UseCompression {
  127. remote = libio.WithCompression(remote)
  128. }
  129. // check if we need to send proxy protocol info
  130. var extraInfo []byte
  131. if baseConfig.ProxyProtocolVersion != "" {
  132. if m.SrcAddr != "" && m.SrcPort != 0 {
  133. if m.DstAddr == "" {
  134. m.DstAddr = "127.0.0.1"
  135. }
  136. srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort))))
  137. dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort))))
  138. h := &pp.Header{
  139. Command: pp.PROXY,
  140. SourceAddr: srcAddr,
  141. DestinationAddr: dstAddr,
  142. }
  143. if strings.Contains(m.SrcAddr, ".") {
  144. h.TransportProtocol = pp.TCPv4
  145. } else {
  146. h.TransportProtocol = pp.TCPv6
  147. }
  148. if baseConfig.ProxyProtocolVersion == "v1" {
  149. h.Version = 1
  150. } else if baseConfig.ProxyProtocolVersion == "v2" {
  151. h.Version = 2
  152. }
  153. buf := bytes.NewBuffer(nil)
  154. _, _ = h.WriteTo(buf)
  155. extraInfo = buf.Bytes()
  156. }
  157. }
  158. if pxy.proxyPlugin != nil {
  159. // if plugin is set, let plugin handle connection first
  160. xl.Debug("handle by plugin: %s", pxy.proxyPlugin.Name())
  161. pxy.proxyPlugin.Handle(remote, workConn, extraInfo)
  162. xl.Debug("handle by plugin finished")
  163. return
  164. }
  165. localConn, err := libdial.Dial(
  166. net.JoinHostPort(baseConfig.LocalIP, strconv.Itoa(baseConfig.LocalPort)),
  167. libdial.WithTimeout(10*time.Second),
  168. )
  169. if err != nil {
  170. workConn.Close()
  171. xl.Error("connect to local service [%s:%d] error: %v", baseConfig.LocalIP, baseConfig.LocalPort, err)
  172. return
  173. }
  174. xl.Debug("join connections, localConn(l[%s] r[%s]) workConn(l[%s] r[%s])", localConn.LocalAddr().String(),
  175. localConn.RemoteAddr().String(), workConn.LocalAddr().String(), workConn.RemoteAddr().String())
  176. if len(extraInfo) > 0 {
  177. if _, err := localConn.Write(extraInfo); err != nil {
  178. workConn.Close()
  179. xl.Error("write extraInfo to local conn error: %v", err)
  180. return
  181. }
  182. }
  183. _, _, errs := libio.Join(localConn, remote)
  184. xl.Debug("join connections closed")
  185. if len(errs) > 0 {
  186. xl.Trace("join connections errors: %v", errs)
  187. }
  188. }