session_test.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442
  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. )
  16. type logCapture struct{ bytes.Buffer }
  17. func (l *logCapture) logs() []string {
  18. return strings.Split(strings.TrimSpace(l.String()), "\n")
  19. }
  20. func (l *logCapture) match(expect []string) bool {
  21. return reflect.DeepEqual(l.logs(), expect)
  22. }
  23. func captureLogs(s *Session) *logCapture {
  24. buf := new(logCapture)
  25. s.logger = log.New(buf, "", 0)
  26. return buf
  27. }
  28. type pipeConn struct {
  29. reader *io.PipeReader
  30. writer *io.PipeWriter
  31. writeBlocker sync.Mutex
  32. }
  33. func (p *pipeConn) Read(b []byte) (int, error) {
  34. return p.reader.Read(b)
  35. }
  36. func (p *pipeConn) Write(b []byte) (int, error) {
  37. p.writeBlocker.Lock()
  38. defer p.writeBlocker.Unlock()
  39. return p.writer.Write(b)
  40. }
  41. func (p *pipeConn) Close() error {
  42. p.reader.Close()
  43. return p.writer.Close()
  44. }
  45. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  46. read1, write1 := io.Pipe()
  47. read2, write2 := io.Pipe()
  48. conn1 := &pipeConn{reader: read1, writer: write2}
  49. conn2 := &pipeConn{reader: read2, writer: write1}
  50. return conn1, conn2
  51. }
  52. func testConf() *Config {
  53. conf := DefaultConfig()
  54. conf.AcceptBacklog = 64
  55. conf.KeepAliveInterval = 100 * time.Millisecond
  56. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  57. return conf
  58. }
  59. func testConfNoKeepAlive() *Config {
  60. conf := testConf()
  61. conf.EnableKeepAlive = false
  62. return conf
  63. }
  64. func testClientServer() (*Session, *Session) {
  65. return testClientServerConfig(testConf())
  66. }
  67. func testClientServerConfig(conf *Config) (*Session, *Session) {
  68. conn1, conn2 := testConn()
  69. client, _ := Client(conn1, conf)
  70. server, _ := Server(conn2, conf)
  71. return client, server
  72. }
  73. func TestPing(t *testing.T) {
  74. client, server := testClientServer()
  75. defer client.Close()
  76. defer server.Close()
  77. rtt, err := client.Ping()
  78. if err != nil {
  79. t.Fatalf("err: %v", err)
  80. }
  81. if rtt == 0 {
  82. t.Fatalf("bad: %v", rtt)
  83. }
  84. rtt, err = server.Ping()
  85. if err != nil {
  86. t.Fatalf("err: %v", err)
  87. }
  88. if rtt == 0 {
  89. t.Fatalf("bad: %v", rtt)
  90. }
  91. }
  92. func TestPing_Timeout(t *testing.T) {
  93. client, server := testClientServerConfig(testConfNoKeepAlive())
  94. defer client.Close()
  95. defer server.Close()
  96. // Prevent the client from responding
  97. clientConn := client.conn.(*pipeConn)
  98. clientConn.writeBlocker.Lock()
  99. errCh := make(chan error, 1)
  100. go func() {
  101. _, err := server.Ping() // Ping via the server session
  102. errCh <- err
  103. }()
  104. select {
  105. case err := <-errCh:
  106. if err != ErrTimeout {
  107. t.Fatalf("err: %v", err)
  108. }
  109. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  110. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  111. }
  112. // Verify that we recover, even if we gave up
  113. clientConn.writeBlocker.Unlock()
  114. go func() {
  115. _, err := server.Ping() // Ping via the server session
  116. errCh <- err
  117. }()
  118. select {
  119. case err := <-errCh:
  120. if err != nil {
  121. t.Fatalf("err: %v", err)
  122. }
  123. case <-time.After(client.config.ConnectionWriteTimeout):
  124. t.Fatalf("timeout")
  125. }
  126. }
  127. func TestCloseBeforeAck(t *testing.T) {
  128. cfg := testConf()
  129. cfg.AcceptBacklog = 8
  130. client, server := testClientServerConfig(cfg)
  131. defer client.Close()
  132. defer server.Close()
  133. for i := 0; i < 8; i++ {
  134. s, err := client.OpenStream()
  135. if err != nil {
  136. t.Fatal(err)
  137. }
  138. s.Close()
  139. }
  140. for i := 0; i < 8; i++ {
  141. s, err := server.AcceptStream()
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. s.Close()
  146. }
  147. done := make(chan struct{})
  148. go func() {
  149. defer close(done)
  150. s, err := client.OpenStream()
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. s.Close()
  155. }()
  156. select {
  157. case <-done:
  158. case <-time.After(time.Second * 5):
  159. t.Fatal("timed out trying to open stream")
  160. }
  161. }
  162. func TestAccept(t *testing.T) {
  163. client, server := testClientServer()
  164. defer client.Close()
  165. defer server.Close()
  166. if client.NumStreams() != 0 {
  167. t.Fatalf("bad")
  168. }
  169. if server.NumStreams() != 0 {
  170. t.Fatalf("bad")
  171. }
  172. wg := &sync.WaitGroup{}
  173. wg.Add(4)
  174. go func() {
  175. defer wg.Done()
  176. stream, err := server.AcceptStream()
  177. if err != nil {
  178. t.Fatalf("err: %v", err)
  179. }
  180. if id := stream.StreamID(); id != 1 {
  181. t.Fatalf("bad: %v", id)
  182. }
  183. if err := stream.Close(); err != nil {
  184. t.Fatalf("err: %v", err)
  185. }
  186. }()
  187. go func() {
  188. defer wg.Done()
  189. stream, err := client.AcceptStream()
  190. if err != nil {
  191. t.Fatalf("err: %v", err)
  192. }
  193. if id := stream.StreamID(); id != 2 {
  194. t.Fatalf("bad: %v", id)
  195. }
  196. if err := stream.Close(); err != nil {
  197. t.Fatalf("err: %v", err)
  198. }
  199. }()
  200. go func() {
  201. defer wg.Done()
  202. stream, err := server.OpenStream()
  203. if err != nil {
  204. t.Fatalf("err: %v", err)
  205. }
  206. if id := stream.StreamID(); id != 2 {
  207. t.Fatalf("bad: %v", id)
  208. }
  209. if err := stream.Close(); err != nil {
  210. t.Fatalf("err: %v", err)
  211. }
  212. }()
  213. go func() {
  214. defer wg.Done()
  215. stream, err := client.OpenStream()
  216. if err != nil {
  217. t.Fatalf("err: %v", err)
  218. }
  219. if id := stream.StreamID(); id != 1 {
  220. t.Fatalf("bad: %v", id)
  221. }
  222. if err := stream.Close(); err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. }()
  226. doneCh := make(chan struct{})
  227. go func() {
  228. wg.Wait()
  229. close(doneCh)
  230. }()
  231. select {
  232. case <-doneCh:
  233. case <-time.After(time.Second):
  234. panic("timeout")
  235. }
  236. }
  237. func TestClose_closeTimeout(t *testing.T) {
  238. conf := testConf()
  239. conf.StreamCloseTimeout = 10 * time.Millisecond
  240. client, server := testClientServerConfig(conf)
  241. defer client.Close()
  242. defer server.Close()
  243. if client.NumStreams() != 0 {
  244. t.Fatalf("bad")
  245. }
  246. if server.NumStreams() != 0 {
  247. t.Fatalf("bad")
  248. }
  249. wg := &sync.WaitGroup{}
  250. wg.Add(2)
  251. // Open a stream on the client but only close it on the server.
  252. // We want to see if the stream ever gets cleaned up on the client.
  253. var clientStream *Stream
  254. go func() {
  255. defer wg.Done()
  256. var err error
  257. clientStream, err = client.OpenStream()
  258. if err != nil {
  259. t.Fatalf("err: %v", err)
  260. }
  261. }()
  262. go func() {
  263. defer wg.Done()
  264. stream, err := server.AcceptStream()
  265. if err != nil {
  266. t.Fatalf("err: %v", err)
  267. }
  268. if err := stream.Close(); err != nil {
  269. t.Fatalf("err: %v", err)
  270. }
  271. }()
  272. doneCh := make(chan struct{})
  273. go func() {
  274. wg.Wait()
  275. close(doneCh)
  276. }()
  277. select {
  278. case <-doneCh:
  279. case <-time.After(time.Second):
  280. panic("timeout")
  281. }
  282. // We should have zero streams after our timeout period
  283. time.Sleep(100 * time.Millisecond)
  284. if v := server.NumStreams(); v > 0 {
  285. t.Fatalf("should have zero streams: %d", v)
  286. }
  287. if v := client.NumStreams(); v > 0 {
  288. t.Fatalf("should have zero streams: %d", v)
  289. }
  290. if _, err := clientStream.Write([]byte("hello")); err == nil {
  291. t.Fatal("should error on write")
  292. } else if err.Error() != "connection reset" {
  293. t.Fatalf("expected connection reset, got %q", err)
  294. }
  295. }
  296. func TestNonNilInterface(t *testing.T) {
  297. _, server := testClientServer()
  298. server.Close()
  299. conn, err := server.Accept()
  300. if err != nil && conn != nil {
  301. t.Error("bad: accept should return a connection of nil value")
  302. }
  303. conn, err = server.Open()
  304. if err != nil && conn != nil {
  305. t.Error("bad: open should return a connection of nil value")
  306. }
  307. }
  308. func TestSendData_Small(t *testing.T) {
  309. client, server := testClientServer()
  310. defer client.Close()
  311. defer server.Close()
  312. wg := &sync.WaitGroup{}
  313. wg.Add(2)
  314. go func() {
  315. defer wg.Done()
  316. stream, err := server.AcceptStream()
  317. if err != nil {
  318. t.Fatalf("err: %v", err)
  319. }
  320. if server.NumStreams() != 1 {
  321. t.Fatalf("bad")
  322. }
  323. buf := make([]byte, 4)
  324. for i := 0; i < 1000; i++ {
  325. n, err := stream.Read(buf)
  326. if err != nil {
  327. t.Fatalf("err: %v", err)
  328. }
  329. if n != 4 {
  330. t.Fatalf("short read: %d", n)
  331. }
  332. if string(buf) != "test" {
  333. t.Fatalf("bad: %s", buf)
  334. }
  335. }
  336. if err := stream.Close(); err != nil {
  337. t.Fatalf("err: %v", err)
  338. }
  339. }()
  340. go func() {
  341. defer wg.Done()
  342. stream, err := client.Open()
  343. if err != nil {
  344. t.Fatalf("err: %v", err)
  345. }
  346. if client.NumStreams() != 1 {
  347. t.Fatalf("bad")
  348. }
  349. for i := 0; i < 1000; i++ {
  350. n, err := stream.Write([]byte("test"))
  351. if err != nil {
  352. t.Fatalf("err: %v", err)
  353. }
  354. if n != 4 {
  355. t.Fatalf("short write %d", n)
  356. }
  357. }
  358. if err := stream.Close(); err != nil {
  359. t.Fatalf("err: %v", err)
  360. }
  361. }()
  362. doneCh := make(chan struct{})
  363. go func() {
  364. wg.Wait()
  365. close(doneCh)
  366. }()
  367. select {
  368. case <-doneCh:
  369. case <-time.After(time.Second):
  370. panic("timeout")
  371. }
  372. if client.NumStreams() != 0 {
  373. t.Fatalf("bad")
  374. }
  375. if server.NumStreams() != 0 {
  376. t.Fatalf("bad")
  377. }
  378. }
  379. func TestSendData_Large(t *testing.T) {
  380. client, server := testClientServer()
  381. defer client.Close()
  382. defer server.Close()
  383. const (
  384. sendSize = 250 * 1024 * 1024
  385. recvSize = 4 * 1024
  386. )
  387. data := make([]byte, sendSize)
  388. for idx := range data {
  389. data[idx] = byte(idx % 256)
  390. }
  391. wg := &sync.WaitGroup{}
  392. wg.Add(2)
  393. go func() {
  394. defer wg.Done()
  395. stream, err := server.AcceptStream()
  396. if err != nil {
  397. t.Fatalf("err: %v", err)
  398. }
  399. var sz int
  400. buf := make([]byte, recvSize)
  401. for i := 0; i < sendSize/recvSize; i++ {
  402. n, err := stream.Read(buf)
  403. if err != nil {
  404. t.Fatalf("err: %v", err)
  405. }
  406. if n != recvSize {
  407. t.Fatalf("short read: %d", n)
  408. }
  409. sz += n
  410. for idx := range buf {
  411. if buf[idx] != byte(idx%256) {
  412. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  413. }
  414. }
  415. }
  416. if err := stream.Close(); err != nil {
  417. t.Fatalf("err: %v", err)
  418. }
  419. t.Logf("cap=%d, n=%d\n", stream.recvBuf.Cap(), sz)
  420. }()
  421. go func() {
  422. defer wg.Done()
  423. stream, err := client.Open()
  424. if err != nil {
  425. t.Fatalf("err: %v", err)
  426. }
  427. n, err := stream.Write(data)
  428. if err != nil {
  429. t.Fatalf("err: %v", err)
  430. }
  431. if n != len(data) {
  432. t.Fatalf("short write %d", n)
  433. }
  434. if err := stream.Close(); err != nil {
  435. t.Fatalf("err: %v", err)
  436. }
  437. }()
  438. doneCh := make(chan struct{})
  439. go func() {
  440. wg.Wait()
  441. close(doneCh)
  442. }()
  443. select {
  444. case <-doneCh:
  445. case <-time.After(5 * time.Second):
  446. panic("timeout")
  447. }
  448. }
  449. func TestGoAway(t *testing.T) {
  450. client, server := testClientServer()
  451. defer client.Close()
  452. defer server.Close()
  453. if err := server.GoAway(); err != nil {
  454. t.Fatalf("err: %v", err)
  455. }
  456. _, err := client.Open()
  457. if err != ErrRemoteGoAway {
  458. t.Fatalf("err: %v", err)
  459. }
  460. }
  461. func TestManyStreams(t *testing.T) {
  462. client, server := testClientServer()
  463. defer client.Close()
  464. defer server.Close()
  465. wg := &sync.WaitGroup{}
  466. acceptor := func(i int) {
  467. defer wg.Done()
  468. stream, err := server.AcceptStream()
  469. if err != nil {
  470. t.Fatalf("err: %v", err)
  471. }
  472. defer stream.Close()
  473. buf := make([]byte, 512)
  474. for {
  475. n, err := stream.Read(buf)
  476. if err == io.EOF {
  477. return
  478. }
  479. if err != nil {
  480. t.Fatalf("err: %v", err)
  481. }
  482. if n == 0 {
  483. t.Fatalf("err: %v", err)
  484. }
  485. }
  486. }
  487. sender := func(i int) {
  488. defer wg.Done()
  489. stream, err := client.Open()
  490. if err != nil {
  491. t.Fatalf("err: %v", err)
  492. }
  493. defer stream.Close()
  494. msg := fmt.Sprintf("%08d", i)
  495. for i := 0; i < 1000; i++ {
  496. n, err := stream.Write([]byte(msg))
  497. if err != nil {
  498. t.Fatalf("err: %v", err)
  499. }
  500. if n != len(msg) {
  501. t.Fatalf("short write %d", n)
  502. }
  503. }
  504. }
  505. for i := 0; i < 50; i++ {
  506. wg.Add(2)
  507. go acceptor(i)
  508. go sender(i)
  509. }
  510. wg.Wait()
  511. }
  512. func TestManyStreams_PingPong(t *testing.T) {
  513. client, server := testClientServer()
  514. defer client.Close()
  515. defer server.Close()
  516. wg := &sync.WaitGroup{}
  517. ping := []byte("ping")
  518. pong := []byte("pong")
  519. acceptor := func(i int) {
  520. defer wg.Done()
  521. stream, err := server.AcceptStream()
  522. if err != nil {
  523. t.Fatalf("err: %v", err)
  524. }
  525. defer stream.Close()
  526. buf := make([]byte, 4)
  527. for {
  528. // Read the 'ping'
  529. n, err := stream.Read(buf)
  530. if err == io.EOF {
  531. return
  532. }
  533. if err != nil {
  534. t.Fatalf("err: %v", err)
  535. }
  536. if n != 4 {
  537. t.Fatalf("err: %v", err)
  538. }
  539. if !bytes.Equal(buf, ping) {
  540. t.Fatalf("bad: %s", buf)
  541. }
  542. // Shrink the internal buffer!
  543. stream.Shrink()
  544. // Write out the 'pong'
  545. n, err = stream.Write(pong)
  546. if err != nil {
  547. t.Fatalf("err: %v", err)
  548. }
  549. if n != 4 {
  550. t.Fatalf("err: %v", err)
  551. }
  552. }
  553. }
  554. sender := func(i int) {
  555. defer wg.Done()
  556. stream, err := client.OpenStream()
  557. if err != nil {
  558. t.Fatalf("err: %v", err)
  559. }
  560. defer stream.Close()
  561. buf := make([]byte, 4)
  562. for i := 0; i < 1000; i++ {
  563. // Send the 'ping'
  564. n, err := stream.Write(ping)
  565. if err != nil {
  566. t.Fatalf("err: %v", err)
  567. }
  568. if n != 4 {
  569. t.Fatalf("short write %d", n)
  570. }
  571. // Read the 'pong'
  572. n, err = stream.Read(buf)
  573. if err != nil {
  574. t.Fatalf("err: %v", err)
  575. }
  576. if n != 4 {
  577. t.Fatalf("err: %v", err)
  578. }
  579. if !bytes.Equal(buf, pong) {
  580. t.Fatalf("bad: %s", buf)
  581. }
  582. // Shrink the buffer
  583. stream.Shrink()
  584. }
  585. }
  586. for i := 0; i < 50; i++ {
  587. wg.Add(2)
  588. go acceptor(i)
  589. go sender(i)
  590. }
  591. wg.Wait()
  592. }
  593. func TestHalfClose(t *testing.T) {
  594. client, server := testClientServer()
  595. defer client.Close()
  596. defer server.Close()
  597. stream, err := client.Open()
  598. if err != nil {
  599. t.Fatalf("err: %v", err)
  600. }
  601. if _, err = stream.Write([]byte("a")); err != nil {
  602. t.Fatalf("err: %v", err)
  603. }
  604. stream2, err := server.Accept()
  605. if err != nil {
  606. t.Fatalf("err: %v", err)
  607. }
  608. stream2.Close() // Half close
  609. buf := make([]byte, 4)
  610. n, err := stream2.Read(buf)
  611. if err != nil {
  612. t.Fatalf("err: %v", err)
  613. }
  614. if n != 1 {
  615. t.Fatalf("bad: %v", n)
  616. }
  617. // Send more
  618. if _, err = stream.Write([]byte("bcd")); err != nil {
  619. t.Fatalf("err: %v", err)
  620. }
  621. stream.Close()
  622. // Read after close
  623. n, err = stream2.Read(buf)
  624. if err != nil {
  625. t.Fatalf("err: %v", err)
  626. }
  627. if n != 3 {
  628. t.Fatalf("bad: %v", n)
  629. }
  630. // EOF after close
  631. n, err = stream2.Read(buf)
  632. if err != io.EOF {
  633. t.Fatalf("err: %v", err)
  634. }
  635. if n != 0 {
  636. t.Fatalf("bad: %v", n)
  637. }
  638. }
  639. func TestReadDeadline(t *testing.T) {
  640. client, server := testClientServer()
  641. defer client.Close()
  642. defer server.Close()
  643. stream, err := client.Open()
  644. if err != nil {
  645. t.Fatalf("err: %v", err)
  646. }
  647. defer stream.Close()
  648. stream2, err := server.Accept()
  649. if err != nil {
  650. t.Fatalf("err: %v", err)
  651. }
  652. defer stream2.Close()
  653. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  654. t.Fatalf("err: %v", err)
  655. }
  656. buf := make([]byte, 4)
  657. _, err = stream.Read(buf)
  658. if err != ErrTimeout {
  659. t.Fatalf("err: %v", err)
  660. }
  661. // See https://github.com/hashicorp/yamux/issues/90
  662. // The standard library's http server package will read from connections in
  663. // the background to detect if they are alive.
  664. //
  665. // It sets a read deadline on connections and detect if the returned error
  666. // is a network timeout error which implements net.Error.
  667. //
  668. // The HTTP server will cancel all server requests if it isn't timeout error
  669. // from the connection.
  670. //
  671. // We assert that we return an error meeting the interface to avoid
  672. // accidently breaking yamux session compatability with the standard
  673. // library's http server implementation.
  674. if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
  675. t.Fatalf("reading timeout error is expected to implement net.Error and return true when calling Timeout()")
  676. }
  677. }
  678. func TestReadDeadline_BlockedRead(t *testing.T) {
  679. client, server := testClientServer()
  680. defer client.Close()
  681. defer server.Close()
  682. stream, err := client.Open()
  683. if err != nil {
  684. t.Fatalf("err: %v", err)
  685. }
  686. defer stream.Close()
  687. stream2, err := server.Accept()
  688. if err != nil {
  689. t.Fatalf("err: %v", err)
  690. }
  691. defer stream2.Close()
  692. // Start a read that will block
  693. errCh := make(chan error, 1)
  694. go func() {
  695. buf := make([]byte, 4)
  696. _, err := stream.Read(buf)
  697. errCh <- err
  698. close(errCh)
  699. }()
  700. // Wait to ensure the read has started.
  701. time.Sleep(5 * time.Millisecond)
  702. // Update the read deadline
  703. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  704. t.Fatalf("err: %v", err)
  705. }
  706. select {
  707. case <-time.After(100 * time.Millisecond):
  708. t.Fatal("expected read timeout")
  709. case err := <-errCh:
  710. if err != ErrTimeout {
  711. t.Fatalf("expected ErrTimeout; got %v", err)
  712. }
  713. }
  714. }
  715. func TestWriteDeadline(t *testing.T) {
  716. client, server := testClientServer()
  717. defer client.Close()
  718. defer server.Close()
  719. stream, err := client.Open()
  720. if err != nil {
  721. t.Fatalf("err: %v", err)
  722. }
  723. defer stream.Close()
  724. stream2, err := server.Accept()
  725. if err != nil {
  726. t.Fatalf("err: %v", err)
  727. }
  728. defer stream2.Close()
  729. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  730. t.Fatalf("err: %v", err)
  731. }
  732. buf := make([]byte, 512)
  733. for i := 0; i < int(initialStreamWindow); i++ {
  734. _, err := stream.Write(buf)
  735. if err != nil && err == ErrTimeout {
  736. return
  737. } else if err != nil {
  738. t.Fatalf("err: %v", err)
  739. }
  740. }
  741. t.Fatalf("Expected timeout")
  742. }
  743. func TestWriteDeadline_BlockedWrite(t *testing.T) {
  744. client, server := testClientServer()
  745. defer client.Close()
  746. defer server.Close()
  747. stream, err := client.Open()
  748. if err != nil {
  749. t.Fatalf("err: %v", err)
  750. }
  751. defer stream.Close()
  752. stream2, err := server.Accept()
  753. if err != nil {
  754. t.Fatalf("err: %v", err)
  755. }
  756. defer stream2.Close()
  757. // Start a goroutine making writes that will block
  758. errCh := make(chan error, 1)
  759. go func() {
  760. buf := make([]byte, 512)
  761. for i := 0; i < int(initialStreamWindow); i++ {
  762. _, err := stream.Write(buf)
  763. if err == nil {
  764. continue
  765. }
  766. errCh <- err
  767. close(errCh)
  768. return
  769. }
  770. close(errCh)
  771. }()
  772. // Wait to ensure the write has started.
  773. time.Sleep(5 * time.Millisecond)
  774. // Update the write deadline
  775. if err := stream.SetWriteDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  776. t.Fatalf("err: %v", err)
  777. }
  778. select {
  779. case <-time.After(1 * time.Second):
  780. t.Fatal("expected write timeout")
  781. case err := <-errCh:
  782. if err != ErrTimeout {
  783. t.Fatalf("expected ErrTimeout; got %v", err)
  784. }
  785. }
  786. }
  787. func TestBacklogExceeded(t *testing.T) {
  788. client, server := testClientServer()
  789. defer client.Close()
  790. defer server.Close()
  791. // Fill the backlog
  792. max := client.config.AcceptBacklog
  793. for i := 0; i < max; i++ {
  794. stream, err := client.Open()
  795. if err != nil {
  796. t.Fatalf("err: %v", err)
  797. }
  798. defer stream.Close()
  799. if _, err := stream.Write([]byte("foo")); err != nil {
  800. t.Fatalf("err: %v", err)
  801. }
  802. }
  803. // Attempt to open a new stream
  804. errCh := make(chan error, 1)
  805. go func() {
  806. _, err := client.Open()
  807. errCh <- err
  808. }()
  809. // Shutdown the server
  810. go func() {
  811. time.Sleep(10 * time.Millisecond)
  812. server.Close()
  813. }()
  814. select {
  815. case err := <-errCh:
  816. if err == nil {
  817. t.Fatalf("open should fail")
  818. }
  819. case <-time.After(time.Second):
  820. t.Fatalf("timeout")
  821. }
  822. }
  823. func TestKeepAlive(t *testing.T) {
  824. client, server := testClientServer()
  825. defer client.Close()
  826. defer server.Close()
  827. time.Sleep(200 * time.Millisecond)
  828. // Ping value should increase
  829. client.pingLock.Lock()
  830. defer client.pingLock.Unlock()
  831. if client.pingID == 0 {
  832. t.Fatalf("should ping")
  833. }
  834. server.pingLock.Lock()
  835. defer server.pingLock.Unlock()
  836. if server.pingID == 0 {
  837. t.Fatalf("should ping")
  838. }
  839. }
  840. func TestKeepAlive_Timeout(t *testing.T) {
  841. conn1, conn2 := testConn()
  842. clientConf := testConf()
  843. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  844. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  845. client, _ := Client(conn1, clientConf)
  846. defer client.Close()
  847. server, _ := Server(conn2, testConf())
  848. defer server.Close()
  849. _ = captureLogs(client) // Client logs aren't part of the test
  850. serverLogs := captureLogs(server)
  851. errCh := make(chan error, 1)
  852. go func() {
  853. _, err := server.Accept() // Wait until server closes
  854. errCh <- err
  855. }()
  856. // Prevent the client from responding
  857. clientConn := client.conn.(*pipeConn)
  858. clientConn.writeBlocker.Lock()
  859. select {
  860. case err := <-errCh:
  861. if err != ErrKeepAliveTimeout {
  862. t.Fatalf("unexpected error: %v", err)
  863. }
  864. case <-time.After(1 * time.Second):
  865. t.Fatalf("timeout waiting for timeout")
  866. }
  867. if !server.IsClosed() {
  868. t.Fatalf("server should have closed")
  869. }
  870. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  871. t.Fatalf("server log incorect: %v", serverLogs.logs())
  872. }
  873. }
  874. func TestLargeWindow(t *testing.T) {
  875. conf := DefaultConfig()
  876. conf.MaxStreamWindowSize *= 2
  877. client, server := testClientServerConfig(conf)
  878. defer client.Close()
  879. defer server.Close()
  880. stream, err := client.Open()
  881. if err != nil {
  882. t.Fatalf("err: %v", err)
  883. }
  884. defer stream.Close()
  885. stream2, err := server.Accept()
  886. if err != nil {
  887. t.Fatalf("err: %v", err)
  888. }
  889. defer stream2.Close()
  890. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  891. buf := make([]byte, conf.MaxStreamWindowSize)
  892. n, err := stream.Write(buf)
  893. if err != nil {
  894. t.Fatalf("err: %v", err)
  895. }
  896. if n != len(buf) {
  897. t.Fatalf("short write: %d", n)
  898. }
  899. }
  900. type UnlimitedReader struct{}
  901. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  902. runtime.Gosched()
  903. return len(p), nil
  904. }
  905. func TestSendData_VeryLarge(t *testing.T) {
  906. client, server := testClientServer()
  907. defer client.Close()
  908. defer server.Close()
  909. var n int64 = 1 * 1024 * 1024 * 1024
  910. var workers int = 16
  911. wg := &sync.WaitGroup{}
  912. wg.Add(workers * 2)
  913. for i := 0; i < workers; i++ {
  914. go func() {
  915. defer wg.Done()
  916. stream, err := server.AcceptStream()
  917. if err != nil {
  918. t.Fatalf("err: %v", err)
  919. }
  920. defer stream.Close()
  921. buf := make([]byte, 4)
  922. _, err = stream.Read(buf)
  923. if err != nil {
  924. t.Fatalf("err: %v", err)
  925. }
  926. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  927. t.Fatalf("bad header")
  928. }
  929. recv, err := io.Copy(ioutil.Discard, stream)
  930. if err != nil {
  931. t.Fatalf("err: %v", err)
  932. }
  933. if recv != n {
  934. t.Fatalf("bad: %v", recv)
  935. }
  936. }()
  937. }
  938. for i := 0; i < workers; i++ {
  939. go func() {
  940. defer wg.Done()
  941. stream, err := client.Open()
  942. if err != nil {
  943. t.Fatalf("err: %v", err)
  944. }
  945. defer stream.Close()
  946. _, err = stream.Write([]byte{0, 1, 2, 3})
  947. if err != nil {
  948. t.Fatalf("err: %v", err)
  949. }
  950. unlimited := &UnlimitedReader{}
  951. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  952. if err != nil {
  953. t.Fatalf("err: %v", err)
  954. }
  955. if sent != n {
  956. t.Fatalf("bad: %v", sent)
  957. }
  958. }()
  959. }
  960. doneCh := make(chan struct{})
  961. go func() {
  962. wg.Wait()
  963. close(doneCh)
  964. }()
  965. select {
  966. case <-doneCh:
  967. case <-time.After(20 * time.Second):
  968. panic("timeout")
  969. }
  970. }
  971. func TestBacklogExceeded_Accept(t *testing.T) {
  972. client, server := testClientServer()
  973. defer client.Close()
  974. defer server.Close()
  975. max := 5 * client.config.AcceptBacklog
  976. go func() {
  977. for i := 0; i < max; i++ {
  978. stream, err := server.Accept()
  979. if err != nil {
  980. t.Fatalf("err: %v", err)
  981. }
  982. defer stream.Close()
  983. }
  984. }()
  985. // Fill the backlog
  986. for i := 0; i < max; i++ {
  987. stream, err := client.Open()
  988. if err != nil {
  989. t.Fatalf("err: %v", err)
  990. }
  991. defer stream.Close()
  992. if _, err := stream.Write([]byte("foo")); err != nil {
  993. t.Fatalf("err: %v", err)
  994. }
  995. }
  996. }
  997. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  998. client, server := testClientServerConfig(testConfNoKeepAlive())
  999. defer client.Close()
  1000. defer server.Close()
  1001. var wg sync.WaitGroup
  1002. wg.Add(2)
  1003. // Choose a huge flood size that we know will result in a window update.
  1004. flood := int64(client.config.MaxStreamWindowSize) - 1
  1005. // The server will accept a new stream and then flood data to it.
  1006. go func() {
  1007. defer wg.Done()
  1008. stream, err := server.AcceptStream()
  1009. if err != nil {
  1010. t.Fatalf("err: %v", err)
  1011. }
  1012. defer stream.Close()
  1013. n, err := stream.Write(make([]byte, flood))
  1014. if err != nil {
  1015. t.Fatalf("err: %v", err)
  1016. }
  1017. if int64(n) != flood {
  1018. t.Fatalf("short write: %d", n)
  1019. }
  1020. }()
  1021. // The client will open a stream, block outbound writes, and then
  1022. // listen to the flood from the server, which should time out since
  1023. // it won't be able to send the window update.
  1024. go func() {
  1025. defer wg.Done()
  1026. stream, err := client.OpenStream()
  1027. if err != nil {
  1028. t.Fatalf("err: %v", err)
  1029. }
  1030. defer stream.Close()
  1031. conn := client.conn.(*pipeConn)
  1032. conn.writeBlocker.Lock()
  1033. _, err = stream.Read(make([]byte, flood))
  1034. if err != ErrConnectionWriteTimeout {
  1035. t.Fatalf("err: %v", err)
  1036. }
  1037. }()
  1038. wg.Wait()
  1039. }
  1040. func TestSession_PartialReadWindowUpdate(t *testing.T) {
  1041. client, server := testClientServerConfig(testConfNoKeepAlive())
  1042. defer client.Close()
  1043. defer server.Close()
  1044. var wg sync.WaitGroup
  1045. wg.Add(1)
  1046. // Choose a huge flood size that we know will result in a window update.
  1047. flood := int64(client.config.MaxStreamWindowSize)
  1048. var wr *Stream
  1049. // The server will accept a new stream and then flood data to it.
  1050. go func() {
  1051. defer wg.Done()
  1052. var err error
  1053. wr, err = server.AcceptStream()
  1054. if err != nil {
  1055. t.Fatalf("err: %v", err)
  1056. }
  1057. defer wr.Close()
  1058. if wr.sendWindow != client.config.MaxStreamWindowSize {
  1059. t.Fatalf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, wr.sendWindow)
  1060. }
  1061. n, err := wr.Write(make([]byte, flood))
  1062. if err != nil {
  1063. t.Fatalf("err: %v", err)
  1064. }
  1065. if int64(n) != flood {
  1066. t.Fatalf("short write: %d", n)
  1067. }
  1068. if wr.sendWindow != 0 {
  1069. t.Fatalf("sendWindow: exp=%d, got=%d", 0, wr.sendWindow)
  1070. }
  1071. }()
  1072. stream, err := client.OpenStream()
  1073. if err != nil {
  1074. t.Fatalf("err: %v", err)
  1075. }
  1076. defer stream.Close()
  1077. wg.Wait()
  1078. _, err = stream.Read(make([]byte, flood/2+1))
  1079. if exp := uint32(flood/2 + 1); wr.sendWindow != exp {
  1080. t.Errorf("sendWindow: exp=%d, got=%d", exp, wr.sendWindow)
  1081. }
  1082. }
  1083. func TestSession_sendNoWait_Timeout(t *testing.T) {
  1084. client, server := testClientServerConfig(testConfNoKeepAlive())
  1085. defer client.Close()
  1086. defer server.Close()
  1087. var wg sync.WaitGroup
  1088. wg.Add(2)
  1089. go func() {
  1090. defer wg.Done()
  1091. stream, err := server.AcceptStream()
  1092. if err != nil {
  1093. t.Fatalf("err: %v", err)
  1094. }
  1095. defer stream.Close()
  1096. }()
  1097. // The client will open the stream and then block outbound writes, we'll
  1098. // probe sendNoWait once it gets into that state.
  1099. go func() {
  1100. defer wg.Done()
  1101. stream, err := client.OpenStream()
  1102. if err != nil {
  1103. t.Fatalf("err: %v", err)
  1104. }
  1105. defer stream.Close()
  1106. conn := client.conn.(*pipeConn)
  1107. conn.writeBlocker.Lock()
  1108. hdr := header(make([]byte, headerSize))
  1109. hdr.encode(typePing, flagACK, 0, 0)
  1110. for {
  1111. err = client.sendNoWait(hdr)
  1112. if err == nil {
  1113. continue
  1114. } else if err == ErrConnectionWriteTimeout {
  1115. break
  1116. } else {
  1117. t.Fatalf("err: %v", err)
  1118. }
  1119. }
  1120. }()
  1121. wg.Wait()
  1122. }
  1123. func TestSession_PingOfDeath(t *testing.T) {
  1124. client, server := testClientServerConfig(testConfNoKeepAlive())
  1125. defer client.Close()
  1126. defer server.Close()
  1127. var wg sync.WaitGroup
  1128. wg.Add(2)
  1129. var doPingOfDeath sync.Mutex
  1130. doPingOfDeath.Lock()
  1131. // This is used later to block outbound writes.
  1132. conn := server.conn.(*pipeConn)
  1133. // The server will accept a stream, block outbound writes, and then
  1134. // flood its send channel so that no more headers can be queued.
  1135. go func() {
  1136. defer wg.Done()
  1137. stream, err := server.AcceptStream()
  1138. if err != nil {
  1139. t.Fatalf("err: %v", err)
  1140. }
  1141. defer stream.Close()
  1142. conn.writeBlocker.Lock()
  1143. for {
  1144. hdr := header(make([]byte, headerSize))
  1145. hdr.encode(typePing, 0, 0, 0)
  1146. err = server.sendNoWait(hdr)
  1147. if err == nil {
  1148. continue
  1149. } else if err == ErrConnectionWriteTimeout {
  1150. break
  1151. } else {
  1152. t.Fatalf("err: %v", err)
  1153. }
  1154. }
  1155. doPingOfDeath.Unlock()
  1156. }()
  1157. // The client will open a stream and then send the server a ping once it
  1158. // can no longer write. This makes sure the server doesn't deadlock reads
  1159. // while trying to reply to the ping with no ability to write.
  1160. go func() {
  1161. defer wg.Done()
  1162. stream, err := client.OpenStream()
  1163. if err != nil {
  1164. t.Fatalf("err: %v", err)
  1165. }
  1166. defer stream.Close()
  1167. // This ping will never unblock because the ping id will never
  1168. // show up in a response.
  1169. doPingOfDeath.Lock()
  1170. go func() { client.Ping() }()
  1171. // Wait for a while to make sure the previous ping times out,
  1172. // then turn writes back on and make sure a ping works again.
  1173. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  1174. conn.writeBlocker.Unlock()
  1175. if _, err = client.Ping(); err != nil {
  1176. t.Fatalf("err: %v", err)
  1177. }
  1178. }()
  1179. wg.Wait()
  1180. }
  1181. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  1182. client, server := testClientServerConfig(testConfNoKeepAlive())
  1183. defer client.Close()
  1184. defer server.Close()
  1185. var wg sync.WaitGroup
  1186. wg.Add(2)
  1187. go func() {
  1188. defer wg.Done()
  1189. stream, err := server.AcceptStream()
  1190. if err != nil {
  1191. t.Fatalf("err: %v", err)
  1192. }
  1193. defer stream.Close()
  1194. }()
  1195. // The client will open the stream and then block outbound writes, we'll
  1196. // tee up a write and make sure it eventually times out.
  1197. go func() {
  1198. defer wg.Done()
  1199. stream, err := client.OpenStream()
  1200. if err != nil {
  1201. t.Fatalf("err: %v", err)
  1202. }
  1203. defer stream.Close()
  1204. conn := client.conn.(*pipeConn)
  1205. conn.writeBlocker.Lock()
  1206. // Since the write goroutine is blocked then this will return a
  1207. // timeout since it can't get feedback about whether the write
  1208. // worked.
  1209. n, err := stream.Write([]byte("hello"))
  1210. if err != ErrConnectionWriteTimeout {
  1211. t.Fatalf("err: %v", err)
  1212. }
  1213. if n != 0 {
  1214. t.Fatalf("lied about writes: %d", n)
  1215. }
  1216. }()
  1217. wg.Wait()
  1218. }