Browse Source

Adding logging with configurable sink

Armon Dadgar 10 years ago
parent
commit
2ce53d05ab
3 changed files with 25 additions and 0 deletions
  1. 5 0
      mux.go
  2. 17 0
      session.go
  3. 3 0
      stream.go

+ 5 - 0
mux.go

@@ -3,6 +3,7 @@ package yamux
 import (
 	"fmt"
 	"io"
+	"os"
 	"time"
 )
 
@@ -22,6 +23,9 @@ type Config struct {
 	// MaxStreamWindowSize is used to control the maximum
 	// window size that we allow for a stream.
 	MaxStreamWindowSize uint32
+
+	// LogOutput is used to control the log destination
+	LogOutput io.Writer
 }
 
 // DefaultConfig is used to return a default configuration
@@ -31,6 +35,7 @@ func DefaultConfig() *Config {
 		EnableKeepAlive:     true,
 		KeepAliveInterval:   30 * time.Second,
 		MaxStreamWindowSize: initialStreamWindow,
+		LogOutput:           os.Stderr,
 	}
 }
 

+ 17 - 0
session.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"log"
 	"math"
 	"net"
 	"sync"
@@ -30,6 +31,9 @@ type Session struct {
 	// config holds our configuration
 	config *Config
 
+	// logger is used for our logs
+	logger *log.Logger
+
 	// conn is the underlying connection
 	conn io.ReadWriteCloser
 
@@ -71,6 +75,7 @@ type sendReady struct {
 func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
 	s := &Session{
 		config:     config,
+		logger:     log.New(config.LogOutput, "", log.LstdFlags),
 		conn:       conn,
 		bufRead:    bufio.NewReader(conn),
 		pings:      make(map[uint32]chan struct{}),
@@ -285,6 +290,7 @@ func (s *Session) send() {
 				for sent < len(ready.Hdr) {
 					n, err := s.conn.Write(ready.Hdr[sent:])
 					if err != nil {
+						s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
 						asyncSendErr(ready.Err, err)
 						s.exitErr(err)
 						return
@@ -297,6 +303,7 @@ func (s *Session) send() {
 			if ready.Body != nil {
 				_, err := io.Copy(s.conn, ready.Body)
 				if err != nil {
+					s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
 					asyncSendErr(ready.Err, err)
 					s.exitErr(err)
 					return
@@ -318,12 +325,16 @@ func (s *Session) recv() {
 	for {
 		// Read the header
 		if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
+			if err != io.EOF {
+				s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
+			}
 			s.exitErr(err)
 			return
 		}
 
 		// Verify the version
 		if hdr.Version() != protoVersion {
+			s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
 			s.exitErr(ErrInvalidVersion)
 			return
 		}
@@ -372,6 +383,7 @@ func (s *Session) handleStreamMessage(hdr header) error {
 		// Drain any data on the wire
 		if hdr.MsgType() == typeData && hdr.Length() > 0 {
 			if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil {
+				s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
 				return nil
 			}
 		}
@@ -426,10 +438,13 @@ func (s *Session) handleGoAway(hdr header) error {
 	case goAwayNormal:
 		atomic.SwapInt32(&s.remoteGoAway, 1)
 	case goAwayProtoErr:
+		s.logger.Printf("[ERR] yamux: received protocol error go away")
 		return fmt.Errorf("yamux protocol error")
 	case goAwayInternalErr:
+		s.logger.Printf("[ERR] yamux: received internal error go away")
 		return fmt.Errorf("remote yamux internal error")
 	default:
+		s.logger.Printf("[ERR] yamux: received unexpected go away")
 		return fmt.Errorf("unexpected go away received")
 	}
 	return nil
@@ -452,6 +467,7 @@ func (s *Session) incomingStream(id uint32) error {
 
 	// Check if stream already exists
 	if _, ok := s.streams[id]; ok {
+		s.logger.Printf("[ERR] yamux: duplicate stream declared")
 		s.sendNoWait(s.goAway(goAwayProtoErr))
 		return ErrDuplicateStream
 	}
@@ -465,6 +481,7 @@ func (s *Session) incomingStream(id uint32) error {
 		return nil
 	default:
 		// Backlog exceeded! RST the stream
+		s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
 		delete(s.streams, id)
 		stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
 		return s.sendNoWait(stream.sendHdr)

+ 3 - 0
stream.go

@@ -348,6 +348,7 @@ func (s *Stream) processFlags(flags uint16) error {
 			s.session.closeStream(s.id, true)
 			s.notifyWaiting()
 		default:
+			s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state)
 			return ErrUnexpectedFlag
 		}
 	} else if flags&flagRST == flagRST {
@@ -388,6 +389,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
 		return nil
 	}
 	if length > atomic.LoadUint32(&s.recvWindow) {
+		s.session.logger.Printf("[ERR] yamux: receive window exceeded")
 		return ErrRecvWindowExceeded
 	}
 
@@ -411,6 +413,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
 
 	// Copy to our buffer anything left
 	if _, err := io.Copy(&s.recvBuf, conn); err != nil {
+		s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
 		s.recvLock.Unlock()
 		return err
 	}