Browse Source

Merge branch 'pubsub_experimental'

Daniele Alessandri 15 năm trước cách đây
mục cha
commit
84cb28e3ac
3 tập tin đã thay đổi với 173 bổ sung2 xóa
  1. 0 2
      TODO
  2. 49 0
      examples/PubSubContext.php
  3. 124 0
      lib/Predis.php

+ 0 - 2
TODO

@@ -5,7 +5,5 @@
   full battery of tests targeting specific functions of this library is still 
   full battery of tests targeting specific functions of this library is still 
   missing.
   missing.
 
 
-* Add a PubSubBlock class.
-
 * Missing tests for new commands: 
 * Missing tests for new commands: 
     PubSub  : PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE
     PubSub  : PUBLISH, SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE

+ 49 - 0
examples/PubSubContext.php

@@ -0,0 +1,49 @@
+<?php
+require_once '../lib/Predis.php';
+
+// Create a client and disable r/w timeout on the socket
+$redis  = new \Predis\Client('redis://127.0.0.1:6379/?read_write_timeout=-1', 'dev');
+
+// Initialize a new pubsub context
+$pubsub = $redis->pubSubContext();
+
+// Subscribe to your channels
+$pubsub->subscribe('control_channel');
+$pubsub->subscribe('notifications');
+
+// Start processing the pubsup messages. Open a terminal and use redis-cli 
+// to push messages to the channels. Examples:
+//   ./redis-cli PUBLISH notifications "this is a test"
+//   ./redis-cli PUBLISH control_channel quit_loop
+foreach ($pubsub as $message) {
+    switch ($message->kind) {
+        case 'subscribe':
+            echo "Subscribed to {$message->channel}\n";
+            break;
+        case 'message':
+            if ($message->channel == 'control_channel') {
+                if ($message->payload == 'quit_loop') {
+                    echo "Aborting pubsub loop...\n";
+                    $pubsub->unsubscribe();
+                }
+                else {
+                    echo "Received an unregognized command: {$message->payload}.\n";
+                }
+            }
+            else {
+                echo "Received the following message from {$message->channel}:\n",
+                     "  {$message->payload}\n\n";
+            }
+            break;
+    }
+}
+
+// Always unset the pubsub context instance when you are done! The 
+// class destructor will take care of cleanups and prevent protocol 
+// desynchronizations between the client and the server.
+unset($pubsub);
+
+// Say goodbye :-)
+$info = $redis->info();
+print_r("Goodbye from Redis v{$info['redis_version']}!\n");
+?>

+ 124 - 0
lib/Predis.php

@@ -216,6 +216,10 @@ class Client {
         $multiExec = new MultiExecBlock($this);
         $multiExec = new MultiExecBlock($this);
         return $multiExecBlock !== null ? $multiExec->execute($multiExecBlock) : $multiExec;
         return $multiExecBlock !== null ? $multiExec->execute($multiExecBlock) : $multiExec;
     }
     }
+
+    public function pubSubContext() {
+        return new PubSubContext($this);
+    }
 }
 }
 
 
 /* ------------------------------------------------------------------------- */
 /* ------------------------------------------------------------------------- */
@@ -834,6 +838,126 @@ class MultiExecBlock {
     }
     }
 }
 }
 
 
+class PubSubContext implements \Iterator {
+    const SUBSCRIBE    = 'subscribe';
+    const UNSUBSCRIBE  = 'unsubscribe';
+    const PSUBSCRIBE   = 'psubscribe';
+    const PUNSUBSCRIBE = 'punsubscribe';
+    const MESSAGE      = 'message';
+    const PMESSAGE     = 'pmessage';
+
+    private $_redisClient, $_subscriptions, $_isStillValid, $_position;
+
+    public function __construct(Client $redisClient) {
+        $this->_redisClient   = $redisClient;
+        $this->_isStillValid  = true;
+        $this->_subscriptions = false;
+    }
+
+    public function __destruct() {
+        if ($this->valid()) {
+            $this->_redisClient->unsubscribe();
+            $this->_redisClient->punsubscribe();
+        }
+    }
+
+    public function subscribe(/* arguments */) {
+        $this->writeCommand(self::SUBSCRIBE, func_get_args());
+        $this->_subscriptions = true;
+    }
+
+    public function unsubscribe(/* arguments */) {
+        $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
+    }
+
+    public function psubscribe(/* arguments */) {
+        $this->writeCommand(self::PSUBSCRIBE, func_get_args());
+        $this->_subscriptions = true;
+    }
+
+    public function punsubscribe(/* arguments */) {
+        $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
+    }
+
+    public function closeContext() {
+        if ($this->valid()) {
+            // TODO: as an optimization, we should not send both 
+            //       commands if one of them has not been issued.
+            $this->unsubscribe();
+            $this->punsubscribe();
+        }
+    }
+
+    private function writeCommand($method, $arguments) {
+        if (count($arguments) === 1 && is_array($arguments[0])) {
+            $arguments = $arguments[0];
+        }
+        $command = $this->_redisClient->createCommand($method, $arguments);
+        $this->_redisClient->getConnection()->writeCommand($command);
+    }
+
+    public function rewind() {
+        // NOOP
+    }
+
+    public function current() {
+        return $this->getValue();
+    }
+
+    public function key() {
+        return $this->_position;
+    }
+
+    public function next() {
+        if ($this->_isStillValid) {
+            $this->_position++;
+        }
+        return $this->_position;
+    }
+
+    public function valid() {
+        return $this->_subscriptions && $this->_isStillValid;
+    }
+
+    private function invalidate() {
+        $this->_isStillValid = false;
+        $this->_subscriptions = false;
+    }
+
+    private function getValue() {
+        $reader     = $this->_redisClient->getResponseReader();
+        $connection = $this->_redisClient->getConnection();
+        $response   = $reader->read($connection);
+
+        switch ($response[0]) {
+            case self::SUBSCRIBE:
+            case self::UNSUBSCRIBE:
+            case self::PSUBSCRIBE:
+            case self::PUNSUBSCRIBE:
+                if ($response[2] === 0) {
+                    $this->invalidate();
+                }
+            case self::MESSAGE:
+                return (object) array(
+                    'kind'    => $response[0],
+                    'channel' => $response[1],
+                    'payload' => $response[2],
+                );
+            case self::PMESSAGE:
+                return (object) array(
+                    'kind'    => $response[0],
+                    'pattern' => $response[1],
+                    'channel' => $response[2],
+                    'payload' => $response[3],
+                );
+            default:
+                throw new \Predis\ClientException(
+                    "Received an unknown message type {$response[0]} inside of a pubsub context"
+                );
+        }
+    }
+}
+
 /* ------------------------------------------------------------------------- */
 /* ------------------------------------------------------------------------- */
 
 
 class ConnectionParameters {
 class ConnectionParameters {