1
0

proxy.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "reflect"
  21. "strconv"
  22. "sync"
  23. "time"
  24. libio "github.com/fatedier/golib/io"
  25. "golang.org/x/time/rate"
  26. "github.com/fatedier/frp/pkg/config"
  27. "github.com/fatedier/frp/pkg/msg"
  28. plugin "github.com/fatedier/frp/pkg/plugin/server"
  29. "github.com/fatedier/frp/pkg/util/limit"
  30. utilnet "github.com/fatedier/frp/pkg/util/net"
  31. "github.com/fatedier/frp/pkg/util/xlog"
  32. "github.com/fatedier/frp/server/controller"
  33. "github.com/fatedier/frp/server/metrics"
  34. )
  35. var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{}
  36. func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) {
  37. proxyFactoryRegistry[proxyConfType] = factory
  38. }
  39. type GetWorkConnFn func() (net.Conn, error)
  40. type Proxy interface {
  41. Context() context.Context
  42. Run() (remoteAddr string, err error)
  43. GetName() string
  44. GetConf() config.ProxyConf
  45. GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
  46. GetUsedPortsNum() int
  47. GetResourceController() *controller.ResourceController
  48. GetUserInfo() plugin.UserInfo
  49. GetLimiter() *rate.Limiter
  50. GetLoginMsg() *msg.Login
  51. Close()
  52. }
  53. type BaseProxy struct {
  54. name string
  55. rc *controller.ResourceController
  56. listeners []net.Listener
  57. usedPortsNum int
  58. poolCount int
  59. getWorkConnFn GetWorkConnFn
  60. serverCfg config.ServerCommonConf
  61. limiter *rate.Limiter
  62. userInfo plugin.UserInfo
  63. loginMsg *msg.Login
  64. pxyConf config.ProxyConf
  65. mu sync.RWMutex
  66. xl *xlog.Logger
  67. ctx context.Context
  68. }
  69. func (pxy *BaseProxy) GetName() string {
  70. return pxy.name
  71. }
  72. func (pxy *BaseProxy) Context() context.Context {
  73. return pxy.ctx
  74. }
  75. func (pxy *BaseProxy) GetUsedPortsNum() int {
  76. return pxy.usedPortsNum
  77. }
  78. func (pxy *BaseProxy) GetResourceController() *controller.ResourceController {
  79. return pxy.rc
  80. }
  81. func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo {
  82. return pxy.userInfo
  83. }
  84. func (pxy *BaseProxy) GetLoginMsg() *msg.Login {
  85. return pxy.loginMsg
  86. }
  87. func (pxy *BaseProxy) GetLimiter() *rate.Limiter {
  88. return pxy.limiter
  89. }
  90. func (pxy *BaseProxy) Close() {
  91. xl := xlog.FromContextSafe(pxy.ctx)
  92. xl.Info("proxy closing")
  93. for _, l := range pxy.listeners {
  94. l.Close()
  95. }
  96. }
  97. // GetWorkConnFromPool try to get a new work connections from pool
  98. // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
  99. func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) {
  100. xl := xlog.FromContextSafe(pxy.ctx)
  101. // try all connections from the pool
  102. for i := 0; i < pxy.poolCount+1; i++ {
  103. if workConn, err = pxy.getWorkConnFn(); err != nil {
  104. xl.Warn("failed to get work connection: %v", err)
  105. return
  106. }
  107. xl.Debug("get a new work connection: [%s]", workConn.RemoteAddr().String())
  108. xl.Spawn().AppendPrefix(pxy.GetName())
  109. workConn = utilnet.NewContextConn(pxy.ctx, workConn)
  110. var (
  111. srcAddr string
  112. dstAddr string
  113. srcPortStr string
  114. dstPortStr string
  115. srcPort int
  116. dstPort int
  117. )
  118. if src != nil {
  119. srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
  120. srcPort, _ = strconv.Atoi(srcPortStr)
  121. }
  122. if dst != nil {
  123. dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
  124. dstPort, _ = strconv.Atoi(dstPortStr)
  125. }
  126. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  127. ProxyName: pxy.GetName(),
  128. SrcAddr: srcAddr,
  129. SrcPort: uint16(srcPort),
  130. DstAddr: dstAddr,
  131. DstPort: uint16(dstPort),
  132. Error: "",
  133. })
  134. if err != nil {
  135. xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  136. workConn.Close()
  137. } else {
  138. break
  139. }
  140. }
  141. if err != nil {
  142. xl.Error("try to get work connection failed in the end")
  143. return
  144. }
  145. return
  146. }
  147. // startCommonTCPListenersHandler start a goroutine handler for each listener.
  148. func (pxy *BaseProxy) startCommonTCPListenersHandler() {
  149. xl := xlog.FromContextSafe(pxy.ctx)
  150. for _, listener := range pxy.listeners {
  151. go func(l net.Listener) {
  152. var tempDelay time.Duration // how long to sleep on accept failure
  153. for {
  154. // block
  155. // if listener is closed, err returned
  156. c, err := l.Accept()
  157. if err != nil {
  158. if err, ok := err.(interface{ Temporary() bool }); ok && err.Temporary() {
  159. if tempDelay == 0 {
  160. tempDelay = 5 * time.Millisecond
  161. } else {
  162. tempDelay *= 2
  163. }
  164. if max := 1 * time.Second; tempDelay > max {
  165. tempDelay = max
  166. }
  167. xl.Info("met temporary error: %s, sleep for %s ...", err, tempDelay)
  168. time.Sleep(tempDelay)
  169. continue
  170. }
  171. xl.Warn("listener is closed: %s", err)
  172. return
  173. }
  174. xl.Info("get a user connection [%s]", c.RemoteAddr().String())
  175. go pxy.handleUserTCPConnection(c)
  176. }
  177. }(listener)
  178. }
  179. }
  180. // HandleUserTCPConnection is used for incoming user TCP connections.
  181. func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
  182. xl := xlog.FromContextSafe(pxy.Context())
  183. defer userConn.Close()
  184. serverCfg := pxy.serverCfg
  185. cfg := pxy.pxyConf.GetBaseConfig()
  186. // server plugin hook
  187. rc := pxy.GetResourceController()
  188. content := &plugin.NewUserConnContent{
  189. User: pxy.GetUserInfo(),
  190. ProxyName: pxy.GetName(),
  191. ProxyType: cfg.ProxyType,
  192. RemoteAddr: userConn.RemoteAddr().String(),
  193. }
  194. _, err := rc.PluginManager.NewUserConn(content)
  195. if err != nil {
  196. xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
  197. return
  198. }
  199. // try all connections from the pool
  200. workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
  201. if err != nil {
  202. return
  203. }
  204. defer workConn.Close()
  205. var local io.ReadWriteCloser = workConn
  206. xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
  207. if cfg.UseEncryption {
  208. local, err = libio.WithEncryption(local, []byte(serverCfg.Token))
  209. if err != nil {
  210. xl.Error("create encryption stream error: %v", err)
  211. return
  212. }
  213. }
  214. if cfg.UseCompression {
  215. local = libio.WithCompression(local)
  216. }
  217. if pxy.GetLimiter() != nil {
  218. local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
  219. return local.Close()
  220. })
  221. }
  222. xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  223. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  224. name := pxy.GetName()
  225. proxyType := cfg.ProxyType
  226. metrics.Server.OpenConnection(name, proxyType)
  227. inCount, outCount, _ := libio.Join(local, userConn)
  228. metrics.Server.CloseConnection(name, proxyType)
  229. metrics.Server.AddTrafficIn(name, proxyType, inCount)
  230. metrics.Server.AddTrafficOut(name, proxyType, outCount)
  231. xl.Debug("join connections closed")
  232. }
  233. func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
  234. getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, loginMsg *msg.Login,
  235. ) (pxy Proxy, err error) {
  236. xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(pxyConf.GetBaseConfig().ProxyName)
  237. var limiter *rate.Limiter
  238. limitBytes := pxyConf.GetBaseConfig().BandwidthLimit.Bytes()
  239. if limitBytes > 0 && pxyConf.GetBaseConfig().BandwidthLimitMode == config.BandwidthLimitModeServer {
  240. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  241. }
  242. basePxy := BaseProxy{
  243. name: pxyConf.GetBaseConfig().ProxyName,
  244. rc: rc,
  245. listeners: make([]net.Listener, 0),
  246. poolCount: poolCount,
  247. getWorkConnFn: getWorkConnFn,
  248. serverCfg: serverCfg,
  249. limiter: limiter,
  250. xl: xl,
  251. ctx: xlog.NewContext(ctx, xl),
  252. userInfo: userInfo,
  253. loginMsg: loginMsg,
  254. pxyConf: pxyConf,
  255. }
  256. factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
  257. if factory == nil {
  258. return pxy, fmt.Errorf("proxy type not support")
  259. }
  260. pxy = factory(&basePxy, pxyConf)
  261. if pxy == nil {
  262. return nil, fmt.Errorf("proxy not created")
  263. }
  264. return pxy, nil
  265. }
  266. type Manager struct {
  267. // proxies indexed by proxy name
  268. pxys map[string]Proxy
  269. mu sync.RWMutex
  270. }
  271. func NewManager() *Manager {
  272. return &Manager{
  273. pxys: make(map[string]Proxy),
  274. }
  275. }
  276. func (pm *Manager) Add(name string, pxy Proxy) error {
  277. pm.mu.Lock()
  278. defer pm.mu.Unlock()
  279. if _, ok := pm.pxys[name]; ok {
  280. return fmt.Errorf("proxy name [%s] is already in use", name)
  281. }
  282. pm.pxys[name] = pxy
  283. return nil
  284. }
  285. func (pm *Manager) Exist(name string) bool {
  286. pm.mu.RLock()
  287. defer pm.mu.RUnlock()
  288. _, ok := pm.pxys[name]
  289. return ok
  290. }
  291. func (pm *Manager) Del(name string) {
  292. pm.mu.Lock()
  293. defer pm.mu.Unlock()
  294. delete(pm.pxys, name)
  295. }
  296. func (pm *Manager) GetByName(name string) (pxy Proxy, ok bool) {
  297. pm.mu.RLock()
  298. defer pm.mu.RUnlock()
  299. pxy, ok = pm.pxys[name]
  300. return
  301. }