浏览代码

Merge branch 'multibulk_streaming_new'

Conflicts:
	lib/Predis.php
Daniele Alessandri 15 年之前
父节点
当前提交
d03c7c42b2
共有 1 个文件被更改,包括 116 次插入4 次删除
  1. 116 4
      lib/Predis.php

+ 116 - 4
lib/Predis.php

@@ -348,6 +348,15 @@ class ResponseMultiBulkHandler implements IResponseHandler {
     }
 }
 
+class ResponseMultiBulkStreamHandler implements IResponseHandler {
+    public function handle($socket, $prefix, $rawLength) {
+        if (!is_numeric($rawLength)) {
+            throw new ClientException("Cannot parse '$rawLength' as data length");
+        }
+        return new Utilities\MultiBulkResponseIterator($socket, (int)$rawLength);
+    }
+}
+
 class ResponseIntegerHandler implements IResponseHandler {
     public function handle(ResponseReader $reader, $socket, $number) {
         if (is_numeric($number)) {
@@ -381,7 +390,7 @@ class ResponseReader {
             '-' => new ResponseErrorHandler(), 
             ':' => new ResponseIntegerHandler(), 
             '$' => new ResponseBulkHandler(), 
-            '*' => new ResponseMultiBulkHandler()
+            '*' => new ResponseMultiBulkHandler(), 
         );
     }
 
@@ -467,7 +476,11 @@ class CommandPipeline {
             $connection->writeCommand($command);
         }
         for ($i = 0; $i < $sizeofPipe; $i++) {
-            $this->_returnValues[] = $connection->readResponse($commands[$i]);
+            $response = $connection->readResponse($commands[$i]);
+            $this->_returnValues[] = ($response instanceof \Iterator
+                ? iterator_to_array($response)
+                : $response
+            );
             unset($commands[$i]);
         }
         $this->_pipelineBuffer = array();
@@ -566,7 +579,10 @@ class MultiExecBlock {
                 return;
             }
 
-            $execReply = $this->_redisClient->exec();
+            $execReply = (($reply = $this->_redisClient->exec()) instanceof \Iterator
+                ? iterator_to_array($reply)
+                : $reply
+            );
             $commands  = &$this->_commands;
             $sizeofReplies = count($execReply);
 
@@ -576,7 +592,10 @@ class MultiExecBlock {
             }
 
             for ($i = 0; $i < $sizeofReplies; $i++) {
-                $returnValues[] = $commands[$i]->parseResponse($execReply[$i]);
+                $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator
+                    ? iterator_to_array($execReply[$i])
+                    : $execReply[$i]
+                );
                 unset($commands[$i]);
             }
         }
@@ -1211,6 +1230,96 @@ class HashRing {
     }
 }
 
+abstract class MultiBulkResponseIteratorBase implements \Iterator, \Countable {
+    protected $_position, $_current, $_replySize;
+
+    public function rewind() {
+        // NOOP
+    }
+
+    public function current() {
+        return $this->_current;
+    }
+
+    public function key() {
+        return $this->_position;
+    }
+
+    public function next() {
+        if (++$this->_position < $this->_replySize) {
+            $this->_current = $this->getValue();
+        }
+        return $this->_position;
+    }
+
+    public function valid() {
+        return $this->_position < $this->_replySize;
+    }
+
+    public function count() {
+        // NOTE: use count if you want to get the size of the current multi-bulk 
+        //       response without using iterator_count (which actually consumes 
+        //       our iterator to calculate the size, and we cannot perform a rewind)
+        return $this->_replySize;
+    }
+
+    protected abstract function getValue();
+}
+
+class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
+    private $_connection;
+
+    public function __construct($socket, $size) {
+        $this->_connection = $socket;
+        $this->_position   = 0;
+        $this->_current    = $size > 0 ? $this->getValue() : null;
+        $this->_replySize  = $size;
+    }
+
+    public function __destruct() {
+        // when the iterator is garbage-collected (e.g. it goes out of the
+        // scope of a foreach) but it has not reached its end, we must sync
+        // the client with the queued elements that have not been read from
+        // the connection with the server.
+        $this->sync();
+    }
+
+    public function sync() {
+        while ($this->valid()) {
+            $this->next();
+        }
+    }
+
+    protected function getValue() {
+        return \Predis\Response::read($this->_connection);
+    }
+}
+
+class MultiBulkResponseKVIterator extends MultiBulkResponseIteratorBase {
+    private $_iterator;
+
+    public function __construct(MultiBulkResponseIterator $iterator) {
+        $virtualSize = count($iterator) / 2;
+
+        $this->_iterator   = $iterator;
+        $this->_position   = 0;
+        $this->_current    = $virtualSize > 0 ? $this->getValue() : null;
+        $this->_replySize  = $virtualSize;
+    }
+
+    public function __destruct() {
+        $this->_iterator->sync();
+    }
+
+    protected function getValue() {
+        $k = $this->_iterator->current();
+        $this->_iterator->next();
+        $v = $this->_iterator->current();
+        $this->_iterator->next();
+        return array($k, $v);
+    }
+}
+
 /* ------------------------------------------------------------------------- */
 
 namespace Predis\Commands;
@@ -1502,6 +1611,9 @@ class ZSetRange extends \Predis\InlineCommand {
         $arguments = $this->getArguments();
         if (count($arguments) === 4) {
             if (strtolower($arguments[3]) === 'withscores') {
+                if ($data instanceof \Iterator) {
+                    return new \Predis\Utilities\MultiBulkResponseKVIterator($data);
+                }
                 $result = array();
                 for ($i = 0; $i < count($data); $i++) {
                     $result[] = array($data[$i], $data[++$i]);