Browse Source

add e2e tests (#2334)

fatedier 4 years ago
parent
commit
fbaa5f866e

+ 1 - 1
client/proxy/proxy_wrapper.go

@@ -227,7 +227,7 @@ func (pw *Wrapper) InWorkConn(workConn net.Conn, m *msg.StartWorkConn) {
 	pw.mu.RLock()
 	pxy := pw.pxy
 	pw.mu.RUnlock()
-	if pxy != nil {
+	if pxy != nil && pw.Phase == ProxyPhaseRunning {
 		xl.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
 		go pxy.InWorkConn(workConn, m)
 	} else {

+ 3 - 2
client/visitor.go

@@ -366,7 +366,7 @@ func (sv *SUDPVisitor) Run() (err error) {
 	sv.sendCh = make(chan *msg.UDPPacket, 1024)
 	sv.readCh = make(chan *msg.UDPPacket, 1024)
 
-	xl.Info("sudp start to work")
+	xl.Info("sudp start to work, listen on %s", addr)
 
 	go sv.dispatcher()
 	go udp.ForwardUserConn(sv.udpConn, sv.readCh, sv.sendCh, int(sv.ctl.clientCfg.UDPPacketSize))
@@ -446,7 +446,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn) {
 			case *msg.UDPPacket:
 				if errRet := errors.PanicToError(func() {
 					sv.readCh <- m
-					xl.Trace("frpc visitor get udp packet from frpc")
+					xl.Trace("frpc visitor get udp packet from workConn: %s", m.Content)
 				}); errRet != nil {
 					xl.Info("reader goroutine for udp work connection closed")
 					return
@@ -475,6 +475,7 @@ func (sv *SUDPVisitor) worker(workConn net.Conn) {
 					xl.Warn("sender goroutine for udp work connection closed: %v", errRet)
 					return
 				}
+				xl.Trace("send udp package to workConn: %s", udpMsg.Content)
 			case <-closeCh:
 				return
 			}

+ 1 - 1
go.mod

@@ -11,7 +11,7 @@ require (
 	github.com/google/uuid v1.1.1
 	github.com/gorilla/mux v1.7.3
 	github.com/gorilla/websocket v1.4.0
-	github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
+	github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/klauspost/cpuid v1.2.0 // indirect
 	github.com/klauspost/reedsolomon v1.9.1 // indirect

+ 2 - 2
go.sum

@@ -78,8 +78,8 @@ github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
 github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
 github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
-github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d h1:kJCB4vdITiW1eC1vq2e6IsrXKrZit1bv/TDYFGMp4BQ=
-github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
+github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864 h1:Y4V+SFe7d3iH+9pJCoeWIOS5/xBJIFsltS7E+KJSsJY=
+github.com/hashicorp/yamux v0.0.0-20210316155119-a95892c5f864/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=

+ 1 - 1
server/proxy/udp.go

@@ -60,7 +60,7 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
 	xl := pxy.xl
 	pxy.realPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
 	if err != nil {
-		return
+		return "", fmt.Errorf("acquire port %d error: %v", pxy.cfg.RemotePort, err)
 	}
 	defer func() {
 		if err != nil {

+ 97 - 23
test/e2e/basic/basic.go

@@ -3,16 +3,15 @@ package basic
 import (
 	"fmt"
 	"strings"
-	"time"
 
 	"github.com/fatedier/frp/test/e2e/framework"
 	"github.com/fatedier/frp/test/e2e/framework/consts"
+	"github.com/fatedier/frp/test/e2e/pkg/port"
+	"github.com/fatedier/frp/test/e2e/pkg/request"
 
 	. "github.com/onsi/ginkgo"
 )
 
-var connTimeout = 2 * time.Second
-
 var _ = Describe("[Feature: Basic]", func() {
 	f := framework.NewDefaultFramework()
 
@@ -50,21 +49,21 @@ var _ = Describe("[Feature: Basic]", func() {
 				}{
 					{
 						proxyName: "normal",
-						portName:  framework.GenPortName("Normal"),
+						portName:  port.GenName("Normal"),
 					},
 					{
 						proxyName:   "with-encryption",
-						portName:    framework.GenPortName("WithEncryption"),
+						portName:    port.GenName("WithEncryption"),
 						extraConfig: "use_encryption = true",
 					},
 					{
 						proxyName:   "with-compression",
-						portName:    framework.GenPortName("WithCompression"),
+						portName:    port.GenName("WithCompression"),
 						extraConfig: "use_compression = true",
 					},
 					{
 						proxyName: "with-encryption-and-compression",
-						portName:  framework.GenPortName("WithEncryptionAndCompression"),
+						portName:  port.GenName("WithEncryptionAndCompression"),
 						extraConfig: `
 						use_encryption = true
 						use_compression = true
@@ -80,8 +79,11 @@ var _ = Describe("[Feature: Basic]", func() {
 				f.RunProcesses([]string{serverConf}, []string{clientConf})
 
 				for _, test := range tests {
-					framework.ExpectRequest(protocol, f.UsedPorts[test.portName],
-						[]byte(consts.TestString), []byte(consts.TestString), connTimeout, test.proxyName)
+					framework.NewRequestExpect(f).
+						Request(framework.SetRequestProtocol(protocol)).
+						PortName(test.portName).
+						Explain(test.proxyName).
+						Ensure()
 				}
 			})
 		}
@@ -139,24 +141,24 @@ var _ = Describe("[Feature: Basic]", func() {
 				}{
 					{
 						proxyName:    "normal",
-						bindPortName: framework.GenPortName("Normal"),
+						bindPortName: port.GenName("Normal"),
 						visitorSK:    correctSK,
 					},
 					{
 						proxyName:    "with-encryption",
-						bindPortName: framework.GenPortName("WithEncryption"),
+						bindPortName: port.GenName("WithEncryption"),
 						visitorSK:    correctSK,
 						extraConfig:  "use_encryption = true",
 					},
 					{
 						proxyName:    "with-compression",
-						bindPortName: framework.GenPortName("WithCompression"),
+						bindPortName: port.GenName("WithCompression"),
 						visitorSK:    correctSK,
 						extraConfig:  "use_compression = true",
 					},
 					{
 						proxyName:    "with-encryption-and-compression",
-						bindPortName: framework.GenPortName("WithEncryptionAndCompression"),
+						bindPortName: port.GenName("WithEncryptionAndCompression"),
 						visitorSK:    correctSK,
 						extraConfig: `
 						use_encryption = true
@@ -165,7 +167,7 @@ var _ = Describe("[Feature: Basic]", func() {
 					},
 					{
 						proxyName:    "with-error-sk",
-						bindPortName: framework.GenPortName("WithErrorSK"),
+						bindPortName: port.GenName("WithErrorSK"),
 						visitorSK:    wrongSK,
 						expectError:  true,
 					},
@@ -182,17 +184,89 @@ var _ = Describe("[Feature: Basic]", func() {
 				f.RunProcesses([]string{serverConf}, []string{clientServerConf, clientVisitorConf})
 
 				for _, test := range tests {
-					expectResp := []byte(consts.TestString)
-					if test.expectError {
-						framework.ExpectRequestError(protocol, f.UsedPorts[test.bindPortName],
-							[]byte(consts.TestString), connTimeout, test.proxyName)
-						continue
-					}
-
-					framework.ExpectRequest(protocol, f.UsedPorts[test.bindPortName],
-						[]byte(consts.TestString), expectResp, connTimeout, test.proxyName)
+					framework.NewRequestExpect(f).
+						Request(framework.SetRequestProtocol(protocol)).
+						PortName(test.bindPortName).
+						Explain(test.proxyName).
+						ExpectError(test.expectError).
+						Ensure()
+
 				}
 			})
 		}
 	})
+
+	Describe("TCPMUX", func() {
+		It("Type tcpmux", func() {
+			serverConf := consts.DefaultServerConfig
+			clientConf := consts.DefaultClientConfig
+
+			tcpmuxHTTPConnectPortName := port.GenName("TCPMUX")
+			serverConf += fmt.Sprintf(`
+			tcpmux_httpconnect_port = {{ .%s }}
+			`, tcpmuxHTTPConnectPortName)
+
+			getProxyConf := func(proxyName string, extra string) string {
+				return fmt.Sprintf(`
+				[%s]
+				type = tcpmux
+				multiplexer = httpconnect
+				local_port = {{ .%s }}
+				custom_domains = %s
+				`+extra, proxyName, framework.TCPEchoServerPort, proxyName)
+			}
+
+			tests := []struct {
+				proxyName   string
+				portName    string
+				extraConfig string
+			}{
+				{
+					proxyName: "normal",
+				},
+				{
+					proxyName:   "with-encryption",
+					extraConfig: "use_encryption = true",
+				},
+				{
+					proxyName:   "with-compression",
+					extraConfig: "use_compression = true",
+				},
+				{
+					proxyName: "with-encryption-and-compression",
+					extraConfig: `
+						use_encryption = true
+						use_compression = true
+					`,
+				},
+			}
+
+			// build all client config
+			for _, test := range tests {
+				clientConf += getProxyConf(test.proxyName, test.extraConfig) + "\n"
+			}
+			// run frps and frpc
+			f.RunProcesses([]string{serverConf}, []string{clientConf})
+
+			// Request without HTTP connect should get error
+			framework.NewRequestExpect(f).
+				PortName(tcpmuxHTTPConnectPortName).
+				ExpectError(true).
+				Explain("request without HTTP connect expect error").
+				Ensure()
+
+			proxyURL := fmt.Sprintf("http://127.0.0.1:%d", f.PortByName(tcpmuxHTTPConnectPortName))
+			// Request with incorrect connect hostname
+			framework.NewRequestExpect(f).Request(func(r *request.Request) {
+				r.Proxy(proxyURL, "invalid")
+			}).ExpectError(true).Explain("request without HTTP connect expect error").Ensure()
+
+			// Request with correct connect hostname
+			for _, test := range tests {
+				framework.NewRequestExpect(f).Request(func(r *request.Request) {
+					r.Proxy(proxyURL, test.proxyName)
+				}).Explain(test.proxyName).Ensure()
+			}
+		})
+	})
 })

+ 9 - 13
test/e2e/basic/client_server.go

@@ -6,6 +6,7 @@ import (
 
 	"github.com/fatedier/frp/test/e2e/framework"
 	"github.com/fatedier/frp/test/e2e/framework/consts"
+	"github.com/fatedier/frp/test/e2e/pkg/port"
 
 	. "github.com/onsi/ginkgo"
 )
@@ -16,6 +17,7 @@ type generalTestConfigures struct {
 	expectError bool
 }
 
+// defineClientServerTest test a normal tcp and udp proxy with specified TestConfigures.
 func defineClientServerTest(desc string, f *framework.Framework, configures *generalTestConfigures) {
 	It(desc, func() {
 		serverConf := consts.DefaultServerConfig
@@ -25,6 +27,8 @@ func defineClientServerTest(desc string, f *framework.Framework, configures *gen
 				%s
 				`, configures.server)
 
+		tcpPortName := port.GenName("TCP")
+		udpPortName := port.GenName("UDP")
 		clientConf += fmt.Sprintf(`
 				%s
 
@@ -38,23 +42,15 @@ func defineClientServerTest(desc string, f *framework.Framework, configures *gen
 				local_port = {{ .%s }}
 				remote_port = {{ .%s }}
 				`, configures.client,
-			framework.TCPEchoServerPort, framework.GenPortName("TCP"),
-			framework.UDPEchoServerPort, framework.GenPortName("UDP"),
+			framework.TCPEchoServerPort, tcpPortName,
+			framework.UDPEchoServerPort, udpPortName,
 		)
 
 		f.RunProcesses([]string{serverConf}, []string{clientConf})
 
-		if !configures.expectError {
-			framework.ExpectTCPRequest(f.UsedPorts[framework.GenPortName("TCP")],
-				[]byte(consts.TestString), []byte(consts.TestString), connTimeout, "tcp proxy")
-			framework.ExpectUDPRequest(f.UsedPorts[framework.GenPortName("UDP")],
-				[]byte(consts.TestString), []byte(consts.TestString), connTimeout, "udp proxy")
-		} else {
-			framework.ExpectTCPRequestError(f.UsedPorts[framework.GenPortName("TCP")],
-				[]byte(consts.TestString), connTimeout, "tcp proxy")
-			framework.ExpectUDPRequestError(f.UsedPorts[framework.GenPortName("UDP")],
-				[]byte(consts.TestString), connTimeout, "udp proxy")
-		}
+		framework.NewRequestExpect(f).PortName(tcpPortName).ExpectError(configures.expectError).Explain("tcp proxy").Ensure()
+		framework.NewRequestExpect(f).Request(framework.SetRequestProtocol("udp")).
+			PortName(udpPortName).ExpectError(configures.expectError).Explain("udp proxy").Ensure()
 	})
 }
 

+ 79 - 0
test/e2e/basic/server.go

@@ -0,0 +1,79 @@
+package basic
+
+import (
+	"fmt"
+
+	"github.com/fatedier/frp/test/e2e/framework"
+	"github.com/fatedier/frp/test/e2e/framework/consts"
+	"github.com/fatedier/frp/test/e2e/pkg/port"
+	"github.com/fatedier/frp/test/e2e/pkg/request"
+
+	. "github.com/onsi/ginkgo"
+)
+
+var _ = Describe("[Feature: Server Manager]", func() {
+	f := framework.NewDefaultFramework()
+
+	It("Ports Whitelist", func() {
+		serverConf := consts.DefaultServerConfig
+		clientConf := consts.DefaultClientConfig
+
+		serverConf += `
+			allow_ports = 10000-20000,20002,30000-50000
+		`
+
+		tcpPortName := port.GenName("TCP", port.WithRangePorts(10000, 20000))
+		udpPortName := port.GenName("UDP", port.WithRangePorts(30000, 50000))
+		clientConf += fmt.Sprintf(`
+			[tcp-allowded-in-range]
+			type = tcp
+			local_port = {{ .%s }}
+			remote_port = {{ .%s }}
+			`, framework.TCPEchoServerPort, tcpPortName)
+		clientConf += fmt.Sprintf(`
+			[tcp-port-not-allowed]
+			type = tcp
+			local_port = {{ .%s }}
+			remote_port = 20001
+			`, framework.TCPEchoServerPort)
+		clientConf += fmt.Sprintf(`
+			[tcp-port-unavailable]
+			type = tcp
+			local_port = {{ .%s }}
+			remote_port = {{ .%s }}
+			`, framework.TCPEchoServerPort, consts.PortServerName)
+		clientConf += fmt.Sprintf(`
+			[udp-allowed-in-range]
+			type = udp
+			local_port = {{ .%s }}
+			remote_port = {{ .%s }}
+			`, framework.UDPEchoServerPort, udpPortName)
+		clientConf += fmt.Sprintf(`
+			[udp-port-not-allowed]
+			type = udp
+			local_port = {{ .%s }}
+			remote_port = 20003
+			`, framework.UDPEchoServerPort)
+
+		f.RunProcesses([]string{serverConf}, []string{clientConf})
+
+		// TCP
+		// Allowed in range
+		framework.NewRequestExpect(f).PortName(tcpPortName).Ensure()
+
+		// Not Allowed
+		framework.NewRequestExpect(f).Request(framework.SetRequestPort(20001)).ExpectError(true).Ensure()
+
+		// Unavailable, already bind by frps
+		framework.NewRequestExpect(f).PortName(consts.PortServerName).ExpectError(true).Ensure()
+
+		// UDP
+		// Allowed in range
+		framework.NewRequestExpect(f).Request(framework.SetRequestProtocol("udp")).PortName(udpPortName).Ensure()
+
+		// Not Allowed
+		framework.NewRequestExpect(f).Request(func(r *request.Request) {
+			r.UDP().Port(20003)
+		}).ExpectError(true).Ensure()
+	})
+})

+ 4 - 5
test/e2e/examples.go

@@ -2,16 +2,14 @@ package e2e
 
 import (
 	"fmt"
-	"time"
 
 	"github.com/fatedier/frp/test/e2e/framework"
 	"github.com/fatedier/frp/test/e2e/framework/consts"
+	"github.com/fatedier/frp/test/e2e/pkg/port"
 
 	. "github.com/onsi/ginkgo"
 )
 
-var connTimeout = 2 * time.Second
-
 var _ = Describe("[Feature: Example]", func() {
 	f := framework.NewDefaultFramework()
 
@@ -20,16 +18,17 @@ var _ = Describe("[Feature: Example]", func() {
 			serverConf := consts.DefaultServerConfig
 			clientConf := consts.DefaultClientConfig
 
+			portName := port.GenName("TCP")
 			clientConf += fmt.Sprintf(`
 			[tcp]
 			type = tcp
 			local_port = {{ .%s }}
 			remote_port = {{ .%s }}
-			`, framework.TCPEchoServerPort, framework.GenPortName("TCP"))
+			`, framework.TCPEchoServerPort, portName)
 
 			f.RunProcesses([]string{serverConf}, []string{clientConf})
 
-			framework.ExpectTCPRequest(f.UsedPorts[framework.GenPortName("TCP")], []byte(consts.TestString), []byte(consts.TestString), connTimeout)
+			framework.NewRequestExpect(f).PortName(portName).Ensure()
 		})
 	})
 })

+ 23 - 6
test/e2e/framework/consts/consts.go

@@ -1,21 +1,38 @@
 package consts
 
-const (
-	TestString = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet."
+import (
+	"fmt"
+	"time"
+
+	"github.com/fatedier/frp/test/e2e/pkg/port"
 )
 
 const (
-	PortServerName = "PortServer"
+	TestString = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet."
+
+	DefaultTimeout = 2 * time.Second
 )
 
-const (
+var (
+	PortServerName  string
+	PortClientAdmin string
+
 	DefaultServerConfig = `
 	[common]
-	bind_port = {{ .PortServer }}
+	bind_port = {{ .%s }}
+	log_level = trace
 	`
 
 	DefaultClientConfig = `
 	[common]
-	server_port = {{ .PortServer }}
+	server_port = {{ .%s }}
+	log_level = trace
 	`
 )
+
+func init() {
+	PortServerName = port.GenName("Server")
+	PortClientAdmin = port.GenName("ClientAdmin")
+	DefaultServerConfig = fmt.Sprintf(DefaultServerConfig, port.GenName("Server"))
+	DefaultClientConfig = fmt.Sprintf(DefaultClientConfig, port.GenName("Server"))
+}

+ 12 - 4
test/e2e/framework/framework.go

@@ -25,8 +25,11 @@ type Options struct {
 
 type Framework struct {
 	TempDirectory string
-	UsedPorts     map[string]int
 
+	// ports used in this framework indexed by port name.
+	usedPorts map[string]int
+
+	// portAllocator to alloc port for this test case.
 	portAllocator *port.Allocator
 
 	// Multiple mock servers used for e2e testing.
@@ -117,10 +120,10 @@ func (f *Framework) AfterEach() {
 	f.clientConfPaths = nil
 
 	// release used ports
-	for _, port := range f.UsedPorts {
+	for _, port := range f.usedPorts {
 		f.portAllocator.Release(port)
 	}
-	f.UsedPorts = nil
+	f.usedPorts = nil
 }
 
 var portRegex = regexp.MustCompile(`{{ \.Port.*? }}`)
@@ -151,7 +154,7 @@ func (f *Framework) genPortsFromTemplates(templates []string) (ports map[string]
 	}()
 
 	for name := range ports {
-		port := f.portAllocator.Get()
+		port := f.portAllocator.GetByName(name)
 		if port <= 0 {
 			return nil, fmt.Errorf("can't allocate port")
 		}
@@ -161,6 +164,7 @@ func (f *Framework) genPortsFromTemplates(templates []string) (ports map[string]
 
 }
 
+// RenderTemplates alloc all ports for port names placeholder.
 func (f *Framework) RenderTemplates(templates []string) (outs []string, ports map[string]int, err error) {
 	ports, err = f.genPortsFromTemplates(templates)
 	if err != nil {
@@ -185,3 +189,7 @@ func (f *Framework) RenderTemplates(templates []string) (outs []string, ports ma
 	}
 	return
 }
+
+func (f *Framework) PortByName(name string) int {
+	return f.usedPorts[name]
+}

+ 3 - 2
test/e2e/framework/process.go

@@ -28,7 +28,7 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str
 	ExpectNoError(err)
 	ExpectTrue(len(templates) > 0)
 
-	f.UsedPorts = ports
+	f.usedPorts = ports
 
 	for i := range serverTemplates {
 		path := filepath.Join(f.TempDirectory, fmt.Sprintf("frp-e2e-server-%d", i))
@@ -40,8 +40,8 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str
 		f.serverProcesses = append(f.serverProcesses, p)
 		err = p.Start()
 		ExpectNoError(err)
-		time.Sleep(500 * time.Millisecond)
 	}
+	time.Sleep(time.Second)
 
 	for i := range clientTemplates {
 		index := i + len(serverTemplates)
@@ -56,4 +56,5 @@ func (f *Framework) RunProcesses(serverTemplates []string, clientTemplates []str
 		ExpectNoError(err)
 		time.Sleep(500 * time.Millisecond)
 	}
+	time.Sleep(500 * time.Millisecond)
 }

+ 64 - 30
test/e2e/framework/request.go

@@ -1,51 +1,85 @@
 package framework
 
 import (
-	"time"
-
+	"github.com/fatedier/frp/test/e2e/framework/consts"
 	"github.com/fatedier/frp/test/e2e/pkg/request"
 )
 
-func ExpectRequest(protocol string, port int, in, out []byte, timeout time.Duration, explain ...interface{}) {
-	switch protocol {
-	case "tcp":
-		ExpectTCPRequest(port, in, out, timeout, explain...)
-	case "udp":
-		ExpectUDPRequest(port, in, out, timeout, explain...)
-	default:
-		Failf("ExpectRequest not support protocol: %s", protocol)
+func SetRequestProtocol(protocol string) func(*request.Request) {
+	return func(r *request.Request) {
+		r.Protocol(protocol)
 	}
 }
 
-func ExpectRequestError(protocol string, port int, in []byte, timeout time.Duration, explain ...interface{}) {
-	switch protocol {
-	case "tcp":
-		ExpectTCPRequestError(port, in, timeout, explain...)
-	case "udp":
-		ExpectUDPRequestError(port, in, timeout, explain...)
-	default:
-		Failf("ExpectRequestError not support protocol: %s", protocol)
+func SetRequestPort(port int) func(*request.Request) {
+	return func(r *request.Request) {
+		r.Port(port)
 	}
 }
 
-func ExpectTCPRequest(port int, in, out []byte, timeout time.Duration, explain ...interface{}) {
-	res, err := request.SendTCPRequest(port, in, timeout)
+// NewRequest return a default TCP request with default timeout and content.
+func NewRequest() *request.Request {
+	return request.New().
+		Timeout(consts.DefaultTimeout).
+		Body([]byte(consts.TestString))
+}
+
+func ExpectResponse(req *request.Request, expectResp []byte, explain ...interface{}) {
+	ret, err := req.Do()
 	ExpectNoError(err, explain...)
-	ExpectEqual(string(out), res, explain...)
+	ExpectEqualValues(expectResp, ret, explain...)
 }
 
-func ExpectTCPRequestError(port int, in []byte, timeout time.Duration, explain ...interface{}) {
-	_, err := request.SendTCPRequest(port, in, timeout)
+func ExpectResponseError(req *request.Request, explain ...interface{}) {
+	_, err := req.Do()
 	ExpectError(err, explain...)
 }
 
-func ExpectUDPRequest(port int, in, out []byte, timeout time.Duration, explain ...interface{}) {
-	res, err := request.SendUDPRequest(port, in, timeout)
-	ExpectNoError(err, explain...)
-	ExpectEqual(string(out), res, explain...)
+type RequestExpect struct {
+	req *request.Request
+
+	f           *Framework
+	expectResp  []byte
+	expectError bool
+	explain     []interface{}
 }
 
-func ExpectUDPRequestError(port int, in []byte, timeout time.Duration, explain ...interface{}) {
-	_, err := request.SendUDPRequest(port, in, timeout)
-	ExpectError(err, explain...)
+func NewRequestExpect(f *Framework) *RequestExpect {
+	return &RequestExpect{
+		req:         NewRequest(),
+		f:           f,
+		expectResp:  []byte(consts.TestString),
+		expectError: false,
+		explain:     make([]interface{}, 0),
+	}
+}
+
+func (e *RequestExpect) Request(f func(r *request.Request)) *RequestExpect {
+	f(e.req)
+	return e
+}
+
+func (e *RequestExpect) PortName(name string) *RequestExpect {
+	if e.f != nil {
+		e.req.Port(e.f.PortByName(name))
+	}
+	return e
+}
+
+func (e *RequestExpect) ExpectError(expectErr bool) *RequestExpect {
+	e.expectError = expectErr
+	return e
+}
+
+func (e *RequestExpect) Explain(explain ...interface{}) *RequestExpect {
+	e.explain = explain
+	return e
+}
+
+func (e *RequestExpect) Ensure() {
+	if e.expectError {
+		ExpectResponseError(e.req, e.explain...)
+	} else {
+		ExpectResponse(e.req, e.expectResp, e.explain...)
+	}
 }

+ 0 - 4
test/e2e/framework/util.go

@@ -12,7 +12,3 @@ func init() {
 	uuid, _ := uuid.NewUUID()
 	RunID = uuid.String()
 }
-
-func GenPortName(name string) string {
-	return "Port" + name
-}

+ 58 - 1
test/e2e/pkg/port/port.go

@@ -3,6 +3,7 @@ package port
 import (
 	"fmt"
 	"net"
+	"sync"
 
 	"k8s.io/apimachinery/pkg/util/sets"
 )
@@ -10,6 +11,7 @@ import (
 type Allocator struct {
 	reserved sets.Int
 	used     sets.Int
+	mu       sync.Mutex
 }
 
 // NewAllocator return a port allocator for testing.
@@ -29,8 +31,27 @@ func NewAllocator(from int, to int, mod int, index int) *Allocator {
 }
 
 func (pa *Allocator) Get() int {
+	return pa.GetByName("")
+}
+
+func (pa *Allocator) GetByName(portName string) int {
+	var builder *nameBuilder
+	if portName == "" {
+		builder = &nameBuilder{}
+	} else {
+		var err error
+		builder, err = unmarshalFromName(portName)
+		if err != nil {
+			fmt.Println(err, portName)
+			return 0
+		}
+	}
+
+	pa.mu.Lock()
+	defer pa.mu.Unlock()
+
 	for i := 0; i < 10; i++ {
-		port, _ := pa.reserved.PopAny()
+		port := pa.getByRange(builder.rangePortFrom, builder.rangePortTo)
 		if port == 0 {
 			return 0
 		}
@@ -43,13 +64,49 @@ func (pa *Allocator) Get() int {
 			continue
 		}
 		l.Close()
+
+		udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("127.0.0.1:%d", port))
+		if err != nil {
+			continue
+		}
+		udpConn, err := net.ListenUDP("udp", udpAddr)
+		if err != nil {
+			// Maybe not controlled by us, mark it used.
+			pa.used.Insert(port)
+			continue
+		}
+		udpConn.Close()
+
 		pa.used.Insert(port)
 		return port
 	}
 	return 0
 }
 
+func (pa *Allocator) getByRange(from, to int) int {
+	if from <= 0 {
+		port, _ := pa.reserved.PopAny()
+		return port
+	}
+
+	// choose a random port between from - to
+	ports := pa.reserved.UnsortedList()
+	for _, port := range ports {
+		if port >= from && port <= to {
+			return port
+		}
+	}
+	return 0
+}
+
 func (pa *Allocator) Release(port int) {
+	if port <= 0 {
+		return
+	}
+
+	pa.mu.Lock()
+	defer pa.mu.Unlock()
+
 	if pa.used.Has(port) {
 		pa.used.Delete(port)
 		pa.reserved.Insert(port)

+ 67 - 0
test/e2e/pkg/port/util.go

@@ -0,0 +1,67 @@
+package port
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+const (
+	NameDelimiter = "_"
+)
+
+type NameOption func(*nameBuilder) *nameBuilder
+
+type nameBuilder struct {
+	name          string
+	rangePortFrom int
+	rangePortTo   int
+}
+
+func unmarshalFromName(name string) (*nameBuilder, error) {
+	var builder nameBuilder
+	arrs := strings.Split(name, NameDelimiter)
+	switch len(arrs) {
+	case 2:
+		builder.name = arrs[1]
+	case 4:
+		builder.name = arrs[1]
+		if fromPort, err := strconv.Atoi(arrs[2]); err != nil {
+			return nil, fmt.Errorf("error range port from")
+		} else {
+			builder.rangePortFrom = fromPort
+		}
+		if toPort, err := strconv.Atoi(arrs[3]); err != nil {
+			return nil, fmt.Errorf("error range port to")
+		} else {
+			builder.rangePortTo = toPort
+		}
+	default:
+		return nil, fmt.Errorf("error port name format")
+	}
+	return &builder, nil
+}
+
+func (builder *nameBuilder) String() string {
+	name := fmt.Sprintf("Port%s%s", NameDelimiter, builder.name)
+	if builder.rangePortFrom > 0 && builder.rangePortTo > 0 && builder.rangePortTo > builder.rangePortFrom {
+		name += fmt.Sprintf("%s%d%s%d", NameDelimiter, builder.rangePortFrom, NameDelimiter, builder.rangePortTo)
+	}
+	return name
+}
+
+func WithRangePorts(from, to int) NameOption {
+	return func(builder *nameBuilder) *nameBuilder {
+		builder.rangePortFrom = from
+		builder.rangePortTo = to
+		return builder
+	}
+}
+
+func GenName(name string, options ...NameOption) string {
+	builder := &nameBuilder{name: name}
+	for _, option := range options {
+		option(builder)
+	}
+	return builder.String()
+}

+ 107 - 8
test/e2e/pkg/request/request.go

@@ -4,12 +4,108 @@ import (
 	"fmt"
 	"net"
 	"time"
+
+	libnet "github.com/fatedier/golib/net"
 )
 
-func SendTCPRequest(port int, content []byte, timeout time.Duration) (string, error) {
+type Request struct {
+	protocol  string
+	addr      string
+	port      int
+	body      []byte
+	timeout   time.Duration
+	proxyURL  string
+	proxyHost string
+}
+
+func New() *Request {
+	return &Request{
+		protocol: "tcp",
+	}
+}
+
+func (r *Request) Protocol(protocol string) *Request {
+	r.protocol = protocol
+	return r
+}
+
+func (r *Request) TCP() *Request {
+	r.protocol = "tcp"
+	return r
+}
+
+func (r *Request) UDP() *Request {
+	r.protocol = "udp"
+	return r
+}
+
+func (r *Request) Proxy(url, host string) *Request {
+	r.proxyURL = url
+	r.proxyHost = host
+	return r
+}
+
+func (r *Request) Addr(addr string) *Request {
+	r.addr = addr
+	return r
+}
+
+func (r *Request) Port(port int) *Request {
+	r.port = port
+	return r
+}
+
+func (r *Request) Timeout(timeout time.Duration) *Request {
+	r.timeout = timeout
+	return r
+}
+
+func (r *Request) Body(content []byte) *Request {
+	r.body = content
+	return r
+}
+
+func (r *Request) Do() ([]byte, error) {
+	var (
+		conn net.Conn
+		err  error
+	)
+	if len(r.proxyURL) > 0 {
+		if r.protocol != "tcp" {
+			return nil, fmt.Errorf("only tcp protocol is allowed for proxy")
+		}
+		conn, err = libnet.DialTcpByProxy(r.proxyURL, r.proxyHost)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		if r.addr == "" {
+			r.addr = fmt.Sprintf("127.0.0.1:%d", r.port)
+		}
+		switch r.protocol {
+		case "tcp":
+			conn, err = net.Dial("tcp", r.addr)
+		case "udp":
+			conn, err = net.Dial("udp", r.addr)
+		default:
+			return nil, fmt.Errorf("invalid protocol")
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	defer conn.Close()
+	if r.timeout > 0 {
+		conn.SetDeadline(time.Now().Add(r.timeout))
+	}
+	return sendRequestByConn(conn, r.body)
+}
+
+func SendTCPRequest(port int, content []byte, timeout time.Duration) ([]byte, error) {
 	c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
 	if err != nil {
-		return "", fmt.Errorf("connect to tcp server error: %v", err)
+		return nil, fmt.Errorf("connect to tcp server error: %v", err)
 	}
 	defer c.Close()
 
@@ -17,10 +113,10 @@ func SendTCPRequest(port int, content []byte, timeout time.Duration) (string, er
 	return sendRequestByConn(c, content)
 }
 
-func SendUDPRequest(port int, content []byte, timeout time.Duration) (string, error) {
+func SendUDPRequest(port int, content []byte, timeout time.Duration) ([]byte, error) {
 	c, err := net.Dial("udp", fmt.Sprintf("127.0.0.1:%d", port))
 	if err != nil {
-		return "", fmt.Errorf("connect to udp server error:  %v", err)
+		return nil, fmt.Errorf("connect to udp server error:  %v", err)
 	}
 	defer c.Close()
 
@@ -28,13 +124,16 @@ func SendUDPRequest(port int, content []byte, timeout time.Duration) (string, er
 	return sendRequestByConn(c, content)
 }
 
-func sendRequestByConn(c net.Conn, content []byte) (string, error) {
-	c.Write(content)
+func sendRequestByConn(c net.Conn, content []byte) ([]byte, error) {
+	_, err := c.Write(content)
+	if err != nil {
+		return nil, fmt.Errorf("write error: %v", err)
+	}
 
 	buf := make([]byte, 2048)
 	n, err := c.Read(buf)
 	if err != nil {
-		return "", fmt.Errorf("read error: %v", err)
+		return nil, fmt.Errorf("read error: %v", err)
 	}
-	return string(buf[:n]), nil
+	return buf[:n], nil
 }

+ 10 - 10
test/e2e/plugin/client_plugins.go

@@ -2,16 +2,14 @@ package plugin
 
 import (
 	"fmt"
-	"time"
 
 	"github.com/fatedier/frp/test/e2e/framework"
 	"github.com/fatedier/frp/test/e2e/framework/consts"
+	"github.com/fatedier/frp/test/e2e/pkg/port"
 
 	. "github.com/onsi/ginkgo"
 )
 
-var connTimeout = 2 * time.Second
-
 var _ = Describe("[Feature: Client-Plugins]", func() {
 	f := framework.NewDefaultFramework()
 
@@ -37,21 +35,21 @@ var _ = Describe("[Feature: Client-Plugins]", func() {
 			}{
 				{
 					proxyName: "normal",
-					portName:  framework.GenPortName("Normal"),
+					portName:  port.GenName("Normal"),
 				},
 				{
 					proxyName:   "with-encryption",
-					portName:    framework.GenPortName("WithEncryption"),
+					portName:    port.GenName("WithEncryption"),
 					extraConfig: "use_encryption = true",
 				},
 				{
 					proxyName:   "with-compression",
-					portName:    framework.GenPortName("WithCompression"),
+					portName:    port.GenName("WithCompression"),
 					extraConfig: "use_compression = true",
 				},
 				{
 					proxyName: "with-encryption-and-compression",
-					portName:  framework.GenPortName("WithEncryptionAndCompression"),
+					portName:  port.GenName("WithEncryptionAndCompression"),
 					extraConfig: `
 					use_encryption = true
 					use_compression = true
@@ -67,9 +65,11 @@ var _ = Describe("[Feature: Client-Plugins]", func() {
 			f.RunProcesses([]string{serverConf}, []string{clientConf})
 
 			for _, test := range tests {
-				framework.ExpectTCPRequest(f.UsedPorts[test.portName],
-					[]byte(consts.TestString), []byte(consts.TestString),
-					connTimeout, test.proxyName)
+				framework.ExpectResponse(
+					framework.NewRequest().Port(f.PortByName(test.portName)),
+					[]byte(consts.TestString),
+					test.proxyName,
+				)
 			}
 		})
 	})

+ 0 - 7
tests/ci/auto_test_frpc.ini

@@ -132,13 +132,6 @@ custom_domains = test6.frp.com
 host_header_rewrite = test6.frp.com
 header_X-From-Where = frp
 
-[tcpmuxhttpconnect]
-type = tcpmux
-multiplexer = httpconnect
-local_ip = 127.0.0.1
-local_port = 10701
-custom_domains = tunnel1
-
 [wildcard_http]
 type = http
 local_ip = 127.0.0.1

+ 0 - 46
tests/ci/normal_test.go

@@ -5,7 +5,6 @@ import (
 	"net/http"
 	"net/url"
 	"os"
-	"strings"
 	"testing"
 	"time"
 
@@ -13,7 +12,6 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"github.com/fatedier/frp/client/proxy"
-	"github.com/fatedier/frp/server/ports"
 	"github.com/fatedier/frp/tests/consts"
 	"github.com/fatedier/frp/tests/mock"
 	"github.com/fatedier/frp/tests/util"
@@ -155,17 +153,6 @@ func TestHTTP(t *testing.T) {
 	}
 }
 
-func TestTCPMux(t *testing.T) {
-	assert := assert.New(t)
-
-	conn, err := gnet.DialTcpByProxy(fmt.Sprintf("http://%s:%d", "127.0.0.1", consts.TEST_TCP_MUX_FRP_PORT), "tunnel1")
-	if assert.NoError(err) {
-		res, err := util.SendTCPMsgByConn(conn, consts.TEST_TCP_ECHO_STR)
-		assert.NoError(err)
-		assert.Equal(consts.TEST_TCP_ECHO_STR, res)
-	}
-}
-
 func TestWebSocket(t *testing.T) {
 	assert := assert.New(t)
 
@@ -182,39 +169,6 @@ func TestWebSocket(t *testing.T) {
 	assert.Equal(consts.TEST_HTTP_NORMAL_STR, string(msg))
 }
 
-func TestAllowPorts(t *testing.T) {
-	assert := assert.New(t)
-	// Port not allowed
-	status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTCPPortNotAllowed)
-	if assert.NoError(err) {
-		assert.Equal(proxy.ProxyPhaseStartErr, status.Status)
-		assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
-	}
-
-	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyUDPPortNotAllowed)
-	if assert.NoError(err) {
-		assert.Equal(proxy.ProxyPhaseStartErr, status.Status)
-		assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
-	}
-
-	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTCPPortUnavailable)
-	if assert.NoError(err) {
-		assert.Equal(proxy.ProxyPhaseStartErr, status.Status)
-		assert.True(strings.Contains(status.Err, ports.ErrPortUnAvailable.Error()))
-	}
-
-	// Port normal
-	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTCPPortNormal)
-	if assert.NoError(err) {
-		assert.Equal(proxy.ProxyPhaseRunning, status.Status)
-	}
-
-	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyUDPPortNormal)
-	if assert.NoError(err) {
-		assert.Equal(proxy.ProxyPhaseRunning, status.Status)
-	}
-}
-
 func TestRandomPort(t *testing.T) {
 	assert := assert.New(t)
 	// tcp