proxy.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strconv"
  21. "sync"
  22. "github.com/fatedier/frp/models/config"
  23. "github.com/fatedier/frp/models/msg"
  24. "github.com/fatedier/frp/server/controller"
  25. "github.com/fatedier/frp/server/stats"
  26. frpNet "github.com/fatedier/frp/utils/net"
  27. "github.com/fatedier/frp/utils/xlog"
  28. frpIo "github.com/fatedier/golib/io"
  29. )
  30. type GetWorkConnFn func() (net.Conn, error)
  31. type Proxy interface {
  32. Context() context.Context
  33. Run() (remoteAddr string, err error)
  34. GetName() string
  35. GetConf() config.ProxyConf
  36. GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
  37. GetUsedPortsNum() int
  38. Close()
  39. }
  40. type BaseProxy struct {
  41. name string
  42. rc *controller.ResourceController
  43. statsCollector stats.Collector
  44. listeners []net.Listener
  45. usedPortsNum int
  46. poolCount int
  47. getWorkConnFn GetWorkConnFn
  48. serverCfg config.ServerCommonConf
  49. mu sync.RWMutex
  50. xl *xlog.Logger
  51. ctx context.Context
  52. }
  53. func (pxy *BaseProxy) GetName() string {
  54. return pxy.name
  55. }
  56. func (pxy *BaseProxy) Context() context.Context {
  57. return pxy.ctx
  58. }
  59. func (pxy *BaseProxy) GetUsedPortsNum() int {
  60. return pxy.usedPortsNum
  61. }
  62. func (pxy *BaseProxy) Close() {
  63. xl := xlog.FromContextSafe(pxy.ctx)
  64. xl.Info("proxy closing")
  65. for _, l := range pxy.listeners {
  66. l.Close()
  67. }
  68. }
  69. // GetWorkConnFromPool try to get a new work connections from pool
  70. // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
  71. func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) {
  72. xl := xlog.FromContextSafe(pxy.ctx)
  73. // try all connections from the pool
  74. for i := 0; i < pxy.poolCount+1; i++ {
  75. if workConn, err = pxy.getWorkConnFn(); err != nil {
  76. xl.Warn("failed to get work connection: %v", err)
  77. return
  78. }
  79. xl.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  80. xl.Spawn().AppendPrefix(pxy.GetName())
  81. workConn = frpNet.NewContextConn(workConn, pxy.ctx)
  82. var (
  83. srcAddr string
  84. dstAddr string
  85. srcPortStr string
  86. dstPortStr string
  87. srcPort int
  88. dstPort int
  89. )
  90. if src != nil {
  91. srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
  92. srcPort, _ = strconv.Atoi(srcPortStr)
  93. }
  94. if dst != nil {
  95. dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
  96. dstPort, _ = strconv.Atoi(dstPortStr)
  97. }
  98. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  99. ProxyName: pxy.GetName(),
  100. SrcAddr: srcAddr,
  101. SrcPort: uint16(srcPort),
  102. DstAddr: dstAddr,
  103. DstPort: uint16(dstPort),
  104. Error: "",
  105. })
  106. if err != nil {
  107. xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  108. workConn.Close()
  109. } else {
  110. break
  111. }
  112. }
  113. if err != nil {
  114. xl.Error("try to get work connection failed in the end")
  115. return
  116. }
  117. return
  118. }
  119. // startListenHandler start a goroutine handler for each listener.
  120. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  121. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  122. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, stats.Collector, config.ServerCommonConf)) {
  123. xl := xlog.FromContextSafe(pxy.ctx)
  124. for _, listener := range pxy.listeners {
  125. go func(l net.Listener) {
  126. for {
  127. // block
  128. // if listener is closed, err returned
  129. c, err := l.Accept()
  130. if err != nil {
  131. xl.Info("listener is closed")
  132. return
  133. }
  134. xl.Debug("get a user connection [%s]", c.RemoteAddr().String())
  135. go handler(p, c, pxy.statsCollector, pxy.serverCfg)
  136. }
  137. }(listener)
  138. }
  139. }
  140. func NewProxy(ctx context.Context, runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
  141. getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
  142. xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
  143. basePxy := BaseProxy{
  144. name: pxyConf.GetBaseInfo().ProxyName,
  145. rc: rc,
  146. statsCollector: statsCollector,
  147. listeners: make([]net.Listener, 0),
  148. poolCount: poolCount,
  149. getWorkConnFn: getWorkConnFn,
  150. serverCfg: serverCfg,
  151. xl: xl,
  152. ctx: xlog.NewContext(ctx, xl),
  153. }
  154. switch cfg := pxyConf.(type) {
  155. case *config.TcpProxyConf:
  156. basePxy.usedPortsNum = 1
  157. pxy = &TcpProxy{
  158. BaseProxy: &basePxy,
  159. cfg: cfg,
  160. }
  161. case *config.TcpMuxProxyConf:
  162. pxy = &TcpMuxProxy{
  163. BaseProxy: &basePxy,
  164. cfg: cfg,
  165. }
  166. case *config.HttpProxyConf:
  167. pxy = &HttpProxy{
  168. BaseProxy: &basePxy,
  169. cfg: cfg,
  170. }
  171. case *config.HttpsProxyConf:
  172. pxy = &HttpsProxy{
  173. BaseProxy: &basePxy,
  174. cfg: cfg,
  175. }
  176. case *config.UdpProxyConf:
  177. basePxy.usedPortsNum = 1
  178. pxy = &UdpProxy{
  179. BaseProxy: &basePxy,
  180. cfg: cfg,
  181. }
  182. case *config.StcpProxyConf:
  183. pxy = &StcpProxy{
  184. BaseProxy: &basePxy,
  185. cfg: cfg,
  186. }
  187. case *config.XtcpProxyConf:
  188. pxy = &XtcpProxy{
  189. BaseProxy: &basePxy,
  190. cfg: cfg,
  191. }
  192. default:
  193. return pxy, fmt.Errorf("proxy type not support")
  194. }
  195. return
  196. }
  197. // HandleUserTcpConnection is used for incoming tcp user connections.
  198. // It can be used for tcp, http, https type.
  199. func HandleUserTcpConnection(pxy Proxy, userConn net.Conn, statsCollector stats.Collector, serverCfg config.ServerCommonConf) {
  200. xl := xlog.FromContextSafe(pxy.Context())
  201. defer userConn.Close()
  202. // try all connections from the pool
  203. workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
  204. if err != nil {
  205. return
  206. }
  207. defer workConn.Close()
  208. var local io.ReadWriteCloser = workConn
  209. cfg := pxy.GetConf().GetBaseInfo()
  210. xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
  211. if cfg.UseEncryption {
  212. local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
  213. if err != nil {
  214. xl.Error("create encryption stream error: %v", err)
  215. return
  216. }
  217. }
  218. if cfg.UseCompression {
  219. local = frpIo.WithCompression(local)
  220. }
  221. xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  222. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  223. statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
  224. inCount, outCount := frpIo.Join(local, userConn)
  225. statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
  226. statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
  227. ProxyName: pxy.GetName(),
  228. TrafficBytes: inCount,
  229. })
  230. statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
  231. ProxyName: pxy.GetName(),
  232. TrafficBytes: outCount,
  233. })
  234. xl.Debug("join connections closed")
  235. }
  236. type ProxyManager struct {
  237. // proxies indexed by proxy name
  238. pxys map[string]Proxy
  239. mu sync.RWMutex
  240. }
  241. func NewProxyManager() *ProxyManager {
  242. return &ProxyManager{
  243. pxys: make(map[string]Proxy),
  244. }
  245. }
  246. func (pm *ProxyManager) Add(name string, pxy Proxy) error {
  247. pm.mu.Lock()
  248. defer pm.mu.Unlock()
  249. if _, ok := pm.pxys[name]; ok {
  250. return fmt.Errorf("proxy name [%s] is already in use", name)
  251. }
  252. pm.pxys[name] = pxy
  253. return nil
  254. }
  255. func (pm *ProxyManager) Del(name string) {
  256. pm.mu.Lock()
  257. defer pm.mu.Unlock()
  258. delete(pm.pxys, name)
  259. }
  260. func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
  261. pm.mu.RLock()
  262. defer pm.mu.RUnlock()
  263. pxy, ok = pm.pxys[name]
  264. return
  265. }