proxy.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "fmt"
  17. "io"
  18. "net"
  19. "strconv"
  20. "sync"
  21. "github.com/fatedier/frp/models/config"
  22. "github.com/fatedier/frp/models/msg"
  23. "github.com/fatedier/frp/server/controller"
  24. "github.com/fatedier/frp/server/stats"
  25. "github.com/fatedier/frp/utils/log"
  26. frpNet "github.com/fatedier/frp/utils/net"
  27. frpIo "github.com/fatedier/golib/io"
  28. )
// GetWorkConnFn is a callback that yields a work connection, or an error when
// none can be provided. BaseProxy.GetWorkConnFromPool invokes it once per attempt.
type GetWorkConnFn func() (frpNet.Conn, error)
// Proxy is the interface returned by NewProxy for every supported proxy type.
type Proxy interface {
	// Run starts the proxy and reports a remote address string.
	// NOTE(review): implementations live outside this file; confirm the exact
	// format of remoteAddr against them.
	Run() (remoteAddr string, err error)
	// GetName returns the proxy's name.
	GetName() string
	// GetConf returns the configuration the proxy was created with.
	GetConf() config.ProxyConf
	// GetWorkConnFromPool obtains a work connection for a user connection whose
	// endpoints are src and dst; either address may be nil.
	GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error)
	// GetUsedPortsNum returns the port count recorded for the proxy
	// (NewProxy sets it to 1 for TCP and UDP proxies).
	GetUsedPortsNum() int
	// Close shuts the proxy down.
	Close()
	// Logger supplies the logging methods (Info, Warn, Error, Debug,
	// AddLogPrefix, ...) used on proxies throughout this file.
	log.Logger
}
// BaseProxy holds the state shared by all concrete proxy types; NewProxy embeds
// a pointer to one in each of them.
type BaseProxy struct {
	// name is the proxy name, taken from the proxy configuration in NewProxy.
	name string
	// rc is the resource controller handed to NewProxy.
	// NOTE(review): not referenced in this file; presumably used by the concrete proxies.
	rc *controller.ResourceController
	// statsCollector is passed to the connection handler for every accepted user connection.
	statsCollector stats.Collector
	// listeners are served by startListenHandler and closed by Close.
	listeners []frpNet.Listener
	// usedPortsNum is reported by GetUsedPortsNum; NewProxy sets it to 1 for TCP and UDP proxies.
	usedPortsNum int
	// poolCount bounds the retries in GetWorkConnFromPool (poolCount+1 attempts).
	poolCount int
	// getWorkConnFn supplies work connections on demand.
	getWorkConnFn GetWorkConnFn
	// serverCfg is passed to the connection handler for every accepted user connection.
	serverCfg config.ServerCommonConf
	// mu is not used by the code visible in this file.
	// NOTE(review): presumably guards mutable state in the concrete proxies — confirm.
	mu sync.RWMutex
	// Logger provides the Info/Warn/Error/Debug methods used by BaseProxy's methods.
	log.Logger
}
  51. func (pxy *BaseProxy) GetName() string {
  52. return pxy.name
  53. }
  54. func (pxy *BaseProxy) GetUsedPortsNum() int {
  55. return pxy.usedPortsNum
  56. }
  57. func (pxy *BaseProxy) Close() {
  58. pxy.Info("proxy closing")
  59. for _, l := range pxy.listeners {
  60. l.Close()
  61. }
  62. }
  63. // GetWorkConnFromPool try to get a new work connections from pool
  64. // for quickly response, we immediately send the StartWorkConn message to frpc after take out one from pool
  65. func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn frpNet.Conn, err error) {
  66. // try all connections from the pool
  67. for i := 0; i < pxy.poolCount+1; i++ {
  68. if workConn, err = pxy.getWorkConnFn(); err != nil {
  69. pxy.Warn("failed to get work connection: %v", err)
  70. return
  71. }
  72. pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  73. workConn.AddLogPrefix(pxy.GetName())
  74. var (
  75. srcAddr string
  76. dstAddr string
  77. srcPortStr string
  78. dstPortStr string
  79. srcPort int
  80. dstPort int
  81. )
  82. if src != nil {
  83. srcAddr, srcPortStr, _ = net.SplitHostPort(src.String())
  84. srcPort, _ = strconv.Atoi(srcPortStr)
  85. }
  86. if dst != nil {
  87. dstAddr, dstPortStr, _ = net.SplitHostPort(dst.String())
  88. dstPort, _ = strconv.Atoi(dstPortStr)
  89. }
  90. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  91. ProxyName: pxy.GetName(),
  92. SrcAddr: srcAddr,
  93. SrcPort: uint16(srcPort),
  94. DstAddr: dstAddr,
  95. DstPort: uint16(dstPort),
  96. })
  97. if err != nil {
  98. workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  99. workConn.Close()
  100. } else {
  101. break
  102. }
  103. }
  104. if err != nil {
  105. pxy.Error("try to get work connection failed in the end")
  106. return
  107. }
  108. return
  109. }
  110. // startListenHandler start a goroutine handler for each listener.
  111. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  112. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  113. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector, config.ServerCommonConf)) {
  114. for _, listener := range pxy.listeners {
  115. go func(l frpNet.Listener) {
  116. for {
  117. // block
  118. // if listener is closed, err returned
  119. c, err := l.Accept()
  120. if err != nil {
  121. pxy.Info("listener is closed")
  122. return
  123. }
  124. pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
  125. go handler(p, c, pxy.statsCollector, pxy.serverCfg)
  126. }
  127. }(listener)
  128. }
  129. }
  130. func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
  131. getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf) (pxy Proxy, err error) {
  132. basePxy := BaseProxy{
  133. name: pxyConf.GetBaseInfo().ProxyName,
  134. rc: rc,
  135. statsCollector: statsCollector,
  136. listeners: make([]frpNet.Listener, 0),
  137. poolCount: poolCount,
  138. getWorkConnFn: getWorkConnFn,
  139. Logger: log.NewPrefixLogger(runId),
  140. serverCfg: serverCfg,
  141. }
  142. switch cfg := pxyConf.(type) {
  143. case *config.TcpProxyConf:
  144. basePxy.usedPortsNum = 1
  145. pxy = &TcpProxy{
  146. BaseProxy: &basePxy,
  147. cfg: cfg,
  148. }
  149. case *config.HttpProxyConf:
  150. pxy = &HttpProxy{
  151. BaseProxy: &basePxy,
  152. cfg: cfg,
  153. }
  154. case *config.HttpsProxyConf:
  155. pxy = &HttpsProxy{
  156. BaseProxy: &basePxy,
  157. cfg: cfg,
  158. }
  159. case *config.UdpProxyConf:
  160. basePxy.usedPortsNum = 1
  161. pxy = &UdpProxy{
  162. BaseProxy: &basePxy,
  163. cfg: cfg,
  164. }
  165. case *config.StcpProxyConf:
  166. pxy = &StcpProxy{
  167. BaseProxy: &basePxy,
  168. cfg: cfg,
  169. }
  170. case *config.XtcpProxyConf:
  171. pxy = &XtcpProxy{
  172. BaseProxy: &basePxy,
  173. cfg: cfg,
  174. }
  175. default:
  176. return pxy, fmt.Errorf("proxy type not support")
  177. }
  178. pxy.AddLogPrefix(pxy.GetName())
  179. return
  180. }
  181. // HandleUserTcpConnection is used for incoming tcp user connections.
  182. // It can be used for tcp, http, https type.
  183. func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector, serverCfg config.ServerCommonConf) {
  184. defer userConn.Close()
  185. // try all connections from the pool
  186. workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
  187. if err != nil {
  188. return
  189. }
  190. defer workConn.Close()
  191. var local io.ReadWriteCloser = workConn
  192. cfg := pxy.GetConf().GetBaseInfo()
  193. if cfg.UseEncryption {
  194. local, err = frpIo.WithEncryption(local, []byte(serverCfg.Token))
  195. if err != nil {
  196. pxy.Error("create encryption stream error: %v", err)
  197. return
  198. }
  199. }
  200. if cfg.UseCompression {
  201. local = frpIo.WithCompression(local)
  202. }
  203. pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  204. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  205. statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
  206. inCount, outCount := frpIo.Join(local, userConn)
  207. statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
  208. statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
  209. ProxyName: pxy.GetName(),
  210. TrafficBytes: inCount,
  211. })
  212. statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
  213. ProxyName: pxy.GetName(),
  214. TrafficBytes: outCount,
  215. })
  216. pxy.Debug("join connections closed")
  217. }
// ProxyManager is a registry of proxies keyed by proxy name. Its methods take
// mu around every access to the map.
type ProxyManager struct {
	// proxies indexed by proxy name
	pxys map[string]Proxy
	// mu guards pxys: write-locked by Add and Del, read-locked by GetByName.
	mu sync.RWMutex
}
  223. func NewProxyManager() *ProxyManager {
  224. return &ProxyManager{
  225. pxys: make(map[string]Proxy),
  226. }
  227. }
  228. func (pm *ProxyManager) Add(name string, pxy Proxy) error {
  229. pm.mu.Lock()
  230. defer pm.mu.Unlock()
  231. if _, ok := pm.pxys[name]; ok {
  232. return fmt.Errorf("proxy name [%s] is already in use", name)
  233. }
  234. pm.pxys[name] = pxy
  235. return nil
  236. }
  237. func (pm *ProxyManager) Del(name string) {
  238. pm.mu.Lock()
  239. defer pm.mu.Unlock()
  240. delete(pm.pxys, name)
  241. }
  242. func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
  243. pm.mu.RLock()
  244. defer pm.mu.RUnlock()
  245. pxy, ok = pm.pxys[name]
  246. return
  247. }