Browse Source

Merge pull request #3 from fatedier/dev

Merge pull request #3 from fatedier/dev
Hurricanezwf 9 năm trước cách đây
mục cha
commit
b2ca78fa04

+ 1 - 1
.travis.yml

@@ -3,7 +3,7 @@ language: go
 
 go:
     - 1.4.2
-    - 1.5.2 
+    - 1.5.1
 
 install:
     - make

+ 5 - 4
Makefile

@@ -1,4 +1,5 @@
 export PATH := $(GOPATH)/bin:$(PATH)
+export NEW_GOPATH := $(shell pwd)
 
 all: build
 
@@ -9,13 +10,13 @@ godep:
 	godep restore
 
 fmt:
-	@godep go fmt ./...
+	@GOPATH=$(NEW_GOPATH) godep go fmt ./...
 
 frps:
-	godep go build -o bin/frps ./cmd/frps
+	GOPATH=$(NEW_GOPATH) godep go build -o bin/frps ./src/frp/cmd/frps
 
 frpc:
-	godep go build -o bin/frpc ./cmd/frpc
+	GOPATH=$(NEW_GOPATH) godep go build -o bin/frpc ./src/frp/cmd/frpc
 
 test:
-	@godep go test ./...
+	@GOPATH=$(NEW_GOPATH) godep go test ./...

+ 1 - 1
conf/frpc.ini

@@ -1,7 +1,7 @@
 # common是必须的section
 [common]
 server_addr = 127.0.0.1
-bind_port = 7000
+server_port = 7000
 log_file = ./frpc.log
 # debug, info, warn, error
 log_level = debug

+ 8 - 9
cmd/frpc/control.go → src/frp/cmd/frpc/control.go

@@ -7,15 +7,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/fatedier/frp/models/client"
-	"github.com/fatedier/frp/models/consts"
-	"github.com/fatedier/frp/models/msg"
-	"github.com/fatedier/frp/utils/conn"
-	"github.com/fatedier/frp/utils/log"
+	"frp/models/client"
+	"frp/models/consts"
+	"frp/models/msg"
+	"frp/utils/conn"
+	"frp/utils/log"
 )
 
-var isHeartBeatContinue bool = true
-
 func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
 	defer wait.Done()
 
@@ -30,9 +28,10 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) {
 		// ignore response content now
 		_, err := c.ReadLine()
 		if err == io.EOF {
-			isHeartBeatContinue = false
 			log.Debug("ProxyName [%s], server close this control conn", cli.Name)
 			var sleepTime time.Duration = 1
+
+			// loop until connect to server
 			for {
 				log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort)
 				tmpConn, err := loginToServer(cli)
@@ -114,5 +113,5 @@ func startHeartBeat(c *conn.Conn) {
 			break
 		}
 	}
-	log.Info("heartbeat exit")
+	log.Debug("heartbeat exit")
 }

+ 2 - 2
cmd/frpc/main.go → src/frp/cmd/frpc/main.go

@@ -4,8 +4,8 @@ import (
 	"os"
 	"sync"
 
-	"github.com/fatedier/frp/models/client"
-	"github.com/fatedier/frp/utils/log"
+	"frp/models/client"
+	"frp/utils/log"
 )
 
 func main() {

+ 17 - 11
cmd/frps/control.go → src/frp/cmd/frps/control.go

@@ -6,16 +6,19 @@ import (
 	"io"
 	"time"
 
-	"github.com/fatedier/frp/models/consts"
-	"github.com/fatedier/frp/models/msg"
-	"github.com/fatedier/frp/models/server"
-	"github.com/fatedier/frp/utils/conn"
-	"github.com/fatedier/frp/utils/log"
+	"frp/models/consts"
+	"frp/models/msg"
+	"frp/models/server"
+	"frp/utils/conn"
+	"frp/utils/log"
 )
 
 func ProcessControlConn(l *conn.Listener) {
 	for {
-		c := l.GetConn()
+		c, err := l.GetConn()
+		if err != nil {
+			return
+		}
 		log.Debug("Get one new conn, %v", c.GetRemoteAddr())
 		go controlWorker(c)
 	}
@@ -47,7 +50,6 @@ func controlWorker(c *conn.Conn) {
 	}
 
 	if needRes {
-		// control conn
 		defer c.Close()
 
 		buf, _ := json.Marshal(clientCtlRes)
@@ -62,7 +64,7 @@ func controlWorker(c *conn.Conn) {
 		return
 	}
 
-	// others is from server to client
+	// other messages is from server to client
 	s, ok := server.ProxyServers[clientCtlReq.ProxyName]
 	if !ok {
 		log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName)
@@ -138,7 +140,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne
 			return
 		}
 
-		s.CliConnChan <- c
+		s.GetNewCliConn(c)
 	} else {
 		info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type)
 		log.Warn(info)
@@ -153,8 +155,8 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
 	isContinueRead := true
 	f := func() {
 		isContinueRead = false
-		c.Close()
 		s.Close()
+		log.Error("ProxyName [%s], client heartbeat timeout", s.Name)
 	}
 	timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f)
 	defer timer.Stop()
@@ -164,13 +166,17 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) {
 		if err != nil {
 			if err == io.EOF {
 				log.Warn("ProxyName [%s], client is dead!", s.Name)
-				c.Close()
 				s.Close()
 				break
+			} else if c.IsClosed() {
+				log.Warn("ProxyName [%s], client connection is closed", s.Name)
+				break
 			}
+
 			log.Error("ProxyName [%s], read error: %v", s.Name, err)
 			continue
 		}
+		log.Debug("ProxyName [%s], get heartbeat", s.Name)
 
 		timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second)
 	}

+ 3 - 3
cmd/frps/main.go → src/frp/cmd/frps/main.go

@@ -3,9 +3,9 @@ package main
 import (
 	"os"
 
-	"github.com/fatedier/frp/models/server"
-	"github.com/fatedier/frp/utils/conn"
-	"github.com/fatedier/frp/utils/log"
+	"frp/models/server"
+	"frp/utils/conn"
+	"frp/utils/log"
 )
 
 func main() {

+ 4 - 4
models/client/client.go → src/frp/models/client/client.go

@@ -3,10 +3,10 @@ package client
 import (
 	"encoding/json"
 
-	"github.com/fatedier/frp/models/consts"
-	"github.com/fatedier/frp/models/msg"
-	"github.com/fatedier/frp/utils/conn"
-	"github.com/fatedier/frp/utils/log"
+	"frp/models/consts"
+	"frp/models/msg"
+	"frp/utils/conn"
+	"frp/utils/log"
 )
 
 type ProxyClient struct {

+ 0 - 0
models/client/config.go → src/frp/models/client/config.go


+ 0 - 0
models/consts/consts.go → src/frp/models/consts/consts.go


+ 0 - 0
models/msg/msg.go → src/frp/models/msg/msg.go


+ 1 - 0
models/server/config.go → src/frp/models/server/config.go

@@ -15,6 +15,7 @@ var (
 	LogLevel         string = "warn"
 	LogWay           string = "file"
 	HeartBeatTimeout int64  = 30
+	UserConnTimeout  int64  = 10
 )
 
 var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer)

+ 40 - 20
models/server/server.go → src/frp/models/server/server.go

@@ -3,29 +3,30 @@ package server
 import (
 	"container/list"
 	"sync"
+	"time"
 
-	"github.com/fatedier/frp/models/consts"
-	"github.com/fatedier/frp/utils/conn"
-	"github.com/fatedier/frp/utils/log"
+	"frp/models/consts"
+	"frp/utils/conn"
+	"frp/utils/log"
 )
 
 type ProxyServer struct {
-	Name        string
-	Passwd      string
-	BindAddr    string
-	ListenPort  int64
-	Status      int64
-	CliConnChan chan *conn.Conn // get client conns from control goroutine
-
-	listener     *conn.Listener // accept new connection from remote users
-	ctlMsgChan   chan int64     // every time accept a new user conn, put "1" to the channel
-	userConnList *list.List     // store user conns
+	Name       string
+	Passwd     string
+	BindAddr   string
+	ListenPort int64
+	Status     int64
+
+	listener     *conn.Listener  // accept new connection from remote users
+	ctlMsgChan   chan int64      // every time accept a new user conn, put "1" to the channel
+	cliConnChan  chan *conn.Conn // get client conns from control goroutine
+	userConnList *list.List      // store user conns
 	mutex        sync.Mutex
 }
 
 func (p *ProxyServer) Init() {
 	p.Status = consts.Idle
-	p.CliConnChan = make(chan *conn.Conn)
+	p.cliConnChan = make(chan *conn.Conn)
 	p.ctlMsgChan = make(chan int64)
 	p.userConnList = list.New()
 }
@@ -48,13 +49,13 @@ func (p *ProxyServer) Start() (err error) {
 
 	p.Status = consts.Working
 
-	// start a goroutine for listener
+	// start a goroutine for listener to accept user connection
 	go func() {
 		for {
 			// block
-			// if listener is closed, get nil
-			c := p.listener.GetConn()
-			if c == nil {
+			// if listener is closed, err returned
+			c, err := p.listener.GetConn()
+			if err != nil {
 				log.Info("ProxyName [%s], listener is closed", p.Name)
 				return
 			}
@@ -73,13 +74,28 @@ func (p *ProxyServer) Start() (err error) {
 
 			// put msg to control conn
 			p.ctlMsgChan <- 1
+
+			// set timeout
+			time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() {
+				p.Lock()
+				defer p.Unlock()
+				element := p.userConnList.Front()
+				if element == nil {
+					return
+				}
+
+				userConn := element.Value.(*conn.Conn)
+				if userConn == c {
+					log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr())
+				}
+			})
 		}
 	}()
 
 	// start another goroutine for join two conns from client and user
 	go func() {
 		for {
-			cliConn, ok := <-p.CliConnChan
+			cliConn, ok := <-p.cliConnChan
 			if !ok {
 				return
 			}
@@ -114,7 +130,7 @@ func (p *ProxyServer) Close() {
 	p.Status = consts.Idle
 	p.listener.Close()
 	close(p.ctlMsgChan)
-	close(p.CliConnChan)
+	close(p.cliConnChan)
 	p.userConnList = list.New()
 	p.Unlock()
 }
@@ -128,3 +144,7 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) {
 	}
 	return
 }
+
+func (p *ProxyServer) GetNewCliConn(c *conn.Conn) {
+	p.cliConnChan <- c
+}

+ 0 - 0
utils/broadcast/broadcast.go → src/frp/utils/broadcast/broadcast.go


+ 0 - 0
utils/broadcast/broadcast_test.go → src/frp/utils/broadcast/broadcast_test.go


+ 7 - 7
utils/conn/conn.go → src/frp/utils/conn/conn.go

@@ -7,7 +7,7 @@ import (
 	"net"
 	"sync"
 
-	"github.com/fatedier/frp/utils/log"
+	"frp/utils/log"
 )
 
 type Listener struct {
@@ -52,15 +52,15 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) {
 	return l, err
 }
 
-// wait util get one new connection or close
-// if listener is closed, return nil
-func (l *Listener) GetConn() (conn *Conn) {
+// wait util get one new connection or listener is closed
+// if listener is closed, err returned
+func (l *Listener) GetConn() (conn *Conn, err error) {
 	var ok bool
 	conn, ok = <-l.conns
 	if !ok {
-		return nil
+		return conn, fmt.Errorf("channel close")
 	}
-	return conn
+	return conn, nil
 }
 
 func (l *Listener) Close() {
@@ -116,7 +116,7 @@ func (c *Conn) Write(content string) (err error) {
 }
 
 func (c *Conn) Close() {
-	if c.TcpConn != nil {
+	if c.TcpConn != nil && c.closeFlag == false {
 		c.closeFlag = true
 		c.TcpConn.Close()
 	}

+ 0 - 0
utils/log/log.go → src/frp/utils/log/log.go


+ 0 - 0
utils/pcrypto/pcrypto.go → src/frp/utils/pcrypto/pcrypto.go


+ 0 - 0
utils/pcrypto/pcrypto_test.go → src/frp/utils/pcrypto/pcrypto_test.go