Browse Source

Validate configuration, remove session window

Armon Dadgar 11 years ago
parent
commit
a5150340d3
5 changed files with 45 additions and 36 deletions
  1. 2 2
      README.md
  2. 0 3
      const.go
  3. 31 14
      mux.go
  4. 11 16
      session_test.go
  5. 1 1
      stream.go

+ 2 - 2
README.md

@@ -115,8 +115,8 @@ hard close a stream immediately.
 
 
 ## Flow Control
 ## Flow Control
 
 
-When Yamux is initially started there is a 2MB window size for
-the session. Additionally, each new stream has a 256KB window size.
+When Yamux is initially starts each stream with a 256KB window size.
+There is no window size for the session.
 
 
 To prevent the session or streams from stalling, window update
 To prevent the session or streams from stalling, window update
 frames should be sent regularly. Yamux can be configured to provide
 frames should be sent regularly. Yamux can be configured to provide

+ 0 - 3
const.go

@@ -93,9 +93,6 @@ const (
 )
 )
 
 
 const (
 const (
-	// initialSessionWindow is the initial session window size
-	initialSessionWindow uint32 = 2 * 1024 * 1024
-
 	// initialStreamWindow is the initial stream window size
 	// initialStreamWindow is the initial stream window size
 	initialStreamWindow uint32 = 256 * 1024
 	initialStreamWindow uint32 = 256 * 1024
 )
 )

+ 31 - 14
mux.go

@@ -1,6 +1,7 @@
 package yamux
 package yamux
 
 
 import (
 import (
+	"fmt"
 	"io"
 	"io"
 	"time"
 	"time"
 )
 )
@@ -22,10 +23,6 @@ type Config struct {
 	// KeepAliveInterval is how often to perform the keep alive
 	// KeepAliveInterval is how often to perform the keep alive
 	KeepAliveInterval time.Duration
 	KeepAliveInterval time.Duration
 
 
-	// MaxSessionWindowSize is used to control the maximum
-	// window size that we allow for a session.
-	MaxSessionWindowSize uint32
-
 	// MaxStreamWindowSize is used to control the maximum
 	// MaxStreamWindowSize is used to control the maximum
 	// window size that we allow for a stream.
 	// window size that we allow for a stream.
 	MaxStreamWindowSize uint32
 	MaxStreamWindowSize uint32
@@ -34,30 +31,50 @@ type Config struct {
 // DefaultConfig is used to return a default configuration
 // DefaultConfig is used to return a default configuration
 func DefaultConfig() *Config {
 func DefaultConfig() *Config {
 	return &Config{
 	return &Config{
-		AcceptBacklog:        256,
-		EnableCompression:    true,
-		EnableKeepAlive:      true,
-		KeepAliveInterval:    30 * time.Second,
-		MaxSessionWindowSize: initialSessionWindow,
-		MaxStreamWindowSize:  initialStreamWindow,
+		AcceptBacklog:       256,
+		EnableCompression:   true,
+		EnableKeepAlive:     true,
+		KeepAliveInterval:   30 * time.Second,
+		MaxStreamWindowSize: initialStreamWindow,
 	}
 	}
 }
 }
 
 
+// VerifyConfig is used to verify the sanity of configuration
+func VerifyConfig(config *Config) error {
+	if config.AcceptBacklog <= 0 {
+		return fmt.Errorf("backlog must be positive")
+	}
+	if config.KeepAliveInterval == 0 {
+		return fmt.Errorf("keep-alive interval must be positive")
+	}
+	if config.MaxStreamWindowSize < initialStreamWindow {
+		return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
+	}
+	return nil
+}
+
 // Server is used to initialize a new server-side connection.
 // Server is used to initialize a new server-side connection.
 // There must be at most one server-side connection. If a nil config is
 // There must be at most one server-side connection. If a nil config is
 // provided, the DefaultConfiguration will be used.
 // provided, the DefaultConfiguration will be used.
-func Server(conn io.ReadWriteCloser, config *Config) *Session {
+func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) {
 	if config == nil {
 	if config == nil {
 		config = DefaultConfig()
 		config = DefaultConfig()
 	}
 	}
-	return newSession(config, conn, false)
+	if err := VerifyConfig(config); err != nil {
+		return nil, err
+	}
+	return newSession(config, conn, false), nil
 }
 }
 
 
 // Client is used to initialize a new client-side connection.
 // Client is used to initialize a new client-side connection.
 // There must be at most one client-side connection.
 // There must be at most one client-side connection.
-func Client(conn io.ReadWriteCloser, config *Config) *Session {
+func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) {
 	if config == nil {
 	if config == nil {
 		config = DefaultConfig()
 		config = DefaultConfig()
 	}
 	}
-	return newSession(config, conn, true)
+
+	if err := VerifyConfig(config); err != nil {
+		return nil, err
+	}
+	return newSession(config, conn, true), nil
 }
 }

+ 11 - 16
session_test.go

@@ -31,12 +31,16 @@ func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
 	return &pipeConn{read1, write2}, &pipeConn{read2, write1}
 	return &pipeConn{read1, write2}, &pipeConn{read2, write1}
 }
 }
 
 
-func TestPing(t *testing.T) {
+func testClientServer() (*Session, *Session) {
 	conn1, conn2 := testConn()
 	conn1, conn2 := testConn()
-	client := Client(conn1, nil)
-	defer client.Close()
+	client, _ := Client(conn1, nil)
+	server, _ := Server(conn2, nil)
+	return client, server
+}
 
 
-	server := Server(conn2, nil)
+func TestPing(t *testing.T) {
+	client, server := testClientServer()
+	defer client.Close()
 	defer server.Close()
 	defer server.Close()
 
 
 	rtt, err := client.Ping()
 	rtt, err := client.Ping()
@@ -57,11 +61,8 @@ func TestPing(t *testing.T) {
 }
 }
 
 
 func TestAccept(t *testing.T) {
 func TestAccept(t *testing.T) {
-	conn1, conn2 := testConn()
-	client := Client(conn1, nil)
+	client, server := testClientServer()
 	defer client.Close()
 	defer client.Close()
-
-	server := Server(conn2, nil)
 	defer server.Close()
 	defer server.Close()
 
 
 	wg := &sync.WaitGroup{}
 	wg := &sync.WaitGroup{}
@@ -137,11 +138,8 @@ func TestAccept(t *testing.T) {
 }
 }
 
 
 func TestSendData_Small(t *testing.T) {
 func TestSendData_Small(t *testing.T) {
-	conn1, conn2 := testConn()
-	client := Client(conn1, nil)
+	client, server := testClientServer()
 	defer client.Close()
 	defer client.Close()
-
-	server := Server(conn2, nil)
 	defer server.Close()
 	defer server.Close()
 
 
 	wg := &sync.WaitGroup{}
 	wg := &sync.WaitGroup{}
@@ -208,11 +206,8 @@ func TestSendData_Small(t *testing.T) {
 }
 }
 
 
 func TestSendData_Large(t *testing.T) {
 func TestSendData_Large(t *testing.T) {
-	conn1, conn2 := testConn()
-	client := Client(conn1, nil)
+	client, server := testClientServer()
 	defer client.Close()
 	defer client.Close()
-
-	server := Server(conn2, nil)
 	defer server.Close()
 	defer server.Close()
 
 
 	data := make([]byte, 512*1024)
 	data := make([]byte, 512*1024)

+ 1 - 1
stream.go

@@ -48,10 +48,10 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
 		id:         id,
 		id:         id,
 		session:    session,
 		session:    session,
 		state:      state,
 		state:      state,
+		sendHdr:    header(make([]byte, headerSize)),
 		recvWindow: initialStreamWindow,
 		recvWindow: initialStreamWindow,
 		sendWindow: initialStreamWindow,
 		sendWindow: initialStreamWindow,
 		notifyCh:   make(chan struct{}, 1),
 		notifyCh:   make(chan struct{}, 1),
-		sendHdr:    header(make([]byte, headerSize)),
 	}
 	}
 	return s
 	return s
 }
 }