Browse Source

Handling of partial failures and server errors in pipelines.

Daniele Alessandri 15 years ago
parent
commit
b3f340f830
1 changed files with 145 additions and 26 deletions
  1. 145 26
      lib/Predis.php

+ 145 - 26
lib/Predis.php

@@ -208,8 +208,20 @@ class Client {
     }
 
     public function pipeline($pipelineBlock = null) {
-        $pipeline = new CommandPipeline($this);
-        return $pipelineBlock !== null ? $pipeline->execute($pipelineBlock) : $pipeline;
+        return $this->pipelineExecute(new CommandPipeline($this), $pipelineBlock);
+    }
+
+    public function pipelineSafe($pipelineBlock = null) {
+        $connection = $this->getConnection();
+        $pipeline   = new CommandPipeline($this, $connection instanceof Connection
+            ? new Pipeline\SafeExecutor($connection)
+            : new Pipeline\SafeClusterExecutor($connection)
+        );
+        return $this->pipelineExecute($pipeline, $pipelineBlock);
+    }
+
+    private function pipelineExecute(CommandPipeline $pipeline, $block) {
+        return $block !== null ? $pipeline->execute($block) : $pipeline;
     }
 
     public function multiExec($multiExecBlock = null) {
@@ -654,10 +666,11 @@ class ResponseQueued {
 /* ------------------------------------------------------------------------- */
 
 class CommandPipeline {
-    private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
+    private $_redisClient, $_pipelineBuffer, $_returnValues, $_running, $_executor;
 
-    public function __construct(Client $redisClient) {
+    public function __construct(Client $redisClient, Pipeline\IExecutor $executor = null) {
         $this->_redisClient    = $redisClient;
+        $this->_executor       = $executor ?: new Pipeline\StandardExecutor();
         $this->_pipelineBuffer = array();
         $this->_returnValues   = array();
     }
@@ -677,27 +690,14 @@ class CommandPipeline {
     }
 
     public function flushPipeline() {
-        $sizeofPipe = count($this->_pipelineBuffer);
-        if ($sizeofPipe === 0) {
-            return;
-        }
-
-        $connection = $this->_redisClient->getConnection();
-        $commands   = &$this->_pipelineBuffer;
-
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
-        }
-        for ($i = 0; $i < $sizeofPipe; $i++) {
-            $response = $connection->readResponse($commands[$i]);
-            $this->_returnValues[] = ($response instanceof \Iterator
-                ? iterator_to_array($response)
-                : $response
+        if (count($this->_pipelineBuffer) > 0) {
+            $connection = $this->_redisClient->getConnection();
+            $this->_returnValues = array_merge(
+                $this->_returnValues, 
+                $this->_executor->execute($connection, $this->_pipelineBuffer)
             );
-            unset($commands[$i]);
+            $this->_pipelineBuffer = array();
         }
-        $this->_pipelineBuffer = array();
-
         return $this;
     }
 
@@ -705,7 +705,6 @@ class CommandPipeline {
         if ($bool == true && $this->_running == true) {
             throw new ClientException("This pipeline is already opened");
         }
-
         $this->_running = $bool;
     }
 
@@ -722,13 +721,12 @@ class CommandPipeline {
             if ($block !== null) {
                 $block($this);
             }
-            $this->flushPipeline();
         }
         catch (\Exception $exception) {
-            // TODO: client/server desync on ServerException
             $pipelineBlockException = $exception;
         }
 
+        $this->flushPipeline();
         $this->setRunning(false);
 
         if ($pipelineBlockException !== null) {
@@ -1529,6 +1527,127 @@ class RedisServer_vNext extends RedisServer_v2_0 {
 
 /* ------------------------------------------------------------------------- */
 
+namespace Predis\Pipeline;
+
+interface IExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands);
+}
+
+class StandardExecutor implements IExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands) {
+        $sizeofPipe = count($commands);
+        $values = array();
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+        for ($i = 0; $i < $sizeofPipe; $i++) {
+            $response = $connection->readResponse($commands[$i]);
+            $values[] = $response instanceof \Iterator
+                ? iterator_to_array($response)
+                : $response;
+            unset($commands[$i]);
+        }
+
+        return $values;
+    }
+}
+
+class SafeExecutor implements IExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands) {
+        $firstServerException = null;
+        $sizeofPipe = count($commands);
+        $values = array();
+
+        foreach ($commands as $command) {
+            try {
+                $this->_connection->writeCommand($command);
+            }
+            catch (CommunicationException $exception) {
+                return array_fill(0, $sizeofPipe, $exception);
+            }
+        }
+
+        for ($i = 0; $i < $sizeofPipe; $i++) {
+            $command = $commands[$i];
+            unset($commands[$i]);
+            try {
+                $response = $this->_connection->readResponse($command);
+                $values[] = ($response instanceof \Iterator
+                    ? iterator_to_array($response)
+                    : $response
+                );
+            }
+            catch (\Predis\ServerException $exception) {
+                $values[] = $exception->toResponseError();
+            }
+            catch (\Predis\CommunicationException $exception) {
+                if ($throwExceptions) {
+                    throw $exception;
+                }
+                $toAdd  = count($commands) - count($values);
+                $values = array_merge($values, array_fill(0, $toAdd, $exception));
+                break;
+            }
+        }
+
+        return $values;
+    }
+}
+
+class SafeClusterExecutor implements IExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands) {
+        $connectionExceptions = array();
+        $sizeofPipe = count($commands);
+        $values = array();
+
+        foreach ($commands as $command) {
+            $cmdConnection = $connection->getConnection($command);
+            if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
+                continue;
+            }
+            try {
+                $cmdConnection->writeCommand($command);
+            }
+            catch (\Predis\CommunicationException $exception) {
+                $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
+            }
+        }
+
+        for ($i = 0; $i < $sizeofPipe; $i++) {
+            $command = $commands[$i];
+            unset($commands[$i]);
+
+            $cmdConnection = $connection->getConnection($command);
+            $connectionObjectHash = spl_object_hash($cmdConnection);
+
+            if (isset($connectionExceptions[$connectionObjectHash])) {
+                $values[] = $connectionExceptions[$connectionObjectHash];
+                continue;
+            }
+
+            try {
+                $response = $cmdConnection->readResponse($command);
+                $values[] = ($response instanceof \Iterator
+                    ? iterator_to_array($response)
+                    : $response
+                );
+            }
+            catch (\Predis\ServerException $exception) {
+                $values[] = $exception->toResponseError();
+            }
+            catch (\Predis\CommunicationException $exception) {
+                $values[] = $exception;
+                $connectionExceptions[$connectionObjectHash] = $exception;
+            }
+        }
+
+        return $values;
+    }
+}
+
+/* ------------------------------------------------------------------------- */
+
 namespace Predis\Utilities;
 
 class Shared {