Browse Source

Generalizes the header write timeout to a broader "safety valve" write timeout.

James Phillips 9 years ago
parent
commit
dfbd354eb2
4 changed files with 88 additions and 40 deletions
  1. 4 4
      const.go
  2. 11 11
      mux.go
  3. 13 18
      session.go
  4. 60 7
      session_test.go

+ 4 - 4
const.go

@@ -29,10 +29,6 @@ var (
 	// ErrReceiveWindowExceeded indicates the window was exceeded
 	ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded")
 
-	// ErrHeaderWriteTimeout indicates that we hit an IO deadline waiting
-	// for a header to be written.
-	ErrHeaderWriteTimeout = fmt.Errorf("header write timeout")
-
 	// ErrTimeout is used when we reach an IO deadline
 	ErrTimeout = fmt.Errorf("i/o deadline reached")
 
@@ -48,6 +44,10 @@ var (
 	// ErrConnectionReset is sent if a stream is reset. This can happen
 	// if the backlog is exceeded, or if there was a remote GoAway.
 	ErrConnectionReset = fmt.Errorf("connection reset")
+
+	// ErrConnectionWriteTimeout indicates that we hit the "safety valve"
+	// timeout writing to the underlying stream connection.
+	ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout")
 )
 
 const (

+ 11 - 11
mux.go

@@ -20,11 +20,11 @@ type Config struct {
 	// KeepAliveInterval is how often to perform the keep alive
 	KeepAliveInterval time.Duration
 
-	// HeaderWriteTimeout is how long we will wait to perform a blocking
-	// operation writing a header, after which we will throw an error and
-	// close the stream. Headers are small, so this should be set to a value
-	// after which you suspect there is something wrong with the connection.
-	HeaderWriteTimeout time.Duration
+	// ConnectionWriteTimeout is meant to be a "safety valve" timeout after
+	// we which will suspect a problem with the underlying connection and
+	// close it. This is only applied to writes, where's there's generally
+	// an expectation that things will move along quickly.
+	ConnectionWriteTimeout time.Duration
 
 	// MaxStreamWindowSize is used to control the maximum
 	// window size that we allow for a stream.
@@ -37,12 +37,12 @@ type Config struct {
 // DefaultConfig is used to return a default configuration
 func DefaultConfig() *Config {
 	return &Config{
-		AcceptBacklog:       256,
-		EnableKeepAlive:     true,
-		KeepAliveInterval:   30 * time.Second,
-		HeaderWriteTimeout:  10 * time.Second,
-		MaxStreamWindowSize: initialStreamWindow,
-		LogOutput:           os.Stderr,
+		AcceptBacklog:          256,
+		EnableKeepAlive:        true,
+		KeepAliveInterval:      30 * time.Second,
+		ConnectionWriteTimeout: 10 * time.Second,
+		MaxStreamWindowSize:    initialStreamWindow,
+		LogOutput:              os.Stderr,
 	}
 }
 

+ 13 - 18
session.go

@@ -300,24 +300,19 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error {
 }
 
 // waitForSendErr waits to send a header with optional data, checking for a
-// potential shutdown. If the body is not supplied then we will enforce the
-// configured HeaderWriteTimeout, since this is a small control header.
+// potential shutdown. Since there's the expectation that sends can happen
+// in a timely manner, we enforce the connection write timeout here.
 func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
-	var timeout <- chan time.Time
-	if body == nil {
-		timer := time.NewTimer(s.config.HeaderWriteTimeout)
-		defer timer.Stop()
-
-		timeout = timer.C
-	}
+	timer := time.NewTimer(s.config.ConnectionWriteTimeout)
+	defer timer.Stop()
 
 	ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
 	select {
 	case s.sendCh <- ready:
 	case <-s.shutdownCh:
 		return ErrSessionShutdown
-	case <-timeout:
-		return ErrHeaderWriteTimeout
+	case <-timer.C:
+		return ErrConnectionWriteTimeout
 	}
 
 	select {
@@ -325,16 +320,16 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
 		return err
 	case <-s.shutdownCh:
 		return ErrSessionShutdown
-	case <-timeout:
-		return ErrHeaderWriteTimeout
+	case <-timer.C:
+		return ErrConnectionWriteTimeout
 	}
 }
 
-// sendNoWait does a send without waiting. Since there's still a case where
-// sendCh itself can be full, we will enforce the configured HeaderWriteTimeout,
-// since this is a small control header.
+// sendNoWait does a send without waiting. Since there's the expectation that
+// the send happens right here, we enforce the connection write timeout if we
+// can't queue the header to be sent.
 func (s *Session) sendNoWait(hdr header) error {
-	timer := time.NewTimer(s.config.HeaderWriteTimeout)
+	timer := time.NewTimer(s.config.ConnectionWriteTimeout)
 	defer timer.Stop()
 
 	select {
@@ -343,7 +338,7 @@ func (s *Session) sendNoWait(hdr header) error {
 	case <-s.shutdownCh:
 		return ErrSessionShutdown
 	case <-timer.C:
-		return ErrHeaderWriteTimeout
+		return ErrConnectionWriteTimeout
 	}
 }
 

+ 60 - 7
session_test.go

@@ -12,8 +12,8 @@ import (
 )
 
 type pipeConn struct {
-	reader *io.PipeReader
-	writer *io.PipeWriter
+	reader       *io.PipeReader
+	writer       *io.PipeWriter
 	writeBlocker sync.Mutex
 }
 
@@ -44,7 +44,7 @@ func testClientServer() (*Session, *Session) {
 	conf := DefaultConfig()
 	conf.AcceptBacklog = 64
 	conf.KeepAliveInterval = 100 * time.Millisecond
-	conf.HeaderWriteTimeout = 250 * time.Millisecond
+	conf.ConnectionWriteTimeout = 250 * time.Millisecond
 	return testClientServerConfig(conf)
 }
 
@@ -852,7 +852,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
 		conn.writeBlocker.Lock()
 
 		_, err = stream.Read(make([]byte, flood))
-		if err != ErrHeaderWriteTimeout {
+		if err != ErrConnectionWriteTimeout {
 			t.Fatalf("err: %v", err)
 		}
 	}()
@@ -898,7 +898,7 @@ func TestSession_sendNoWait_Timeout(t *testing.T) {
 			err = client.sendNoWait(hdr)
 			if err == nil {
 				continue
-			} else if err == ErrHeaderWriteTimeout {
+			} else if err == ErrConnectionWriteTimeout {
 				break
 			} else {
 				t.Fatalf("err: %v", err)
@@ -941,7 +941,7 @@ func TestSession_PingOfDeath(t *testing.T) {
 			err = server.sendNoWait(hdr)
 			if err == nil {
 				continue
-			} else if err == ErrHeaderWriteTimeout {
+			} else if err == ErrConnectionWriteTimeout {
 				break
 			} else {
 				t.Fatalf("err: %v", err)
@@ -970,7 +970,7 @@ func TestSession_PingOfDeath(t *testing.T) {
 
 		// Wait for a while to make sure the previous ping times out,
 		// then turn writes back on and make sure a ping works again.
-		time.Sleep(2 * server.config.HeaderWriteTimeout)
+		time.Sleep(2 * server.config.ConnectionWriteTimeout)
 		conn.writeBlocker.Unlock()
 		if _, err = client.Ping(); err != nil {
 			t.Fatalf("err: %v", err)
@@ -979,3 +979,56 @@ func TestSession_PingOfDeath(t *testing.T) {
 
 	wg.Wait()
 }
+
+func TestSession_ConnectionWriteTimeout(t *testing.T) {
+	// Disable keepalives so they don't detect the failed connection
+	// before the user's write does.
+	conf := DefaultConfig()
+	conf.EnableKeepAlive = false
+	conf.ConnectionWriteTimeout = 250 * time.Millisecond
+
+	client, server := testClientServerConfig(conf)
+	defer client.Close()
+	defer server.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+
+		stream, err := server.AcceptStream()
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		defer stream.Close()
+	}()
+
+	// The client will open the stream and then block outbound writes, we'll
+	// tee up a write and make sure it eventually times out.
+	go func() {
+		defer wg.Done()
+
+		stream, err := client.OpenStream()
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		defer stream.Close()
+
+		conn := client.conn.(*pipeConn)
+		conn.writeBlocker.Lock()
+
+		// Since the write goroutine is blocked then this will return a
+		// timeout since it can't get feedback about whether the write
+		// worked.
+		n, err := stream.Write([]byte("hello"))
+		if err != ErrConnectionWriteTimeout {
+			t.Fatalf("err: %v", err)
+		}
+		if n != 0 {
+			t.Fatalf("lied about writes: %d", n)
+		}
+	}()
+
+	wg.Wait()
+}