server.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. // Copyright 2023 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ssh
  15. import (
  16. "context"
  17. "encoding/binary"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "strings"
  22. "sync"
  23. "time"
  24. libio "github.com/fatedier/golib/io"
  25. "github.com/samber/lo"
  26. "github.com/spf13/cobra"
  27. "golang.org/x/crypto/ssh"
  28. "github.com/fatedier/frp/client/proxy"
  29. "github.com/fatedier/frp/pkg/config"
  30. v1 "github.com/fatedier/frp/pkg/config/v1"
  31. "github.com/fatedier/frp/pkg/msg"
  32. "github.com/fatedier/frp/pkg/util/log"
  33. netpkg "github.com/fatedier/frp/pkg/util/net"
  34. "github.com/fatedier/frp/pkg/util/util"
  35. "github.com/fatedier/frp/pkg/util/xlog"
  36. "github.com/fatedier/frp/pkg/virtual"
  37. )
const (
	// https://datatracker.ietf.org/doc/html/rfc4254#page-16

	// ChannelTypeServerOpenChannel is the channel type passed to OpenChannel
	// when the server opens a channel towards the ssh client for a forwarded
	// connection.
	ChannelTypeServerOpenChannel = "forwarded-tcpip"
	// RequestTypeForward is the global request type whose payload carries the
	// address and port the ssh client asks the server to forward.
	RequestTypeForward = "tcpip-forward"
)
// tcpipForward is the payload of a "tcpip-forward" global request. It is
// decoded with ssh.Unmarshal, so the field order is part of the wire layout
// and must not be changed.
type tcpipForward struct {
	// Host is the address carried by the forward request.
	Host string
	// Port is the port carried by the forward request.
	Port uint32
}
// forwardedTCPPayload is the extra data sent when the server opens a
// "forwarded-tcpip" channel towards the ssh client. It is encoded with
// ssh.Marshal, so the field order is part of the wire layout and must not be
// changed.
//
// https://datatracker.ietf.org/doc/html/rfc4254#page-16
type forwardedTCPPayload struct {
	// Addr and Port identify the forwarded address the connection arrived on.
	Addr string
	Port uint32
	// OriginAddr and OriginPort describe the originator of the connection.
	// openConn fills them with the forwarded address for compatibility only;
	// they are not the real source address.
	OriginAddr string
	OriginPort uint32
}
// TunnelServer serves a single ssh connection and bridges it to a virtual frp
// client so that the ssh client can expose a proxy through port forwarding.
type TunnelServer struct {
	// underlyingConn is the raw connection the ssh handshake is performed on.
	underlyingConn net.Conn
	// sshConn is the established ssh connection; it is set by Run after a
	// successful handshake.
	sshConn *ssh.ServerConn
	// sc is the ssh server configuration used for the handshake.
	sc *ssh.ServerConfig
	// vc is the virtual client created by Run from the ssh client's command.
	vc *virtual.Client
	// peerServerListener receives the connections produced by the virtual
	// client's peer listener.
	peerServerListener *netpkg.InternalListener
	// doneCh is closed when the tunnel shuts down; background loops select on
	// it to stop.
	doneCh chan struct{}
	// closeDoneChOnce ensures the shutdown (closing sshConn and doneCh) is
	// executed only once.
	closeDoneChOnce sync.Once
}
  63. func NewTunnelServer(conn net.Conn, sc *ssh.ServerConfig, peerServerListener *netpkg.InternalListener) (*TunnelServer, error) {
  64. s := &TunnelServer{
  65. underlyingConn: conn,
  66. sc: sc,
  67. peerServerListener: peerServerListener,
  68. doneCh: make(chan struct{}),
  69. }
  70. return s, nil
  71. }
  72. func (s *TunnelServer) Run() error {
  73. sshConn, channels, requests, err := ssh.NewServerConn(s.underlyingConn, s.sc)
  74. if err != nil {
  75. return err
  76. }
  77. s.sshConn = sshConn
  78. addr, extraPayload, err := s.waitForwardAddrAndExtraPayload(channels, requests, 3*time.Second)
  79. if err != nil {
  80. return err
  81. }
  82. clientCfg, pc, err := s.parseClientAndProxyConfigurer(addr, extraPayload)
  83. if err != nil {
  84. return err
  85. }
  86. clientCfg.Complete()
  87. if sshConn.Permissions != nil {
  88. clientCfg.User = util.EmptyOr(sshConn.Permissions.Extensions["user"], clientCfg.User)
  89. }
  90. pc.Complete(clientCfg.User)
  91. vc, err := virtual.NewClient(virtual.ClientOptions{
  92. Common: clientCfg,
  93. Spec: &msg.ClientSpec{
  94. Type: "ssh-tunnel",
  95. // If ssh does not require authentication, then the virtual client needs to authenticate through a token.
  96. // Otherwise, once ssh authentication is passed, the virtual client does not need to authenticate again.
  97. AlwaysAuthPass: !s.sc.NoClientAuth,
  98. },
  99. HandleWorkConnCb: func(base *v1.ProxyBaseConfig, workConn net.Conn, m *msg.StartWorkConn) bool {
  100. // join workConn and ssh channel
  101. c, err := s.openConn(addr)
  102. if err != nil {
  103. log.Trace("open conn error: %v", err)
  104. workConn.Close()
  105. return false
  106. }
  107. libio.Join(c, workConn)
  108. return false
  109. },
  110. })
  111. if err != nil {
  112. return err
  113. }
  114. s.vc = vc
  115. // transfer connection from virtual client to server peer listener
  116. go func() {
  117. l := s.vc.PeerListener()
  118. for {
  119. conn, err := l.Accept()
  120. if err != nil {
  121. return
  122. }
  123. _ = s.peerServerListener.PutConn(conn)
  124. }
  125. }()
  126. xl := xlog.New().AddPrefix(xlog.LogPrefix{Name: "sshVirtualClient", Value: "sshVirtualClient", Priority: 100})
  127. ctx := xlog.NewContext(context.Background(), xl)
  128. go func() {
  129. _ = s.vc.Run(ctx)
  130. // If vc.Run returns, it means that the virtual client has been closed, and the ssh tunnel connection should be closed.
  131. // One scenario is that the virtual client exits due to login failure.
  132. s.closeDoneChOnce.Do(func() {
  133. _ = sshConn.Close()
  134. close(s.doneCh)
  135. })
  136. }()
  137. s.vc.UpdateProxyConfigurer([]v1.ProxyConfigurer{pc})
  138. if err := s.waitProxyStatusReady(pc.GetBaseConfig().Name, time.Second); err != nil {
  139. log.Warn("wait proxy status ready error: %v", err)
  140. } else {
  141. _ = sshConn.Wait()
  142. }
  143. s.vc.Close()
  144. log.Trace("ssh tunnel connection from %v closed", sshConn.RemoteAddr())
  145. s.closeDoneChOnce.Do(func() {
  146. _ = sshConn.Close()
  147. close(s.doneCh)
  148. })
  149. return nil
  150. }
  151. func (s *TunnelServer) waitForwardAddrAndExtraPayload(
  152. channels <-chan ssh.NewChannel,
  153. requests <-chan *ssh.Request,
  154. timeout time.Duration,
  155. ) (*tcpipForward, string, error) {
  156. addrCh := make(chan *tcpipForward, 1)
  157. extraPayloadCh := make(chan string, 1)
  158. // get forward address
  159. go func() {
  160. addrGot := false
  161. for req := range requests {
  162. if req.Type == RequestTypeForward && !addrGot {
  163. payload := tcpipForward{}
  164. if err := ssh.Unmarshal(req.Payload, &payload); err != nil {
  165. return
  166. }
  167. addrGot = true
  168. addrCh <- &payload
  169. }
  170. if req.WantReply {
  171. _ = req.Reply(true, nil)
  172. }
  173. }
  174. }()
  175. // get extra payload
  176. go func() {
  177. for newChannel := range channels {
  178. // extraPayload will send to extraPayloadCh
  179. go s.handleNewChannel(newChannel, extraPayloadCh)
  180. }
  181. }()
  182. var (
  183. addr *tcpipForward
  184. extraPayload string
  185. )
  186. timer := time.NewTimer(timeout)
  187. defer timer.Stop()
  188. for {
  189. select {
  190. case v := <-addrCh:
  191. addr = v
  192. case extra := <-extraPayloadCh:
  193. extraPayload = extra
  194. case <-timer.C:
  195. return nil, "", fmt.Errorf("get addr and extra payload timeout")
  196. }
  197. if addr != nil && extraPayload != "" {
  198. break
  199. }
  200. }
  201. return addr, extraPayload, nil
  202. }
  203. func (s *TunnelServer) parseClientAndProxyConfigurer(_ *tcpipForward, extraPayload string) (*v1.ClientCommonConfig, v1.ProxyConfigurer, error) {
  204. cmd := &cobra.Command{}
  205. args := strings.Split(extraPayload, " ")
  206. if len(args) < 1 {
  207. return nil, nil, fmt.Errorf("invalid extra payload")
  208. }
  209. proxyType := strings.TrimSpace(args[0])
  210. supportTypes := []string{"tcp", "http", "https", "tcpmux", "stcp"}
  211. if !lo.Contains(supportTypes, proxyType) {
  212. return nil, nil, fmt.Errorf("invalid proxy type: %s, support types: %v", proxyType, supportTypes)
  213. }
  214. pc := v1.NewProxyConfigurerByType(v1.ProxyType(proxyType))
  215. if pc == nil {
  216. return nil, nil, fmt.Errorf("new proxy configurer error")
  217. }
  218. config.RegisterProxyFlags(cmd, pc)
  219. clientCfg := v1.ClientCommonConfig{}
  220. config.RegisterClientCommonConfigFlags(cmd, &clientCfg)
  221. if err := cmd.ParseFlags(args); err != nil {
  222. return nil, nil, fmt.Errorf("parse flags from ssh client error: %v", err)
  223. }
  224. // if name is not set, generate a random one
  225. if pc.GetBaseConfig().Name == "" {
  226. id, err := util.RandIDWithLen(8)
  227. if err != nil {
  228. return nil, nil, fmt.Errorf("generate random id error: %v", err)
  229. }
  230. pc.GetBaseConfig().Name = fmt.Sprintf("sshtunnel-%s-%s", proxyType, id)
  231. }
  232. return &clientCfg, pc, nil
  233. }
  234. func (s *TunnelServer) handleNewChannel(channel ssh.NewChannel, extraPayloadCh chan string) {
  235. ch, reqs, err := channel.Accept()
  236. if err != nil {
  237. return
  238. }
  239. go s.keepAlive(ch)
  240. for req := range reqs {
  241. if req.WantReply {
  242. _ = req.Reply(true, nil)
  243. }
  244. if req.Type != "exec" || len(req.Payload) <= 4 {
  245. continue
  246. }
  247. end := 4 + binary.BigEndian.Uint32(req.Payload[:4])
  248. if len(req.Payload) < int(end) {
  249. continue
  250. }
  251. extraPayload := string(req.Payload[4:end])
  252. select {
  253. case extraPayloadCh <- extraPayload:
  254. default:
  255. }
  256. }
  257. }
  258. func (s *TunnelServer) keepAlive(ch ssh.Channel) {
  259. tk := time.NewTicker(time.Second * 30)
  260. defer tk.Stop()
  261. for {
  262. select {
  263. case <-tk.C:
  264. _, err := ch.SendRequest("heartbeat", false, nil)
  265. if err != nil {
  266. return
  267. }
  268. case <-s.doneCh:
  269. return
  270. }
  271. }
  272. }
  273. func (s *TunnelServer) openConn(addr *tcpipForward) (net.Conn, error) {
  274. payload := forwardedTCPPayload{
  275. Addr: addr.Host,
  276. Port: addr.Port,
  277. // Note: Here is just for compatibility, not the real source address.
  278. OriginAddr: addr.Host,
  279. OriginPort: addr.Port,
  280. }
  281. channel, reqs, err := s.sshConn.OpenChannel(ChannelTypeServerOpenChannel, ssh.Marshal(&payload))
  282. if err != nil {
  283. return nil, fmt.Errorf("open ssh channel error: %v", err)
  284. }
  285. go ssh.DiscardRequests(reqs)
  286. conn := netpkg.WrapReadWriteCloserToConn(channel, s.underlyingConn)
  287. return conn, nil
  288. }
  289. func (s *TunnelServer) waitProxyStatusReady(name string, timeout time.Duration) error {
  290. ticker := time.NewTicker(100 * time.Millisecond)
  291. defer ticker.Stop()
  292. timer := time.NewTimer(timeout)
  293. defer timer.Stop()
  294. for {
  295. select {
  296. case <-ticker.C:
  297. ps, err := s.vc.Service().GetProxyStatus(name)
  298. if err != nil {
  299. continue
  300. }
  301. switch ps.Phase {
  302. case proxy.ProxyPhaseRunning:
  303. return nil
  304. case proxy.ProxyPhaseStartErr, proxy.ProxyPhaseClosed:
  305. return errors.New(ps.Err)
  306. }
  307. case <-timer.C:
  308. return fmt.Errorf("wait proxy status ready timeout")
  309. case <-s.doneCh:
  310. return fmt.Errorf("ssh tunnel server closed")
  311. }
  312. }
  313. }