|
@@ -33,9 +33,8 @@ type Stream struct {
|
|
|
state streamState
|
|
|
stateLock sync.Mutex
|
|
|
|
|
|
- recvBuf bytes.Buffer
|
|
|
- waitingBuffer *directBuffer
|
|
|
- recvLock sync.Mutex
|
|
|
+ recvBuf bytes.Buffer
|
|
|
+ recvLock sync.Mutex
|
|
|
|
|
|
controlHdr header
|
|
|
controlErr chan error
|
|
@@ -52,11 +51,6 @@ type Stream struct {
|
|
|
writeDeadline time.Time
|
|
|
}
|
|
|
|
|
|
-type directBuffer struct {
|
|
|
- buf []byte
|
|
|
- bytes int
|
|
|
-}
|
|
|
-
|
|
|
// newStream is used to construct a new stream within
|
|
|
// a given session for an ID
|
|
|
func newStream(session *Session, id uint32, state streamState) *Stream {
|
|
@@ -89,8 +83,6 @@ func (s *Stream) StreamID() uint32 {
|
|
|
// Read is used to read from the stream
|
|
|
func (s *Stream) Read(b []byte) (n int, err error) {
|
|
|
defer asyncNotify(s.recvNotifyCh)
|
|
|
- var dBuf *directBuffer
|
|
|
-
|
|
|
START:
|
|
|
s.stateLock.Lock()
|
|
|
switch s.state {
|
|
@@ -110,11 +102,6 @@ START:
|
|
|
// If there is no data available, block
|
|
|
s.recvLock.Lock()
|
|
|
if s.recvBuf.Len() == 0 {
|
|
|
- // Mark ourself as waiting potentially
|
|
|
- if s.waitingBuffer == nil {
|
|
|
- dBuf = &directBuffer{buf: b}
|
|
|
- s.waitingBuffer = dBuf
|
|
|
- }
|
|
|
s.recvLock.Unlock()
|
|
|
goto WAIT
|
|
|
}
|
|
@@ -135,16 +122,8 @@ WAIT:
|
|
|
}
|
|
|
select {
|
|
|
case <-s.recvNotifyCh:
|
|
|
- if dBuf != nil && dBuf.bytes > 0 {
|
|
|
- err = s.sendWindowUpdate()
|
|
|
- return dBuf.bytes, err
|
|
|
- }
|
|
|
goto START
|
|
|
case <-timeout:
|
|
|
- if dBuf != nil && dBuf.bytes > 0 {
|
|
|
- err = s.sendWindowUpdate()
|
|
|
- return dBuf.bytes, err
|
|
|
- }
|
|
|
return 0, ErrTimeout
|
|
|
}
|
|
|
}
|
|
@@ -394,30 +373,19 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
|
|
|
return ErrRecvWindowExceeded
|
|
|
}
|
|
|
|
|
|
- // Decrement the receive window
|
|
|
- atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
|
|
|
-
|
|
|
// Wrap in a limited reader
|
|
|
conn = &io.LimitedReader{R: conn, N: int64(length)}
|
|
|
|
|
|
- // Copy into waiting buffer if any
|
|
|
+ // Copy into buffer
|
|
|
s.recvLock.Lock()
|
|
|
- if s.waitingBuffer != nil {
|
|
|
- n, err := conn.Read(s.waitingBuffer.buf)
|
|
|
- s.waitingBuffer.bytes = n
|
|
|
- s.waitingBuffer = nil
|
|
|
- if err != nil {
|
|
|
- s.recvLock.Unlock()
|
|
|
- return err
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Copy to our buffer anything left
|
|
|
if _, err := io.Copy(&s.recvBuf, conn); err != nil {
|
|
|
s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
|
|
|
s.recvLock.Unlock()
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
+ // Decrement the receive window
|
|
|
+ atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
|
|
|
s.recvLock.Unlock()
|
|
|
|
|
|
// Unblock any readers
|