Kaynağa Gözat

Add a pipeline executor that internally uses MULTI / EXEC.

The actual pipeline is wrapped between MULTI and EXEC to ensure that all the
commands are correctly sent and executed on the server. The whole pipeline is
discarded should it fail at a certain point during execution.

We do not use our Predis\Transaction\MultiExecContext abstraction internally
but rely on raw commands since this new executor does not really need full
support for Redis transactions.
Daniele Alessandri 13 yıl önce
ebeveyn
işleme
56a3e327fc
2 değiştirilmiş dosya ile 126 ekleme ve 0 silme
  1. 4 0
      CHANGELOG.md
  2. 122 0
      lib/Predis/Pipeline/MultiExecExecutor.php

+ 4 - 0
CHANGELOG.md

@@ -31,6 +31,10 @@ v0.8.0 (201x-xx-xx)
   at the cost of bringing a breaking change in the signature of the interface
   for pipeline executors.
 
+- Added a new pipeline executor that sends commands wrapped in a MULTI / EXEC
+  context to make the execution atomic: if a pipeline fails at a certain point
+  then the whole pipeline is discarded.
+
 - `Predis\Options\Option` is now abstract, see `Predis\Option\AbstractOption`.
 
 

+ 122 - 0
lib/Predis/Pipeline/MultiExecExecutor.php

@@ -0,0 +1,122 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use SplQueue;
+use Predis\ResponseErrorInterface;
+use Predis\Profile\ServerProfileInterface;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\SingleConnectionInterface;
+use Predis\ResponseQueued;
+use Predis\ClientException;
+use Predis\ServerException;
+use Predis\Profile\ServerProfile;
+
+/**
+ * Implements a pipeline executor that wraps the whole pipeline
+ * in a MULTI / EXEC context to make sure that it is executed
+ * correctly.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class MultiExecExecutor implements PipelineExecutorInterface
+{
+    protected $profile;
+
+    /**
+     *
+     */
+    public function __construct()
+    {
+        $this->setProfile(ServerProfile::getDefault());
+    }
+
+    /**
+     * Allows the pipeline executor to perform operations on the
+     * connection before starting to execute the commands stored
+     * in the pipeline.
+     *
+     * @param ConnectionInterface Connection instance.
+     */
+    protected function checkConnection(ConnectionInterface $connection)
+    {
+        if (!$connection instanceof SingleConnectionInterface) {
+            $class = __CLASS__;
+            throw new ClientException("$class can be used only with single connections");
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function execute(ConnectionInterface $connection, SplQueue $commands)
+    {
+        $size = count($commands);
+        $values = array();
+
+        $this->checkConnection($connection);
+
+        $cmd = $this->profile->createCommand('multi');
+        $connection->executeCommand($cmd);
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+
+        foreach ($commands as $command) {
+            $response = $connection->readResponse($command);
+
+            if (!$response instanceof ResponseQueued) {
+                if ($response instanceof ResponseErrorInterface) {
+                    $cmd = $this->profile->createCommand('discard');
+                    $connection->executeCommand($cmd);
+
+                    throw new ServerException($response->getMessage());
+                }
+            }
+        }
+
+        $cmd = $this->profile->createCommand('exec');
+        $responses = $connection->executeCommand($cmd);
+
+        if (!isset($responses)) {
+            throw new ClientException('The underlying transaction has been aborted by the server');
+        }
+
+        if (count($responses) !== $size) {
+            throw new ClientException("Invalid number of replies [expected: $size - actual: ".count($responses)."]");
+        }
+
+        for ($i = 0; $i < $size; $i++) {
+            if ($response = $responses[$i] instanceof \Iterator) {
+                $response = iterator_to_array($response);
+            }
+
+            $values[$i] = $commands->dequeue()->parseResponse($responses[$i]);
+            unset($responses[$i]);
+        }
+
+        return $values;
+    }
+
+    /**
+     * @param ServerProfileInterface $profile Server profile.
+     */
+    public function setProfile(ServerProfileInterface $profile)
+    {
+        if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
+            throw new ClientException('The specified server profile must support MULTI, EXEC and DISCARD.');
+        }
+
+        $this->profile = $profile;
+    }
+}