Browse Source

update github.com/pires/go-proxyproto to v0.5.0

fatedier 3 years ago
parent
commit
fe4e9b55f3

+ 5 - 5
client/proxy/proxy.go

@@ -757,12 +757,12 @@ func HandleTCPWorkConnection(ctx context.Context, localInfo *config.LocalSvrConf
 			if m.DstAddr == "" {
 				m.DstAddr = "127.0.0.1"
 			}
+			srcAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.SrcAddr, strconv.Itoa(int(m.SrcPort))))
+			dstAddr, _ := net.ResolveTCPAddr("tcp", net.JoinHostPort(m.DstAddr, strconv.Itoa(int(m.DstPort))))
 			h := &pp.Header{
-				Command:            pp.PROXY,
-				SourceAddress:      net.ParseIP(m.SrcAddr),
-				SourcePort:         m.SrcPort,
-				DestinationAddress: net.ParseIP(m.DstAddr),
-				DestinationPort:    m.DstPort,
+				Command:         pp.PROXY,
+				SourceAddr:      srcAddr,
+				DestinationAddr: dstAddr,
 			}
 
 			if strings.Contains(m.SrcAddr, ".") {

+ 1 - 1
go.mod

@@ -18,7 +18,7 @@ require (
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/onsi/ginkgo v1.16.4
 	github.com/onsi/gomega v1.13.0
-	github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc
+	github.com/pires/go-proxyproto v0.5.0
 	github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
 	github.com/prometheus/client_golang v1.11.0
 	github.com/rakyll/statik v0.1.1

+ 2 - 2
go.sum

@@ -260,8 +260,8 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
 github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
-github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc h1:lNOt1SMsgHXTdpuGw+RpnJtzUcCb/oRKZP65pBy9pr8=
-github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc/go.mod h1:6/gX3+E/IYGa0wMORlSMla999awQFdbaeQCHjSMKIzY=
+github.com/pires/go-proxyproto v0.5.0 h1:A4Jv4ZCaV3AFJeGh5mGwkz4iuWUYMlQ7IoO/GTuSuLo=
+github.com/pires/go-proxyproto v0.5.0/go.mod h1:Odh9VFOZJCf9G8cLW5o435Xf1J95Jw9Gw5rnCjcwzAY=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

+ 1 - 1
hack/run-e2e.sh

@@ -17,4 +17,4 @@ if [ x${LOG_LEVEL} != x"" ]; then
     logLevel=${LOG_LEVEL}
 fi
 
-ginkgo -nodes=5 -slowSpecThreshold=10 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=${logLevel} -debug=${debug}
+ginkgo -nodes=5 -slowSpecThreshold=20 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=${logLevel} -debug=${debug}

+ 1 - 1
test/e2e/basic/http.go

@@ -272,7 +272,7 @@ var _ = Describe("[Feature: HTTP]", func() {
 			Ensure()
 	})
 
-	It("websocket", func() {
+	It("Websocket protocol", func() {
 		vhostHTTPPort := f.AllocPort()
 		serverConf := getDefaultServerConf(vhostHTTPPort)
 

+ 1 - 0
test/e2e/e2e_test.go

@@ -11,6 +11,7 @@ import (
 
 	// test source
 	_ "github.com/fatedier/frp/test/e2e/basic"
+	_ "github.com/fatedier/frp/test/e2e/features"
 	_ "github.com/fatedier/frp/test/e2e/plugin"
 
 	_ "github.com/onsi/ginkgo"

+ 47 - 0
test/e2e/features/bandwidth_limit.go

@@ -0,0 +1,47 @@
+package features
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/fatedier/frp/test/e2e/framework"
+	"github.com/fatedier/frp/test/e2e/framework/consts"
+	"github.com/fatedier/frp/test/e2e/mock/server/streamserver"
+	"github.com/fatedier/frp/test/e2e/pkg/request"
+
+	. "github.com/onsi/ginkgo"
+)
+
+var _ = Describe("[Feature: Bandwidth Limit]", func() {
+	f := framework.NewDefaultFramework()
+
+	It("Proxy Bandwidth Limit", func() {
+		serverConf := consts.DefaultServerConfig
+		clientConf := consts.DefaultClientConfig
+
+		localPort := f.AllocPort()
+		localServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(localPort))
+		f.RunServer("", localServer)
+
+		remotePort := f.AllocPort()
+		clientConf += fmt.Sprintf(`
+			[tcp]
+			type = tcp
+			local_port = %d
+			remote_port = %d
+			bandwidth_limit = 10KB
+			`, localPort, remotePort)
+
+		f.RunProcesses([]string{serverConf}, []string{clientConf})
+
+		content := strings.Repeat("a", 50*1024) // 5KB
+		start := time.Now()
+		framework.NewRequestExpect(f).Port(remotePort).RequestModify(func(r *request.Request) {
+			r.Body([]byte(content)).Timeout(30 * time.Second)
+		}).ExpectResp([]byte(content)).Ensure()
+		duration := time.Now().Sub(start)
+
+		framework.ExpectTrue(duration.Seconds() > 7, "100Kb with 10KB limit, want > 7 seconds, but got %d seconds", duration.Seconds())
+	})
+})

+ 1 - 1
test/e2e/basic/chaos.go → test/e2e/features/chaos.go

@@ -1,4 +1,4 @@
-package basic
+package features
 
 import (
 	"fmt"

+ 1 - 1
test/e2e/basic/group.go → test/e2e/features/group.go

@@ -1,4 +1,4 @@
-package basic
+package features
 
 import (
 	"fmt"

+ 20 - 0
test/e2e/features/real_ip.go

@@ -0,0 +1,20 @@
+package features
+
+import (
+	"github.com/fatedier/frp/test/e2e/framework"
+
+	. "github.com/onsi/ginkgo"
+)
+
+var _ = Describe("[Feature: Real IP]", func() {
+	f := framework.NewDefaultFramework()
+
+	It("HTTP X-Forwarded-For", func() {
+		// TODO
+		_ = f
+	})
+
+	It("Proxy Protocol", func() {
+		// TODO
+	})
+})

+ 3 - 3
test/e2e/framework/mockservers.go

@@ -31,8 +31,8 @@ func NewMockServers(portAllocator *port.Allocator) *MockServers {
 	tcpPort := portAllocator.Get()
 	udpPort := portAllocator.Get()
 	httpPort := portAllocator.Get()
-	s.tcpEchoServer = streamserver.New(streamserver.TCP, streamserver.WithBindPort(tcpPort), streamserver.WithEchoMode(true))
-	s.udpEchoServer = streamserver.New(streamserver.UDP, streamserver.WithBindPort(udpPort), streamserver.WithEchoMode(true))
+	s.tcpEchoServer = streamserver.New(streamserver.TCP, streamserver.WithBindPort(tcpPort))
+	s.udpEchoServer = streamserver.New(streamserver.UDP, streamserver.WithBindPort(udpPort))
 	s.httpSimpleServer = httpserver.New(httpserver.WithBindPort(httpPort), httpserver.WithHandler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 		w.Write([]byte(consts.TestString))
 	})))
@@ -40,7 +40,7 @@ func NewMockServers(portAllocator *port.Allocator) *MockServers {
 	udsIndex := portAllocator.Get()
 	udsAddr := fmt.Sprintf("%s/frp_echo_server_%d.sock", os.TempDir(), udsIndex)
 	os.Remove(udsAddr)
-	s.udsEchoServer = streamserver.New(streamserver.Unix, streamserver.WithBindAddr(udsAddr), streamserver.WithEchoMode(true))
+	s.udsEchoServer = streamserver.New(streamserver.Unix, streamserver.WithBindAddr(udsAddr))
 	return s
 }
 

+ 5 - 24
test/e2e/mock/server/streamserver/server.go

@@ -5,6 +5,7 @@ import (
 	"net"
 
 	libnet "github.com/fatedier/frp/pkg/util/net"
+	"github.com/fatedier/frp/test/e2e/pkg/rpc"
 )
 
 type Type string
@@ -20,9 +21,6 @@ type Server struct {
 	bindAddr    string
 	bindPort    int
 	respContent []byte
-	bufSize     int64
-
-	echoMode bool
 
 	l net.Listener
 }
@@ -33,7 +31,6 @@ func New(netType Type, options ...Option) *Server {
 	s := &Server{
 		netType:  netType,
 		bindAddr: "127.0.0.1",
-		bufSize:  2048,
 	}
 
 	for _, option := range options {
@@ -63,20 +60,6 @@ func WithRespContent(content []byte) Option {
 	}
 }
 
-func WithBufSize(bufSize int64) Option {
-	return func(s *Server) *Server {
-		s.bufSize = bufSize
-		return s
-	}
-}
-
-func WithEchoMode(echoMode bool) Option {
-	return func(s *Server) *Server {
-		s.echoMode = echoMode
-		return s
-	}
-}
-
 func (s *Server) Run() error {
 	if err := s.initListener(); err != nil {
 		return err
@@ -118,18 +101,16 @@ func (s *Server) initListener() (err error) {
 func (s *Server) handle(c net.Conn) {
 	defer c.Close()
 
-	buf := make([]byte, s.bufSize)
 	for {
-		n, err := c.Read(buf)
+		buf, err := rpc.ReadBytes(c)
 		if err != nil {
 			return
 		}
 
-		if s.echoMode {
-			c.Write(buf[:n])
-		} else {
-			c.Write(s.respContent)
+		if len(s.respContent) > 0 {
+			buf = s.respContent
 		}
+		rpc.WriteBytes(c, buf)
 	}
 }
 

+ 4 - 4
test/e2e/pkg/request/request.go

@@ -11,6 +11,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/fatedier/frp/test/e2e/pkg/rpc"
 	libnet "github.com/fatedier/golib/net"
 )
 
@@ -210,15 +211,14 @@ func sendHTTPRequest(method, urlstr string, host string, headers map[string]stri
 }
 
 func sendRequestByConn(c net.Conn, content []byte) ([]byte, error) {
-	_, err := c.Write(content)
+	_, err := rpc.WriteBytes(c, content)
 	if err != nil {
 		return nil, fmt.Errorf("write error: %v", err)
 	}
 
-	buf := make([]byte, 2048)
-	n, err := c.Read(buf)
+	buf, err := rpc.ReadBytes(c)
 	if err != nil {
 		return nil, fmt.Errorf("read error: %v", err)
 	}
-	return buf[:n], nil
+	return buf, nil
 }

+ 35 - 0
test/e2e/pkg/rpc/rpc.go

@@ -0,0 +1,35 @@
+package rpc
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+func WriteBytes(w io.Writer, buf []byte) (int, error) {
+	out := bytes.NewBuffer(nil)
+	binary.Write(out, binary.BigEndian, int64(len(buf)))
+	out.Write(buf)
+	return w.Write(out.Bytes())
+}
+
+func ReadBytes(r io.Reader) ([]byte, error) {
+	// To compatible with UDP connection, use bufio reader here to avoid lost conent.
+	rd := bufio.NewReader(r)
+
+	var length int64
+	if err := binary.Read(rd, binary.BigEndian, &length); err != nil {
+		return nil, err
+	}
+	buffer := make([]byte, length)
+	n, err := io.ReadFull(rd, buffer)
+	if err != nil {
+		return nil, err
+	}
+	if int64(n) != length {
+		return nil, errors.New("invalid length")
+	}
+	return buffer, nil
+}