Browse Source

new proxy type: stcp(secret tcp)

fatedier 7 years ago
parent
commit
171bc8dd22

+ 62 - 36
client/control.go

@@ -25,7 +25,7 @@ import (
 	"github.com/fatedier/frp/models/msg"
 	"github.com/fatedier/frp/utils/crypto"
 	"github.com/fatedier/frp/utils/log"
-	"github.com/fatedier/frp/utils/net"
+	frpNet "github.com/fatedier/frp/utils/net"
 	"github.com/fatedier/frp/utils/util"
 	"github.com/fatedier/frp/utils/version"
 	"github.com/xtaci/smux"
@@ -48,8 +48,14 @@ type Control struct {
 	// proxies
 	proxies map[string]Proxy
 
+	// vistor configures
+	vistorCfgs map[string]config.ProxyConf
+
+	// vistors
+	vistors map[string]Vistor
+
 	// control connection
-	conn net.Conn
+	conn frpNet.Conn
 
 	// tcp stream multiplexing, if enabled
 	session *smux.Session
@@ -77,7 +83,7 @@ type Control struct {
 	log.Logger
 }
 
-func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf) *Control {
+func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf, vistorCfgs map[string]config.ProxyConf) *Control {
 	loginMsg := &msg.Login{
 		Arch:      runtime.GOARCH,
 		Os:        runtime.GOOS,
@@ -86,14 +92,16 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf) *Control {
 		Version:   version.Full(),
 	}
 	return &Control{
-		svr:      svr,
-		loginMsg: loginMsg,
-		pxyCfgs:  pxyCfgs,
-		proxies:  make(map[string]Proxy),
-		sendCh:   make(chan msg.Message, 10),
-		readCh:   make(chan msg.Message, 10),
-		closedCh: make(chan int),
-		Logger:   log.NewPrefixLogger(""),
+		svr:        svr,
+		loginMsg:   loginMsg,
+		pxyCfgs:    pxyCfgs,
+		vistorCfgs: vistorCfgs,
+		proxies:    make(map[string]Proxy),
+		vistors:    make(map[string]Vistor),
+		sendCh:     make(chan msg.Message, 10),
+		readCh:     make(chan msg.Message, 10),
+		closedCh:   make(chan int),
+		Logger:     log.NewPrefixLogger(""),
 	}
 }
 
@@ -105,16 +113,17 @@ func NewControl(svr *Service, pxyCfgs map[string]config.ProxyConf) *Control {
 // 6. In controler(): ini readCh, sendCh, closedCh
 // 7. In controler(): start new reader(), writer(), manager()
 // controler() will keep running
-func (ctl *Control) Run() error {
+func (ctl *Control) Run() (err error) {
 	for {
-		err := ctl.login()
+		err = ctl.login()
 		if err != nil {
+			ctl.Warn("login to server failed: %v", err)
+
 			// if login_fail_exit is true, just exit this program
 			// otherwise sleep a while and continues relogin to server
 			if config.ClientCommonCfg.LoginFailExit {
-				return err
+				return
 			} else {
-				ctl.Warn("login to server fail: %v", err)
 				time.Sleep(30 * time.Second)
 			}
 		} else {
@@ -133,29 +142,25 @@ func (ctl *Control) Run() error {
 		cfg.UnMarshalToMsg(&newProxyMsg)
 		ctl.sendCh <- &newProxyMsg
 	}
-	return nil
-}
 
-func (ctl *Control) NewWorkConn() {
-	var (
-		workConn net.Conn
-		err      error
-	)
-	if config.ClientCommonCfg.TcpMux {
-		stream, err := ctl.session.OpenStream()
+	// start all local vistors
+	for _, cfg := range ctl.vistorCfgs {
+		vistor := NewVistor(ctl, cfg)
+		err = vistor.Run()
 		if err != nil {
-			ctl.Warn("start new work connection error: %v", err)
-			return
+			vistor.Warn("start error: %v", err)
+			continue
 		}
-		workConn = net.WrapConn(stream)
+		ctl.vistors[cfg.GetName()] = vistor
+		vistor.Info("start vistor success")
+	}
+	return nil
+}
 
-	} else {
-		workConn, err = net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
-			fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
-		if err != nil {
-			ctl.Warn("start new work connection error: %v", err)
-			return
-		}
+func (ctl *Control) NewWorkConn() {
+	workConn, err := ctl.connectServer()
+	if err != nil {
+		return
 	}
 
 	m := &msg.NewWorkConn{
@@ -199,7 +204,7 @@ func (ctl *Control) login() (err error) {
 		ctl.session.Close()
 	}
 
-	conn, err := net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
+	conn, err := frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
 		fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
 	if err != nil {
 		return err
@@ -221,7 +226,7 @@ func (ctl *Control) login() (err error) {
 			session.Close()
 			return errRet
 		}
-		conn = net.WrapConn(stream)
+		conn = frpNet.WrapConn(stream)
 		ctl.session = session
 	}
 
@@ -261,6 +266,27 @@ func (ctl *Control) login() (err error) {
 	return nil
 }
 
+func (ctl *Control) connectServer() (conn frpNet.Conn, err error) {
+	if config.ClientCommonCfg.TcpMux {
+		stream, errRet := ctl.session.OpenStream()
+		if errRet != nil {
+			err = errRet
+			ctl.Warn("start new connection to server error: %v", err)
+			return
+		}
+		conn = frpNet.WrapConn(stream)
+
+	} else {
+		conn, err = frpNet.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol,
+			fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort))
+		if err != nil {
+			ctl.Warn("start new connection to server error: %v", err)
+			return
+		}
+	}
+	return
+}
+
 func (ctl *Control) reader() {
 	defer func() {
 		if err := recover(); err != nil {

+ 34 - 1
client/proxy.go

@@ -31,7 +31,7 @@ import (
 	frpNet "github.com/fatedier/frp/utils/net"
 )
 
-// Proxy defines how to work for different proxy type.
+// Proxy defines how to deal with work connections for different proxy type.
 type Proxy interface {
 	Run() error
 
@@ -67,6 +67,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy) {
 			BaseProxy: baseProxy,
 			cfg:       cfg,
 		}
+	case *config.StcpProxyConf:
+		pxy = &StcpProxy{
+			BaseProxy: baseProxy,
+			cfg:       cfg,
+		}
 	}
 	return
 }
@@ -162,6 +167,34 @@ func (pxy *HttpsProxy) InWorkConn(conn frpNet.Conn) {
 	HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn)
 }
 
+// STCP
+type StcpProxy struct {
+	BaseProxy
+
+	cfg         *config.StcpProxyConf
+	proxyPlugin plugin.Plugin
+}
+
+func (pxy *StcpProxy) Run() (err error) {
+	if pxy.cfg.Plugin != "" {
+		pxy.proxyPlugin, err = plugin.Create(pxy.cfg.Plugin, pxy.cfg.PluginParams)
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (pxy *StcpProxy) Close() {
+	if pxy.proxyPlugin != nil {
+		pxy.proxyPlugin.Close()
+	}
+}
+
+func (pxy *StcpProxy) InWorkConn(conn frpNet.Conn) {
+	HandleTcpWorkConnection(&pxy.cfg.LocalSvrConf, pxy.proxyPlugin, &pxy.cfg.BaseProxyConf, conn)
+}
+
 // UDP
 type UdpProxy struct {
 	BaseProxy

+ 2 - 2
client/service.go

@@ -23,11 +23,11 @@ type Service struct {
 	closedCh chan int
 }
 
-func NewService(pxyCfgs map[string]config.ProxyConf) (svr *Service) {
+func NewService(pxyCfgs map[string]config.ProxyConf, vistorCfgs map[string]config.ProxyConf) (svr *Service) {
 	svr = &Service{
 		closedCh: make(chan int),
 	}
-	ctl := NewControl(svr, pxyCfgs)
+	ctl := NewControl(svr, pxyCfgs, vistorCfgs)
 	svr.ctl = ctl
 	return
 }

+ 145 - 0
client/vistor.go

@@ -0,0 +1,145 @@
+// Copyright 2017 fatedier, fatedier@gmail.com
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"io"
+	"sync"
+	"time"
+
+	"github.com/fatedier/frp/models/config"
+	"github.com/fatedier/frp/models/msg"
+	frpIo "github.com/fatedier/frp/utils/io"
+	"github.com/fatedier/frp/utils/log"
+	frpNet "github.com/fatedier/frp/utils/net"
+	"github.com/fatedier/frp/utils/util"
+)
+
+// Vistor is used for forward traffics from local port tot remote service.
+type Vistor interface {
+	Run() error
+	Close()
+	log.Logger
+}
+
+func NewVistor(ctl *Control, pxyConf config.ProxyConf) (vistor Vistor) {
+	baseVistor := BaseVistor{
+		ctl:    ctl,
+		Logger: log.NewPrefixLogger(pxyConf.GetName()),
+	}
+	switch cfg := pxyConf.(type) {
+	case *config.StcpProxyConf:
+		vistor = &StcpVistor{
+			BaseVistor: baseVistor,
+			cfg:        cfg,
+		}
+	}
+	return
+}
+
+type BaseVistor struct {
+	ctl    *Control
+	l      frpNet.Listener
+	closed bool
+	mu     sync.RWMutex
+	log.Logger
+}
+
+type StcpVistor struct {
+	BaseVistor
+
+	cfg *config.StcpProxyConf
+}
+
+func (sv *StcpVistor) Run() (err error) {
+	sv.l, err = frpNet.ListenTcp(sv.cfg.BindAddr, int64(sv.cfg.BindPort))
+	if err != nil {
+		return
+	}
+
+	go sv.worker()
+	return
+}
+
+func (sv *StcpVistor) Close() {
+	sv.l.Close()
+}
+
+func (sv *StcpVistor) worker() {
+	for {
+		conn, err := sv.l.Accept()
+		if err != nil {
+			sv.Warn("stcp local listener closed")
+			return
+		}
+
+		go sv.handleConn(conn)
+	}
+}
+
+func (sv *StcpVistor) handleConn(userConn frpNet.Conn) {
+	defer userConn.Close()
+
+	sv.Debug("get a new stcp user connection")
+	vistorConn, err := sv.ctl.connectServer()
+	if err != nil {
+		return
+	}
+	defer vistorConn.Close()
+
+	now := time.Now().Unix()
+	newVistorConnMsg := &msg.NewVistorConn{
+		ProxyName:      sv.cfg.ServerName,
+		SignKey:        util.GetAuthKey(sv.cfg.Sk, now),
+		Timestamp:      now,
+		UseEncryption:  sv.cfg.UseEncryption,
+		UseCompression: sv.cfg.UseCompression,
+	}
+	err = msg.WriteMsg(vistorConn, newVistorConnMsg)
+	if err != nil {
+		sv.Warn("send newVistorConnMsg to server error: %v", err)
+		return
+	}
+
+	var newVistorConnRespMsg msg.NewVistorConnResp
+	vistorConn.SetReadDeadline(time.Now().Add(10 * time.Second))
+	err = msg.ReadMsgInto(vistorConn, &newVistorConnRespMsg)
+	if err != nil {
+		sv.Warn("get newVistorConnRespMsg error: %v", err)
+		return
+	}
+	vistorConn.SetReadDeadline(time.Time{})
+
+	if newVistorConnRespMsg.Error != "" {
+		sv.Warn("start new vistor connection error: %s", newVistorConnRespMsg.Error)
+		return
+	}
+
+	var remote io.ReadWriteCloser
+	remote = vistorConn
+	if sv.cfg.UseEncryption {
+		remote, err = frpIo.WithEncryption(remote, []byte(sv.cfg.Sk))
+		if err != nil {
+			sv.Error("create encryption stream error: %v", err)
+			return
+		}
+	}
+
+	if sv.cfg.UseCompression {
+		remote = frpIo.WithCompression(remote)
+	}
+
+	frpIo.Join(userConn, remote)
+}

+ 2 - 2
cmd/frpc/main.go

@@ -106,7 +106,7 @@ func main() {
 		}
 	}
 
-	pxyCfgs, err := config.LoadProxyConfFromFile(config.ClientCommonCfg.User, conf, config.ClientCommonCfg.Start)
+	pxyCfgs, vistorCfgs, err := config.LoadProxyConfFromFile(config.ClientCommonCfg.User, conf, config.ClientCommonCfg.Start)
 	if err != nil {
 		fmt.Println(err)
 		os.Exit(1)
@@ -115,7 +115,7 @@ func main() {
 	log.InitLog(config.ClientCommonCfg.LogWay, config.ClientCommonCfg.LogFile,
 		config.ClientCommonCfg.LogLevel, config.ClientCommonCfg.LogMaxDays)
 
-	svr := client.NewService(pxyCfgs)
+	svr := client.NewService(pxyCfgs, vistorCfgs)
 	err = svr.Run()
 	if err != nil {
 		fmt.Println(err)

+ 21 - 0
conf/frpc_full.ini

@@ -110,3 +110,24 @@ remote_port = 6004
 plugin = http_proxy
 plugin_http_user = abc
 plugin_http_passwd = abc
+
+[secret_tcp]
+# If the type is secret tcp, remote_port is useless
+# Who want to connect local port should deploy another frpc with stcp proxy and role is vistor
+type = stcp
+# sk used for authentication for vistors
+sk = abcdefg
+local_ip = 127.0.0.1
+local_port = 22
+use_encryption = false
+use_compression = false
+
+[secret_tcp_vistor]
+# frpc role vistor -> frps -> frpc role server
+role = vistor
+type = stcp
+sk = abcdefg
+bind_addr = 127.0.0.1
+bind_port = 9000
+use_encryption = false
+use_compression = false

+ 93 - 7
models/config/proxy.go

@@ -35,6 +35,7 @@ func init() {
 	proxyConfTypeMap[consts.UdpProxy] = reflect.TypeOf(UdpProxyConf{})
 	proxyConfTypeMap[consts.HttpProxy] = reflect.TypeOf(HttpProxyConf{})
 	proxyConfTypeMap[consts.HttpsProxy] = reflect.TypeOf(HttpsProxyConf{})
+	proxyConfTypeMap[consts.StcpProxy] = reflect.TypeOf(StcpProxyConf{})
 }
 
 // NewConfByType creates a empty ProxyConf object by proxyType.
@@ -388,8 +389,10 @@ func (cfg *HttpProxyConf) LoadFromFile(name string, section ini.Section) (err er
 	if err = cfg.DomainConf.LoadFromFile(name, section); err != nil {
 		return
 	}
-	if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil {
-		return
+	if err = cfg.PluginConf.LoadFromFile(name, section); err != nil {
+		if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil {
+			return
+		}
 	}
 
 	var (
@@ -447,8 +450,10 @@ func (cfg *HttpsProxyConf) LoadFromFile(name string, section ini.Section) (err e
 	if err = cfg.DomainConf.LoadFromFile(name, section); err != nil {
 		return
 	}
-	if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil {
-		return
+	if err = cfg.PluginConf.LoadFromFile(name, section); err != nil {
+		if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil {
+			return
+		}
 	}
 	return
 }
@@ -466,9 +471,81 @@ func (cfg *HttpsProxyConf) Check() (err error) {
 	return
 }
 
+// STCP
+type StcpProxyConf struct {
+	BaseProxyConf
+
+	Role string `json:"role"`
+	Sk   string `json:"sk"`
+
+	// used in role server
+	LocalSvrConf
+	PluginConf
+
+	// used in role vistor
+	ServerName string `json:"server_name"`
+	BindAddr   string `json:"bind_addr"`
+	BindPort   int    `json:"bind_port"`
+}
+
+// Only for role server.
+func (cfg *StcpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) {
+	cfg.BaseProxyConf.LoadFromMsg(pMsg)
+	cfg.Sk = pMsg.Sk
+}
+
+func (cfg *StcpProxyConf) LoadFromFile(name string, section ini.Section) (err error) {
+	if err = cfg.BaseProxyConf.LoadFromFile(name, section); err != nil {
+		return
+	}
+
+	tmpStr := section["role"]
+	if tmpStr == "server" || tmpStr == "vistor" {
+		cfg.Role = tmpStr
+	} else {
+		cfg.Role = "server"
+	}
+
+	cfg.Sk = section["sk"]
+
+	if tmpStr == "vistor" {
+		prefix := section["prefix"]
+		cfg.ServerName = prefix + section["server_name"]
+		if cfg.BindAddr = section["bind_addr"]; cfg.BindAddr == "" {
+			cfg.BindAddr = "127.0.0.1"
+		}
+
+		if tmpStr, ok := section["bind_port"]; ok {
+			if cfg.BindPort, err = strconv.Atoi(tmpStr); err != nil {
+				return fmt.Errorf("Parse conf error: proxy [%s] bind_port error", name)
+			}
+		} else {
+			return fmt.Errorf("Parse conf error: proxy [%s] bind_port not found", name)
+		}
+	} else {
+		if err = cfg.PluginConf.LoadFromFile(name, section); err != nil {
+			if err = cfg.LocalSvrConf.LoadFromFile(name, section); err != nil {
+				return
+			}
+		}
+	}
+	return
+}
+
+func (cfg *StcpProxyConf) UnMarshalToMsg(pMsg *msg.NewProxy) {
+	cfg.BaseProxyConf.UnMarshalToMsg(pMsg)
+	pMsg.Sk = cfg.Sk
+}
+
+func (cfg *StcpProxyConf) Check() (err error) {
+	return
+}
+
 // if len(startProxy) is 0, start all
 // otherwise just start proxies in startProxy map
-func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]struct{}) (proxyConfs map[string]ProxyConf, err error) {
+func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]struct{}) (
+	proxyConfs map[string]ProxyConf, vistorConfs map[string]ProxyConf, err error) {
+
 	if prefix != "" {
 		prefix += "."
 	}
@@ -478,14 +555,23 @@ func LoadProxyConfFromFile(prefix string, conf ini.File, startProxy map[string]s
 		startAll = false
 	}
 	proxyConfs = make(map[string]ProxyConf)
+	vistorConfs = make(map[string]ProxyConf)
 	for name, section := range conf {
 		_, shouldStart := startProxy[name]
 		if name != "common" && (startAll || shouldStart) {
+			// some proxy or visotr configure may be used this prefix
+			section["prefix"] = prefix
 			cfg, err := NewProxyConfFromFile(name, section)
 			if err != nil {
-				return proxyConfs, err
+				return proxyConfs, vistorConfs, err
+			}
+
+			role := section["role"]
+			if role == "vistor" {
+				vistorConfs[prefix+name] = cfg
+			} else {
+				proxyConfs[prefix+name] = cfg
 			}
-			proxyConfs[prefix+name] = cfg
 		}
 	}
 	return

+ 1 - 0
models/consts/consts.go

@@ -27,4 +27,5 @@ var (
 	UdpProxy   string = "udp"
 	HttpProxy  string = "http"
 	HttpsProxy string = "https"
+	StcpProxy  string = "stcp"
 )

+ 31 - 11
models/msg/msg.go

@@ -20,17 +20,19 @@ import (
 )
 
 const (
-	TypeLogin         = 'o'
-	TypeLoginResp     = '1'
-	TypeNewProxy      = 'p'
-	TypeNewProxyResp  = '2'
-	TypeCloseProxy    = 'c'
-	TypeNewWorkConn   = 'w'
-	TypeReqWorkConn   = 'r'
-	TypeStartWorkConn = 's'
-	TypePing          = 'h'
-	TypePong          = '4'
-	TypeUdpPacket     = 'u'
+	TypeLogin             = 'o'
+	TypeLoginResp         = '1'
+	TypeNewProxy          = 'p'
+	TypeNewProxyResp      = '2'
+	TypeCloseProxy        = 'c'
+	TypeNewWorkConn       = 'w'
+	TypeReqWorkConn       = 'r'
+	TypeStartWorkConn     = 's'
+	TypeNewVistorConn     = 'v'
+	TypeNewVistorConnResp = '3'
+	TypePing              = 'h'
+	TypePong              = '4'
+	TypeUdpPacket         = 'u'
 )
 
 var (
@@ -50,6 +52,8 @@ func init() {
 	TypeMap[TypeNewWorkConn] = reflect.TypeOf(NewWorkConn{})
 	TypeMap[TypeReqWorkConn] = reflect.TypeOf(ReqWorkConn{})
 	TypeMap[TypeStartWorkConn] = reflect.TypeOf(StartWorkConn{})
+	TypeMap[TypeNewVistorConn] = reflect.TypeOf(NewVistorConn{})
+	TypeMap[TypeNewVistorConnResp] = reflect.TypeOf(NewVistorConnResp{})
 	TypeMap[TypePing] = reflect.TypeOf(Ping{})
 	TypeMap[TypePong] = reflect.TypeOf(Pong{})
 	TypeMap[TypeUdpPacket] = reflect.TypeOf(UdpPacket{})
@@ -100,6 +104,9 @@ type NewProxy struct {
 	HostHeaderRewrite string   `json:"host_header_rewrite"`
 	HttpUser          string   `json:"http_user"`
 	HttpPwd           string   `json:"http_pwd"`
+
+	// stcp
+	Sk string `json:"sk"`
 }
 
 type NewProxyResp struct {
@@ -122,6 +129,19 @@ type StartWorkConn struct {
 	ProxyName string `json:"proxy_name"`
 }
 
+type NewVistorConn struct {
+	ProxyName      string `json:"proxy_name"`
+	SignKey        string `json:"sign_key"`
+	Timestamp      int64  `json:"timestamp"`
+	UseEncryption  bool   `json:"use_encryption"`
+	UseCompression bool   `json:"use_compression"`
+}
+
+type NewVistorConnResp struct {
+	ProxyName string `json:"proxy_name"`
+	Error     string `json:"error"`
+}
+
 type Ping struct {
 }
 

+ 74 - 0
server/manager.go

@@ -16,7 +16,12 @@ package server
 
 import (
 	"fmt"
+	"io"
 	"sync"
+
+	frpIo "github.com/fatedier/frp/utils/io"
+	frpNet "github.com/fatedier/frp/utils/net"
+	"github.com/fatedier/frp/utils/util"
 )
 
 type ControlManager struct {
@@ -87,3 +92,72 @@ func (pm *ProxyManager) GetByName(name string) (pxy Proxy, ok bool) {
 	pxy, ok = pm.pxys[name]
 	return
 }
+
+// Manager for vistor listeners.
+type VistorManager struct {
+	vistorListeners map[string]*frpNet.CustomListener
+	skMap           map[string]string
+
+	mu sync.RWMutex
+}
+
+func NewVistorManager() *VistorManager {
+	return &VistorManager{
+		vistorListeners: make(map[string]*frpNet.CustomListener),
+		skMap:           make(map[string]string),
+	}
+}
+
+func (vm *VistorManager) Listen(name string, sk string) (l *frpNet.CustomListener, err error) {
+	vm.mu.Lock()
+	defer vm.mu.Unlock()
+
+	if _, ok := vm.vistorListeners[name]; ok {
+		err = fmt.Errorf("custom listener for [%s] is repeated", name)
+		return
+	}
+
+	l = frpNet.NewCustomListener()
+	vm.vistorListeners[name] = l
+	vm.skMap[name] = sk
+	return
+}
+
+func (vm *VistorManager) NewConn(name string, conn frpNet.Conn, timestamp int64, signKey string,
+	useEncryption bool, useCompression bool) (err error) {
+
+	vm.mu.RLock()
+	defer vm.mu.RUnlock()
+
+	if l, ok := vm.vistorListeners[name]; ok {
+		var sk string
+		if sk = vm.skMap[name]; util.GetAuthKey(sk, timestamp) != signKey {
+			err = fmt.Errorf("vistor connection of [%s] auth failed", name)
+			return
+		}
+
+		var rwc io.ReadWriteCloser = conn
+		if useEncryption {
+			if rwc, err = frpIo.WithEncryption(rwc, []byte(sk)); err != nil {
+				err = fmt.Errorf("create encryption connection failed: %v", err)
+				return
+			}
+		}
+		if useCompression {
+			rwc = frpIo.WithCompression(rwc)
+		}
+		err = l.PutConn(frpNet.WrapReadWriteCloserToConn(rwc))
+	} else {
+		err = fmt.Errorf("custom listener for [%s] doesn't exist", name)
+		return
+	}
+	return
+}
+
+func (vm *VistorManager) CloseListener(name string) {
+	vm.mu.Lock()
+	defer vm.mu.Unlock()
+
+	delete(vm.vistorListeners, name)
+	delete(vm.skMap, name)
+}

+ 32 - 0
server/proxy.go

@@ -143,6 +143,11 @@ func NewProxy(ctl *Control, pxyConf config.ProxyConf) (pxy Proxy, err error) {
 			BaseProxy: basePxy,
 			cfg:       cfg,
 		}
+	case *config.StcpProxyConf:
+		pxy = &StcpProxy{
+			BaseProxy: basePxy,
+			cfg:       cfg,
+		}
 	default:
 		return pxy, fmt.Errorf("proxy type not support")
 	}
@@ -274,6 +279,33 @@ func (pxy *HttpsProxy) Close() {
 	pxy.BaseProxy.Close()
 }
 
+type StcpProxy struct {
+	BaseProxy
+	cfg *config.StcpProxyConf
+}
+
+func (pxy *StcpProxy) Run() error {
+	listener, err := pxy.ctl.svr.vistorManager.Listen(pxy.GetName(), pxy.cfg.Sk)
+	if err != nil {
+		return err
+	}
+	listener.AddLogPrefix(pxy.name)
+	pxy.listeners = append(pxy.listeners, listener)
+	pxy.Info("stcp proxy custom listen success")
+
+	pxy.startListenHandler(pxy, HandleUserTcpConnection)
+	return nil
+}
+
+func (pxy *StcpProxy) GetConf() config.ProxyConf {
+	return pxy.cfg
+}
+
+func (pxy *StcpProxy) Close() {
+	pxy.BaseProxy.Close()
+	pxy.ctl.svr.vistorManager.CloseListener(pxy.GetName())
+}
+
 type UdpProxy struct {
 	BaseProxy
 	cfg *config.UdpProxyConf

+ 26 - 4
server/service.go

@@ -55,12 +55,16 @@ type Service struct {
 
 	// Manage all proxies.
 	pxyManager *ProxyManager
+
+	// Manage all vistor listeners.
+	vistorManager *VistorManager
 }
 
 func NewService() (svr *Service, err error) {
 	svr = &Service{
-		ctlManager: NewControlManager(),
-		pxyManager: NewProxyManager(),
+		ctlManager:    NewControlManager(),
+		pxyManager:    NewProxyManager(),
+		vistorManager: NewVistorManager(),
 	}
 
 	// Init assets.
@@ -176,6 +180,20 @@ func (svr *Service) HandleListener(l frpNet.Listener) {
 					}
 				case *msg.NewWorkConn:
 					svr.RegisterWorkConn(conn, m)
+				case *msg.NewVistorConn:
+					if err = svr.RegisterVistorConn(conn, m); err != nil {
+						conn.Warn("%v", err)
+						msg.WriteMsg(conn, &msg.NewVistorConnResp{
+							ProxyName: m.ProxyName,
+							Error:     err.Error(),
+						})
+						conn.Close()
+					} else {
+						msg.WriteMsg(conn, &msg.NewVistorConnResp{
+							ProxyName: m.ProxyName,
+							Error:     "",
+						})
+					}
 				default:
 					log.Warn("Error message type for the new connection [%s]", conn.RemoteAddr().String())
 					conn.Close()
@@ -262,9 +280,13 @@ func (svr *Service) RegisterWorkConn(workConn frpNet.Conn, newMsg *msg.NewWorkCo
 	return
 }
 
+func (svr *Service) RegisterVistorConn(vistorConn frpNet.Conn, newMsg *msg.NewVistorConn) error {
+	return svr.vistorManager.NewConn(newMsg.ProxyName, vistorConn, newMsg.Timestamp, newMsg.SignKey,
+		newMsg.UseEncryption, newMsg.UseCompression)
+}
+
 func (svr *Service) RegisterProxy(name string, pxy Proxy) error {
-	err := svr.pxyManager.Add(name, pxy)
-	return err
+	return svr.pxyManager.Add(name, pxy)
 }
 
 func (svr *Service) DelProxy(name string) {

+ 5 - 0
utils/log/log.go

@@ -88,6 +88,7 @@ func Trace(format string, v ...interface{}) {
 // Logger
 type Logger interface {
 	AddLogPrefix(string)
+	GetPrefixStr() string
 	GetAllPrefix() []string
 	ClearLogPrefix()
 	Error(string, ...interface{})
@@ -119,6 +120,10 @@ func (pl *PrefixLogger) AddLogPrefix(prefix string) {
 	pl.allPrefix = append(pl.allPrefix, prefix)
 }
 
+func (pl *PrefixLogger) GetPrefixStr() string {
+	return pl.prefix
+}
+
 func (pl *PrefixLogger) GetAllPrefix() []string {
 	return pl.allPrefix
 }

+ 53 - 0
utils/net/listener.go

@@ -15,8 +15,11 @@
 package net
 
 import (
+	"fmt"
 	"net"
+	"sync"
 
+	"github.com/fatedier/frp/utils/errors"
 	"github.com/fatedier/frp/utils/log"
 )
 
@@ -44,3 +47,53 @@ func (logL *LogListener) Accept() (Conn, error) {
 	c, err := logL.l.Accept()
 	return WrapConn(c), err
 }
+
+// Custom listener
+type CustomListener struct {
+	conns  chan Conn
+	closed bool
+	mu     sync.Mutex
+
+	log.Logger
+}
+
+func NewCustomListener() *CustomListener {
+	return &CustomListener{
+		conns:  make(chan Conn, 64),
+		Logger: log.NewPrefixLogger(""),
+	}
+}
+
+func (l *CustomListener) Accept() (Conn, error) {
+	conn, ok := <-l.conns
+	if !ok {
+		return nil, fmt.Errorf("listener closed")
+	}
+	conn.AddLogPrefix(l.GetPrefixStr())
+	return conn, nil
+}
+
+func (l *CustomListener) PutConn(conn Conn) error {
+	err := errors.PanicToError(func() {
+		select {
+		case l.conns <- conn:
+		default:
+			conn.Close()
+		}
+	})
+	return err
+}
+
+func (l *CustomListener) Close() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	if !l.closed {
+		close(l.conns)
+		l.closed = true
+	}
+	return nil
+}
+
+func (l *CustomListener) Addr() net.Addr {
+	return (*net.TCPAddr)(nil)
+}