Browse Source

add more log info

fatedier 7 years ago
parent
commit
71f7caa1ee
8 changed files with 122 additions and 47 deletions
  1. 8 4
      client/control.go
  2. 29 10
      client/proxy.go
  3. 1 1
      conf/frpc.ini
  4. 1 1
      conf/frps.ini
  5. 15 15
      models/proto/udp/udp.go
  6. 1 0
      server/control.go
  7. 56 16
      server/proxy.go
  8. 11 0
      utils/log/log.go

+ 8 - 4
client/control.go

@@ -144,8 +144,8 @@ func (ctl *Control) NewWorkConn() {
 
 	// dispatch this work connection to related proxy
 	if pxy, ok := ctl.proxies[startMsg.ProxyName]; ok {
-		go pxy.InWorkConn(workConn)
 		workConn.Info("start a new work connection")
+		go pxy.InWorkConn(workConn)
 	} else {
 		workConn.Close()
 	}
@@ -288,7 +288,7 @@ func (ctl *Control) manager() {
 				}
 				cfg, ok := ctl.pxyCfgs[m.ProxyName]
 				if !ok {
-					// it will never go to this branch
+					// it will never go to this branch now
 					ctl.Warn("[%s] no proxy conf found", m.ProxyName)
 					continue
 				}
@@ -317,12 +317,12 @@ func (ctl *Control) controler() {
 	maxDelayTime := 30 * time.Second
 	delayTime := time.Second
 
-	checkInterval := 60 * time.Second
+	checkInterval := 30 * time.Second
 	checkProxyTicker := time.NewTicker(checkInterval)
 	for {
 		select {
 		case <-checkProxyTicker.C:
-			// Every 60 seconds, check which proxy registered failed and reregister it to server.
+			// Every 30 seconds, check which proxy registered failed and reregister it to server.
 			for _, cfg := range ctl.pxyCfgs {
 				if _, exist := ctl.proxies[cfg.GetName()]; !exist {
 					ctl.Info("try to reregister proxy [%s]", cfg.GetName())
@@ -337,6 +337,10 @@ func (ctl *Control) controler() {
 				// close related channels
 				close(ctl.readCh)
 				close(ctl.sendCh)
+
+				for _, pxy := range ctl.proxies {
+					pxy.Close()
+				}
 				time.Sleep(time.Second)
 
 				// loop util reconnect to server success

+ 29 - 10
client/proxy.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"sync"
 
 	"github.com/fatedier/frp/models/config"
 	"github.com/fatedier/frp/models/msg"
@@ -69,7 +70,9 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) {
 }
 
 type BaseProxy struct {
-	ctl *Control
+	ctl    *Control
+	closed bool
+	mu     sync.RWMutex
 	log.Logger
 }
 
@@ -151,20 +154,34 @@ func (pxy *UdpProxy) Run() (err error) {
 }
 
 func (pxy *UdpProxy) Close() {
-	pxy.workConn.Close()
-	close(pxy.readCh)
-	close(pxy.sendCh)
+	pxy.mu.Lock()
+	defer pxy.mu.Unlock()
+
+	if !pxy.closed {
+		pxy.closed = true
+		if pxy.workConn != nil {
+			pxy.workConn.Close()
+		}
+		if pxy.readCh != nil {
+			close(pxy.readCh)
+		}
+		if pxy.sendCh != nil {
+			close(pxy.sendCh)
+		}
+	}
 }
 
 func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
-	if pxy.workConn != nil {
-		pxy.workConn.Close()
-		close(pxy.readCh)
-		close(pxy.sendCh)
-	}
+	pxy.Info("incoming a new work connection for udp proxy")
+	// close resources releated with old workConn
+	pxy.Close()
+
+	pxy.mu.Lock()
 	pxy.workConn = conn
 	pxy.readCh = make(chan *msg.UdpPacket, 64)
 	pxy.sendCh = make(chan *msg.UdpPacket, 64)
+	pxy.closed = false
+	pxy.mu.Unlock()
 
 	workConnReaderFn := func(conn net.Conn) {
 		for {
@@ -174,9 +191,10 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
 				return
 			}
 			if errRet := errors.PanicToError(func() {
+				pxy.Trace("get udp package from workConn: %s", udpMsg.Content)
 				pxy.readCh <- &udpMsg
 			}); errRet != nil {
-				pxy.Info("reader goroutine for udp work connection closed")
+				pxy.Info("reader goroutine for udp work connection closed: %v", errRet)
 				return
 			}
 		}
@@ -184,6 +202,7 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
 	workConnSenderFn := func(conn net.Conn) {
 		var errRet error
 		for udpMsg := range pxy.sendCh {
+			pxy.Trace("send udp package to workConn: %s", udpMsg.Content)
 			if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
 				pxy.Info("sender goroutine for udp work connection closed")
 				return

+ 1 - 1
conf/frpc.ini

@@ -11,7 +11,7 @@ server_port = 7000
 # console or real logFile path like ./frpc.log
 log_file = ./frpc.log
 
-# debug, info, warn, error
+# trace, debug, info, warn, error
 log_level = info
 
 log_max_days = 3

+ 1 - 1
conf/frps.ini

@@ -21,7 +21,7 @@ dashboard_pwd = admin
 # console or real logFile path like ./frps.log
 log_file = ./frps.log
 
-# debug, info, warn, error
+# trace, debug, info, warn, error
 log_level = info
 
 log_max_days = 3

+ 15 - 15
models/proto/udp/udp.go

@@ -51,22 +51,22 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UdpPacket, sendCh
 	}()
 
 	// write
-	go func() {
-		buf := pool.GetBuf(1500)
-		defer pool.PutBuf(buf)
-		for {
-			n, remoteAddr, err := udpConn.ReadFromUDP(buf)
-			if err != nil {
-				udpConn.Close()
-				return
-			}
-			udpMsg := NewUdpPacket(buf[:n], nil, remoteAddr)
-			select {
-			case sendCh <- udpMsg:
-			default:
-			}
+	buf := pool.GetBuf(1500)
+	defer pool.PutBuf(buf)
+	for {
+		n, remoteAddr, err := udpConn.ReadFromUDP(buf)
+		if err != nil {
+			udpConn.Close()
+			return
 		}
-	}()
+		// buf[:n] will be encoded to string, so the bytes can be reused
+		udpMsg := NewUdpPacket(buf[:n], nil, remoteAddr)
+		select {
+		case sendCh <- udpMsg:
+		default:
+		}
+	}
+	return
 }
 
 func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UdpPacket, sendCh chan<- *msg.UdpPacket) {

+ 1 - 0
server/control.go

@@ -258,6 +258,7 @@ func (ctl *Control) stoper() {
 	ctl.writerShutdown.WaitDown()
 
 	ctl.conn.Close()
+	ctl.readerShutdown.WaitDown()
 
 	close(ctl.workConnCh)
 	for workConn := range ctl.workConnCh {

+ 56 - 16
server/proxy.go

@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"io"
 	"net"
+	"sync"
 	"time"
 
 	"github.com/fatedier/frp/models/config"
@@ -45,6 +46,7 @@ type BaseProxy struct {
 	name      string
 	ctl       *Control
 	listeners []frpNet.Listener
+	mu        sync.RWMutex
 	log.Logger
 }
 
@@ -276,11 +278,23 @@ type UdpProxy struct {
 	BaseProxy
 	cfg *config.UdpProxyConf
 
-	udpConn      *net.UDPConn
-	workConn     net.Conn
-	sendCh       chan *msg.UdpPacket
-	readCh       chan *msg.UdpPacket
+	// udpConn is the listener of udp packages
+	udpConn *net.UDPConn
+
+	// there are always only one workConn at the same time
+	// get another one if it closed
+	workConn net.Conn
+
+	// sendCh is used for sending packages to workConn
+	sendCh chan *msg.UdpPacket
+
+	// readCh is used for reading packages from workConn
+	readCh chan *msg.UdpPacket
+
+	// checkCloseCh is used for watching if workConn is closed
 	checkCloseCh chan int
+
+	isClosed bool
 }
 
 func (pxy *UdpProxy) Run() (err error) {
@@ -300,39 +314,49 @@ func (pxy *UdpProxy) Run() (err error) {
 	pxy.readCh = make(chan *msg.UdpPacket, 64)
 	pxy.checkCloseCh = make(chan int)
 
+	// read message from workConn, if it returns any error, notify proxy to start a new workConn
 	workConnReaderFn := func(conn net.Conn) {
 		for {
 			var udpMsg msg.UdpPacket
+			pxy.Trace("loop waiting message from udp workConn")
 			if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
 				pxy.Warn("read from workConn for udp error: %v", errRet)
 				conn.Close()
 				// notify proxy to start a new work connection
+				// ignore error here, it means the proxy is closed
 				errors.PanicToError(func() {
 					pxy.checkCloseCh <- 1
 				})
 				return
 			}
 			if errRet := errors.PanicToError(func() {
+				pxy.Trace("get udp message from workConn: %s", udpMsg.Content)
 				pxy.readCh <- &udpMsg
 				StatsAddTrafficOut(pxy.GetName(), int64(len(udpMsg.Content)))
 			}); errRet != nil {
+				conn.Close()
 				pxy.Info("reader goroutine for udp work connection closed")
 				return
 			}
 		}
 	}
+
+	// send message to workConn
 	workConnSenderFn := func(conn net.Conn, ctx context.Context) {
 		var errRet error
 		for {
 			select {
 			case udpMsg, ok := <-pxy.sendCh:
 				if !ok {
+					pxy.Info("sender goroutine for udp work condition closed")
 					return
 				}
 				if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {
 					pxy.Info("sender goroutine for udp work connection closed: %v", errRet)
+					conn.Close()
 					return
 				} else {
+					pxy.Trace("send message to udp workConn: %s", udpMsg.Content)
 					StatsAddTrafficIn(pxy.GetName(), int64(len(udpMsg.Content)))
 					continue
 				}
@@ -344,12 +368,12 @@ func (pxy *UdpProxy) Run() (err error) {
 	}
 
 	go func() {
+		// Sleep a while for waiting control send the NewProxyResp to client.
+		time.Sleep(500 * time.Millisecond)
 		for {
-			// Sleep a while for waiting control send the NewProxyResp to client.
-			time.Sleep(500 * time.Millisecond)
 			workConn, err := pxy.GetWorkConnFromPool()
 			if err != nil {
-				time.Sleep(5 * time.Second)
+				time.Sleep(1 * time.Second)
 				// check if proxy is closed
 				select {
 				case _, ok := <-pxy.checkCloseCh:
@@ -360,6 +384,10 @@ func (pxy *UdpProxy) Run() (err error) {
 				}
 				continue
 			}
+			// close the old workConn and replac it with a new one
+			if pxy.workConn != nil {
+				pxy.workConn.Close()
+			}
 			pxy.workConn = workConn
 			ctx, cancel := context.WithCancel(context.Background())
 			go workConnReaderFn(workConn)
@@ -372,10 +400,14 @@ func (pxy *UdpProxy) Run() (err error) {
 		}
 	}()
 
-	// Read from user connections and send wrapped udp message to sendCh.
+	// Read from user connections and send wrapped udp message to sendCh (forwarded by workConn).
 	// Client will transfor udp message to local udp service and waiting for response for a while.
-	// Response will be wrapped to be transfored in work connection to server.
-	udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
+	// Response will be wrapped to be forwarded by work connection to server.
+	// Close readCh and sendCh at the end.
+	go func() {
+		udp.ForwardUserConn(udpConn, pxy.readCh, pxy.sendCh)
+		pxy.Close()
+	}()
 	return nil
 }
 
@@ -384,12 +416,20 @@ func (pxy *UdpProxy) GetConf() config.ProxyConf {
 }
 
 func (pxy *UdpProxy) Close() {
-	pxy.BaseProxy.Close()
-	pxy.workConn.Close()
-	pxy.udpConn.Close()
-	close(pxy.checkCloseCh)
-	close(pxy.readCh)
-	close(pxy.sendCh)
+	pxy.mu.Lock()
+	defer pxy.mu.Unlock()
+	if !pxy.isClosed {
+		pxy.isClosed = true
+
+		pxy.BaseProxy.Close()
+		pxy.workConn.Close()
+		pxy.udpConn.Close()
+
+		// all channels only closed here
+		close(pxy.checkCloseCh)
+		close(pxy.readCh)
+		close(pxy.sendCh)
+	}
 }
 
 // HandleUserTcpConnection is used for incoming tcp user connections.

+ 11 - 0
utils/log/log.go

@@ -55,6 +55,8 @@ func SetLogLevel(logLevel string) {
 		level = 6
 	case "debug":
 		level = 7
+	case "trace":
+		level = 8
 	default:
 		level = 4
 	}
@@ -79,6 +81,10 @@ func Debug(format string, v ...interface{}) {
 	Log.Debug(format, v...)
 }
 
+func Trace(format string, v ...interface{}) {
+	Log.Trace(format, v...)
+}
+
 // Logger
 type Logger interface {
 	AddLogPrefix(string)
@@ -88,6 +94,7 @@ type Logger interface {
 	Warn(string, ...interface{})
 	Info(string, ...interface{})
 	Debug(string, ...interface{})
+	Trace(string, ...interface{})
 }
 
 type PrefixLogger struct {
@@ -136,3 +143,7 @@ func (pl *PrefixLogger) Info(format string, v ...interface{}) {
 func (pl *PrefixLogger) Debug(format string, v ...interface{}) {
 	Log.Debug(pl.prefix+format, v...)
 }
+
+func (pl *PrefixLogger) Trace(format string, v ...interface{}) {
+	Log.Trace(pl.prefix+format, v...)
+}