浏览代码

Implemented the new \Predis\PubSubContext class.

Daniele Alessandri 15 年之前
父节点
当前提交
7312dbe7fc
共有 1 个文件被更改,包括 120 次插入0 次删除
  1. 120 0
      lib/Predis.php

+ 120 - 0
lib/Predis.php

@@ -834,6 +834,126 @@ class MultiExecBlock {
     }
 }
 
+class PubSubContext implements \Iterator {
+    const SUBSCRIBE    = 'subscribe';
+    const UNSUBSCRIBE  = 'unsubscribe';
+    const PSUBSCRIBE   = 'psubscribe';
+    const PUNSUBSCRIBE = 'punsubscribe';
+    const MESSAGE      = 'message';
+    const PMESSAGE     = 'pmessage';
+
+    private $_redisClient, $_subscriptions, $_isStillValid, $_position;
+
+    public function __construct(Client $redisClient) {
+        $this->_redisClient   = $redisClient;
+        $this->_isStillValid  = true;
+        $this->_subscriptions = false;
+    }
+
+    public function __destruct() {
+        if ($this->valid()) {
+            $this->_redisClient->unsubscribe();
+            $this->_redisClient->punsubscribe();
+        }
+    }
+
+    public function subscribe(/* arguments */) {
+        $this->writeCommand(self::SUBSCRIBE, func_get_args());
+        $this->_subscriptions = true;
+    }
+
+    public function unsubscribe(/* arguments */) {
+        $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
+    }
+
+    public function psubscribe(/* arguments */) {
+        $this->writeCommand(self::PSUBSCRIBE, func_get_args());
+        $this->_subscriptions = true;
+    }
+
+    public function punsubscribe(/* arguments */) {
+        $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
+    }
+
+    public function closeContext() {
+        if ($this->valid()) {
+            // TODO: as an optimization, we should not send both 
+            //       commands if one of them has not been issued.
+            $this->unsubscribe();
+            $this->punsubscribe();
+        }
+    }
+
+    private function writeCommand($method, $arguments) {
+        if (count($arguments) === 1 && is_array($arguments[0])) {
+            $arguments = $arguments[0];
+        }
+        $command = $this->_redisClient->createCommand($method, $arguments);
+        $this->_redisClient->getConnection()->writeCommand($command);
+    }
+
+    public function rewind() {
+        // NOOP
+    }
+
+    public function current() {
+        return $this->getValue();
+    }
+
+    public function key() {
+        return $this->_position;
+    }
+
+    public function next() {
+        if ($this->_isStillValid) {
+            $this->_position++;
+        }
+        return $this->_position;
+    }
+
+    public function valid() {
+        return $this->_subscriptions && $this->_isStillValid;
+    }
+
+    private function invalidate() {
+        $this->_isStillValid = false;
+        $this->_subscriptions = false;
+    }
+
+    private function getValue() {
+        $reader     = $this->_redisClient->getResponseReader();
+        $connection = $this->_redisClient->getConnection();
+        $response   = $reader->read($connection);
+
+        switch ($response[0]) {
+            case self::SUBSCRIBE:
+            case self::UNSUBSCRIBE:
+            case self::PSUBSCRIBE:
+            case self::PUNSUBSCRIBE:
+                if ($response[2] === 0) {
+                    $this->invalidate();
+                }
+            case self::MESSAGE:
+                return (object) array(
+                    'kind'    => $response[0],
+                    'channel' => $response[1],
+                    'payload' => $response[2],
+                );
+            case self::PMESSAGE:
+                return (object) array(
+                    'kind'    => $response[0],
+                    'pattern' => $response[1],
+                    'channel' => $response[2],
+                    'payload' => $response[3],
+                );
+            default:
+                throw new \Predis\ClientException(
+                    "Received an unknown message type {$response[0]} inside of a pubsub context"
+                );
+        }
+    }
+}
+
 /* ------------------------------------------------------------------------- */
 
 class ConnectionParameters {