123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package proxy
- import (
- "io"
- "net"
- "reflect"
- "time"
- fmux "github.com/hashicorp/yamux"
- "github.com/quic-go/quic-go"
- v1 "github.com/fatedier/frp/pkg/config/v1"
- "github.com/fatedier/frp/pkg/msg"
- "github.com/fatedier/frp/pkg/nathole"
- "github.com/fatedier/frp/pkg/transport"
- netpkg "github.com/fatedier/frp/pkg/util/net"
- )
- func init() {
- RegisterProxyFactory(reflect.TypeOf(&v1.XTCPProxyConfig{}), NewXTCPProxy)
- }
- type XTCPProxy struct {
- *BaseProxy
- cfg *v1.XTCPProxyConfig
- }
- func NewXTCPProxy(baseProxy *BaseProxy, cfg v1.ProxyConfigurer) Proxy {
- unwrapped, ok := cfg.(*v1.XTCPProxyConfig)
- if !ok {
- return nil
- }
- return &XTCPProxy{
- BaseProxy: baseProxy,
- cfg: unwrapped,
- }
- }
- func (pxy *XTCPProxy) InWorkConn(conn net.Conn, startWorkConnMsg *msg.StartWorkConn) {
- xl := pxy.xl
- defer conn.Close()
- var natHoleSidMsg msg.NatHoleSid
- err := msg.ReadMsgInto(conn, &natHoleSidMsg)
- if err != nil {
- xl.Errorf("xtcp read from workConn error: %v", err)
- return
- }
- xl.Tracef("nathole prepare start")
- prepareResult, err := nathole.Prepare([]string{pxy.clientCfg.NatHoleSTUNServer})
- if err != nil {
- xl.Warnf("nathole prepare error: %v", err)
- return
- }
- xl.Infof("nathole prepare success, nat type: %s, behavior: %s, addresses: %v, assistedAddresses: %v",
- prepareResult.NatType, prepareResult.Behavior, prepareResult.Addrs, prepareResult.AssistedAddrs)
- defer prepareResult.ListenConn.Close()
-
- transactionID := nathole.NewTransactionID()
- natHoleClientMsg := &msg.NatHoleClient{
- TransactionID: transactionID,
- ProxyName: pxy.cfg.Name,
- Sid: natHoleSidMsg.Sid,
- MappedAddrs: prepareResult.Addrs,
- AssistedAddrs: prepareResult.AssistedAddrs,
- }
- xl.Tracef("nathole exchange info start")
- natHoleRespMsg, err := nathole.ExchangeInfo(pxy.ctx, pxy.msgTransporter, transactionID, natHoleClientMsg, 5*time.Second)
- if err != nil {
- xl.Warnf("nathole exchange info error: %v", err)
- return
- }
- xl.Infof("get natHoleRespMsg, sid [%s], protocol [%s], candidate address %v, assisted address %v, detectBehavior: %+v",
- natHoleRespMsg.Sid, natHoleRespMsg.Protocol, natHoleRespMsg.CandidateAddrs,
- natHoleRespMsg.AssistedAddrs, natHoleRespMsg.DetectBehavior)
- listenConn := prepareResult.ListenConn
- newListenConn, raddr, err := nathole.MakeHole(pxy.ctx, listenConn, natHoleRespMsg, []byte(pxy.cfg.Secretkey))
- if err != nil {
- listenConn.Close()
- xl.Warnf("make hole error: %v", err)
- _ = pxy.msgTransporter.Send(&msg.NatHoleReport{
- Sid: natHoleRespMsg.Sid,
- Success: false,
- })
- return
- }
- listenConn = newListenConn
- xl.Infof("establishing nat hole connection successful, sid [%s], remoteAddr [%s]", natHoleRespMsg.Sid, raddr)
- _ = pxy.msgTransporter.Send(&msg.NatHoleReport{
- Sid: natHoleRespMsg.Sid,
- Success: true,
- })
- if natHoleRespMsg.Protocol == "kcp" {
- pxy.listenByKCP(listenConn, raddr, startWorkConnMsg)
- return
- }
-
- pxy.listenByQUIC(listenConn, raddr, startWorkConnMsg)
- }
- func (pxy *XTCPProxy) listenByKCP(listenConn *net.UDPConn, raddr *net.UDPAddr, startWorkConnMsg *msg.StartWorkConn) {
- xl := pxy.xl
- listenConn.Close()
- laddr, _ := net.ResolveUDPAddr("udp", listenConn.LocalAddr().String())
- lConn, err := net.DialUDP("udp", laddr, raddr)
- if err != nil {
- xl.Warnf("dial udp error: %v", err)
- return
- }
- defer lConn.Close()
- remote, err := netpkg.NewKCPConnFromUDP(lConn, true, raddr.String())
- if err != nil {
- xl.Warnf("create kcp connection from udp connection error: %v", err)
- return
- }
- fmuxCfg := fmux.DefaultConfig()
- fmuxCfg.KeepAliveInterval = 10 * time.Second
- fmuxCfg.MaxStreamWindowSize = 6 * 1024 * 1024
- fmuxCfg.LogOutput = io.Discard
- session, err := fmux.Server(remote, fmuxCfg)
- if err != nil {
- xl.Errorf("create mux session error: %v", err)
- return
- }
- defer session.Close()
- for {
- muxConn, err := session.Accept()
- if err != nil {
- xl.Errorf("accept connection error: %v", err)
- return
- }
- go pxy.HandleTCPWorkConnection(muxConn, startWorkConnMsg, []byte(pxy.cfg.Secretkey))
- }
- }
- func (pxy *XTCPProxy) listenByQUIC(listenConn *net.UDPConn, _ *net.UDPAddr, startWorkConnMsg *msg.StartWorkConn) {
- xl := pxy.xl
- defer listenConn.Close()
- tlsConfig, err := transport.NewServerTLSConfig("", "", "")
- if err != nil {
- xl.Warnf("create tls config error: %v", err)
- return
- }
- tlsConfig.NextProtos = []string{"frp"}
- quicListener, err := quic.Listen(listenConn, tlsConfig,
- &quic.Config{
- MaxIdleTimeout: time.Duration(pxy.clientCfg.Transport.QUIC.MaxIdleTimeout) * time.Second,
- MaxIncomingStreams: int64(pxy.clientCfg.Transport.QUIC.MaxIncomingStreams),
- KeepAlivePeriod: time.Duration(pxy.clientCfg.Transport.QUIC.KeepalivePeriod) * time.Second,
- },
- )
- if err != nil {
- xl.Warnf("dial quic error: %v", err)
- return
- }
-
- c, err := quicListener.Accept(pxy.ctx)
- if err != nil {
- xl.Errorf("quic accept connection error: %v", err)
- return
- }
- for {
- stream, err := c.AcceptStream(pxy.ctx)
- if err != nil {
- xl.Debugf("quic accept stream error: %v", err)
- _ = c.CloseWithError(0, "")
- return
- }
- go pxy.HandleTCPWorkConnection(netpkg.QuicStreamToNetConn(stream, c), startWorkConnMsg, []byte(pxy.cfg.Secretkey))
- }
- }
|