Преглед изворни кода

Merge branch 'pipeline_partial_failures'

Conflicts:
	lib/Predis.php
Daniele Alessandri пре 15 година
родитељ
комит
6c512784cb
2 измењених фајлова са 170 додато и 26 уклоњено
  1. 18 0
      CHANGELOG
  2. 152 26
      lib/Predis.php

+ 18 - 0
CHANGELOG

@@ -67,6 +67,24 @@ v0.6.0
   * CommandPipeline and MultiExecBlock return their instances when invoking 
     commands, thus allowing method chaining in pipelines and multi-exec blocks.
 
+  * Introduced the Predis\Pipeline\IExecutor interface. Classes implementing 
+    this interface are now used internally by the Predis\CommandPipeline class 
+    to change the behaviour of the pipeline when writing/reading commands from 
+    one or multiple servers. Here is the list of the default executors:
+      - Predis\Pipeline\StandardExecutor
+        Exceptions generated by server errors might be thrown depending on the 
+        options passed to the client (see "throw_on_error"). Instead, protocol 
+        or network errors always throw exceptions. This is the default executor 
+        for single and clustered connections and shares the same behaviour of 
+        Predis 0.5.x.
+      - Predis\Pipeline\SafeExecutor
+        Exceptions generated by server, protocol or network errors are not 
+        thrown, instead they are returned in the response array as instances of 
+        ResponseError or CommunicationException.
+      - Predis\Pipeline\SafeClusterExecutor
+        This executor shares the same behaviour of Predis\Pipeline\SafeExecutor 
+        but it is geared towards clustered connections.
+
   * MultiExecBlock instances can handle the new DISCARD command.
 
   * The GET parameter for the SORT command now accepts also multiple key 

+ 152 - 26
lib/Predis.php

@@ -208,8 +208,20 @@ class Client {
     }
 
     public function pipeline($pipelineBlock = null) {
-        $pipeline = new CommandPipeline($this);
-        return $pipelineBlock !== null ? $pipeline->execute($pipelineBlock) : $pipeline;
+        return $this->pipelineExecute(new CommandPipeline($this), $pipelineBlock);
+    }
+
+    public function pipelineSafe($pipelineBlock = null) {
+        $connection = $this->getConnection();
+        $pipeline   = new CommandPipeline($this, $connection instanceof Connection
+            ? new Pipeline\SafeExecutor($connection)
+            : new Pipeline\SafeClusterExecutor($connection)
+        );
+        return $this->pipelineExecute($pipeline, $pipelineBlock);
+    }
+
+    private function pipelineExecute(CommandPipeline $pipeline, $block) {
+        return $block !== null ? $pipeline->execute($block) : $pipeline;
     }
 
     public function multiExec($multiExecBlock = null) {
@@ -663,10 +675,11 @@ class ResponseQueued {
 /* ------------------------------------------------------------------------- */
 
 class CommandPipeline {
-    private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
+    private $_redisClient, $_pipelineBuffer, $_returnValues, $_running, $_executor;
 
-    public function __construct(Client $redisClient) {
+    public function __construct(Client $redisClient, Pipeline\IPipelineExecutor $executor = null) {
         $this->_redisClient    = $redisClient;
+        $this->_executor       = $executor ?: new Pipeline\StandardExecutor();
         $this->_pipelineBuffer = array();
         $this->_returnValues   = array();
     }
@@ -686,27 +699,14 @@ class CommandPipeline {
     }
 
     public function flushPipeline() {
-        $sizeofPipe = count($this->_pipelineBuffer);
-        if ($sizeofPipe === 0) {
-            return;
-        }
-
-        $connection = $this->_redisClient->getConnection();
-        $commands   = &$this->_pipelineBuffer;
-
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
-        }
-        for ($i = 0; $i < $sizeofPipe; $i++) {
-            $response = $connection->readResponse($commands[$i]);
-            $this->_returnValues[] = ($response instanceof \Iterator
-                ? iterator_to_array($response)
-                : $response
+        if (count($this->_pipelineBuffer) > 0) {
+            $connection = $this->_redisClient->getConnection();
+            $this->_returnValues = array_merge(
+                $this->_returnValues, 
+                $this->_executor->execute($connection, $this->_pipelineBuffer)
             );
-            unset($commands[$i]);
+            $this->_pipelineBuffer = array();
         }
-        $this->_pipelineBuffer = array();
-
         return $this;
     }
 
@@ -714,7 +714,6 @@ class CommandPipeline {
         if ($bool == true && $this->_running == true) {
             throw new ClientException("This pipeline is already opened");
         }
-
         $this->_running = $bool;
     }
 
@@ -731,13 +730,12 @@ class CommandPipeline {
             if ($block !== null) {
                 $block($this);
             }
-            $this->flushPipeline();
         }
         catch (\Exception $exception) {
-            // TODO: client/server desync on ServerException
             $pipelineBlockException = $exception;
         }
 
+        $this->flushPipeline();
         $this->setRunning(false);
 
         if ($pipelineBlockException !== null) {
@@ -1651,6 +1649,134 @@ class RedisServer_vNext extends RedisServer_v2_0 {
 
 /* ------------------------------------------------------------------------- */
 
+namespace Predis\Pipeline;
+
+interface IPipelineExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands);
+}
+
+class StandardExecutor implements IPipelineExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands) {
+        $sizeofPipe = count($commands);
+        $values = array();
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+        try {
+            for ($i = 0; $i < $sizeofPipe; $i++) {
+                $response = $connection->readResponse($commands[$i]);
+                $values[] = $response instanceof \Iterator
+                    ? iterator_to_array($response)
+                    : $response;
+                unset($commands[$i]);
+            }
+        }
+        catch (\Predis\ServerException $exception) {
+            // force disconnection to prevent protocol desynchronization
+            $connection->disconnect();
+            throw $exception;
+        }
+
+        return $values;
+    }
+}
+
+class SafeExecutor implements IPipelineExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands) {
+        $firstServerException = null;
+        $sizeofPipe = count($commands);
+        $values = array();
+
+        foreach ($commands as $command) {
+            try {
+                $connection->writeCommand($command);
+            }
+            catch (\Predis\CommunicationException $exception) {
+                return array_fill(0, $sizeofPipe, $exception);
+            }
+        }
+
+        for ($i = 0; $i < $sizeofPipe; $i++) {
+            $command = $commands[$i];
+            unset($commands[$i]);
+            try {
+                $response = $connection->readResponse($command);
+                $values[] = ($response instanceof \Iterator
+                    ? iterator_to_array($response)
+                    : $response
+                );
+            }
+            catch (\Predis\ServerException $exception) {
+                $values[] = $exception->toResponseError();
+            }
+            catch (\Predis\CommunicationException $exception) {
+                if ($throwExceptions) {
+                    throw $exception;
+                }
+                $toAdd  = count($commands) - count($values);
+                $values = array_merge($values, array_fill(0, $toAdd, $exception));
+                break;
+            }
+        }
+
+        return $values;
+    }
+}
+
+class SafeClusterExecutor implements IPipelineExecutor {
+    public function execute(\Predis\IConnection $connection, &$commands) {
+        $connectionExceptions = array();
+        $sizeofPipe = count($commands);
+        $values = array();
+
+        foreach ($commands as $command) {
+            $cmdConnection = $connection->getConnection($command);
+            if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
+                continue;
+            }
+            try {
+                $cmdConnection->writeCommand($command);
+            }
+            catch (\Predis\CommunicationException $exception) {
+                $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
+            }
+        }
+
+        for ($i = 0; $i < $sizeofPipe; $i++) {
+            $command = $commands[$i];
+            unset($commands[$i]);
+
+            $cmdConnection = $connection->getConnection($command);
+            $connectionObjectHash = spl_object_hash($cmdConnection);
+
+            if (isset($connectionExceptions[$connectionObjectHash])) {
+                $values[] = $connectionExceptions[$connectionObjectHash];
+                continue;
+            }
+
+            try {
+                $response = $cmdConnection->readResponse($command);
+                $values[] = ($response instanceof \Iterator
+                    ? iterator_to_array($response)
+                    : $response
+                );
+            }
+            catch (\Predis\ServerException $exception) {
+                $values[] = $exception->toResponseError();
+            }
+            catch (\Predis\CommunicationException $exception) {
+                $values[] = $exception;
+                $connectionExceptions[$connectionObjectHash] = $exception;
+            }
+        }
+
+        return $values;
+    }
+}
+
+/* ------------------------------------------------------------------------- */
+
 namespace Predis\Distribution;
 
 interface IDistributionAlgorithm {