server.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package streamserver
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. libnet "github.com/fatedier/frp/pkg/util/net"
  8. "github.com/fatedier/frp/test/e2e/pkg/rpc"
  9. )
  10. type Type string
  11. const (
  12. TCP Type = "tcp"
  13. UDP Type = "udp"
  14. Unix Type = "unix"
  15. )
  16. type Server struct {
  17. netType Type
  18. bindAddr string
  19. bindPort int
  20. respContent []byte
  21. handler func(net.Conn)
  22. l net.Listener
  23. }
  24. type Option func(*Server) *Server
  25. func New(netType Type, options ...Option) *Server {
  26. s := &Server{
  27. netType: netType,
  28. bindAddr: "127.0.0.1",
  29. }
  30. s.handler = s.handle
  31. for _, option := range options {
  32. s = option(s)
  33. }
  34. return s
  35. }
  36. func WithBindAddr(addr string) Option {
  37. return func(s *Server) *Server {
  38. s.bindAddr = addr
  39. return s
  40. }
  41. }
  42. func WithBindPort(port int) Option {
  43. return func(s *Server) *Server {
  44. s.bindPort = port
  45. return s
  46. }
  47. }
  48. func WithRespContent(content []byte) Option {
  49. return func(s *Server) *Server {
  50. s.respContent = content
  51. return s
  52. }
  53. }
  54. func WithCustomHandler(handler func(net.Conn)) Option {
  55. return func(s *Server) *Server {
  56. s.handler = handler
  57. return s
  58. }
  59. }
  60. func (s *Server) Run() error {
  61. if err := s.initListener(); err != nil {
  62. return err
  63. }
  64. go func() {
  65. for {
  66. c, err := s.l.Accept()
  67. if err != nil {
  68. return
  69. }
  70. go s.handler(c)
  71. }
  72. }()
  73. return nil
  74. }
  75. func (s *Server) Close() error {
  76. if s.l != nil {
  77. return s.l.Close()
  78. }
  79. return nil
  80. }
  81. func (s *Server) initListener() (err error) {
  82. switch s.netType {
  83. case TCP:
  84. s.l, err = net.Listen("tcp", fmt.Sprintf("%s:%d", s.bindAddr, s.bindPort))
  85. case UDP:
  86. s.l, err = libnet.ListenUDP(s.bindAddr, s.bindPort)
  87. case Unix:
  88. s.l, err = net.Listen("unix", s.bindAddr)
  89. default:
  90. return fmt.Errorf("unknown server type: %s", s.netType)
  91. }
  92. return err
  93. }
  94. func (s *Server) handle(c net.Conn) {
  95. defer c.Close()
  96. var reader io.Reader = c
  97. if s.netType == UDP {
  98. reader = bufio.NewReader(c)
  99. }
  100. for {
  101. buf, err := rpc.ReadBytes(reader)
  102. if err != nil {
  103. return
  104. }
  105. if len(s.respContent) > 0 {
  106. buf = s.respContent
  107. }
  108. rpc.WriteBytes(c, buf)
  109. }
  110. }
  111. func (s *Server) BindAddr() string {
  112. return s.bindAddr
  113. }
  114. func (s *Server) BindPort() int {
  115. return s.bindPort
  116. }