session_test.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package yamux
  2. import (
  3. "io"
  4. "sync"
  5. "testing"
  6. "time"
  7. )
  8. type pipeConn struct {
  9. reader *io.PipeReader
  10. writer *io.PipeWriter
  11. }
  12. func (p *pipeConn) Read(b []byte) (int, error) {
  13. return p.reader.Read(b)
  14. }
  15. func (p *pipeConn) Write(b []byte) (int, error) {
  16. return p.writer.Write(b)
  17. }
  18. func (p *pipeConn) Close() error {
  19. p.reader.Close()
  20. return p.writer.Close()
  21. }
  22. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  23. read1, write1 := io.Pipe()
  24. read2, write2 := io.Pipe()
  25. return &pipeConn{read1, write2}, &pipeConn{read2, write1}
  26. }
  27. func TestPing(t *testing.T) {
  28. conn1, conn2 := testConn()
  29. client := Client(conn1, nil)
  30. defer client.Close()
  31. server := Server(conn2, nil)
  32. defer server.Close()
  33. rtt, err := client.Ping()
  34. if err != nil {
  35. t.Fatalf("err: %v", err)
  36. }
  37. if rtt == 0 {
  38. t.Fatalf("bad: %v", rtt)
  39. }
  40. rtt, err = server.Ping()
  41. if err != nil {
  42. t.Fatalf("err: %v", err)
  43. }
  44. if rtt == 0 {
  45. t.Fatalf("bad: %v", rtt)
  46. }
  47. }
  48. func TestAccept(t *testing.T) {
  49. conn1, conn2 := testConn()
  50. client := Client(conn1, nil)
  51. defer client.Close()
  52. server := Server(conn2, nil)
  53. defer server.Close()
  54. wg := &sync.WaitGroup{}
  55. wg.Add(4)
  56. go func() {
  57. defer wg.Done()
  58. stream, err := server.AcceptStream()
  59. if err != nil {
  60. t.Fatalf("err: %v", err)
  61. }
  62. if id := stream.StreamID(); id != 1 {
  63. t.Fatalf("bad: %v", id)
  64. }
  65. if err := stream.Close(); err != nil {
  66. t.Fatalf("err: %v", err)
  67. }
  68. }()
  69. go func() {
  70. defer wg.Done()
  71. stream, err := client.AcceptStream()
  72. if err != nil {
  73. t.Fatalf("err: %v", err)
  74. }
  75. if id := stream.StreamID(); id != 2 {
  76. t.Fatalf("bad: %v", id)
  77. }
  78. if err := stream.Close(); err != nil {
  79. t.Fatalf("err: %v", err)
  80. }
  81. }()
  82. go func() {
  83. defer wg.Done()
  84. stream, err := server.Open()
  85. if err != nil {
  86. t.Fatalf("err: %v", err)
  87. }
  88. if id := stream.StreamID(); id != 2 {
  89. t.Fatalf("bad: %v", id)
  90. }
  91. if err := stream.Close(); err != nil {
  92. t.Fatalf("err: %v", err)
  93. }
  94. }()
  95. go func() {
  96. defer wg.Done()
  97. stream, err := client.Open()
  98. if err != nil {
  99. t.Fatalf("err: %v", err)
  100. }
  101. if id := stream.StreamID(); id != 1 {
  102. t.Fatalf("bad: %v", id)
  103. }
  104. if err := stream.Close(); err != nil {
  105. t.Fatalf("err: %v", err)
  106. }
  107. }()
  108. doneCh := make(chan struct{})
  109. go func() {
  110. wg.Wait()
  111. close(doneCh)
  112. }()
  113. select {
  114. case <-doneCh:
  115. case <-time.After(time.Second):
  116. panic("timeout")
  117. }
  118. }
  119. func TestSendData_Small(t *testing.T) {
  120. conn1, conn2 := testConn()
  121. client := Client(conn1, nil)
  122. defer client.Close()
  123. server := Server(conn2, nil)
  124. defer server.Close()
  125. wg := &sync.WaitGroup{}
  126. wg.Add(2)
  127. go func() {
  128. defer wg.Done()
  129. stream, err := server.AcceptStream()
  130. if err != nil {
  131. t.Fatalf("err: %v", err)
  132. }
  133. buf := make([]byte, 4)
  134. for i := 0; i < 1000; i++ {
  135. n, err := stream.Read(buf)
  136. if err != nil {
  137. t.Fatalf("err: %v", err)
  138. }
  139. if n != 4 {
  140. t.Fatalf("short read: %d", n)
  141. }
  142. if string(buf) != "test" {
  143. t.Fatalf("bad: %s", buf)
  144. }
  145. }
  146. if err := stream.Close(); err != nil {
  147. t.Fatalf("err: %v", err)
  148. }
  149. }()
  150. go func() {
  151. defer wg.Done()
  152. stream, err := client.Open()
  153. if err != nil {
  154. t.Fatalf("err: %v", err)
  155. }
  156. for i := 0; i < 1000; i++ {
  157. n, err := stream.Write([]byte("test"))
  158. if err != nil {
  159. t.Fatalf("err: %v", err)
  160. }
  161. if n != 4 {
  162. t.Fatalf("short write %d", n)
  163. }
  164. }
  165. if err := stream.Close(); err != nil {
  166. t.Fatalf("err: %v", err)
  167. }
  168. }()
  169. doneCh := make(chan struct{})
  170. go func() {
  171. wg.Wait()
  172. close(doneCh)
  173. }()
  174. select {
  175. case <-doneCh:
  176. case <-time.After(time.Second):
  177. panic("timeout")
  178. }
  179. }
  180. func TestSendData_Large(t *testing.T) {
  181. conn1, conn2 := testConn()
  182. client := Client(conn1, nil)
  183. defer client.Close()
  184. server := Server(conn2, nil)
  185. defer server.Close()
  186. data := make([]byte, 512*1024)
  187. for idx := range data {
  188. data[idx] = byte(idx % 256)
  189. }
  190. wg := &sync.WaitGroup{}
  191. wg.Add(2)
  192. go func() {
  193. defer wg.Done()
  194. stream, err := server.AcceptStream()
  195. if err != nil {
  196. t.Fatalf("err: %v", err)
  197. }
  198. buf := make([]byte, 4*1024)
  199. for i := 0; i < 128; i++ {
  200. n, err := stream.Read(buf)
  201. if err != nil {
  202. t.Fatalf("err: %v", err)
  203. }
  204. if n != 4*1024 {
  205. t.Fatalf("short read: %d", n)
  206. }
  207. for idx := range buf {
  208. if buf[idx] != byte(idx%256) {
  209. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  210. }
  211. }
  212. }
  213. if err := stream.Close(); err != nil {
  214. t.Fatalf("err: %v", err)
  215. }
  216. }()
  217. go func() {
  218. defer wg.Done()
  219. stream, err := client.Open()
  220. if err != nil {
  221. t.Fatalf("err: %v", err)
  222. }
  223. n, err := stream.Write(data)
  224. if err != nil {
  225. t.Fatalf("err: %v", err)
  226. }
  227. if n != len(data) {
  228. t.Fatalf("short write %d", n)
  229. }
  230. if err := stream.Close(); err != nil {
  231. t.Fatalf("err: %v", err)
  232. }
  233. }()
  234. doneCh := make(chan struct{})
  235. go func() {
  236. wg.Wait()
  237. close(doneCh)
  238. }()
  239. select {
  240. case <-doneCh:
  241. case <-time.After(time.Second):
  242. panic("timeout")
  243. }
  244. }