conn.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package conn
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "github.com/fatedier/frp/pkg/utils/log"
  9. )
  10. type Listener struct {
  11. Addr net.Addr
  12. Conns chan *Conn
  13. }
  14. // wait util get one
  15. func (l *Listener) GetConn() (conn *Conn) {
  16. conn = <-l.Conns
  17. return conn
  18. }
  19. type Conn struct {
  20. TcpConn *net.TCPConn
  21. Reader *bufio.Reader
  22. }
  23. func (c *Conn) ConnectServer(host string, port int64) (err error) {
  24. servertAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", host, port))
  25. if err != nil {
  26. return err
  27. }
  28. conn, err := net.DialTCP("tcp", nil, servertAddr)
  29. if err != nil {
  30. return err
  31. }
  32. c.TcpConn = conn
  33. c.Reader = bufio.NewReader(c.TcpConn)
  34. return nil
  35. }
  36. func (c *Conn) GetRemoteAddr() (addr string) {
  37. return c.TcpConn.RemoteAddr().String()
  38. }
  39. func (c *Conn) GetLocalAddr() (addr string) {
  40. return c.TcpConn.LocalAddr().String()
  41. }
  42. func (c *Conn) ReadLine() (buff string, err error) {
  43. buff, err = c.Reader.ReadString('\n')
  44. return buff, err
  45. }
  46. func (c *Conn) Write(content string) (err error) {
  47. _, err = c.TcpConn.Write([]byte(content))
  48. return err
  49. }
  50. func (c *Conn) Close() {
  51. c.TcpConn.Close()
  52. }
  53. func Listen(bindAddr string, bindPort int64) (l *Listener, err error) {
  54. tcpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", bindAddr, bindPort))
  55. listener, err := net.ListenTCP("tcp", tcpAddr)
  56. if err != nil {
  57. return l, err
  58. }
  59. l = &Listener{
  60. Addr: listener.Addr(),
  61. Conns: make(chan *Conn),
  62. }
  63. go func() {
  64. for {
  65. conn, err := listener.AcceptTCP()
  66. if err != nil {
  67. log.Error("Accept new tcp connection error, %v", err)
  68. continue
  69. }
  70. c := &Conn{
  71. TcpConn: conn,
  72. }
  73. c.Reader = bufio.NewReader(c.TcpConn)
  74. l.Conns <- c
  75. }
  76. }()
  77. return l, err
  78. }
  79. // will block until conn close
  80. func Join(c1 *Conn, c2 *Conn) {
  81. var wait sync.WaitGroup
  82. pipe := func(to *Conn, from *Conn) {
  83. defer to.Close()
  84. defer from.Close()
  85. defer wait.Done()
  86. var err error
  87. _, err = io.Copy(to.TcpConn, from.TcpConn)
  88. if err != nil {
  89. log.Warn("join conns error, %v", err)
  90. }
  91. }
  92. wait.Add(2)
  93. go pipe(c1, c2)
  94. go pipe(c2, c1)
  95. wait.Wait()
  96. return
  97. }