Selaa lähdekoodia

Backported changes from the mainline library to the PHP 5.2 branch (up to commit 84cb28e)

Daniele Alessandri 15 vuotta sitten
vanhempi
commit
cf758f23d4
5 muutettua tiedostoa jossa 232 lisäystä ja 49 poistoa
  1. 0 2
      TODO
  2. 49 0
      examples/PubSubContext.php
  3. 175 43
      lib/Predis.php
  4. 4 4
      test/PredisClientFeatures.php
  5. 4 0
      test/RedisCommandsTest.php

+ 0 - 2
TODO

@@ -5,7 +5,5 @@
   full battery of tests targeting specific functions of this library is still 
   missing.
 
-* Add a PubSubBlock class.
-
 * Missing tests for new commands: 
     PubSub  : PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE

+ 49 - 0
examples/PubSubContext.php

@@ -0,0 +1,49 @@
+<?php
+require_once '../lib/Predis.php';
+
+// Create a client and disable r/w timeout on the socket
+$redis  = new Predis_Client('redis://127.0.0.1:6379/?read_write_timeout=-1', 'dev');
+
+// Initialize a new pubsub context
+$pubsub = $redis->pubSubContext();
+
+// Subscribe to your channels
+$pubsub->subscribe('control_channel');
+$pubsub->subscribe('notifications');
+
+// Start processing the pubsup messages. Open a terminal and use redis-cli 
+// to push messages to the channels. Examples:
+//   ./redis-cli PUBLISH notifications "this is a test"
+//   ./redis-cli PUBLISH control_channel quit_loop
+foreach ($pubsub as $message) {
+    switch ($message->kind) {
+        case 'subscribe':
+            echo "Subscribed to {$message->channel}\n";
+            break;
+        case 'message':
+            if ($message->channel == 'control_channel') {
+                if ($message->payload == 'quit_loop') {
+                    echo "Aborting pubsub loop...\n";
+                    $pubsub->unsubscribe();
+                }
+                else {
+                    echo "Received an unregognized command: {$message->payload}.\n";
+                }
+            }
+            else {
+                echo "Received the following message from {$message->channel}:\n",
+                     "  {$message->payload}\n\n";
+            }
+            break;
+    }
+}
+
+// Always unset the pubsub context instance when you are done! The 
+// class destructor will take care of cleanups and prevent protocol 
+// desynchronizations between the client and the server.
+unset($pubsub);
+
+// Say goodbye :-)
+$info = $redis->info();
+print_r("Goodbye from Redis v{$info['redis_version']}!\n");
+?>

+ 175 - 43
lib/Predis.php

@@ -219,6 +219,10 @@ class Predis_Client {
         $multiExec = new Predis_MultiExecBlock($this);
         return $multiExecBlock !== null ? $multiExec->execute($multiExecBlock) : $multiExec;
     }
+
+    public function pubSubContext() {
+        return new Predis_PubSubContext($this);
+    }
 }
 
 /* ------------------------------------------------------------------------- */
@@ -246,12 +250,12 @@ class Predis_ClientOptionsProfile implements Predis_IClientOptionsHandler {
 
 class Predis_ClientOptionsKeyDistribution implements Predis_IClientOptionsHandler {
     public function validate($option, $value) {
-        if ($value instanceof Predis_IDistributionAlgorithm) {
+        if ($value instanceof Predis_Distribution_IDistributionAlgorithm) {
             return $value;
         }
         if (is_string($value)) {
             $valueReflection = new ReflectionClass($value);
-            if ($valueReflection->isSubclassOf('Predis_IDistributionAlgorithm')) {
+            if ($valueReflection->isSubclassOf('Predis_Distribution_IDistributionAlgorithm')) {
                 return new $value;
             }
         }
@@ -259,7 +263,7 @@ class Predis_ClientOptionsKeyDistribution implements Predis_IClientOptionsHandle
     }
 
     public function getDefault() {
-        return new Predis_Utilities_HashRing();
+        return new Predis_Distribution_HashRing();
     }
 }
 
@@ -356,7 +360,7 @@ abstract class Predis_Command {
         return true;
     }
 
-    public function getHash(Predis_IDistributionAlgorithm $distributor) {
+    public function getHash(Predis_Distribution_IDistributionAlgorithm $distributor) {
         if (isset($this->_hash)) {
             return $this->_hash;
         }
@@ -493,7 +497,7 @@ class Predis_ResponseErrorSilentHandler implements Predis_IResponseHandler {
 class Predis_ResponseBulkHandler implements Predis_IResponseHandler {
     public function handle(Predis_Connection $connection, $dataLength) {
         if (!is_numeric($dataLength)) {
-            Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+            Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                 $connection, "Cannot parse '$dataLength' as data length"
             ));
         }
@@ -513,7 +517,7 @@ class Predis_ResponseBulkHandler implements Predis_IResponseHandler {
 
     private static function discardNewLine(Predis_Connection $connection) {
         if ($connection->readBytes(2) !== Predis_Protocol::NEWLINE) {
-            Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+            Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                 $connection, 'Did not receive a new-line at the end of a bulk response'
             ));
         }
@@ -523,7 +527,7 @@ class Predis_ResponseBulkHandler implements Predis_IResponseHandler {
 class Predis_ResponseMultiBulkHandler implements Predis_IResponseHandler {
     public function handle(Predis_Connection $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
-            Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+            Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                 $connection, "Cannot parse '$rawLength' as data length"
             ));
         }
@@ -548,11 +552,11 @@ class Predis_ResponseMultiBulkHandler implements Predis_IResponseHandler {
 class Predis_ResponseMultiBulkStreamHandler implements Predis_IResponseHandler {
     public function handle(Predis_Connection $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
-            Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+            Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                 $connection, "Cannot parse '$rawLength' as data length"
             ));
         }
-        return new Predis_Utilities_MultiBulkResponseIterator($connection, (int)$rawLength);
+        return new Predis_Shared_MultiBulkResponseIterator($connection, (int)$rawLength);
     }
 }
 
@@ -563,7 +567,7 @@ class Predis_ResponseIntegerHandler implements Predis_IResponseHandler {
         }
         else {
             if ($number !== Predis_Protocol::NULL) {
-                Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+                Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                     $connection, "Cannot parse '$number' as numeric response"
                 ));
             }
@@ -602,7 +606,7 @@ class Predis_ResponseReader {
     public function read(Predis_Connection $connection) {
         $header = $connection->readLine();
         if ($header === '') {
-            Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+            Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                 $connection, 'Unexpected empty header'
             ));
         }
@@ -611,7 +615,7 @@ class Predis_ResponseReader {
         $payload = strlen($header) > 1 ? substr($header, 1) : '';
 
         if (!isset($this->_prefixHandlers[$prefix])) {
-            Predis_Utilities_Shared::onCommunicationException(new Predis_MalformedServerResponse(
+            Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
                 $connection, "Unknown prefix '$prefix'"
             ));
         }
@@ -714,7 +718,7 @@ class Predis_CommandPipeline {
 
     public function execute($block = null) {
         if ($block && !is_callable($block)) {
-            throw new RuntimeException('Argument passed must be a callable object');
+            throw new InvalidArgumentException('Argument passed must be a callable object');
         }
 
         // TODO: do not reuse previously executed pipelines
@@ -782,7 +786,7 @@ class Predis_MultiExecBlock {
 
     public function execute($block = null) {
         if ($block && !is_callable($block)) {
-            throw new RuntimeException('Argument passed must be a callable object');
+            throw new InvalidArgumentException('Argument passed must be a callable object');
         }
 
         $blockException = null;
@@ -828,6 +832,130 @@ class Predis_MultiExecBlock {
     }
 }
 
+class Predis_PubSubContext implements Iterator {
+    const SUBSCRIBE    = 'subscribe';
+    const UNSUBSCRIBE  = 'unsubscribe';
+    const PSUBSCRIBE   = 'psubscribe';
+    const PUNSUBSCRIBE = 'punsubscribe';
+    const MESSAGE      = 'message';
+    const PMESSAGE     = 'pmessage';
+
+    private $_redisClient, $_subscriptions, $_isStillValid, $_position;
+
+    public function __construct(Predis_Client $redisClient) {
+        $this->_redisClient   = $redisClient;
+        $this->_isStillValid  = true;
+        $this->_subscriptions = false;
+    }
+
+    public function __destruct() {
+        if ($this->valid()) {
+            $this->_redisClient->unsubscribe();
+            $this->_redisClient->punsubscribe();
+        }
+    }
+
+    public function subscribe(/* arguments */) {
+        $args = func_get_args();
+        $this->writeCommand(self::SUBSCRIBE, $args);
+        $this->_subscriptions = true;
+    }
+
+    public function unsubscribe(/* arguments */) {
+        $args = func_get_args();
+        $this->writeCommand(self::UNSUBSCRIBE, $args);
+    }
+
+    public function psubscribe(/* arguments */) {
+        $args = func_get_args();
+        $this->writeCommand(self::PSUBSCRIBE, $args);
+        $this->_subscriptions = true;
+    }
+
+    public function punsubscribe(/* arguments */) {
+        $args = func_get_args();
+        $this->writeCommand(self::PUNSUBSCRIBE, $args);
+    }
+
+    public function closeContext() {
+        if ($this->valid()) {
+            // TODO: as an optimization, we should not send both 
+            //       commands if one of them has not been issued.
+            $this->unsubscribe();
+            $this->punsubscribe();
+        }
+    }
+
+    private function writeCommand($method, $arguments) {
+        if (count($arguments) === 1 && is_array($arguments[0])) {
+            $arguments = $arguments[0];
+        }
+        $command = $this->_redisClient->createCommand($method, $arguments);
+        $this->_redisClient->getConnection()->writeCommand($command);
+    }
+
+    public function rewind() {
+        // NOOP
+    }
+
+    public function current() {
+        return $this->getValue();
+    }
+
+    public function key() {
+        return $this->_position;
+    }
+
+    public function next() {
+        if ($this->_isStillValid) {
+            $this->_position++;
+        }
+        return $this->_position;
+    }
+
+    public function valid() {
+        return $this->_subscriptions && $this->_isStillValid;
+    }
+
+    private function invalidate() {
+        $this->_isStillValid = false;
+        $this->_subscriptions = false;
+    }
+
+    private function getValue() {
+        $reader     = $this->_redisClient->getResponseReader();
+        $connection = $this->_redisClient->getConnection();
+        $response   = $reader->read($connection);
+
+        switch ($response[0]) {
+            case self::SUBSCRIBE:
+            case self::UNSUBSCRIBE:
+            case self::PSUBSCRIBE:
+            case self::PUNSUBSCRIBE:
+                if ($response[2] === 0) {
+                    $this->invalidate();
+                }
+            case self::MESSAGE:
+                return (object) array(
+                    'kind'    => $response[0],
+                    'channel' => $response[1],
+                    'payload' => $response[2],
+                );
+            case self::PMESSAGE:
+                return (object) array(
+                    'kind'    => $response[0],
+                    'pattern' => $response[1],
+                    'channel' => $response[2],
+                    'payload' => $response[3],
+                );
+            default:
+                throw new Predis_ClientException(
+                    "Received an unknown message type {$response[0]} inside of a pubsub context"
+                );
+        }
+    }
+}
+
 /* ------------------------------------------------------------------------- */
 
 class Predis_ConnectionParameters {
@@ -915,13 +1043,6 @@ class Predis_ConnectionParameters {
     }
 }
 
-interface Predis_IDistributionAlgorithm {
-    public function add($node, $weight = null);
-    public function remove($node);
-    public function get($key);
-    public function generateKey($value);
-}
-
 interface Predis_IConnection {
     public function connect();
     public function disconnect();
@@ -1001,7 +1122,7 @@ class Predis_Connection implements Predis_IConnection {
     }
 
     private function onCommunicationException($message, $code = null) {
-        Predis_Utilities_Shared::onCommunicationException(
+        Predis_Shared_Utils::onCommunicationException(
             new Predis_CommunicationException($this, $message, $code)
         );
     }
@@ -1102,9 +1223,9 @@ class Predis_Connection implements Predis_IConnection {
 class Predis_ConnectionCluster implements Predis_IConnection, IteratorAggregate {
     private $_pool, $_distributor;
 
-    public function __construct(Predis_IDistributionAlgorithm $distributor = null) {
+    public function __construct(Predis_Distribution_IDistributionAlgorithm $distributor = null) {
         $this->_pool = array();
-        $this->_distributor = $distributor !== null ? $distributor : new Predis_Utilities_HashRing();
+        $this->_distributor = $distributor !== null ? $distributor : new Predis_Distribution_HashRing();
     }
 
     public function isConnected() {
@@ -1523,19 +1644,16 @@ class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
 
 /* ------------------------------------------------------------------------- */
 
-class Predis_Utilities_Shared {
-    public static function onCommunicationException(Predis_CommunicationException $exception) {
-        if ($exception->shouldResetConnection()) {
-            $connection = $exception->getConnection();
-            if ($connection->isConnected()) {
-                $connection->disconnect();
-            }
-        }
-        throw $exception;
-    }
+interface Predis_Distribution_IDistributionAlgorithm {
+    public function add($node, $weight = null);
+    public function remove($node);
+    public function get($key);
+    public function generateKey($value);
 }
 
-class Predis_Utilities_HashRing implements Predis_IDistributionAlgorithm {
+class Predis_Distribution_EmptyRingException extends Exception { }
+
+class Predis_Distribution_HashRing implements Predis_Distribution_IDistributionAlgorithm {
     const DEFAULT_REPLICAS = 128;
     const DEFAULT_WEIGHT   = 100;
     private $_nodes, $_ring, $_ringKeys, $_ringKeysCount, $_replicas;
@@ -1594,7 +1712,7 @@ class Predis_Utilities_HashRing implements Predis_IDistributionAlgorithm {
             return;
         }
         if (count($this->_nodes) === 0) {
-            throw new LogicException('Cannot initialize empty hashring');
+            throw new Predis_Distribution_EmptyRingException('Cannot initialize empty hashring');
         }
 
         $this->_ring = array();
@@ -1657,7 +1775,7 @@ class Predis_Utilities_HashRing implements Predis_IDistributionAlgorithm {
     }
 }
 
-class Predis_Utilities_KetamaPureRing extends Predis_Utilities_HashRing {
+class Predis_Distribution_KetamaPureRing extends Predis_Distribution_HashRing {
     const DEFAULT_REPLICAS = 160;
 
     public function __construct() {
@@ -1689,7 +1807,21 @@ class Predis_Utilities_KetamaPureRing extends Predis_Utilities_HashRing {
     }
 }
 
-abstract class Predis_Utilities_MultiBulkResponseIteratorBase implements Iterator, Countable {
+/* ------------------------------------------------------------------------- */
+
+class Predis_Shared_Utils {
+    public static function onCommunicationException(Predis_CommunicationException $exception) {
+        if ($exception->shouldResetConnection()) {
+            $connection = $exception->getConnection();
+            if ($connection->isConnected()) {
+                $connection->disconnect();
+            }
+        }
+        throw $exception;
+    }
+}
+
+abstract class Predis_Shared_MultiBulkResponseIteratorBase implements Iterator, Countable {
     protected $_position, $_current, $_replySize;
 
     public function rewind() {
@@ -1725,7 +1857,7 @@ abstract class Predis_Utilities_MultiBulkResponseIteratorBase implements Iterato
     protected abstract function getValue();
 }
 
-class Predis_Utilities_MultiBulkResponseIterator extends Predis_Utilities_MultiBulkResponseIteratorBase {
+class Predis_Shared_MultiBulkResponseIterator extends Predis_Shared_MultiBulkResponseIteratorBase {
     private $_connection;
 
     public function __construct(Predis_Connection $connection, $size) {
@@ -1755,10 +1887,10 @@ class Predis_Utilities_MultiBulkResponseIterator extends Predis_Utilities_MultiB
     }
 }
 
-class Predis_Utilities_MultiBulkResponseKVIterator extends Predis_Utilities_MultiBulkResponseIteratorBase {
+class Predis_Shared_MultiBulkResponseKVIterator extends Predis_Shared_MultiBulkResponseIteratorBase {
     private $_iterator;
 
-    public function __construct(Predis_Utilities_MultiBulkResponseIterator $iterator) {
+    public function __construct(Predis_Shared_MultiBulkResponseIterator $iterator) {
         $virtualSize = count($iterator) / 2;
 
         $this->_iterator   = $iterator;
@@ -2116,7 +2248,7 @@ class Predis_Commands_ZSetRange extends Predis_MultiBulkCommand {
         if (count($arguments) === 4) {
             if (strtolower($arguments[3]) === 'withscores') {
                 if ($data instanceof Iterator) {
-                    return new Predis_Utilities_MultiBulkResponseKVIterator($data);
+                    return new Predis_Shared_MultiBulkResponseKVIterator($data);
                 }
                 $result = array();
                 for ($i = 0; $i < count($data); $i++) {
@@ -2241,7 +2373,7 @@ class Predis_Commands_HashGetAll extends Predis_MultiBulkCommand {
     public function getCommandId() { return 'HGETALL'; }
     public function parseResponse($data) {
         if ($data instanceof Iterator) {
-            return new Predis_Utilities_MultiBulkResponseKVIterator($data);
+            return new Predis_Shared_MultiBulkResponseKVIterator($data);
         }
         $result = array();
         for ($i = 0; $i < count($data); $i++) {

+ 4 - 4
test/PredisClientFeatures.php

@@ -93,7 +93,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertEquals('PING', $cmd->getCommandId());
         $this->assertFalse($cmd->closesConnection());
         $this->assertFalse($cmd->canBeHashed());
-        $this->assertNull($cmd->getHash(new Predis_Utilities_HashRing()));
+        $this->assertNull($cmd->getHash(new Predis_Distribution_HashRing()));
         $this->assertEquals("PING\r\n", $cmd->invoke());
     }
 
@@ -105,7 +105,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertEquals('GET', $cmd->getCommandId());
         $this->assertFalse($cmd->closesConnection());
         $this->assertTrue($cmd->canBeHashed());
-        $this->assertNotNull($cmd->getHash(new Predis_Utilities_HashRing()));
+        $this->assertNotNull($cmd->getHash(new Predis_Distribution_HashRing()));
         $this->assertEquals("GET key\r\n", $cmd->invoke());
     }
 
@@ -117,7 +117,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertEquals('SET', $cmd->getCommandId());
         $this->assertFalse($cmd->closesConnection());
         $this->assertTrue($cmd->canBeHashed());
-        $this->assertNotNull($cmd->getHash(new Predis_Utilities_HashRing()));
+        $this->assertNotNull($cmd->getHash(new Predis_Distribution_HashRing()));
         $this->assertEquals("SET key 5\r\nvalue\r\n", $cmd->invoke());
     }
 
@@ -129,7 +129,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertEquals('MSET', $cmd->getCommandId());
         $this->assertFalse($cmd->closesConnection());
         $this->assertFalse($cmd->canBeHashed());
-        $this->assertNull($cmd->getHash(new Predis_Utilities_HashRing()));
+        $this->assertNull($cmd->getHash(new Predis_Distribution_HashRing()));
         $this->assertEquals("*5\r\n$4\r\nMSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n", $cmd->invoke());
     }
 

+ 4 - 0
test/RedisCommandsTest.php

@@ -80,6 +80,7 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
         $this->assertFalse($this->redis->exists('hoge'));
     }
 
+
     /* commands operating on string values */
 
     function testSet() {
@@ -203,6 +204,9 @@ class RedisCommandTestSuite extends PHPUnit_Framework_TestCase {
 
         $this->redis->zadd('fooZSet', 0, 'bar');
         $this->assertEquals('zset', $this->redis->type('fooZSet'));
+
+        $this->redis->hset('fooHash', 'value', 'bar');
+        $this->assertEquals('hash', $this->redis->type('fooHash'));
     }
 
     function testAppend() {