Explorar el Código

more e2e tests (#1845)

fatedier hace 4 años
padre
commit
c9fe23eb10

+ 30 - 4
client/proxy/proxy.go

@@ -479,12 +479,25 @@ func (pxy *UDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
 	// close resources releated with old workConn
 	pxy.Close()
 
+	var rwc io.ReadWriteCloser = conn
+	var err error
 	if pxy.limiter != nil {
-		rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
+		rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
 			return conn.Close()
 		})
-		conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
 	}
+	if pxy.cfg.UseEncryption {
+		rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
+		if err != nil {
+			conn.Close()
+			xl.Error("create encryption stream error: %v", err)
+			return
+		}
+	}
+	if pxy.cfg.UseCompression {
+		rwc = frpIo.WithCompression(rwc)
+	}
+	conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
 
 	pxy.mu.Lock()
 	pxy.workConn = conn
@@ -579,12 +592,25 @@ func (pxy *SUDPProxy) InWorkConn(conn net.Conn, m *msg.StartWorkConn) {
 	xl := pxy.xl
 	xl.Info("incoming a new work connection for sudp proxy, %s", conn.RemoteAddr().String())
 
+	var rwc io.ReadWriteCloser = conn
+	var err error
 	if pxy.limiter != nil {
-		rwc := frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
+		rwc = frpIo.WrapReadWriteCloser(limit.NewReader(conn, pxy.limiter), limit.NewWriter(conn, pxy.limiter), func() error {
 			return conn.Close()
 		})
-		conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
 	}
+	if pxy.cfg.UseEncryption {
+		rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.clientCfg.Token))
+		if err != nil {
+			conn.Close()
+			xl.Error("create encryption stream error: %v", err)
+			return
+		}
+	}
+	if pxy.cfg.UseCompression {
+		rwc = frpIo.WithCompression(rwc)
+	}
+	conn = frpNet.WrapReadWriteCloserToConn(rwc, conn)
 
 	workConn := conn
 	readCh := make(chan *msg.UDPPacket, 1024)

+ 17 - 3
client/visitor.go

@@ -488,8 +488,9 @@ func (sv *SUDPVisitor) worker(workConn net.Conn) {
 	xl.Info("sudp worker is closed")
 }
 
-func (sv *SUDPVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) {
-	visitorConn, err = sv.ctl.connectServer()
+func (sv *SUDPVisitor) getNewVisitorConn() (net.Conn, error) {
+	xl := xlog.FromContextSafe(sv.ctx)
+	visitorConn, err := sv.ctl.connectServer()
 	if err != nil {
 		return nil, fmt.Errorf("frpc connect frps error: %v", err)
 	}
@@ -518,7 +519,20 @@ func (sv *SUDPVisitor) getNewVisitorConn() (visitorConn net.Conn, err error) {
 	if newVisitorConnRespMsg.Error != "" {
 		return nil, fmt.Errorf("start new visitor connection error: %s", newVisitorConnRespMsg.Error)
 	}
-	return
+
+	var remote io.ReadWriteCloser
+	remote = visitorConn
+	if sv.cfg.UseEncryption {
+		remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
+		if err != nil {
+			xl.Error("create encryption stream error: %v", err)
+			return nil, err
+		}
+	}
+	if sv.cfg.UseCompression {
+		remote = frpIo.WithCompression(remote)
+	}
+	return frpNet.WrapReadWriteCloserToConn(remote, visitorConn), nil
 }
 
 func (sv *SUDPVisitor) Close() {

+ 2 - 1
go.mod

@@ -17,7 +17,7 @@ require (
 	github.com/klauspost/cpuid v1.2.0 // indirect
 	github.com/klauspost/reedsolomon v1.9.1 // indirect
 	github.com/mattn/go-runewidth v0.0.4 // indirect
-	github.com/onsi/ginkgo v1.12.2
+	github.com/onsi/ginkgo v1.12.3
 	github.com/onsi/gomega v1.10.1
 	github.com/pires/go-proxyproto v0.0.0-20190111085350-4d51b51e3bfc
 	github.com/pquerna/cachecontrol v0.0.0-20180517163645-1555304b9b35 // indirect
@@ -33,6 +33,7 @@ require (
 	github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect
 	golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
 	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+	golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 // indirect
 	golang.org/x/time v0.0.0-20191024005414-555d28b269f0
 	gopkg.in/square/go-jose.v2 v2.4.1 // indirect
 	k8s.io/apimachinery v0.18.3

+ 4 - 2
go.sum

@@ -115,8 +115,8 @@ github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
-github.com/onsi/ginkgo v1.12.2 h1:Ke9m3h2Hu0wsZ45yewCqhYr3Z+emcNTuLY2nMWCkrSI=
-github.com/onsi/ginkgo v1.12.2/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/ginkgo v1.12.3 h1:+RYp9QczoWz9zfUyLP/5SLXQVhfr6gZOoKGfQqHuLZQ=
+github.com/onsi/ginkgo v1.12.3/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
 github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
 github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
 github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
@@ -206,6 +206,8 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4=
 golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980 h1:OjiUf46hAmXblsZdnoSXsEUSKU8r1UEzcL5RVZ4gO9Y=
+golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=

+ 5 - 1
hack/run-e2e.sh

@@ -8,4 +8,8 @@ if [ $? -ne 0 ]; then
     go get -u github.com/onsi/ginkgo/ginkgo
 fi
 
-ginkgo -nodes=1 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=debug
+debug=false
+if [ x${DEBUG} == x"true" ]; then
+    debug=true
+fi
+ginkgo -nodes=4 ${ROOT}/test/e2e -- -frpc-path=${ROOT}/bin/frpc -frps-path=${ROOT}/bin/frps -log-level=debug -debug=${debug}

+ 21 - 4
server/proxy/udp.go

@@ -17,6 +17,7 @@ package proxy
 import (
 	"context"
 	"fmt"
+	"io"
 	"net"
 	"time"
 
@@ -24,8 +25,10 @@ import (
 	"github.com/fatedier/frp/models/msg"
 	"github.com/fatedier/frp/models/proto/udp"
 	"github.com/fatedier/frp/server/metrics"
+	frpNet "github.com/fatedier/frp/utils/net"
 
 	"github.com/fatedier/golib/errors"
+	frpIo "github.com/fatedier/golib/io"
 )
 
 type UDPProxy struct {
@@ -174,14 +177,28 @@ func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
 				}
 				continue
 			}
-			// close the old workConn and replac it with a new one
+			// close the old workConn and replace it with a new one
 			if pxy.workConn != nil {
 				pxy.workConn.Close()
 			}
-			pxy.workConn = workConn
+
+			var rwc io.ReadWriteCloser = workConn
+			if pxy.cfg.UseEncryption {
+				rwc, err = frpIo.WithEncryption(rwc, []byte(pxy.serverCfg.Token))
+				if err != nil {
+					xl.Error("create encryption stream error: %v", err)
+					workConn.Close()
+					continue
+				}
+			}
+			if pxy.cfg.UseCompression {
+				rwc = frpIo.WithCompression(rwc)
+			}
+
+			pxy.workConn = frpNet.WrapReadWriteCloserToConn(rwc, workConn)
 			ctx, cancel := context.WithCancel(context.Background())
-			go workConnReaderFn(workConn)
-			go workConnSenderFn(workConn, ctx)
+			go workConnReaderFn(pxy.workConn)
+			go workConnSenderFn(pxy.workConn, ctx)
 			_, ok := <-pxy.checkCloseCh
 			cancel()
 			if !ok {

+ 198 - 0
test/e2e/basic/basic.go

@@ -0,0 +1,198 @@
+package basic
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/fatedier/frp/test/e2e/framework"
+	"github.com/fatedier/frp/test/e2e/framework/consts"
+
+	. "github.com/onsi/ginkgo"
+)
+
+var connTimeout = 2 * time.Second
+
+var _ = Describe("[Feature: Basic]", func() {
+	f := framework.NewDefaultFramework()
+
+	Describe("TCP && UDP", func() {
+		types := []string{"tcp", "udp"}
+		for _, t := range types {
+			proxyType := t
+			It(fmt.Sprintf("Expose a %s echo server", strings.ToUpper(proxyType)), func() {
+				serverConf := consts.DefaultServerConfig
+				clientConf := consts.DefaultClientConfig
+
+				localPortName := ""
+				protocol := "tcp"
+				switch proxyType {
+				case "tcp":
+					localPortName = framework.TCPEchoServerPort
+					protocol = "tcp"
+				case "udp":
+					localPortName = framework.UDPEchoServerPort
+					protocol = "udp"
+				}
+				getProxyConf := func(proxyName string, portName string, extra string) string {
+					return fmt.Sprintf(`
+				[%s]
+				type = %s
+				local_port = {{ .%s }}
+				remote_port = {{ .%s }}
+				`+extra, proxyName, proxyType, localPortName, portName)
+				}
+
+				tests := []struct {
+					proxyName   string
+					portName    string
+					extraConfig string
+				}{
+					{
+						proxyName: "normal",
+						portName:  framework.GenPortName("Normal"),
+					},
+					{
+						proxyName:   "with-encryption",
+						portName:    framework.GenPortName("WithEncryption"),
+						extraConfig: "use_encryption = true",
+					},
+					{
+						proxyName:   "with-compression",
+						portName:    framework.GenPortName("WithCompression"),
+						extraConfig: "use_compression = true",
+					},
+					{
+						proxyName: "with-encryption-and-compression",
+						portName:  framework.GenPortName("WithEncryptionAndCompression"),
+						extraConfig: `
+						use_encryption = true
+						use_compression = true
+						`,
+					},
+				}
+
+				// build all client config
+				for _, test := range tests {
+					clientConf += getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n"
+				}
+				// run frps and frpc
+				f.RunProcesses([]string{serverConf}, []string{clientConf})
+
+				for _, test := range tests {
+					framework.ExpectRequest(protocol, f.UsedPorts[test.portName],
+						[]byte(consts.TestString), []byte(consts.TestString), connTimeout, test.proxyName)
+				}
+			})
+		}
+	})
+
+	Describe("STCP && SUDP", func() {
+		types := []string{"stcp", "sudp"}
+		for _, t := range types {
+			proxyType := t
+			It(fmt.Sprintf("Expose echo server with %s", strings.ToUpper(proxyType)), func() {
+				serverConf := consts.DefaultServerConfig
+				clientServerConf := consts.DefaultClientConfig
+				clientVisitorConf := consts.DefaultClientConfig
+
+				localPortName := ""
+				protocol := "tcp"
+				switch proxyType {
+				case "stcp":
+					localPortName = framework.TCPEchoServerPort
+					protocol = "tcp"
+				case "sudp":
+					localPortName = framework.UDPEchoServerPort
+					protocol = "udp"
+				}
+
+				correctSK := "abc"
+				wrongSK := "123"
+
+				getProxyServerConf := func(proxyName string, extra string) string {
+					return fmt.Sprintf(`
+				[%s]
+				type = %s
+				role = server
+				sk = %s
+				local_port = {{ .%s }}
+				`+extra, proxyName, proxyType, correctSK, localPortName)
+				}
+				getProxyVisitorConf := func(proxyName string, portName, visitorSK, extra string) string {
+					return fmt.Sprintf(`
+				[%s]
+				type = %s
+				role = visitor
+				server_name = %s
+				sk = %s
+				bind_port = {{ .%s }}
+				`+extra, proxyName, proxyType, proxyName, visitorSK, portName)
+				}
+
+				tests := []struct {
+					proxyName    string
+					bindPortName string
+					visitorSK    string
+					extraConfig  string
+					expectError  bool
+				}{
+					{
+						proxyName:    "normal",
+						bindPortName: framework.GenPortName("Normal"),
+						visitorSK:    correctSK,
+					},
+					{
+						proxyName:    "with-encryption",
+						bindPortName: framework.GenPortName("WithEncryption"),
+						visitorSK:    correctSK,
+						extraConfig:  "use_encryption = true",
+					},
+					{
+						proxyName:    "with-compression",
+						bindPortName: framework.GenPortName("WithCompression"),
+						visitorSK:    correctSK,
+						extraConfig:  "use_compression = true",
+					},
+					{
+						proxyName:    "with-encryption-and-compression",
+						bindPortName: framework.GenPortName("WithEncryptionAndCompression"),
+						visitorSK:    correctSK,
+						extraConfig: `
+						use_encryption = true
+						use_compression = true
+						`,
+					},
+					{
+						proxyName:    "with-error-sk",
+						bindPortName: framework.GenPortName("WithErrorSK"),
+						visitorSK:    wrongSK,
+						expectError:  true,
+					},
+				}
+
+				// build all client config
+				for _, test := range tests {
+					clientServerConf += getProxyServerConf(test.proxyName, test.extraConfig) + "\n"
+				}
+				for _, test := range tests {
+					clientVisitorConf += getProxyVisitorConf(test.proxyName, test.bindPortName, test.visitorSK, test.extraConfig) + "\n"
+				}
+				// run frps and frpc
+				f.RunProcesses([]string{serverConf}, []string{clientServerConf, clientVisitorConf})
+
+				for _, test := range tests {
+					expectResp := []byte(consts.TestString)
+					if test.expectError {
+						framework.ExpectRequestError(protocol, f.UsedPorts[test.bindPortName],
+							[]byte(consts.TestString), connTimeout, test.proxyName)
+						continue
+					}
+
+					framework.ExpectRequest(protocol, f.UsedPorts[test.bindPortName],
+						[]byte(consts.TestString), expectResp, connTimeout, test.proxyName)
+				}
+			})
+		}
+	})
+})

+ 95 - 0
test/e2e/basic/client_server.go

@@ -0,0 +1,95 @@
+package basic
+
+import (
+	"fmt"
+
+	"github.com/fatedier/frp/test/e2e/framework"
+	"github.com/fatedier/frp/test/e2e/framework/consts"
+
+	. "github.com/onsi/ginkgo"
+)
+
+type generalTestConfigures struct {
+	server      string
+	client      string
+	expectError bool
+}
+
+func defineClientServerTest(desc string, f *framework.Framework, configures *generalTestConfigures) {
+	It(desc, func() {
+		serverConf := consts.DefaultServerConfig
+		clientConf := consts.DefaultClientConfig
+
+		serverConf += fmt.Sprintf(`
+				%s
+				`, configures.server)
+
+		clientConf += fmt.Sprintf(`
+				%s
+
+				[tcp]
+				type = tcp
+				local_port = {{ .%s }}
+				remote_port = {{ .%s }}
+
+				[udp]
+				type = udp
+				local_port = {{ .%s }}
+				remote_port = {{ .%s }}
+				`, configures.client,
+			framework.TCPEchoServerPort, framework.GenPortName("TCP"),
+			framework.UDPEchoServerPort, framework.GenPortName("UDP"),
+		)
+
+		f.RunProcesses([]string{serverConf}, []string{clientConf})
+
+		if !configures.expectError {
+			framework.ExpectTCPRequest(f.UsedPorts[framework.GenPortName("TCP")],
+				[]byte(consts.TestString), []byte(consts.TestString), connTimeout, "tcp proxy")
+			framework.ExpectUDPRequest(f.UsedPorts[framework.GenPortName("UDP")],
+				[]byte(consts.TestString), []byte(consts.TestString), connTimeout, "udp proxy")
+		} else {
+			framework.ExpectTCPRequestError(f.UsedPorts[framework.GenPortName("TCP")],
+				[]byte(consts.TestString), connTimeout, "tcp proxy")
+			framework.ExpectUDPRequestError(f.UsedPorts[framework.GenPortName("UDP")],
+				[]byte(consts.TestString), connTimeout, "udp proxy")
+		}
+	})
+}
+
+var _ = Describe("[Feature: Client-Server]", func() {
+	f := framework.NewDefaultFramework()
+
+	Describe("Protocol", func() {
+		supportProtocols := []string{"tcp", "kcp", "websocket"}
+		for _, protocol := range supportProtocols {
+			configures := &generalTestConfigures{
+				server: fmt.Sprintf(`
+				kcp_bind_port = {{ .%s }}
+				protocol = %s"
+				`, consts.PortServerName, protocol),
+				client: "protocol = " + protocol,
+			}
+			defineClientServerTest(protocol, f, configures)
+		}
+	})
+
+	Describe("Authentication", func() {
+		func() {
+			configures := &generalTestConfigures{
+				server: "token = 123456",
+				client: "token = 123456",
+			}
+			defineClientServerTest("Token Correct", f, configures)
+		}()
+
+		func() {
+			configures := &generalTestConfigures{
+				server:      "token = 123456",
+				client:      "token = invalid",
+				expectError: true,
+			}
+			defineClientServerTest("Token Incorrect", f, configures)
+		}()
+	})
+})

+ 2 - 1
test/e2e/e2e.go

@@ -33,7 +33,8 @@ var _ = ginkgo.SynchronizedAfterSuite(func() {
 func RunE2ETests(t *testing.T) {
 	gomega.RegisterFailHandler(framework.Fail)
 
-	log.Info("Starting e2e run %q on Ginkgo node %d", framework.RunID, config.GinkgoConfig.ParallelNode)
+	log.Info("Starting e2e run %q on Ginkgo node %d of total %d",
+		framework.RunID, config.GinkgoConfig.ParallelNode, config.GinkgoConfig.ParallelTotal)
 	ginkgo.RunSpecs(t, "frp e2e suite")
 }
 

+ 6 - 0
test/e2e/e2e_test.go

@@ -8,6 +8,12 @@ import (
 
 	"github.com/fatedier/frp/test/e2e/framework"
 	"github.com/fatedier/frp/utils/log"
+
+	// test source
+	_ "github.com/fatedier/frp/test/e2e/basic"
+	_ "github.com/fatedier/frp/test/e2e/plugin"
+
+	_ "github.com/onsi/ginkgo"
 )
 
 // handleFlags sets up all flags and parses the command line.

+ 7 - 12
test/e2e/examples.go

@@ -10,31 +10,26 @@ import (
 	. "github.com/onsi/ginkgo"
 )
 
-var connTimeout = 5 * time.Second
+var connTimeout = 2 * time.Second
 
 var _ = Describe("[Feature: Example]", func() {
 	f := framework.NewDefaultFramework()
 
 	Describe("TCP", func() {
 		It("Expose a TCP echo server", func() {
-			serverConf := `
-			[common]
-			bind_port = {{ .PortServer }}
-			`
-
-			clientConf := fmt.Sprintf(`
-			[common]
-			server_port = {{ .PortServer }}
+			serverConf := consts.DefaultServerConfig
+			clientConf := consts.DefaultClientConfig
 
+			clientConf += fmt.Sprintf(`
 			[tcp]
 			type = tcp
 			local_port = {{ .%s }}
-			remote_port = {{ .PortTCP }}
-			`, framework.TCPEchoServerPort)
+			remote_port = {{ .%s }}
+			`, framework.TCPEchoServerPort, framework.GenPortName("TCP"))
 
 			f.RunProcesses([]string{serverConf}, []string{clientConf})
 
-			framework.ExpectTCPReuqest(f.UsedPorts["PortTCP"], []byte(consts.TestString), []byte(consts.TestString), connTimeout)
+			framework.ExpectTCPRequest(f.UsedPorts[framework.GenPortName("TCP")], []byte(consts.TestString), []byte(consts.TestString), connTimeout)
 		})
 	})
 })

+ 16 - 0
test/e2e/framework/consts/consts.go

@@ -3,3 +3,19 @@ package consts
 const (
 	TestString = "frp is a fast reverse proxy to help you expose a local server behind a NAT or firewall to the internet."
 )
+
+const (
+	PortServerName = "PortServer"
+)
+
+const (
+	DefaultServerConfig = `
+	[common]
+	bind_port = {{ .PortServer }}
+	`
+
+	DefaultClientConfig = `
+	[common]
+	server_port = {{ .PortServer }}
+	`
+)

+ 29 - 9
test/e2e/framework/framework.go

@@ -60,10 +60,6 @@ func NewFramework(opt Options) *Framework {
 	f := &Framework{
 		portAllocator: port.NewAllocator(opt.FromPortIndex, opt.ToPortIndex, opt.TotalParallelNode, opt.CurrentNodeIndex-1),
 	}
-	f.mockServers = NewMockServers(f.portAllocator)
-	if err := f.mockServers.Run(); err != nil {
-		Failf("%v", err)
-	}
 
 	ginkgo.BeforeEach(f.BeforeEach)
 	ginkgo.AfterEach(f.AfterEach)
@@ -79,6 +75,11 @@ func (f *Framework) BeforeEach() {
 	dir, err := ioutil.TempDir(os.TempDir(), "frpe2e-test-*")
 	ExpectNoError(err)
 	f.TempDirectory = dir
+
+	f.mockServers = NewMockServers(f.portAllocator)
+	if err := f.mockServers.Run(); err != nil {
+		Failf("%v", err)
+	}
 }
 
 func (f *Framework) AfterEach() {
@@ -88,19 +89,38 @@ func (f *Framework) AfterEach() {
 
 	RemoveCleanupAction(f.cleanupHandle)
 
-	os.RemoveAll(f.TempDirectory)
-	f.TempDirectory = ""
-	f.UsedPorts = nil
-	f.serverConfPaths = nil
-	f.clientConfPaths = nil
+	// stop processor
 	for _, p := range f.serverProcesses {
 		p.Stop()
+		if TestContext.Debug {
+			fmt.Println(p.ErrorOutput())
+			fmt.Println(p.StdOutput())
+		}
 	}
 	for _, p := range f.clientProcesses {
 		p.Stop()
+		if TestContext.Debug {
+			fmt.Println(p.ErrorOutput())
+			fmt.Println(p.StdOutput())
+		}
 	}
 	f.serverProcesses = nil
 	f.clientProcesses = nil
+
+	// close mock servers
+	f.mockServers.Close()
+
+	// clean directory
+	os.RemoveAll(f.TempDirectory)
+	f.TempDirectory = ""
+	f.serverConfPaths = nil
+	f.clientConfPaths = nil
+
+	// release used ports
+	for _, port := range f.UsedPorts {
+		f.portAllocator.Release(port)
+	}
+	f.UsedPorts = nil
 }
 
 var portRegex = regexp.MustCompile(`{{ \.Port.*? }}`)

+ 25 - 0
test/e2e/framework/mockservers.go

@@ -1,6 +1,9 @@
 package framework
 
 import (
+	"fmt"
+	"os"
+
 	"github.com/fatedier/frp/test/e2e/mock/echoserver"
 	"github.com/fatedier/frp/test/e2e/pkg/port"
 )
@@ -8,11 +11,13 @@ import (
 const (
 	TCPEchoServerPort = "TCPEchoServerPort"
 	UDPEchoServerPort = "UDPEchoServerPort"
+	UDSEchoServerAddr = "UDSEchoServerAddr"
 )
 
 type MockServers struct {
 	tcpEchoServer *echoserver.Server
 	udpEchoServer *echoserver.Server
+	udsEchoServer *echoserver.Server
 }
 
 func NewMockServers(portAllocator *port.Allocator) *MockServers {
@@ -31,6 +36,15 @@ func NewMockServers(portAllocator *port.Allocator) *MockServers {
 		BindPort:  int32(udpPort),
 		RepeatNum: 1,
 	})
+
+	udsIndex := portAllocator.Get()
+	udsAddr := fmt.Sprintf("%s/frp_echo_server_%d.sock", os.TempDir(), udsIndex)
+	os.Remove(udsAddr)
+	s.udsEchoServer = echoserver.New(echoserver.Options{
+		Type:      echoserver.Unix,
+		BindAddr:  udsAddr,
+		RepeatNum: 1,
+	})
 	return s
 }
 
@@ -41,13 +55,24 @@ func (m *MockServers) Run() error {
 	if err := m.udpEchoServer.Run(); err != nil {
 		return err
 	}
+	if err := m.udsEchoServer.Run(); err != nil {
+		return err
+	}
 	return nil
 }
 
+func (m *MockServers) Close() {
+	m.tcpEchoServer.Close()
+	m.udpEchoServer.Close()
+	m.udsEchoServer.Close()
+	os.Remove(m.udsEchoServer.GetOptions().BindAddr)
+}
+
 func (m *MockServers) GetTemplateParams() map[string]interface{} {
 	ret := make(map[string]interface{})
 	ret[TCPEchoServerPort] = m.tcpEchoServer.GetOptions().BindPort
 	ret[UDPEchoServerPort] = m.udpEchoServer.GetOptions().BindPort
+	ret[UDSEchoServerAddr] = m.udsEchoServer.GetOptions().BindAddr
 	return ret
 }
 

+ 41 - 3
test/e2e/framework/request.go

@@ -6,8 +6,46 @@ import (
 	"github.com/fatedier/frp/test/e2e/pkg/request"
 )
 
-func ExpectTCPReuqest(port int, in, out []byte, timeout time.Duration) {
+func ExpectRequest(protocol string, port int, in, out []byte, timeout time.Duration, explain ...interface{}) {
+	switch protocol {
+	case "tcp":
+		ExpectTCPRequest(port, in, out, timeout, explain...)
+	case "udp":
+		ExpectUDPRequest(port, in, out, timeout, explain...)
+	default:
+		Failf("ExpectRequest not support protocol: %s", protocol)
+	}
+}
+
+func ExpectRequestError(protocol string, port int, in []byte, timeout time.Duration, explain ...interface{}) {
+	switch protocol {
+	case "tcp":
+		ExpectTCPRequestError(port, in, timeout, explain...)
+	case "udp":
+		ExpectUDPRequestError(port, in, timeout, explain...)
+	default:
+		Failf("ExpectRequestError not support protocol: %s", protocol)
+	}
+}
+
+func ExpectTCPRequest(port int, in, out []byte, timeout time.Duration, explain ...interface{}) {
 	res, err := request.SendTCPRequest(port, in, timeout)
-	ExpectNoError(err)
-	ExpectEqual(string(out), res)
+	ExpectNoError(err, explain...)
+	ExpectEqual(string(out), res, explain...)
+}
+
+func ExpectTCPRequestError(port int, in []byte, timeout time.Duration, explain ...interface{}) {
+	_, err := request.SendTCPRequest(port, in, timeout)
+	ExpectError(err, explain...)
+}
+
+func ExpectUDPRequest(port int, in, out []byte, timeout time.Duration, explain ...interface{}) {
+	res, err := request.SendUDPRequest(port, in, timeout)
+	ExpectNoError(err, explain...)
+	ExpectEqual(string(out), res, explain...)
+}
+
+func ExpectUDPRequestError(port int, in []byte, timeout time.Duration, explain ...interface{}) {
+	_, err := request.SendUDPRequest(port, in, timeout)
+	ExpectError(err, explain...)
 }

+ 10 - 0
test/e2e/framework/test_context.go

@@ -4,12 +4,15 @@ import (
 	"flag"
 	"fmt"
 	"os"
+
+	"github.com/onsi/ginkgo/config"
 )
 
 type TestContextType struct {
 	FRPClientPath string
 	FRPServerPath string
 	LogLevel      string
+	Debug         bool
 }
 
 var TestContext TestContextType
@@ -23,9 +26,16 @@ var TestContext TestContextType
 // regardless whether the test is actually in the test suite.
 //
 func RegisterCommonFlags(flags *flag.FlagSet) {
+	// Turn on EmitSpecProgress to get spec progress (especially on interrupt)
+	config.GinkgoConfig.EmitSpecProgress = true
+
+	// Randomize specs as well as suites
+	config.GinkgoConfig.RandomizeAllSpecs = true
+
 	flags.StringVar(&TestContext.FRPClientPath, "frpc-path", "../../bin/frpc", "The frp client binary to use.")
 	flags.StringVar(&TestContext.FRPServerPath, "frps-path", "../../bin/frps", "The frp server binary to use.")
 	flags.StringVar(&TestContext.LogLevel, "log-level", "debug", "Log level.")
+	flags.BoolVar(&TestContext.Debug, "debug", false, "Enable debug mode to print detail info.")
 }
 
 func ValidateTestContext(t *TestContextType) error {

+ 4 - 0
test/e2e/framework/util.go

@@ -12,3 +12,7 @@ func init() {
 	uuid, _ := uuid.NewUUID()
 	RunID = uuid.String()
 }
+
+func GenPortName(name string) string {
+	return "Port" + name
+}

+ 7 - 0
test/e2e/pkg/process/process.go

@@ -10,6 +10,7 @@ type Process struct {
 	cmd         *exec.Cmd
 	cancel      context.CancelFunc
 	errorOutput *bytes.Buffer
+	stdOutput   *bytes.Buffer
 
 	beforeStopHandler func()
 }
@@ -22,7 +23,9 @@ func New(path string, params []string) *Process {
 		cancel: cancel,
 	}
 	p.errorOutput = bytes.NewBufferString("")
+	p.stdOutput = bytes.NewBufferString("")
 	cmd.Stderr = p.errorOutput
+	cmd.Stdout = p.stdOutput
 	return p
 }
 
@@ -42,6 +45,10 @@ func (p *Process) ErrorOutput() string {
 	return p.errorOutput.String()
 }
 
+func (p *Process) StdOutput() string {
+	return p.stdOutput.String()
+}
+
 func (p *Process) SetBeforeStopHandler(fn func()) {
 	p.beforeStopHandler = fn
 }

+ 18 - 9
test/e2e/pkg/request/request.go

@@ -6,26 +6,35 @@ import (
 	"time"
 )
 
-func SendTCPRequest(port int, content []byte, timeout time.Duration) (res string, err error) {
+func SendTCPRequest(port int, content []byte, timeout time.Duration) (string, error) {
 	c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", port))
 	if err != nil {
-		err = fmt.Errorf("connect to tcp server error: %v", err)
-		return
+		return "", fmt.Errorf("connect to tcp server error: %v", err)
 	}
 	defer c.Close()
 
 	c.SetDeadline(time.Now().Add(timeout))
-	return sendTCPRequestByConn(c, content)
+	return sendRequestByConn(c, content)
 }
 
-func sendTCPRequestByConn(c net.Conn, content []byte) (res string, err error) {
+func SendUDPRequest(port int, content []byte, timeout time.Duration) (string, error) {
+	c, err := net.Dial("udp", fmt.Sprintf("127.0.0.1:%d", port))
+	if err != nil {
+		return "", fmt.Errorf("connect to udp server error:  %v", err)
+	}
+	defer c.Close()
+
+	c.SetDeadline(time.Now().Add(timeout))
+	return sendRequestByConn(c, content)
+}
+
+func sendRequestByConn(c net.Conn, content []byte) (string, error) {
 	c.Write(content)
 
 	buf := make([]byte, 2048)
-	n, errRet := c.Read(buf)
-	if errRet != nil {
-		err = fmt.Errorf("read from tcp error: %v", errRet)
-		return
+	n, err := c.Read(buf)
+	if err != nil {
+		return "", fmt.Errorf("read error: %v", err)
 	}
 	return string(buf[:n]), nil
 }

+ 76 - 0
test/e2e/plugin/client_plugins.go

@@ -0,0 +1,76 @@
+package plugin
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/fatedier/frp/test/e2e/framework"
+	"github.com/fatedier/frp/test/e2e/framework/consts"
+
+	. "github.com/onsi/ginkgo"
+)
+
+var connTimeout = 2 * time.Second
+
+var _ = Describe("[Feature: Client-Plugins]", func() {
+	f := framework.NewDefaultFramework()
+
+	Describe("UnixDomainSocket", func() {
+		It("Expose a unix domain socket echo server", func() {
+			serverConf := consts.DefaultServerConfig
+			clientConf := consts.DefaultClientConfig
+
+			getProxyConf := func(proxyName string, portName string, extra string) string {
+				return fmt.Sprintf(`
+				[%s]
+				type = tcp
+				remote_port = {{ .%s }}
+				plugin = unix_domain_socket
+				plugin_unix_path = {{ .%s }}
+				`+extra, proxyName, portName, framework.UDSEchoServerAddr)
+			}
+
+			tests := []struct {
+				proxyName   string
+				portName    string
+				extraConfig string
+			}{
+				{
+					proxyName: "normal",
+					portName:  framework.GenPortName("Normal"),
+				},
+				{
+					proxyName:   "with-encryption",
+					portName:    framework.GenPortName("WithEncryption"),
+					extraConfig: "use_encryption = true",
+				},
+				{
+					proxyName:   "with-compression",
+					portName:    framework.GenPortName("WithCompression"),
+					extraConfig: "use_compression = true",
+				},
+				{
+					proxyName: "with-encryption-and-compression",
+					portName:  framework.GenPortName("WithEncryptionAndCompression"),
+					extraConfig: `
+					use_encryption = true
+					use_compression = true
+					`,
+				},
+			}
+
+			// build all client config
+			for _, test := range tests {
+				clientConf += getProxyConf(test.proxyName, test.portName, test.extraConfig) + "\n"
+			}
+			// run frps and frpc
+			f.RunProcesses([]string{serverConf}, []string{clientConf})
+
+			for _, test := range tests {
+				framework.ExpectTCPRequest(f.UsedPorts[test.portName],
+					[]byte(consts.TestString), []byte(consts.TestString),
+					connTimeout, test.proxyName)
+			}
+		})
+	})
+})

+ 3 - 1
utils/net/udp.go

@@ -246,7 +246,9 @@ func (l *UDPListener) Accept() (net.Conn, error) {
 func (l *UDPListener) Close() error {
 	if !l.closeFlag {
 		l.closeFlag = true
-		l.readConn.Close()
+		if l.readConn != nil {
+			l.readConn.Close()
+		}
 	}
 	return nil
 }