session_test.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480
  1. package yamux
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "reflect"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. )
  16. type logCapture struct{ bytes.Buffer }
  17. func (l *logCapture) logs() []string {
  18. return strings.Split(strings.TrimSpace(l.String()), "\n")
  19. }
  20. func (l *logCapture) match(expect []string) bool {
  21. return reflect.DeepEqual(l.logs(), expect)
  22. }
  23. func captureLogs(s *Session) *logCapture {
  24. buf := new(logCapture)
  25. s.logger = log.New(buf, "", 0)
  26. return buf
  27. }
  28. type pipeConn struct {
  29. reader *io.PipeReader
  30. writer *io.PipeWriter
  31. writeBlocker sync.Mutex
  32. }
  33. func (p *pipeConn) Read(b []byte) (int, error) {
  34. return p.reader.Read(b)
  35. }
  36. func (p *pipeConn) Write(b []byte) (int, error) {
  37. p.writeBlocker.Lock()
  38. defer p.writeBlocker.Unlock()
  39. return p.writer.Write(b)
  40. }
  41. func (p *pipeConn) Close() error {
  42. p.reader.Close()
  43. return p.writer.Close()
  44. }
  45. func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
  46. read1, write1 := io.Pipe()
  47. read2, write2 := io.Pipe()
  48. conn1 := &pipeConn{reader: read1, writer: write2}
  49. conn2 := &pipeConn{reader: read2, writer: write1}
  50. return conn1, conn2
  51. }
  52. func testConf() *Config {
  53. conf := DefaultConfig()
  54. conf.AcceptBacklog = 64
  55. conf.KeepAliveInterval = 100 * time.Millisecond
  56. conf.ConnectionWriteTimeout = 250 * time.Millisecond
  57. return conf
  58. }
  59. func testConfNoKeepAlive() *Config {
  60. conf := testConf()
  61. conf.EnableKeepAlive = false
  62. return conf
  63. }
  64. func testClientServer() (*Session, *Session) {
  65. return testClientServerConfig(testConf())
  66. }
  67. func testClientServerConfig(conf *Config) (*Session, *Session) {
  68. conn1, conn2 := testConn()
  69. client, _ := Client(conn1, conf)
  70. server, _ := Server(conn2, conf)
  71. return client, server
  72. }
  73. func TestPing(t *testing.T) {
  74. client, server := testClientServer()
  75. defer client.Close()
  76. defer server.Close()
  77. rtt, err := client.Ping()
  78. if err != nil {
  79. t.Fatalf("err: %v", err)
  80. }
  81. if rtt == 0 {
  82. t.Fatalf("bad: %v", rtt)
  83. }
  84. rtt, err = server.Ping()
  85. if err != nil {
  86. t.Fatalf("err: %v", err)
  87. }
  88. if rtt == 0 {
  89. t.Fatalf("bad: %v", rtt)
  90. }
  91. }
  92. func TestPing_Timeout(t *testing.T) {
  93. client, server := testClientServerConfig(testConfNoKeepAlive())
  94. defer client.Close()
  95. defer server.Close()
  96. // Prevent the client from responding
  97. clientConn := client.conn.(*pipeConn)
  98. clientConn.writeBlocker.Lock()
  99. errCh := make(chan error, 1)
  100. go func() {
  101. _, err := server.Ping() // Ping via the server session
  102. errCh <- err
  103. }()
  104. select {
  105. case err := <-errCh:
  106. if err != ErrTimeout {
  107. t.Fatalf("err: %v", err)
  108. }
  109. case <-time.After(client.config.ConnectionWriteTimeout * 2):
  110. t.Fatalf("failed to timeout within expected %v", client.config.ConnectionWriteTimeout)
  111. }
  112. // Verify that we recover, even if we gave up
  113. clientConn.writeBlocker.Unlock()
  114. go func() {
  115. _, err := server.Ping() // Ping via the server session
  116. errCh <- err
  117. }()
  118. select {
  119. case err := <-errCh:
  120. if err != nil {
  121. t.Fatalf("err: %v", err)
  122. }
  123. case <-time.After(client.config.ConnectionWriteTimeout):
  124. t.Fatalf("timeout")
  125. }
  126. }
  127. func TestCloseBeforeAck(t *testing.T) {
  128. cfg := testConf()
  129. cfg.AcceptBacklog = 8
  130. client, server := testClientServerConfig(cfg)
  131. defer client.Close()
  132. defer server.Close()
  133. for i := 0; i < 8; i++ {
  134. s, err := client.OpenStream()
  135. if err != nil {
  136. t.Fatal(err)
  137. }
  138. s.Close()
  139. }
  140. for i := 0; i < 8; i++ {
  141. s, err := server.AcceptStream()
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. s.Close()
  146. }
  147. done := make(chan struct{})
  148. go func() {
  149. defer close(done)
  150. s, err := client.OpenStream()
  151. if err != nil {
  152. t.Fatal(err)
  153. }
  154. s.Close()
  155. }()
  156. select {
  157. case <-done:
  158. case <-time.After(time.Second * 5):
  159. t.Fatal("timed out trying to open stream")
  160. }
  161. }
  162. func TestAccept(t *testing.T) {
  163. client, server := testClientServer()
  164. defer client.Close()
  165. defer server.Close()
  166. if client.NumStreams() != 0 {
  167. t.Fatalf("bad")
  168. }
  169. if server.NumStreams() != 0 {
  170. t.Fatalf("bad")
  171. }
  172. wg := &sync.WaitGroup{}
  173. wg.Add(4)
  174. go func() {
  175. defer wg.Done()
  176. stream, err := server.AcceptStream()
  177. if err != nil {
  178. t.Fatalf("err: %v", err)
  179. }
  180. if id := stream.StreamID(); id != 1 {
  181. t.Fatalf("bad: %v", id)
  182. }
  183. if err := stream.Close(); err != nil {
  184. t.Fatalf("err: %v", err)
  185. }
  186. }()
  187. go func() {
  188. defer wg.Done()
  189. stream, err := client.AcceptStream()
  190. if err != nil {
  191. t.Fatalf("err: %v", err)
  192. }
  193. if id := stream.StreamID(); id != 2 {
  194. t.Fatalf("bad: %v", id)
  195. }
  196. if err := stream.Close(); err != nil {
  197. t.Fatalf("err: %v", err)
  198. }
  199. }()
  200. go func() {
  201. defer wg.Done()
  202. stream, err := server.OpenStream()
  203. if err != nil {
  204. t.Fatalf("err: %v", err)
  205. }
  206. if id := stream.StreamID(); id != 2 {
  207. t.Fatalf("bad: %v", id)
  208. }
  209. if err := stream.Close(); err != nil {
  210. t.Fatalf("err: %v", err)
  211. }
  212. }()
  213. go func() {
  214. defer wg.Done()
  215. stream, err := client.OpenStream()
  216. if err != nil {
  217. t.Fatalf("err: %v", err)
  218. }
  219. if id := stream.StreamID(); id != 1 {
  220. t.Fatalf("bad: %v", id)
  221. }
  222. if err := stream.Close(); err != nil {
  223. t.Fatalf("err: %v", err)
  224. }
  225. }()
  226. doneCh := make(chan struct{})
  227. go func() {
  228. wg.Wait()
  229. close(doneCh)
  230. }()
  231. select {
  232. case <-doneCh:
  233. case <-time.After(time.Second):
  234. panic("timeout")
  235. }
  236. }
  237. func TestOpenStreamTimeout(t *testing.T) {
  238. const timeout = 25 * time.Millisecond
  239. cfg := testConf()
  240. cfg.StreamOpenTimeout = timeout
  241. client, server := testClientServerConfig(cfg)
  242. defer client.Close()
  243. defer server.Close()
  244. clientLogs := captureLogs(client)
  245. // Open a single stream without a server to acknowledge it.
  246. s, err := client.OpenStream()
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. // Sleep for longer than the stream open timeout.
  251. // Since no ACKs are received, the stream and session should be closed.
  252. time.Sleep(timeout * 5)
  253. if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) {
  254. t.Fatalf("server log incorect: %v", clientLogs.logs())
  255. }
  256. if s.state != streamClosed {
  257. t.Fatalf("stream should have been closed")
  258. }
  259. if !client.IsClosed() {
  260. t.Fatalf("session should have been closed")
  261. }
  262. }
  263. func TestClose_closeTimeout(t *testing.T) {
  264. conf := testConf()
  265. conf.StreamCloseTimeout = 10 * time.Millisecond
  266. client, server := testClientServerConfig(conf)
  267. defer client.Close()
  268. defer server.Close()
  269. if client.NumStreams() != 0 {
  270. t.Fatalf("bad")
  271. }
  272. if server.NumStreams() != 0 {
  273. t.Fatalf("bad")
  274. }
  275. wg := &sync.WaitGroup{}
  276. wg.Add(2)
  277. // Open a stream on the client but only close it on the server.
  278. // We want to see if the stream ever gets cleaned up on the client.
  279. var clientStream *Stream
  280. go func() {
  281. defer wg.Done()
  282. var err error
  283. clientStream, err = client.OpenStream()
  284. if err != nil {
  285. t.Fatalf("err: %v", err)
  286. }
  287. }()
  288. go func() {
  289. defer wg.Done()
  290. stream, err := server.AcceptStream()
  291. if err != nil {
  292. t.Fatalf("err: %v", err)
  293. }
  294. if err := stream.Close(); err != nil {
  295. t.Fatalf("err: %v", err)
  296. }
  297. }()
  298. doneCh := make(chan struct{})
  299. go func() {
  300. wg.Wait()
  301. close(doneCh)
  302. }()
  303. select {
  304. case <-doneCh:
  305. case <-time.After(time.Second):
  306. panic("timeout")
  307. }
  308. // We should have zero streams after our timeout period
  309. time.Sleep(100 * time.Millisecond)
  310. if v := server.NumStreams(); v > 0 {
  311. t.Fatalf("should have zero streams: %d", v)
  312. }
  313. if v := client.NumStreams(); v > 0 {
  314. t.Fatalf("should have zero streams: %d", v)
  315. }
  316. if _, err := clientStream.Write([]byte("hello")); err == nil {
  317. t.Fatal("should error on write")
  318. } else if err.Error() != "connection reset" {
  319. t.Fatalf("expected connection reset, got %q", err)
  320. }
  321. }
  322. func TestNonNilInterface(t *testing.T) {
  323. _, server := testClientServer()
  324. server.Close()
  325. conn, err := server.Accept()
  326. if err != nil && conn != nil {
  327. t.Error("bad: accept should return a connection of nil value")
  328. }
  329. conn, err = server.Open()
  330. if err != nil && conn != nil {
  331. t.Error("bad: open should return a connection of nil value")
  332. }
  333. }
  334. func TestSendData_Small(t *testing.T) {
  335. client, server := testClientServer()
  336. defer client.Close()
  337. defer server.Close()
  338. wg := &sync.WaitGroup{}
  339. wg.Add(2)
  340. go func() {
  341. defer wg.Done()
  342. stream, err := server.AcceptStream()
  343. if err != nil {
  344. t.Fatalf("err: %v", err)
  345. }
  346. if server.NumStreams() != 1 {
  347. t.Fatalf("bad")
  348. }
  349. buf := make([]byte, 4)
  350. for i := 0; i < 1000; i++ {
  351. n, err := stream.Read(buf)
  352. if err != nil {
  353. t.Fatalf("err: %v", err)
  354. }
  355. if n != 4 {
  356. t.Fatalf("short read: %d", n)
  357. }
  358. if string(buf) != "test" {
  359. t.Fatalf("bad: %s", buf)
  360. }
  361. }
  362. if err := stream.Close(); err != nil {
  363. t.Fatalf("err: %v", err)
  364. }
  365. }()
  366. go func() {
  367. defer wg.Done()
  368. stream, err := client.Open()
  369. if err != nil {
  370. t.Fatalf("err: %v", err)
  371. }
  372. if client.NumStreams() != 1 {
  373. t.Fatalf("bad")
  374. }
  375. for i := 0; i < 1000; i++ {
  376. n, err := stream.Write([]byte("test"))
  377. if err != nil {
  378. t.Fatalf("err: %v", err)
  379. }
  380. if n != 4 {
  381. t.Fatalf("short write %d", n)
  382. }
  383. }
  384. if err := stream.Close(); err != nil {
  385. t.Fatalf("err: %v", err)
  386. }
  387. }()
  388. doneCh := make(chan struct{})
  389. go func() {
  390. wg.Wait()
  391. close(doneCh)
  392. }()
  393. select {
  394. case <-doneCh:
  395. if client.NumStreams() != 0 {
  396. t.Fatalf("bad")
  397. }
  398. if server.NumStreams() != 0 {
  399. t.Fatalf("bad")
  400. }
  401. return
  402. case <-time.After(time.Second):
  403. panic("timeout")
  404. }
  405. }
  406. func TestSendData_Large(t *testing.T) {
  407. client, server := testClientServer()
  408. defer client.Close()
  409. defer server.Close()
  410. const (
  411. sendSize = 250 * 1024 * 1024
  412. recvSize = 4 * 1024
  413. )
  414. data := make([]byte, sendSize)
  415. for idx := range data {
  416. data[idx] = byte(idx % 256)
  417. }
  418. wg := &sync.WaitGroup{}
  419. wg.Add(2)
  420. go func() {
  421. defer wg.Done()
  422. stream, err := server.AcceptStream()
  423. if err != nil {
  424. t.Fatalf("err: %v", err)
  425. }
  426. var sz int
  427. buf := make([]byte, recvSize)
  428. for i := 0; i < sendSize/recvSize; i++ {
  429. n, err := stream.Read(buf)
  430. if err != nil {
  431. t.Fatalf("err: %v", err)
  432. }
  433. if n != recvSize {
  434. t.Fatalf("short read: %d", n)
  435. }
  436. sz += n
  437. for idx := range buf {
  438. if buf[idx] != byte(idx%256) {
  439. t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
  440. }
  441. }
  442. }
  443. if err := stream.Close(); err != nil {
  444. t.Fatalf("err: %v", err)
  445. }
  446. t.Logf("cap=%d, n=%d\n", stream.recvBuf.Cap(), sz)
  447. }()
  448. go func() {
  449. defer wg.Done()
  450. stream, err := client.Open()
  451. if err != nil {
  452. t.Fatalf("err: %v", err)
  453. }
  454. n, err := stream.Write(data)
  455. if err != nil {
  456. t.Fatalf("err: %v", err)
  457. }
  458. if n != len(data) {
  459. t.Fatalf("short write %d", n)
  460. }
  461. if err := stream.Close(); err != nil {
  462. t.Fatalf("err: %v", err)
  463. }
  464. }()
  465. doneCh := make(chan struct{})
  466. go func() {
  467. wg.Wait()
  468. close(doneCh)
  469. }()
  470. select {
  471. case <-doneCh:
  472. return
  473. case <-time.After(5 * time.Second):
  474. panic("timeout")
  475. }
  476. }
  477. func TestGoAway(t *testing.T) {
  478. client, server := testClientServer()
  479. defer client.Close()
  480. defer server.Close()
  481. if err := server.GoAway(); err != nil {
  482. t.Fatalf("err: %v", err)
  483. }
  484. _, err := client.Open()
  485. if err != ErrRemoteGoAway {
  486. t.Fatalf("err: %v", err)
  487. }
  488. }
  489. func TestManyStreams(t *testing.T) {
  490. client, server := testClientServer()
  491. defer client.Close()
  492. defer server.Close()
  493. wg := &sync.WaitGroup{}
  494. acceptor := func(i int) {
  495. defer wg.Done()
  496. stream, err := server.AcceptStream()
  497. if err != nil {
  498. t.Fatalf("err: %v", err)
  499. }
  500. defer stream.Close()
  501. buf := make([]byte, 512)
  502. for {
  503. n, err := stream.Read(buf)
  504. if err == io.EOF {
  505. return
  506. }
  507. if err != nil {
  508. t.Fatalf("err: %v", err)
  509. }
  510. if n == 0 {
  511. t.Fatalf("err: %v", err)
  512. }
  513. }
  514. }
  515. sender := func(i int) {
  516. defer wg.Done()
  517. stream, err := client.Open()
  518. if err != nil {
  519. t.Fatalf("err: %v", err)
  520. }
  521. defer stream.Close()
  522. msg := fmt.Sprintf("%08d", i)
  523. for i := 0; i < 1000; i++ {
  524. n, err := stream.Write([]byte(msg))
  525. if err != nil {
  526. t.Fatalf("err: %v", err)
  527. }
  528. if n != len(msg) {
  529. t.Fatalf("short write %d", n)
  530. }
  531. }
  532. }
  533. for i := 0; i < 50; i++ {
  534. wg.Add(2)
  535. go acceptor(i)
  536. go sender(i)
  537. }
  538. wg.Wait()
  539. }
  540. func TestManyStreams_PingPong(t *testing.T) {
  541. client, server := testClientServer()
  542. defer client.Close()
  543. defer server.Close()
  544. wg := &sync.WaitGroup{}
  545. ping := []byte("ping")
  546. pong := []byte("pong")
  547. acceptor := func(i int) {
  548. defer wg.Done()
  549. stream, err := server.AcceptStream()
  550. if err != nil {
  551. t.Fatalf("err: %v", err)
  552. }
  553. defer stream.Close()
  554. buf := make([]byte, 4)
  555. for {
  556. // Read the 'ping'
  557. n, err := stream.Read(buf)
  558. if err == io.EOF {
  559. return
  560. }
  561. if err != nil {
  562. t.Fatalf("err: %v", err)
  563. }
  564. if n != 4 {
  565. t.Fatalf("err: %v", err)
  566. }
  567. if !bytes.Equal(buf, ping) {
  568. t.Fatalf("bad: %s", buf)
  569. }
  570. // Shrink the internal buffer!
  571. stream.Shrink()
  572. // Write out the 'pong'
  573. n, err = stream.Write(pong)
  574. if err != nil {
  575. t.Fatalf("err: %v", err)
  576. }
  577. if n != 4 {
  578. t.Fatalf("err: %v", err)
  579. }
  580. }
  581. }
  582. sender := func(i int) {
  583. defer wg.Done()
  584. stream, err := client.OpenStream()
  585. if err != nil {
  586. t.Fatalf("err: %v", err)
  587. }
  588. defer stream.Close()
  589. buf := make([]byte, 4)
  590. for i := 0; i < 1000; i++ {
  591. // Send the 'ping'
  592. n, err := stream.Write(ping)
  593. if err != nil {
  594. t.Fatalf("err: %v", err)
  595. }
  596. if n != 4 {
  597. t.Fatalf("short write %d", n)
  598. }
  599. // Read the 'pong'
  600. n, err = stream.Read(buf)
  601. if err != nil {
  602. t.Fatalf("err: %v", err)
  603. }
  604. if n != 4 {
  605. t.Fatalf("err: %v", err)
  606. }
  607. if !bytes.Equal(buf, pong) {
  608. t.Fatalf("bad: %s", buf)
  609. }
  610. // Shrink the buffer
  611. stream.Shrink()
  612. }
  613. }
  614. for i := 0; i < 50; i++ {
  615. wg.Add(2)
  616. go acceptor(i)
  617. go sender(i)
  618. }
  619. wg.Wait()
  620. }
  621. func TestHalfClose(t *testing.T) {
  622. client, server := testClientServer()
  623. defer client.Close()
  624. defer server.Close()
  625. stream, err := client.Open()
  626. if err != nil {
  627. t.Fatalf("err: %v", err)
  628. }
  629. if _, err = stream.Write([]byte("a")); err != nil {
  630. t.Fatalf("err: %v", err)
  631. }
  632. stream2, err := server.Accept()
  633. if err != nil {
  634. t.Fatalf("err: %v", err)
  635. }
  636. stream2.Close() // Half close
  637. buf := make([]byte, 4)
  638. n, err := stream2.Read(buf)
  639. if err != nil {
  640. t.Fatalf("err: %v", err)
  641. }
  642. if n != 1 {
  643. t.Fatalf("bad: %v", n)
  644. }
  645. // Send more
  646. if _, err = stream.Write([]byte("bcd")); err != nil {
  647. t.Fatalf("err: %v", err)
  648. }
  649. stream.Close()
  650. // Read after close
  651. n, err = stream2.Read(buf)
  652. if err != nil {
  653. t.Fatalf("err: %v", err)
  654. }
  655. if n != 3 {
  656. t.Fatalf("bad: %v", n)
  657. }
  658. // EOF after close
  659. n, err = stream2.Read(buf)
  660. if err != io.EOF {
  661. t.Fatalf("err: %v", err)
  662. }
  663. if n != 0 {
  664. t.Fatalf("bad: %v", n)
  665. }
  666. }
  667. func TestReadDeadline(t *testing.T) {
  668. client, server := testClientServer()
  669. defer client.Close()
  670. defer server.Close()
  671. stream, err := client.Open()
  672. if err != nil {
  673. t.Fatalf("err: %v", err)
  674. }
  675. defer stream.Close()
  676. stream2, err := server.Accept()
  677. if err != nil {
  678. t.Fatalf("err: %v", err)
  679. }
  680. defer stream2.Close()
  681. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  682. t.Fatalf("err: %v", err)
  683. }
  684. buf := make([]byte, 4)
  685. _, err = stream.Read(buf)
  686. if err != ErrTimeout {
  687. t.Fatalf("err: %v", err)
  688. }
  689. // See https://github.com/hashicorp/yamux/issues/90
  690. // The standard library's http server package will read from connections in
  691. // the background to detect if they are alive.
  692. //
  693. // It sets a read deadline on connections and detect if the returned error
  694. // is a network timeout error which implements net.Error.
  695. //
  696. // The HTTP server will cancel all server requests if it isn't timeout error
  697. // from the connection.
  698. //
  699. // We assert that we return an error meeting the interface to avoid
  700. // accidently breaking yamux session compatability with the standard
  701. // library's http server implementation.
  702. if netErr, ok := err.(net.Error); !ok || !netErr.Timeout() {
  703. t.Fatalf("reading timeout error is expected to implement net.Error and return true when calling Timeout()")
  704. }
  705. }
  706. func TestReadDeadline_BlockedRead(t *testing.T) {
  707. client, server := testClientServer()
  708. defer client.Close()
  709. defer server.Close()
  710. stream, err := client.Open()
  711. if err != nil {
  712. t.Fatalf("err: %v", err)
  713. }
  714. defer stream.Close()
  715. stream2, err := server.Accept()
  716. if err != nil {
  717. t.Fatalf("err: %v", err)
  718. }
  719. defer stream2.Close()
  720. // Start a read that will block
  721. errCh := make(chan error, 1)
  722. go func() {
  723. buf := make([]byte, 4)
  724. _, err := stream.Read(buf)
  725. errCh <- err
  726. close(errCh)
  727. }()
  728. // Wait to ensure the read has started.
  729. time.Sleep(5 * time.Millisecond)
  730. // Update the read deadline
  731. if err := stream.SetReadDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  732. t.Fatalf("err: %v", err)
  733. }
  734. select {
  735. case <-time.After(100 * time.Millisecond):
  736. t.Fatal("expected read timeout")
  737. case err := <-errCh:
  738. if err != ErrTimeout {
  739. t.Fatalf("expected ErrTimeout; got %v", err)
  740. }
  741. }
  742. }
  743. func TestWriteDeadline(t *testing.T) {
  744. client, server := testClientServer()
  745. defer client.Close()
  746. defer server.Close()
  747. stream, err := client.Open()
  748. if err != nil {
  749. t.Fatalf("err: %v", err)
  750. }
  751. defer stream.Close()
  752. stream2, err := server.Accept()
  753. if err != nil {
  754. t.Fatalf("err: %v", err)
  755. }
  756. defer stream2.Close()
  757. if err := stream.SetWriteDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
  758. t.Fatalf("err: %v", err)
  759. }
  760. buf := make([]byte, 512)
  761. for i := 0; i < int(initialStreamWindow); i++ {
  762. _, err := stream.Write(buf)
  763. if err != nil && err == ErrTimeout {
  764. return
  765. } else if err != nil {
  766. t.Fatalf("err: %v", err)
  767. }
  768. }
  769. t.Fatalf("Expected timeout")
  770. }
  771. func TestWriteDeadline_BlockedWrite(t *testing.T) {
  772. client, server := testClientServer()
  773. defer client.Close()
  774. defer server.Close()
  775. stream, err := client.Open()
  776. if err != nil {
  777. t.Fatalf("err: %v", err)
  778. }
  779. defer stream.Close()
  780. stream2, err := server.Accept()
  781. if err != nil {
  782. t.Fatalf("err: %v", err)
  783. }
  784. defer stream2.Close()
  785. // Start a goroutine making writes that will block
  786. errCh := make(chan error, 1)
  787. go func() {
  788. buf := make([]byte, 512)
  789. for i := 0; i < int(initialStreamWindow); i++ {
  790. _, err := stream.Write(buf)
  791. if err == nil {
  792. continue
  793. }
  794. errCh <- err
  795. close(errCh)
  796. return
  797. }
  798. close(errCh)
  799. }()
  800. // Wait to ensure the write has started.
  801. time.Sleep(5 * time.Millisecond)
  802. // Update the write deadline
  803. if err := stream.SetWriteDeadline(time.Now().Add(5 * time.Millisecond)); err != nil {
  804. t.Fatalf("err: %v", err)
  805. }
  806. select {
  807. case <-time.After(1 * time.Second):
  808. t.Fatal("expected write timeout")
  809. case err := <-errCh:
  810. if err != ErrTimeout {
  811. t.Fatalf("expected ErrTimeout; got %v", err)
  812. }
  813. }
  814. }
  815. func TestBacklogExceeded(t *testing.T) {
  816. client, server := testClientServer()
  817. defer client.Close()
  818. defer server.Close()
  819. // Fill the backlog
  820. max := client.config.AcceptBacklog
  821. for i := 0; i < max; i++ {
  822. stream, err := client.Open()
  823. if err != nil {
  824. t.Fatalf("err: %v", err)
  825. }
  826. defer stream.Close()
  827. if _, err := stream.Write([]byte("foo")); err != nil {
  828. t.Fatalf("err: %v", err)
  829. }
  830. }
  831. // Attempt to open a new stream
  832. errCh := make(chan error, 1)
  833. go func() {
  834. _, err := client.Open()
  835. errCh <- err
  836. }()
  837. // Shutdown the server
  838. go func() {
  839. time.Sleep(10 * time.Millisecond)
  840. server.Close()
  841. }()
  842. select {
  843. case err := <-errCh:
  844. if err == nil {
  845. t.Fatalf("open should fail")
  846. }
  847. case <-time.After(time.Second):
  848. t.Fatalf("timeout")
  849. }
  850. }
  851. func TestKeepAlive(t *testing.T) {
  852. client, server := testClientServer()
  853. defer client.Close()
  854. defer server.Close()
  855. time.Sleep(200 * time.Millisecond)
  856. // Ping value should increase
  857. client.pingLock.Lock()
  858. defer client.pingLock.Unlock()
  859. if client.pingID == 0 {
  860. t.Fatalf("should ping")
  861. }
  862. server.pingLock.Lock()
  863. defer server.pingLock.Unlock()
  864. if server.pingID == 0 {
  865. t.Fatalf("should ping")
  866. }
  867. }
  868. func TestKeepAlive_Timeout(t *testing.T) {
  869. conn1, conn2 := testConn()
  870. clientConf := testConf()
  871. clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
  872. clientConf.EnableKeepAlive = false // Just test one direction, so it's deterministic who hangs up on whom
  873. client, _ := Client(conn1, clientConf)
  874. defer client.Close()
  875. server, _ := Server(conn2, testConf())
  876. defer server.Close()
  877. _ = captureLogs(client) // Client logs aren't part of the test
  878. serverLogs := captureLogs(server)
  879. errCh := make(chan error, 1)
  880. go func() {
  881. _, err := server.Accept() // Wait until server closes
  882. errCh <- err
  883. }()
  884. // Prevent the client from responding
  885. clientConn := client.conn.(*pipeConn)
  886. clientConn.writeBlocker.Lock()
  887. select {
  888. case err := <-errCh:
  889. if err != ErrKeepAliveTimeout {
  890. t.Fatalf("unexpected error: %v", err)
  891. }
  892. case <-time.After(1 * time.Second):
  893. t.Fatalf("timeout waiting for timeout")
  894. }
  895. clientConn.writeBlocker.Unlock()
  896. if !server.IsClosed() {
  897. t.Fatalf("server should have closed")
  898. }
  899. if !serverLogs.match([]string{"[ERR] yamux: keepalive failed: i/o deadline reached"}) {
  900. t.Fatalf("server log incorect: %v", serverLogs.logs())
  901. }
  902. }
  903. func TestLargeWindow(t *testing.T) {
  904. conf := DefaultConfig()
  905. conf.MaxStreamWindowSize *= 2
  906. client, server := testClientServerConfig(conf)
  907. defer client.Close()
  908. defer server.Close()
  909. stream, err := client.Open()
  910. if err != nil {
  911. t.Fatalf("err: %v", err)
  912. }
  913. defer stream.Close()
  914. stream2, err := server.Accept()
  915. if err != nil {
  916. t.Fatalf("err: %v", err)
  917. }
  918. defer stream2.Close()
  919. stream.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
  920. buf := make([]byte, conf.MaxStreamWindowSize)
  921. n, err := stream.Write(buf)
  922. if err != nil {
  923. t.Fatalf("err: %v", err)
  924. }
  925. if n != len(buf) {
  926. t.Fatalf("short write: %d", n)
  927. }
  928. }
  929. type UnlimitedReader struct{}
  930. func (u *UnlimitedReader) Read(p []byte) (int, error) {
  931. runtime.Gosched()
  932. return len(p), nil
  933. }
  934. func TestSendData_VeryLarge(t *testing.T) {
  935. client, server := testClientServer()
  936. defer client.Close()
  937. defer server.Close()
  938. var n int64 = 1 * 1024 * 1024 * 1024
  939. var workers int = 16
  940. wg := &sync.WaitGroup{}
  941. wg.Add(workers * 2)
  942. for i := 0; i < workers; i++ {
  943. go func() {
  944. defer wg.Done()
  945. stream, err := server.AcceptStream()
  946. if err != nil {
  947. t.Fatalf("err: %v", err)
  948. }
  949. defer stream.Close()
  950. buf := make([]byte, 4)
  951. _, err = stream.Read(buf)
  952. if err != nil {
  953. t.Fatalf("err: %v", err)
  954. }
  955. if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
  956. t.Fatalf("bad header")
  957. }
  958. recv, err := io.Copy(ioutil.Discard, stream)
  959. if err != nil {
  960. t.Fatalf("err: %v", err)
  961. }
  962. if recv != n {
  963. t.Fatalf("bad: %v", recv)
  964. }
  965. }()
  966. }
  967. for i := 0; i < workers; i++ {
  968. go func() {
  969. defer wg.Done()
  970. stream, err := client.Open()
  971. if err != nil {
  972. t.Fatalf("err: %v", err)
  973. }
  974. defer stream.Close()
  975. _, err = stream.Write([]byte{0, 1, 2, 3})
  976. if err != nil {
  977. t.Fatalf("err: %v", err)
  978. }
  979. unlimited := &UnlimitedReader{}
  980. sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
  981. if err != nil {
  982. t.Fatalf("err: %v", err)
  983. }
  984. if sent != n {
  985. t.Fatalf("bad: %v", sent)
  986. }
  987. }()
  988. }
  989. doneCh := make(chan struct{})
  990. go func() {
  991. wg.Wait()
  992. close(doneCh)
  993. }()
  994. select {
  995. case <-doneCh:
  996. case <-time.After(20 * time.Second):
  997. panic("timeout")
  998. }
  999. }
  1000. func TestBacklogExceeded_Accept(t *testing.T) {
  1001. client, server := testClientServer()
  1002. defer client.Close()
  1003. defer server.Close()
  1004. max := 5 * client.config.AcceptBacklog
  1005. go func() {
  1006. for i := 0; i < max; i++ {
  1007. stream, err := server.Accept()
  1008. if err != nil {
  1009. t.Fatalf("err: %v", err)
  1010. }
  1011. defer stream.Close()
  1012. }
  1013. }()
  1014. // Fill the backlog
  1015. for i := 0; i < max; i++ {
  1016. stream, err := client.Open()
  1017. if err != nil {
  1018. t.Fatalf("err: %v", err)
  1019. }
  1020. defer stream.Close()
  1021. if _, err := stream.Write([]byte("foo")); err != nil {
  1022. t.Fatalf("err: %v", err)
  1023. }
  1024. }
  1025. }
  1026. func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
  1027. client, server := testClientServerConfig(testConfNoKeepAlive())
  1028. defer client.Close()
  1029. defer server.Close()
  1030. var wg sync.WaitGroup
  1031. wg.Add(2)
  1032. // Choose a huge flood size that we know will result in a window update.
  1033. flood := int64(client.config.MaxStreamWindowSize) - 1
  1034. // The server will accept a new stream and then flood data to it.
  1035. go func() {
  1036. defer wg.Done()
  1037. stream, err := server.AcceptStream()
  1038. if err != nil {
  1039. t.Fatalf("err: %v", err)
  1040. }
  1041. defer stream.Close()
  1042. n, err := stream.Write(make([]byte, flood))
  1043. if err != nil {
  1044. t.Fatalf("err: %v", err)
  1045. }
  1046. if int64(n) != flood {
  1047. t.Fatalf("short write: %d", n)
  1048. }
  1049. }()
  1050. // The client will open a stream, block outbound writes, and then
  1051. // listen to the flood from the server, which should time out since
  1052. // it won't be able to send the window update.
  1053. go func() {
  1054. defer wg.Done()
  1055. stream, err := client.OpenStream()
  1056. if err != nil {
  1057. t.Fatalf("err: %v", err)
  1058. }
  1059. defer stream.Close()
  1060. conn := client.conn.(*pipeConn)
  1061. conn.writeBlocker.Lock()
  1062. defer conn.writeBlocker.Unlock()
  1063. _, err = stream.Read(make([]byte, flood))
  1064. if err != ErrConnectionWriteTimeout {
  1065. t.Fatalf("err: %v", err)
  1066. }
  1067. }()
  1068. wg.Wait()
  1069. }
  1070. func TestSession_PartialReadWindowUpdate(t *testing.T) {
  1071. client, server := testClientServerConfig(testConfNoKeepAlive())
  1072. defer client.Close()
  1073. defer server.Close()
  1074. var wg sync.WaitGroup
  1075. wg.Add(1)
  1076. // Choose a huge flood size that we know will result in a window update.
  1077. flood := int64(client.config.MaxStreamWindowSize)
  1078. var wr *Stream
  1079. // The server will accept a new stream and then flood data to it.
  1080. go func() {
  1081. defer wg.Done()
  1082. var err error
  1083. wr, err = server.AcceptStream()
  1084. if err != nil {
  1085. t.Fatalf("err: %v", err)
  1086. }
  1087. defer wr.Close()
  1088. if wr.sendWindow != client.config.MaxStreamWindowSize {
  1089. t.Fatalf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, wr.sendWindow)
  1090. }
  1091. n, err := wr.Write(make([]byte, flood))
  1092. if err != nil {
  1093. t.Fatalf("err: %v", err)
  1094. }
  1095. if int64(n) != flood {
  1096. t.Fatalf("short write: %d", n)
  1097. }
  1098. if wr.sendWindow != 0 {
  1099. t.Fatalf("sendWindow: exp=%d, got=%d", 0, wr.sendWindow)
  1100. }
  1101. }()
  1102. stream, err := client.OpenStream()
  1103. if err != nil {
  1104. t.Fatalf("err: %v", err)
  1105. }
  1106. defer stream.Close()
  1107. wg.Wait()
  1108. _, err = stream.Read(make([]byte, flood/2+1))
  1109. if exp := uint32(flood/2 + 1); wr.sendWindow != exp {
  1110. t.Errorf("sendWindow: exp=%d, got=%d", exp, wr.sendWindow)
  1111. }
  1112. }
  1113. func TestSession_sendNoWait_Timeout(t *testing.T) {
  1114. client, server := testClientServerConfig(testConfNoKeepAlive())
  1115. defer client.Close()
  1116. defer server.Close()
  1117. var wg sync.WaitGroup
  1118. wg.Add(2)
  1119. go func() {
  1120. defer wg.Done()
  1121. stream, err := server.AcceptStream()
  1122. if err != nil {
  1123. t.Fatalf("err: %v", err)
  1124. }
  1125. defer stream.Close()
  1126. }()
  1127. // The client will open the stream and then block outbound writes, we'll
  1128. // probe sendNoWait once it gets into that state.
  1129. go func() {
  1130. defer wg.Done()
  1131. stream, err := client.OpenStream()
  1132. if err != nil {
  1133. t.Fatalf("err: %v", err)
  1134. }
  1135. defer stream.Close()
  1136. conn := client.conn.(*pipeConn)
  1137. conn.writeBlocker.Lock()
  1138. defer conn.writeBlocker.Unlock()
  1139. hdr := header(make([]byte, headerSize))
  1140. hdr.encode(typePing, flagACK, 0, 0)
  1141. for {
  1142. err = client.sendNoWait(hdr)
  1143. if err == nil {
  1144. continue
  1145. } else if err == ErrConnectionWriteTimeout {
  1146. break
  1147. } else {
  1148. t.Fatalf("err: %v", err)
  1149. }
  1150. }
  1151. }()
  1152. wg.Wait()
  1153. }
  1154. func TestSession_PingOfDeath(t *testing.T) {
  1155. client, server := testClientServerConfig(testConfNoKeepAlive())
  1156. defer client.Close()
  1157. defer server.Close()
  1158. var wg sync.WaitGroup
  1159. wg.Add(2)
  1160. var doPingOfDeath sync.Mutex
  1161. doPingOfDeath.Lock()
  1162. // This is used later to block outbound writes.
  1163. conn := server.conn.(*pipeConn)
  1164. // The server will accept a stream, block outbound writes, and then
  1165. // flood its send channel so that no more headers can be queued.
  1166. go func() {
  1167. defer wg.Done()
  1168. stream, err := server.AcceptStream()
  1169. if err != nil {
  1170. t.Fatalf("err: %v", err)
  1171. }
  1172. defer stream.Close()
  1173. conn.writeBlocker.Lock()
  1174. for {
  1175. hdr := header(make([]byte, headerSize))
  1176. hdr.encode(typePing, 0, 0, 0)
  1177. err = server.sendNoWait(hdr)
  1178. if err == nil {
  1179. continue
  1180. } else if err == ErrConnectionWriteTimeout {
  1181. break
  1182. } else {
  1183. t.Fatalf("err: %v", err)
  1184. }
  1185. }
  1186. doPingOfDeath.Unlock()
  1187. }()
  1188. // The client will open a stream and then send the server a ping once it
  1189. // can no longer write. This makes sure the server doesn't deadlock reads
  1190. // while trying to reply to the ping with no ability to write.
  1191. go func() {
  1192. defer wg.Done()
  1193. stream, err := client.OpenStream()
  1194. if err != nil {
  1195. t.Fatalf("err: %v", err)
  1196. }
  1197. defer stream.Close()
  1198. // This ping will never unblock because the ping id will never
  1199. // show up in a response.
  1200. doPingOfDeath.Lock()
  1201. go func() { client.Ping() }()
  1202. // Wait for a while to make sure the previous ping times out,
  1203. // then turn writes back on and make sure a ping works again.
  1204. time.Sleep(2 * server.config.ConnectionWriteTimeout)
  1205. conn.writeBlocker.Unlock()
  1206. if _, err = client.Ping(); err != nil {
  1207. t.Fatalf("err: %v", err)
  1208. }
  1209. }()
  1210. wg.Wait()
  1211. }
  1212. func TestSession_ConnectionWriteTimeout(t *testing.T) {
  1213. client, server := testClientServerConfig(testConfNoKeepAlive())
  1214. defer client.Close()
  1215. defer server.Close()
  1216. var wg sync.WaitGroup
  1217. wg.Add(2)
  1218. go func() {
  1219. defer wg.Done()
  1220. stream, err := server.AcceptStream()
  1221. if err != nil {
  1222. t.Fatalf("err: %v", err)
  1223. }
  1224. defer stream.Close()
  1225. }()
  1226. // The client will open the stream and then block outbound writes, we'll
  1227. // tee up a write and make sure it eventually times out.
  1228. go func() {
  1229. defer wg.Done()
  1230. stream, err := client.OpenStream()
  1231. if err != nil {
  1232. t.Fatalf("err: %v", err)
  1233. }
  1234. defer stream.Close()
  1235. conn := client.conn.(*pipeConn)
  1236. conn.writeBlocker.Lock()
  1237. defer conn.writeBlocker.Unlock()
  1238. // Since the write goroutine is blocked then this will return a
  1239. // timeout since it can't get feedback about whether the write
  1240. // worked.
  1241. n, err := stream.Write([]byte("hello"))
  1242. if err != ErrConnectionWriteTimeout {
  1243. t.Fatalf("err: %v", err)
  1244. }
  1245. if n != 0 {
  1246. t.Fatalf("lied about writes: %d", n)
  1247. }
  1248. }()
  1249. wg.Wait()
  1250. }