فهرست منبع

Refactored the pipeline bits. Now a pipeline buffer can be flushed from inside a pipeline block to free up the memory used by the commands stored in the buffer on the client.

Daniele Alessandri 16 سال پیش
والد
کامیت
894ed83fea
1فایلهای تغییر یافته به همراه53 افزوده شده و 33 حذف شده
  1. 53 33
      lib/Predis.php

+ 53 - 33
lib/Predis.php

@@ -4,7 +4,6 @@ namespace Predis;
 class PredisException extends \Exception { }
 class ClientException extends PredisException { }
 class ServerException extends PredisException { }
-class PipelineException extends ClientException { }
 class MalformedServerResponse extends ServerException { }
 
 /* ------------------------------------------------------------------------- */
@@ -95,36 +94,8 @@ class Client {
     }
 
     public function pipeline(\Closure $pipelineBlock) {
-        $pipelineBlockException = null;
-        $returnValues = array();
-
-        try {
-            $pipeline = new CommandPipeline($this);
-            $this->_pipelining = true;
-            $pipelineBlock($pipeline);
-            // TODO: this should be moved entirely into the 
-            //       self-contained CommandPipeline instance.
-            $recordedCommands = $pipeline->getRecordedCommands();
-
-            foreach ($recordedCommands as $command) {
-                $this->_connection->writeCommand($command);
-            }
-            foreach ($recordedCommands as $command) {
-                $returnValues[] = $this->_connection->readResponse($command);
-            }
-        }
-        catch (\Exception $exception) {
-            $pipelineBlockException = $exception;
-        }
-
-        $this->_pipelining = false;
-
-        if ($pipelineBlockException !== null) {
-            throw new PipelineException('An exception has occurred inside of a pipeline block', 
-                null, $pipelineBlockException);
-        }
-
-        return $returnValues;
+        $pipeline = new CommandPipeline($this);
+        return $pipeline->execute($pipelineBlock);
     }
 
     public function registerCommands(Array $commands) {
@@ -492,11 +463,12 @@ class Response {
 }
 
 class CommandPipeline {
-    private $_redisClient, $_pipelineBuffer;
+    private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
 
     public function __construct(Client $redisClient) {
         $this->_redisClient    = $redisClient;
         $this->_pipelineBuffer = array();
+        $this->_returnValues   = array();
     }
 
     public function __call($method, $arguments) {
@@ -508,9 +480,57 @@ class CommandPipeline {
         $this->_pipelineBuffer[] = $command;
     }
 
-    public function getRecordedCommands() {
+    private function getRecordedCommands() {
         return $this->_pipelineBuffer;
     }
+
+    public function flushPipeline() {
+        if (count($this->_pipelineBuffer) === 0) {
+            return;
+        }
+
+        $connection = $this->_redisClient->getConnection();
+        $commands   = &$this->getRecordedCommands();
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+        foreach ($commands as $command) {
+            $this->_returnValues[] = $connection->readResponse($command);
+        }
+
+        $this->_pipelineBuffer = array();
+    }
+
+    private function setRunning($bool) {
+        // TODO: I am honest when I say that I don't like this approach.
+        if ($bool == true && $this->_running == true) {
+            throw new ClientException("This pipeline is already opened");
+        }
+
+        $this->_running = $bool;
+    }
+
+    public function execute(\Closure $block) {
+        $this->setRunning(true);
+        $pipelineBlockException = null;
+
+        try {
+            $block($this);
+            $this->flushPipeline();
+        }
+        catch (\Exception $exception) {
+            $pipelineBlockException = $exception;
+        }
+
+        $this->setRunning(false);
+
+        if ($pipelineBlockException !== null) {
+            throw $pipelineBlockException;
+        }
+
+        return $this->_returnValues;
+    }
 }
 
 /* ------------------------------------------------------------------------- */