session_test.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142
  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "reflect"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. )
  15. type logCapture struct{ bytes.Buffer }
  16. func (l *logCapture) logs() []string {
  17. return strings.Split(strings.TrimSpace(l.String()), "\n")
  18. }
  19. func (l *logCapture) match(expect []string) bool {
  20. return reflect.DeepEqual(l.logs(), expect)
  21. }
  22. func captureLogs(s *Session) *logCapture {
  23. buf := new(logCapture)
  24. s.logger = log.New(buf, "", 0)
  25. return buf
  26. }
  27. type pipeConn struct {
  28. reader *io.PipeReader
  29. writer *io.PipeWriter
  30. writeBlocker sync.Mutex
  31. }
  32. func (p *pipeConn) Read(b []byte) (int, error) {
  33. return p.reader.Read(b)
  34. }
  35. func (p *pipeConn) Write(b []byte) (int, error) {
  36. p.writeBlocker.Lock()
  37. defer p.writeBlocker.Unlock()
  38. return p.writer.Write(b)
  39. }
  40. func (p *pipeConn) Close() error {
  41. p.reader.Close()
  42. return p.writer.Close()
  43. }
  44. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  45. read1, write1 := io.Pipe()
  46. read2, write2 := io.Pipe()
  47. conn1 := &pipeConn{reader: read1, writer: write2}
  48. conn2 := &pipeConn{reader: read2, writer: write1}
  49. return conn1, conn2
  50. }
  51. func testConf() *Config {
  52. conf := DefaultConfig()
  53. conf.AcceptBacklog = 64
  54. conf.KeepAliveInterval = 100 * time.Millisecond
  55. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  56. return conf
  57. }
  58. func testConfNoKeepAlive() *Config {
  59. conf := testConf()
  60. conf.EnableKeepAlive = false
  61. return conf
  62. }
  63. func testClientServer() (*Session, *Session) {
  64. return testClientServerConfig(testConf())
  65. }
  66. func testClientServerConfig(conf *Config) (*Session, *Session) {
  67. conn1, conn2 := testConn()
  68. client, _ := Client(conn1, conf)
  69. server, _ := Server(conn2, conf)
  70. return client, server
  71. }
  72. func TestPing(t *testing.T) {
  73. client, server := testClientServer()
  74. defer client.Close()
  75. defer server.Close()
  76. rtt, err := client.Ping()
  77. if err != nil {
  78. t.Fatalf("err: %v", err)
  79. }
  80. if rtt == 0 {
  81. t.Fatalf("bad: %v", rtt)
  82. }
  83. rtt, err = server.Ping()
  84. if err != nil {
  85. t.Fatalf("err: %v", err)
  86. }
  87. if rtt == 0 {
  88. t.Fatalf("bad: %v", rtt)
  89. }
  90. }
  91. func TestPing_Timeout(t *testing.T) {
  92. client, server := testClientServerConfig(testConfNoKeepAlive())
  93. defer client.Close()
  94. defer server.Close()
  95. // Prevent the client from responding
  96. clientConn := client.conn.(*pipeConn)
  97. clientConn.writeBlocker.Lock()
  98. errCh := make(chan error, 1)
  99. go func() {
  100. _, err := server.Ping() // Ping via the server session
  101. errCh <- err
  102. }()
  103. select {
  104. case err := <-errCh:
  105. if err != ErrTimeout {
  106. t.Fatalf("err: %v", err)
  107. }
  108. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  109. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  110. }
  111. // Verify that we recover, even if we gave up
  112. clientConn.writeBlocker.Unlock()
  113. go func() {
  114. _, err := server.Ping() // Ping via the server session
  115. errCh <- err
  116. }()
  117. select {
  118. case err := <-errCh:
  119. if err != nil {
  120. t.Fatalf("err: %v", err)
  121. }
  122. case <-time.After(client.config.ConnectionWriteTimeout):
  123. t.Fatalf("timeout")
  124. }
  125. }
  126. func TestAccept(t *testing.T) {
  127. client, server := testClientServer()
  128. defer client.Close()
  129. defer server.Close()
  130. if client.NumStreams() != 0 {
  131. t.Fatalf("bad")
  132. }
  133. if server.NumStreams() != 0 {
  134. t.Fatalf("bad")
  135. }
  136. wg := &sync.WaitGroup{}
  137. wg.Add(4)
  138. go func() {
  139. defer wg.Done()
  140. stream, err := server.AcceptStream()
  141. if err != nil {
  142. t.Fatalf("err: %v", err)
  143. }
  144. if id := stream.StreamID(); id != 1 {
  145. t.Fatalf("bad: %v", id)
  146. }
  147. if err := stream.Close(); err != nil {
  148. t.Fatalf("err: %v", err)
  149. }
  150. }()
  151. go func() {
  152. defer wg.Done()
  153. stream, err := client.AcceptStream()
  154. if err != nil {
  155. t.Fatalf("err: %v", err)
  156. }
  157. if id := stream.StreamID(); id != 2 {
  158. t.Fatalf("bad: %v", id)
  159. }
  160. if err := stream.Close(); err != nil {
  161. t.Fatalf("err: %v", err)
  162. }
  163. }()
  164. go func() {
  165. defer wg.Done()
  166. stream, err := server.OpenStream()
  167. if err != nil {
  168. t.Fatalf("err: %v", err)
  169. }
  170. if id := stream.StreamID(); id != 2 {
  171. t.Fatalf("bad: %v", id)
  172. }
  173. if err := stream.Close(); err != nil {
  174. t.Fatalf("err: %v", err)
  175. }
  176. }()
  177. go func() {
  178. defer wg.Done()
  179. stream, err := client.OpenStream()
  180. if err != nil {
  181. t.Fatalf("err: %v", err)
  182. }
  183. if id := stream.StreamID(); id != 1 {
  184. t.Fatalf("bad: %v", id)
  185. }
  186. if err := stream.Close(); err != nil {
  187. t.Fatalf("err: %v", err)
  188. }
  189. }()
  190. doneCh := make(chan struct{})
  191. go func() {
  192. wg.Wait()
  193. close(doneCh)
  194. }()
  195. select {
  196. case <-doneCh:
  197. case <-time.After(time.Second):
  198. panic("timeout")
  199. }
  200. }
  201. func TestNonNilInterface(t *testing.T) {
  202. _, server := testClientServer()
  203. server.Close()
  204. conn, err := server.Accept()
  205. if err != nil && conn != nil {
  206. t.Error("bad: accept should return a connection of nil value")
  207. }
  208. conn, err = server.Open()
  209. if err != nil && conn != nil {
  210. t.Error("bad: open should return a connection of nil value")
  211. }
  212. }
  213. func TestSendData_Small(t *testing.T) {
  214. client, server := testClientServer()
  215. defer client.Close()
  216. defer server.Close()
  217. wg := &sync.WaitGroup{}
  218. wg.Add(2)
  219. go func() {
  220. defer wg.Done()
  221. stream, err := server.AcceptStream()
  222. if err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. if server.NumStreams() != 1 {
  226. t.Fatalf("bad")
  227. }
  228. buf := make([]byte, 4)
  229. for i := 0; i < 1000; i++ {
  230. n, err := stream.Read(buf)
  231. if err != nil {
  232. t.Fatalf("err: %v", err)
  233. }
  234. if n != 4 {
  235. t.Fatalf("short read: %d", n)
  236. }
  237. if string(buf) != "test" {
  238. t.Fatalf("bad: %s", buf)
  239. }
  240. }
  241. if err := stream.Close(); err != nil {
  242. t.Fatalf("err: %v", err)
  243. }
  244. }()
  245. go func() {
  246. defer wg.Done()
  247. stream, err := client.Open()
  248. if err != nil {
  249. t.Fatalf("err: %v", err)
  250. }
  251. if client.NumStreams() != 1 {
  252. t.Fatalf("bad")
  253. }
  254. for i := 0; i < 1000; i++ {
  255. n, err := stream.Write([]byte("test"))
  256. if err != nil {
  257. t.Fatalf("err: %v", err)
  258. }
  259. if n != 4 {
  260. t.Fatalf("short write %d", n)
  261. }
  262. }
  263. if err := stream.Close(); err != nil {
  264. t.Fatalf("err: %v", err)
  265. }
  266. }()
  267. doneCh := make(chan struct{})
  268. go func() {
  269. wg.Wait()
  270. close(doneCh)
  271. }()
  272. select {
  273. case <-doneCh:
  274. case <-time.After(time.Second):
  275. panic("timeout")
  276. }
  277. if client.NumStreams() != 0 {
  278. t.Fatalf("bad")
  279. }
  280. if server.NumStreams() != 0 {
  281. t.Fatalf("bad")
  282. }
  283. }
  284. func TestSendData_Large(t *testing.T) {
  285. client, server := testClientServer()
  286. defer client.Close()
  287. defer server.Close()
  288. data := make([]byte, 512*1024)
  289. for idx := range data {
  290. data[idx] = byte(idx % 256)
  291. }
  292. wg := &sync.WaitGroup{}
  293. wg.Add(2)
  294. go func() {
  295. defer wg.Done()
  296. stream, err := server.AcceptStream()
  297. if err != nil {
  298. t.Fatalf("err: %v", err)
  299. }
  300. buf := make([]byte, 4*1024)
  301. for i := 0; i < 128; i++ {
  302. n, err := stream.Read(buf)
  303. if err != nil {
  304. t.Fatalf("err: %v", err)
  305. }
  306. if n != 4*1024 {
  307. t.Fatalf("short read: %d", n)
  308. }
  309. for idx := range buf {
  310. if buf[idx] != byte(idx%256) {
  311. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  312. }
  313. }
  314. }
  315. if err := stream.Close(); err != nil {
  316. t.Fatalf("err: %v", err)
  317. }
  318. }()
  319. go func() {
  320. defer wg.Done()
  321. stream, err := client.Open()
  322. if err != nil {
  323. t.Fatalf("err: %v", err)
  324. }
  325. n, err := stream.Write(data)
  326. if err != nil {
  327. t.Fatalf("err: %v", err)
  328. }
  329. if n != len(data) {
  330. t.Fatalf("short write %d", n)
  331. }
  332. if err := stream.Close(); err != nil {
  333. t.Fatalf("err: %v", err)
  334. }
  335. }()
  336. doneCh := make(chan struct{})
  337. go func() {
  338. wg.Wait()
  339. close(doneCh)
  340. }()
  341. select {
  342. case <-doneCh:
  343. case <-time.After(time.Second):
  344. panic("timeout")
  345. }
  346. }
  347. func TestGoAway(t *testing.T) {
  348. client, server := testClientServer()
  349. defer client.Close()
  350. defer server.Close()
  351. if err := server.GoAway(); err != nil {
  352. t.Fatalf("err: %v", err)
  353. }
  354. _, err := client.Open()
  355. if err != ErrRemoteGoAway {
  356. t.Fatalf("err: %v", err)
  357. }
  358. }
  359. func TestManyStreams(t *testing.T) {
  360. client, server := testClientServer()
  361. defer client.Close()
  362. defer server.Close()
  363. wg := &sync.WaitGroup{}
  364. acceptor := func(i int) {
  365. defer wg.Done()
  366. stream, err := server.AcceptStream()
  367. if err != nil {
  368. t.Fatalf("err: %v", err)
  369. }
  370. defer stream.Close()
  371. buf := make([]byte, 512)
  372. for {
  373. n, err := stream.Read(buf)
  374. if err == io.EOF {
  375. return
  376. }
  377. if err != nil {
  378. t.Fatalf("err: %v", err)
  379. }
  380. if n == 0 {
  381. t.Fatalf("err: %v", err)
  382. }
  383. }
  384. }
  385. sender := func(i int) {
  386. defer wg.Done()
  387. stream, err := client.Open()
  388. if err != nil {
  389. t.Fatalf("err: %v", err)
  390. }
  391. defer stream.Close()
  392. msg := fmt.Sprintf("%08d", i)
  393. for i := 0; i < 1000; i++ {
  394. n, err := stream.Write([]byte(msg))
  395. if err != nil {
  396. t.Fatalf("err: %v", err)
  397. }
  398. if n != len(msg) {
  399. t.Fatalf("short write %d", n)
  400. }
  401. }
  402. }
  403. for i := 0; i < 50; i++ {
  404. wg.Add(2)
  405. go acceptor(i)
  406. go sender(i)
  407. }
  408. wg.Wait()
  409. }
  410. func TestManyStreams_PingPong(t *testing.T) {
  411. client, server := testClientServer()
  412. defer client.Close()
  413. defer server.Close()
  414. wg := &sync.WaitGroup{}
  415. ping := []byte("ping")
  416. pong := []byte("pong")
  417. acceptor := func(i int) {
  418. defer wg.Done()
  419. stream, err := server.AcceptStream()
  420. if err != nil {
  421. t.Fatalf("err: %v", err)
  422. }
  423. defer stream.Close()
  424. buf := make([]byte, 4)
  425. for {
  426. n, err := stream.Read(buf)
  427. if err == io.EOF {
  428. return
  429. }
  430. if err != nil {
  431. t.Fatalf("err: %v", err)
  432. }
  433. if n != 4 {
  434. t.Fatalf("err: %v", err)
  435. }
  436. if !bytes.Equal(buf, ping) {
  437. t.Fatalf("bad: %s", buf)
  438. }
  439. n, err = stream.Write(pong)
  440. if err != nil {
  441. t.Fatalf("err: %v", err)
  442. }
  443. if n != 4 {
  444. t.Fatalf("err: %v", err)
  445. }
  446. }
  447. }
  448. sender := func(i int) {
  449. defer wg.Done()
  450. stream, err := client.Open()
  451. if err != nil {
  452. t.Fatalf("err: %v", err)
  453. }
  454. defer stream.Close()
  455. buf := make([]byte, 4)
  456. for i := 0; i < 1000; i++ {
  457. n, err := stream.Write(ping)
  458. if err != nil {
  459. t.Fatalf("err: %v", err)
  460. }
  461. if n != 4 {
  462. t.Fatalf("short write %d", n)
  463. }
  464. n, err = stream.Read(buf)
  465. if err != nil {
  466. t.Fatalf("err: %v", err)
  467. }
  468. if n != 4 {
  469. t.Fatalf("err: %v", err)
  470. }
  471. if !bytes.Equal(buf, pong) {
  472. t.Fatalf("bad: %s", buf)
  473. }
  474. }
  475. }
  476. for i := 0; i < 50; i++ {
  477. wg.Add(2)
  478. go acceptor(i)
  479. go sender(i)
  480. }
  481. wg.Wait()
  482. }
  483. func TestHalfClose(t *testing.T) {
  484. client, server := testClientServer()
  485. defer client.Close()
  486. defer server.Close()
  487. stream, err := client.Open()
  488. if err != nil {
  489. t.Fatalf("err: %v", err)
  490. }
  491. if _, err := stream.Write([]byte("a")); err != nil {
  492. t.Fatalf("err: %v", err)
  493. }
  494. stream2, err := server.Accept()
  495. if err != nil {
  496. t.Fatalf("err: %v", err)
  497. }
  498. stream2.Close() // Half close
  499. buf := make([]byte, 4)
  500. n, err := stream2.Read(buf)
  501. if err != nil {
  502. t.Fatalf("err: %v", err)
  503. }
  504. if n != 1 {
  505. t.Fatalf("bad: %v", n)
  506. }
  507. // Send more
  508. if _, err := stream.Write([]byte("bcd")); err != nil {
  509. t.Fatalf("err: %v", err)
  510. }
  511. stream.Close()
  512. // Read after close
  513. n, err = stream2.Read(buf)
  514. if err != nil {
  515. t.Fatalf("err: %v", err)
  516. }
  517. if n != 3 {
  518. t.Fatalf("bad: %v", n)
  519. }
  520. // EOF after close
  521. n, err = stream2.Read(buf)
  522. if err != io.EOF {
  523. t.Fatalf("err: %v", err)
  524. }
  525. if n != 0 {
  526. t.Fatalf("bad: %v", n)
  527. }
  528. }
  529. func TestReadDeadline(t *testing.T) {
  530. client, server := testClientServer()
  531. defer client.Close()
  532. defer server.Close()
  533. stream, err := client.Open()
  534. if err != nil {
  535. t.Fatalf("err: %v", err)
  536. }
  537. defer stream.Close()
  538. stream2, err := server.Accept()
  539. if err != nil {
  540. t.Fatalf("err: %v", err)
  541. }
  542. defer stream2.Close()
  543. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  544. t.Fatalf("err: %v", err)
  545. }
  546. buf := make([]byte, 4)
  547. if _, err := stream.Read(buf); err != ErrTimeout {
  548. t.Fatalf("err: %v", err)
  549. }
  550. }
  551. func TestWriteDeadline(t *testing.T) {
  552. client, server := testClientServer()
  553. defer client.Close()
  554. defer server.Close()
  555. stream, err := client.Open()
  556. if err != nil {
  557. t.Fatalf("err: %v", err)
  558. }
  559. defer stream.Close()
  560. stream2, err := server.Accept()
  561. if err != nil {
  562. t.Fatalf("err: %v", err)
  563. }
  564. defer stream2.Close()
  565. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  566. t.Fatalf("err: %v", err)
  567. }
  568. buf := make([]byte, 512)
  569. for i := 0; i < int(initialStreamWindow); i++ {
  570. _, err := stream.Write(buf)
  571. if err != nil && err == ErrTimeout {
  572. return
  573. } else if err != nil {
  574. t.Fatalf("err: %v", err)
  575. }
  576. }
  577. t.Fatalf("Expected timeout")
  578. }
  579. func TestBacklogExceeded(t *testing.T) {
  580. client, server := testClientServer()
  581. defer client.Close()
  582. defer server.Close()
  583. // Fill the backlog
  584. max := client.config.AcceptBacklog
  585. for i := 0; i < max; i++ {
  586. stream, err := client.Open()
  587. if err != nil {
  588. t.Fatalf("err: %v", err)
  589. }
  590. defer stream.Close()
  591. if _, err := stream.Write([]byte("foo")); err != nil {
  592. t.Fatalf("err: %v", err)
  593. }
  594. }
  595. // Attempt to open a new stream
  596. errCh := make(chan error, 1)
  597. go func() {
  598. _, err := client.Open()
  599. errCh <- err
  600. }()
  601. // Shutdown the server
  602. go func() {
  603. time.Sleep(10 * time.Millisecond)
  604. server.Close()
  605. }()
  606. select {
  607. case err := <-errCh:
  608. if err == nil {
  609. t.Fatalf("open should fail")
  610. }
  611. case <-time.After(time.Second):
  612. t.Fatalf("timeout")
  613. }
  614. }
  615. func TestKeepAlive(t *testing.T) {
  616. client, server := testClientServer()
  617. defer client.Close()
  618. defer server.Close()
  619. time.Sleep(200 * time.Millisecond)
  620. // Ping value should increase
  621. client.pingLock.Lock()
  622. defer client.pingLock.Unlock()
  623. if client.pingID == 0 {
  624. t.Fatalf("should ping")
  625. }
  626. server.pingLock.Lock()
  627. defer server.pingLock.Unlock()
  628. if server.pingID == 0 {
  629. t.Fatalf("should ping")
  630. }
  631. }
  632. func TestKeepAlive_Timeout(t *testing.T) {
  633. conn1, conn2 := testConn()
  634. clientConf := testConf()
  635. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  636. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  637. client, _ := Client(conn1, clientConf)
  638. defer client.Close()
  639. server, _ := Server(conn2, testConf())
  640. defer server.Close()
  641. _ = captureLogs(client) // Client logs aren't part of the test
  642. serverLogs := captureLogs(server)
  643. errCh := make(chan error, 1)
  644. go func() {
  645. _, err := server.Accept() // Wait until server closes
  646. errCh <- err
  647. }()
  648. // Prevent the client from responding
  649. clientConn := client.conn.(*pipeConn)
  650. clientConn.writeBlocker.Lock()
  651. select {
  652. case err := <-errCh:
  653. if err != ErrKeepAliveTimeout {
  654. t.Fatalf("unexpected error: %v", err)
  655. }
  656. case <-time.After(1 * time.Second):
  657. t.Fatalf("timeout waiting for timeout")
  658. }
  659. if !server.IsClosed() {
  660. t.Fatalf("server should have closed")
  661. }
  662. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  663. t.Fatalf("server log incorect: %v", serverLogs.logs())
  664. }
  665. }
  666. func TestLargeWindow(t *testing.T) {
  667. conf := DefaultConfig()
  668. conf.MaxStreamWindowSize *= 2
  669. client, server := testClientServerConfig(conf)
  670. defer client.Close()
  671. defer server.Close()
  672. stream, err := client.Open()
  673. if err != nil {
  674. t.Fatalf("err: %v", err)
  675. }
  676. defer stream.Close()
  677. stream2, err := server.Accept()
  678. if err != nil {
  679. t.Fatalf("err: %v", err)
  680. }
  681. defer stream2.Close()
  682. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  683. buf := make([]byte, conf.MaxStreamWindowSize)
  684. n, err := stream.Write(buf)
  685. if err != nil {
  686. t.Fatalf("err: %v", err)
  687. }
  688. if n != len(buf) {
  689. t.Fatalf("short write: %d", n)
  690. }
  691. }
  692. type UnlimitedReader struct{}
  693. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  694. runtime.Gosched()
  695. return len(p), nil
  696. }
  697. func TestSendData_VeryLarge(t *testing.T) {
  698. client, server := testClientServer()
  699. defer client.Close()
  700. defer server.Close()
  701. var n int64 = 1 * 1024 * 1024 * 1024
  702. var workers int = 16
  703. wg := &sync.WaitGroup{}
  704. wg.Add(workers * 2)
  705. for i := 0; i < workers; i++ {
  706. go func() {
  707. defer wg.Done()
  708. stream, err := server.AcceptStream()
  709. if err != nil {
  710. t.Fatalf("err: %v", err)
  711. }
  712. defer stream.Close()
  713. buf := make([]byte, 4)
  714. _, err = stream.Read(buf)
  715. if err != nil {
  716. t.Fatalf("err: %v", err)
  717. }
  718. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  719. t.Fatalf("bad header")
  720. }
  721. recv, err := io.Copy(ioutil.Discard, stream)
  722. if err != nil {
  723. t.Fatalf("err: %v", err)
  724. }
  725. if recv != n {
  726. t.Fatalf("bad: %v", recv)
  727. }
  728. }()
  729. }
  730. for i := 0; i < workers; i++ {
  731. go func() {
  732. defer wg.Done()
  733. stream, err := client.Open()
  734. if err != nil {
  735. t.Fatalf("err: %v", err)
  736. }
  737. defer stream.Close()
  738. _, err = stream.Write([]byte{0, 1, 2, 3})
  739. if err != nil {
  740. t.Fatalf("err: %v", err)
  741. }
  742. unlimited := &UnlimitedReader{}
  743. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  744. if err != nil {
  745. t.Fatalf("err: %v", err)
  746. }
  747. if sent != n {
  748. t.Fatalf("bad: %v", sent)
  749. }
  750. }()
  751. }
  752. doneCh := make(chan struct{})
  753. go func() {
  754. wg.Wait()
  755. close(doneCh)
  756. }()
  757. select {
  758. case <-doneCh:
  759. case <-time.After(20 * time.Second):
  760. panic("timeout")
  761. }
  762. }
  763. func TestBacklogExceeded_Accept(t *testing.T) {
  764. client, server := testClientServer()
  765. defer client.Close()
  766. defer server.Close()
  767. max := 5 * client.config.AcceptBacklog
  768. go func() {
  769. for i := 0; i < max; i++ {
  770. stream, err := server.Accept()
  771. if err != nil {
  772. t.Fatalf("err: %v", err)
  773. }
  774. defer stream.Close()
  775. }
  776. }()
  777. // Fill the backlog
  778. for i := 0; i < max; i++ {
  779. stream, err := client.Open()
  780. if err != nil {
  781. t.Fatalf("err: %v", err)
  782. }
  783. defer stream.Close()
  784. if _, err := stream.Write([]byte("foo")); err != nil {
  785. t.Fatalf("err: %v", err)
  786. }
  787. }
  788. }
  789. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  790. client, server := testClientServerConfig(testConfNoKeepAlive())
  791. defer client.Close()
  792. defer server.Close()
  793. var wg sync.WaitGroup
  794. wg.Add(2)
  795. // Choose a huge flood size that we know will result in a window update.
  796. flood := int64(client.config.MaxStreamWindowSize) - 1
  797. // The server will accept a new stream and then flood data to it.
  798. go func() {
  799. defer wg.Done()
  800. stream, err := server.AcceptStream()
  801. if err != nil {
  802. t.Fatalf("err: %v", err)
  803. }
  804. defer stream.Close()
  805. n, err := stream.Write(make([]byte, flood))
  806. if err != nil {
  807. t.Fatalf("err: %v", err)
  808. }
  809. if int64(n) != flood {
  810. t.Fatalf("short write: %d", n)
  811. }
  812. }()
  813. // The client will open a stream, block outbound writes, and then
  814. // listen to the flood from the server, which should time out since
  815. // it won't be able to send the window update.
  816. go func() {
  817. defer wg.Done()
  818. stream, err := client.OpenStream()
  819. if err != nil {
  820. t.Fatalf("err: %v", err)
  821. }
  822. defer stream.Close()
  823. conn := client.conn.(*pipeConn)
  824. conn.writeBlocker.Lock()
  825. _, err = stream.Read(make([]byte, flood))
  826. if err != ErrConnectionWriteTimeout {
  827. t.Fatalf("err: %v", err)
  828. }
  829. }()
  830. wg.Wait()
  831. }
  832. func TestSession_sendNoWait_Timeout(t *testing.T) {
  833. client, server := testClientServerConfig(testConfNoKeepAlive())
  834. defer client.Close()
  835. defer server.Close()
  836. var wg sync.WaitGroup
  837. wg.Add(2)
  838. go func() {
  839. defer wg.Done()
  840. stream, err := server.AcceptStream()
  841. if err != nil {
  842. t.Fatalf("err: %v", err)
  843. }
  844. defer stream.Close()
  845. }()
  846. // The client will open the stream and then block outbound writes, we'll
  847. // probe sendNoWait once it gets into that state.
  848. go func() {
  849. defer wg.Done()
  850. stream, err := client.OpenStream()
  851. if err != nil {
  852. t.Fatalf("err: %v", err)
  853. }
  854. defer stream.Close()
  855. conn := client.conn.(*pipeConn)
  856. conn.writeBlocker.Lock()
  857. hdr := header(make([]byte, headerSize))
  858. hdr.encode(typePing, flagACK, 0, 0)
  859. for {
  860. err = client.sendNoWait(hdr)
  861. if err == nil {
  862. continue
  863. } else if err == ErrConnectionWriteTimeout {
  864. break
  865. } else {
  866. t.Fatalf("err: %v", err)
  867. }
  868. }
  869. }()
  870. wg.Wait()
  871. }
  872. func TestSession_PingOfDeath(t *testing.T) {
  873. client, server := testClientServerConfig(testConfNoKeepAlive())
  874. defer client.Close()
  875. defer server.Close()
  876. var wg sync.WaitGroup
  877. wg.Add(2)
  878. var doPingOfDeath sync.Mutex
  879. doPingOfDeath.Lock()
  880. // This is used later to block outbound writes.
  881. conn := server.conn.(*pipeConn)
  882. // The server will accept a stream, block outbound writes, and then
  883. // flood its send channel so that no more headers can be queued.
  884. go func() {
  885. defer wg.Done()
  886. stream, err := server.AcceptStream()
  887. if err != nil {
  888. t.Fatalf("err: %v", err)
  889. }
  890. defer stream.Close()
  891. conn.writeBlocker.Lock()
  892. for {
  893. hdr := header(make([]byte, headerSize))
  894. hdr.encode(typePing, 0, 0, 0)
  895. err = server.sendNoWait(hdr)
  896. if err == nil {
  897. continue
  898. } else if err == ErrConnectionWriteTimeout {
  899. break
  900. } else {
  901. t.Fatalf("err: %v", err)
  902. }
  903. }
  904. doPingOfDeath.Unlock()
  905. }()
  906. // The client will open a stream and then send the server a ping once it
  907. // can no longer write. This makes sure the server doesn't deadlock reads
  908. // while trying to reply to the ping with no ability to write.
  909. go func() {
  910. defer wg.Done()
  911. stream, err := client.OpenStream()
  912. if err != nil {
  913. t.Fatalf("err: %v", err)
  914. }
  915. defer stream.Close()
  916. // This ping will never unblock because the ping id will never
  917. // show up in a response.
  918. doPingOfDeath.Lock()
  919. go func() { client.Ping() }()
  920. // Wait for a while to make sure the previous ping times out,
  921. // then turn writes back on and make sure a ping works again.
  922. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  923. conn.writeBlocker.Unlock()
  924. if _, err = client.Ping(); err != nil {
  925. t.Fatalf("err: %v", err)
  926. }
  927. }()
  928. wg.Wait()
  929. }
  930. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  931. client, server := testClientServerConfig(testConfNoKeepAlive())
  932. defer client.Close()
  933. defer server.Close()
  934. var wg sync.WaitGroup
  935. wg.Add(2)
  936. go func() {
  937. defer wg.Done()
  938. stream, err := server.AcceptStream()
  939. if err != nil {
  940. t.Fatalf("err: %v", err)
  941. }
  942. defer stream.Close()
  943. }()
  944. // The client will open the stream and then block outbound writes, we'll
  945. // tee up a write and make sure it eventually times out.
  946. go func() {
  947. defer wg.Done()
  948. stream, err := client.OpenStream()
  949. if err != nil {
  950. t.Fatalf("err: %v", err)
  951. }
  952. defer stream.Close()
  953. conn := client.conn.(*pipeConn)
  954. conn.writeBlocker.Lock()
  955. // Since the write goroutine is blocked then this will return a
  956. // timeout since it can't get feedback about whether the write
  957. // worked.
  958. n, err := stream.Write([]byte("hello"))
  959. if err != ErrConnectionWriteTimeout {
  960. t.Fatalf("err: %v", err)
  961. }
  962. if n != 0 {
  963. t.Fatalf("lied about writes: %d", n)
  964. }
  965. }()
  966. wg.Wait()
  967. }