proxy.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net"
  20. "reflect"
  21. "strconv"
  22. "strings"
  23. "sync"
  24. "time"
  25. libio "github.com/fatedier/golib/io"
  26. "golang.org/x/time/rate"
  27. "github.com/fatedier/frp/pkg/config/types"
  28. v1 "github.com/fatedier/frp/pkg/config/v1"
  29. "github.com/fatedier/frp/pkg/msg"
  30. plugin "github.com/fatedier/frp/pkg/plugin/server"
  31. "github.com/fatedier/frp/pkg/util/limit"
  32. utilnet "github.com/fatedier/frp/pkg/util/net"
  33. "github.com/fatedier/frp/pkg/util/xlog"
  34. "github.com/fatedier/frp/server/controller"
  35. "github.com/fatedier/frp/server/metrics"
  36. )
  37. var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy) Proxy{}
  38. func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy) Proxy) {
  39. proxyFactoryRegistry[proxyConfType] = factory
  40. }
  41. type GetWorkConnFn func() (net.Conn, error)
  42. type Proxy interface {
  43. Context() context.Context
  44. Run() (remoteAddr string, err error)
  45. GetName() string
  46. GetConfigurer() v1.ProxyConfigurer
  47. GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error)
  48. GetUsedPortsNum() int
  49. GetResourceController() *controller.ResourceController
  50. GetUserInfo() plugin.UserInfo
  51. GetLimiter() *rate.Limiter
  52. GetLoginMsg() *msg.Login
  53. Close()
  54. }
  55. type BaseProxy struct {
  56. name string
  57. rc *controller.ResourceController
  58. listeners []net.Listener
  59. usedPortsNum int
  60. poolCount int
  61. getWorkConnFn GetWorkConnFn
  62. serverCfg *v1.ServerConfig
  63. limiter *rate.Limiter
  64. userInfo plugin.UserInfo
  65. loginMsg *msg.Login
  66. configurer v1.ProxyConfigurer
  67. mu sync.RWMutex
  68. xl *xlog.Logger
  69. ctx context.Context
  70. }
  71. func (pxy *BaseProxy) GetName() string {
  72. return pxy.name
  73. }
  74. func (pxy *BaseProxy) Context() context.Context {
  75. return pxy.ctx
  76. }
  77. func (pxy *BaseProxy) GetUsedPortsNum() int {
  78. return pxy.usedPortsNum
  79. }
  80. func (pxy *BaseProxy) GetResourceController() *controller.ResourceController {
  81. return pxy.rc
  82. }
  83. func (pxy *BaseProxy) GetUserInfo() plugin.UserInfo {
  84. return pxy.userInfo
  85. }
  86. func (pxy *BaseProxy) GetLoginMsg() *msg.Login {
  87. return pxy.loginMsg
  88. }
  89. func (pxy *BaseProxy) GetLimiter() *rate.Limiter {
  90. return pxy.limiter
  91. }
  92. func (pxy *BaseProxy) GetConfigurer() v1.ProxyConfigurer {
  93. return pxy.configurer
  94. }
  95. func (pxy *BaseProxy) Close() {
  96. xl := xlog.FromContextSafe(pxy.ctx)
  97. xl.Info("proxy closing")
  98. for _, l := range pxy.listeners {
  99. l.Close()
  100. }
  101. }
  102. // GetWorkConnFromPool try to get a new work connections from pool
  103. // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
  104. func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn, err error) {
  105. xl := xlog.FromContextSafe(pxy.ctx)
  106. // try all connections from the pool
  107. for i := 0; i < pxy.poolCount+1; i++ {
  108. if workConn, err = pxy.getWorkConnFn(); err != nil {
  109. xl.Warn("failed to get work connection: %v", err)
  110. return
  111. }
  112. xl.Debug("get a new work connection: [%s]", workConn.RemoteAddr().String())
  113. xl.Spawn().AppendPrefix(pxy.GetName())
  114. workConn = utilnet.NewContextConn(pxy.ctx, workConn)
  115. var (
  116. srcAddr string
  117. dstAddr string
  118. srcPortStr string
  119. dstPortStr string
  120. srcPort int
  121. dstPort int
  122. )
  123. if src != nil {
  124. srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
  125. srcPort, _ = strconv.Atoi(srcPortStr)
  126. }
  127. if dst != nil {
  128. dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
  129. dstPort, _ = strconv.Atoi(dstPortStr)
  130. }
  131. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  132. ProxyName: pxy.GetName(),
  133. SrcAddr: srcAddr,
  134. SrcPort: uint16(srcPort),
  135. DstAddr: dstAddr,
  136. DstPort: uint16(dstPort),
  137. Error: "",
  138. })
  139. if err != nil {
  140. xl.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  141. workConn.Close()
  142. } else {
  143. break
  144. }
  145. }
  146. if err != nil {
  147. xl.Error("try to get work connection failed in the end")
  148. return
  149. }
  150. return
  151. }
  152. // startCommonTCPListenersHandler start a goroutine handler for each listener.
  153. func (pxy *BaseProxy) startCommonTCPListenersHandler() {
  154. xl := xlog.FromContextSafe(pxy.ctx)
  155. for _, listener := range pxy.listeners {
  156. go func(l net.Listener) {
  157. var tempDelay time.Duration // how long to sleep on accept failure
  158. for {
  159. // block
  160. // if listener is closed, err returned
  161. c, err := l.Accept()
  162. if err != nil {
  163. if err, ok := err.(interface{ Temporary() bool }); ok && err.Temporary() {
  164. if tempDelay == 0 {
  165. tempDelay = 5 * time.Millisecond
  166. } else {
  167. tempDelay *= 2
  168. }
  169. if max := 1 * time.Second; tempDelay > max {
  170. tempDelay = max
  171. }
  172. xl.Info("met temporary error: %s, sleep for %s ...", err, tempDelay)
  173. time.Sleep(tempDelay)
  174. continue
  175. }
  176. xl.Warn("listener is closed: %s", err)
  177. return
  178. }
  179. xl.Info("get a user connection [%s]", c.RemoteAddr().String())
  180. go pxy.handleUserTCPConnection(c)
  181. }
  182. }(listener)
  183. }
  184. }
  185. // HandleUserTCPConnection is used for incoming user TCP connections.
  186. func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
  187. xl := xlog.FromContextSafe(pxy.Context())
  188. defer userConn.Close()
  189. serverCfg := pxy.serverCfg
  190. cfg := pxy.configurer.GetBaseConfig()
  191. // server plugin hook
  192. rc := pxy.GetResourceController()
  193. content := &plugin.NewUserConnContent{
  194. User: pxy.GetUserInfo(),
  195. ProxyName: pxy.GetName(),
  196. ProxyType: cfg.Type,
  197. RemoteAddr: userConn.RemoteAddr().String(),
  198. }
  199. _, err := rc.PluginManager.NewUserConn(content)
  200. if err != nil {
  201. xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
  202. return
  203. }
  204. var workConn net.Conn
  205. // try all connections from the pool
  206. if strings.HasPrefix(pxy.GetLoginMsg().User, v1.SSHClientLoginUserPrefix) {
  207. workConn, err = pxy.getWorkConnFn()
  208. } else {
  209. workConn, err = pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
  210. }
  211. if err != nil {
  212. return
  213. }
  214. defer workConn.Close()
  215. var local io.ReadWriteCloser = workConn
  216. xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t",
  217. cfg.Transport.UseEncryption, cfg.Transport.UseCompression)
  218. if cfg.Transport.UseEncryption {
  219. local, err = libio.WithEncryption(local, []byte(serverCfg.Auth.Token))
  220. if err != nil {
  221. xl.Error("create encryption stream error: %v", err)
  222. return
  223. }
  224. }
  225. if cfg.Transport.UseCompression {
  226. var recycleFn func()
  227. local, recycleFn = libio.WithCompressionFromPool(local)
  228. defer recycleFn()
  229. }
  230. if pxy.GetLimiter() != nil {
  231. local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
  232. return local.Close()
  233. })
  234. }
  235. xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  236. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  237. name := pxy.GetName()
  238. proxyType := cfg.Type
  239. metrics.Server.OpenConnection(name, proxyType)
  240. inCount, outCount, _ := libio.Join(local, userConn)
  241. metrics.Server.CloseConnection(name, proxyType)
  242. metrics.Server.AddTrafficIn(name, proxyType, inCount)
  243. metrics.Server.AddTrafficOut(name, proxyType, outCount)
  244. xl.Debug("join connections closed")
  245. }
  246. type Options struct {
  247. UserInfo plugin.UserInfo
  248. LoginMsg *msg.Login
  249. PoolCount int
  250. ResourceController *controller.ResourceController
  251. GetWorkConnFn GetWorkConnFn
  252. Configurer v1.ProxyConfigurer
  253. ServerCfg *v1.ServerConfig
  254. }
  255. func NewProxy(ctx context.Context, options *Options) (pxy Proxy, err error) {
  256. configurer := options.Configurer
  257. xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(configurer.GetBaseConfig().Name)
  258. var limiter *rate.Limiter
  259. limitBytes := configurer.GetBaseConfig().Transport.BandwidthLimit.Bytes()
  260. if limitBytes > 0 && configurer.GetBaseConfig().Transport.BandwidthLimitMode == types.BandwidthLimitModeServer {
  261. limiter = rate.NewLimiter(rate.Limit(float64(limitBytes)), int(limitBytes))
  262. }
  263. basePxy := BaseProxy{
  264. name: configurer.GetBaseConfig().Name,
  265. rc: options.ResourceController,
  266. listeners: make([]net.Listener, 0),
  267. poolCount: options.PoolCount,
  268. getWorkConnFn: options.GetWorkConnFn,
  269. serverCfg: options.ServerCfg,
  270. limiter: limiter,
  271. xl: xl,
  272. ctx: xlog.NewContext(ctx, xl),
  273. userInfo: options.UserInfo,
  274. loginMsg: options.LoginMsg,
  275. configurer: configurer,
  276. }
  277. factory := proxyFactoryRegistry[reflect.TypeOf(configurer)]
  278. if factory == nil {
  279. return pxy, fmt.Errorf("proxy type not support")
  280. }
  281. pxy = factory(&basePxy)
  282. if pxy == nil {
  283. return nil, fmt.Errorf("proxy not created")
  284. }
  285. return pxy, nil
  286. }
  287. type Manager struct {
  288. // proxies indexed by proxy name
  289. pxys map[string]Proxy
  290. mu sync.RWMutex
  291. }
  292. func NewManager() *Manager {
  293. return &Manager{
  294. pxys: make(map[string]Proxy),
  295. }
  296. }
  297. func (pm *Manager) Add(name string, pxy Proxy) error {
  298. pm.mu.Lock()
  299. defer pm.mu.Unlock()
  300. if _, ok := pm.pxys[name]; ok {
  301. return fmt.Errorf("proxy name [%s] is already in use", name)
  302. }
  303. pm.pxys[name] = pxy
  304. return nil
  305. }
  306. func (pm *Manager) Exist(name string) bool {
  307. pm.mu.RLock()
  308. defer pm.mu.RUnlock()
  309. _, ok := pm.pxys[name]
  310. return ok
  311. }
  312. func (pm *Manager) Del(name string) {
  313. pm.mu.Lock()
  314. defer pm.mu.Unlock()
  315. delete(pm.pxys, name)
  316. }
  317. func (pm *Manager) GetByName(name string) (pxy Proxy, ok bool) {
  318. pm.mu.RLock()
  319. defer pm.mu.RUnlock()
  320. pxy, ok = pm.pxys[name]
  321. return
  322. }