|
@@ -4,6 +4,8 @@ import (
|
|
|
"bytes"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
+ "io/ioutil"
|
|
|
+ "runtime"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
"time"
|
|
@@ -643,3 +645,85 @@ func TestLargeWindow(t *testing.T) {
|
|
|
t.Fatalf("short write: %d", n)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+type UnlimitedReader struct{}
|
|
|
+
|
|
|
+func (u *UnlimitedReader) Read(p []byte) (int, error) {
|
|
|
+ runtime.Gosched()
|
|
|
+ return len(p), nil
|
|
|
+}
|
|
|
+
|
|
|
+func TestSendData_VeryLarge(t *testing.T) {
|
|
|
+ client, server := testClientServer()
|
|
|
+ defer client.Close()
|
|
|
+ defer server.Close()
|
|
|
+
|
|
|
+ var n int64 = 1 * 1024 * 1024 * 1024
|
|
|
+ var workers int = 16
|
|
|
+
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ wg.Add(workers * 2)
|
|
|
+
|
|
|
+ for i := 0; i < workers; i++ {
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ stream, err := server.AcceptStream()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ defer stream.Close()
|
|
|
+
|
|
|
+ buf := make([]byte, 4)
|
|
|
+ _, err = stream.Read(buf)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ if !bytes.Equal(buf, []byte{0, 1, 2, 3}) {
|
|
|
+ t.Fatalf("bad header")
|
|
|
+ }
|
|
|
+
|
|
|
+ recv, err := io.Copy(ioutil.Discard, stream)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ if recv != n {
|
|
|
+ t.Fatalf("bad: %v", recv)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ for i := 0; i < workers; i++ {
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ stream, err := client.Open()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ defer stream.Close()
|
|
|
+
|
|
|
+ _, err = stream.Write([]byte{0, 1, 2, 3})
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ unlimited := &UnlimitedReader{}
|
|
|
+ sent, err := io.Copy(stream, io.LimitReader(unlimited, n))
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ if sent != n {
|
|
|
+ t.Fatalf("bad: %v", sent)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ doneCh := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ wg.Wait()
|
|
|
+ close(doneCh)
|
|
|
+ }()
|
|
|
+ select {
|
|
|
+ case <-doneCh:
|
|
|
+ case <-time.After(20 * time.Second):
|
|
|
+ panic("timeout")
|
|
|
+ }
|
|
|
+}
|