Browse Source

Merge pull request #1112 from fatedier/p2p

xtcp: wrap yamux on kcp connections, fix #1103
fatedier 6 years ago
parent
commit
3c03690ab7
5 changed files with 43 additions and 6 deletions
  1. 18 1
      client/proxy/proxy.go
  2. 20 1
      client/visitor.go
  3. 2 2
      cmd/frpc/sub/xtcp.go
  4. 1 1
      go.mod
  5. 2 1
      go.sum

+ 18 - 1
client/proxy/proxy.go

@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"sync"
 	"time"
@@ -33,6 +34,7 @@ import (
 	"github.com/fatedier/golib/errors"
 	frpIo "github.com/fatedier/golib/io"
 	"github.com/fatedier/golib/pool"
+	fmux "github.com/hashicorp/yamux"
 )
 
 // Proxy defines how to handle work connections for different proxy type.
@@ -302,8 +304,23 @@ func (pxy *XtcpProxy) InWorkConn(conn frpNet.Conn) {
 		return
 	}
 
+	fmuxCfg := fmux.DefaultConfig()
+	fmuxCfg.KeepAliveInterval = 5 * time.Second
+	fmuxCfg.LogOutput = ioutil.Discard
+	sess, err := fmux.Server(kcpConn, fmuxCfg)
+	if err != nil {
+		pxy.Error("create yamux server from kcp connection error: %v", err)
+		return
+	}
+	defer sess.Close()
+	muxConn, err := sess.Accept()
+	if err != nil {
+		pxy.Error("accept for yamux connection error: %v", err)
+		return
+	}
+
 	HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf,
-		frpNet.WrapConn(kcpConn), []byte(pxy.cfg.Sk))
+		frpNet.WrapConn(muxConn), []byte(pxy.cfg.Sk))
 }
 
 // UDP

+ 20 - 1
client/visitor.go

@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net"
 	"strconv"
 	"strings"
@@ -35,6 +36,7 @@ import (
 
 	frpIo "github.com/fatedier/golib/io"
 	"github.com/fatedier/golib/pool"
+	fmux "github.com/hashicorp/yamux"
 )
 
 // Visitor is used for forward traffics from local port tot remote service.
@@ -280,6 +282,8 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
 		sv.Error("listen on visitorConn's local adress error: %v", err)
 		return
 	}
+	defer lConn.Close()
+
 	lConn.SetReadDeadline(time.Now().Add(5 * time.Second))
 	sidBuf := pool.GetBuf(1024)
 	n, _, err = lConn.ReadFromUDP(sidBuf)
@@ -314,7 +318,22 @@ func (sv *XtcpVisitor) handleConn(userConn frpNet.Conn) {
 		remote = frpIo.WithCompression(remote)
 	}
 
-	frpIo.Join(userConn, remote)
+	fmuxCfg := fmux.DefaultConfig()
+	fmuxCfg.KeepAliveInterval = 5 * time.Second
+	fmuxCfg.LogOutput = ioutil.Discard
+	sess, err := fmux.Client(remote, fmuxCfg)
+	if err != nil {
+		sv.Error("create yamux session error: %v", err)
+		return
+	}
+	defer sess.Close()
+	muxConn, err := sess.Open()
+	if err != nil {
+		sv.Error("open yamux stream error: %v", err)
+		return
+	}
+
+	frpIo.Join(userConn, muxConn)
 	sv.Debug("join connections closed")
 }
 

+ 2 - 2
cmd/frpc/sub/xtcp.go

@@ -68,7 +68,7 @@ var xtcpCmd = &cobra.Command{
 		if role == "server" {
 			cfg := &config.XtcpProxyConf{}
 			cfg.ProxyName = prefix + proxyName
-			cfg.ProxyType = consts.StcpProxy
+			cfg.ProxyType = consts.XtcpProxy
 			cfg.UseEncryption = useEncryption
 			cfg.UseCompression = useCompression
 			cfg.Role = role
@@ -84,7 +84,7 @@ var xtcpCmd = &cobra.Command{
 		} else if role == "visitor" {
 			cfg := &config.XtcpVisitorConf{}
 			cfg.ProxyName = prefix + proxyName
-			cfg.ProxyType = consts.StcpProxy
+			cfg.ProxyType = consts.XtcpProxy
 			cfg.UseEncryption = useEncryption
 			cfg.UseCompression = useCompression
 			cfg.Role = role

+ 1 - 1
go.mod

@@ -12,7 +12,7 @@ require (
 	github.com/gorilla/context v1.1.1 // indirect
 	github.com/gorilla/mux v1.6.2
 	github.com/gorilla/websocket v1.2.0
-	github.com/hashicorp/yamux v0.0.0-20180314200745-2658be15c5f0
+	github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d
 	github.com/inconshreveable/mousetrap v1.0.0 // indirect
 	github.com/mattn/go-runewidth v0.0.4 // indirect
 	github.com/pkg/errors v0.8.0 // indirect

+ 2 - 1
go.sum

@@ -9,7 +9,8 @@ github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8
 github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/websocket v1.2.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
-github.com/hashicorp/yamux v0.0.0-20180314200745-2658be15c5f0/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
+github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d h1:kJCB4vdITiW1eC1vq2e6IsrXKrZit1bv/TDYFGMp4BQ=
+github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/mattn/go-runewidth v0.0.4 h1:2BvfKmzob6Bmd4YsL0zygOqfdFnK7GR4QL06Do4/p7Y=