session_test.go 19 KB


  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "runtime"
  8. "sync"
  9. "testing"
  10. "time"
  11. )
  12. type pipeConn struct {
  13. reader *io.PipeReader
  14. writer *io.PipeWriter
  15. writeBlocker sync.Mutex
  16. }
  17. func (p *pipeConn) Read(b []byte) (int, error) {
  18. return p.reader.Read(b)
  19. }
  20. func (p *pipeConn) Write(b []byte) (int, error) {
  21. p.writeBlocker.Lock()
  22. defer p.writeBlocker.Unlock()
  23. return p.writer.Write(b)
  24. }
  25. func (p *pipeConn) Close() error {
  26. p.reader.Close()
  27. return p.writer.Close()
  28. }
  29. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  30. read1, write1 := io.Pipe()
  31. read2, write2 := io.Pipe()
  32. conn1 := &pipeConn{reader: read1, writer: write2}
  33. conn2 := &pipeConn{reader: read2, writer: write1}
  34. return conn1, conn2
  35. }
  36. func testConf() *Config {
  37. conf := DefaultConfig()
  38. conf.AcceptBacklog = 64
  39. conf.KeepAliveInterval = 100 * time.Millisecond
  40. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  41. return conf
  42. }
  43. func testConfNoKeepAlive() *Config {
  44. conf := testConf()
  45. conf.EnableKeepAlive = false
  46. return conf
  47. }
  48. func testClientServer() (*Session, *Session) {
  49. return testClientServerConfig(testConf())
  50. }
  51. func testClientServerConfig(conf *Config) (*Session, *Session) {
  52. conn1, conn2 := testConn()
  53. client, _ := Client(conn1, conf)
  54. server, _ := Server(conn2, conf)
  55. return client, server
  56. }
  57. func TestPing(t *testing.T) {
  58. client, server := testClientServer()
  59. defer client.Close()
  60. defer server.Close()
  61. rtt, err := client.Ping()
  62. if err != nil {
  63. t.Fatalf("err: %v", err)
  64. }
  65. if rtt == 0 {
  66. t.Fatalf("bad: %v", rtt)
  67. }
  68. rtt, err = server.Ping()
  69. if err != nil {
  70. t.Fatalf("err: %v", err)
  71. }
  72. if rtt == 0 {
  73. t.Fatalf("bad: %v", rtt)
  74. }
  75. }
  76. func TestAccept(t *testing.T) {
  77. client, server := testClientServer()
  78. defer client.Close()
  79. defer server.Close()
  80. if client.NumStreams() != 0 {
  81. t.Fatalf("bad")
  82. }
  83. if server.NumStreams() != 0 {
  84. t.Fatalf("bad")
  85. }
  86. wg := &sync.WaitGroup{}
  87. wg.Add(4)
  88. go func() {
  89. defer wg.Done()
  90. stream, err := server.AcceptStream()
  91. if err != nil {
  92. t.Fatalf("err: %v", err)
  93. }
  94. if id := stream.StreamID(); id != 1 {
  95. t.Fatalf("bad: %v", id)
  96. }
  97. if err := stream.Close(); err != nil {
  98. t.Fatalf("err: %v", err)
  99. }
  100. }()
  101. go func() {
  102. defer wg.Done()
  103. stream, err := client.AcceptStream()
  104. if err != nil {
  105. t.Fatalf("err: %v", err)
  106. }
  107. if id := stream.StreamID(); id != 2 {
  108. t.Fatalf("bad: %v", id)
  109. }
  110. if err := stream.Close(); err != nil {
  111. t.Fatalf("err: %v", err)
  112. }
  113. }()
  114. go func() {
  115. defer wg.Done()
  116. stream, err := server.OpenStream()
  117. if err != nil {
  118. t.Fatalf("err: %v", err)
  119. }
  120. if id := stream.StreamID(); id != 2 {
  121. t.Fatalf("bad: %v", id)
  122. }
  123. if err := stream.Close(); err != nil {
  124. t.Fatalf("err: %v", err)
  125. }
  126. }()
  127. go func() {
  128. defer wg.Done()
  129. stream, err := client.OpenStream()
  130. if err != nil {
  131. t.Fatalf("err: %v", err)
  132. }
  133. if id := stream.StreamID(); id != 1 {
  134. t.Fatalf("bad: %v", id)
  135. }
  136. if err := stream.Close(); err != nil {
  137. t.Fatalf("err: %v", err)
  138. }
  139. }()
  140. doneCh := make(chan struct{})
  141. go func() {
  142. wg.Wait()
  143. close(doneCh)
  144. }()
  145. select {
  146. case <-doneCh:
  147. case <-time.After(time.Second):
  148. panic("timeout")
  149. }
  150. }
  151. func TestNonNilInterface(t *testing.T) {
  152. _, server := testClientServer()
  153. server.Close()
  154. conn, err := server.Accept()
  155. if err != nil && conn != nil {
  156. t.Error("bad: accept should return a connection of nil value")
  157. }
  158. conn, err = server.Open()
  159. if err != nil && conn != nil {
  160. t.Error("bad: open should return a connection of nil value")
  161. }
  162. }
  163. func TestSendData_Small(t *testing.T) {
  164. client, server := testClientServer()
  165. defer client.Close()
  166. defer server.Close()
  167. wg := &sync.WaitGroup{}
  168. wg.Add(2)
  169. go func() {
  170. defer wg.Done()
  171. stream, err := server.AcceptStream()
  172. if err != nil {
  173. t.Fatalf("err: %v", err)
  174. }
  175. if server.NumStreams() != 1 {
  176. t.Fatalf("bad")
  177. }
  178. buf := make([]byte, 4)
  179. for i := 0; i < 1000; i++ {
  180. n, err := stream.Read(buf)
  181. if err != nil {
  182. t.Fatalf("err: %v", err)
  183. }
  184. if n != 4 {
  185. t.Fatalf("short read: %d", n)
  186. }
  187. if string(buf) != "test" {
  188. t.Fatalf("bad: %s", buf)
  189. }
  190. }
  191. if err := stream.Close(); err != nil {
  192. t.Fatalf("err: %v", err)
  193. }
  194. }()
  195. go func() {
  196. defer wg.Done()
  197. stream, err := client.Open()
  198. if err != nil {
  199. t.Fatalf("err: %v", err)
  200. }
  201. if client.NumStreams() != 1 {
  202. t.Fatalf("bad")
  203. }
  204. for i := 0; i < 1000; i++ {
  205. n, err := stream.Write([]byte("test"))
  206. if err != nil {
  207. t.Fatalf("err: %v", err)
  208. }
  209. if n != 4 {
  210. t.Fatalf("short write %d", n)
  211. }
  212. }
  213. if err := stream.Close(); err != nil {
  214. t.Fatalf("err: %v", err)
  215. }
  216. }()
  217. doneCh := make(chan struct{})
  218. go func() {
  219. wg.Wait()
  220. close(doneCh)
  221. }()
  222. select {
  223. case <-doneCh:
  224. case <-time.After(time.Second):
  225. panic("timeout")
  226. }
  227. if client.NumStreams() != 0 {
  228. t.Fatalf("bad")
  229. }
  230. if server.NumStreams() != 0 {
  231. t.Fatalf("bad")
  232. }
  233. }
  234. func TestSendData_Large(t *testing.T) {
  235. client, server := testClientServer()
  236. defer client.Close()
  237. defer server.Close()
  238. data := make([]byte, 512*1024)
  239. for idx := range data {
  240. data[idx] = byte(idx % 256)
  241. }
  242. wg := &sync.WaitGroup{}
  243. wg.Add(2)
  244. go func() {
  245. defer wg.Done()
  246. stream, err := server.AcceptStream()
  247. if err != nil {
  248. t.Fatalf("err: %v", err)
  249. }
  250. buf := make([]byte, 4*1024)
  251. for i := 0; i < 128; i++ {
  252. n, err := stream.Read(buf)
  253. if err != nil {
  254. t.Fatalf("err: %v", err)
  255. }
  256. if n != 4*1024 {
  257. t.Fatalf("short read: %d", n)
  258. }
  259. for idx := range buf {
  260. if buf[idx] != byte(idx%256) {
  261. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  262. }
  263. }
  264. }
  265. if err := stream.Close(); err != nil {
  266. t.Fatalf("err: %v", err)
  267. }
  268. }()
  269. go func() {
  270. defer wg.Done()
  271. stream, err := client.Open()
  272. if err != nil {
  273. t.Fatalf("err: %v", err)
  274. }
  275. n, err := stream.Write(data)
  276. if err != nil {
  277. t.Fatalf("err: %v", err)
  278. }
  279. if n != len(data) {
  280. t.Fatalf("short write %d", n)
  281. }
  282. if err := stream.Close(); err != nil {
  283. t.Fatalf("err: %v", err)
  284. }
  285. }()
  286. doneCh := make(chan struct{})
  287. go func() {
  288. wg.Wait()
  289. close(doneCh)
  290. }()
  291. select {
  292. case <-doneCh:
  293. case <-time.After(time.Second):
  294. panic("timeout")
  295. }
  296. }
  297. func TestGoAway(t *testing.T) {
  298. client, server := testClientServer()
  299. defer client.Close()
  300. defer server.Close()
  301. if err := server.GoAway(); err != nil {
  302. t.Fatalf("err: %v", err)
  303. }
  304. _, err := client.Open()
  305. if err != ErrRemoteGoAway {
  306. t.Fatalf("err: %v", err)
  307. }
  308. }
  309. func TestManyStreams(t *testing.T) {
  310. client, server := testClientServer()
  311. defer client.Close()
  312. defer server.Close()
  313. wg := &sync.WaitGroup{}
  314. acceptor := func(i int) {
  315. defer wg.Done()
  316. stream, err := server.AcceptStream()
  317. if err != nil {
  318. t.Fatalf("err: %v", err)
  319. }
  320. defer stream.Close()
  321. buf := make([]byte, 512)
  322. for {
  323. n, err := stream.Read(buf)
  324. if err == io.EOF {
  325. return
  326. }
  327. if err != nil {
  328. t.Fatalf("err: %v", err)
  329. }
  330. if n == 0 {
  331. t.Fatalf("err: %v", err)
  332. }
  333. }
  334. }
  335. sender := func(i int) {
  336. defer wg.Done()
  337. stream, err := client.Open()
  338. if err != nil {
  339. t.Fatalf("err: %v", err)
  340. }
  341. defer stream.Close()
  342. msg := fmt.Sprintf("%08d", i)
  343. for i := 0; i < 1000; i++ {
  344. n, err := stream.Write([]byte(msg))
  345. if err != nil {
  346. t.Fatalf("err: %v", err)
  347. }
  348. if n != len(msg) {
  349. t.Fatalf("short write %d", n)
  350. }
  351. }
  352. }
  353. for i := 0; i < 50; i++ {
  354. wg.Add(2)
  355. go acceptor(i)
  356. go sender(i)
  357. }
  358. wg.Wait()
  359. }
  360. func TestManyStreams_PingPong(t *testing.T) {
  361. client, server := testClientServer()
  362. defer client.Close()
  363. defer server.Close()
  364. wg := &sync.WaitGroup{}
  365. ping := []byte("ping")
  366. pong := []byte("pong")
  367. acceptor := func(i int) {
  368. defer wg.Done()
  369. stream, err := server.AcceptStream()
  370. if err != nil {
  371. t.Fatalf("err: %v", err)
  372. }
  373. defer stream.Close()
  374. buf := make([]byte, 4)
  375. for {
  376. n, err := stream.Read(buf)
  377. if err == io.EOF {
  378. return
  379. }
  380. if err != nil {
  381. t.Fatalf("err: %v", err)
  382. }
  383. if n != 4 {
  384. t.Fatalf("err: %v", err)
  385. }
  386. if !bytes.Equal(buf, ping) {
  387. t.Fatalf("bad: %s", buf)
  388. }
  389. n, err = stream.Write(pong)
  390. if err != nil {
  391. t.Fatalf("err: %v", err)
  392. }
  393. if n != 4 {
  394. t.Fatalf("err: %v", err)
  395. }
  396. }
  397. }
  398. sender := func(i int) {
  399. defer wg.Done()
  400. stream, err := client.Open()
  401. if err != nil {
  402. t.Fatalf("err: %v", err)
  403. }
  404. defer stream.Close()
  405. buf := make([]byte, 4)
  406. for i := 0; i < 1000; i++ {
  407. n, err := stream.Write(ping)
  408. if err != nil {
  409. t.Fatalf("err: %v", err)
  410. }
  411. if n != 4 {
  412. t.Fatalf("short write %d", n)
  413. }
  414. n, err = stream.Read(buf)
  415. if err != nil {
  416. t.Fatalf("err: %v", err)
  417. }
  418. if n != 4 {
  419. t.Fatalf("err: %v", err)
  420. }
  421. if !bytes.Equal(buf, pong) {
  422. t.Fatalf("bad: %s", buf)
  423. }
  424. }
  425. }
  426. for i := 0; i < 50; i++ {
  427. wg.Add(2)
  428. go acceptor(i)
  429. go sender(i)
  430. }
  431. wg.Wait()
  432. }
  433. func TestHalfClose(t *testing.T) {
  434. client, server := testClientServer()
  435. defer client.Close()
  436. defer server.Close()
  437. stream, err := client.Open()
  438. if err != nil {
  439. t.Fatalf("err: %v", err)
  440. }
  441. if _, err := stream.Write([]byte("a")); err != nil {
  442. t.Fatalf("err: %v", err)
  443. }
  444. stream2, err := server.Accept()
  445. if err != nil {
  446. t.Fatalf("err: %v", err)
  447. }
  448. stream2.Close() // Half close
  449. buf := make([]byte, 4)
  450. n, err := stream2.Read(buf)
  451. if err != nil {
  452. t.Fatalf("err: %v", err)
  453. }
  454. if n != 1 {
  455. t.Fatalf("bad: %v", n)
  456. }
  457. // Send more
  458. if _, err := stream.Write([]byte("bcd")); err != nil {
  459. t.Fatalf("err: %v", err)
  460. }
  461. stream.Close()
  462. // Read after close
  463. n, err = stream2.Read(buf)
  464. if err != nil {
  465. t.Fatalf("err: %v", err)
  466. }
  467. if n != 3 {
  468. t.Fatalf("bad: %v", n)
  469. }
  470. // EOF after close
  471. n, err = stream2.Read(buf)
  472. if err != io.EOF {
  473. t.Fatalf("err: %v", err)
  474. }
  475. if n != 0 {
  476. t.Fatalf("bad: %v", n)
  477. }
  478. }
  479. func TestReadDeadline(t *testing.T) {
  480. client, server := testClientServer()
  481. defer client.Close()
  482. defer server.Close()
  483. stream, err := client.Open()
  484. if err != nil {
  485. t.Fatalf("err: %v", err)
  486. }
  487. defer stream.Close()
  488. stream2, err := server.Accept()
  489. if err != nil {
  490. t.Fatalf("err: %v", err)
  491. }
  492. defer stream2.Close()
  493. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  494. t.Fatalf("err: %v", err)
  495. }
  496. buf := make([]byte, 4)
  497. if _, err := stream.Read(buf); err != ErrTimeout {
  498. t.Fatalf("err: %v", err)
  499. }
  500. }
  501. func TestWriteDeadline(t *testing.T) {
  502. client, server := testClientServer()
  503. defer client.Close()
  504. defer server.Close()
  505. stream, err := client.Open()
  506. if err != nil {
  507. t.Fatalf("err: %v", err)
  508. }
  509. defer stream.Close()
  510. stream2, err := server.Accept()
  511. if err != nil {
  512. t.Fatalf("err: %v", err)
  513. }
  514. defer stream2.Close()
  515. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  516. t.Fatalf("err: %v", err)
  517. }
  518. buf := make([]byte, 512)
  519. for i := 0; i < int(initialStreamWindow); i++ {
  520. _, err := stream.Write(buf)
  521. if err != nil && err == ErrTimeout {
  522. return
  523. } else if err != nil {
  524. t.Fatalf("err: %v", err)
  525. }
  526. }
  527. t.Fatalf("Expected timeout")
  528. }
  529. func TestBacklogExceeded(t *testing.T) {
  530. client, server := testClientServer()
  531. defer client.Close()
  532. defer server.Close()
  533. // Fill the backlog
  534. max := client.config.AcceptBacklog
  535. for i := 0; i < max; i++ {
  536. stream, err := client.Open()
  537. if err != nil {
  538. t.Fatalf("err: %v", err)
  539. }
  540. defer stream.Close()
  541. if _, err := stream.Write([]byte("foo")); err != nil {
  542. t.Fatalf("err: %v", err)
  543. }
  544. }
  545. // Attempt to open a new stream
  546. errCh := make(chan error, 1)
  547. go func() {
  548. _, err := client.Open()
  549. errCh <- err
  550. }()
  551. // Shutdown the server
  552. go func() {
  553. time.Sleep(10 * time.Millisecond)
  554. server.Close()
  555. }()
  556. select {
  557. case err := <-errCh:
  558. if err == nil {
  559. t.Fatalf("open should fail")
  560. }
  561. case <-time.After(time.Second):
  562. t.Fatalf("timeout")
  563. }
  564. }
  565. func TestKeepAlive(t *testing.T) {
  566. client, server := testClientServer()
  567. defer client.Close()
  568. defer server.Close()
  569. time.Sleep(200 * time.Millisecond)
  570. // Ping value should increase
  571. client.pingLock.Lock()
  572. defer client.pingLock.Unlock()
  573. if client.pingID == 0 {
  574. t.Fatalf("should ping")
  575. }
  576. server.pingLock.Lock()
  577. defer server.pingLock.Unlock()
  578. if server.pingID == 0 {
  579. t.Fatalf("should ping")
  580. }
  581. }
  582. func TestLargeWindow(t *testing.T) {
  583. conf := DefaultConfig()
  584. conf.MaxStreamWindowSize *= 2
  585. client, server := testClientServerConfig(conf)
  586. defer client.Close()
  587. defer server.Close()
  588. stream, err := client.Open()
  589. if err != nil {
  590. t.Fatalf("err: %v", err)
  591. }
  592. defer stream.Close()
  593. stream2, err := server.Accept()
  594. if err != nil {
  595. t.Fatalf("err: %v", err)
  596. }
  597. defer stream2.Close()
  598. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  599. buf := make([]byte, conf.MaxStreamWindowSize)
  600. n, err := stream.Write(buf)
  601. if err != nil {
  602. t.Fatalf("err: %v", err)
  603. }
  604. if n != len(buf) {
  605. t.Fatalf("short write: %d", n)
  606. }
  607. }
  608. type UnlimitedReader struct{}
  609. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  610. runtime.Gosched()
  611. return len(p), nil
  612. }
  613. func TestSendData_VeryLarge(t *testing.T) {
  614. client, server := testClientServer()
  615. defer client.Close()
  616. defer server.Close()
  617. var n int64 = 1 * 1024 * 1024 * 1024
  618. var workers int = 16
  619. wg := &sync.WaitGroup{}
  620. wg.Add(workers * 2)
  621. for i := 0; i < workers; i++ {
  622. go func() {
  623. defer wg.Done()
  624. stream, err := server.AcceptStream()
  625. if err != nil {
  626. t.Fatalf("err: %v", err)
  627. }
  628. defer stream.Close()
  629. buf := make([]byte, 4)
  630. _, err = stream.Read(buf)
  631. if err != nil {
  632. t.Fatalf("err: %v", err)
  633. }
  634. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  635. t.Fatalf("bad header")
  636. }
  637. recv, err := io.Copy(ioutil.Discard, stream)
  638. if err != nil {
  639. t.Fatalf("err: %v", err)
  640. }
  641. if recv != n {
  642. t.Fatalf("bad: %v", recv)
  643. }
  644. }()
  645. }
  646. for i := 0; i < workers; i++ {
  647. go func() {
  648. defer wg.Done()
  649. stream, err := client.Open()
  650. if err != nil {
  651. t.Fatalf("err: %v", err)
  652. }
  653. defer stream.Close()
  654. _, err = stream.Write([]byte{0, 1, 2, 3})
  655. if err != nil {
  656. t.Fatalf("err: %v", err)
  657. }
  658. unlimited := &UnlimitedReader{}
  659. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  660. if err != nil {
  661. t.Fatalf("err: %v", err)
  662. }
  663. if sent != n {
  664. t.Fatalf("bad: %v", sent)
  665. }
  666. }()
  667. }
  668. doneCh := make(chan struct{})
  669. go func() {
  670. wg.Wait()
  671. close(doneCh)
  672. }()
  673. select {
  674. case <-doneCh:
  675. case <-time.After(20 * time.Second):
  676. panic("timeout")
  677. }
  678. }
  679. func TestBacklogExceeded_Accept(t *testing.T) {
  680. client, server := testClientServer()
  681. defer client.Close()
  682. defer server.Close()
  683. max := 5 * client.config.AcceptBacklog
  684. go func() {
  685. for i := 0; i < max; i++ {
  686. stream, err := server.Accept()
  687. if err != nil {
  688. t.Fatalf("err: %v", err)
  689. }
  690. defer stream.Close()
  691. }
  692. }()
  693. // Fill the backlog
  694. for i := 0; i < max; i++ {
  695. stream, err := client.Open()
  696. if err != nil {
  697. t.Fatalf("err: %v", err)
  698. }
  699. defer stream.Close()
  700. if _, err := stream.Write([]byte("foo")); err != nil {
  701. t.Fatalf("err: %v", err)
  702. }
  703. }
  704. }
  705. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  706. client, server := testClientServerConfig(testConfNoKeepAlive())
  707. defer client.Close()
  708. defer server.Close()
  709. var wg sync.WaitGroup
  710. wg.Add(2)
  711. // Choose a huge flood size that we know will result in a window update.
  712. flood := int64(client.config.MaxStreamWindowSize) - 1
  713. // The server will accept a new stream and then flood data to it.
  714. go func() {
  715. defer wg.Done()
  716. stream, err := server.AcceptStream()
  717. if err != nil {
  718. t.Fatalf("err: %v", err)
  719. }
  720. defer stream.Close()
  721. n, err := stream.Write(make([]byte, flood))
  722. if err != nil {
  723. t.Fatalf("err: %v", err)
  724. }
  725. if int64(n) != flood {
  726. t.Fatalf("short write: %d", n)
  727. }
  728. }()
  729. // The client will open a stream, block outbound writes, and then
  730. // listen to the flood from the server, which should time out since
  731. // it won't be able to send the window update.
  732. go func() {
  733. defer wg.Done()
  734. stream, err := client.OpenStream()
  735. if err != nil {
  736. t.Fatalf("err: %v", err)
  737. }
  738. defer stream.Close()
  739. conn := client.conn.(*pipeConn)
  740. conn.writeBlocker.Lock()
  741. _, err = stream.Read(make([]byte, flood))
  742. if err != ErrConnectionWriteTimeout {
  743. t.Fatalf("err: %v", err)
  744. }
  745. }()
  746. wg.Wait()
  747. }
  748. func TestSession_sendNoWait_Timeout(t *testing.T) {
  749. client, server := testClientServerConfig(testConfNoKeepAlive())
  750. defer client.Close()
  751. defer server.Close()
  752. var wg sync.WaitGroup
  753. wg.Add(2)
  754. go func() {
  755. defer wg.Done()
  756. stream, err := server.AcceptStream()
  757. if err != nil {
  758. t.Fatalf("err: %v", err)
  759. }
  760. defer stream.Close()
  761. }()
  762. // The client will open the stream and then block outbound writes, we'll
  763. // probe sendNoWait once it gets into that state.
  764. go func() {
  765. defer wg.Done()
  766. stream, err := client.OpenStream()
  767. if err != nil {
  768. t.Fatalf("err: %v", err)
  769. }
  770. defer stream.Close()
  771. conn := client.conn.(*pipeConn)
  772. conn.writeBlocker.Lock()
  773. hdr := header(make([]byte, headerSize))
  774. hdr.encode(typePing, flagACK, 0, 0)
  775. for {
  776. err = client.sendNoWait(hdr)
  777. if err == nil {
  778. continue
  779. } else if err == ErrConnectionWriteTimeout {
  780. break
  781. } else {
  782. t.Fatalf("err: %v", err)
  783. }
  784. }
  785. }()
  786. wg.Wait()
  787. }
  788. func TestSession_PingOfDeath(t *testing.T) {
  789. client, server := testClientServerConfig(testConfNoKeepAlive())
  790. defer client.Close()
  791. defer server.Close()
  792. var wg sync.WaitGroup
  793. wg.Add(2)
  794. var doPingOfDeath sync.Mutex
  795. doPingOfDeath.Lock()
  796. // This is used later to block outbound writes.
  797. conn := server.conn.(*pipeConn)
  798. // The server will accept a stream, block outbound writes, and then
  799. // flood its send channel so that no more headers can be queued.
  800. go func() {
  801. defer wg.Done()
  802. stream, err := server.AcceptStream()
  803. if err != nil {
  804. t.Fatalf("err: %v", err)
  805. }
  806. defer stream.Close()
  807. conn.writeBlocker.Lock()
  808. for {
  809. hdr := header(make([]byte, headerSize))
  810. hdr.encode(typePing, 0, 0, 0)
  811. err = server.sendNoWait(hdr)
  812. if err == nil {
  813. continue
  814. } else if err == ErrConnectionWriteTimeout {
  815. break
  816. } else {
  817. t.Fatalf("err: %v", err)
  818. }
  819. }
  820. doPingOfDeath.Unlock()
  821. }()
  822. // The client will open a stream and then send the server a ping once it
  823. // can no longer write. This makes sure the server doesn't deadlock reads
  824. // while trying to reply to the ping with no ability to write.
  825. go func() {
  826. defer wg.Done()
  827. stream, err := client.OpenStream()
  828. if err != nil {
  829. t.Fatalf("err: %v", err)
  830. }
  831. defer stream.Close()
  832. // This ping will never unblock because the ping id will never
  833. // show up in a response.
  834. doPingOfDeath.Lock()
  835. go func() { client.Ping() }()
  836. // Wait for a while to make sure the previous ping times out,
  837. // then turn writes back on and make sure a ping works again.
  838. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  839. conn.writeBlocker.Unlock()
  840. if _, err = client.Ping(); err != nil {
  841. t.Fatalf("err: %v", err)
  842. }
  843. }()
  844. wg.Wait()
  845. }
  846. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  847. client, server := testClientServerConfig(testConfNoKeepAlive())
  848. defer client.Close()
  849. defer server.Close()
  850. var wg sync.WaitGroup
  851. wg.Add(2)
  852. go func() {
  853. defer wg.Done()
  854. stream, err := server.AcceptStream()
  855. if err != nil {
  856. t.Fatalf("err: %v", err)
  857. }
  858. defer stream.Close()
  859. }()
  860. // The client will open the stream and then block outbound writes, we'll
  861. // tee up a write and make sure it eventually times out.
  862. go func() {
  863. defer wg.Done()
  864. stream, err := client.OpenStream()
  865. if err != nil {
  866. t.Fatalf("err: %v", err)
  867. }
  868. defer stream.Close()
  869. conn := client.conn.(*pipeConn)
  870. conn.writeBlocker.Lock()
  871. // Since the write goroutine is blocked then this will return a
  872. // timeout since it can't get feedback about whether the write
  873. // worked.
  874. n, err := stream.Write([]byte("hello"))
  875. if err != ErrConnectionWriteTimeout {
  876. t.Fatalf("err: %v", err)
  877. }
  878. if n != 0 {
  879. t.Fatalf("lied about writes: %d", n)
  880. }
  881. }()
  882. wg.Wait()
  883. }