Browse Source

window update unit test for partial read; benchmark large buffer

Stuart Carnie 7 years ago
parent
commit
cf433c5a30
2 changed files with 68 additions and 11 deletions
  1. 0 5
      bench_test.go
  2. 68 6
      session_test.go

+ 0 - 5
bench_test.go

@@ -1,7 +1,6 @@
 package yamux
 
 import (
-	"fmt"
 	"testing"
 )
 
@@ -85,7 +84,6 @@ func BenchmarkSendRecvLarge(b *testing.B) {
 	client, server := testClientServer()
 	defer client.Close()
 	defer server.Close()
-
 	const sendSize = 512 * 1024 * 1024
 	const recvSize = 4 * 1024
 
@@ -107,9 +105,6 @@ func BenchmarkSendRecvLarge(b *testing.B) {
 					b.Fatalf("err: %v", err)
 				}
 			}
-
-			fmt.Printf("Capacity of rcv buffer = %v, length of rcv window = %v\n", stream.recvBuf.Cap(), stream.recvWindow)
-
 		}
 		close(recvDone)
 	}()

+ 68 - 6
session_test.go

@@ -376,7 +376,12 @@ func TestSendData_Large(t *testing.T) {
 	defer client.Close()
 	defer server.Close()
 
-	data := make([]byte, 512*1024)
+	const (
+		sendSize = 250 * 1024 * 1024
+		recvSize = 4 * 1024
+	)
+
+	data := make([]byte, sendSize)
 	for idx := range data {
 		data[idx] = byte(idx % 256)
 	}
@@ -390,16 +395,17 @@ func TestSendData_Large(t *testing.T) {
 		if err != nil {
 			t.Fatalf("err: %v", err)
 		}
-
-		buf := make([]byte, 4*1024)
-		for i := 0; i < 128; i++ {
+		var sz int
+		buf := make([]byte, recvSize)
+		for i := 0; i < sendSize/recvSize; i++ {
 			n, err := stream.Read(buf)
 			if err != nil {
 				t.Fatalf("err: %v", err)
 			}
-			if n != 4*1024 {
+			if n != recvSize {
 				t.Fatalf("short read: %d", n)
 			}
+			sz += n
 			for idx := range buf {
 				if buf[idx] != byte(idx%256) {
 					t.Fatalf("bad: %v %v %v", i, idx, buf[idx])
@@ -410,6 +416,8 @@ func TestSendData_Large(t *testing.T) {
 		if err := stream.Close(); err != nil {
 			t.Fatalf("err: %v", err)
 		}
+
+		t.Logf("cap=%d, n=%d\n", stream.recvBuf.Cap(), sz)
 	}()
 
 	go func() {
@@ -439,7 +447,7 @@ func TestSendData_Large(t *testing.T) {
 	}()
 	select {
 	case <-doneCh:
-	case <-time.After(time.Second):
+	case <-time.After(5 * time.Second):
 		panic("timeout")
 	}
 }
@@ -1026,6 +1034,60 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
 	wg.Wait()
 }
 
+func TestSession_PartialReadWindowUpdate(t *testing.T) {
+	client, server := testClientServerConfig(testConfNoKeepAlive())
+	defer client.Close()
+	defer server.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	// Choose a huge flood size that we know will result in a window update.
+	flood := int64(client.config.MaxStreamWindowSize)
+	var wr *Stream
+
+	// The server will accept a new stream and then flood data to it.
+	go func() {
+		defer wg.Done()
+
+		var err error
+		wr, err = server.AcceptStream()
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		defer wr.Close()
+
+		if wr.sendWindow != client.config.MaxStreamWindowSize {
+			t.Fatalf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, wr.sendWindow)
+		}
+
+		n, err := wr.Write(make([]byte, flood))
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if int64(n) != flood {
+			t.Fatalf("short write: %d", n)
+		}
+		if wr.sendWindow != 0 {
+			t.Fatalf("sendWindow: exp=%d, got=%d", 0, wr.sendWindow)
+		}
+	}()
+
+	stream, err := client.OpenStream()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	defer stream.Close()
+
+	wg.Wait()
+
+	_, err = stream.Read(make([]byte, flood/2+1))
+
+	if exp := uint32(flood/2 + 1); wr.sendWindow != exp {
+		t.Errorf("sendWindow: exp=%d, got=%d", exp, wr.sendWindow)
+	}
+}
+
 func TestSession_sendNoWait_Timeout(t *testing.T) {
 	client, server := testClientServerConfig(testConfNoKeepAlive())
 	defer client.Close()