فهرست منبع

Merge branch 'hyves_contribs_port' into master. Huge thanks to Lorenzo Castelli for a lot of awesome improvements and ideas.

Daniele Alessandri 15 سال پیش
والد
کامیت
39052b181e
4فایلهای تغییر یافته به همراه534 افزوده شده و 161 حذف شده
  1. 44 10
      CHANGELOG
  2. 6 5
      README.markdown
  3. 474 138
      lib/Predis.php
  4. 10 8
      test/RedisCommandsTest.php

+ 44 - 10
CHANGELOG

@@ -1,4 +1,4 @@
-v0.x.x
+v0.6.0
   * New commands added for the Redis 2.0 (DEV) profile: 
       - Strings: APPEND, SUBSTR
       - ZSets  : ZCOUNT, ZRANK, ZUNION, ZINTER, ZREMBYRANK, ZREVRANK
@@ -20,17 +20,51 @@ v0.x.x
 
   * MultiExecBlock instances can handle the new DISCARD command.
 
-  * Connections can now be identified by an alias using the "alias" parameter 
-    in the connection arguments. When connected to a cluster of Redis servers, 
-    this is useful to get a certain connection out of a cluster of connections.
+  * Introduced client-level options with the new Predis\ClientOptions class. 
+    Options can be passed to Predis\Client::__construct in its second argument 
+    as an array or an instance of Predis\ClientOptions. For brevity's sake and 
+    compatibility with older versions, the constructor of Predis\Client still 
+    accepts an instance of Predis\RedisServerProfile in its second argument.
+    The currently supported client options are:
+      - profile [default: "2.0" as of Predis 0.6.0]
+        specifies which server profile to use when connecting to Redis. This 
+        option accepts an instance of Predis\RedisServerProfile or a string 
+        that indicates the target version.
+      - key_distribution [default: Predis\Utilities\HashRing]
+        specifies which key distribution algorithm to use to distribute keys 
+        among the servers that compose a cluster. This option accepts an 
+        instance of Predis\Utilities\IRing so that users can implement their 
+        own key distribution strategy. The new Predis\Utilities\KetamaPureRing 
+        class provides a pure-PHP implementation of the Ketama algorithm.
+      - throw_on_error [default: TRUE]
+        server errors can optionally be handled "silently": instead of throwing 
+        an exception, the client returns an error response type.
+      - iterable_multibulk [EXPERIMENTAL - default: FALSE]
+        in addition to the classic way of fetching a whole multibulk reply 
+        into an array, the client can now optionally stream a multibulk reply 
+        down to the user code by using PHP iterators. It is just a little bit 
+        slower, but it can save a lot of memory in certain scenarios.
 
-  * In addition to the classic way of fetching a whole multibulk reply into an 
-    array, the client can now optionally stream a multibulk reply down to the 
-    user code by using PHP iterators. It is just a little bit slower, but it 
-    can save a lot of memory in certain scenarios.
+  * New parameters for connections: 
+      - alias [default: not set]
+        every connection can now be identified by an alias that is useful to 
+        get a certain connection when connected to a cluster of Redis servers.
+      - weight [default: not set]
+        allows the client to balance the keys asymmetrically across multiple 
+        servers. This might be useful when you have servers with different 
+        amounts of memory and you want to distribute the load of your keys 
+        accordingly.
+      - connection_async [default: FALSE]
+        estabilish connections to servers in a non-blocking way, so that the 
+        client is not blocked while the underlying resource performs the actual 
+        connection.
+      - connection_persistent [default: FALSE]
+        the underlying connection resource is left open when a script ends its 
+        lifecycle. Persistent connections can lead to unpredictable or strange 
+        behaviours, so they should be used with extreme care.
 
-  * Server errors can optionally be handled "silently": instead of throwing an 
-    exception, the client returns a error response type.
+  * Connections now support float values for the connection_timeout parameter 
+    to express timeouts with a microsecond resolution.
 
 v0.5.1
   * RPOPLPUSH has been changed from bulk command to inline command in Redis

+ 6 - 5
README.markdown

@@ -3,9 +3,7 @@
 ## About ##
 
 Predis is a flexible and feature-complete PHP client library for the Redis key-value 
-database.
-
-Predis is currently a work-in-progress and it comes in two flavors:
+database. It currently comes in two flavors:
 
  - the mainline client library, which targets PHP 5.3.x and leverages a lot of the 
    features introduced in this new version of the PHP interpreter.
@@ -17,9 +15,9 @@ Please refer to the TODO file to see which issues are still pending and what is
 to be implemented soon in Predis.
 
 
-## Features ##
+## Main features ##
 
-- Client-side sharding (support for consistent hashing of keys)
+- Client-side sharding (support for consistent hashing and custom distribution algorithms)
 - Command pipelining on single and multiple connections (transparent)
 - Lazy connections (connections to Redis instances are only established just in time)
 - Flexible system to define and register your own set of commands to a client instance
@@ -133,6 +131,9 @@ variable set to E_ALL.
 
 [Daniele Alessandri](mailto:suppakilla@gmail.com)
 
+## Contributors ##
+
+[Lorenzo Castelli](http://github.com/lcastelli)
 
 ## License ##
 

+ 474 - 138
lib/Predis.php

@@ -2,33 +2,48 @@
 namespace Predis;
 
 class PredisException extends \Exception { }
-class ClientException extends PredisException { }
-class ServerException extends PredisException { }
-class MalformedServerResponse extends ServerException { }
+class ClientException extends PredisException { }                   // Client-side errors
+
+class ServerException extends PredisException {                     // Server-side errors
+    public function toResponseError() {
+        return new ResponseError($this->getMessage());
+    }
+}
+
+class CommunicationException extends PredisException {              // Communication errors
+    private $_connection;
+
+    public function __construct(Connection $connection, $message = null, $code = null) {
+        $this->_connection = $connection;
+        parent::__construct($message, $code);
+    }
+
+    public function getConnection() { return $this->_connection; }
+    public function shouldResetConnection() {  return true; }
+}
+
+class MalformedServerResponse extends CommunicationException { }    // Unexpected responses
 
 /* ------------------------------------------------------------------------- */
 
 class Client {
-    private $_connection, $_serverProfile, $_responseReader;
+    private $_options, $_connection, $_serverProfile, $_responseReader;
 
-    public function __construct($parameters = null, RedisServerProfile $serverProfile = null) {
+    public function __construct($parameters = null, $clientOptions = null) {
         $this->_responseReader = new ResponseReader();
-        $this->setProfile($serverProfile ?: RedisServerProfile::getDefault());
+        $this->setupClient($clientOptions ?: new ClientOptions());
         $this->setupConnection($parameters);
     }
 
-    public function __destruct() {
-        $this->_connection->disconnect();
-    }
-
     public static function create(/* arguments */) {
         $argv = func_get_args();
         $argc = func_num_args();
 
-        $serverProfile = null;
+        $options = null;
         $lastArg = $argv[$argc-1];
-        if ($argc > 0 && !is_string($lastArg) && is_subclass_of($lastArg, '\Predis\RedisServerProfile')) {
-            $serverProfile = array_pop($argv);
+        if ($argc > 0 && !is_string($lastArg) && ($lastArg instanceof ClientOptions ||
+            is_subclass_of($lastArg, '\Predis\RedisServerProfile'))) {
+            $options = array_pop($argv);
             $argc--;
         }
 
@@ -36,7 +51,45 @@ class Client {
             throw new ClientException('Missing connection parameters');
         }
 
-        return new Client($argc === 1 ? $argv[0] : $argv, $serverProfile);
+        return new Client($argc === 1 ? $argv[0] : $argv, $options);
+    }
+
+    private static function filterClientOptions($options) {
+        if ($options instanceof ClientOptions) {
+            return $options;
+        }
+        if (is_array($options)) {
+            return new ClientOptions($options);
+        }
+        if ($options instanceof RedisServerProfile) {
+            return new ClientOptions(array(
+                'profile' => $options
+            ));
+        }
+        if (is_string($options)) {
+            return new ClientOptions(array(
+                'profile' => RedisServerProfile::get($options)
+            ));
+        }
+        throw new \InvalidArgumentException("Invalid type for client options");
+    }
+
+    private function setupClient($options) {
+        $this->_options = self::filterClientOptions($options);
+
+        $this->setProfile($this->_options->profile);
+        if ($this->_options->iterable_multibulk === true) {
+            $this->_responseReader->setHandler(
+                ResponseReader::PREFIX_MULTI_BULK, 
+                new ResponseMultiBulkStreamHandler()
+            );
+        }
+        if ($this->_options->throw_on_error === false) {
+            $this->_responseReader->setHandler(
+                ResponseReader::PREFIX_ERROR, 
+                new ResponseErrorSilentHandler()
+            );
+        }
     }
 
     private function setupConnection($parameters) {
@@ -45,7 +98,7 @@ class Client {
         }
 
         if (is_array($parameters) && isset($parameters[0])) {
-            $cluster = new ConnectionCluster();
+            $cluster = new ConnectionCluster($this->_options->key_distribution);
             foreach ($parameters as $shardParams) {
                 $cluster->add($this->createConnection($shardParams));
             }
@@ -123,34 +176,26 @@ class Client {
 
     public function __call($method, $arguments) {
         $command = $this->_serverProfile->createCommand($method, $arguments);
-        return $this->executeCommand($command);
+        return $this->_connection->executeCommand($command);
     }
 
     public function createCommand($method, $arguments = array()) {
         return $this->_serverProfile->createCommand($method, $arguments);
     }
 
-    private function executeCommandInternal(IConnection $connection, Command $command) {
-        $connection->writeCommand($command);
-        if ($command->closesConnection()) {
-            return $connection->disconnect();
-        }
-        return $connection->readResponse($command);
-    }
-
     public function executeCommand(Command $command) {
-        return self::executeCommandInternal($this->_connection, $command);
+        return $this->_connection->executeCommand($command);
     }
 
     public function executeCommandOnShards(Command $command) {
         $replies = array();
         if ($this->_connection instanceof \Predis\ConnectionCluster) {
             foreach($this->_connection as $connection) {
-                $replies[] = self::executeCommandInternal($connection, $command);
+                $replies[] = $connection->executeCommand($command);
             }
         }
         else {
-            $replies[] = self::executeCommandInternal($this->_connection, $command);
+            $replies[] = $this->_connection->executeCommand($command);
         }
         return $replies;
     }
@@ -175,6 +220,114 @@ class Client {
 
 /* ------------------------------------------------------------------------- */
 
+interface IClientOptionsHandler {
+    public function validate($option, $value);
+    public function getDefault();
+}
+
+class ClientOptionsProfile implements IClientOptionsHandler {
+    public function validate($option, $value) {
+        if ($value instanceof \Predis\RedisServerProfile) {
+            return $value;
+        }
+        if (is_string($value)) {
+            return \Predis\RedisServerProfile::get($value);
+        }
+        throw new \InvalidArgumentException("Invalid value for option $option");
+    }
+
+    public function getDefault() {
+        return \Predis\RedisServerProfile::getDefault();
+    }
+}
+
+class ClientOptionsKeyDistribution implements IClientOptionsHandler {
+    public function validate($option, $value) {
+        if ($value instanceof \Predis\Utilities\IRing) {
+            return $value;
+        }
+        if (is_string($value)) {
+            $valueReflection = new \ReflectionClass($value);
+            if ($valueReflection->isSubclassOf('\Predis\Utilities\IRing')) {
+                return new $value;
+            }
+        }
+        throw new \InvalidArgumentException("Invalid value for option $option");
+    }
+
+    public function getDefault() {
+        return new \Predis\Utilities\HashRing();
+    }
+}
+
+class ClientOptionsIterableMultiBulk implements IClientOptionsHandler {
+    public function validate($option, $value) {
+        return (bool) $value;
+    }
+
+    public function getDefault() {
+        return false;
+    }
+}
+
+class ClientOptionsThrowOnError implements IClientOptionsHandler {
+    public function validate($option, $value) {
+        return (bool) $value;
+    }
+
+    public function getDefault() {
+        return true;
+    }
+}
+
+class ClientOptions {
+    private static $_optionsHandlers;
+    private $_options;
+
+    public function __construct($options = null) {
+        self::initializeOptionsHandlers();
+        $this->initializeOptions($options ?: array());
+    }
+
+    private static function initializeOptionsHandlers() {
+        if (!isset(self::$_optionsHandlers)) {
+            self::$_optionsHandlers = self::getOptionsHandlers();
+        }
+    }
+
+    private static function getOptionsHandlers() {
+        return array(
+            'profile'    => new \Predis\ClientOptionsProfile(),
+            'key_distribution' => new \Predis\ClientOptionsKeyDistribution(),
+            'iterable_multibulk' => new \Predis\ClientOptionsIterableMultiBulk(),
+            'throw_on_error' => new \Predis\ClientOptionsThrowOnError(),
+        );
+    }
+
+    private function initializeOptions($options) {
+        foreach ($options as $option => $value) {
+            if (isset(self::$_optionsHandlers[$option])) {
+                $handler = self::$_optionsHandlers[$option];
+                $this->_options[$option] = $handler->validate($option, $value);
+            }
+        }
+    }
+
+    public function __get($option) {
+        if (!isset($this->_options[$option])) {
+            $defaultValue = self::$_optionsHandlers[$option]->getDefault();
+            $this->_options[$option] = $defaultValue;
+        }
+        return $this->_options[$option];
+    }
+
+    public function __isset($option) {
+        return isset(self::$_optionsHandlers[$option]);
+    }
+}
+
+/* ------------------------------------------------------------------------- */
+
 abstract class Command {
     private $_arguments, $_hash;
 
@@ -186,7 +339,7 @@ abstract class Command {
         return true;
     }
 
-    public function getHash() {
+    public function getHash(Utilities\IRing $ring) {
         if (isset($this->_hash)) {
             return $this->_hash;
         }
@@ -200,7 +353,7 @@ abstract class Command {
                     $key = substr($key, ++$start, $end - $start);
                 }
 
-                $this->_hash = crc32($key);
+                $this->_hash = $ring->generateKey($key);
                 return $this->_hash;
             }
         }
@@ -217,10 +370,12 @@ abstract class Command {
 
     public function setArguments(/* arguments */) {
         $this->_arguments = $this->filterArguments(func_get_args());
+        $this->_hash = null;
     }
 
     public function setArgumentsArray(Array $arguments) {
         $this->_arguments = $this->filterArguments($arguments);
+        $this->_hash = null;
     }
 
     protected function getArguments() {
@@ -319,33 +474,40 @@ class ResponseErrorSilentHandler implements IResponseHandler {
 
 class ResponseBulkHandler implements IResponseHandler {
     public function handle(Connection $connection, $dataLength) {
-        $socket = $connection->getSocket();
-
         if (!is_numeric($dataLength)) {
-            throw new ClientException("Cannot parse '$dataLength' as data length");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                $connection, "Cannot parse '$dataLength' as data length"
+            ));
         }
 
         if ($dataLength > 0) {
-            $value = stream_get_contents($socket, $dataLength);
-            if ($value === false) {
-                throw new ClientException('An error has occurred while reading from the network stream');
-            }
-            fread($socket, 2);
+            $value = $connection->readBytes($dataLength);
+            self::discardNewLine($connection);
             return $value;
         }
         else if ($dataLength == 0) {
-            fread($socket, 2);
+            self::discardNewLine($connection);
             return '';
         }
 
         return null;
     }
+
+    private static function discardNewLine(Connection $connection) {
+        if ($connection->readBytes(2) !== ResponseReader::NEWLINE) {
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                $connection, 'Did not receive a new-line at the end of a bulk response'
+            ));
+        }
+    }
 }
 
 class ResponseMultiBulkHandler implements IResponseHandler {
     public function handle(Connection $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
-            throw new ClientException("Cannot parse '$rawLength' as data length");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                $connection, "Cannot parse '$rawLength' as data length"
+            ));
         }
 
         $listLength = (int) $rawLength;
@@ -368,7 +530,9 @@ class ResponseMultiBulkHandler implements IResponseHandler {
 class ResponseMultiBulkStreamHandler implements IResponseHandler {
     public function handle(Connection $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
-            throw new ClientException("Cannot parse '$rawLength' as data length");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                $connection, "Cannot parse '$rawLength' as data length"
+            ));
         }
         return new Utilities\MultiBulkResponseIterator($connection, (int)$rawLength);
     }
@@ -381,7 +545,9 @@ class ResponseIntegerHandler implements IResponseHandler {
         }
         else {
             if ($number !== ResponseReader::NULL) {
-                throw new ClientException("Cannot parse '$number' as numeric response");
+                Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                    $connection, "Cannot parse '$number' as numeric response"
+                ));
             }
             return null;
         }
@@ -417,57 +583,31 @@ class ResponseReader {
         );
     }
 
-    private function setHandler($prefix, IResponseHandler $handler) {
+    public function setHandler($prefix, IResponseHandler $handler) {
         $this->_prefixHandlers[$prefix] = $handler;
     }
 
-    public function setOption($option, $value) {
-        switch ($option) {
-            case 'iterable_multibulk_replies':
-            case 'iterableMultiBulkReplies':
-                $this->setHandler(self::PREFIX_MULTI_BULK, $value == true 
-                    ? new ResponseMultiBulkStreamHandler()
-                    : new ResponseMultiBulkHandler()
-                );
-                break;
-            case 'errorThrowException':
-            case 'error_throw_exception':
-                $this->setHandler(self::PREFIX_ERROR, $value == true 
-                    ? new ResponseErrorHandler()
-                    : new ResponseErrorSilentHandler()
-                );
-                break;
-            default:
-                throw new \InvalidArgumentException("Unknown option: $option");
-        }
-    }
-
-    public function getOption($option) {
-        switch ($option) {
-            case 'iterable_multibulk_replies':
-            case 'iterableMultiBulkReplies':
-                return $this->_prefixHandlers[self::PREFIX_MULTI_BULK] 
-                    instanceof ResponseMultiBulkStreamHandler;
-            case 'errorThrowException':
-            case 'error_throw_exception':
-                return $this->_prefixHandlers[self::PREFIX_ERROR] 
-                    instanceof ResponseErrorHandler;
-            default:
-                throw new \InvalidArgumentException("Unknown option: $option");
+    public function getHandler($prefix) {
+        if (isset($this->_prefixHandlers[$prefix])) {
+            return $this->_prefixHandlers[$prefix];
         }
     }
 
     public function read(Connection $connection) {
-        $header  = fgets($connection->getSocket());
-        if ($header === false) {
-           throw new ClientException('An error has occurred while reading from the network stream');
+        $header = $connection->readLine();
+        if ($header === '') {
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                $connection, 'Unexpected empty header'
+            ));
         }
 
         $prefix  = $header[0];
-        $payload = substr($header, 1, -2);
+        $payload = strlen($header) > 1 ? substr($header, 1) : '';
 
         if (!isset($this->_prefixHandlers[$prefix])) {
-            throw new MalformedServerResponse("Unknown prefix '$prefix'");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                $connection, "Unknown prefix '$prefix'"
+            ));
         }
 
         $handler = $this->_prefixHandlers[$prefix];
@@ -623,7 +763,7 @@ class MultiExecBlock {
             return $this;
         }
         else {
-            throw new ClientException('The server did not respond with a QUEUED status reply');
+            $this->malformedServerResponse('The server did not respond with a QUEUED status reply');
         }
     }
 
@@ -659,8 +799,7 @@ class MultiExecBlock {
             $sizeofReplies = count($execReply);
 
             if ($sizeofReplies !== count($commands)) {
-                // TODO: think of a better exception message
-                throw new ClientException("Out-of-sync");
+                $this->malformedServerResponse('Unexpected number of responses for a MultiExecBlock');
             }
 
             for ($i = 0; $i < $sizeofReplies; $i++) {
@@ -681,6 +820,15 @@ class MultiExecBlock {
 
         return $returnValues;
     }
+
+    private function malformedServerResponse($message) {
+        // NOTE: a MULTI/EXEC block cannot be initialized on a clustered 
+        //       connection, which means that Predis\Client::getConnection 
+        //       will always return an instance of Predis\Connection.
+        Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+            $this->_redisClient->getConnection(), $message
+        ));
+    }
 }
 
 /* ------------------------------------------------------------------------- */
@@ -716,6 +864,12 @@ class ConnectionParameters {
                     case 'password':
                         $details['password'] = $v;
                         break;
+                    case 'connection_async':
+                        $details['connection_async'] = $v;
+                        break;
+                    case 'connection_persistent':
+                        $details['connection_persistent'] = $v;
+                        break;
                     case 'connection_timeout':
                         $details['connection_timeout'] = $v;
                         break;
@@ -725,6 +879,9 @@ class ConnectionParameters {
                     case 'alias':
                         $details['alias'] = $v;
                         break;
+                    case 'weight':
+                        $details['weight'] = $v;
+                        break;
                 }
             }
             $parsed = array_merge($parsed, $details);
@@ -743,9 +900,12 @@ class ConnectionParameters {
             'port' => (int) self::getParamOrDefault($parameters, 'port', self::DEFAULT_PORT), 
             'database' => self::getParamOrDefault($parameters, 'database'), 
             'password' => self::getParamOrDefault($parameters, 'password'), 
+            'connection_async'   => self::getParamOrDefault($parameters, 'connection_async', false), 
+            'connection_persistent' => self::getParamOrDefault($parameters, 'connection_persistent', false), 
             'connection_timeout' => self::getParamOrDefault($parameters, 'connection_timeout', self::DEFAULT_TIMEOUT), 
             'read_write_timeout' => self::getParamOrDefault($parameters, 'read_write_timeout'), 
-            'alias' => self::getParamOrDefault($parameters, 'alias'), 
+            'alias'  => self::getParamOrDefault($parameters, 'alias'), 
+            'weight' => self::getParamOrDefault($parameters, 'weight'), 
         );
     }
 
@@ -764,6 +924,7 @@ interface IConnection {
     public function isConnected();
     public function writeCommand(Command $command);
     public function readResponse(Command $command);
+    public function executeCommand(Command $command);
 }
 
 class Connection implements IConnection {
@@ -776,7 +937,9 @@ class Connection implements IConnection {
     }
 
     public function __destruct() {
-        $this->disconnect();
+        if (!$this->_params->connection_persistent) {
+            $this->disconnect();
+        }
     }
 
     public function isConnected() {
@@ -788,13 +951,25 @@ class Connection implements IConnection {
             throw new ClientException('Connection already estabilished');
         }
         $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
-        $this->_socket = @stream_socket_client($uri, $errno, $errstr, $this->_params->connection_timeout);
+        $connectFlags = STREAM_CLIENT_CONNECT;
+        if ($this->_params->connection_async) {
+            $connectFlags |= STREAM_CLIENT_ASYNC_CONNECT;
+        }
+        if ($this->_params->connection_persistent) {
+            $connectFlags |= STREAM_CLIENT_PERSISTENT;
+        }
+        $this->_socket = @stream_socket_client(
+            $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
+        );
+
         if (!$this->_socket) {
-            throw new ClientException(trim($errstr), $errno);
+            $this->onCommunicationException(trim($errstr), $errno);
         }
 
         if (isset($this->_params->read_write_timeout)) {
-            stream_set_timeout($this->_socket, $this->_params->read_write_timeout);
+            $timeoutSeconds  = floor($this->_params->read_write_timeout);
+            $timeoutUSeconds = ($this->_params->read_write_timeout - $timeoutSeconds) * 1000000;
+            stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
         }
 
         if (count($this->_initCmds) > 0){
@@ -821,14 +996,14 @@ class Connection implements IConnection {
         }
     }
 
+    private function onCommunicationException($message, $code = null) {
+        Utilities\Shared::onCommunicationException(
+            new CommunicationException($this, $message, $code)
+        );
+    }
+
     public function writeCommand(Command $command) {
-        $written = fwrite($this->getSocket(), $command());
-        if ($written === false){
-           throw new ClientException(sprintf(
-               'An error has occurred while writing command %s on the network stream',
-               $command->getCommandId()
-           ));
-        }
+        $this->writeBytes($command());
     }
 
     public function readResponse(Command $command) {
@@ -837,18 +1012,69 @@ class Connection implements IConnection {
         return $skipparse ? $response : $command->parseResponse($response);
     }
 
-    public function rawCommand($rawCommandData, $closesConnection = false) {
-        $socket = $this->getSocket();
-        $written = fwrite($socket, $rawCommandData);
-        if ($written === false){
-           throw new ClientException('An error has occurred while writing a raw command on the network stream');
+    public function executeCommand(Command $command) {
+        $this->writeCommand($command);
+        if ($command->closesConnection()) {
+            return $this->disconnect();
         }
+        return $this->readResponse($command);
+    }
+
+    public function rawCommand($rawCommandData, $closesConnection = false) {
+        $this->writeBytes($rawCommandData);
         if ($closesConnection) {
+            $this->disconnect();
             return;
         }
         return $this->_reader->read($this);
     }
 
+    public function writeBytes($value) {
+        $socket = $this->getSocket();
+        while (($length = strlen($value)) > 0) {
+            $written = fwrite($socket, $value);
+            if ($length === $written) {
+                return true;
+            }
+            if ($written === false || $written === 0) {
+                $this->onCommunicationException('Error while writing bytes to the server');
+            }
+            $value = substr($value, $written);
+        }
+        return true;
+    }
+
+    public function readBytes($length) {
+        if ($length == 0) {
+            throw new \InvalidArgumentException('Length parameter must be greater than 0');
+        }
+        $socket = $this->getSocket();
+        $value  = '';
+        do {
+            $chunk = fread($socket, $length);
+            if ($chunk === false || $chunk === '') {
+                $this->onCommunicationException('Error while reading bytes from the server');
+            }
+            $value .= $chunk;
+        }
+        while (($length -= strlen($chunk)) > 0);
+        return $value;
+    }
+
+    public function readLine() {
+        $socket = $this->getSocket();
+        $value  = '';
+        do {
+            $chunk = fgets($socket);
+            if ($chunk === false || strlen($chunk) == 0) {
+                $this->onCommunicationException('Error while reading line from the server');
+            }
+            $value .= $chunk;
+        }
+        while (substr($value, -2) !== ResponseReader::NEWLINE);
+        return substr($value, 0, -2);
+    }
+
     public function getSocket() {
         if (!$this->isConnected()) {
             $this->connect();
@@ -860,8 +1086,8 @@ class Connection implements IConnection {
         return $this->_reader;
     }
 
-    public function getAlias() {
-        return $this->_params->alias;
+    public function getParameters() {
+        return $this->_params;
     }
 
     public function __toString() {
@@ -872,13 +1098,9 @@ class Connection implements IConnection {
 class ConnectionCluster implements IConnection, \IteratorAggregate {
     private $_pool, $_ring;
 
-    public function __construct() {
+    public function __construct(Utilities\IRing $ring = null) {
         $this->_pool = array();
-        $this->_ring = new Utilities\HashRing();
-    }
-
-    public function __destruct() {
-        $this->disconnect();
+        $this->_ring = $ring ?: new Utilities\HashRing();
     }
 
     public function isConnected() {
@@ -903,23 +1125,23 @@ class ConnectionCluster implements IConnection, \IteratorAggregate {
     }
 
     public function add(Connection $connection) {
-        $connectionAlias = $connection->getAlias();
-        if (isset($connectionAlias)) {
-            $this->_pool[$connectionAlias] = $connection;
+        $parameters = $connection->getParameters();
+        if (isset($parameters->alias)) {
+            $this->_pool[$parameters->alias] = $connection;
         }
         else {
             $this->_pool[] = $connection;
         }
-        $this->_ring->add($connection);
+        $this->_ring->add($connection, $parameters->weight);
     }
 
-    private function getConnection(Command $command) {
+    public function getConnection(Command $command) {
         if ($command->canBeHashed() === false) {
             throw new ClientException(
                 sprintf("Cannot send '%s' commands to a cluster of connections.", $command->getCommandId())
             );
         }
-        return $this->_ring->get($command->getHash());
+        return $this->_ring->get($command->getHash($this->_ring));
     }
 
     public function getConnectionById($id = null) {
@@ -937,6 +1159,12 @@ class ConnectionCluster implements IConnection, \IteratorAggregate {
     public function readResponse(Command $command) {
         return $this->getConnection($command)->readResponse($command);
     }
+
+    public function executeCommand(Command $command) {
+        $connection = $this->getConnection($command);
+        $connection->writeCommand($command);
+        return $connection->readResponse($command);
+    }
 }
 
 /* ------------------------------------------------------------------------- */
@@ -1266,52 +1494,121 @@ class RedisServer_vNext extends RedisServer_v1_2 {
 
 namespace Predis\Utilities;
 
-class HashRing {
+class Shared {
+    public static function onCommunicationException(\Predis\CommunicationException $exception) {
+        if ($exception->shouldResetConnection()) {
+            $connection = $exception->getConnection();
+            if ($connection->isConnected()) {
+                $connection->disconnect();
+            }
+        }
+        throw $exception;
+    }
+}
+
+interface IRing {
+    public function add($node, $weight = null);
+    public function remove($node);
+    public function get($key);
+    public function generateKey($value);
+}
+
+class HashRing implements IRing {
     const DEFAULT_REPLICAS = 128;
-    private $_ring, $_ringKeys, $_replicas;
+    const DEFAULT_WEIGHT   = 100;
+    private $_nodes, $_ring, $_ringKeys, $_ringKeysCount, $_replicas;
 
     public function __construct($replicas = self::DEFAULT_REPLICAS) {
         $this->_replicas = $replicas;
-        $this->_ring     = array();
-        $this->_ringKeys = array();
+        $this->_nodes    = array();
     }
 
-    public function add($node) {
-        $nodeHash = (string) $node;
-        $replicas = $this->_replicas;
-        for ($i = 0; $i < $replicas; $i++) {
-            $key = crc32($nodeHash . ':' . $i);
-            $this->_ring[$key] = $node;
+    public function add($node, $weight = null) {
+        // NOTE: in case of collisions in the hashes of the nodes, the node added
+        //       last wins, thus the order in which nodes are added is significant.
+        $this->_nodes[] = array('object' => $node, 'weight' => (int) $weight ?: $this::DEFAULT_WEIGHT);
+        $this->reset();
+    }
+
+    public function remove($node) {
+        // NOTE: a node is removed by resetting the ring so that it's recreated from 
+        //       scratch, in order to reassign possible hashes with collisions to the 
+        //       right node according to the order in which they were added in the 
+        //       first place.
+        for ($i = 0; $i < count($this->_nodes); ++$i) {
+            if ($this->_nodes[$i]['object'] === $node) {
+                array_splice($this->_nodes, $i, 1);
+                $this->reset();
+                break;
+            }
+        }
+    }
+
+    private function reset() {
+        unset($this->_ring);
+        unset($this->_ringKeys);
+        unset($this->_ringKeysCount);
+    }
+
+    private function isInitialized() {
+        return isset($this->_ringKeys);
+    }
+
+    private function computeTotalWeight() {
+        // TODO: array_reduce + lambda for PHP 5.3
+        $totalWeight = 0;
+        foreach ($this->_nodes as $node) {
+            $totalWeight += $node['weight'];
+        }
+        return $totalWeight;
+    }
+
+    private function initialize() {
+        if ($this->isInitialized()) {
+            return;
+        }
+        if (count($this->_nodes) === 0) {
+            throw new \LogicException('Cannot initialize empty hashring');
+        }
+
+        $this->_ring = array();
+        $totalWeight = $this->computeTotalWeight();
+        $nodesCount  = count($this->_nodes);
+        foreach ($this->_nodes as $node) {
+            $weightRatio = $node['weight'] / $totalWeight;
+            $this->addNodeToRing($this->_ring, $node, $nodesCount, $this->_replicas, $weightRatio);
         }
         ksort($this->_ring, SORT_NUMERIC);
         $this->_ringKeys = array_keys($this->_ring);
+        $this->_ringKeysCount = count($this->_ringKeys);
     }
 
-    public function remove($node) {
-        $nodeHash = (string) $node;
-        $replicas = $this->_replicas;
+    protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
+        $nodeObject = $node['object'];
+        $nodeHash = (string) $nodeObject;
+        $replicas = (int) round($weightRatio * $totalNodes * $replicas);
         for ($i = 0; $i < $replicas; $i++) {
-            $key = crc32($nodeHash . ':' . $i);
-            unset($this->_ring[$key]);
-            $this->_ringKeys = array_filter($this->_ringKeys, function($rk) use($key) {
-                return $rk !== $key;
-            });
+            $key = crc32("$nodeHash:$i");
+            $ring[$key] = $nodeObject;
         }
     }
 
+    public function generateKey($value) {
+        return crc32($value);
+    }
+
     public function get($key) {
         return $this->_ring[$this->getNodeKey($key)];
     }
 
     private function getNodeKey($key) {
+        $this->initialize();
         $ringKeys = $this->_ringKeys;
-
-        $upper = count($ringKeys) - 1;
+        $upper = $this->_ringKeysCount - 1;
         $lower = 0;
-        $index = 0;
 
         while ($lower <= $upper) {
-            $index = ($lower + $upper) / 2;
+            $index = ($lower + $upper) >> 1;
             $item  = $ringKeys[$index];
             if ($item > $key) {
                 $upper = $index - 1;
@@ -1320,10 +1617,49 @@ class HashRing {
                 $lower = $index + 1;
             }
             else {
-                return $index;
+                return $item;
             }
         }
-        return $ringKeys[$upper];
+        return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->_ringKeysCount)];
+    }
+
+    protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) {
+        // NOTE: binary search for the last item in _ringkeys with a value 
+        //       less or equal to the key. If no such item exists, return the 
+        //       last item.
+        return $upper >= 0 ? $upper : $ringKeysCount - 1;
+    }
+}
+
+class KetamaPureRing extends HashRing {
+    const DEFAULT_REPLICAS = 160;
+
+    public function __construct() {
+        parent::__construct($this::DEFAULT_REPLICAS);
+    }
+
+    protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
+        $nodeObject = $node['object'];
+        $nodeHash = (string) $nodeObject;
+        $replicas = (int) floor($weightRatio * $totalNodes * ($replicas / 4));
+        for ($i = 0; $i < $replicas; $i++) {
+            $unpackedDigest = unpack('V4', md5("$nodeHash-$i", true));
+            foreach ($unpackedDigest as $key) {
+                $ring[$key] = $nodeObject;
+            }
+        }
+    }
+
+    public function generateKey($value) {
+        $hash = unpack('V', md5($value, true));
+        return $hash[1];
+    }
+
+    protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) {
+        // NOTE: binary search for the first item in _ringkeys with a value 
+        //       greater or equal to the key. If no such item exists, return the 
+        //       first item.
+        return $lower < $ringKeysCount ? $lower : 0;
     }
 }
 

+ 10 - 8
test/RedisCommandsTest.php

@@ -201,7 +201,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
         $this->redis->setAdd('fooSet', 'bar');
         $this->assertEquals('set', $this->redis->type('fooSet'));
 
-        $this->redis->zsetAdd('fooZSet', 'bar', 0);
+        $this->redis->zsetAdd('fooZSet', 0, 'bar');
         $this->assertEquals('zset', $this->redis->type('fooZSet'));
     }
 
@@ -412,7 +412,10 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
             $this->redis->listRange('numbers', -100, 100)
         );
 
-        $this->assertNull($this->redis->listRange('keyDoesNotExist', 0, 1));
+        $this->assertEquals(
+            array(), 
+            $this->redis->listRange('keyDoesNotExist', 0, 1)
+        );
 
         // should throw an exception when trying to do a LRANGE on non-list types
         RC::testForServerException($this, RC::EXCEPTION_WRONG_TYPE, function($test) {
@@ -580,7 +583,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertEquals(0, $this->redis->listPopLastPushHead('numbers', 'temporary'));
         $this->assertEquals(0, $this->redis->listLength('numbers'));
         $this->assertEquals(3, $this->redis->listLength('temporary'));
-        $this->assertNull($this->redis->listRange('numbers', 0, -1));
+        $this->assertEquals(array(), $this->redis->listRange('numbers', 0, -1));
         $this->assertEquals($numbers, $this->redis->listRange('temporary', 0, -1));
 
         $numbers = RC::pushTailAndReturn($this->redis, 'numbers', array(0, 1, 2));
@@ -589,7 +592,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
         $this->redis->listPopLastPushHead('numbers', 'numbers');
         $this->assertEquals($numbers, $this->redis->listRange('numbers', 0, -1));
 
-        $this->assertEquals(null, $this->redis->listPopLastPushHead('listDoesNotExist1', 'listDoesNotExist2'));
+        $this->assertNull($this->redis->listPopLastPushHead('listDoesNotExist1', 'listDoesNotExist2'));
 
         RC::testForServerException($this, RC::EXCEPTION_WRONG_TYPE, function($test) {
             $test->redis->set('foo', 'bar');
@@ -757,7 +760,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
 
         $this->assertTrue(RC::sameValuesInArrays($set, $this->redis->setMembers('set')));
 
-        $this->assertNull($this->redis->setMembers('setDoesNotExist'));
+        $this->assertEquals(array(), $this->redis->setMembers('setDoesNotExist'));
 
         // wrong type
         $this->redis->set('foo', 'bar');
@@ -780,8 +783,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
             $this->redis->setIntersection('setA', 'setB')
         ));
 
-        // TODO: should nil really be considered an empty set?
-        $this->assertNull($this->redis->setIntersection('setA', 'setDoesNotExist'));
+        $this->assertEquals(array(), $this->redis->setIntersection('setA', 'setDoesNotExist'));
 
         // wrong type
         $this->redis->set('foo', 'bar');
@@ -811,7 +813,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
         ));
 
         $this->redis->delete('setC');
-        $this->assertNull($this->redis->setIntersection('setC', 'setDoesNotExist'));
+        $this->assertEquals(array(), $this->redis->setIntersection('setC', 'setDoesNotExist'));
         $this->assertFalse($this->redis->exists('setC'));
 
         // existing keys are replaced by SINTERSTORE