Browse Source

improve http group load balancing (#3131)

fatedier 2 years ago
parent
commit
cf66ca10b4
5 changed files with 119 additions and 47 deletions
  1. 1 1
      pkg/util/version/version.go
  2. 41 38
      pkg/util/vhost/http.go
  3. 17 6
      pkg/util/vhost/vhost.go
  4. 37 2
      server/group/http.go
  5. 23 0
      test/e2e/features/group.go

+ 1 - 1
pkg/util/version/version.go

@@ -19,7 +19,7 @@ import (
 	"strings"
 )
 
-var version = "0.44.0"
+var version = "0.45.0"
 
 func Full() string {
 	return version

+ 41 - 38
pkg/util/vhost/http.go

@@ -60,19 +60,28 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
 		// Modify incoming requests by route policies.
 		Director: func(req *http.Request) {
 			req.URL.Scheme = "http"
-			url := req.Context().Value(RouteInfoURL).(string)
-			routeByHTTPUser := req.Context().Value(RouteInfoHTTPUser).(string)
-			oldHost, _ := util.CanonicalHost(req.Context().Value(RouteInfoHost).(string))
-			rc := rp.GetRouteConfig(oldHost, url, routeByHTTPUser)
+			reqRouteInfo := req.Context().Value(RouteInfoKey).(*RequestRouteInfo)
+			oldHost, _ := util.CanonicalHost(reqRouteInfo.Host)
+
+			rc := rp.GetRouteConfig(oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
 			if rc != nil {
 				if rc.RewriteHost != "" {
 					req.Host = rc.RewriteHost
 				}
-				// Set {domain}.{location}.{routeByHTTPUser} as URL host here to let http transport reuse connections.
-				// TODO(fatedier): use proxy name instead?
+
+				var endpoint string
+				if rc.ChooseEndpointFn != nil {
+					// ignore error here, it will use CreateConnFn instead later
+					endpoint, _ = rc.ChooseEndpointFn()
+					reqRouteInfo.Endpoint = endpoint
+					frpLog.Trace("choose endpoint name [%s] for http request host [%s] path [%s] httpuser [%s]",
+						endpoint, oldHost, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
+				}
+				// Set {domain}.{location}.{routeByHTTPUser}.{endpoint} as URL host here to let http transport reuse connections.
 				req.URL.Host = rc.Domain + "." +
 					base64.StdEncoding.EncodeToString([]byte(rc.Location)) + "." +
-					base64.StdEncoding.EncodeToString([]byte(rc.RouteByHTTPUser))
+					base64.StdEncoding.EncodeToString([]byte(rc.RouteByHTTPUser)) + "." +
+					base64.StdEncoding.EncodeToString([]byte(endpoint))
 
 				for k, v := range rc.Headers {
 					req.Header.Set(k, v)
@@ -85,12 +94,9 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
 		Transport: &http.Transport{
 			ResponseHeaderTimeout: rp.responseHeaderTimeout,
 			IdleConnTimeout:       60 * time.Second,
+			MaxIdleConnsPerHost:   5,
 			DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
-				url := ctx.Value(RouteInfoURL).(string)
-				host, _ := util.CanonicalHost(ctx.Value(RouteInfoHost).(string))
-				routerByHTTPUser := ctx.Value(RouteInfoHTTPUser).(string)
-				remote := ctx.Value(RouteInfoRemote).(string)
-				return rp.CreateConnection(host, url, routerByHTTPUser, remote)
+				return rp.CreateConnection(ctx.Value(RouteInfoKey).(*RequestRouteInfo), true)
 			},
 			Proxy: func(req *http.Request) (*url.URL, error) {
 				// Use proxy mode if there is host in HTTP first request line.
@@ -100,7 +106,7 @@ func NewHTTPReverseProxy(option HTTPReverseProxyOptions, vhostRouter *Routers) *
 				// Normal:
 				// GET / HTTP/1.1
 				// Host: example.com
-				urlHost := req.Context().Value(RouteInfoURLHost).(string)
+				urlHost := req.Context().Value(RouteInfoKey).(*RequestRouteInfo).URLHost
 				if urlHost != "" {
 					return req.URL, nil
 				}
@@ -143,14 +149,6 @@ func (rp *HTTPReverseProxy) GetRouteConfig(domain, location, routeByHTTPUser str
 	return nil
 }
 
-func (rp *HTTPReverseProxy) GetRealHost(domain, location, routeByHTTPUser string) (host string) {
-	vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
-	if ok {
-		host = vr.payload.(*RouteConfig).RewriteHost
-	}
-	return
-}
-
 func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string) (headers map[string]string) {
 	vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
 	if ok {
@@ -160,15 +158,22 @@ func (rp *HTTPReverseProxy) GetHeaders(domain, location, routeByHTTPUser string)
 }
 
 // CreateConnection create a new connection by route config
-func (rp *HTTPReverseProxy) CreateConnection(domain, location, routeByHTTPUser string, remoteAddr string) (net.Conn, error) {
-	vr, ok := rp.getVhost(domain, location, routeByHTTPUser)
+func (rp *HTTPReverseProxy) CreateConnection(reqRouteInfo *RequestRouteInfo, byEndpoint bool) (net.Conn, error) {
+	host, _ := util.CanonicalHost(reqRouteInfo.Host)
+	vr, ok := rp.getVhost(host, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
 	if ok {
+		if byEndpoint {
+			fn := vr.payload.(*RouteConfig).CreateConnByEndpointFn
+			if fn != nil {
+				return fn(reqRouteInfo.Endpoint, reqRouteInfo.RemoteAddr)
+			}
+		}
 		fn := vr.payload.(*RouteConfig).CreateConnFn
 		if fn != nil {
-			return fn(remoteAddr)
+			return fn(reqRouteInfo.RemoteAddr)
 		}
 	}
-	return nil, fmt.Errorf("%v: %s %s %s", ErrNoRouteFound, domain, location, routeByHTTPUser)
+	return nil, fmt.Errorf("%v: %s %s %s", ErrNoRouteFound, host, reqRouteInfo.URL, reqRouteInfo.HTTPUser)
 }
 
 func (rp *HTTPReverseProxy) CheckAuth(domain, location, routeByHTTPUser, user, passwd string) bool {
@@ -244,12 +249,7 @@ func (rp *HTTPReverseProxy) connectHandler(rw http.ResponseWriter, req *http.Req
 		return
 	}
 
-	url := req.Context().Value(RouteInfoURL).(string)
-	routeByHTTPUser := req.Context().Value(RouteInfoHTTPUser).(string)
-	domain, _ := util.CanonicalHost(req.Context().Value(RouteInfoHost).(string))
-	remoteAddr := req.Context().Value(RouteInfoRemote).(string)
-
-	remote, err := rp.CreateConnection(domain, url, routeByHTTPUser, remoteAddr)
+	remote, err := rp.CreateConnection(req.Context().Value(RouteInfoKey).(*RequestRouteInfo), false)
 	if err != nil {
 		_ = notFoundResponse().Write(client)
 		client.Close()
@@ -278,11 +278,6 @@ func parseBasicAuth(auth string) (username, password string, ok bool) {
 }
 
 func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Request {
-	newctx := req.Context()
-	newctx = context.WithValue(newctx, RouteInfoURL, req.URL.Path)
-	newctx = context.WithValue(newctx, RouteInfoHost, req.Host)
-	newctx = context.WithValue(newctx, RouteInfoURLHost, req.URL.Host)
-
 	user := ""
 	// If url host isn't empty, it's a proxy request. Get http user from Proxy-Authorization header.
 	if req.URL.Host != "" {
@@ -294,8 +289,16 @@ func (rp *HTTPReverseProxy) injectRequestInfoToCtx(req *http.Request) *http.Requ
 	if user == "" {
 		user, _, _ = req.BasicAuth()
 	}
-	newctx = context.WithValue(newctx, RouteInfoHTTPUser, user)
-	newctx = context.WithValue(newctx, RouteInfoRemote, req.RemoteAddr)
+
+	reqRouteInfo := &RequestRouteInfo{
+		URL:        req.URL.Path,
+		Host:       req.Host,
+		HTTPUser:   user,
+		RemoteAddr: req.RemoteAddr,
+		URLHost:    req.URL.Host,
+	}
+	newctx := req.Context()
+	newctx = context.WithValue(newctx, RouteInfoKey, reqRouteInfo)
 	return req.Clone(newctx)
 }
 

+ 17 - 6
pkg/util/vhost/vhost.go

@@ -29,13 +29,18 @@ import (
 type RouteInfo string
 
 const (
-	RouteInfoURL      RouteInfo = "url"
-	RouteInfoHost     RouteInfo = "host"
-	RouteInfoHTTPUser RouteInfo = "httpUser"
-	RouteInfoRemote   RouteInfo = "remote"
-	RouteInfoURLHost  RouteInfo = "urlHost"
+	RouteInfoKey RouteInfo = "routeInfo"
 )
 
+type RequestRouteInfo struct {
+	URL        string
+	Host       string
+	HTTPUser   string
+	RemoteAddr string
+	URLHost    string
+	Endpoint   string
+}
+
 type (
 	muxFunc         func(net.Conn) (net.Conn, map[string]string, error)
 	httpAuthFunc    func(net.Conn, string, string, string) (bool, error)
@@ -75,8 +80,12 @@ func NewMuxer(
 	return mux, nil
 }
 
+type ChooseEndpointFunc func() (string, error)
+
 type CreateConnFunc func(remoteAddr string) (net.Conn, error)
 
+type CreateConnByEndpointFunc func(endpoint, remoteAddr string) (net.Conn, error)
+
 // RouteConfig is the params used to match HTTP requests
 type RouteConfig struct {
 	Domain          string
@@ -87,7 +96,9 @@ type RouteConfig struct {
 	Headers         map[string]string
 	RouteByHTTPUser string
 
-	CreateConnFn CreateConnFunc
+	CreateConnFn           CreateConnFunc
+	ChooseEndpointFn       ChooseEndpointFunc
+	CreateConnByEndpointFn CreateConnByEndpointFunc
 }
 
 // listen for a new domain name, if rewriteHost is not empty  and rewriteFunc is not nil

+ 37 - 2
server/group/http.go

@@ -10,7 +10,7 @@ import (
 )
 
 type HTTPGroupController struct {
-	// groups by indexKey
+	// groups indexed by group name
 	groups map[string]*HTTPGroup
 
 	// register createConn for each group to vhostRouter.
@@ -65,7 +65,7 @@ type HTTPGroup struct {
 	location        string
 	routeByHTTPUser string
 
-	// CreateConnFuncs indexed by echo proxy name
+	// CreateConnFuncs indexed by proxy name
 	createFuncs map[string]vhost.CreateConnFunc
 	pxyNames    []string
 	index       uint64
@@ -91,6 +91,8 @@ func (g *HTTPGroup) Register(
 		// the first proxy in this group
 		tmp := routeConfig // copy object
 		tmp.CreateConnFn = g.createConn
+		tmp.ChooseEndpointFn = g.chooseEndpoint
+		tmp.CreateConnByEndpointFn = g.createConnByEndpoint
 		err = g.ctl.vhostRouter.Add(routeConfig.Domain, routeConfig.Location, routeConfig.RouteByHTTPUser, &tmp)
 		if err != nil {
 			return
@@ -161,3 +163,36 @@ func (g *HTTPGroup) createConn(remoteAddr string) (net.Conn, error) {
 
 	return f(remoteAddr)
 }
+
+func (g *HTTPGroup) chooseEndpoint() (string, error) {
+	newIndex := atomic.AddUint64(&g.index, 1)
+	name := ""
+
+	g.mu.RLock()
+	group := g.group
+	domain := g.domain
+	location := g.location
+	routeByHTTPUser := g.routeByHTTPUser
+	if len(g.pxyNames) > 0 {
+		name = g.pxyNames[int(newIndex)%len(g.pxyNames)]
+	}
+	g.mu.RUnlock()
+
+	if name == "" {
+		return "", fmt.Errorf("no healthy endpoint for http group [%s], domain [%s], location [%s], routeByHTTPUser [%s]",
+			group, domain, location, routeByHTTPUser)
+	}
+	return name, nil
+}
+
+func (g *HTTPGroup) createConnByEndpoint(endpoint, remoteAddr string) (net.Conn, error) {
+	var f vhost.CreateConnFunc
+	g.mu.RLock()
+	f = g.createFuncs[endpoint]
+	g.mu.RUnlock()
+
+	if f == nil {
+		return nil, fmt.Errorf("no CreateConnFunc for endpoint [%s] in group [%s]", endpoint, g.group)
+	}
+	return f(remoteAddr)
+}

+ 23 - 0
test/e2e/features/group.go

@@ -217,6 +217,29 @@ var _ = ginkgo.Describe("[Feature: Group]", func() {
 
 			f.RunProcesses([]string{serverConf}, []string{clientConf})
 
+			// send first HTTP request
+			var contents []string
+			framework.NewRequestExpect(f).Port(vhostPort).
+				RequestModify(func(r *request.Request) {
+					r.HTTP().HTTPHost("example.com")
+				}).
+				Ensure(func(resp *request.Response) bool {
+					contents = append(contents, string(resp.Content))
+					return true
+				})
+
+			// send second HTTP request, should be forwarded to another service
+			framework.NewRequestExpect(f).Port(vhostPort).
+				RequestModify(func(r *request.Request) {
+					r.HTTP().HTTPHost("example.com")
+				}).
+				Ensure(func(resp *request.Response) bool {
+					contents = append(contents, string(resp.Content))
+					return true
+				})
+
+			framework.ExpectContainElements(contents, []string{"foo", "bar"})
+
 			// check foo and bar is ok
 			results := doFooBarHTTPRequest(vhostPort, "example.com")
 			framework.ExpectContainElements(results, []string{"foo", "bar"})