Browse Source

Optimize command pipelines.

Using SplQueue instead of a plain PHP array to queue command instance in
the pipeline is faster and makes the underlying implementation definitely
more clean.

This change is not going to be backported to Predis v0.7 due to changes
in Predis\Pipeline\PipelineContextInterface.
Daniele Alessandri 13 years ago
parent
commit
8a84f8b6cf

+ 4 - 0
CHANGELOG.md

@@ -29,6 +29,10 @@ v0.8.0 (201x-xx-xx)
 
 - `Predis\Options\Option` is now abstract, see `Predis\Option\AbstractOption`.
 
+- Command pipelines have been optimized for both speed and code cleanness, but
+  at the cost of bringing a breaking change in the signature of the interface
+  for pipeline executors.
+
 
 v0.7.2 (2012-04-01)
 ===============================================================================

+ 4 - 3
lib/Predis/Pipeline/FireAndForgetExecutor.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Pipeline;
 
+use SplQueue;
 use Predis\Connection\ConnectionInterface;
 use Predis\Connection\ReplicationConnectionInterface;
 
@@ -39,12 +40,12 @@ class FireAndForgetExecutor implements PipelineExecutorInterface
     /**
      * {@inheritdoc}
      */
-    public function execute(ConnectionInterface $connection, Array &$commands)
+    public function execute(ConnectionInterface $connection, SplQueue $commands)
     {
         $this->checkConnection($connection);
 
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
+        while (!$commands->isEmpty()) {
+            $connection->writeCommand($commands->dequeue());
         }
 
         $connection->disconnect();

+ 11 - 10
lib/Predis/Pipeline/PipelineContext.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Pipeline;
 
+use SplQueue;
 use Predis\ClientInterface;
 use Predis\BasicClientInterface;
 use Predis\ExecutableContextInterface;
@@ -28,8 +29,8 @@ class PipelineContext implements BasicClientInterface, ExecutableContextInterfac
 {
     private $client;
     private $executor;
+    private $pipeline;
 
-    private $pipeline = array();
     private $replies = array();
     private $running = false;
 
@@ -41,6 +42,7 @@ class PipelineContext implements BasicClientInterface, ExecutableContextInterfac
     {
         $this->client = $client;
         $this->executor = $this->createExecutor($client, $options ?: array());
+        $this->pipeline = new SplQueue();
     }
 
     /**
@@ -96,7 +98,7 @@ class PipelineContext implements BasicClientInterface, ExecutableContextInterfac
      */
     protected function recordCommand(CommandInterface $command)
     {
-        $this->pipeline[] = $command;
+        $this->pipeline->enqueue($command);
     }
 
     /**
@@ -117,15 +119,14 @@ class PipelineContext implements BasicClientInterface, ExecutableContextInterfac
      */
     public function flushPipeline($send = true)
     {
-        if (count($this->pipeline) > 0) {
-            if ($send) {
-                $connection = $this->client->getConnection();
-                $replies = $this->executor->execute($connection, $this->pipeline);
+        if ($send && !$this->pipeline->isEmpty()) {
+            $connection = $this->client->getConnection();
+            $replies = $this->executor->execute($connection, $this->pipeline);
 
-                $this->replies = array_merge($this->replies, $replies);
-            }
-
-            $this->pipeline = array();
+            $this->replies = array_merge($this->replies, $replies);
+        }
+        else {
+            $this->pipeline = new SplQueue();
         }
 
         return $this;

+ 3 - 2
lib/Predis/Pipeline/PipelineExecutorInterface.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Pipeline;
 
+use SplQueue;
 use Predis\Connection\ConnectionInterface;
 
 /**
@@ -25,8 +26,8 @@ interface PipelineExecutorInterface
      * Writes a list of commands to the network and reads back their replies.
      *
      * @param ConnectionInterface $connection Connection to Redis.
-     * @param array $commands List of commands.
+     * @param SplQueue $commands Commands queued for execution.
      * @return array
      */
-    public function execute(ConnectionInterface $connection, Array &$commands);
+    public function execute(ConnectionInterface $connection, SplQueue $commands);
 }

+ 9 - 9
lib/Predis/Pipeline/SafeClusterExecutor.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Pipeline;
 
+use SplQueue;
 use Predis\CommunicationException;
 use Predis\Connection\ConnectionInterface;
 
@@ -26,11 +27,11 @@ class SafeClusterExecutor implements PipelineExecutorInterface
     /**
      * {@inheritdoc}
      */
-    public function execute(ConnectionInterface $connection, Array &$commands)
+    public function execute(ConnectionInterface $connection, SplQueue $commands)
     {
-        $connectionExceptions = array();
-        $sizeofPipe = count($commands);
+        $size = count($commands);
         $values = array();
+        $connectionExceptions = array();
 
         foreach ($commands as $command) {
             $cmdConnection = $connection->getConnection($command);
@@ -47,24 +48,23 @@ class SafeClusterExecutor implements PipelineExecutorInterface
             }
         }
 
-        for ($i = 0; $i < $sizeofPipe; $i++) {
-            $command = $commands[$i];
-            unset($commands[$i]);
+        for ($i = 0; $i < $size; $i++) {
+            $command = $commands->dequeue();
 
             $cmdConnection = $connection->getConnection($command);
             $connectionObjectHash = spl_object_hash($cmdConnection);
 
             if (isset($connectionExceptions[$connectionObjectHash])) {
-                $values[] = $connectionExceptions[$connectionObjectHash];
+                $values[$i] = $connectionExceptions[$connectionObjectHash];
                 continue;
             }
 
             try {
                 $response = $cmdConnection->readResponse($command);
-                $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
+                $values[$i] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
             }
             catch (CommunicationException $exception) {
-                $values[] = $exception;
+                $values[$i] = $exception;
                 $connectionExceptions[$connectionObjectHash] = $exception;
             }
         }

+ 7 - 7
lib/Predis/Pipeline/SafeExecutor.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Pipeline;
 
+use SplQueue;
 use Predis\ServerException;
 use Predis\CommunicationException;
 use Predis\Connection\ConnectionInterface;
@@ -26,9 +27,9 @@ class SafeExecutor implements PipelineExecutorInterface
     /**
      * {@inheritdoc}
      */
-    public function execute(ConnectionInterface $connection, Array &$commands)
+    public function execute(ConnectionInterface $connection, SplQueue $commands)
     {
-        $sizeofPipe = count($commands);
+        $size = count($commands);
         $values = array();
 
         foreach ($commands as $command) {
@@ -36,17 +37,16 @@ class SafeExecutor implements PipelineExecutorInterface
                 $connection->writeCommand($command);
             }
             catch (CommunicationException $exception) {
-                return array_fill(0, $sizeofPipe, $exception);
+                return array_fill(0, $size, $exception);
             }
         }
 
-        for ($i = 0; $i < $sizeofPipe; $i++) {
-            $command = $commands[$i];
-            unset($commands[$i]);
+        for ($i = 0; $i < $size; $i++) {
+            $command = $commands->dequeue();
 
             try {
                 $response = $connection->readResponse($command);
-                $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
+                $values[$i] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
             }
             catch (CommunicationException $exception) {
                 $toAdd = count($commands) - count($values);

+ 6 - 6
lib/Predis/Pipeline/StandardExecutor.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Pipeline;
 
+use SplQueue;
 use Predis\ResponseErrorInterface;
 use Predis\Connection\ConnectionInterface;
 use Predis\Connection\ReplicationConnectionInterface;
@@ -65,10 +66,10 @@ class StandardExecutor implements PipelineExecutorInterface
     /**
      * {@inheritdoc}
      */
-    public function execute(ConnectionInterface $connection, Array &$commands)
+    public function execute(ConnectionInterface $connection, SplQueue $commands)
     {
+        $size = count($commands);
         $values = array();
-        $sizeofPipe = count($commands);
         $useServerExceptions = $this->useServerExceptions;
 
         $this->checkConnection($connection);
@@ -77,15 +78,14 @@ class StandardExecutor implements PipelineExecutorInterface
             $connection->writeCommand($command);
         }
 
-        for ($i = 0; $i < $sizeofPipe; $i++) {
-            $response = $connection->readResponse($commands[$i]);
+        for ($i = 0; $i < $size; $i++) {
+            $response = $connection->readResponse($commands->dequeue());
 
             if ($response instanceof ResponseErrorInterface && $useServerExceptions === true) {
                 $this->onResponseError($connection, $response);
             }
 
-            $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
-            unset($commands[$i]);
+            $values[$i] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
         }
 
         return $values;