|
@@ -12,8 +12,8 @@ import (
|
|
|
)
|
|
|
|
|
|
type pipeConn struct {
|
|
|
- reader *io.PipeReader
|
|
|
- writer *io.PipeWriter
|
|
|
+ reader *io.PipeReader
|
|
|
+ writer *io.PipeWriter
|
|
|
writeBlocker sync.Mutex
|
|
|
}
|
|
|
|
|
@@ -44,7 +44,7 @@ func testClientServer() (*Session, *Session) {
|
|
|
conf := DefaultConfig()
|
|
|
conf.AcceptBacklog = 64
|
|
|
conf.KeepAliveInterval = 100 * time.Millisecond
|
|
|
- conf.HeaderWriteTimeout = 250 * time.Millisecond
|
|
|
+ conf.ConnectionWriteTimeout = 250 * time.Millisecond
|
|
|
return testClientServerConfig(conf)
|
|
|
}
|
|
|
|
|
@@ -852,7 +852,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
|
|
|
conn.writeBlocker.Lock()
|
|
|
|
|
|
_, err = stream.Read(make([]byte, flood))
|
|
|
- if err != ErrHeaderWriteTimeout {
|
|
|
+ if err != ErrConnectionWriteTimeout {
|
|
|
t.Fatalf("err: %v", err)
|
|
|
}
|
|
|
}()
|
|
@@ -898,7 +898,7 @@ func TestSession_sendNoWait_Timeout(t *testing.T) {
|
|
|
err = client.sendNoWait(hdr)
|
|
|
if err == nil {
|
|
|
continue
|
|
|
- } else if err == ErrHeaderWriteTimeout {
|
|
|
+ } else if err == ErrConnectionWriteTimeout {
|
|
|
break
|
|
|
} else {
|
|
|
t.Fatalf("err: %v", err)
|
|
@@ -941,7 +941,7 @@ func TestSession_PingOfDeath(t *testing.T) {
|
|
|
err = server.sendNoWait(hdr)
|
|
|
if err == nil {
|
|
|
continue
|
|
|
- } else if err == ErrHeaderWriteTimeout {
|
|
|
+ } else if err == ErrConnectionWriteTimeout {
|
|
|
break
|
|
|
} else {
|
|
|
t.Fatalf("err: %v", err)
|
|
@@ -970,7 +970,7 @@ func TestSession_PingOfDeath(t *testing.T) {
|
|
|
|
|
|
// Wait for a while to make sure the previous ping times out,
|
|
|
// then turn writes back on and make sure a ping works again.
|
|
|
- time.Sleep(2 * server.config.HeaderWriteTimeout)
|
|
|
+ time.Sleep(2 * server.config.ConnectionWriteTimeout)
|
|
|
conn.writeBlocker.Unlock()
|
|
|
if _, err = client.Ping(); err != nil {
|
|
|
t.Fatalf("err: %v", err)
|
|
@@ -979,3 +979,56 @@ func TestSession_PingOfDeath(t *testing.T) {
|
|
|
|
|
|
wg.Wait()
|
|
|
}
|
|
|
+
|
|
|
+func TestSession_ConnectionWriteTimeout(t *testing.T) {
|
|
|
+ // Disable keepalives so they don't detect the failed connection
|
|
|
+ // before the user's write does.
|
|
|
+ conf := DefaultConfig()
|
|
|
+ conf.EnableKeepAlive = false
|
|
|
+ conf.ConnectionWriteTimeout = 250 * time.Millisecond
|
|
|
+
|
|
|
+ client, server := testClientServerConfig(conf)
|
|
|
+ defer client.Close()
|
|
|
+ defer server.Close()
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ wg.Add(2)
|
|
|
+
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ stream, err := server.AcceptStream()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ defer stream.Close()
|
|
|
+ }()
|
|
|
+
|
|
|
+ // The client will open the stream and then block outbound writes, we'll
|
|
|
+ // tee up a write and make sure it eventually times out.
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+
|
|
|
+ stream, err := client.OpenStream()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ defer stream.Close()
|
|
|
+
|
|
|
+ conn := client.conn.(*pipeConn)
|
|
|
+ conn.writeBlocker.Lock()
|
|
|
+
|
|
|
+ // Since the write goroutine is blocked then this will return a
|
|
|
+ // timeout since it can't get feedback about whether the write
|
|
|
+ // worked.
|
|
|
+ n, err := stream.Write([]byte("hello"))
|
|
|
+ if err != ErrConnectionWriteTimeout {
|
|
|
+ t.Fatalf("err: %v", err)
|
|
|
+ }
|
|
|
+ if n != 0 {
|
|
|
+ t.Fatalf("lied about writes: %d", n)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+}
|