1
0

vclient.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package ssh
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "sync/atomic"
  7. "time"
  8. "golang.org/x/crypto/ssh"
  9. "github.com/fatedier/frp/pkg/config"
  10. v1 "github.com/fatedier/frp/pkg/config/v1"
  11. "github.com/fatedier/frp/pkg/msg"
  12. plugin "github.com/fatedier/frp/pkg/plugin/server"
  13. "github.com/fatedier/frp/pkg/util/log"
  14. frp_net "github.com/fatedier/frp/pkg/util/net"
  15. "github.com/fatedier/frp/pkg/util/util"
  16. "github.com/fatedier/frp/pkg/util/xlog"
  17. "github.com/fatedier/frp/server/controller"
  18. "github.com/fatedier/frp/server/proxy"
  19. )
  20. // VirtualService is a client VirtualService run in frps
  21. type VirtualService struct {
  22. clientCfg v1.ClientCommonConfig
  23. pxyCfg v1.ProxyConfigurer
  24. serverCfg v1.ServerConfig
  25. sshSvc *Service
  26. // uniq id got from frps, attach it in loginMsg
  27. runID string
  28. loginMsg *msg.Login
  29. // All resource managers and controllers
  30. rc *controller.ResourceController
  31. exit uint32 // 0 means not exit
  32. // SSHService context
  33. ctx context.Context
  34. // call cancel to stop SSHService
  35. cancel context.CancelFunc
  36. replyCh chan interface{}
  37. pxy proxy.Proxy
  38. }
  39. func NewVirtualService(
  40. ctx context.Context,
  41. clientCfg v1.ClientCommonConfig,
  42. serverCfg v1.ServerConfig,
  43. logMsg msg.Login,
  44. rc *controller.ResourceController,
  45. pxyCfg v1.ProxyConfigurer,
  46. sshSvc *Service,
  47. replyCh chan interface{},
  48. ) (svr *VirtualService, err error) {
  49. svr = &VirtualService{
  50. clientCfg: clientCfg,
  51. serverCfg: serverCfg,
  52. rc: rc,
  53. loginMsg: &logMsg,
  54. sshSvc: sshSvc,
  55. pxyCfg: pxyCfg,
  56. ctx: ctx,
  57. exit: 0,
  58. replyCh: replyCh,
  59. }
  60. svr.runID, err = util.RandID()
  61. if err != nil {
  62. return nil, err
  63. }
  64. go svr.loopCheck()
  65. return
  66. }
  67. func (svr *VirtualService) Run(ctx context.Context) (err error) {
  68. ctx, cancel := context.WithCancel(ctx)
  69. svr.ctx = xlog.NewContext(ctx, xlog.New())
  70. svr.cancel = cancel
  71. remoteAddr, err := svr.RegisterProxy(&msg.NewProxy{
  72. ProxyName: svr.pxyCfg.(*v1.TCPProxyConfig).Name,
  73. ProxyType: svr.pxyCfg.(*v1.TCPProxyConfig).Type,
  74. RemotePort: svr.pxyCfg.(*v1.TCPProxyConfig).RemotePort,
  75. })
  76. if err != nil {
  77. return err
  78. }
  79. log.Info("run a reverse proxy on port: %v", remoteAddr)
  80. return nil
  81. }
  82. func (svr *VirtualService) Close() {
  83. svr.GracefulClose(time.Duration(0))
  84. }
  85. func (svr *VirtualService) GracefulClose(d time.Duration) {
  86. atomic.StoreUint32(&svr.exit, 1)
  87. svr.pxy.Close()
  88. if svr.cancel != nil {
  89. svr.cancel()
  90. }
  91. svr.replyCh <- &VProxyError{}
  92. }
  93. func (svr *VirtualService) loopCheck() {
  94. <-svr.sshSvc.Exit()
  95. svr.pxy.Close()
  96. log.Info("virtual client service close")
  97. }
  98. func (svr *VirtualService) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err error) {
  99. var pxyConf v1.ProxyConfigurer
  100. pxyConf, err = config.NewProxyConfigurerFromMsg(pxyMsg, &svr.serverCfg)
  101. if err != nil {
  102. return
  103. }
  104. // User info
  105. userInfo := plugin.UserInfo{
  106. User: svr.loginMsg.User,
  107. Metas: svr.loginMsg.Metas,
  108. RunID: svr.runID,
  109. }
  110. svr.pxy, err = proxy.NewProxy(svr.ctx, &proxy.Options{
  111. LoginMsg: svr.loginMsg,
  112. UserInfo: userInfo,
  113. Configurer: pxyConf,
  114. ResourceController: svr.rc,
  115. GetWorkConnFn: svr.GetWorkConn,
  116. PoolCount: 10,
  117. ServerCfg: &svr.serverCfg,
  118. })
  119. if err != nil {
  120. return remoteAddr, err
  121. }
  122. remoteAddr, err = svr.pxy.Run()
  123. if err != nil {
  124. log.Warn("proxy run error: %v", err)
  125. return
  126. }
  127. defer func() {
  128. if err != nil {
  129. log.Warn("proxy close")
  130. svr.pxy.Close()
  131. }
  132. }()
  133. return
  134. }
  135. func (svr *VirtualService) GetWorkConn() (workConn net.Conn, err error) {
  136. // tell ssh client open a new stream for work
  137. payload := forwardedTCPPayload{
  138. Addr: svr.serverCfg.BindAddr, // TODO refine
  139. Port: uint32(svr.pxyCfg.(*v1.TCPProxyConfig).RemotePort),
  140. }
  141. channel, reqs, err := svr.sshSvc.SSHConn().OpenChannel(ChannelTypeServerOpenChannel, ssh.Marshal(payload))
  142. if err != nil {
  143. return nil, fmt.Errorf("open ssh channel error: %v", err)
  144. }
  145. go ssh.DiscardRequests(reqs)
  146. workConn = frp_net.WrapReadWriteCloserToConn(channel, svr.sshSvc.tcpConn)
  147. return workConn, nil
  148. }