proxy.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "strconv"
  21. "sync"
  22. "time"
  23. frpIo "github.com/fatedier/golib/io"
  24. "golang.org/x/time/rate"
  25. "github.com/fatedier/frp/pkg/config"
  26. "github.com/fatedier/frp/pkg/msg"
  27. plugin "github.com/fatedier/frp/pkg/plugin/server"
  28. "github.com/fatedier/frp/pkg/util/limit"
  29. frpNet "github.com/fatedier/frp/pkg/util/net"
  30. "github.com/fatedier/frp/pkg/util/xlog"
  31. "github.com/fatedier/frp/server/controller"
  32. "github.com/fatedier/frp/server/metrics"
  33. )
  // GetWorkConnFn is the callback a proxy invokes to obtain one work connection.
  // It returns the connection, or the error that prevented obtaining one.
  type GetWorkConnFn func() (net.Conn, error)
  35. type Proxy interface {
  36. Context() context.Context
  37. Run() (remoteAddr string, err error)
  38. GetName() string
  39. GetConf() config.ProxyConf
  40. GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
  41. GetUsedPortsNum() int
  42. GetResourceController() *controller.ResourceController
  43. GetUserInfo() plugin.UserInfo
  44. GetLimiter() *rate.Limiter
  45. GetLoginMsg() *msg.Login
  46. Close()
  47. }
  48. type BaseProxy struct {
  49. name string
  50. rc *controller.ResourceController
  51. listeners []net.Listener
  52. usedPortsNum int
  53. poolCount int
  54. getWorkConnFn GetWorkConnFn
  55. serverCfg config.ServerCommonConf
  56. limiter *rate.Limiter
  57. userInfo plugin.UserInfo
  58. loginMsg *msg.Login
  59. mu sync.RWMutex
  60. xl *xlog.Logger
  61. ctx context.Context
  62. }
  63. func (pxy *BaseProxy) GetName() string {
  64. return pxy.name
  65. }
  66. func (pxy *BaseProxy) Context() context.Context {
  67. return pxy.ctx
  68. }
  69. func (pxy *BaseProxy) GetUsedPortsNum() int {
  70. return pxy.usedPortsNum
  71. }
  72. func (pxy *BaseProxy) GetResourceController() *controller.ResourceController {
  73. return pxy.rc
  74. }
  75. func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo {
  76. return pxy.userInfo
  77. }
  78. func (pxy *BaseProxy) GetLoginMsg() *msg.Login {
  79. return pxy.loginMsg
  80. }
  81. func (pxy *BaseProxy) Close() {
  82. xl := xlog.FromContextSafe(pxy.ctx)
  83. xl.Info("proxy closing")
  84. for _, l := range pxy.listeners {
  85. l.Close()
  86. }
  87. }
  88. // GetWorkConnFromPool try to get a new work connections from pool
  89. // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
  90. func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) {
  91. xl := xlog.FromContextSafe(pxy.ctx)
  92. // try all connections from the pool
  93. for i := 0; i < pxy.poolCount+1; i++ {
  94. if workConn, err = pxy.getWorkConnFn(); err != nil {
  95. xl.Warn("failed to get work connection: %v", err)
  96. return
  97. }
  98. xl.Debug("get a new work connection: [%s]", workConn.RemoteAddr().String())
  99. xl.Spawn().AppendPrefix(pxy.GetName())
  100. workConn = frpNet.NewContextConn(pxy.ctx, workConn)
  101. var (
  102. srcAddr string
  103. dstAddr string
  104. srcPortStr string
  105. dstPortStr string
  106. srcPort int
  107. dstPort int
  108. )
  109. if src != nil {
  110. srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
  111. srcPort, _ = strconv.Atoi(srcPortStr)
  112. }
  113. if dst != nil {
  114. dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
  115. dstPort, _ = strconv.Atoi(dstPortStr)
  116. }
  117. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  118. ProxyName: pxy.GetName(),
  119. SrcAddr: srcAddr,
  120. SrcPort: uint16(srcPort),
  121. DstAddr: dstAddr,
  122. DstPort: uint16(dstPort),
  123. Error: "",
  124. })
  125. if err != nil {
  126. xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  127. workConn.Close()
  128. } else {
  129. break
  130. }
  131. }
  132. if err != nil {
  133. xl.Error("try to get work connection failed in the end")
  134. return
  135. }
  136. return
  137. }
  138. // startListenHandler start a goroutine handler for each listener.
  139. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  140. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  141. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
  142. xl := xlog.FromContextSafe(pxy.ctx)
  143. for _, listener := range pxy.listeners {
  144. go func(l net.Listener) {
  145. var tempDelay time.Duration // how long to sleep on accept failure
  146. for {
  147. // block
  148. // if listener is closed, err returned
  149. c, err := l.Accept()
  150. if err != nil {
  151. if err, ok := err.(interface{ Temporary() bool }); ok && err.Temporary() {
  152. if tempDelay == 0 {
  153. tempDelay = 5 * time.Millisecond
  154. } else {
  155. tempDelay *= 2
  156. }
  157. if max := 1 * time.Second; tempDelay > max {
  158. tempDelay = max
  159. }
  160. xl.Info("met temporary error: %s, sleep for %s ...", err, tempDelay)
  161. time.Sleep(tempDelay)
  162. continue
  163. }
  164. xl.Warn("listener is closed: %s", err)
  165. return
  166. }
  167. xl.Info("get a user connection [%s]", c.RemoteAddr().String())
  168. go handler(p, c, pxy.serverCfg)
  169. }
  170. }(listener)
  171. }
  172. }
  173. func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
  174. getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, loginMsg *msg.Login,
  175. ) (pxy Proxy, err error) {
  176. xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseInfo().ProxyName)
  177. var limiter *rate.Limiter
  178. limitBytes := pxyConf.GetBaseInfo().BandwidthLimit.Bytes()
  179. if limitBytes > 0 && pxyConf.GetBaseInfo().BandwidthLimitMode == config.BandwidthLimitModeServer {
  180. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  181. }
  182. basePxy := BaseProxy{
  183. name: pxyConf.GetBaseInfo().ProxyName,
  184. rc: rc,
  185. listeners: make([]net.Listener, 0),
  186. poolCount: poolCount,
  187. getWorkConnFn: getWorkConnFn,
  188. serverCfg: serverCfg,
  189. limiter: limiter,
  190. xl: xl,
  191. ctx: xlog.NewContext(ctx, xl),
  192. userInfo: userInfo,
  193. loginMsg: loginMsg,
  194. }
  195. switch cfg := pxyConf.(type) {
  196. case *config.TCPProxyConf:
  197. basePxy.usedPortsNum = 1
  198. pxy = &TCPProxy{
  199. BaseProxy: &basePxy,
  200. cfg: cfg,
  201. }
  202. case *config.TCPMuxProxyConf:
  203. pxy = &TCPMuxProxy{
  204. BaseProxy: &basePxy,
  205. cfg: cfg,
  206. }
  207. case *config.HTTPProxyConf:
  208. pxy = &HTTPProxy{
  209. BaseProxy: &basePxy,
  210. cfg: cfg,
  211. }
  212. case *config.HTTPSProxyConf:
  213. pxy = &HTTPSProxy{
  214. BaseProxy: &basePxy,
  215. cfg: cfg,
  216. }
  217. case *config.UDPProxyConf:
  218. basePxy.usedPortsNum = 1
  219. pxy = &UDPProxy{
  220. BaseProxy: &basePxy,
  221. cfg: cfg,
  222. }
  223. case *config.STCPProxyConf:
  224. pxy = &STCPProxy{
  225. BaseProxy: &basePxy,
  226. cfg: cfg,
  227. }
  228. case *config.XTCPProxyConf:
  229. pxy = &XTCPProxy{
  230. BaseProxy: &basePxy,
  231. cfg: cfg,
  232. }
  233. case *config.SUDPProxyConf:
  234. pxy = &SUDPProxy{
  235. BaseProxy: &basePxy,
  236. cfg: cfg,
  237. }
  238. default:
  239. return pxy, fmt.Errorf("proxy type not support")
  240. }
  241. return
  242. }
  243. // HandleUserTCPConnection is used for incoming user TCP connections.
  244. // It can be used for tcp, http, https type.
  245. func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
  246. xl := xlog.FromContextSafe(pxy.Context())
  247. defer userConn.Close()
  248. // server plugin hook
  249. rc := pxy.GetResourceController()
  250. content := &plugin.NewUserConnContent{
  251. User: pxy.GetUserInfo(),
  252. ProxyName: pxy.GetName(),
  253. ProxyType: pxy.GetConf().GetBaseInfo().ProxyType,
  254. RemoteAddr: userConn.RemoteAddr().String(),
  255. }
  256. _, err := rc.PluginManager.NewUserConn(content)
  257. if err != nil {
  258. xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
  259. return
  260. }
  261. // try all connections from the pool
  262. workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
  263. if err != nil {
  264. return
  265. }
  266. defer workConn.Close()
  267. var local io.ReadWriteCloser = workConn
  268. cfg := pxy.GetConf().GetBaseInfo()
  269. xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
  270. if cfg.UseEncryption {
  271. local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
  272. if err != nil {
  273. xl.Error("create encryption stream error: %v", err)
  274. return
  275. }
  276. }
  277. if cfg.UseCompression {
  278. local = frpIo.WithCompression(local)
  279. }
  280. if pxy.GetLimiter() != nil {
  281. local = frpIo.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
  282. return local.Close()
  283. })
  284. }
  285. xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  286. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  287. name := pxy.GetName()
  288. proxyType := pxy.GetConf().GetBaseInfo().ProxyType
  289. metrics.Server.OpenConnection(name, proxyType)
  290. inCount, outCount, _ := frpIo.Join(local, userConn)
  291. metrics.Server.CloseConnection(name, proxyType)
  292. metrics.Server.AddTrafficIn(name, proxyType, inCount)
  293. metrics.Server.AddTrafficOut(name, proxyType, outCount)
  294. xl.Debug("join connections closed")
  295. }
  296. type Manager struct {
  297. // proxies indexed by proxy name
  298. pxys map[string]Proxy
  299. mu sync.RWMutex
  300. }
  301. func NewManager() *Manager {
  302. return &Manager{
  303. pxys: make(map[string]Proxy),
  304. }
  305. }
  306. func (pm *Manager) Add(name string, pxy Proxy) error {
  307. pm.mu.Lock()
  308. defer pm.mu.Unlock()
  309. if _, ok := pm.pxys[name]; ok {
  310. return fmt.Errorf("proxy name [%s] is already in use", name)
  311. }
  312. pm.pxys[name] = pxy
  313. return nil
  314. }
  315. func (pm *Manager) Del(name string) {
  316. pm.mu.Lock()
  317. defer pm.mu.Unlock()
  318. delete(pm.pxys, name)
  319. }
  320. func (pm *Manager) GetByName(name string) (pxy Proxy, ok bool) {
  321. pm.mu.RLock()
  322. defer pm.mu.RUnlock()
  323. pxy, ok = pm.pxys[name]
  324. return
  325. }