server.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. // Copyright 2023 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package ssh
  15. import (
  16. "context"
  17. "encoding/binary"
  18. "errors"
  19. "fmt"
  20. "net"
  21. "strings"
  22. "sync"
  23. "time"
  24. libio "github.com/fatedier/golib/io"
  25. "github.com/samber/lo"
  26. "github.com/spf13/cobra"
  27. "golang.org/x/crypto/ssh"
  28. "github.com/fatedier/frp/client/proxy"
  29. "github.com/fatedier/frp/pkg/config"
  30. v1 "github.com/fatedier/frp/pkg/config/v1"
  31. "github.com/fatedier/frp/pkg/msg"
  32. "github.com/fatedier/frp/pkg/util/log"
  33. netpkg "github.com/fatedier/frp/pkg/util/net"
  34. "github.com/fatedier/frp/pkg/util/util"
  35. "github.com/fatedier/frp/pkg/util/xlog"
  36. "github.com/fatedier/frp/pkg/virtual"
  37. )
  // SSH protocol identifiers used for remote port forwarding.
  // https://datatracker.ietf.org/doc/html/rfc4254#page-16
  const (
  	// ChannelTypeServerOpenChannel is the channel type openConn uses when it
  	// opens a channel back to the SSH client.
  	ChannelTypeServerOpenChannel = "forwarded-tcpip"
  	// RequestTypeForward is the request type that carries the forward address
  	// read by waitForwardAddrAndExtraPayload.
  	RequestTypeForward = "tcpip-forward"
  )
  // tcpipForward is the payload of a "tcpip-forward" request as decoded with
  // ssh.Unmarshal. The field order is part of the wire format and must not be
  // changed.
  type tcpipForward struct {
  	// Host is the address carried by the request (the address to bind, per
  	// RFC 4254 section 7.1).
  	Host string
  	// Port is the port carried by the request (the port to bind, per
  	// RFC 4254 section 7.1).
  	Port uint32
  }
  // forwardedTCPPayload is the channel-open payload that openConn marshals when
  // it opens a "forwarded-tcpip" channel. The field order is part of the wire
  // format and must not be changed.
  // https://datatracker.ietf.org/doc/html/rfc4254#page-16
  type forwardedTCPPayload struct {
  	Addr string
  	Port uint32

  	// The originator fields may stay at their zero values, but do not delete
  	// them: the SSH protocol reserves these fields in the payload.
  	OriginAddr string
  	OriginPort uint32
  }
  56. type TunnelServer struct {
  57. underlyingConn net.Conn
  58. sshConn *ssh.ServerConn
  59. sc *ssh.ServerConfig
  60. vc *virtual.Client
  61. peerServerListener *netpkg.InternalListener
  62. doneCh chan struct{}
  63. closeDoneChOnce sync.Once
  64. }
  65. func NewTunnelServer(conn net.Conn, sc *ssh.ServerConfig, peerServerListener *netpkg.InternalListener) (*TunnelServer, error) {
  66. s := &TunnelServer{
  67. underlyingConn: conn,
  68. sc: sc,
  69. peerServerListener: peerServerListener,
  70. doneCh: make(chan struct{}),
  71. }
  72. return s, nil
  73. }
  74. func (s *TunnelServer) Run() error {
  75. sshConn, channels, requests, err := ssh.NewServerConn(s.underlyingConn, s.sc)
  76. if err != nil {
  77. return err
  78. }
  79. s.sshConn = sshConn
  80. addr, extraPayload, err := s.waitForwardAddrAndExtraPayload(channels, requests, 3*time.Second)
  81. if err != nil {
  82. return err
  83. }
  84. clientCfg, pc, err := s.parseClientAndProxyConfigurer(addr, extraPayload)
  85. if err != nil {
  86. return err
  87. }
  88. clientCfg.Complete()
  89. if sshConn.Permissions != nil {
  90. clientCfg.User = util.EmptyOr(sshConn.Permissions.Extensions["user"], clientCfg.User)
  91. }
  92. pc.Complete(clientCfg.User)
  93. vc, err := virtual.NewClient(virtual.ClientOptions{
  94. Common: clientCfg,
  95. Spec: &msg.ClientSpec{
  96. Type: "ssh-tunnel",
  97. // If ssh does not require authentication, then the virtual client needs to authenticate through a token.
  98. // Otherwise, once ssh authentication is passed, the virtual client does not need to authenticate again.
  99. AlwaysAuthPass: !s.sc.NoClientAuth,
  100. },
  101. HandleWorkConnCb: func(base *v1.ProxyBaseConfig, workConn net.Conn, m *msg.StartWorkConn) bool {
  102. // join workConn and ssh channel
  103. c, err := s.openConn(addr)
  104. if err != nil {
  105. return false
  106. }
  107. libio.Join(c, workConn)
  108. return false
  109. },
  110. })
  111. if err != nil {
  112. return err
  113. }
  114. s.vc = vc
  115. // transfer connection from virtual client to server peer listener
  116. go func() {
  117. l := s.vc.PeerListener()
  118. for {
  119. conn, err := l.Accept()
  120. if err != nil {
  121. return
  122. }
  123. _ = s.peerServerListener.PutConn(conn)
  124. }
  125. }()
  126. xl := xlog.New().AddPrefix(xlog.LogPrefix{Name: "sshVirtualClient", Value: "sshVirtualClient", Priority: 100})
  127. ctx := xlog.NewContext(context.Background(), xl)
  128. go func() {
  129. _ = s.vc.Run(ctx)
  130. // If vc.Run returns, it means that the virtual client has been closed, and the ssh tunnel connection should be closed.
  131. // One scenario is that the virtual client exits due to login failure.
  132. s.closeDoneChOnce.Do(func() {
  133. _ = sshConn.Close()
  134. close(s.doneCh)
  135. })
  136. }()
  137. s.vc.UpdateProxyConfigurer([]v1.ProxyConfigurer{pc})
  138. if err := s.waitProxyStatusReady(pc.GetBaseConfig().Name, time.Second); err != nil {
  139. log.Warn("wait proxy status ready error: %v", err)
  140. } else {
  141. _ = sshConn.Wait()
  142. }
  143. s.vc.Close()
  144. log.Trace("ssh tunnel connection from %v closed", sshConn.RemoteAddr())
  145. s.closeDoneChOnce.Do(func() {
  146. _ = sshConn.Close()
  147. close(s.doneCh)
  148. })
  149. return nil
  150. }
  151. func (s *TunnelServer) waitForwardAddrAndExtraPayload(
  152. channels <-chan ssh.NewChannel,
  153. requests <-chan *ssh.Request,
  154. timeout time.Duration,
  155. ) (*tcpipForward, string, error) {
  156. addrCh := make(chan *tcpipForward, 1)
  157. extraPayloadCh := make(chan string, 1)
  158. // get forward address
  159. go func() {
  160. addrGot := false
  161. for req := range requests {
  162. switch req.Type {
  163. case RequestTypeForward:
  164. if !addrGot {
  165. payload := tcpipForward{}
  166. if err := ssh.Unmarshal(req.Payload, &payload); err != nil {
  167. return
  168. }
  169. addrGot = true
  170. addrCh <- &payload
  171. }
  172. default:
  173. if req.WantReply {
  174. _ = req.Reply(true, nil)
  175. }
  176. }
  177. }
  178. }()
  179. // get extra payload
  180. go func() {
  181. for newChannel := range channels {
  182. // extraPayload will send to extraPayloadCh
  183. go s.handleNewChannel(newChannel, extraPayloadCh)
  184. }
  185. }()
  186. var (
  187. addr *tcpipForward
  188. extraPayload string
  189. )
  190. timer := time.NewTimer(timeout)
  191. defer timer.Stop()
  192. for {
  193. select {
  194. case v := <-addrCh:
  195. addr = v
  196. case extra := <-extraPayloadCh:
  197. extraPayload = extra
  198. case <-timer.C:
  199. return nil, "", fmt.Errorf("get addr and extra payload timeout")
  200. }
  201. if addr != nil && extraPayload != "" {
  202. break
  203. }
  204. }
  205. return addr, extraPayload, nil
  206. }
  207. func (s *TunnelServer) parseClientAndProxyConfigurer(_ *tcpipForward, extraPayload string) (*v1.ClientCommonConfig, v1.ProxyConfigurer, error) {
  208. cmd := &cobra.Command{}
  209. args := strings.Split(extraPayload, " ")
  210. if len(args) < 1 {
  211. return nil, nil, fmt.Errorf("invalid extra payload")
  212. }
  213. proxyType := strings.TrimSpace(args[0])
  214. supportTypes := []string{"tcp", "http", "https", "tcpmux", "stcp"}
  215. if !lo.Contains(supportTypes, proxyType) {
  216. return nil, nil, fmt.Errorf("invalid proxy type: %s, support types: %v", proxyType, supportTypes)
  217. }
  218. pc := v1.NewProxyConfigurerByType(v1.ProxyType(proxyType))
  219. if pc == nil {
  220. return nil, nil, fmt.Errorf("new proxy configurer error")
  221. }
  222. config.RegisterProxyFlags(cmd, pc)
  223. clientCfg := v1.ClientCommonConfig{}
  224. config.RegisterClientCommonConfigFlags(cmd, &clientCfg)
  225. if err := cmd.ParseFlags(args); err != nil {
  226. return nil, nil, fmt.Errorf("parse flags from ssh client error: %v", err)
  227. }
  228. // if name is not set, generate a random one
  229. if pc.GetBaseConfig().Name == "" {
  230. id, err := util.RandIDWithLen(8)
  231. if err != nil {
  232. return nil, nil, fmt.Errorf("generate random id error: %v", err)
  233. }
  234. pc.GetBaseConfig().Name = fmt.Sprintf("sshtunnel-%s-%s", proxyType, id)
  235. }
  236. return &clientCfg, pc, nil
  237. }
  238. func (s *TunnelServer) handleNewChannel(channel ssh.NewChannel, extraPayloadCh chan string) {
  239. ch, reqs, err := channel.Accept()
  240. if err != nil {
  241. return
  242. }
  243. go s.keepAlive(ch)
  244. for req := range reqs {
  245. if req.Type != "exec" {
  246. continue
  247. }
  248. if len(req.Payload) <= 4 {
  249. continue
  250. }
  251. end := 4 + binary.BigEndian.Uint32(req.Payload[:4])
  252. if len(req.Payload) < int(end) {
  253. continue
  254. }
  255. extraPayload := string(req.Payload[4:end])
  256. select {
  257. case extraPayloadCh <- extraPayload:
  258. default:
  259. }
  260. }
  261. }
  262. func (s *TunnelServer) keepAlive(ch ssh.Channel) {
  263. tk := time.NewTicker(time.Second * 30)
  264. defer tk.Stop()
  265. for {
  266. select {
  267. case <-tk.C:
  268. _, err := ch.SendRequest("heartbeat", false, nil)
  269. if err != nil {
  270. return
  271. }
  272. case <-s.doneCh:
  273. return
  274. }
  275. }
  276. }
  277. func (s *TunnelServer) openConn(addr *tcpipForward) (net.Conn, error) {
  278. payload := forwardedTCPPayload{
  279. Addr: addr.Host,
  280. Port: addr.Port,
  281. }
  282. channel, reqs, err := s.sshConn.OpenChannel(ChannelTypeServerOpenChannel, ssh.Marshal(&payload))
  283. if err != nil {
  284. return nil, fmt.Errorf("open ssh channel error: %v", err)
  285. }
  286. go ssh.DiscardRequests(reqs)
  287. conn := netpkg.WrapReadWriteCloserToConn(channel, s.underlyingConn)
  288. return conn, nil
  289. }
  290. func (s *TunnelServer) waitProxyStatusReady(name string, timeout time.Duration) error {
  291. ticker := time.NewTicker(100 * time.Millisecond)
  292. defer ticker.Stop()
  293. timer := time.NewTimer(timeout)
  294. defer timer.Stop()
  295. for {
  296. select {
  297. case <-ticker.C:
  298. ps, err := s.vc.Service().GetProxyStatus(name)
  299. if err != nil {
  300. continue
  301. }
  302. switch ps.Phase {
  303. case proxy.ProxyPhaseRunning:
  304. return nil
  305. case proxy.ProxyPhaseStartErr, proxy.ProxyPhaseClosed:
  306. return errors.New(ps.Err)
  307. }
  308. case <-timer.C:
  309. return fmt.Errorf("wait proxy status ready timeout")
  310. case <-s.doneCh:
  311. return fmt.Errorf("ssh tunnel server closed")
  312. }
  313. }
  314. }