proxy.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strconv"
  21. "sync"
  22. "github.com/fatedier/frp/models/config"
  23. "github.com/fatedier/frp/models/msg"
  24. plugin "github.com/fatedier/frp/models/plugin/server"
  25. "github.com/fatedier/frp/server/controller"
  26. "github.com/fatedier/frp/server/metrics"
  27. frpNet "github.com/fatedier/frp/utils/net"
  28. "github.com/fatedier/frp/utils/xlog"
  29. frpIo "github.com/fatedier/golib/io"
  30. )
  31. type GetWorkConnFn func() (net.Conn, error)
  32. type Proxy interface {
  33. Context() context.Context
  34. Run() (remoteAddr string, err error)
  35. GetName() string
  36. GetConf() config.ProxyConf
  37. GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
  38. GetUsedPortsNum() int
  39. GetResourceController() *controller.ResourceController
  40. GetUserInfo() plugin.UserInfo
  41. Close()
  42. }
  43. type BaseProxy struct {
  44. name string
  45. rc *controller.ResourceController
  46. listeners []net.Listener
  47. usedPortsNum int
  48. poolCount int
  49. getWorkConnFn GetWorkConnFn
  50. serverCfg config.ServerCommonConf
  51. userInfo plugin.UserInfo
  52. mu sync.RWMutex
  53. xl *xlog.Logger
  54. ctx context.Context
  55. }
  56. func (pxy *BaseProxy) GetName() string {
  57. return pxy.name
  58. }
  59. func (pxy *BaseProxy) Context() context.Context {
  60. return pxy.ctx
  61. }
  62. func (pxy *BaseProxy) GetUsedPortsNum() int {
  63. return pxy.usedPortsNum
  64. }
  65. func (pxy *BaseProxy) GetResourceController() *controller.ResourceController {
  66. return pxy.rc
  67. }
  68. func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo {
  69. return pxy.userInfo
  70. }
  71. func (pxy *BaseProxy) Close() {
  72. xl := xlog.FromContextSafe(pxy.ctx)
  73. xl.Info("proxy closing")
  74. for _, l := range pxy.listeners {
  75. l.Close()
  76. }
  77. }
  78. // GetWorkConnFromPool try to get a new work connections from pool
  79. // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
  80. func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) {
  81. xl := xlog.FromContextSafe(pxy.ctx)
  82. // try all connections from the pool
  83. for i := 0; i < pxy.poolCount+1; i++ {
  84. if workConn, err = pxy.getWorkConnFn(); err != nil {
  85. xl.Warn("failed to get work connection: %v", err)
  86. return
  87. }
  88. xl.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  89. xl.Spawn().AppendPrefix(pxy.GetName())
  90. workConn = frpNet.NewContextConn(pxy.ctx, workConn)
  91. var (
  92. srcAddr string
  93. dstAddr string
  94. srcPortStr string
  95. dstPortStr string
  96. srcPort int
  97. dstPort int
  98. )
  99. if src != nil {
  100. srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
  101. srcPort, _ = strconv.Atoi(srcPortStr)
  102. }
  103. if dst != nil {
  104. dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
  105. dstPort, _ = strconv.Atoi(dstPortStr)
  106. }
  107. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  108. ProxyName: pxy.GetName(),
  109. SrcAddr: srcAddr,
  110. SrcPort: uint16(srcPort),
  111. DstAddr: dstAddr,
  112. DstPort: uint16(dstPort),
  113. Error: "",
  114. })
  115. if err != nil {
  116. xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  117. workConn.Close()
  118. } else {
  119. break
  120. }
  121. }
  122. if err != nil {
  123. xl.Error("try to get work connection failed in the end")
  124. return
  125. }
  126. return
  127. }
  128. // startListenHandler start a goroutine handler for each listener.
  129. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  130. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  131. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
  132. xl := xlog.FromContextSafe(pxy.ctx)
  133. for _, listener := range pxy.listeners {
  134. go func(l net.Listener) {
  135. for {
  136. // block
  137. // if listener is closed, err returned
  138. c, err := l.Accept()
  139. if err != nil {
  140. xl.Info("listener is closed")
  141. return
  142. }
  143. xl.Debug("get a user connection [%s]", c.RemoteAddr().String())
  144. go handler(p, c, pxy.serverCfg)
  145. }
  146. }(listener)
  147. }
  148. }
  149. func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
  150. getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
  151. xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
  152. basePxy := BaseProxy{
  153. name: pxyConf.GetBaseInfo().ProxyName,
  154. rc: rc,
  155. listeners: make([]net.Listener, 0),
  156. poolCount: poolCount,
  157. getWorkConnFn: getWorkConnFn,
  158. serverCfg: serverCfg,
  159. xl: xl,
  160. ctx: xlog.NewContext(ctx, xl),
  161. userInfo: userInfo,
  162. }
  163. switch cfg := pxyConf.(type) {
  164. case *config.TCPProxyConf:
  165. basePxy.usedPortsNum = 1
  166. pxy = &TCPProxy{
  167. BaseProxy: &basePxy,
  168. cfg: cfg,
  169. }
  170. case *config.TCPMuxProxyConf:
  171. pxy = &TCPMuxProxy{
  172. BaseProxy: &basePxy,
  173. cfg: cfg,
  174. }
  175. case *config.HTTPProxyConf:
  176. pxy = &HTTPProxy{
  177. BaseProxy: &basePxy,
  178. cfg: cfg,
  179. }
  180. case *config.HTTPSProxyConf:
  181. pxy = &HTTPSProxy{
  182. BaseProxy: &basePxy,
  183. cfg: cfg,
  184. }
  185. case *config.UDPProxyConf:
  186. basePxy.usedPortsNum = 1
  187. pxy = &UDPProxy{
  188. BaseProxy: &basePxy,
  189. cfg: cfg,
  190. }
  191. case *config.STCPProxyConf:
  192. pxy = &STCPProxy{
  193. BaseProxy: &basePxy,
  194. cfg: cfg,
  195. }
  196. case *config.XTCPProxyConf:
  197. pxy = &XTCPProxy{
  198. BaseProxy: &basePxy,
  199. cfg: cfg,
  200. }
  201. case *config.SUDPProxyConf:
  202. pxy = &SUDPProxy{
  203. BaseProxy: &basePxy,
  204. cfg: cfg,
  205. }
  206. default:
  207. return pxy, fmt.Errorf("proxy type not support")
  208. }
  209. return
  210. }
  211. // HandleUserTCPConnection is used for incoming user TCP connections.
  212. // It can be used for tcp, http, https type.
  213. func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
  214. xl := xlog.FromContextSafe(pxy.Context())
  215. defer userConn.Close()
  216. // server plugin hook
  217. rc := pxy.GetResourceController()
  218. content := &plugin.NewUserConnContent{
  219. User: pxy.GetUserInfo(),
  220. ProxyName: pxy.GetName(),
  221. ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
  222. RemoteAddr: userConn.RemoteAddr().String(),
  223. }
  224. _, err := rc.PluginManager.NewUserConn(content)
  225. if err != nil {
  226. xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
  227. return
  228. }
  229. // try all connections from the pool
  230. workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
  231. if err != nil {
  232. return
  233. }
  234. defer workConn.Close()
  235. var local io.ReadWriteCloser = workConn
  236. cfg := pxy.GetConf().GetBaseInfo()
  237. xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
  238. if cfg.UseEncryption {
  239. local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
  240. if err != nil {
  241. xl.Error("create encryption stream error: %v", err)
  242. return
  243. }
  244. }
  245. if cfg.UseCompression {
  246. local = frpIo.WithCompression(local)
  247. }
  248. xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  249. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  250. name := pxy.GetName()
  251. proxyType := pxy.GetConf().GetBaseInfo().ProxyType
  252. metrics.Server.OpenConnection(name, proxyType)
  253. inCount, outCount := frpIo.Join(local, userConn)
  254. metrics.Server.CloseConnection(name, proxyType)
  255. metrics.Server.AddTrafficIn(name, proxyType, inCount)
  256. metrics.Server.AddTrafficOut(name, proxyType, outCount)
  257. xl.Debug("join connections closed")
  258. }
  259. type Manager struct {
  260. // proxies indexed by proxy name
  261. pxys map[string]Proxy
  262. mu sync.RWMutex
  263. }
  264. func NewManager() *Manager {
  265. return &Manager{
  266. pxys: make(map[string]Proxy),
  267. }
  268. }
  269. func (pm *Manager) Add(name string, pxy Proxy) error {
  270. pm.mu.Lock()
  271. defer pm.mu.Unlock()
  272. if _, ok := pm.pxys[name]; ok {
  273. return fmt.Errorf("proxy name [%s] is already in use", name)
  274. }
  275. pm.pxys[name] = pxy
  276. return nil
  277. }
  278. func (pm *Manager) Del(name string) {
  279. pm.mu.Lock()
  280. defer pm.mu.Unlock()
  281. delete(pm.pxys, name)
  282. }
  283. func (pm *Manager) GetByName(name string) (pxy Proxy, ok bool) {
  284. pm.mu.RLock()
  285. defer pm.mu.RUnlock()
  286. pxy, ok = pm.pxys[name]
  287. return
  288. }