Browse Source

Complete redesign of the network connection API, it should make much more sense now.

Daniele Alessandri 14 years ago
parent
commit
89c311d0de

+ 6 - 9
lib/Predis/Client.php

@@ -67,9 +67,8 @@ class Client {
 
         $options = $this->_options;
         $connection = self::newConnection($parameters);
-        $protocol = $connection->getProtocol();
-        $protocol->setOption('iterable_multibulk', $options->iterable_multibulk);
-        $protocol->setOption('throw_errors', $options->throw_errors);
+        $connection->setProtocolOption('iterable_multibulk', $options->iterable_multibulk);
+        $connection->setProtocolOption('throw_errors', $options->throw_errors);
         $this->pushInitCommands($connection);
 
         $callback = $this->_options->on_connection_initialized;
@@ -225,10 +224,8 @@ class Client {
     private static function ensureDefaultSchemes() {
         if (!isset(self::$_connectionSchemes)) {
             self::$_connectionSchemes = array(
-                'tcp'   => '\Predis\Network\TcpConnection',
-                'unix'  => '\Predis\Network\UnixDomainSocketConnection',
-                // Compatibility with older versions.
-                'redis' => '\Predis\Network\TcpConnection',
+                'tcp'   => '\Predis\Network\StreamConnection',
+                'unix'  => '\Predis\Network\StreamConnection',
             );
         }
     }
@@ -252,9 +249,9 @@ class Client {
         return self::$_connectionSchemes[$scheme];
     }
 
-    private static function newConnection(ConnectionParameters $parameters, IRedisProtocol $protocol = null) {
+    private static function newConnection(ConnectionParameters $parameters) {
         $connection = self::getConnectionClass($parameters->scheme);
-        return new $connection($parameters, $protocol);
+        return new $connection($parameters);
     }
 
     public static function newConnectionByScheme($scheme, $parameters = array()) {

+ 1 - 2
lib/Predis/Iterators/MultiBulkResponseSimple.php

@@ -10,7 +10,6 @@ class MultiBulkResponseSimple extends MultiBulkResponse {
 
     public function __construct(IConnectionSingle $connection, $size) {
         $this->_connection = $connection;
-        $this->_protocol   = $connection->getProtocol();
         $this->_position   = 0;
         $this->_current    = $size > 0 ? $this->getValue() : null;
         $this->_replySize  = $size;
@@ -39,6 +38,6 @@ class MultiBulkResponseSimple extends MultiBulkResponse {
     }
 
     protected function getValue() {
-        return $this->_protocol->read($this->_connection);
+        return $this->_connection->read();
     }
 }

+ 87 - 0
lib/Predis/Network/ComposableStreamConnection.php

@@ -0,0 +1,87 @@
+<?php
+
+namespace Predis\Network;
+
+use Predis\ICommand;
+use Predis\ConnectionParameters;
+use Predis\CommunicationException;
+use Predis\Protocols\IRedisProtocol;
+use Predis\Protocols\TextProtocol;
+
+class ComposableStreamConnection extends StreamConnection implements IConnectionComposable {
+    private $_protocol;
+
+    public function __construct(ConnectionParameters $parameters, IRedisProtocol $protocol = null) {
+        parent::__construct($parameters);
+        $this->_protocol = $protocol ?: new TextProtocol();
+    }
+
+    public function setProtocol(IRedisProtocol $protocol) {
+        if ($protocol === null) {
+            throw new \InvalidArgumentException("The protocol instance cannot be a null value");
+        }
+        $this->_protocol = $protocol;
+    }
+
+    public function getProtocol() {
+        return $this->_protocol;
+    }
+
+    public function setProtocolOption($option, $value) {
+        return $this->_protocol->setOption($option, $value);
+    }
+
+    public function writeBytes($value) {
+        $socket = $this->getResource();
+        while (($length = strlen($value)) > 0) {
+            $written = fwrite($socket, $value);
+            if ($length === $written) {
+                return true;
+            }
+            if ($written === false || $written === 0) {
+                $this->onCommunicationException('Error while writing bytes to the server');
+            }
+            $value = substr($value, $written);
+        }
+        return true;
+    }
+
+    public function readBytes($length) {
+        if ($length <= 0) {
+            throw new \InvalidArgumentException('Length parameter must be greater than 0');
+        }
+        $socket = $this->getResource();
+        $value  = '';
+        do {
+            $chunk = fread($socket, $length);
+            if ($chunk === false || $chunk === '') {
+                $this->onCommunicationException('Error while reading bytes from the server');
+            }
+            $value .= $chunk;
+        }
+        while (($length -= strlen($chunk)) > 0);
+        return $value;
+    }
+
+    public function readLine() {
+        $socket = $this->getResource();
+        $value  = '';
+        do {
+            $chunk = fgets($socket);
+            if ($chunk === false || $chunk === '') {
+                $this->onCommunicationException('Error while reading line from the server');
+            }
+            $value .= $chunk;
+        }
+        while (substr($value, -2) !== "\r\n");
+        return substr($value, 0, -2);
+    }
+
+    public function writeCommand(ICommand $command) {
+        $this->_protocol->write($this, $command);
+    }
+
+    public function read() {
+        return $this->_protocol->read($this);
+    }
+}

+ 6 - 25
lib/Predis/Network/ConnectionBase.php

@@ -7,16 +7,14 @@ use Predis\ICommand;
 use Predis\ConnectionParameters;
 use Predis\ClientException;
 use Predis\CommunicationException;
-use Predis\Protocols\IRedisProtocol;
 
 abstract class ConnectionBase implements IConnectionSingle {
-    private $_cachedId;
-    protected $_params, $_socket, $_initCmds, $_protocol;
+    private $_cachedId, $_resource;
+    protected $_params, $_initCmds;
 
-    public function __construct(ConnectionParameters $parameters, IRedisProtocol $protocol) {
+    public function __construct(ConnectionParameters $parameters) {
         $this->_initCmds = array();
         $this->_params   = $parameters;
-        $this->_protocol = $protocol;
     }
 
     public function __destruct() {
@@ -24,7 +22,7 @@ abstract class ConnectionBase implements IConnectionSingle {
     }
 
     public function isConnected() {
-        return is_resource($this->_socket);
+        return is_resource($this->_resource);
     }
 
     protected abstract function createResource();
@@ -33,13 +31,7 @@ abstract class ConnectionBase implements IConnectionSingle {
         if ($this->isConnected()) {
             throw new ClientException('Connection already estabilished');
         }
-        $this->createResource();
-    }
-
-    public function disconnect() {
-        if ($this->isConnected()) {
-            fclose($this->_socket);
-        }
+        $this->_resource = $this->createResource();
     }
 
     public function pushInitCommand(ICommand $command){
@@ -64,24 +56,13 @@ abstract class ConnectionBase implements IConnectionSingle {
         if (!$this->isConnected()) {
             $this->connect();
         }
-        return $this->_socket;
+        return $this->_resource;
     }
 
     public function getParameters() {
         return $this->_params;
     }
 
-    public function getProtocol() {
-        return $this->_protocol;
-    }
-
-    public function setProtocol(IRedisProtocol $protocol) {
-        if ($protocol === null) {
-            throw new \InvalidArgumentException("The protocol instance cannot be a null value");
-        }
-        $this->_protocol = $protocol;
-    }
-
     public function __toString() {
         if (!isset($this->_cachedId)) {
             $this->_cachedId = "{$this->_params->host}:{$this->_params->port}";

+ 13 - 0
lib/Predis/Network/IConnectionComposable.php

@@ -0,0 +1,13 @@
+<?php
+
+namespace Predis\Network;
+
+use Predis\Protocols\IRedisProtocol;
+
+interface IConnectionComposable extends IConnectionSingle {
+    public function setProtocol(IRedisProtocol $protocol);
+    public function getProtocol();
+    public function writeBytes($buffer);
+    public function readBytes($length);
+    public function readLine();
+}

+ 4 - 7
lib/Predis/Network/IConnectionSingle.php

@@ -3,15 +3,12 @@
 namespace Predis\Network;
 
 use Predis\ICommand;
-use Predis\Protocols\IRedisProtocol;
 
 interface IConnectionSingle extends IConnection {
-    public function getParameters();
-    public function getProtocol();
-    public function setProtocol(IRedisProtocol $protocol);
     public function __toString();
-    public function writeBytes($buffer);
-    public function readBytes($length);
-    public function readLine();
+    public function getResource();
+    public function getParameters();
+    public function setProtocolOption($option, $value);
     public function pushInitCommand(ICommand $command);
+    public function read();
 }

+ 215 - 0
lib/Predis/Network/StreamConnection.php

@@ -0,0 +1,215 @@
+<?php
+
+namespace Predis\Network;
+
+use Predis\ICommand;
+use Predis\ResponseError;
+use Predis\ResponseQueued;
+use Predis\ServerException;
+use Predis\ConnectionParameters;
+use Predis\CommunicationException;
+use Predis\Protocols\TextCommandSerializer;
+use Predis\Iterators\MultiBulkResponseSimple;
+
+class StreamConnection extends ConnectionBase {
+    private $_commandSerializer, $_mbiterable, $_throwErrors;
+
+    public function __construct(ConnectionParameters $parameters) {
+        parent::__construct($this->checkParameters($parameters));
+        $this->_commandSerializer = new TextCommandSerializer();
+    }
+
+    public function __destruct() {
+        if (!$this->_params->connection_persistent) {
+            $this->disconnect();
+        }
+    }
+
+    protected function checkParameters(ConnectionParameters $parameters) {
+        switch ($parameters->scheme) {
+            case 'unix':
+                $pathToSocket = $parameters->path;
+                if (!isset($pathToSocket)) {
+                    throw new \InvalidArgumentException('Missing UNIX domain socket path');
+                }
+                if (!file_exists($pathToSocket)) {
+                    throw new \InvalidArgumentException("Could not find $pathToSocket");
+                }
+            case 'tcp':
+                return $parameters;
+            default:
+                throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
+        }
+    }
+
+    protected function createResource() {
+        $parameters = $this->_params;
+        $initializer = array($this, "{$parameters->scheme}StreamInitializer");
+        return call_user_func($initializer, $parameters);
+    }
+
+    private function tcpStreamInitializer(ConnectionParameters $parameters) {
+        $uri = sprintf('tcp://%s:%d/', $parameters->host, $parameters->port);
+        $flags = STREAM_CLIENT_CONNECT;
+        if ($parameters->connection_async) {
+            $flags |= STREAM_CLIENT_ASYNC_CONNECT;
+        }
+        if ($parameters->connection_persistent) {
+            $flags |= STREAM_CLIENT_PERSISTENT;
+        }
+        $resource = @stream_socket_client(
+            $uri, $errno, $errstr, $parameters->connection_timeout, $flags
+        );
+        if (!$resource) {
+            $this->onCommunicationException(trim($errstr), $errno);
+        }
+        if (isset($parameters->read_write_timeout)) {
+            $timeoutSeconds  = floor($parameters->read_write_timeout);
+            $timeoutUSeconds = ($parameters->read_write_timeout - $timeoutSeconds) * 1000000;
+            stream_set_timeout($resource, $timeoutSeconds, $timeoutUSeconds);
+        }
+        return $resource;
+    }
+
+    private function unixStreamInitializer(ConnectionParameters $parameters) {
+        $uri = sprintf('unix:///%s', $parameters->path);
+        $flags = STREAM_CLIENT_CONNECT;
+        if ($parameters->connection_persistent) {
+            $flags |= STREAM_CLIENT_PERSISTENT;
+        }
+        $resource = @stream_socket_client(
+            $uri, $errno, $errstr, $parameters->connection_timeout, $flags
+        );
+        if (!$resource) {
+            $this->onCommunicationException(trim($errstr), $errno);
+        }
+        return $resource;
+    }
+
+    public function connect() {
+        parent::connect();
+        if (count($this->_initCmds) > 0){
+            $this->sendInitializationCommands();
+        }
+    }
+
+    public function disconnect() {
+        if ($this->isConnected()) {
+            fclose($this->getResource());
+        }
+    }
+
+    private function sendInitializationCommands() {
+        foreach ($this->_initCmds as $command) {
+            $this->writeCommand($command);
+        }
+        foreach ($this->_initCmds as $command) {
+            $this->readResponse($command);
+        }
+    }
+
+    private function write($buffer) {
+        $socket = $this->getResource();
+        while (($length = strlen($buffer)) > 0) {
+            $written = fwrite($socket, $buffer);
+            if ($length === $written) {
+                return;
+            }
+            if ($written === false || $written === 0) {
+                $this->onCommunicationException(
+                    'Error while writing bytes to the server'
+                );
+            }
+            $value = substr($buffer, $written);
+        }
+    }
+
+    public function read() {
+        $socket = $this->getResource();
+        $chunk  = fgets($socket);
+        if ($chunk === false || $chunk === '') {
+            $this->onCommunicationException(
+                'Error while reading line from the server'
+            );
+        }
+        $prefix  = $chunk[0];
+        $payload = substr($chunk, 1, -2);
+        switch ($prefix) {
+            case '+':    // inline
+                switch ($payload) {
+                    case 'OK':
+                        return true;
+                    case 'QUEUED':
+                        return new ResponseQueued();
+                    default:
+                        return $payload;
+                }
+
+            case '$':    // bulk
+                $size = (int) $payload;
+                if ($size === -1) {
+                    return null;
+                }
+                $bulkData = '';
+                $bytesLeft = ($size += 2);
+                do {
+                    $chunk = fread($socket, min($bytesLeft, 4096));
+                    if ($chunk === false || $chunk === '') {
+                        $this->onCommunicationException(
+                            'Error while reading bytes from the server'
+                        );
+                    }
+                    $bulkData .= $chunk;
+                    $bytesLeft = $size - strlen($bulkData);
+                } while ($bytesLeft > 0);
+                return substr($bulkData, 0, -2);
+
+            case '*':    // multi bulk
+                $count = (int) $payload;
+                if ($count === -1) {
+                    return null;
+                }
+                if ($this->_mbiterable === true) {
+                    return new MultiBulkResponseSimple($this, $count);
+                }
+                $multibulk = array();
+                for ($i = 0; $i < $count; $i++) {
+                    $multibulk[$i] = $this->read();
+                }
+                return $multibulk;
+
+            case ':':    // integer
+                return (int) $payload;
+
+            case '-':    // error
+                $errorMessage = substr($payload, 4);
+                if ($this->_throwErrors) {
+                    throw new ServerException($errorMessage);
+                }
+                return new ResponseError($errorMessage);
+
+            default:
+                $this->onCommunicationException("Unknown prefix: '$prefix'");
+        }
+    }
+
+    public function writeCommand(ICommand $command) {
+        $this->write($this->_commandSerializer->serialize($command));
+    }
+
+    public function readResponse(ICommand $command) {
+        $reply = $this->read();
+        return isset($reply->skipParse) ? $reply : $command->parseResponse($reply);
+    }
+
+    public function setProtocolOption($option, $value) {
+        switch ($option) {
+            case 'iterable_multibulk':
+                $this->_mbiterable = (bool) $value;
+                break;
+            case 'throw_errors':
+                $this->_throwErrors = (bool) $value;
+                break;
+        }
+    }
+}

+ 0 - 124
lib/Predis/Network/TcpConnection.php

@@ -1,124 +0,0 @@
-<?php
-
-namespace Predis\Network;
-
-use Predis\ICommand;
-use Predis\ConnectionParameters;
-use Predis\CommunicationException;
-use Predis\Protocols\IRedisProtocol;
-use Predis\Protocols\TextProtocol;
-
-class TcpConnection extends ConnectionBase implements IConnectionSingle {
-    public function __construct(ConnectionParameters $parameters, IRedisProtocol $protocol = null) {
-        parent::__construct($this->checkParameters($parameters), $protocol ?: new TextProtocol());
-    }
-
-    public function __destruct() {
-        if (!$this->_params->connection_persistent) {
-            $this->disconnect();
-        }
-    }
-
-    protected function checkParameters(ConnectionParameters $parameters) {
-        $scheme = $parameters->scheme;
-        if ($scheme != 'tcp' && $scheme != 'redis') {
-            throw new \InvalidArgumentException("Invalid scheme: {$scheme}");
-        }
-        return $parameters;
-    }
-
-    protected function createResource() {
-        $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
-        $connectFlags = STREAM_CLIENT_CONNECT;
-        if ($this->_params->connection_async) {
-            $connectFlags |= STREAM_CLIENT_ASYNC_CONNECT;
-        }
-        if ($this->_params->connection_persistent) {
-            $connectFlags |= STREAM_CLIENT_PERSISTENT;
-        }
-        $this->_socket = @stream_socket_client(
-            $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
-        );
-
-        if (!$this->_socket) {
-            $this->onCommunicationException(trim($errstr), $errno);
-        }
-
-        if (isset($this->_params->read_write_timeout)) {
-            $timeoutSeconds  = floor($this->_params->read_write_timeout);
-            $timeoutUSeconds = ($this->_params->read_write_timeout - $timeoutSeconds) * 1000000;
-            stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
-        }
-    }
-
-    private function sendInitializationCommands() {
-        foreach ($this->_initCmds as $command) {
-            $this->writeCommand($command);
-        }
-        foreach ($this->_initCmds as $command) {
-            $this->readResponse($command);
-        }
-    }
-
-    public function connect() {
-        parent::connect();
-        if (count($this->_initCmds) > 0){
-            $this->sendInitializationCommands();
-        }
-    }
-
-    public function writeCommand(ICommand $command) {
-        $this->_protocol->write($this, $command);
-    }
-
-    public function readResponse(ICommand $command) {
-        $response = $this->_protocol->read($this);
-        return isset($response->skipParse) ? $response : $command->parseResponse($response);
-    }
-
-    public function writeBytes($value) {
-        $socket = $this->getResource();
-        while (($length = strlen($value)) > 0) {
-            $written = fwrite($socket, $value);
-            if ($length === $written) {
-                return true;
-            }
-            if ($written === false || $written === 0) {
-                $this->onCommunicationException('Error while writing bytes to the server');
-            }
-            $value = substr($value, $written);
-        }
-        return true;
-    }
-
-    public function readBytes($length) {
-        if ($length <= 0) {
-            throw new \InvalidArgumentException('Length parameter must be greater than 0');
-        }
-        $socket = $this->getResource();
-        $value  = '';
-        do {
-            $chunk = fread($socket, $length);
-            if ($chunk === false || $chunk === '') {
-                $this->onCommunicationException('Error while reading bytes from the server');
-            }
-            $value .= $chunk;
-        }
-        while (($length -= strlen($chunk)) > 0);
-        return $value;
-    }
-
-    public function readLine() {
-        $socket = $this->getResource();
-        $value  = '';
-        do {
-            $chunk = fgets($socket);
-            if ($chunk === false || $chunk === '') {
-                $this->onCommunicationException('Error while reading line from the server');
-            }
-            $value .= $chunk;
-        }
-        while (substr($value, -2) !== "\r\n");
-        return substr($value, 0, -2);
-    }
-}

+ 0 - 36
lib/Predis/Network/UnixDomainSocketConnection.php

@@ -1,36 +0,0 @@
-<?php
-
-namespace Predis\Network;
-
-use Predis\ConnectionParameters;
-use Predis\CommunicationException;
-
-class UnixDomainSocketConnection extends TcpConnection {
-    protected function checkParameters(ConnectionParameters $parameters) {
-        if ($parameters->scheme != 'unix') {
-            throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
-        }
-        $pathToSocket = $parameters->path;
-        if (!isset($pathToSocket)) {
-            throw new \InvalidArgumentException('Missing UNIX domain socket path');
-        }
-        if (!file_exists($pathToSocket)) {
-            throw new \InvalidArgumentException("Could not find $pathToSocket");
-        }
-        return $parameters;
-    }
-
-    protected function createResource() {
-        $uri = sprintf('unix:///%s', $this->_params->path);
-        $connectFlags = STREAM_CLIENT_CONNECT;
-        if ($this->_params->connection_persistent) {
-            $connectFlags |= STREAM_CLIENT_PERSISTENT;
-        }
-        $this->_socket = @stream_socket_client(
-            $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
-        );
-        if (!$this->_socket) {
-            $this->onCommunicationException(trim($errstr), $errno);
-        }
-    }
-}

+ 3 - 3
lib/Predis/Protocols/ComposableTextProtocol.php

@@ -3,7 +3,7 @@
 namespace Predis\Protocols;
 
 use Predis\ICommand;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 class ComposableTextProtocol implements IRedisProtocolExtended {
     private $_serializer, $_reader;
@@ -43,11 +43,11 @@ class ComposableTextProtocol implements IRedisProtocolExtended {
         return $this->_serializer->serialize($command);
     }
 
-    public function write(IConnectionSingle $connection, ICommand $command) {
+    public function write(IConnectionComposable $connection, ICommand $command) {
         $connection->writeBytes($this->_serializer->serialize($command));
     }
 
-    public function read(IConnectionSingle $connection) {
+    public function read(IConnectionComposable $connection) {
         return $this->_reader->read($connection);
     }
 

+ 3 - 3
lib/Predis/Protocols/IRedisProtocol.php

@@ -3,10 +3,10 @@
 namespace Predis\Protocols;
 
 use Predis\ICommand;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 interface IRedisProtocol {
-    public function write(IConnectionSingle $connection, ICommand $command);
-    public function read(IConnectionSingle $connection);
+    public function write(IConnectionComposable $connection, ICommand $command);
+    public function read(IConnectionComposable $connection);
     public function setOption($option, $value);
 }

+ 2 - 2
lib/Predis/Protocols/IResponseHandler.php

@@ -2,8 +2,8 @@
 
 namespace Predis\Protocols;
 
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 interface IResponseHandler {
-    function handle(IConnectionSingle $connection, $payload);
+    function handle(IConnectionComposable $connection, $payload);
 }

+ 2 - 2
lib/Predis/Protocols/ResponseBulkHandler.php

@@ -5,10 +5,10 @@ namespace Predis\Protocols;
 use Predis\Utils;
 use Predis\CommunicationException;
 use Predis\MalformedServerResponse;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 class ResponseBulkHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $lengthString) {
+    public function handle(IConnectionComposable $connection, $lengthString) {
         $length = (int) $lengthString;
         if ($length != $lengthString) {
             Utils::onCommunicationException(new MalformedServerResponse(

+ 4 - 3
lib/Predis/Protocols/ResponseErrorHandler.php

@@ -2,10 +2,11 @@
 
 namespace Predis\Protocols;
 
-use Predis\Network\IConnectionSingle;
+use Predis\ServerException;
+use Predis\Network\IConnectionComposable;
 
 class ResponseErrorHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $errorMessage) {
-        throw new \Predis\ServerException(substr($errorMessage, 4));
+    public function handle(IConnectionComposable $connection, $errorMessage) {
+        throw new ServerException(substr($errorMessage, 4));
     }
 }

+ 4 - 3
lib/Predis/Protocols/ResponseErrorSilentHandler.php

@@ -2,10 +2,11 @@
 
 namespace Predis\Protocols;
 
-use Predis\Network\IConnectionSingle;
+use Predis\ResponseError;
+use Predis\Network\IConnectionComposable;
 
 class ResponseErrorSilentHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $errorMessage) {
-        return new \Predis\ResponseError(substr($errorMessage, 4));
+    public function handle(IConnectionComposable $connection, $errorMessage) {
+        return new ResponseError(substr($errorMessage, 4));
     }
 }

+ 2 - 3
lib/Predis/Protocols/ResponseIntegerHandler.php

@@ -3,12 +3,11 @@
 namespace Predis\Protocols;
 
 use Predis\Utils;
-use Predis\CommunicationException;
 use Predis\MalformedServerResponse;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 class ResponseIntegerHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $number) {
+    public function handle(IConnectionComposable $connection, $number) {
         if (is_numeric($number)) {
             return (int) $number;
         }

+ 2 - 3
lib/Predis/Protocols/ResponseMultiBulkHandler.php

@@ -3,12 +3,11 @@
 namespace Predis\Protocols;
 
 use Predis\Utils;
-use Predis\CommunicationException;
 use Predis\MalformedServerResponse;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 class ResponseMultiBulkHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $lengthString) {
+    public function handle(IConnectionComposable $connection, $lengthString) {
         $length = (int) $lengthString;
         if ($length != $lengthString) {
             Utils::onCommunicationException(new MalformedServerResponse(

+ 2 - 3
lib/Predis/Protocols/ResponseMultiBulkStreamHandler.php

@@ -3,13 +3,12 @@
 namespace Predis\Protocols;
 
 use Predis\Utils;
-use Predis\CommunicationException;
 use Predis\MalformedServerResponse;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 use Predis\Iterators\MultiBulkResponseSimple;
 
 class ResponseMultiBulkStreamHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $lengthString) {
+    public function handle(IConnectionComposable $connection, $lengthString) {
         $length = (int) $lengthString;
         if ($length != $lengthString) {
             Utils::onCommunicationException(new MalformedServerResponse(

+ 10 - 8
lib/Predis/Protocols/ResponseStatusHandler.php

@@ -2,16 +2,18 @@
 
 namespace Predis\Protocols;
 
-use Predis\Network\IConnectionSingle;
+use Predis\ResponseQueued;
+use Predis\Network\IConnectionComposable;
 
 class ResponseStatusHandler implements IResponseHandler {
-    public function handle(IConnectionSingle $connection, $status) {
-        if ($status === 'OK') {
-            return true;
+    public function handle(IConnectionComposable $connection, $status) {
+        switch ($status) {
+            case 'OK':
+                return true;
+            case 'QUEUED':
+                return new ResponseQueued();
+            default:
+                return $status;
         }
-        if ($status === 'QUEUED') {
-            return new \Predis\ResponseQueued();
-        }
-        return $status;
     }
 }

+ 16 - 46
lib/Predis/Protocols/TextProtocol.php

@@ -4,7 +4,7 @@ namespace Predis\Protocols;
 
 use Predis\ICommand;
 use Predis\CommunicationException;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 use Predis\Iterators\MultiBulkResponseSimple;
 
 class TextProtocol implements IRedisProtocol {
@@ -20,7 +20,7 @@ class TextProtocol implements IRedisProtocol {
     const PREFIX_BULK       = '$';
     const PREFIX_MULTI_BULK = '*';
 
-    const BUFFER_SIZE = 8192;
+    const BUFFER_SIZE = 4096;
 
     private $_mbiterable, $_throwErrors, $_serializer;
 
@@ -30,61 +30,31 @@ class TextProtocol implements IRedisProtocol {
         $this->_serializer  = new TextCommandSerializer();
     }
 
-    public function write(IConnectionSingle $connection, ICommand $command) {
-        $buffer = $this->_serializer->serialize($command);
-        $socket = $connection->getResource();
-        while (($length = strlen($buffer)) > 0) {
-            $written = fwrite($socket, $buffer);
-            if ($length === $written) {
-                return;
-            }
-            if ($written === false || $written === 0) {
-                throw new CommunicationException(
-                    $connection, 'Error while writing bytes to the server'
-                );
-            }
-            $value = substr($buffer, $written);
-        }
+    public function write(IConnectionComposable $connection, ICommand $command) {
+        $connection->writeBytes($this->_serializer->serialize($command));
     }
 
-    public function read(IConnectionSingle $connection) {
-        $socket = $connection->getResource();
-        $chunk  = fgets($socket);
-        if ($chunk === false || $chunk === '') {
-            throw new CommunicationException(
-                $connection, 'Error while reading line from the server'
-            );
-        }
-        $prefix  = $chunk[0];
-        $payload = substr($chunk, 1, -2);
+    public function read(IConnectionComposable $connection) {
+        $chunk = $connection->readLine();
+        $prefix = $chunk[0];
+        $payload = substr($chunk, 1);
         switch ($prefix) {
             case '+':    // inline
-                if ($payload === 'OK') {
-                    return true;
-                }
-                if ($payload === 'QUEUED') {
-                    return new \Predis\ResponseQueued();
+                switch ($payload) {
+                    case 'OK':
+                        return true;
+                    case 'QUEUED':
+                        return new ResponseQueued();
+                    default:
+                        return $payload;
                 }
-                return $payload;
 
             case '$':    // bulk
                 $size = (int) $payload;
                 if ($size === -1) {
                     return null;
                 }
-                $bulkData = '';
-                $bytesLeft = ($size += 2);
-                do {
-                    $chunk = fread($socket, min($bytesLeft, self::BUFFER_SIZE));
-                    if ($chunk === false || $chunk === '') {
-                        throw new CommunicationException(
-                            $connection, 'Error while reading bytes from the server'
-                        );
-                    }
-                    $bulkData .= $chunk;
-                    $bytesLeft = $size - strlen($bulkData);
-                } while ($bytesLeft > 0);
-                return substr($bulkData, 0, -2);
+                return substr($connection->readBytes($size + 2), 0, -2);
 
             case '*':    // multi bulk
                 $count = (int) $payload;

+ 3 - 3
lib/Predis/Protocols/TextResponseReader.php

@@ -5,7 +5,7 @@ namespace Predis\Protocols;
 use Predis\Utils;
 use Predis\CommunicationException;
 use Predis\MalformedServerResponse;
-use Predis\Network\IConnectionSingle;
+use Predis\Network\IConnectionComposable;
 
 class TextResponseReader implements IResponseReader {
     private $_prefixHandlers;
@@ -34,7 +34,7 @@ class TextResponseReader implements IResponseReader {
         }
     }
 
-    public function read(IConnectionSingle $connection) {
+    public function read(IConnectionComposable $connection) {
         $header = $connection->readLine();
         if ($header === '') {
             $this->throwMalformedResponse($connection, 'Unexpected empty header');
@@ -48,7 +48,7 @@ class TextResponseReader implements IResponseReader {
         return $handler->handle($connection, substr($header, 1));
     }
 
-    private function throwMalformedResponse(IConnectionSingle $connection, $message) {
+    private function throwMalformedResponse(IConnectionComposable $connection, $message) {
         Utils::onCommunicationException(new MalformedServerResponse(
             $connection, $message
         ));

+ 1 - 4
lib/Predis/PubSubContext.php

@@ -130,10 +130,7 @@ class PubSubContext implements \Iterator {
     }
 
     private function getValue() {
-        $connection = $this->_client->getConnection();
-        $protocol   = $connection->getProtocol();
-        $response   = $protocol->read($connection);
-
+        $response = $this->_client->getConnection()->read();
         switch ($response[0]) {
             case self::SUBSCRIBE:
             case self::UNSUBSCRIBE:

+ 1 - 1
test/PredisShared.php

@@ -74,7 +74,7 @@ class RC {
     public static function helperForBlockingPops($op) {
         // TODO: I admit that this helper is kinda lame and it does not run
         //       in a separate process to properly test BLPOP/BRPOP
-        $redisUri = sprintf('redis://%s:%d/?database=%d', RC::SERVER_HOST, RC::SERVER_PORT, RC::DEFAULT_DATABASE);
+        $redisUri = sprintf('tcp://%s:%d/?database=%d', RC::SERVER_HOST, RC::SERVER_PORT, RC::DEFAULT_DATABASE);
         $handle = popen('php', 'w');
         fwrite($handle, "<?php
         require __DIR__.'/test/bootstrap.php';