123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- package proxy
- import (
- "context"
- "fmt"
- "io"
- "net"
- "reflect"
- "strconv"
- "time"
- "github.com/fatedier/golib/errors"
- libio "github.com/fatedier/golib/io"
- v1 "github.com/fatedier/frp/pkg/config/v1"
- "github.com/fatedier/frp/pkg/msg"
- "github.com/fatedier/frp/pkg/proto/udp"
- "github.com/fatedier/frp/pkg/util/limit"
- netpkg "github.com/fatedier/frp/pkg/util/net"
- "github.com/fatedier/frp/server/metrics"
- )
- func init() {
- RegisterProxyFactory(reflect.TypeOf(&v1.UDPProxyConfig{}), NewUDPProxy)
- }
- type UDPProxy struct {
- *BaseProxy
- cfg *v1.UDPProxyConfig
- realBindPort int
-
- udpConn *net.UDPConn
-
-
- workConn net.Conn
-
- sendCh chan *msg.UDPPacket
-
- readCh chan *msg.UDPPacket
-
- checkCloseCh chan int
- isClosed bool
- }
- func NewUDPProxy(baseProxy *BaseProxy) Proxy {
- unwrapped, ok := baseProxy.GetConfigurer().(*v1.UDPProxyConfig)
- if !ok {
- return nil
- }
- baseProxy.usedPortsNum = 1
- return &UDPProxy{
- BaseProxy: baseProxy,
- cfg: unwrapped,
- }
- }
- func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
- xl := pxy.xl
- pxy.realBindPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
- if err != nil {
- return "", fmt.Errorf("acquire port %d error: %v", pxy.cfg.RemotePort, err)
- }
- defer func() {
- if err != nil {
- pxy.rc.UDPPortManager.Release(pxy.realBindPort)
- }
- }()
- remoteAddr = fmt.Sprintf(":%d", pxy.realBindPort)
- pxy.cfg.RemotePort = pxy.realBindPort
- addr, errRet := net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realBindPort)))
- if errRet != nil {
- err = errRet
- return
- }
- udpConn, errRet := net.ListenUDP("udp", addr)
- if errRet != nil {
- err = errRet
- xl.Warnf("listen udp port error: %v", err)
- return
- }
- xl.Infof("udp proxy listen port [%d]", pxy.cfg.RemotePort)
- pxy.udpConn = udpConn
- pxy.sendCh = make(chan *msg.UDPPacket, 1024)
- pxy.readCh = make(chan *msg.UDPPacket, 1024)
- pxy.checkCloseCh = make(chan int)
-
- workConnReaderFn := func(conn net.Conn) {
- for {
- var (
- rawMsg msg.Message
- errRet error
- )
- xl.Tracef("loop waiting message from udp workConn")
-
- _ = conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
- if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
- xl.Warnf("read from workConn for udp error: %v", errRet)
- _ = conn.Close()
-
-
- _ = errors.PanicToError(func() {
- pxy.checkCloseCh <- 1
- })
- return
- }
- if err := conn.SetReadDeadline(time.Time{}); err != nil {
- xl.Warnf("set read deadline error: %v", err)
- }
- switch m := rawMsg.(type) {
- case *msg.Ping:
- xl.Tracef("udp work conn get ping message")
- continue
- case *msg.UDPPacket:
- if errRet := errors.PanicToError(func() {
- xl.Tracef("get udp message from workConn: %s", m.Content)
- pxy.readCh <- m
- metrics.Server.AddTrafficOut(
- pxy.GetName(),
- pxy.GetConfigurer().GetBaseConfig().Type,
- int64(len(m.Content)),
- )
- }); errRet != nil {
- conn.Close()
- xl.Infof("reader goroutine for udp work connection closed")
- return
- }
- }
- }
- }
-
- workConnSenderFn := func(conn net.Conn, ctx context.Context) {
- var errRet error
- for {
- select {
- case udpMsg, ok := <-pxy.sendCh:
- if !ok {
- xl.Infof("sender goroutine for udp work connection closed")
- return
- }
- if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
- xl.Infof("sender goroutine for udp work connection closed: %v", errRet)
- conn.Close()
- return
- }
- xl.Tracef("send message to udp workConn: %s", udpMsg.Content)
- metrics.Server.AddTrafficIn(
- pxy.GetName(),
- pxy.GetConfigurer().GetBaseConfig().Type,
- int64(len(udpMsg.Content)),
- )
- continue
- case <-ctx.Done():
- xl.Infof("sender goroutine for udp work connection closed")
- return
- }
- }
- }
- go func() {
-
- time.Sleep(500 * time.Millisecond)
- for {
- workConn, err := pxy.GetWorkConnFromPool(nil, nil)
- if err != nil {
- time.Sleep(1 * time.Second)
-
- select {
- case _, ok := <-pxy.checkCloseCh:
- if !ok {
- return
- }
- default:
- }
- continue
- }
-
- if pxy.workConn != nil {
- pxy.workConn.Close()
- }
- var rwc io.ReadWriteCloser = workConn
- if pxy.cfg.Transport.UseEncryption {
- rwc, err = libio.WithEncryption(rwc, []byte(pxy.serverCfg.Auth.Token))
- if err != nil {
- xl.Errorf("create encryption stream error: %v", err)
- workConn.Close()
- continue
- }
- }
- if pxy.cfg.Transport.UseCompression {
- rwc = libio.WithCompression(rwc)
- }
- if pxy.GetLimiter() != nil {
- rwc = libio.WrapReadWriteCloser(limit.NewReader(rwc, pxy.GetLimiter()), limit.NewWriter(rwc, pxy.GetLimiter()), func() error {
- return rwc.Close()
- })
- }
- pxy.workConn = netpkg.WrapReadWriteCloserToConn(rwc, workConn)
- ctx, cancel := context.WithCancel(context.Background())
- go workConnReaderFn(pxy.workConn)
- go workConnSenderFn(pxy.workConn, ctx)
- _, ok := <-pxy.checkCloseCh
- cancel()
- if !ok {
- return
- }
- }
- }()
-
-
-
-
- go func() {
- udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh, int(pxy.serverCfg.UDPPacketSize))
- pxy.Close()
- }()
- return remoteAddr, nil
- }
- func (pxy *UDPProxy) Close() {
- pxy.mu.Lock()
- defer pxy.mu.Unlock()
- if !pxy.isClosed {
- pxy.isClosed = true
- pxy.BaseProxy.Close()
- if pxy.workConn != nil {
- pxy.workConn.Close()
- }
- pxy.udpConn.Close()
-
- close(pxy.checkCloseCh)
- close(pxy.readCh)
- close(pxy.sendCh)
- }
- pxy.rc.UDPPortManager.Release(pxy.realBindPort)
- }
|