Преглед на файлове

Merge branch 'pipeline'

Conflicts:
	lib/Predis.php
Daniele Alessandri преди 15 години
родител
ревизия
c1fb041014
променени са 1 файла, в които са добавени 61 реда и са изтрити 46 реда
  1. 61 46
      lib/Predis.php

+ 61 - 46
lib/Predis.php

@@ -4,7 +4,6 @@ namespace Predis;
 class PredisException extends \Exception { }
 class ClientException extends PredisException { }
 class ServerException extends PredisException { }
-class PipelineException extends ClientException { }
 class MalformedServerResponse extends ServerException { }
 
 /* ------------------------------------------------------------------------- */
@@ -13,12 +12,10 @@ class Client {
     // TODO: command arguments should be sanitized or checked for bad arguments 
     //       (e.g. CRLF in keys for inline commands)
 
-    private $_connection, $_registeredCommands, $_pipelining;
+    private $_connection, $_registeredCommands;
 
     public function __construct($host = Connection::DEFAULT_HOST, $port = Connection::DEFAULT_PORT) {
-        $this->_pipelining = false;
         $this->_registeredCommands = self::initializeDefaultCommands();
-
         $this->setConnection($this->createConnection(
             func_num_args() === 1 && is_array($host) || @stripos('redis://') === 0
                 ? $host
@@ -108,16 +105,11 @@ class Client {
     }
 
     public function executeCommand(Command $command) {
-        if ($this->_pipelining === false) {
-            $this->_connection->writeCommand($command);
-            if ($command->closesConnection()) {
-                return $this->_connection->disconnect();
-            }
-            return $this->_connection->readResponse($command);
-        }
-        else {
-            $this->_pipelineBuffer[] = $command;
+        $this->_connection->writeCommand($command);
+        if ($command->closesConnection()) {
+            return $this->_connection->disconnect();
         }
+        return $this->_connection->readResponse($command);
     }
 
     public function rawCommand($rawCommandData, $closesConnection = false) {
@@ -129,37 +121,9 @@ class Client {
         return $this->_connection->rawCommand($rawCommandData, $closesConnection);
     }
 
-    public function pipeline(\Closure $pipelineBlock) {
-        $pipelineBlockException = null;
-        $returnValues = array();
-
-        try {
-            $pipeline = new CommandPipeline($this);
-            $this->_pipelining = true;
-            $pipelineBlock($pipeline);
-            // TODO: this should be moved entirely into the 
-            //       self-contained CommandPipeline instance.
-            $recordedCommands = $pipeline->getRecordedCommands();
-
-            foreach ($recordedCommands as $command) {
-                $this->_connection->writeCommand($command);
-            }
-            foreach ($recordedCommands as $command) {
-                $returnValues[] = $this->_connection->readResponse($command);
-            }
-        }
-        catch (\Exception $exception) {
-            $pipelineBlockException = $exception;
-        }
-
-        $this->_pipelining = false;
-
-        if ($pipelineBlockException !== null) {
-            throw new PipelineException('An exception has occurred inside of a pipeline block', 
-                null, $pipelineBlockException);
-        }
-
-        return $returnValues;
+    public function pipeline(\Closure $pipelineBlock = null) {
+        $pipeline = new CommandPipeline($this);
+        return $pipelineBlock !== null ? $pipeline->execute($pipelineBlock) : $pipeline;
     }
 
     public function registerCommands(Array $commands) {
@@ -529,11 +493,12 @@ class Response {
 }
 
 class CommandPipeline {
-    private $_redisClient, $_pipelineBuffer;
+    private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
 
     public function __construct(Client $redisClient) {
         $this->_redisClient    = $redisClient;
         $this->_pipelineBuffer = array();
+        $this->_returnValues   = array();
     }
 
     public function __call($method, $arguments) {
@@ -545,9 +510,59 @@ class CommandPipeline {
         $this->_pipelineBuffer[] = $command;
     }
 
-    public function getRecordedCommands() {
+    private function getRecordedCommands() {
         return $this->_pipelineBuffer;
     }
+
+    public function flushPipeline() {
+        if (count($this->_pipelineBuffer) === 0) {
+            return;
+        }
+
+        $connection = $this->_redisClient->getConnection();
+        $commands   = &$this->getRecordedCommands();
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+        foreach ($commands as $command) {
+            $this->_returnValues[] = $connection->readResponse($command);
+        }
+
+        $this->_pipelineBuffer = array();
+    }
+
+    private function setRunning($bool) {
+        // TODO: I am honest when I say that I don't like this approach.
+        if ($bool == true && $this->_running == true) {
+            throw new ClientException("This pipeline is already opened");
+        }
+
+        $this->_running = $bool;
+    }
+
+    public function execute(\Closure $block = null) {
+        $this->setRunning(true);
+        $pipelineBlockException = null;
+
+        try {
+            if ($block !== null) {
+                $block($this);
+            }
+            $this->flushPipeline();
+        }
+        catch (\Exception $exception) {
+            $pipelineBlockException = $exception;
+        }
+
+        $this->setRunning(false);
+
+        if ($pipelineBlockException !== null) {
+            throw $pipelineBlockException;
+        }
+
+        return $this->_returnValues;
+    }
 }
 
 /* ------------------------------------------------------------------------- */