proxy.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package proxy
  15. import (
  16. "fmt"
  17. "io"
  18. "sync"
  19. "github.com/fatedier/frp/g"
  20. "github.com/fatedier/frp/models/config"
  21. "github.com/fatedier/frp/models/msg"
  22. "github.com/fatedier/frp/server/controller"
  23. "github.com/fatedier/frp/server/stats"
  24. "github.com/fatedier/frp/utils/log"
  25. frpNet "github.com/fatedier/frp/utils/net"
  26. frpIo "github.com/fatedier/golib/io"
  27. )
  28. type GetWorkConnFn func() (frpNet.Conn, error)
  29. type Proxy interface {
  30. Run() (remoteAddr string, err error)
  31. GetName() string
  32. GetConf() config.ProxyConf
  33. GetWorkConnFromPool() (workConn frpNet.Conn, err error)
  34. GetUsedPortsNum() int
  35. Close()
  36. log.Logger
  37. }
  38. type BaseProxy struct {
  39. name string
  40. rc *controller.ResourceController
  41. statsCollector stats.Collector
  42. listeners []frpNet.Listener
  43. usedPortsNum int
  44. poolCount int
  45. getWorkConnFn GetWorkConnFn
  46. mu sync.RWMutex
  47. log.Logger
  48. }
  49. func (pxy *BaseProxy) GetName() string {
  50. return pxy.name
  51. }
  52. func (pxy *BaseProxy) GetUsedPortsNum() int {
  53. return pxy.usedPortsNum
  54. }
  55. func (pxy *BaseProxy) Close() {
  56. pxy.Info("proxy closing")
  57. for _, l := range pxy.listeners {
  58. l.Close()
  59. }
  60. }
  61. func (pxy *BaseProxy) GetWorkConnFromPool() (workConn frpNet.Conn, err error) {
  62. // try all connections from the pool
  63. for i := 0; i < pxy.poolCount+1; i++ {
  64. if workConn, err = pxy.getWorkConnFn(); err != nil {
  65. pxy.Warn("failed to get work connection: %v", err)
  66. return
  67. }
  68. pxy.Info("get a new work connection: [%s]", workConn.RemoteAddr().String())
  69. workConn.AddLogPrefix(pxy.GetName())
  70. err := msg.WriteMsg(workConn, &msg.StartWorkConn{
  71. ProxyName: pxy.GetName(),
  72. })
  73. if err != nil {
  74. workConn.Warn("failed to send message to work connection from pool: %v, times: %d", err, i)
  75. workConn.Close()
  76. } else {
  77. break
  78. }
  79. }
  80. if err != nil {
  81. pxy.Error("try to get work connection failed in the end")
  82. return
  83. }
  84. return
  85. }
  86. // startListenHandler start a goroutine handler for each listener.
  87. // p: p will just be passed to handler(Proxy, frpNet.Conn).
  88. // handler: each proxy type can set different handler function to deal with connections accepted from listeners.
  89. func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, frpNet.Conn, stats.Collector)) {
  90. for _, listener := range pxy.listeners {
  91. go func(l frpNet.Listener) {
  92. for {
  93. // block
  94. // if listener is closed, err returned
  95. c, err := l.Accept()
  96. if err != nil {
  97. pxy.Info("listener is closed")
  98. return
  99. }
  100. pxy.Debug("get a user connection [%s]", c.RemoteAddr().String())
  101. go handler(p, c, pxy.statsCollector)
  102. }
  103. }(listener)
  104. }
  105. }
  106. func NewProxy(runId string, rc *controller.ResourceController, statsCollector stats.Collector, poolCount int,
  107. getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf) (pxy Proxy, err error) {
  108. basePxy := BaseProxy{
  109. name: pxyConf.GetBaseInfo().ProxyName,
  110. rc: rc,
  111. statsCollector: statsCollector,
  112. listeners: make([]frpNet.Listener, 0),
  113. poolCount: poolCount,
  114. getWorkConnFn: getWorkConnFn,
  115. Logger: log.NewPrefixLogger(runId),
  116. }
  117. switch cfg := pxyConf.(type) {
  118. case *config.TcpProxyConf:
  119. basePxy.usedPortsNum = 1
  120. pxy = &TcpProxy{
  121. BaseProxy: &basePxy,
  122. cfg: cfg,
  123. }
  124. case *config.HttpProxyConf:
  125. pxy = &HttpProxy{
  126. BaseProxy: &basePxy,
  127. cfg: cfg,
  128. }
  129. case *config.HttpsProxyConf:
  130. pxy = &HttpsProxy{
  131. BaseProxy: &basePxy,
  132. cfg: cfg,
  133. }
  134. case *config.UdpProxyConf:
  135. basePxy.usedPortsNum = 1
  136. pxy = &UdpProxy{
  137. BaseProxy: &basePxy,
  138. cfg: cfg,
  139. }
  140. case *config.StcpProxyConf:
  141. pxy = &StcpProxy{
  142. BaseProxy: &basePxy,
  143. cfg: cfg,
  144. }
  145. case *config.XtcpProxyConf:
  146. pxy = &XtcpProxy{
  147. BaseProxy: &basePxy,
  148. cfg: cfg,
  149. }
  150. default:
  151. return pxy, fmt.Errorf("proxy type not support")
  152. }
  153. pxy.AddLogPrefix(pxy.GetName())
  154. return
  155. }
  156. // HandleUserTcpConnection is used for incoming tcp user connections.
  157. // It can be used for tcp, http, https type.
  158. func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn, statsCollector stats.Collector) {
  159. defer userConn.Close()
  160. // try all connections from the pool
  161. workConn, err := pxy.GetWorkConnFromPool()
  162. if err != nil {
  163. return
  164. }
  165. defer workConn.Close()
  166. var local io.ReadWriteCloser = workConn
  167. cfg := pxy.GetConf().GetBaseInfo()
  168. if cfg.UseEncryption {
  169. local, err = frpIo.WithEncryption(local, []byte(g.GlbServerCfg.Token))
  170. if err != nil {
  171. pxy.Error("create encryption stream error: %v", err)
  172. return
  173. }
  174. }
  175. if cfg.UseCompression {
  176. local = frpIo.WithCompression(local)
  177. }
  178. pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
  179. workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
  180. statsCollector.Mark(stats.TypeOpenConnection, &stats.OpenConnectionPayload{ProxyName: pxy.GetName()})
  181. inCount, outCount := frpIo.Join(local, userConn)
  182. statsCollector.Mark(stats.TypeCloseConnection, &stats.CloseConnectionPayload{ProxyName: pxy.GetName()})
  183. statsCollector.Mark(stats.TypeAddTrafficIn, &stats.AddTrafficInPayload{
  184. ProxyName: pxy.GetName(),
  185. TrafficBytes: inCount,
  186. })
  187. statsCollector.Mark(stats.TypeAddTrafficOut, &stats.AddTrafficOutPayload{
  188. ProxyName: pxy.GetName(),
  189. TrafficBytes: outCount,
  190. })
  191. pxy.Debug("join connections closed")
  192. }
  193. type ProxyManager struct {
  194. // proxies indexed by proxy name
  195. pxys map[string]Proxy
  196. mu sync.RWMutex
  197. }
  198. func NewProxyManager() *ProxyManager {
  199. return &ProxyManager{
  200. pxys: make(map[string]Proxy),
  201. }
  202. }
  203. func (pm *ProxyManager) Add(name string, pxy Proxy) error {
  204. pm.mu.Lock()
  205. defer pm.mu.Unlock()
  206. if _, ok := pm.pxys[name]; ok {
  207. return fmt.Errorf("proxy name [%s] is already in use", name)
  208. }
  209. pm.pxys[name] = pxy
  210. return nil
  211. }
  212. func (pm *ProxyManager) Del(name string) {
  213. pm.mu.Lock()
  214. defer pm.mu.Unlock()
  215. delete(pm.pxys, name)
  216. }
  217. func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
  218. pm.mu.RLock()
  219. defer pm.mu.RUnlock()
  220. pxy, ok = pm.pxys[name]
  221. return
  222. }