Просмотр исходного кода

Backported changes from the mainline library to the PHP 5.2 branch (up to commit f073bc8)

Daniele Alessandri 14 лет назад
Родитель
Сommit
7d181160a3
2 измененных файлов с 252 добавлено и 42 удалено
  1. 28 0
      CHANGELOG
  2. 224 42
      lib/Predis.php

+ 28 - 0
CHANGELOG

@@ -1,3 +1,31 @@
+v0.6.1 (2010-xx-xx)
+  * New commands available in the Redis v2.2 profile (dev):
+      - Misc.  : WATCH, UNWATCH
+
+  * Minor internal improvements and clean ups.
+
+  * The constructor of Predis_Client::__construct now accepts also instances
+    of Predis_ConnectionParameters.
+
+  * Predis_MultiExecBlock and Predis_PubSubContext will throw an exception
+    when trying to create their instances using a profile that does not
+    support the required Redis commands or when the client is connected to
+    a cluster of connections.
+
+  * Various improvements to Predis_MultiExecBlock:
+      - fixes and more consistent behaviour across various usage cases.
+      - support for WATCH and UNWATCH when using the current development
+        profile (Redis v2.2) and aborted transactions.
+
+  * New method signature for Predis_Client::multiExec(). Now it is able to
+    accept an array of options for the underlying Predis_MultiExecBlock, but
+    it is still backwards compatible with previous releases of Predis.
+
+  * New method signature for Predis_Client::pipeline(). Now it is able to
+    accept an array of options for the underlying Predis_CommandPipeline,
+    but it is still backwards compatible with previous releases of Predis.
+    Predis_Client::pipelineSafe() is to be considered obsolete.
+
 v0.6.0 (2010-05-24)
   * Switched to the new multi-bulk request protocol for all of the commands 
     in the Redis 1.2 and Redis 2.0 profiles. Inline and bulk requests are now 

+ 224 - 42
lib/Predis.php

@@ -4,6 +4,9 @@ class PredisException extends Exception { }
 // Client-side errors
 class Predis_ClientException extends PredisException { }
 
+// Aborted multi/exec
+class Predis_AbortedMultiExec extends PredisException { }
+
 // Server-side errors
 class Predis_ServerException extends PredisException {
     public function toResponseError() {
@@ -113,7 +116,9 @@ class Predis_Client {
     }
 
     private function createConnection($parameters) {
-        $params     = new Predis_ConnectionParameters($parameters);
+        $params     = $parameters instanceof Predis_ConnectionParameters
+                          ? $parameters
+                          : new Predis_ConnectionParameters($parameters);
         $connection = new Predis_Connection($params, $this->_responseReader);
 
         if ($params->password !== null) {
@@ -155,7 +160,7 @@ class Predis_Client {
     }
 
     public function getClientFor($connectionAlias) {
-        if (!($this->_connection instanceof Predis_ConnectionCluster)) {
+        if (!Predis_Shared_Utils::isCluster($this->_connection)) {
             throw new Predis_ClientException(
                 'This method is supported only when the client is connected to a cluster of connections'
             );
@@ -191,7 +196,7 @@ class Predis_Client {
             return $this->_connection;
         }
         else {
-            return $this->_connection instanceof Predis_ConnectionCluster 
+            return Predis_Shared_Utils::isCluster($this->_connection) 
                 ? $this->_connection->getConnectionById($id)
                 : $this->_connection;
         }
@@ -212,7 +217,7 @@ class Predis_Client {
 
     public function executeCommandOnShards(Predis_Command $command) {
         $replies = array();
-        if ($this->_connection instanceof Predis_ConnectionCluster) {
+        if (Predis_Shared_Utils::isCluster($this->_connection)) {
             foreach($this->_connection as $connection) {
                 $replies[] = $connection->executeCommand($command);
             }
@@ -224,22 +229,50 @@ class Predis_Client {
     }
 
     public function rawCommand($rawCommandData, $closesConnection = false) {
-        if ($this->_connection instanceof Predis_ConnectionCluster) {
+        if (Predis_Shared_Utils::isCluster($this->_connection)) {
             throw new Predis_ClientException('Cannot send raw commands when connected to a cluster of Redis servers');
         }
         return $this->_connection->rawCommand($rawCommandData, $closesConnection);
     }
 
-    public function pipeline($pipelineBlock = null) {
-        return $this->pipelineExecute(new Predis_CommandPipeline($this), $pipelineBlock);
+    public function pipeline(/* arguments */) {
+        $argv = func_get_args();
+        $argc = func_num_args();
+
+        if ($argc === 0) {
+            return $this->initPipeline();
+        }
+        else if ($argc === 1) {
+            list($arg0) = $argv;
+            return is_array($arg0) ? $this->initPipeline($arg0) : $this->initPipeline(null, $arg0);
+        }
+        else if ($argc === 2) {
+            list($arg0, $arg1) = $argv;
+            return $this->initPipeline($arg0, $arg1);
+        }
     }
 
     public function pipelineSafe($pipelineBlock = null) {
-        $connection = $this->getConnection();
-        $pipeline   = new Predis_CommandPipeline($this, $connection instanceof Predis_Connection
-            ? new Predis_Pipeline_SafeExecutor($connection)
-            : new Predis_Pipeline_SafeClusterExecutor($connection)
-        );
+        return $this->initPipeline(array('safe' => true), $pipelineBlock);
+    }
+
+    private function initPipeline(Array $options = null, $pipelineBlock = null) {
+        $pipeline = null;
+        if (isset($options)) {
+            if (isset($options['safe']) && $options['safe'] == true) {
+                $connection = $this->getConnection();
+                $pipeline   = new Predis_CommandPipeline($this, $connection instanceof Predis_Connection
+                    ? new Predis_Pipeline_SafeExecutor($connection)
+                    : new Predis_Pipeline_SafeClusterExecutor($connection)
+                );
+            }
+            else {
+                $pipeline = new Predis_CommandPipeline($this);
+            }
+        }
+        else {
+            $pipeline = new Predis_CommandPipeline($this);
+        }
         return $this->pipelineExecute($pipeline, $pipelineBlock);
     }
 
@@ -247,9 +280,26 @@ class Predis_Client {
         return $block !== null ? $pipeline->execute($block) : $pipeline;
     }
 
-    public function multiExec($multiExecBlock = null) {
-        $multiExec = new Predis_MultiExecBlock($this);
-        return $multiExecBlock !== null ? $multiExec->execute($multiExecBlock) : $multiExec;
+    public function multiExec(/* arguments */) {
+        $argv = func_get_args();
+        $argc = func_num_args();
+
+        if ($argc === 0) {
+            return $this->initMultiExec();
+        }
+        else if ($argc === 1) {
+            list($arg0) = $argv;
+            return is_array($arg0) ? $this->initMultiExec($arg0) : $this->initMultiExec(null, $arg0);
+        }
+        else if ($argc === 2) {
+            list($arg0, $arg1) = $argv;
+            return $this->initMultiExec($arg0, $arg1);
+        }
+    }
+
+    private function initMultiExec(Array $options = null, $transBlock = null) {
+        $multi = isset($options) ? new Predis_MultiExecBlock($this, $options) : new Predis_MultiExecBlock($this);
+        return $transBlock !== null ? $multi->execute($transBlock) : $multi;
     }
 
     public function pubSubContext() {
@@ -433,8 +483,7 @@ abstract class Predis_Command {
         $this->_hash = null;
     }
 
-    protected function getArguments() {
-        // TODO: why getArguments is protected?
+    public function getArguments() {
         return isset($this->_arguments) ? $this->_arguments : array();
     }
 
@@ -577,8 +626,9 @@ class Predis_ResponseMultiBulkHandler implements Predis_IResponseHandler {
         $list = array();
 
         if ($listLength > 0) {
+            $reader = $connection->getResponseReader();
             for ($i = 0; $i < $listLength; $i++) {
-                $list[] = $connection->getResponseReader()->read($connection);
+                $list[] = $reader->read($connection);
             }
         }
 
@@ -770,23 +820,58 @@ class Predis_CommandPipeline {
 }
 
 class Predis_MultiExecBlock {
-    private $_redisClient, $_commands, $_initialized, $_discarded;
+    private $_initialized, $_discarded, $_insideBlock;
+    private $_redisClient, $_options, $_commands;
+    private $_supportsWatch;
 
-    public function __construct(Predis_Client $redisClient) {
+    public function __construct(Predis_Client $redisClient, Array $options = null) {
+        $this->checkCapabilities($redisClient);
         $this->_initialized = false;
         $this->_discarded   = false;
+        $this->_insideBlock = false;
         $this->_redisClient = $redisClient;
+        $this->_options     = isset($options) ? $options : array();
         $this->_commands    = array();
     }
 
+    private function checkCapabilities(Predis_Client $redisClient) {
+        if (Predis_Shared_Utils::isCluster($redisClient->getConnection())) {
+            throw new Predis_ClientException(
+                'Cannot initialize a MULTI/EXEC context over a cluster of connections'
+            );
+        }
+        $profile = $redisClient->getProfile();
+        if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
+            throw new Predis_ClientException(
+                'The current profile does not support MULTI, EXEC and DISCARD commands'
+            );
+        }
+        $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch'));
+    }
+
+    private function isWatchSupported() {
+        if ($this->_supportsWatch === false) {
+            throw new Predis_ClientException(
+                'The current profile does not support WATCH and UNWATCH commands'
+            );
+        }
+    }
+
     private function initialize() {
         if ($this->_initialized === false) {
+            if (isset($this->_options['watch'])) {
+                $this->watch($this->_options['watch']);
+            }
             $this->_redisClient->multi();
             $this->_initialized = true;
             $this->_discarded   = false;
         }
     }
 
+    private function setInsideBlock($value) {
+        $this->_insideBlock = $value;
+    }
+
     public function __call($method, $arguments) {
         $this->initialize();
         $command  = $this->_redisClient->createCommand($method, $arguments);
@@ -800,6 +885,34 @@ class Predis_MultiExecBlock {
         }
     }
 
+    public function watch($keys) {
+        $this->isWatchSupported();
+        if ($this->_initialized === true) {
+            throw new Predis_ClientException('WATCH inside MULTI is not allowed');
+        }
+
+        $reply = null;
+        if (is_array($keys)) {
+            $reply = array();
+            foreach ($keys as $key) {
+                $reply = $this->_redisClient->watch($keys);
+            }
+        }
+        else {
+            $reply = $this->_redisClient->watch($keys);
+        }
+        return $reply;
+    }
+
+    public function multi() {
+        $this->initialize();
+    }
+
+    public function unwatch() {
+        $this->isWatchSupported();
+        $this->_redisClient->unwatch();
+    }
+
     public function discard() {
         $this->_redisClient->discard();
         $this->_commands    = array();
@@ -807,7 +920,17 @@ class Predis_MultiExecBlock {
         $this->_discarded   = true;
     }
 
+    public function exec() {
+        return $this->execute();
+    }
+
     public function execute($block = null) {
+        if ($this->_insideBlock === true) {
+            throw new Predis_ClientException(
+                "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
+            );
+        }
+
         if ($block && !is_callable($block)) {
             throw new InvalidArgumentException('Argument passed must be a callable object');
         }
@@ -817,17 +940,21 @@ class Predis_MultiExecBlock {
 
         try {
             if ($block !== null) {
+                $this->setInsideBlock(true);
                 $block($this);
+                $this->setInsideBlock(false);
             }
 
             if ($this->_discarded === true) {
                 return;
             }
 
-            $execReply = (($reply = $this->_redisClient->exec()) instanceof Iterator
-                ? iterator_to_array($reply)
-                : $reply
-            );
+            $reply = $this->_redisClient->exec();
+            if ($reply === null) {
+                throw new Predis_AbortedMultiExec('The current transaction has been aborted by the server');
+            }
+
+            $execReply = $reply instanceof Iterator ? iterator_to_array($reply) : $reply;
             $commands  = &$this->_commands;
             $sizeofReplies = count($execReply);
 
@@ -844,6 +971,7 @@ class Predis_MultiExecBlock {
             }
         }
         catch (Exception $exception) {
+            $this->setInsideBlock(false);
             $blockException = $exception;
         }
 
@@ -863,12 +991,16 @@ class Predis_PubSubContext implements Iterator {
     const MESSAGE      = 'message';
     const PMESSAGE     = 'pmessage';
 
-    private $_redisClient, $_subscriptions, $_isStillValid, $_position;
+    const STATUS_VALID       = 0x0001;
+    const STATUS_SUBSCRIBED  = 0x0010;
+    const STATUS_PSUBSCRIBED = 0x0100;
+
+    private $_redisClient, $_position;
 
     public function __construct(Predis_Client $redisClient) {
-        $this->_redisClient   = $redisClient;
-        $this->_isStillValid  = true;
-        $this->_subscriptions = false;
+        $this->checkCapabilities($redisClient);
+        $this->_redisClient = $redisClient;
+        $this->_statusFlags = self::STATUS_VALID;
     }
 
     public function __destruct() {
@@ -878,10 +1010,29 @@ class Predis_PubSubContext implements Iterator {
         }
     }
 
+    private function checkCapabilities(Predis_Client $redisClient) {
+        if (Predis_Shared_Utils::isCluster($redisClient->getConnection())) {
+            throw new Predis_ClientException(
+                'Cannot initialize a PUB/SUB context over a cluster of connections'
+            );
+        }
+        $profile = $redisClient->getProfile();
+        $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
+        if ($profile->supportsCommands($commands) === false) {
+            throw new Predis_ClientException(
+                'The current profile does not support PUB/SUB related commands'
+            );
+        }
+    }
+
+    private function isFlagSet($value) {
+        return ($this->_statusFlags & $value) === $value;
+    }
+
     public function subscribe(/* arguments */) {
         $args = func_get_args();
         $this->writeCommand(self::SUBSCRIBE, $args);
-        $this->_subscriptions = true;
+        $this->_statusFlags |= self::STATUS_SUBSCRIBED;
     }
 
     public function unsubscribe(/* arguments */) {
@@ -892,7 +1043,7 @@ class Predis_PubSubContext implements Iterator {
     public function psubscribe(/* arguments */) {
         $args = func_get_args();
         $this->writeCommand(self::PSUBSCRIBE, $args);
-        $this->_subscriptions = true;
+        $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
     }
 
     public function punsubscribe(/* arguments */) {
@@ -902,10 +1053,12 @@ class Predis_PubSubContext implements Iterator {
 
     public function closeContext() {
         if ($this->valid()) {
-            // TODO: as an optimization, we should not send both 
-            //       commands if one of them has not been issued.
-            $this->unsubscribe();
-            $this->punsubscribe();
+            if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
+                $this->unsubscribe();
+            }
+            if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
+                $this->punsubscribe();
+            }
         }
     }
 
@@ -930,19 +1083,20 @@ class Predis_PubSubContext implements Iterator {
     }
 
     public function next() {
-        if ($this->_isStillValid) {
+        if ($this->isFlagSet(self::STATUS_VALID)) {
             $this->_position++;
         }
         return $this->_position;
     }
 
     public function valid() {
-        return $this->_subscriptions && $this->_isStillValid;
+        $subscriptions = self::STATUS_SUBSCRIBED + self::STATUS_PSUBSCRIBED;
+        return $this->isFlagSet(self::STATUS_VALID)
+            && ($this->_statusFlags & $subscriptions) > 0;
     }
 
     private function invalidate() {
-        $this->_isStillValid = false;
-        $this->_subscriptions = false;
+        $this->_statusFlags = 0x0000;
     }
 
     private function getValue() {
@@ -1379,6 +1533,15 @@ abstract class Predis_RedisServerProfile {
         return new $profile();
     }
 
+    public function supportsCommands(Array $commands) {
+        foreach ($commands as $command) {
+            if ($this->supportsCommand($command) === false) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     public function supportsCommand($command) {
         return isset($this->_registeredCommands[$command]);
     }
@@ -1656,6 +1819,13 @@ class Predis_RedisServer_v2_0 extends Predis_RedisServer_v1_2 {
 
 class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
     public function getVersion() { return '2.1'; }
+    public function getSupportedCommands() {
+        return array_merge(parent::getSupportedCommands(), array(
+            /* transactions */
+            'watch'                     => 'Predis_Commands_Watch',
+            'unwatch'                   => 'Predis_Commands_Unwatch',
+        ));
+    }
 }
 
 /* ------------------------------------------------------------------------- */
@@ -1693,7 +1863,6 @@ class Predis_Pipeline_StandardExecutor implements Predis_Pipeline_IPipelineExecu
 
 class Predis_Pipeline_SafeExecutor implements Predis_Pipeline_IPipelineExecutor {
     public function execute(Predis_IConnection $connection, &$commands) {
-        $firstServerException = null;
         $sizeofPipe = count($commands);
         $values = array();
 
@@ -1720,9 +1889,6 @@ class Predis_Pipeline_SafeExecutor implements Predis_Pipeline_IPipelineExecutor
                 $values[] = $exception->toResponseError();
             }
             catch (Predis_CommunicationException $exception) {
-                if ($throwExceptions) {
-                    throw $exception;
-                }
                 $toAdd  = count($commands) - count($values);
                 $values = array_merge($values, array_fill(0, $toAdd, $exception));
                 break;
@@ -1952,6 +2118,10 @@ class Predis_Distribution_KetamaPureRing extends Predis_Distribution_HashRing {
 /* ------------------------------------------------------------------------- */
 
 class Predis_Shared_Utils {
+    public static function isCluster(Predis_IConnection $connection) {
+        return $connection instanceof Predis_ConnectionCluster;
+    }
+
     public static function onCommunicationException(Predis_CommunicationException $exception) {
         if ($exception->shouldResetConnection()) {
             $connection = $exception->getConnection();
@@ -2646,6 +2816,18 @@ class Predis_Commands_Publish extends Predis_MultiBulkCommand {
     public function getCommandId() { return 'PUBLISH'; }
 }
 
+class Predis_Commands_Watch extends Predis_MultiBulkCommand {
+    public function canBeHashed()  { return false; }
+    public function getCommandId() { return 'WATCH'; }
+    public function parseResponse($data) { return (bool) $data; }
+}
+
+class Predis_Commands_Unwatch extends Predis_MultiBulkCommand {
+    public function canBeHashed()  { return false; }
+    public function getCommandId() { return 'UNWATCH'; }
+    public function parseResponse($data) { return (bool) $data; }
+}
+
 /* persistence control commands */
 class Predis_Commands_Save extends Predis_MultiBulkCommand {
     public function canBeHashed()  { return false; }