Merge pull request #998 from fatedier/health

frpc: support health check
fatedier committed 6 years ago
parent commit 4f0ee5980d

+ 1 - 1
Makefile

@@ -34,7 +34,7 @@ gotest:
 	go test -v --cover ./utils/...
 
 ci:
-	go test -count=1 -v ./tests/...
+	go test -count=1 -p=1 -v ./tests/...
 
 alltest: gotest ci
 	

+ 2 - 1
client/admin_api.go

@@ -24,6 +24,7 @@ import (
 
 	ini "github.com/vaughan0/go-ini"
 
+	"github.com/fatedier/frp/client/proxy"
 	"github.com/fatedier/frp/g"
 	"github.com/fatedier/frp/models/config"
 	"github.com/fatedier/frp/utils/log"
@@ -121,7 +122,7 @@ func (a ByProxyStatusResp) Len() int           { return len(a) }
 func (a ByProxyStatusResp) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
 func (a ByProxyStatusResp) Less(i, j int) bool { return strings.Compare(a[i].Name, a[j].Name) < 0 }
 
-func NewProxyStatusResp(status *ProxyStatus) ProxyStatusResp {
+func NewProxyStatusResp(status *proxy.ProxyStatus) ProxyStatusResp {
 	psr := ProxyStatusResp{
 		Name:   status.Name,
 		Type:   status.Type,

+ 24 - 28
client/control.go

@@ -21,6 +21,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/fatedier/frp/client/proxy"
 	"github.com/fatedier/frp/g"
 	"github.com/fatedier/frp/models/config"
 	"github.com/fatedier/frp/models/msg"
@@ -37,7 +38,8 @@ type Control struct {
 	runId string
 
 	// manage all proxies
-	pm *ProxyManager
+	pxyCfgs map[string]config.ProxyConf
+	pm      *proxy.ProxyManager
 
 	// manage all visitors
 	vm *VisitorManager
@@ -76,6 +78,7 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m
 		runId:              runId,
 		conn:               conn,
 		session:            session,
+		pxyCfgs:            pxyCfgs,
 		sendCh:             make(chan msg.Message, 100),
 		readCh:             make(chan msg.Message, 100),
 		closedCh:           make(chan struct{}),
@@ -85,8 +88,8 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m
 		msgHandlerShutdown: shutdown.New(),
 		Logger:             log.NewPrefixLogger(""),
 	}
-	ctl.pm = NewProxyManager(ctl.sendCh, "")
-	ctl.pm.Reload(pxyCfgs, false)
+	ctl.pm = proxy.NewProxyManager(ctl.sendCh, runId)
+
 	ctl.vm = NewVisitorManager(ctl)
 	ctl.vm.Reload(visitorCfgs)
 	return ctl
@@ -95,10 +98,10 @@ func NewControl(runId string, conn frpNet.Conn, session *fmux.Session, pxyCfgs m
 func (ctl *Control) Run() {
 	go ctl.worker()
 
-	// start all local visitors and send NewProxy message for all configured proxies
-	ctl.pm.Reset(ctl.sendCh, ctl.runId)
-	ctl.pm.CheckAndStartProxy([]string{ProxyStatusNew})
+	// start all proxies
+	ctl.pm.Reload(ctl.pxyCfgs)
 
+	// start all visitors
 	go ctl.vm.Run()
 	return
 }
@@ -142,7 +145,7 @@ func (ctl *Control) HandleNewProxyResp(inMsg *msg.NewProxyResp) {
 }
 
 func (ctl *Control) Close() error {
-	ctl.pm.CloseProxies()
+	ctl.conn.Close()
 	return nil
 }
 
@@ -275,33 +278,26 @@ func (ctl *Control) worker() {
 	go ctl.reader()
 	go ctl.writer()
 
-	checkInterval := 60 * time.Second
-	checkProxyTicker := time.NewTicker(checkInterval)
-
-	for {
-		select {
-		case <-checkProxyTicker.C:
-			// check which proxy registered failed and reregister it to server
-			ctl.pm.CheckAndStartProxy([]string{ProxyStatusStartErr, ProxyStatusClosed})
-		case <-ctl.closedCh:
-			// close related channels and wait until other goroutines done
-			close(ctl.readCh)
-			ctl.readerShutdown.WaitDone()
-			ctl.msgHandlerShutdown.WaitDone()
+	select {
+	case <-ctl.closedCh:
+		// close related channels and wait until other goroutines done
+		close(ctl.readCh)
+		ctl.readerShutdown.WaitDone()
+		ctl.msgHandlerShutdown.WaitDone()
 
-			close(ctl.sendCh)
-			ctl.writerShutdown.WaitDone()
+		close(ctl.sendCh)
+		ctl.writerShutdown.WaitDone()
 
-			ctl.pm.CloseProxies()
+		ctl.pm.Close()
+		ctl.vm.Close()
 
-			close(ctl.closedDoneCh)
-			return
-		}
+		close(ctl.closedDoneCh)
+		return
 	}
 }
 
 func (ctl *Control) ReloadConf(pxyCfgs map[string]config.ProxyConf, visitorCfgs map[string]config.VisitorConf) error {
 	ctl.vm.Reload(visitorCfgs)
-	err := ctl.pm.Reload(pxyCfgs, true)
-	return err
+	ctl.pm.Reload(pxyCfgs)
+	return nil
 }

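With restart logic moved into the proxy wrappers, the control worker no longer polls on a 60s ticker; it only waits for `closedCh` and tears things down in a fixed order. A minimal sketch of that shutdown ordering, with hypothetical names mirroring the diff (not frp code):

```go
package main

import "fmt"

type control struct {
	readCh       chan string
	sendCh       chan string
	closedCh     chan struct{}
	closedDoneCh chan struct{}
}

func (c *control) worker() {
	<-c.closedCh          // connection to the server is gone
	close(c.readCh)       // reader/handler goroutines drain and exit
	close(c.sendCh)       // writer drains and exits
	// pm.Close() / vm.Close() would run here
	close(c.closedDoneCh) // signal anyone waiting on shutdown
}

func main() {
	c := &control{
		readCh:       make(chan string, 1),
		sendCh:       make(chan string, 1),
		closedCh:     make(chan struct{}),
		closedDoneCh: make(chan struct{}),
	}
	go c.worker()
	close(c.closedCh) // simulate the connection closing
	<-c.closedDoneCh
	fmt.Println("shutdown complete")
}
```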
+ 28 - 0
client/event/event.go

@@ -0,0 +1,28 @@
+package event
+
+import (
+	"errors"
+
+	"github.com/fatedier/frp/models/msg"
+)
+
+type EventType int
+
+const (
+	EvStartProxy EventType = iota
+	EvCloseProxy
+)
+
+var (
+	ErrPayloadType = errors.New("error payload type")
+)
+
+type EventHandler func(evType EventType, payload interface{}) error
+
+type StartProxyPayload struct {
+	NewProxyMsg *msg.NewProxy
+}
+
+type CloseProxyPayload struct {
+	CloseProxyMsg *msg.CloseProxy
+}

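The new `event` package decouples proxy wrappers from the control connection: a wrapper emits `EvStartProxy`/`EvCloseProxy` and the registered handler turns the payload into a protocol message. A self-contained sketch of that dispatch, with simplified payload structs standing in for the msg-based ones above:

```go
package main

import (
	"errors"
	"fmt"
)

type EventType int

const (
	EvStartProxy EventType = iota
	EvCloseProxy
)

var ErrPayloadType = errors.New("error payload type")

type EventHandler func(evType EventType, payload interface{}) error

// Simplified stand-ins for StartProxyPayload/CloseProxyPayload.
type StartProxyPayload struct{ Name string }
type CloseProxyPayload struct{ Name string }

func handle(evType EventType, payload interface{}) error {
	switch p := payload.(type) {
	case *StartProxyPayload:
		fmt.Println("send NewProxy for", p.Name) // would go to sendCh
	case *CloseProxyPayload:
		fmt.Println("send CloseProxy for", p.Name)
	default:
		return ErrPayloadType
	}
	return nil
}

func main() {
	var h EventHandler = handle
	h(EvStartProxy, &StartProxyPayload{Name: "ssh"})
	h(EvCloseProxy, &CloseProxyPayload{Name: "ssh"})
}
```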
+ 44 - 13
client/health.go → client/health/health.go

@@ -12,13 +12,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package client
+package health
 
 import (
 	"context"
+	"errors"
+	"fmt"
 	"net"
 	"net/http"
 	"time"
+
+	"github.com/fatedier/frp/utils/log"
+)
+
+var (
+	ErrHealthCheckType = errors.New("error health check type")
 )
 
 type HealthCheckMonitor struct {
@@ -40,6 +48,8 @@ type HealthCheckMonitor struct {
 
 	ctx    context.Context
 	cancel context.CancelFunc
+
+	l log.Logger
 }
 
 func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFailedTimes int, addr string, url string,
@@ -70,6 +80,10 @@ func NewHealthCheckMonitor(checkType string, intervalS int, timeoutS int, maxFai
 	}
 }
 
+func (monitor *HealthCheckMonitor) SetLogger(l log.Logger) {
+	monitor.l = l
+}
+
 func (monitor *HealthCheckMonitor) Start() {
 	go monitor.checkWorker()
 }
@@ -81,7 +95,7 @@ func (monitor *HealthCheckMonitor) Stop() {
 func (monitor *HealthCheckMonitor) checkWorker() {
 	for {
 		ctx, cancel := context.WithDeadline(monitor.ctx, time.Now().Add(monitor.timeout))
-		ok := monitor.doCheck(ctx)
+		err := monitor.doCheck(ctx)
 
 		// check if this monitor has been closed
 		select {
@@ -92,14 +106,26 @@ func (monitor *HealthCheckMonitor) checkWorker() {
 			cancel()
 		}
 
-		if ok {
+		if err == nil {
+			if monitor.l != nil {
+				monitor.l.Trace("do one health check success")
+			}
 			if !monitor.statusOK && monitor.statusNormalFn != nil {
+				if monitor.l != nil {
+					monitor.l.Info("health check status change to success")
+				}
 				monitor.statusOK = true
 				monitor.statusNormalFn()
 			}
 		} else {
+			if monitor.l != nil {
+				monitor.l.Warn("do one health check failed: %v", err)
+			}
 			monitor.failedTimes++
 			if monitor.statusOK && int(monitor.failedTimes) >= monitor.maxFailedTimes && monitor.statusFailedFn != nil {
+				if monitor.l != nil {
+					monitor.l.Warn("health check status change to failed")
+				}
 				monitor.statusOK = false
 				monitor.statusFailedFn()
 			}
@@ -109,39 +135,44 @@ func (monitor *HealthCheckMonitor) checkWorker() {
 	}
 }
 
-func (monitor *HealthCheckMonitor) doCheck(ctx context.Context) bool {
+func (monitor *HealthCheckMonitor) doCheck(ctx context.Context) error {
 	switch monitor.checkType {
 	case "tcp":
 		return monitor.doTcpCheck(ctx)
 	case "http":
 		return monitor.doHttpCheck(ctx)
 	default:
-		return false
+		return ErrHealthCheckType
 	}
 }
 
-func (monitor *HealthCheckMonitor) doTcpCheck(ctx context.Context) bool {
+func (monitor *HealthCheckMonitor) doTcpCheck(ctx context.Context) error {
+	// if tcp address is not specified, always return nil
+	if monitor.addr == "" {
+		return nil
+	}
+
 	var d net.Dialer
 	conn, err := d.DialContext(ctx, "tcp", monitor.addr)
 	if err != nil {
-		return false
+		return err
 	}
 	conn.Close()
-	return true
+	return nil
 }
 
-func (monitor *HealthCheckMonitor) doHttpCheck(ctx context.Context) bool {
+func (monitor *HealthCheckMonitor) doHttpCheck(ctx context.Context) error {
 	req, err := http.NewRequest("GET", monitor.url, nil)
 	if err != nil {
-		return false
+		return err
 	}
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return false
+		return err
 	}
 
 	if resp.StatusCode/100 != 2 {
-		return false
+		return fmt.Errorf("do http health check, StatusCode is [%d] not 2xx", resp.StatusCode)
 	}
-	return true
+	return nil
 }

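Both probe styles boil down to one operation bounded by the context deadline set in `checkWorker`. A standalone sketch of the two checks; note that, unlike the diff above, this sketch also attaches the context to the HTTP request and closes the response body:

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"
)

// doTcpCheck-style probe: dial under a deadline, succeed if the
// connection opens at all.
func tcpCheck(ctx context.Context, addr string) error {
	var d net.Dialer
	conn, err := d.DialContext(ctx, "tcp", addr)
	if err != nil {
		return err
	}
	conn.Close()
	return nil
}

// doHttpCheck-style probe: any 2xx status passes.
func httpCheck(ctx context.Context, url string) error {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("status %d not 2xx", resp.StatusCode)
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	fmt.Println(tcpCheck(ctx, "127.0.0.1:22"))
}
```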
+ 1 - 1
client/proxy.go → client/proxy/proxy.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package client
+package proxy
 
 import (
 	"bytes"

+ 138 - 0
client/proxy/proxy_manager.go

@@ -0,0 +1,138 @@
+package proxy
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/fatedier/frp/client/event"
+	"github.com/fatedier/frp/models/config"
+	"github.com/fatedier/frp/models/msg"
+	"github.com/fatedier/frp/utils/log"
+	frpNet "github.com/fatedier/frp/utils/net"
+
+	"github.com/fatedier/golib/errors"
+)
+
+type ProxyManager struct {
+	sendCh  chan (msg.Message)
+	proxies map[string]*ProxyWrapper
+
+	closed bool
+	mu     sync.RWMutex
+
+	logPrefix string
+	log.Logger
+}
+
+func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
+	return &ProxyManager{
+		proxies:   make(map[string]*ProxyWrapper),
+		sendCh:    msgSendCh,
+		closed:    false,
+		logPrefix: logPrefix,
+		Logger:    log.NewPrefixLogger(logPrefix),
+	}
+}
+
+func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
+	pm.mu.RLock()
+	pxy, ok := pm.proxies[name]
+	pm.mu.RUnlock()
+	if !ok {
+		return fmt.Errorf("proxy [%s] not found", name)
+	}
+
+	err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (pm *ProxyManager) Close() {
+	pm.mu.RLock()
+	defer pm.mu.RUnlock()
+	for _, pxy := range pm.proxies {
+		pxy.Stop()
+	}
+}
+
+func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
+	pm.mu.RLock()
+	pw, ok := pm.proxies[name]
+	pm.mu.RUnlock()
+	if ok {
+		pw.InWorkConn(workConn)
+	} else {
+		workConn.Close()
+	}
+}
+
+func (pm *ProxyManager) HandleEvent(evType event.EventType, payload interface{}) error {
+	var m msg.Message
+	switch e := payload.(type) {
+	case *event.StartProxyPayload:
+		m = e.NewProxyMsg
+	case *event.CloseProxyPayload:
+		m = e.CloseProxyMsg
+	default:
+		return event.ErrPayloadType
+	}
+
+	err := errors.PanicToError(func() {
+		pm.sendCh <- m
+	})
+	return err
+}
+
+func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
+	ps := make([]*ProxyStatus, 0)
+	pm.mu.RLock()
+	defer pm.mu.RUnlock()
+	for _, pxy := range pm.proxies {
+		ps = append(ps, pxy.GetStatus())
+	}
+	return ps
+}
+
+func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf) {
+	pm.mu.Lock()
+	defer pm.mu.Unlock()
+
+	delPxyNames := make([]string, 0)
+	for name, pxy := range pm.proxies {
+		del := false
+		cfg, ok := pxyCfgs[name]
+		if !ok {
+			del = true
+		} else {
+			if !pxy.Cfg.Compare(cfg) {
+				del = true
+			}
+		}
+
+		if del {
+			delPxyNames = append(delPxyNames, name)
+			delete(pm.proxies, name)
+
+			pxy.Stop()
+		}
+	}
+	if len(delPxyNames) > 0 {
+		pm.Info("proxy removed: %v", delPxyNames)
+	}
+
+	addPxyNames := make([]string, 0)
+	for name, cfg := range pxyCfgs {
+		if _, ok := pm.proxies[name]; !ok {
+			pxy := NewProxyWrapper(cfg, pm.HandleEvent, pm.logPrefix)
+			pm.proxies[name] = pxy
+			addPxyNames = append(addPxyNames, name)
+
+			pxy.Start()
+		}
+	}
+	if len(addPxyNames) > 0 {
+		pm.Info("proxy added: %v", addPxyNames)
+	}
+}

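`Reload` is a reconcile pass: wrappers whose name disappeared from the new map, or whose config no longer compares equal, are stopped; names present only in the new map are started. A minimal sketch of that diffing, with a plain string standing in for `config.ProxyConf`:

```go
package main

import "fmt"

type runner struct{ cfg string }

func reload(running map[string]*runner, desired map[string]string) {
	for name, r := range running {
		if cfg, ok := desired[name]; !ok || r.cfg != cfg {
			delete(running, name) // removed or changed: stop and forget it
			fmt.Println("stop", name)
		}
	}
	for name, cfg := range desired {
		if _, ok := running[name]; !ok {
			running[name] = &runner{cfg: cfg} // new (or replaced): start it
			fmt.Println("start", name)
		}
	}
}

func main() {
	running := map[string]*runner{"ssh": {cfg: "v1"}, "web": {cfg: "v1"}}
	desired := map[string]string{"web": "v2", "dns": "v1"}
	reload(running, desired)
	// stops ssh (removed) and web (config changed); starts web and dns
}
```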
+ 244 - 0
client/proxy/proxy_wrapper.go

@@ -0,0 +1,244 @@
+package proxy
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/fatedier/frp/client/event"
+	"github.com/fatedier/frp/client/health"
+	"github.com/fatedier/frp/models/config"
+	"github.com/fatedier/frp/models/msg"
+	"github.com/fatedier/frp/utils/log"
+	frpNet "github.com/fatedier/frp/utils/net"
+
+	"github.com/fatedier/golib/errors"
+)
+
+const (
+	ProxyStatusNew         = "new"
+	ProxyStatusWaitStart   = "wait start"
+	ProxyStatusStartErr    = "start error"
+	ProxyStatusRunning     = "running"
+	ProxyStatusCheckFailed = "check failed"
+	ProxyStatusClosed      = "closed"
+)
+
+var (
+	statusCheckInterval time.Duration = 3 * time.Second
+	waitResponseTimeout               = 20 * time.Second
+	startErrTimeout                   = 30 * time.Second
+)
+
+type ProxyStatus struct {
+	Name   string           `json:"name"`
+	Type   string           `json:"type"`
+	Status string           `json:"status"`
+	Err    string           `json:"err"`
+	Cfg    config.ProxyConf `json:"cfg"`
+
+	// Got from server.
+	RemoteAddr string `json:"remote_addr"`
+}
+
+type ProxyWrapper struct {
+	ProxyStatus
+
+	// underlying proxy
+	pxy Proxy
+
+	// if ProxyConf has a health check config,
+	// monitor will watch whether the local service is alive
+	monitor *health.HealthCheckMonitor
+
+	// event handler
+	handler event.EventHandler
+
+	health           uint32
+	lastSendStartMsg time.Time
+	lastStartErr     time.Time
+	closeCh          chan struct{}
+	healthNotifyCh   chan struct{}
+	mu               sync.RWMutex
+
+	log.Logger
+}
+
+func NewProxyWrapper(cfg config.ProxyConf, eventHandler event.EventHandler, logPrefix string) *ProxyWrapper {
+	baseInfo := cfg.GetBaseInfo()
+	pw := &ProxyWrapper{
+		ProxyStatus: ProxyStatus{
+			Name:   baseInfo.ProxyName,
+			Type:   baseInfo.ProxyType,
+			Status: ProxyStatusNew,
+			Cfg:    cfg,
+		},
+		closeCh:        make(chan struct{}),
+		healthNotifyCh: make(chan struct{}),
+		handler:        eventHandler,
+		Logger:         log.NewPrefixLogger(logPrefix),
+	}
+	pw.AddLogPrefix(pw.Name)
+
+	if baseInfo.HealthCheckType != "" {
+		pw.health = 1 // means failed
+		pw.monitor = health.NewHealthCheckMonitor(baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
+			baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
+			baseInfo.HealthCheckUrl, pw.statusNormalCallback, pw.statusFailedCallback)
+		pw.monitor.SetLogger(pw.Logger)
+		pw.Trace("enable health check monitor")
+	}
+
+	pw.pxy = NewProxy(pw.Cfg)
+	return pw
+}
+
+func (pw *ProxyWrapper) SetRunningStatus(remoteAddr string, respErr string) error {
+	pw.mu.Lock()
+	defer pw.mu.Unlock()
+	if pw.Status != ProxyStatusWaitStart {
+		return fmt.Errorf("status not wait start, ignore start message")
+	}
+
+	pw.RemoteAddr = remoteAddr
+	if respErr != "" {
+		pw.Status = ProxyStatusStartErr
+		pw.Err = respErr
+		pw.lastStartErr = time.Now()
+		return fmt.Errorf(pw.Err)
+	}
+
+	if err := pw.pxy.Run(); err != nil {
+		pw.Status = ProxyStatusStartErr
+		pw.Err = err.Error()
+		pw.lastStartErr = time.Now()
+		return err
+	}
+
+	pw.Status = ProxyStatusRunning
+	pw.Err = ""
+	return nil
+}
+
+func (pw *ProxyWrapper) Start() {
+	go pw.checkWorker()
+	if pw.monitor != nil {
+		go pw.monitor.Start()
+	}
+}
+
+func (pw *ProxyWrapper) Stop() {
+	pw.mu.Lock()
+	defer pw.mu.Unlock()
+	close(pw.closeCh)
+	close(pw.healthNotifyCh)
+	pw.pxy.Close()
+	if pw.monitor != nil {
+		pw.monitor.Stop()
+	}
+	pw.Status = ProxyStatusClosed
+
+	pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
+		CloseProxyMsg: &msg.CloseProxy{
+			ProxyName: pw.Name,
+		},
+	})
+}
+
+func (pw *ProxyWrapper) checkWorker() {
+	if pw.monitor != nil {
+		// give the monitor time to run its first check
+		time.Sleep(500 * time.Millisecond)
+	}
+	for {
+		// check proxy status
+		now := time.Now()
+		if atomic.LoadUint32(&pw.health) == 0 {
+			pw.mu.Lock()
+			if pw.Status == ProxyStatusNew ||
+				pw.Status == ProxyStatusCheckFailed ||
+				(pw.Status == ProxyStatusWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
+				(pw.Status == ProxyStatusStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
+
+				pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusWaitStart)
+				pw.Status = ProxyStatusWaitStart
+
+				var newProxyMsg msg.NewProxy
+				pw.Cfg.MarshalToMsg(&newProxyMsg)
+				pw.lastSendStartMsg = now
+				pw.handler(event.EvStartProxy, &event.StartProxyPayload{
+					NewProxyMsg: &newProxyMsg,
+				})
+			}
+			pw.mu.Unlock()
+		} else {
+			pw.mu.Lock()
+			if pw.Status == ProxyStatusRunning || pw.Status == ProxyStatusWaitStart {
+				pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
+					CloseProxyMsg: &msg.CloseProxy{
+						ProxyName: pw.Name,
+					},
+				})
+				pw.Trace("change status from [%s] to [%s]", pw.Status, ProxyStatusCheckFailed)
+				pw.Status = ProxyStatusCheckFailed
+			}
+			pw.mu.Unlock()
+		}
+
+		select {
+		case <-pw.closeCh:
+			return
+		case <-time.After(statusCheckInterval):
+		case <-pw.healthNotifyCh:
+		}
+	}
+}
+
+func (pw *ProxyWrapper) statusNormalCallback() {
+	atomic.StoreUint32(&pw.health, 0)
+	errors.PanicToError(func() {
+		select {
+		case pw.healthNotifyCh <- struct{}{}:
+		default:
+		}
+	})
+	pw.Info("health check success")
+}
+
+func (pw *ProxyWrapper) statusFailedCallback() {
+	atomic.StoreUint32(&pw.health, 1)
+	errors.PanicToError(func() {
+		select {
+		case pw.healthNotifyCh <- struct{}{}:
+		default:
+		}
+	})
+	pw.Info("health check failed")
+}
+
+func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
+	pw.mu.RLock()
+	pxy := pw.pxy
+	pw.mu.RUnlock()
+	if pxy != nil {
+		workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
+		go pxy.InWorkConn(workConn)
+	} else {
+		workConn.Close()
+	}
+}
+
+func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
+	pw.mu.RLock()
+	defer pw.mu.RUnlock()
+	ps := &ProxyStatus{
+		Name:       pw.Name,
+		Type:       pw.Type,
+		Status:     pw.Status,
+		Err:        pw.Err,
+		Cfg:        pw.Cfg,
+		RemoteAddr: pw.RemoteAddr,
+	}
+	return ps
+}

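The wrapper coordinates two goroutines: the health monitor's callbacks flip an atomic flag and poke `healthNotifyCh` without blocking, so `checkWorker` re-evaluates immediately instead of waiting out `statusCheckInterval` (the real code additionally wraps the send in `errors.PanicToError` because the channel may already be closed). A sketch of that pattern, with a buffered channel so the wakeup is never lost:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type watcher struct {
	health   uint32        // 0 = healthy, 1 = failed
	notifyCh chan struct{} // buffered, capacity 1
}

func (w *watcher) setHealth(failed bool) {
	var v uint32
	if failed {
		v = 1
	}
	atomic.StoreUint32(&w.health, v)
	select {
	case w.notifyCh <- struct{}{}: // wake the worker early
	default: // a wakeup is already pending; don't block
	}
}

func (w *watcher) run(done chan struct{}) {
	for {
		if atomic.LoadUint32(&w.health) == 0 {
			fmt.Println("healthy: ensure proxy is registered")
		} else {
			fmt.Println("failed: ask server to close proxy")
		}
		select {
		case <-done:
			return
		case <-time.After(3 * time.Second): // statusCheckInterval
		case <-w.notifyCh: // health changed; re-evaluate now
		}
	}
}

func main() {
	w := &watcher{notifyCh: make(chan struct{}, 1)}
	done := make(chan struct{})
	go w.run(done)
	time.Sleep(100 * time.Millisecond)
	w.setHealth(true)
	time.Sleep(100 * time.Millisecond)
	close(done)
}
```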
+ 0 - 311
client/proxy_manager.go

@@ -1,311 +0,0 @@
-package client
-
-import (
-	"fmt"
-	"sync"
-
-	"github.com/fatedier/frp/models/config"
-	"github.com/fatedier/frp/models/msg"
-	"github.com/fatedier/frp/utils/log"
-	frpNet "github.com/fatedier/frp/utils/net"
-
-	"github.com/fatedier/golib/errors"
-)
-
-const (
-	ProxyStatusNew         = "new"
-	ProxyStatusStartErr    = "start error"
-	ProxyStatusWaitStart   = "wait start"
-	ProxyStatusRunning     = "running"
-	ProxyStatusCheckFailed = "check failed"
-	ProxyStatusClosed      = "closed"
-)
-
-type ProxyManager struct {
-	sendCh  chan (msg.Message)
-	proxies map[string]*ProxyWrapper
-	closed  bool
-	mu      sync.RWMutex
-
-	log.Logger
-}
-
-func NewProxyManager(msgSendCh chan (msg.Message), logPrefix string) *ProxyManager {
-	return &ProxyManager{
-		proxies: make(map[string]*ProxyWrapper),
-		sendCh:  msgSendCh,
-		closed:  false,
-		Logger:  log.NewPrefixLogger(logPrefix),
-	}
-}
-
-func (pm *ProxyManager) Reset(msgSendCh chan (msg.Message), logPrefix string) {
-	pm.mu.Lock()
-	defer pm.mu.Unlock()
-	pm.closed = false
-	pm.sendCh = msgSendCh
-	pm.ClearLogPrefix()
-	pm.AddLogPrefix(logPrefix)
-}
-
-// Must hold the lock before calling this function.
-func (pm *ProxyManager) sendMsg(m msg.Message) error {
-	err := errors.PanicToError(func() {
-		pm.sendCh <- m
-	})
-	if err != nil {
-		pm.closed = true
-	}
-	return err
-}
-
-func (pm *ProxyManager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
-	pm.mu.Lock()
-	defer pm.mu.Unlock()
-	if pm.closed {
-		return fmt.Errorf("ProxyManager is closed now")
-	}
-
-	pxy, ok := pm.proxies[name]
-	if !ok {
-		return fmt.Errorf("no proxy found")
-	}
-
-	if err := pxy.Start(remoteAddr, serverRespErr); err != nil {
-		errRet := err
-		err = pm.sendMsg(&msg.CloseProxy{
-			ProxyName: name,
-		})
-		if err != nil {
-			errRet = fmt.Errorf("send CloseProxy message error")
-		}
-		return errRet
-	}
-	return nil
-}
-
-func (pm *ProxyManager) CloseProxies() {
-	pm.mu.RLock()
-	defer pm.mu.RUnlock()
-	for _, pxy := range pm.proxies {
-		pxy.Close()
-	}
-}
-
-// pxyStatus: check and start proxies in which status
-func (pm *ProxyManager) CheckAndStartProxy(pxyStatus []string) {
-	pm.mu.RLock()
-	defer pm.mu.RUnlock()
-	if pm.closed {
-		pm.Warn("CheckAndStartProxy error: ProxyManager is closed now")
-		return
-	}
-
-	for _, pxy := range pm.proxies {
-		status := pxy.GetStatusStr()
-		for _, s := range pxyStatus {
-			if status == s {
-				var newProxyMsg msg.NewProxy
-				pxy.Cfg.MarshalToMsg(&newProxyMsg)
-				err := pm.sendMsg(&newProxyMsg)
-				if err != nil {
-					pm.Warn("[%s] proxy send NewProxy message error")
-					return
-				}
-				pxy.WaitStart()
-				break
-			}
-		}
-	}
-}
-
-func (pm *ProxyManager) Reload(pxyCfgs map[string]config.ProxyConf, startNow bool) error {
-	pm.mu.Lock()
-	defer func() {
-		pm.mu.Unlock()
-		if startNow {
-			go pm.CheckAndStartProxy([]string{ProxyStatusNew})
-		}
-	}()
-	if pm.closed {
-		err := fmt.Errorf("Reload error: ProxyManager is closed now")
-		pm.Warn(err.Error())
-		return err
-	}
-
-	delPxyNames := make([]string, 0)
-	for name, pxy := range pm.proxies {
-		del := false
-		cfg, ok := pxyCfgs[name]
-		if !ok {
-			del = true
-		} else {
-			if !pxy.Cfg.Compare(cfg) {
-				del = true
-			}
-		}
-
-		if del {
-			delPxyNames = append(delPxyNames, name)
-			delete(pm.proxies, name)
-
-			pxy.Close()
-			err := pm.sendMsg(&msg.CloseProxy{
-				ProxyName: name,
-			})
-			if err != nil {
-				err = fmt.Errorf("Reload error: ProxyManager is closed now")
-				pm.Warn(err.Error())
-				return err
-			}
-		}
-	}
-	pm.Info("proxy removed: %v", delPxyNames)
-
-	addPxyNames := make([]string, 0)
-	for name, cfg := range pxyCfgs {
-		if _, ok := pm.proxies[name]; !ok {
-			pxy := NewProxyWrapper(cfg)
-			pm.proxies[name] = pxy
-			addPxyNames = append(addPxyNames, name)
-		}
-	}
-	pm.Info("proxy added: %v", addPxyNames)
-	return nil
-}
-
-func (pm *ProxyManager) HandleWorkConn(name string, workConn frpNet.Conn) {
-	pm.mu.RLock()
-	pw, ok := pm.proxies[name]
-	pm.mu.RUnlock()
-	if ok {
-		pw.InWorkConn(workConn)
-	} else {
-		workConn.Close()
-	}
-}
-
-func (pm *ProxyManager) GetAllProxyStatus() []*ProxyStatus {
-	ps := make([]*ProxyStatus, 0)
-	pm.mu.RLock()
-	defer pm.mu.RUnlock()
-	for _, pxy := range pm.proxies {
-		ps = append(ps, pxy.GetStatus())
-	}
-	return ps
-}
-
-type ProxyStatus struct {
-	Name   string           `json:"name"`
-	Type   string           `json:"type"`
-	Status string           `json:"status"`
-	Err    string           `json:"err"`
-	Cfg    config.ProxyConf `json:"cfg"`
-
-	// Got from server.
-	RemoteAddr string `json:"remote_addr"`
-}
-
-// ProxyWrapper is a wrapper of Proxy interface only used in ProxyManager
-// Add additional proxy status info
-type ProxyWrapper struct {
-	Name   string
-	Type   string
-	Status string
-	Err    string
-	Cfg    config.ProxyConf
-
-	RemoteAddr string
-
-	pxy Proxy
-
-	mu sync.RWMutex
-}
-
-func NewProxyWrapper(cfg config.ProxyConf) *ProxyWrapper {
-	return &ProxyWrapper{
-		Name:   cfg.GetBaseInfo().ProxyName,
-		Type:   cfg.GetBaseInfo().ProxyType,
-		Status: ProxyStatusNew,
-		Cfg:    cfg,
-		pxy:    nil,
-	}
-}
-
-func (pw *ProxyWrapper) GetStatusStr() string {
-	pw.mu.RLock()
-	defer pw.mu.RUnlock()
-	return pw.Status
-}
-
-func (pw *ProxyWrapper) GetStatus() *ProxyStatus {
-	pw.mu.RLock()
-	defer pw.mu.RUnlock()
-	ps := &ProxyStatus{
-		Name:       pw.Name,
-		Type:       pw.Type,
-		Status:     pw.Status,
-		Err:        pw.Err,
-		Cfg:        pw.Cfg,
-		RemoteAddr: pw.RemoteAddr,
-	}
-	return ps
-}
-
-func (pw *ProxyWrapper) WaitStart() {
-	pw.mu.Lock()
-	defer pw.mu.Unlock()
-	pw.Status = ProxyStatusWaitStart
-}
-
-func (pw *ProxyWrapper) Start(remoteAddr string, serverRespErr string) error {
-	if pw.pxy != nil {
-		pw.pxy.Close()
-		pw.pxy = nil
-	}
-
-	if serverRespErr != "" {
-		pw.mu.Lock()
-		pw.Status = ProxyStatusStartErr
-		pw.RemoteAddr = remoteAddr
-		pw.Err = serverRespErr
-		pw.mu.Unlock()
-		return fmt.Errorf(serverRespErr)
-	}
-
-	pxy := NewProxy(pw.Cfg)
-	pw.mu.Lock()
-	defer pw.mu.Unlock()
-	pw.RemoteAddr = remoteAddr
-	if err := pxy.Run(); err != nil {
-		pw.Status = ProxyStatusStartErr
-		pw.Err = err.Error()
-		return err
-	}
-	pw.Status = ProxyStatusRunning
-	pw.Err = ""
-	pw.pxy = pxy
-	return nil
-}
-
-func (pw *ProxyWrapper) InWorkConn(workConn frpNet.Conn) {
-	pw.mu.RLock()
-	pxy := pw.pxy
-	pw.mu.RUnlock()
-	if pxy != nil {
-		workConn.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
-		go pxy.InWorkConn(workConn)
-	} else {
-		workConn.Close()
-	}
-}
-
-func (pw *ProxyWrapper) Close() {
-	pw.mu.Lock()
-	defer pw.mu.Unlock()
-	if pw.pxy != nil {
-		pw.pxy.Close()
-		pw.pxy = nil
-	}
-	pw.Status = ProxyStatusClosed
-}

+ 14 - 2
client/visitor_manager.go

@@ -96,7 +96,9 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
 			delete(vm.visitors, name)
 		}
 	}
-	log.Info("visitor removed: %v", delNames)
+	if len(delNames) > 0 {
+		log.Info("visitor removed: %v", delNames)
+	}
 
 	addNames := make([]string, 0)
 	for name, cfg := range cfgs {
@@ -106,6 +108,16 @@ func (vm *VisitorManager) Reload(cfgs map[string]config.VisitorConf) {
 			vm.startVisitor(cfg)
 		}
 	}
-	log.Info("visitor added: %v", addNames)
+	if len(addNames) > 0 {
+		log.Info("visitor added: %v", addNames)
+	}
 	return
 }
+
+func (vm *VisitorManager) Close() {
+	vm.mu.Lock()
+	defer vm.mu.Unlock()
+	for _, v := range vm.visitors {
+		v.Close()
+	}
+}

+ 5 - 0
cmd/frpc/sub/root.go

@@ -166,6 +166,11 @@ func parseClientCommonCfgFromCmd() (err error) {
 	g.GlbClientCfg.LogLevel = logLevel
 	g.GlbClientCfg.LogFile = logFile
 	g.GlbClientCfg.LogMaxDays = int64(logMaxDays)
+	if logFile == "console" {
+		g.GlbClientCfg.LogWay = "console"
+	} else {
+		g.GlbClientCfg.LogWay = "file"
+	}
 	return nil
 }
 

+ 6 - 3
cmd/frps/root.go

@@ -52,7 +52,6 @@ var (
 	dashboardPwd      string
 	assetsDir         string
 	logFile           string
-	logWay            string
 	logLevel          string
 	logMaxDays        int64
 	token             string
@@ -81,7 +80,6 @@ func init() {
 	rootCmd.PersistentFlags().StringVarP(&dashboardUser, "dashboard_user", "", "admin", "dashboard user")
 	rootCmd.PersistentFlags().StringVarP(&dashboardPwd, "dashboard_pwd", "", "admin", "dashboard password")
 	rootCmd.PersistentFlags().StringVarP(&logFile, "log_file", "", "console", "log file")
-	rootCmd.PersistentFlags().StringVarP(&logWay, "log_way", "", "console", "log way")
 	rootCmd.PersistentFlags().StringVarP(&logLevel, "log_level", "", "info", "log level")
 	rootCmd.PersistentFlags().Int64VarP(&logMaxDays, "log_max_days", "", 3, "log_max_days")
 	rootCmd.PersistentFlags().StringVarP(&token, "token", "t", "", "auth token")
@@ -175,7 +173,6 @@ func parseServerCommonCfgFromCmd() (err error) {
 	g.GlbServerCfg.DashboardUser = dashboardUser
 	g.GlbServerCfg.DashboardPwd = dashboardPwd
 	g.GlbServerCfg.LogFile = logFile
-	g.GlbServerCfg.LogWay = logWay
 	g.GlbServerCfg.LogLevel = logLevel
 	g.GlbServerCfg.LogMaxDays = logMaxDays
 	g.GlbServerCfg.Token = token
@@ -194,6 +191,12 @@ func parseServerCommonCfgFromCmd() (err error) {
 		}
 	}
 	g.GlbServerCfg.MaxPortsPerClient = maxPortsPerClient
+
+	if logFile == "console" {
+		g.GlbServerCfg.LogWay = "console"
+	} else {
+		g.GlbServerCfg.LogWay = "file"
+	}
 	return
 }
 

+ 2 - 0
conf/frpc_full.ini

@@ -77,6 +77,8 @@ group_key = 123456
 # frpc will connect to the local service's port to check its health status
 health_check_type = tcp
 health_check_interval_s = 10
+health_check_max_failed = 1
+health_check_timeout_s = 3
 
 [ssh_random]
 type = tcp

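Putting the new keys together, an illustrative complete stanza for a health-checked TCP proxy (values are examples, not defaults): probe every 10s with a 3s per-probe timeout, and mark the proxy failed after one miss.

```ini
[ssh]
type = tcp
local_ip = 127.0.0.1
local_port = 22
remote_port = 6000
health_check_type = tcp
health_check_interval_s = 10
health_check_timeout_s = 3
health_check_max_failed = 1
```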
+ 28 - 6
models/config/proxy.go

@@ -170,6 +170,17 @@ func (cfg *BaseProxyConf) UnmarshalFromIni(prefix string, name string, section i
 	if err := cfg.HealthCheckConf.UnmarshalFromIni(prefix, name, section); err != nil {
 		return err
 	}
+
+	if cfg.HealthCheckType == "tcp" && cfg.Plugin == "" {
+		cfg.HealthCheckAddr = cfg.LocalIp + fmt.Sprintf(":%d", cfg.LocalPort)
+	}
+	if cfg.HealthCheckType == "http" && cfg.Plugin == "" && cfg.HealthCheckUrl != "" {
+		s := fmt.Sprintf("http://%s:%d", cfg.LocalIp, cfg.LocalPort)
+		if !strings.HasPrefix(cfg.HealthCheckUrl, "/") {
+			s += "/"
+		}
+		cfg.HealthCheckUrl = s + cfg.HealthCheckUrl
+	}
 	return nil
 }
 
@@ -381,7 +392,7 @@ func (cfg *LocalSvrConf) checkForCli() (err error) {
 // Health check info
 type HealthCheckConf struct {
 	HealthCheckType      string `json:"health_check_type"` // tcp | http
-	HealthCheckTimeout   int    `json:"health_check_timeout"`
+	HealthCheckTimeoutS  int    `json:"health_check_timeout_s"`
 	HealthCheckMaxFailed int    `json:"health_check_max_failed"`
 	HealthCheckIntervalS int    `json:"health_check_interval_s"`
 	HealthCheckUrl       string `json:"health_check_url"`
@@ -392,8 +403,10 @@ type HealthCheckConf struct {
 
 func (cfg *HealthCheckConf) compare(cmp *HealthCheckConf) bool {
 	if cfg.HealthCheckType != cmp.HealthCheckType ||
-		cfg.HealthCheckUrl != cmp.HealthCheckUrl ||
-		cfg.HealthCheckIntervalS != cmp.HealthCheckIntervalS {
+		cfg.HealthCheckTimeoutS != cmp.HealthCheckTimeoutS ||
+		cfg.HealthCheckMaxFailed != cmp.HealthCheckMaxFailed ||
+		cfg.HealthCheckIntervalS != cmp.HealthCheckIntervalS ||
+		cfg.HealthCheckUrl != cmp.HealthCheckUrl {
 		return false
 	}
 	return true
@@ -403,6 +416,18 @@ func (cfg *HealthCheckConf) UnmarshalFromIni(prefix string, name string, section
 	cfg.HealthCheckType = section["health_check_type"]
 	cfg.HealthCheckUrl = section["health_check_url"]
 
+	if tmpStr, ok := section["health_check_timeout_s"]; ok {
+		if cfg.HealthCheckTimeoutS, err = strconv.Atoi(tmpStr); err != nil {
+			return fmt.Errorf("Parse conf error: proxy [%s] health_check_timeout_s error", name)
+		}
+	}
+
+	if tmpStr, ok := section["health_check_max_failed"]; ok {
+		if cfg.HealthCheckMaxFailed, err = strconv.Atoi(tmpStr); err != nil {
+			return fmt.Errorf("Parse conf error: proxy [%s] health_check_max_failed error", name)
+		}
+	}
+
 	if tmpStr, ok := section["health_check_interval_s"]; ok {
 		if cfg.HealthCheckIntervalS, err = strconv.Atoi(tmpStr); err != nil {
 			return fmt.Errorf("Parse conf error: proxy [%s] health_check_interval_s error", name)
@@ -419,9 +444,6 @@ func (cfg *HealthCheckConf) checkForCli() error {
 		if cfg.HealthCheckType == "http" && cfg.HealthCheckUrl == "" {
 			return fmt.Errorf("health_check_url is required for health check type 'http'")
 		}
-		if cfg.HealthCheckIntervalS <= 0 {
-			return fmt.Errorf("health_check_interval_s is required and should greater than 0")
-		}
 	}
 	return nil
 }

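The derivation added to `UnmarshalFromIni` means a tcp check dials `local_ip:local_port`, while an http check targets the local service with the configured path. A standalone sketch of that mapping (hypothetical helper, not frp code):

```go
package main

import (
	"fmt"
	"strings"
)

// healthTargets mirrors the address/URL construction in the diff above.
func healthTargets(localIp string, localPort int, checkType, checkUrl string) (addr, url string) {
	if checkType == "tcp" {
		addr = localIp + fmt.Sprintf(":%d", localPort)
	}
	if checkType == "http" && checkUrl != "" {
		s := fmt.Sprintf("http://%s:%d", localIp, localPort)
		if !strings.HasPrefix(checkUrl, "/") {
			s += "/"
		}
		url = s + checkUrl
	}
	return
}

func main() {
	addr, _ := healthTargets("127.0.0.1", 22, "tcp", "")
	fmt.Println(addr) // 127.0.0.1:22
	_, url := healthTargets("127.0.0.1", 8080, "http", "health")
	fmt.Println(url) // http://127.0.0.1:8080/health
}
```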
+ 1 - 0
server/group/group.go

@@ -22,4 +22,5 @@ var (
 	ErrGroupAuthFailed    = errors.New("group auth failed")
 	ErrGroupParamsInvalid = errors.New("group params invalid")
 	ErrListenerClosed     = errors.New("group listener closed")
+	ErrGroupDifferentPort = errors.New("group should have same remote port")
 )

+ 5 - 1
server/group/tcp.go

@@ -114,10 +114,14 @@ func (tg *TcpGroup) Listen(proxyName string, group string, groupKey string, addr
 		}
 		go tg.worker()
 	} else {
-		if tg.group != group || tg.addr != addr || tg.port != port {
+		if tg.group != group || tg.addr != addr {
 			err = ErrGroupParamsInvalid
 			return
 		}
+		if tg.port != port {
+			err = ErrGroupDifferentPort
+			return
+		}
 		if tg.groupKey != groupKey {
 			err = ErrGroupAuthFailed
 			return

+ 247 - 0
tests/ci/health/health_test.go

@@ -0,0 +1,247 @@
+package health
+
+import (
+	"net/http"
+	"os"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/fatedier/frp/tests/config"
+	"github.com/fatedier/frp/tests/consts"
+	"github.com/fatedier/frp/tests/mock"
+	"github.com/fatedier/frp/tests/util"
+
+	"github.com/stretchr/testify/assert"
+)
+
+const FRPS_CONF = `
+[common]
+bind_addr = 0.0.0.0
+bind_port = 14000
+vhost_http_port = 14000
+log_file = console
+log_level = debug
+token = 123456
+`
+
+const FRPC_CONF = `
+[common]
+server_addr = 127.0.0.1
+server_port = 14000
+log_file = console
+log_level = debug
+token = 123456
+
+[tcp1]
+type = tcp
+local_port = 15001
+remote_port = 15000
+group = test
+group_key = 123
+health_check_type = tcp
+health_check_interval_s = 1
+
+[tcp2]
+type = tcp
+local_port = 15002
+remote_port = 15000
+group = test
+group_key = 123
+health_check_type = tcp
+health_check_interval_s = 1
+
+[http1]
+type = http
+local_port = 15003
+custom_domains = test1.com
+health_check_type = http
+health_check_interval_s = 1
+health_check_url = /health
+
+[http2]
+type = http
+local_port = 15004
+custom_domains = test2.com
+health_check_type = http
+health_check_interval_s = 1
+health_check_url = /health
+`
+
+func TestHealthCheck(t *testing.T) {
+	assert := assert.New(t)
+
+	// ****** start background services ******
+	echoSvc1 := mock.NewEchoServer(15001, 1, "echo1")
+	err := echoSvc1.Start()
+	if assert.NoError(err) {
+		defer echoSvc1.Stop()
+	}
+
+	echoSvc2 := mock.NewEchoServer(15002, 1, "echo2")
+	err = echoSvc2.Start()
+	if assert.NoError(err) {
+		defer echoSvc2.Stop()
+	}
+
+	var healthMu sync.RWMutex
+	svc1Health := true
+	svc2Health := true
+	httpSvc1 := mock.NewHttpServer(15003, func(w http.ResponseWriter, r *http.Request) {
+		if strings.Contains(r.URL.Path, "health") {
+			healthMu.RLock()
+			defer healthMu.RUnlock()
+			if svc1Health {
+				w.WriteHeader(200)
+			} else {
+				w.WriteHeader(500)
+			}
+		} else {
+			w.Write([]byte("http1"))
+		}
+	})
+	err = httpSvc1.Start()
+	if assert.NoError(err) {
+		defer httpSvc1.Stop()
+	}
+
+	httpSvc2 := mock.NewHttpServer(15004, func(w http.ResponseWriter, r *http.Request) {
+		if strings.Contains(r.URL.Path, "health") {
+			healthMu.RLock()
+			defer healthMu.RUnlock()
+			if svc2Health {
+				w.WriteHeader(200)
+			} else {
+				w.WriteHeader(500)
+			}
+		} else {
+			w.Write([]byte("http2"))
+		}
+	})
+	err = httpSvc2.Start()
+	if assert.NoError(err) {
+		defer httpSvc2.Stop()
+	}
+
+	time.Sleep(200 * time.Millisecond)
+
+	// ****** start frps and frpc ******
+	frpsCfgPath, err := config.GenerateConfigFile(consts.FRPS_NORMAL_CONFIG, FRPS_CONF)
+	if assert.NoError(err) {
+		defer os.Remove(frpsCfgPath)
+	}
+
+	frpcCfgPath, err := config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_CONF)
+	if assert.NoError(err) {
+		defer os.Remove(frpcCfgPath)
+	}
+
+	frpsProcess := util.NewProcess(consts.FRPS_SUB_BIN_PATH, []string{"-c", frpsCfgPath})
+	err = frpsProcess.Start()
+	if assert.NoError(err) {
+		defer frpsProcess.Stop()
+	}
+
+	time.Sleep(100 * time.Millisecond)
+
+	frpcProcess := util.NewProcess(consts.FRPC_SUB_BIN_PATH, []string{"-c", frpcCfgPath})
+	err = frpcProcess.Start()
+	if assert.NoError(err) {
+		defer frpcProcess.Stop()
+	}
+	time.Sleep(1000 * time.Millisecond)
+
+	// ****** health check type tcp ******
+	// echo1 and echo2 are ok
+	result := make([]string, 0)
+	res, err := util.SendTcpMsg("127.0.0.1:15000", "echo")
+	assert.NoError(err)
+	result = append(result, res)
+
+	res, err = util.SendTcpMsg("127.0.0.1:15000", "echo")
+	assert.NoError(err)
+	result = append(result, res)
+
+	assert.Contains(result, "echo1")
+	assert.Contains(result, "echo2")
+
+	// close echo2 server, echo1 still works
+	echoSvc2.Stop()
+	time.Sleep(1200 * time.Millisecond)
+
+	result = make([]string, 0)
+	res, err = util.SendTcpMsg("127.0.0.1:15000", "echo")
+	assert.NoError(err)
+	result = append(result, res)
+
+	res, err = util.SendTcpMsg("127.0.0.1:15000", "echo")
+	assert.NoError(err)
+	result = append(result, res)
+
+	assert.NotContains(result, "echo2")
+
+	// resume echo2 server, all services are ok
+	echoSvc2 = mock.NewEchoServer(15002, 1, "echo2")
+	err = echoSvc2.Start()
+	if assert.NoError(err) {
+		defer echoSvc2.Stop()
+	}
+
+	time.Sleep(1200 * time.Millisecond)
+
+	result = make([]string, 0)
+	res, err = util.SendTcpMsg("127.0.0.1:15000", "echo")
+	assert.NoError(err)
+	result = append(result, res)
+
+	res, err = util.SendTcpMsg("127.0.0.1:15000", "echo")
+	assert.NoError(err)
+	result = append(result, res)
+
+	assert.Contains(result, "echo1")
+	assert.Contains(result, "echo2")
+
+	// ****** health check type http ******
+	// http1 and http2 are ok
+	code, body, _, err := util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "")
+	assert.NoError(err)
+	assert.Equal(200, code)
+	assert.Equal("http1", body)
+
+	code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "")
+	assert.NoError(err)
+	assert.Equal(200, code)
+	assert.Equal("http2", body)
+
+	// http2 health check error
+	healthMu.Lock()
+	svc2Health = false
+	healthMu.Unlock()
+	time.Sleep(1200 * time.Millisecond)
+
+	code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "")
+	assert.NoError(err)
+	assert.Equal(200, code)
+	assert.Equal("http1", body)
+
+	code, _, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "")
+	assert.NoError(err)
+	assert.Equal(404, code)
+
+	// resume http2 service, http1 and http2 are ok
+	healthMu.Lock()
+	svc2Health = true
+	healthMu.Unlock()
+	time.Sleep(1200 * time.Millisecond)
+
+	code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test1.com", nil, "")
+	assert.NoError(err)
+	assert.Equal(200, code)
+	assert.Equal("http1", body)
+
+	code, body, _, err = util.SendHttpMsg("GET", "http://127.0.0.1:14000/xxx", "test2.com", nil, "")
+	assert.NoError(err)
+	assert.Equal(200, code)
+	assert.Equal("http2", body)
+}

+ 19 - 11
tests/ci/normal_test.go

@@ -12,7 +12,7 @@ import (
 	"github.com/gorilla/websocket"
 	"github.com/stretchr/testify/assert"
 
-	"github.com/fatedier/frp/client"
+	"github.com/fatedier/frp/client/proxy"
 	"github.com/fatedier/frp/server/ports"
 	"github.com/fatedier/frp/tests/consts"
 	"github.com/fatedier/frp/tests/mock"
@@ -22,13 +22,21 @@ import (
 )
 
 func TestMain(m *testing.M) {
-	go mock.StartTcpEchoServer(consts.TEST_TCP_PORT)
-	go mock.StartTcpEchoServer2(consts.TEST_TCP2_PORT)
+	var err error
+	tcpEcho1 := mock.NewEchoServer(consts.TEST_TCP_PORT, 1, "")
+	tcpEcho2 := mock.NewEchoServer(consts.TEST_TCP2_PORT, 2, "")
+
+	if err = tcpEcho1.Start(); err != nil {
+		panic(err)
+	}
+	if err = tcpEcho2.Start(); err != nil {
+		panic(err)
+	}
+
 	go mock.StartUdpEchoServer(consts.TEST_UDP_PORT)
 	go mock.StartUnixDomainServer(consts.TEST_UNIX_DOMAIN_ADDR)
 	go mock.StartHttpServer(consts.TEST_HTTP_PORT)
 
-	var err error
 	p1 := util.NewProcess(consts.FRPS_BIN_PATH, []string{"-c", "./auto_test_frps.ini"})
 	if err = p1.Start(); err != nil {
 		panic(err)
@@ -210,31 +218,31 @@ func TestAllowPorts(t *testing.T) {
 	// Port not allowed
 	status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTcpPortNotAllowed)
 	if assert.NoError(err) {
-		assert.Equal(client.ProxyStatusStartErr, status.Status)
+		assert.Equal(proxy.ProxyStatusStartErr, status.Status)
 		assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
 	}
 
 	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyUdpPortNotAllowed)
 	if assert.NoError(err) {
-		assert.Equal(client.ProxyStatusStartErr, status.Status)
+		assert.Equal(proxy.ProxyStatusStartErr, status.Status)
 		assert.True(strings.Contains(status.Err, ports.ErrPortNotAllowed.Error()))
 	}
 
 	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTcpPortUnavailable)
 	if assert.NoError(err) {
-		assert.Equal(client.ProxyStatusStartErr, status.Status)
+		assert.Equal(proxy.ProxyStatusStartErr, status.Status)
 		assert.True(strings.Contains(status.Err, ports.ErrPortUnAvailable.Error()))
 	}
 
 	// Port normal
 	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyTcpPortNormal)
 	if assert.NoError(err) {
-		assert.Equal(client.ProxyStatusRunning, status.Status)
+		assert.Equal(proxy.ProxyStatusRunning, status.Status)
 	}
 
 	status, err = util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyUdpPortNormal)
 	if assert.NoError(err) {
-		assert.Equal(client.ProxyStatusRunning, status.Status)
+		assert.Equal(proxy.ProxyStatusRunning, status.Status)
 	}
 }
 
@@ -263,7 +271,7 @@ func TestPluginHttpProxy(t *testing.T) {
 	assert := assert.New(t)
 	status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, consts.ProxyHttpProxy)
 	if assert.NoError(err) {
-		assert.Equal(client.ProxyStatusRunning, status.Status)
+		assert.Equal(proxy.ProxyStatusRunning, status.Status)
 
 		// http proxy
 		addr := status.RemoteAddr
@@ -291,7 +299,7 @@ func TestRangePortsMapping(t *testing.T) {
 		name := fmt.Sprintf("%s_%d", consts.ProxyRangeTcpPrefix, i)
 		status, err := util.GetProxyStatus(consts.ADMIN_ADDR, consts.ADMIN_USER, consts.ADMIN_PWD, name)
 		if assert.NoError(err) {
-			assert.Equal(client.ProxyStatusRunning, status.Status)
+			assert.Equal(proxy.ProxyStatusRunning, status.Status)
 		}
 	}
 }

+ 0 - 2
tests/ci/reconnect_test.go

@@ -17,7 +17,6 @@ const FRPS_RECONNECT_CONF = `
 bind_addr = 0.0.0.0
 bind_port = 20000
 log_file = console
-# debug, info, warn, error
 log_level = debug
 token = 123456
 `
@@ -27,7 +26,6 @@ const FRPC_RECONNECT_CONF = `
 server_addr = 127.0.0.1
 server_port = 20000
 log_file = console
-# debug, info, warn, error
 log_level = debug
 token = 123456
 admin_port = 21000

+ 6 - 2
tests/ci/reload_test.go

@@ -84,7 +84,8 @@ func TestReload(t *testing.T) {
 
 	frpcCfgPath, err := config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_RELOAD_CONF_1)
 	if assert.NoError(err) {
-		defer os.Remove(frpcCfgPath)
+		rmFile1 := frpcCfgPath
+		defer os.Remove(rmFile1)
 	}
 
 	frpsProcess := util.NewProcess(consts.FRPS_BIN_PATH, []string{"-c", frpsCfgPath})
@@ -120,7 +121,10 @@ func TestReload(t *testing.T) {
 
 	// reload frpc config
 	frpcCfgPath, err = config.GenerateConfigFile(consts.FRPC_NORMAL_CONFIG, FRPC_RELOAD_CONF_2)
-	assert.NoError(err)
+	if assert.NoError(err) {
+		rmFile2 := frpcCfgPath
+		defer os.Remove(rmFile2)
+	}
 	err = util.ReloadConf("127.0.0.1:21000", "abc", "abc")
 	assert.NoError(err)
 

+ 3 - 0
tests/consts/consts.go

@@ -6,6 +6,9 @@ var (
 	FRPS_BIN_PATH = "../../bin/frps"
 	FRPC_BIN_PATH = "../../bin/frpc"
 
+	FRPS_SUB_BIN_PATH = "../../../bin/frps"
+	FRPC_SUB_BIN_PATH = "../../../bin/frpc"
+
 	FRPS_NORMAL_CONFIG = "./auto_test_frps.ini"
 	FRPC_NORMAL_CONFIG = "./auto_test_frpc.ini"
 

+ 42 - 48
tests/mock/echo_server.go

@@ -10,40 +10,48 @@ import (
 	frpNet "github.com/fatedier/frp/utils/net"
 )
 
-func StartTcpEchoServer(port int) {
-	l, err := frpNet.ListenTcp("127.0.0.1", port)
-	if err != nil {
-		fmt.Printf("echo server listen error: %v\n", err)
-		return
-	}
+type EchoServer struct {
+	l frpNet.Listener
 
-	for {
-		c, err := l.Accept()
-		if err != nil {
-			fmt.Printf("echo server accept error: %v\n", err)
-			return
-		}
+	port        int
+	repeatedNum int
+	specifyStr  string
+}
 
-		go echoWorker(c)
+func NewEchoServer(port int, repeatedNum int, specifyStr string) *EchoServer {
+	if repeatedNum <= 0 {
+		repeatedNum = 1
+	}
+	return &EchoServer{
+		port:        port,
+		repeatedNum: repeatedNum,
+		specifyStr:  specifyStr,
 	}
 }
 
-func StartTcpEchoServer2(port int) {
-	l, err := frpNet.ListenTcp("127.0.0.1", port)
+func (es *EchoServer) Start() error {
+	l, err := frpNet.ListenTcp("127.0.0.1", es.port)
 	if err != nil {
-		fmt.Printf("echo server2 listen error: %v\n", err)
-		return
+		fmt.Printf("echo server listen error: %v\n", err)
+		return err
 	}
+	es.l = l
 
-	for {
-		c, err := l.Accept()
-		if err != nil {
-			fmt.Printf("echo server2 accept error: %v\n", err)
-			return
+	go func() {
+		for {
+			c, err := l.Accept()
+			if err != nil {
+				return
+			}
+
+			go echoWorker(c, es.repeatedNum, es.specifyStr)
 		}
+	}()
+	return nil
+}
 
-		go echoWorker2(c)
-	}
+func (es *EchoServer) Stop() {
+	es.l.Close()
 }
 
 func StartUdpEchoServer(port int) {
@@ -60,7 +68,7 @@ func StartUdpEchoServer(port int) {
 			return
 		}
 
-		go echoWorker(c)
+		go echoWorker(c, 1, "")
 	}
 }
 
@@ -80,11 +88,11 @@ func StartUnixDomainServer(unixPath string) {
 			return
 		}
 
-		go echoWorker(c)
+		go echoWorker(c, 1, "")
 	}
 }
 
-func echoWorker(c net.Conn) {
+func echoWorker(c net.Conn, repeatedNum int, specifyStr string) {
 	buf := make([]byte, 2048)
 
 	for {
@@ -99,28 +107,14 @@ func echoWorker(c net.Conn) {
 			}
 		}
 
-		c.Write(buf[:n])
-	}
-}
-
-func echoWorker2(c net.Conn) {
-	buf := make([]byte, 2048)
-
-	for {
-		n, err := c.Read(buf)
-		if err != nil {
-			if err == io.EOF {
-				c.Close()
-				break
-			} else {
-				fmt.Printf("echo server read error: %v\n", err)
-				return
+		if specifyStr != "" {
+			c.Write([]byte(specifyStr))
+		} else {
+			var w []byte
+			for i := 0; i < repeatedNum; i++ {
+				w = append(w, buf[:n]...)
 			}
+			c.Write(w)
 		}
-
-		var w []byte
-		w = append(w, buf[:n]...)
-		w = append(w, buf[:n]...)
-		c.Write(w)
 	}
 }

+ 31 - 0
tests/mock/http_server.go

@@ -3,6 +3,7 @@ package mock
 import (
 	"fmt"
 	"log"
+	"net"
 	"net/http"
 	"regexp"
 	"strings"
@@ -12,6 +13,36 @@ import (
 	"github.com/gorilla/websocket"
 )
 
+type HttpServer struct {
+	l net.Listener
+
+	port    int
+	handler http.HandlerFunc
+}
+
+func NewHttpServer(port int, handler http.HandlerFunc) *HttpServer {
+	return &HttpServer{
+		port:    port,
+		handler: handler,
+	}
+}
+
+func (hs *HttpServer) Start() error {
+	l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", hs.port))
+	if err != nil {
+		fmt.Printf("http server listen error: %v\n", err)
+		return err
+	}
+	hs.l = l
+
+	go http.Serve(l, http.HandlerFunc(hs.handler))
+	return nil
+}
+
+func (hs *HttpServer) Stop() {
+	hs.l.Close()
+}
+
 var upgrader = websocket.Upgrader{}
 
 func StartHttpServer(port int) {

+ 21 - 3
tests/util/process.go

@@ -1,22 +1,29 @@
 package util
 
 import (
+	"bytes"
 	"context"
 	"os/exec"
 )
 
 type Process struct {
-	cmd    *exec.Cmd
-	cancel context.CancelFunc
+	cmd         *exec.Cmd
+	cancel      context.CancelFunc
+	errorOutput *bytes.Buffer
+
+	beforeStopHandler func()
 }
 
 func NewProcess(path string, params []string) *Process {
 	ctx, cancel := context.WithCancel(context.Background())
 	cmd := exec.CommandContext(ctx, path, params...)
-	return &Process{
+	p := &Process{
 		cmd:    cmd,
 		cancel: cancel,
 	}
+	p.errorOutput = bytes.NewBufferString("")
+	cmd.Stderr = p.errorOutput
+	return p
 }
 
 func (p *Process) Start() error {
@@ -24,6 +31,17 @@ func (p *Process) Start() error {
 }
 
 func (p *Process) Stop() error {
+	if p.beforeStopHandler != nil {
+		p.beforeStopHandler()
+	}
 	p.cancel()
 	return p.cmd.Wait()
 }
+
+func (p *Process) ErrorOutput() string {
+	return p.errorOutput.String()
+}
+
+func (p *Process) SetBeforeStopHandler(fn func()) {
+	p.beforeStopHandler = fn
+}

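The stderr capture is plain os/exec plumbing: point `cmd.Stderr` at a buffer before starting the process and read it back after it exits. A minimal usage sketch outside the test harness:

```go
package main

import (
	"bytes"
	"fmt"
	"os/exec"
)

func main() {
	var errBuf bytes.Buffer
	cmd := exec.Command("sh", "-c", "echo oops >&2")
	cmd.Stderr = &errBuf // capture stderr while the process runs
	if err := cmd.Run(); err != nil {
		fmt.Println("run error:", err)
	}
	fmt.Printf("stderr: %q\n", errBuf.String()) // stderr: "oops\n"
}
```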
+ 1 - 1
utils/version/version.go

@@ -19,7 +19,7 @@ import (
 	"strings"
 )
 
-var version string = "0.21.0"
+var version string = "0.22.0"
 
 func Full() string {
 	return version