session_test.go 22 KB


  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "reflect"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. )
  15. type logCapture struct{ bytes.Buffer }
  16. func (l *logCapture) logs() []string {
  17. return strings.Split(strings.TrimSpace(l.String()), "\n")
  18. }
  19. func (l *logCapture) match(expect []string) bool {
  20. return reflect.DeepEqual(l.logs(), expect)
  21. }
  22. func captureLogs(s *Session) *logCapture {
  23. buf := new(logCapture)
  24. s.logger = log.New(buf, "", 0)
  25. return buf
  26. }
  27. type pipeConn struct {
  28. reader *io.PipeReader
  29. writer *io.PipeWriter
  30. writeBlocker sync.Mutex
  31. }
  32. func (p *pipeConn) Read(b []byte) (int, error) {
  33. return p.reader.Read(b)
  34. }
  35. func (p *pipeConn) Write(b []byte) (int, error) {
  36. p.writeBlocker.Lock()
  37. defer p.writeBlocker.Unlock()
  38. return p.writer.Write(b)
  39. }
  40. func (p *pipeConn) Close() error {
  41. p.reader.Close()
  42. return p.writer.Close()
  43. }
  44. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  45. read1, write1 := io.Pipe()
  46. read2, write2 := io.Pipe()
  47. conn1 := &pipeConn{reader: read1, writer: write2}
  48. conn2 := &pipeConn{reader: read2, writer: write1}
  49. return conn1, conn2
  50. }
  51. func testConf() *Config {
  52. conf := DefaultConfig()
  53. conf.AcceptBacklog = 64
  54. conf.KeepAliveInterval = 100 * time.Millisecond
  55. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  56. return conf
  57. }
  58. func testConfNoKeepAlive() *Config {
  59. conf := testConf()
  60. conf.EnableKeepAlive = false
  61. return conf
  62. }
  63. func testClientServer() (*Session, *Session) {
  64. return testClientServerConfig(testConf())
  65. }
  66. func testClientServerConfig(conf *Config) (*Session, *Session) {
  67. conn1, conn2 := testConn()
  68. client, _ := Client(conn1, conf)
  69. server, _ := Server(conn2, conf)
  70. return client, server
  71. }
  72. func TestPing(t *testing.T) {
  73. client, server := testClientServer()
  74. defer client.Close()
  75. defer server.Close()
  76. rtt, err := client.Ping()
  77. if err != nil {
  78. t.Fatalf("err: %v", err)
  79. }
  80. if rtt == 0 {
  81. t.Fatalf("bad: %v", rtt)
  82. }
  83. rtt, err = server.Ping()
  84. if err != nil {
  85. t.Fatalf("err: %v", err)
  86. }
  87. if rtt == 0 {
  88. t.Fatalf("bad: %v", rtt)
  89. }
  90. }
  91. func TestPing_Timeout(t *testing.T) {
  92. client, server := testClientServerConfig(testConfNoKeepAlive())
  93. defer client.Close()
  94. defer server.Close()
  95. // Prevent the client from responding
  96. clientConn := client.conn.(*pipeConn)
  97. clientConn.writeBlocker.Lock()
  98. errCh := make(chan error, 1)
  99. go func() {
  100. _, err := server.Ping() // Ping via the server session
  101. errCh <- err
  102. }()
  103. select {
  104. case err := <-errCh:
  105. if err != ErrTimeout {
  106. t.Fatalf("err: %v", err)
  107. }
  108. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  109. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  110. }
  111. // Verify that we recover, even if we gave up
  112. clientConn.writeBlocker.Unlock()
  113. go func() {
  114. _, err := server.Ping() // Ping via the server session
  115. errCh <- err
  116. }()
  117. select {
  118. case err := <-errCh:
  119. if err != nil {
  120. t.Fatalf("err: %v", err)
  121. }
  122. case <-time.After(client.config.ConnectionWriteTimeout):
  123. t.Fatalf("timeout")
  124. }
  125. }
  126. func TestAccept(t *testing.T) {
  127. client, server := testClientServer()
  128. defer client.Close()
  129. defer server.Close()
  130. if client.NumStreams() != 0 {
  131. t.Fatalf("bad")
  132. }
  133. if server.NumStreams() != 0 {
  134. t.Fatalf("bad")
  135. }
  136. wg := &sync.WaitGroup{}
  137. wg.Add(4)
  138. go func() {
  139. defer wg.Done()
  140. stream, err := server.AcceptStream()
  141. if err != nil {
  142. t.Fatalf("err: %v", err)
  143. }
  144. if id := stream.StreamID(); id != 1 {
  145. t.Fatalf("bad: %v", id)
  146. }
  147. if err := stream.Close(); err != nil {
  148. t.Fatalf("err: %v", err)
  149. }
  150. }()
  151. go func() {
  152. defer wg.Done()
  153. stream, err := client.AcceptStream()
  154. if err != nil {
  155. t.Fatalf("err: %v", err)
  156. }
  157. if id := stream.StreamID(); id != 2 {
  158. t.Fatalf("bad: %v", id)
  159. }
  160. if err := stream.Close(); err != nil {
  161. t.Fatalf("err: %v", err)
  162. }
  163. }()
  164. go func() {
  165. defer wg.Done()
  166. stream, err := server.OpenStream()
  167. if err != nil {
  168. t.Fatalf("err: %v", err)
  169. }
  170. if id := stream.StreamID(); id != 2 {
  171. t.Fatalf("bad: %v", id)
  172. }
  173. if err := stream.Close(); err != nil {
  174. t.Fatalf("err: %v", err)
  175. }
  176. }()
  177. go func() {
  178. defer wg.Done()
  179. stream, err := client.OpenStream()
  180. if err != nil {
  181. t.Fatalf("err: %v", err)
  182. }
  183. if id := stream.StreamID(); id != 1 {
  184. t.Fatalf("bad: %v", id)
  185. }
  186. if err := stream.Close(); err != nil {
  187. t.Fatalf("err: %v", err)
  188. }
  189. }()
  190. doneCh := make(chan struct{})
  191. go func() {
  192. wg.Wait()
  193. close(doneCh)
  194. }()
  195. select {
  196. case <-doneCh:
  197. case <-time.After(time.Second):
  198. panic("timeout")
  199. }
  200. }
  201. func TestNonNilInterface(t *testing.T) {
  202. _, server := testClientServer()
  203. server.Close()
  204. conn, err := server.Accept()
  205. if err != nil && conn != nil {
  206. t.Error("bad: accept should return a connection of nil value")
  207. }
  208. conn, err = server.Open()
  209. if err != nil && conn != nil {
  210. t.Error("bad: open should return a connection of nil value")
  211. }
  212. }
  213. func TestSendData_Small(t *testing.T) {
  214. client, server := testClientServer()
  215. defer client.Close()
  216. defer server.Close()
  217. wg := &sync.WaitGroup{}
  218. wg.Add(2)
  219. go func() {
  220. defer wg.Done()
  221. stream, err := server.AcceptStream()
  222. if err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. if server.NumStreams() != 1 {
  226. t.Fatalf("bad")
  227. }
  228. buf := make([]byte, 4)
  229. for i := 0; i < 1000; i++ {
  230. n, err := stream.Read(buf)
  231. if err != nil {
  232. t.Fatalf("err: %v", err)
  233. }
  234. if n != 4 {
  235. t.Fatalf("short read: %d", n)
  236. }
  237. if string(buf) != "test" {
  238. t.Fatalf("bad: %s", buf)
  239. }
  240. }
  241. if err := stream.Close(); err != nil {
  242. t.Fatalf("err: %v", err)
  243. }
  244. }()
  245. go func() {
  246. defer wg.Done()
  247. stream, err := client.Open()
  248. if err != nil {
  249. t.Fatalf("err: %v", err)
  250. }
  251. if client.NumStreams() != 1 {
  252. t.Fatalf("bad")
  253. }
  254. for i := 0; i < 1000; i++ {
  255. n, err := stream.Write([]byte("test"))
  256. if err != nil {
  257. t.Fatalf("err: %v", err)
  258. }
  259. if n != 4 {
  260. t.Fatalf("short write %d", n)
  261. }
  262. }
  263. if err := stream.Close(); err != nil {
  264. t.Fatalf("err: %v", err)
  265. }
  266. }()
  267. doneCh := make(chan struct{})
  268. go func() {
  269. wg.Wait()
  270. close(doneCh)
  271. }()
  272. select {
  273. case <-doneCh:
  274. case <-time.After(time.Second):
  275. panic("timeout")
  276. }
  277. if client.NumStreams() != 0 {
  278. t.Fatalf("bad")
  279. }
  280. if server.NumStreams() != 0 {
  281. t.Fatalf("bad")
  282. }
  283. }
  284. func TestSendData_Large(t *testing.T) {
  285. client, server := testClientServer()
  286. defer client.Close()
  287. defer server.Close()
  288. data := make([]byte, 512*1024)
  289. for idx := range data {
  290. data[idx] = byte(idx % 256)
  291. }
  292. wg := &sync.WaitGroup{}
  293. wg.Add(2)
  294. go func() {
  295. defer wg.Done()
  296. stream, err := server.AcceptStream()
  297. if err != nil {
  298. t.Fatalf("err: %v", err)
  299. }
  300. buf := make([]byte, 4*1024)
  301. for i := 0; i < 128; i++ {
  302. n, err := stream.Read(buf)
  303. if err != nil {
  304. t.Fatalf("err: %v", err)
  305. }
  306. if n != 4*1024 {
  307. t.Fatalf("short read: %d", n)
  308. }
  309. for idx := range buf {
  310. if buf[idx] != byte(idx%256) {
  311. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  312. }
  313. }
  314. }
  315. if err := stream.Close(); err != nil {
  316. t.Fatalf("err: %v", err)
  317. }
  318. }()
  319. go func() {
  320. defer wg.Done()
  321. stream, err := client.Open()
  322. if err != nil {
  323. t.Fatalf("err: %v", err)
  324. }
  325. n, err := stream.Write(data)
  326. if err != nil {
  327. t.Fatalf("err: %v", err)
  328. }
  329. if n != len(data) {
  330. t.Fatalf("short write %d", n)
  331. }
  332. if err := stream.Close(); err != nil {
  333. t.Fatalf("err: %v", err)
  334. }
  335. }()
  336. doneCh := make(chan struct{})
  337. go func() {
  338. wg.Wait()
  339. close(doneCh)
  340. }()
  341. select {
  342. case <-doneCh:
  343. case <-time.After(time.Second):
  344. panic("timeout")
  345. }
  346. }
  347. func TestGoAway(t *testing.T) {
  348. client, server := testClientServer()
  349. defer client.Close()
  350. defer server.Close()
  351. if err := server.GoAway(); err != nil {
  352. t.Fatalf("err: %v", err)
  353. }
  354. _, err := client.Open()
  355. if err != ErrRemoteGoAway {
  356. t.Fatalf("err: %v", err)
  357. }
  358. }
  359. func TestManyStreams(t *testing.T) {
  360. client, server := testClientServer()
  361. defer client.Close()
  362. defer server.Close()
  363. wg := &sync.WaitGroup{}
  364. acceptor := func(i int) {
  365. defer wg.Done()
  366. stream, err := server.AcceptStream()
  367. if err != nil {
  368. t.Fatalf("err: %v", err)
  369. }
  370. defer stream.Close()
  371. buf := make([]byte, 512)
  372. for {
  373. n, err := stream.Read(buf)
  374. if err == io.EOF {
  375. return
  376. }
  377. if err != nil {
  378. t.Fatalf("err: %v", err)
  379. }
  380. if n == 0 {
  381. t.Fatalf("err: %v", err)
  382. }
  383. }
  384. }
  385. sender := func(i int) {
  386. defer wg.Done()
  387. stream, err := client.Open()
  388. if err != nil {
  389. t.Fatalf("err: %v", err)
  390. }
  391. defer stream.Close()
  392. msg := fmt.Sprintf("%08d", i)
  393. for i := 0; i < 1000; i++ {
  394. n, err := stream.Write([]byte(msg))
  395. if err != nil {
  396. t.Fatalf("err: %v", err)
  397. }
  398. if n != len(msg) {
  399. t.Fatalf("short write %d", n)
  400. }
  401. }
  402. }
  403. for i := 0; i < 50; i++ {
  404. wg.Add(2)
  405. go acceptor(i)
  406. go sender(i)
  407. }
  408. wg.Wait()
  409. }
  410. func TestManyStreams_PingPong(t *testing.T) {
  411. client, server := testClientServer()
  412. defer client.Close()
  413. defer server.Close()
  414. wg := &sync.WaitGroup{}
  415. ping := []byte("ping")
  416. pong := []byte("pong")
  417. acceptor := func(i int) {
  418. defer wg.Done()
  419. stream, err := server.AcceptStream()
  420. if err != nil {
  421. t.Fatalf("err: %v", err)
  422. }
  423. defer stream.Close()
  424. buf := make([]byte, 4)
  425. for {
  426. n, err := stream.Read(buf)
  427. if err == io.EOF {
  428. return
  429. }
  430. if err != nil {
  431. t.Fatalf("err: %v", err)
  432. }
  433. if n != 4 {
  434. t.Fatalf("err: %v", err)
  435. }
  436. if !bytes.Equal(buf, ping) {
  437. t.Fatalf("bad: %s", buf)
  438. }
  439. n, err = stream.Write(pong)
  440. if err != nil {
  441. t.Fatalf("err: %v", err)
  442. }
  443. if n != 4 {
  444. t.Fatalf("err: %v", err)
  445. }
  446. }
  447. }
  448. sender := func(i int) {
  449. defer wg.Done()
  450. stream, err := client.Open()
  451. if err != nil {
  452. t.Fatalf("err: %v", err)
  453. }
  454. defer stream.Close()
  455. buf := make([]byte, 4)
  456. for i := 0; i < 1000; i++ {
  457. n, err := stream.Write(ping)
  458. if err != nil {
  459. t.Fatalf("err: %v", err)
  460. }
  461. if n != 4 {
  462. t.Fatalf("short write %d", n)
  463. }
  464. n, err = stream.Read(buf)
  465. if err != nil {
  466. t.Fatalf("err: %v", err)
  467. }
  468. if n != 4 {
  469. t.Fatalf("err: %v", err)
  470. }
  471. if !bytes.Equal(buf, pong) {
  472. t.Fatalf("bad: %s", buf)
  473. }
  474. }
  475. }
  476. for i := 0; i < 50; i++ {
  477. wg.Add(2)
  478. go acceptor(i)
  479. go sender(i)
  480. }
  481. wg.Wait()
  482. }
  483. func TestHalfClose(t *testing.T) {
  484. client, server := testClientServer()
  485. defer client.Close()
  486. defer server.Close()
  487. stream, err := client.Open()
  488. if err != nil {
  489. t.Fatalf("err: %v", err)
  490. }
  491. if _, err := stream.Write([]byte("a")); err != nil {
  492. t.Fatalf("err: %v", err)
  493. }
  494. stream2, err := server.Accept()
  495. if err != nil {
  496. t.Fatalf("err: %v", err)
  497. }
  498. stream2.Close() // Half close
  499. buf := make([]byte, 4)
  500. n, err := stream2.Read(buf)
  501. if err != nil {
  502. t.Fatalf("err: %v", err)
  503. }
  504. if n != 1 {
  505. t.Fatalf("bad: %v", n)
  506. }
  507. // Send more
  508. if _, err := stream.Write([]byte("bcd")); err != nil {
  509. t.Fatalf("err: %v", err)
  510. }
  511. stream.Close()
  512. // Read after close
  513. n, err = stream2.Read(buf)
  514. if err != nil {
  515. t.Fatalf("err: %v", err)
  516. }
  517. if n != 3 {
  518. t.Fatalf("bad: %v", n)
  519. }
  520. // EOF after close
  521. n, err = stream2.Read(buf)
  522. if err != io.EOF {
  523. t.Fatalf("err: %v", err)
  524. }
  525. if n != 0 {
  526. t.Fatalf("bad: %v", n)
  527. }
  528. }
  529. func TestReadDeadline(t *testing.T) {
  530. client, server := testClientServer()
  531. defer client.Close()
  532. defer server.Close()
  533. stream, err := client.Open()
  534. if err != nil {
  535. t.Fatalf("err: %v", err)
  536. }
  537. defer stream.Close()
  538. stream2, err := server.Accept()
  539. if err != nil {
  540. t.Fatalf("err: %v", err)
  541. }
  542. defer stream2.Close()
  543. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  544. t.Fatalf("err: %v", err)
  545. }
  546. buf := make([]byte, 4)
  547. if _, err := stream.Read(buf); err != ErrTimeout {
  548. t.Fatalf("err: %v", err)
  549. }
  550. }
  551. func TestWriteDeadline(t *testing.T) {
  552. client, server := testClientServer()
  553. defer client.Close()
  554. defer server.Close()
  555. stream, err := client.Open()
  556. if err != nil {
  557. t.Fatalf("err: %v", err)
  558. }
  559. defer stream.Close()
  560. stream2, err := server.Accept()
  561. if err != nil {
  562. t.Fatalf("err: %v", err)
  563. }
  564. defer stream2.Close()
  565. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  566. t.Fatalf("err: %v", err)
  567. }
  568. buf := make([]byte, 512)
  569. for i := 0; i < int(initialStreamWindow); i++ {
  570. _, err := stream.Write(buf)
  571. if err != nil && err == ErrTimeout {
  572. return
  573. } else if err != nil {
  574. t.Fatalf("err: %v", err)
  575. }
  576. }
  577. t.Fatalf("Expected timeout")
  578. }
  579. func TestBacklogExceeded(t *testing.T) {
  580. client, server := testClientServer()
  581. defer client.Close()
  582. defer server.Close()
  583. // Fill the backlog
  584. max := client.config.AcceptBacklog
  585. for i := 0; i < max; i++ {
  586. stream, err := client.Open()
  587. if err != nil {
  588. t.Fatalf("err: %v", err)
  589. }
  590. defer stream.Close()
  591. if _, err := stream.Write([]byte("foo")); err != nil {
  592. t.Fatalf("err: %v", err)
  593. }
  594. }
  595. // Attempt to open a new stream
  596. errCh := make(chan error, 1)
  597. go func() {
  598. _, err := client.Open()
  599. errCh <- err
  600. }()
  601. // Shutdown the server
  602. go func() {
  603. time.Sleep(10 * time.Millisecond)
  604. server.Close()
  605. }()
  606. select {
  607. case err := <-errCh:
  608. if err == nil {
  609. t.Fatalf("open should fail")
  610. }
  611. case <-time.After(time.Second):
  612. t.Fatalf("timeout")
  613. }
  614. }
  615. func TestKeepAlive(t *testing.T) {
  616. client, server := testClientServer()
  617. defer client.Close()
  618. defer server.Close()
  619. time.Sleep(200 * time.Millisecond)
  620. // Ping value should increase
  621. client.pingLock.Lock()
  622. defer client.pingLock.Unlock()
  623. if client.pingID == 0 {
  624. t.Fatalf("should ping")
  625. }
  626. server.pingLock.Lock()
  627. defer server.pingLock.Unlock()
  628. if server.pingID == 0 {
  629. t.Fatalf("should ping")
  630. }
  631. }
  632. func TestKeepAlive_Timeout(t *testing.T) {
  633. conn1, conn2 := testConn()
  634. clientConf := testConf()
  635. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  636. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  637. client, _ := Client(conn1, clientConf)
  638. defer client.Close()
  639. server, _ := Server(conn2, testConf())
  640. defer server.Close()
  641. clientLogs := captureLogs(client)
  642. serverLogs := captureLogs(server)
  643. errCh := make(chan error, 1)
  644. go func() {
  645. _, err := server.Accept() // Wait until server closes
  646. errCh <- err
  647. }()
  648. // Prevent the client from responding
  649. clientConn := client.conn.(*pipeConn)
  650. clientConn.writeBlocker.Lock()
  651. select {
  652. case err := <-errCh:
  653. if err != ErrKeepAliveTimeout {
  654. t.Fatalf("unexpected error: %v", err)
  655. }
  656. case <-time.After(1 * time.Second):
  657. t.Fatalf("timeout waiting for timeout")
  658. }
  659. if !client.IsClosed() {
  660. t.Fatalf("client should have closed")
  661. }
  662. if !server.IsClosed() {
  663. t.Fatalf("server should have closed")
  664. }
  665. if clientLogs.Len() != 0 {
  666. t.Fatalf("client log incorect: %v", clientLogs.logs())
  667. }
  668. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  669. t.Fatalf("server log incorect: %v", serverLogs.logs())
  670. }
  671. }
  672. func TestLargeWindow(t *testing.T) {
  673. conf := DefaultConfig()
  674. conf.MaxStreamWindowSize *= 2
  675. client, server := testClientServerConfig(conf)
  676. defer client.Close()
  677. defer server.Close()
  678. stream, err := client.Open()
  679. if err != nil {
  680. t.Fatalf("err: %v", err)
  681. }
  682. defer stream.Close()
  683. stream2, err := server.Accept()
  684. if err != nil {
  685. t.Fatalf("err: %v", err)
  686. }
  687. defer stream2.Close()
  688. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  689. buf := make([]byte, conf.MaxStreamWindowSize)
  690. n, err := stream.Write(buf)
  691. if err != nil {
  692. t.Fatalf("err: %v", err)
  693. }
  694. if n != len(buf) {
  695. t.Fatalf("short write: %d", n)
  696. }
  697. }
  698. type UnlimitedReader struct{}
  699. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  700. runtime.Gosched()
  701. return len(p), nil
  702. }
  703. func TestSendData_VeryLarge(t *testing.T) {
  704. client, server := testClientServer()
  705. defer client.Close()
  706. defer server.Close()
  707. var n int64 = 1 * 1024 * 1024 * 1024
  708. var workers int = 16
  709. wg := &sync.WaitGroup{}
  710. wg.Add(workers * 2)
  711. for i := 0; i < workers; i++ {
  712. go func() {
  713. defer wg.Done()
  714. stream, err := server.AcceptStream()
  715. if err != nil {
  716. t.Fatalf("err: %v", err)
  717. }
  718. defer stream.Close()
  719. buf := make([]byte, 4)
  720. _, err = stream.Read(buf)
  721. if err != nil {
  722. t.Fatalf("err: %v", err)
  723. }
  724. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  725. t.Fatalf("bad header")
  726. }
  727. recv, err := io.Copy(ioutil.Discard, stream)
  728. if err != nil {
  729. t.Fatalf("err: %v", err)
  730. }
  731. if recv != n {
  732. t.Fatalf("bad: %v", recv)
  733. }
  734. }()
  735. }
  736. for i := 0; i < workers; i++ {
  737. go func() {
  738. defer wg.Done()
  739. stream, err := client.Open()
  740. if err != nil {
  741. t.Fatalf("err: %v", err)
  742. }
  743. defer stream.Close()
  744. _, err = stream.Write([]byte{0, 1, 2, 3})
  745. if err != nil {
  746. t.Fatalf("err: %v", err)
  747. }
  748. unlimited := &UnlimitedReader{}
  749. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  750. if err != nil {
  751. t.Fatalf("err: %v", err)
  752. }
  753. if sent != n {
  754. t.Fatalf("bad: %v", sent)
  755. }
  756. }()
  757. }
  758. doneCh := make(chan struct{})
  759. go func() {
  760. wg.Wait()
  761. close(doneCh)
  762. }()
  763. select {
  764. case <-doneCh:
  765. case <-time.After(20 * time.Second):
  766. panic("timeout")
  767. }
  768. }
  769. func TestBacklogExceeded_Accept(t *testing.T) {
  770. client, server := testClientServer()
  771. defer client.Close()
  772. defer server.Close()
  773. max := 5 * client.config.AcceptBacklog
  774. go func() {
  775. for i := 0; i < max; i++ {
  776. stream, err := server.Accept()
  777. if err != nil {
  778. t.Fatalf("err: %v", err)
  779. }
  780. defer stream.Close()
  781. }
  782. }()
  783. // Fill the backlog
  784. for i := 0; i < max; i++ {
  785. stream, err := client.Open()
  786. if err != nil {
  787. t.Fatalf("err: %v", err)
  788. }
  789. defer stream.Close()
  790. if _, err := stream.Write([]byte("foo")); err != nil {
  791. t.Fatalf("err: %v", err)
  792. }
  793. }
  794. }
  795. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  796. client, server := testClientServerConfig(testConfNoKeepAlive())
  797. defer client.Close()
  798. defer server.Close()
  799. var wg sync.WaitGroup
  800. wg.Add(2)
  801. // Choose a huge flood size that we know will result in a window update.
  802. flood := int64(client.config.MaxStreamWindowSize) - 1
  803. // The server will accept a new stream and then flood data to it.
  804. go func() {
  805. defer wg.Done()
  806. stream, err := server.AcceptStream()
  807. if err != nil {
  808. t.Fatalf("err: %v", err)
  809. }
  810. defer stream.Close()
  811. n, err := stream.Write(make([]byte, flood))
  812. if err != nil {
  813. t.Fatalf("err: %v", err)
  814. }
  815. if int64(n) != flood {
  816. t.Fatalf("short write: %d", n)
  817. }
  818. }()
  819. // The client will open a stream, block outbound writes, and then
  820. // listen to the flood from the server, which should time out since
  821. // it won't be able to send the window update.
  822. go func() {
  823. defer wg.Done()
  824. stream, err := client.OpenStream()
  825. if err != nil {
  826. t.Fatalf("err: %v", err)
  827. }
  828. defer stream.Close()
  829. conn := client.conn.(*pipeConn)
  830. conn.writeBlocker.Lock()
  831. _, err = stream.Read(make([]byte, flood))
  832. if err != ErrConnectionWriteTimeout {
  833. t.Fatalf("err: %v", err)
  834. }
  835. }()
  836. wg.Wait()
  837. }
  838. func TestSession_sendNoWait_Timeout(t *testing.T) {
  839. client, server := testClientServerConfig(testConfNoKeepAlive())
  840. defer client.Close()
  841. defer server.Close()
  842. var wg sync.WaitGroup
  843. wg.Add(2)
  844. go func() {
  845. defer wg.Done()
  846. stream, err := server.AcceptStream()
  847. if err != nil {
  848. t.Fatalf("err: %v", err)
  849. }
  850. defer stream.Close()
  851. }()
  852. // The client will open the stream and then block outbound writes, we'll
  853. // probe sendNoWait once it gets into that state.
  854. go func() {
  855. defer wg.Done()
  856. stream, err := client.OpenStream()
  857. if err != nil {
  858. t.Fatalf("err: %v", err)
  859. }
  860. defer stream.Close()
  861. conn := client.conn.(*pipeConn)
  862. conn.writeBlocker.Lock()
  863. hdr := header(make([]byte, headerSize))
  864. hdr.encode(typePing, flagACK, 0, 0)
  865. for {
  866. err = client.sendNoWait(hdr)
  867. if err == nil {
  868. continue
  869. } else if err == ErrConnectionWriteTimeout {
  870. break
  871. } else {
  872. t.Fatalf("err: %v", err)
  873. }
  874. }
  875. }()
  876. wg.Wait()
  877. }
  878. func TestSession_PingOfDeath(t *testing.T) {
  879. client, server := testClientServerConfig(testConfNoKeepAlive())
  880. defer client.Close()
  881. defer server.Close()
  882. var wg sync.WaitGroup
  883. wg.Add(2)
  884. var doPingOfDeath sync.Mutex
  885. doPingOfDeath.Lock()
  886. // This is used later to block outbound writes.
  887. conn := server.conn.(*pipeConn)
  888. // The server will accept a stream, block outbound writes, and then
  889. // flood its send channel so that no more headers can be queued.
  890. go func() {
  891. defer wg.Done()
  892. stream, err := server.AcceptStream()
  893. if err != nil {
  894. t.Fatalf("err: %v", err)
  895. }
  896. defer stream.Close()
  897. conn.writeBlocker.Lock()
  898. for {
  899. hdr := header(make([]byte, headerSize))
  900. hdr.encode(typePing, 0, 0, 0)
  901. err = server.sendNoWait(hdr)
  902. if err == nil {
  903. continue
  904. } else if err == ErrConnectionWriteTimeout {
  905. break
  906. } else {
  907. t.Fatalf("err: %v", err)
  908. }
  909. }
  910. doPingOfDeath.Unlock()
  911. }()
  912. // The client will open a stream and then send the server a ping once it
  913. // can no longer write. This makes sure the server doesn't deadlock reads
  914. // while trying to reply to the ping with no ability to write.
  915. go func() {
  916. defer wg.Done()
  917. stream, err := client.OpenStream()
  918. if err != nil {
  919. t.Fatalf("err: %v", err)
  920. }
  921. defer stream.Close()
  922. // This ping will never unblock because the ping id will never
  923. // show up in a response.
  924. doPingOfDeath.Lock()
  925. go func() { client.Ping() }()
  926. // Wait for a while to make sure the previous ping times out,
  927. // then turn writes back on and make sure a ping works again.
  928. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  929. conn.writeBlocker.Unlock()
  930. if _, err = client.Ping(); err != nil {
  931. t.Fatalf("err: %v", err)
  932. }
  933. }()
  934. wg.Wait()
  935. }
  936. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  937. client, server := testClientServerConfig(testConfNoKeepAlive())
  938. defer client.Close()
  939. defer server.Close()
  940. var wg sync.WaitGroup
  941. wg.Add(2)
  942. go func() {
  943. defer wg.Done()
  944. stream, err := server.AcceptStream()
  945. if err != nil {
  946. t.Fatalf("err: %v", err)
  947. }
  948. defer stream.Close()
  949. }()
  950. // The client will open the stream and then block outbound writes, we'll
  951. // tee up a write and make sure it eventually times out.
  952. go func() {
  953. defer wg.Done()
  954. stream, err := client.OpenStream()
  955. if err != nil {
  956. t.Fatalf("err: %v", err)
  957. }
  958. defer stream.Close()
  959. conn := client.conn.(*pipeConn)
  960. conn.writeBlocker.Lock()
  961. // Since the write goroutine is blocked then this will return a
  962. // timeout since it can't get feedback about whether the write
  963. // worked.
  964. n, err := stream.Write([]byte("hello"))
  965. if err != ErrConnectionWriteTimeout {
  966. t.Fatalf("err: %v", err)
  967. }
  968. if n != 0 {
  969. t.Fatalf("lied about writes: %d", n)
  970. }
  971. }()
  972. wg.Wait()
  973. }