group.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package features
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. "strconv"
  6. "sync"
  7. "time"
  8. "github.com/onsi/ginkgo/v2"
  9. "github.com/fatedier/frp/pkg/transport"
  10. "github.com/fatedier/frp/test/e2e/framework"
  11. "github.com/fatedier/frp/test/e2e/framework/consts"
  12. "github.com/fatedier/frp/test/e2e/mock/server/httpserver"
  13. "github.com/fatedier/frp/test/e2e/mock/server/streamserver"
  14. "github.com/fatedier/frp/test/e2e/pkg/request"
  15. )
  16. var _ = ginkgo.Describe("[Feature: Group]", func() {
  17. f := framework.NewDefaultFramework()
  18. newHTTPServer := func(port int, respContent string) *httpserver.Server {
  19. return httpserver.New(
  20. httpserver.WithBindPort(port),
  21. httpserver.WithHandler(framework.SpecifiedHTTPBodyHandler([]byte(respContent))),
  22. )
  23. }
  24. validateFooBarResponse := func(resp *request.Response) bool {
  25. if string(resp.Content) == "foo" || string(resp.Content) == "bar" {
  26. return true
  27. }
  28. return false
  29. }
  30. doFooBarHTTPRequest := func(vhostPort int, host string) []string {
  31. results := []string{}
  32. var wait sync.WaitGroup
  33. var mu sync.Mutex
  34. expectFn := func() {
  35. framework.NewRequestExpect(f).Port(vhostPort).
  36. RequestModify(func(r *request.Request) {
  37. r.HTTP().HTTPHost(host)
  38. }).
  39. Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  40. mu.Lock()
  41. defer mu.Unlock()
  42. results = append(results, string(resp.Content))
  43. return true
  44. })
  45. }
  46. for i := 0; i < 10; i++ {
  47. wait.Add(1)
  48. go func() {
  49. defer wait.Done()
  50. expectFn()
  51. }()
  52. }
  53. wait.Wait()
  54. return results
  55. }
  56. ginkgo.Describe("Load Balancing", func() {
  57. ginkgo.It("TCP", func() {
  58. serverConf := consts.DefaultServerConfig
  59. clientConf := consts.DefaultClientConfig
  60. fooPort := f.AllocPort()
  61. fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
  62. f.RunServer("", fooServer)
  63. barPort := f.AllocPort()
  64. barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
  65. f.RunServer("", barServer)
  66. remotePort := f.AllocPort()
  67. clientConf += fmt.Sprintf(`
  68. [[proxies]]
  69. name = "foo"
  70. type = "tcp"
  71. localPort = %d
  72. remotePort = %d
  73. loadBalancer.group = "test"
  74. loadBalancer.groupKey = "123"
  75. [[proxies]]
  76. name = "bar"
  77. type = "tcp"
  78. localPort = %d
  79. remotePort = %d
  80. loadBalancer.group = "test"
  81. loadBalancer.groupKey = "123"
  82. `, fooPort, remotePort, barPort, remotePort)
  83. f.RunProcesses([]string{serverConf}, []string{clientConf})
  84. fooCount := 0
  85. barCount := 0
  86. for i := 0; i < 10; i++ {
  87. framework.NewRequestExpect(f).Explain("times " + strconv.Itoa(i)).Port(remotePort).Ensure(func(resp *request.Response) bool {
  88. switch string(resp.Content) {
  89. case "foo":
  90. fooCount++
  91. case "bar":
  92. barCount++
  93. default:
  94. return false
  95. }
  96. return true
  97. })
  98. }
  99. framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
  100. })
  101. ginkgo.It("HTTPS", func() {
  102. vhostHTTPSPort := f.AllocPort()
  103. serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
  104. vhostHTTPSPort = %d
  105. `, vhostHTTPSPort)
  106. clientConf := consts.DefaultClientConfig
  107. tlsConfig, err := transport.NewServerTLSConfig("", "", "")
  108. framework.ExpectNoError(err)
  109. fooPort := f.AllocPort()
  110. fooServer := httpserver.New(
  111. httpserver.WithBindPort(fooPort),
  112. httpserver.WithHandler(framework.SpecifiedHTTPBodyHandler([]byte("foo"))),
  113. httpserver.WithTLSConfig(tlsConfig),
  114. )
  115. f.RunServer("", fooServer)
  116. barPort := f.AllocPort()
  117. barServer := httpserver.New(
  118. httpserver.WithBindPort(barPort),
  119. httpserver.WithHandler(framework.SpecifiedHTTPBodyHandler([]byte("bar"))),
  120. httpserver.WithTLSConfig(tlsConfig),
  121. )
  122. f.RunServer("", barServer)
  123. clientConf += fmt.Sprintf(`
  124. [[proxies]]
  125. name = "foo"
  126. type = "https"
  127. localPort = %d
  128. customDomains = ["example.com"]
  129. loadBalancer.group = "test"
  130. loadBalancer.groupKey = "123"
  131. [[proxies]]
  132. name = "bar"
  133. type = "https"
  134. localPort = %d
  135. customDomains = ["example.com"]
  136. loadBalancer.group = "test"
  137. loadBalancer.groupKey = "123"
  138. `, fooPort, barPort)
  139. f.RunProcesses([]string{serverConf}, []string{clientConf})
  140. fooCount := 0
  141. barCount := 0
  142. for i := 0; i < 10; i++ {
  143. framework.NewRequestExpect(f).
  144. Explain("times " + strconv.Itoa(i)).
  145. Port(vhostHTTPSPort).
  146. RequestModify(func(r *request.Request) {
  147. r.HTTPS().HTTPHost("example.com").TLSConfig(&tls.Config{
  148. ServerName: "example.com",
  149. InsecureSkipVerify: true,
  150. })
  151. }).
  152. Ensure(func(resp *request.Response) bool {
  153. switch string(resp.Content) {
  154. case "foo":
  155. fooCount++
  156. case "bar":
  157. barCount++
  158. default:
  159. return false
  160. }
  161. return true
  162. })
  163. }
  164. framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
  165. })
  166. })
  167. ginkgo.Describe("Health Check", func() {
  168. ginkgo.It("TCP", func() {
  169. serverConf := consts.DefaultServerConfig
  170. clientConf := consts.DefaultClientConfig
  171. fooPort := f.AllocPort()
  172. fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
  173. f.RunServer("", fooServer)
  174. barPort := f.AllocPort()
  175. barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
  176. f.RunServer("", barServer)
  177. remotePort := f.AllocPort()
  178. clientConf += fmt.Sprintf(`
  179. [[proxies]]
  180. name = "foo"
  181. type = "tcp"
  182. localPort = %d
  183. remotePort = %d
  184. loadBalancer.group = "test"
  185. loadBalancer.groupKey = "123"
  186. healthCheck.type = "tcp"
  187. healthCheck.intervalSeconds = 1
  188. [[proxies]]
  189. name = "bar"
  190. type = "tcp"
  191. localPort = %d
  192. remotePort = %d
  193. loadBalancer.group = "test"
  194. loadBalancer.groupKey = "123"
  195. healthCheck.type = "tcp"
  196. healthCheck.intervalSeconds = 1
  197. `, fooPort, remotePort, barPort, remotePort)
  198. f.RunProcesses([]string{serverConf}, []string{clientConf})
  199. // check foo and bar is ok
  200. results := []string{}
  201. for i := 0; i < 10; i++ {
  202. framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  203. results = append(results, string(resp.Content))
  204. return true
  205. })
  206. }
  207. framework.ExpectContainElements(results, []string{"foo", "bar"})
  208. // close bar server, check foo is ok
  209. barServer.Close()
  210. time.Sleep(2 * time.Second)
  211. for i := 0; i < 10; i++ {
  212. framework.NewRequestExpect(f).Port(remotePort).ExpectResp([]byte("foo")).Ensure()
  213. }
  214. // resume bar server, check foo and bar is ok
  215. f.RunServer("", barServer)
  216. time.Sleep(2 * time.Second)
  217. results = []string{}
  218. for i := 0; i < 10; i++ {
  219. framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  220. results = append(results, string(resp.Content))
  221. return true
  222. })
  223. }
  224. framework.ExpectContainElements(results, []string{"foo", "bar"})
  225. })
  226. ginkgo.It("HTTP", func() {
  227. vhostPort := f.AllocPort()
  228. serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
  229. vhostHTTPPort = %d
  230. `, vhostPort)
  231. clientConf := consts.DefaultClientConfig
  232. fooPort := f.AllocPort()
  233. fooServer := newHTTPServer(fooPort, "foo")
  234. f.RunServer("", fooServer)
  235. barPort := f.AllocPort()
  236. barServer := newHTTPServer(barPort, "bar")
  237. f.RunServer("", barServer)
  238. clientConf += fmt.Sprintf(`
  239. [[proxies]]
  240. name = "foo"
  241. type = "http"
  242. localPort = %d
  243. customDomains = ["example.com"]
  244. loadBalancer.group = "test"
  245. loadBalancer.groupKey = "123"
  246. healthCheck.type = "http"
  247. healthCheck.intervalSeconds = 1
  248. healthCheck.path = "/healthz"
  249. [[proxies]]
  250. name = "bar"
  251. type = "http"
  252. localPort = %d
  253. customDomains = ["example.com"]
  254. loadBalancer.group = "test"
  255. loadBalancer.groupKey = "123"
  256. healthCheck.type = "http"
  257. healthCheck.intervalSeconds = 1
  258. healthCheck.path = "/healthz"
  259. `, fooPort, barPort)
  260. f.RunProcesses([]string{serverConf}, []string{clientConf})
  261. // send first HTTP request
  262. var contents []string
  263. framework.NewRequestExpect(f).Port(vhostPort).
  264. RequestModify(func(r *request.Request) {
  265. r.HTTP().HTTPHost("example.com")
  266. }).
  267. Ensure(func(resp *request.Response) bool {
  268. contents = append(contents, string(resp.Content))
  269. return true
  270. })
  271. // send second HTTP request, should be forwarded to another service
  272. framework.NewRequestExpect(f).Port(vhostPort).
  273. RequestModify(func(r *request.Request) {
  274. r.HTTP().HTTPHost("example.com")
  275. }).
  276. Ensure(func(resp *request.Response) bool {
  277. contents = append(contents, string(resp.Content))
  278. return true
  279. })
  280. framework.ExpectContainElements(contents, []string{"foo", "bar"})
  281. // check foo and bar is ok
  282. results := doFooBarHTTPRequest(vhostPort, "example.com")
  283. framework.ExpectContainElements(results, []string{"foo", "bar"})
  284. // close bar server, check foo is ok
  285. barServer.Close()
  286. time.Sleep(2 * time.Second)
  287. results = doFooBarHTTPRequest(vhostPort, "example.com")
  288. framework.ExpectContainElements(results, []string{"foo"})
  289. framework.ExpectNotContainElements(results, []string{"bar"})
  290. // resume bar server, check foo and bar is ok
  291. f.RunServer("", barServer)
  292. time.Sleep(2 * time.Second)
  293. results = doFooBarHTTPRequest(vhostPort, "example.com")
  294. framework.ExpectContainElements(results, []string{"foo", "bar"})
  295. })
  296. })
  297. })