conn.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package conn
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "github.com/fatedier/frp/pkg/utils/log"
  9. )
  10. type Listener struct {
  11. Addr net.Addr
  12. Conns chan *Conn
  13. }
  14. // wait util get one
  15. func (l *Listener) GetConn() (conn *Conn) {
  16. conn = <-l.Conns
  17. return conn
  18. }
  19. type Conn struct {
  20. TcpConn *net.TCPConn
  21. Reader *bufio.Reader
  22. }
  23. func (c *Conn) ConnectServer(host string, port int64) (err error) {
  24. servertAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", host, port))
  25. if err != nil {
  26. return err
  27. }
  28. conn, err := net.DialTCP("tcp", nil, servertAddr)
  29. if err != nil {
  30. return err
  31. }
  32. c.TcpConn = conn
  33. c.Reader = bufio.NewReader(c.TcpConn)
  34. return nil
  35. }
  36. func (c *Conn) GetRemoteAddr() (addr string) {
  37. return c.TcpConn.RemoteAddr().String()
  38. }
  39. func (c *Conn) GetLocalAddr() (addr string) {
  40. return c.TcpConn.LocalAddr().String()
  41. }
  42. func (c *Conn) ReadLine() (buff string, err error) {
  43. buff, err = c.Reader.ReadString('\n')
  44. return buff, err
  45. }
  46. func (c *Conn) Write(content string) (err error) {
  47. _, err = c.TcpConn.Write([]byte(content))
  48. return err
  49. }
  50. func (c *Conn) Close() {
  51. if c.TcpConn != nil {
  52. c.TcpConn.Close()
  53. }
  54. }
  55. func Listen(bindAddr string, bindPort int64) (l *Listener, err error) {
  56. tcpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", bindAddr, bindPort))
  57. listener, err := net.ListenTCP("tcp", tcpAddr)
  58. if err != nil {
  59. return l, err
  60. }
  61. l = &Listener{
  62. Addr: listener.Addr(),
  63. Conns: make(chan *Conn),
  64. }
  65. go func() {
  66. for {
  67. conn, err := listener.AcceptTCP()
  68. if err != nil {
  69. log.Error("Accept new tcp connection error, %v", err)
  70. continue
  71. }
  72. c := &Conn{
  73. TcpConn: conn,
  74. }
  75. c.Reader = bufio.NewReader(c.TcpConn)
  76. l.Conns <- c
  77. }
  78. }()
  79. return l, err
  80. }
  81. // will block until conn close
  82. func Join(c1 *Conn, c2 *Conn) {
  83. var wait sync.WaitGroup
  84. pipe := func(to *Conn, from *Conn) {
  85. defer to.Close()
  86. defer from.Close()
  87. defer wait.Done()
  88. var err error
  89. _, err = io.Copy(to.TcpConn, from.TcpConn)
  90. if err != nil {
  91. log.Warn("join conns error, %v", err)
  92. }
  93. }
  94. wait.Add(2)
  95. go pipe(c1, c2)
  96. go pipe(c2, c1)
  97. wait.Wait()
  98. return
  99. }