Kaynağa Gözat

Merge branch 'v0.9/pipeline-refactoring'

Daniele Alessandri 11 yıl önce
ebeveyn
işleme
c047c4992a

+ 9 - 0
CHANGELOG.md

@@ -20,6 +20,15 @@ v0.9.0 (201x-xx-xx)
   using objects that responds to `__invoke()` (not all the kinds of callables)
   even for custom options defined by the user.
 
+- Removed pipeline executors, now command pipelines can be easily customized by
+  extending the standard `Predis\Pipeline\Pipeline` class. Accepted options when
+  creating a pipeline using `Predis\Client::pipeline()` are:
+
+    - `atomic`: returns a pipeline wrapped in a MULTI / EXEC transaction
+      (class: `Predis\Pipeline\Atomic`).
+    - `fire-and-forget`: returns a pipeline that does not read back responses
+      (class: `Predis\Pipeline\FireAndForget`).
+
 - Most classes and interfaces in the `Predis\Protocol` namespace have been moved
   or renamed while rationalizing the whole API of external protocol processors.
 

+ 3 - 2
examples/PipelineContext.php → examples/PipeliningCommands.php

@@ -11,8 +11,9 @@
 
 require 'SharedConfigurations.php';
 
-// When you have a whole set of consecutive commands to send to
-// a redis server, you can use a pipeline to improve performances.
+// When you have a whole set of consecutive commands to send to a redis server,
+// you can use a pipeline to dramatically improve performances. Pipelines can
+// greatly reduce the effects of network round-trips.
 
 $client = new Predis\Client($single_server);
 

+ 10 - 8
lib/Predis/Client.php

@@ -21,7 +21,7 @@ use Predis\Connection\AggregatedConnectionInterface;
 use Predis\Connection\ConnectionInterface;
 use Predis\Connection\ConnectionParametersInterface;
 use Predis\Monitor;
-use Predis\Pipeline\PipelineContext;
+use Predis\Pipeline;
 use Predis\Profile\ServerProfile;
 use Predis\PubSub;
 use Predis\Response;
@@ -355,7 +355,7 @@ class Client implements ClientInterface
      * a pipeline executed inside the optionally provided callable object.
      *
      * @param mixed $arg,... Options for the context, or a callable, or both.
-     * @return PipelineContext|array
+     * @return Pipeline\Pipeline|array
      */
     public function pipeline(/* arguments */)
     {
@@ -367,17 +367,19 @@ class Client implements ClientInterface
      *
      * @param array $options Options for the context.
      * @param mixed $callable Optional callable used to execute the context.
-     * @return PipelineContext|array
+     * @return Pipeline\Pipeline|array
      */
     protected function createPipeline(array $options = null, $callable = null)
     {
-        $executor = isset($options['executor']) ? $options['executor'] : null;
-
-        if (is_callable($executor)) {
-            $executor = call_user_func($executor, $this, $options);
+        if (isset($options['atomic']) && $options['atomic']) {
+            $class = 'Predis\Pipeline\Atomic';
+        } else if (isset($options['fire-and-forget']) && $options['fire-and-forget']) {
+            $class = 'Predis\Pipeline\FireAndForget';
+        } else {
+            $class = 'Predis\Pipeline\Pipeline';
         }
 
-        $pipeline = new PipelineContext($this, $executor);
+        $pipeline = new $class($this);
 
         if (isset($callable)) {
             return $pipeline->execute($callable);

+ 110 - 0
lib/Predis/Pipeline/Atomic.php

@@ -0,0 +1,110 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use SplQueue;
+use Predis\ClientException;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\SingleConnectionInterface;
+use Predis\Profile\ServerProfileInterface;
+use Predis\Response;
+
+/**
+ * Command pipeline wrapped into a MULTI / EXEC transaction.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class Atomic extends Pipeline
+{
+    /**
+     * {@inheritdoc}
+     */
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
+    {
+        $profile = $this->getClient()->getProfile();
+        $this->check($connection, $profile);
+
+        $connection->executeCommand($profile->createCommand('multi'));
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+
+        foreach ($commands as $command) {
+            $response = $connection->readResponse($command);
+
+            if ($response instanceof Response\ErrorInterface) {
+                $connection->executeCommand($profile->createCommand('discard'));
+                throw new Response\ServerException($response->getMessage());
+            }
+        }
+
+        $executed = $connection->executeCommand($profile->createCommand('exec'));
+
+        if (!isset($executed)) {
+            // TODO: should be throwing a more appropriate exception.
+            throw new ClientException(
+                'The underlying transaction has been aborted by the server'
+            );
+        }
+
+        if (count($executed) !== count($commands)) {
+            throw new ClientException(
+                "Invalid number of replies [expected: ".count($commands)." - actual: ".count($executed)."]"
+            );
+        }
+
+        $responses  = array();
+        $sizeOfPipe = count($commands);
+        $exceptions = $this->throwServerExceptions();
+
+        for ($i = 0; $i < $sizeOfPipe; $i++) {
+            $command  = $commands->dequeue();
+            $response = $executed[$i];
+
+            if (!$response instanceof Response\ObjectInterface) {
+                $responses[] = $command->parseResponse($response);
+            } else if ($response instanceof Response\ErrorInterface && $exceptions) {
+                $this->exception($connection, $response);
+            } else {
+                $responses[] = $response;
+            }
+
+            unset($executed[$i]);
+        }
+
+        return $responses;
+    }
+
+    /**
+     * Verifies all the needed preconditions before executing the pipeline.
+     *
+     * @param ConnectionInterface $connection Connection instance.
+     * @param ServerProfileInterface $profile Server profile.
+     */
+    protected function check(ConnectionInterface $connection, ServerProfileInterface $profile)
+    {
+        if (!$connection instanceof SingleConnectionInterface) {
+            $class = __CLASS__;
+
+            throw new ClientException(
+                "$class can be used only with connections to single nodes"
+            );
+        }
+
+        if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
+            throw new ClientException(
+                'The specified server profile must support MULTI, EXEC and DISCARD.'
+            );
+        }
+    }
+}

+ 120 - 0
lib/Predis/Pipeline/ConnectionErrorProof.php

@@ -0,0 +1,120 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use SplQueue;
+use Predis\NotSupportedException;
+use Predis\CommunicationException;
+use Predis\Connection\ClusterConnectionInterface;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\SingleConnectionInterface;
+
+/**
+ * Command pipeline that does not throw exceptions on connection errors, but
+ * returns the exception instances as the rest of the response elements.
+ *
+ * @todo Awful naming!
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class ConnectionErrorProof extends Pipeline
+{
+    /**
+     * {@inheritdoc}
+     */
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
+    {
+        if ($connection instanceof SingleConnectionInterface) {
+            return $this->executePipelineNode($connection, $commands);
+        } else if ($connection instanceof ClusterConnectionInterface) {
+            return $this->executePipelineCluster($connection, $commands);
+        } else {
+            throw new NotSupportedException("Unsupported connection type");
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function executePipelineNode(SingleConnectionInterface $connection, SplQueue $commands)
+    {
+        $responses  = array();
+        $sizeOfPipe = count($commands);
+
+        foreach ($commands as $command) {
+            try {
+                $connection->writeCommand($command);
+            } catch (CommunicationException $exception) {
+                return array_fill(0, $sizeOfPipe, $exception);
+            }
+        }
+
+        for ($i = 0; $i < $sizeOfPipe; $i++) {
+            $command = $commands->dequeue();
+
+            try {
+                $responses[$i] = $connection->readResponse($command);
+            } catch (CommunicationException $exception) {
+                $add = count($commands) - count($responses);
+                $responses = array_merge($responses, array_fill(0, $add, $exception));
+
+                break;
+            }
+        }
+
+        return $responses;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function executePipelineCluster(ClusterConnectionInterface $connection, SplQueue $commands)
+    {
+        $responses  = array();
+        $sizeOfPipe = count($commands);
+        $exceptions = array();
+
+        foreach ($commands as $command) {
+            $cmdConnection = $connection->getConnection($command);
+
+            if (isset($exceptions[spl_object_hash($cmdConnection)])) {
+                continue;
+            }
+
+            try {
+                $cmdConnection->writeCommand($command);
+            } catch (CommunicationException $exception) {
+                $exceptions[spl_object_hash($cmdConnection)] = $exception;
+            }
+        }
+
+        for ($i = 0; $i < $sizeOfPipe; $i++) {
+            $command = $commands->dequeue();
+
+            $cmdConnection = $connection->getConnection($command);
+            $connectionHash = spl_object_hash($cmdConnection);
+
+            if (isset($exceptions[$connectionHash])) {
+                $responses[$i] = $exceptions[$connectionHash];
+                continue;
+            }
+
+            try {
+                $responses[$i] = $cmdConnection->readResponse($command);
+            } catch (CommunicationException $exception) {
+                $responses[$i] = $exception;
+                $exceptions[$connectionHash] = $exception;
+            }
+        }
+
+        return $responses;
+    }
+}

+ 4 - 17
lib/Predis/Pipeline/FireAndForgetExecutor.php → lib/Predis/Pipeline/FireAndForget.php

@@ -16,33 +16,20 @@ use Predis\Connection\ConnectionInterface;
 use Predis\Connection\ReplicationConnectionInterface;
 
 /**
- * Implements a pipeline executor strategy that writes a list of commands to
- * the connection object but does not read back their replies.
+ * Command pipeline that writes commands to the servers but discards responses.
  *
  * @author Daniele Alessandri <suppakilla@gmail.com>
  */
-class FireAndForgetExecutor implements PipelineExecutorInterface
+class FireAndForget extends Pipeline
 {
     /**
-     * Allows the pipeline executor to perform operations on the
-     * connection before starting to execute the commands stored
-     * in the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
+     * {@inheritdoc}
      */
-    protected function checkConnection(ConnectionInterface $connection)
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
     {
         if ($connection instanceof ReplicationConnectionInterface) {
             $connection->switchTo('master');
         }
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $this->checkConnection($connection);
 
         while (!$commands->isEmpty()) {
             $connection->writeCommand($commands->dequeue());

+ 0 - 134
lib/Predis/Pipeline/MultiExecExecutor.php

@@ -1,134 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\ClientException;
-use Predis\Connection\ConnectionInterface;
-use Predis\Connection\SingleConnectionInterface;
-use Predis\Profile\ServerProfile;
-use Predis\Profile\ServerProfileInterface;
-use Predis\Response;
-
-/**
- * Implements a pipeline executor that wraps the whole pipeline
- * in a MULTI / EXEC context to make sure that it is executed
- * correctly.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class MultiExecExecutor implements PipelineExecutorInterface
-{
-    protected $profile;
-
-    /**
-     *
-     */
-    public function __construct(ServerProfileInterface $profile = null)
-    {
-        $this->setProfile($profile ?: ServerProfile::getDefault());
-    }
-
-    /**
-     * Allows the pipeline executor to perform operations on the
-     * connection before starting to execute the commands stored
-     * in the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
-     */
-    protected function checkConnection(ConnectionInterface $connection)
-    {
-        if (!$connection instanceof SingleConnectionInterface) {
-            $class = __CLASS__;
-            throw new ClientException("$class can be used only with single connections");
-        }
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $this->checkConnection($connection);
-
-        $cmd = $this->profile->createCommand('multi');
-        $connection->executeCommand($cmd);
-
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
-        }
-
-        foreach ($commands as $command) {
-            $response = $connection->readResponse($command);
-
-            if ($response instanceof Response\ErrorInterface) {
-                $cmd = $this->profile->createCommand('discard');
-                $connection->executeCommand($cmd);
-
-                throw new Response\ServerException($response->getMessage());
-            }
-        }
-
-        $cmd = $this->profile->createCommand('exec');
-        $responses = $connection->executeCommand($cmd);
-
-        if (!isset($responses)) {
-            throw new ClientException('The underlying transaction has been aborted by the server');
-        }
-
-        if (count($responses) !== count($commands)) {
-            throw new ClientException("Invalid number of replies [expected: ".count($commands)." - actual: ".count($responses)."]");
-        }
-
-        return $this->consumeArrayResponse($commands, $responses);
-    }
-
-    /**
-     * Consumes an array response returned by EXEC.
-     *
-     * @param SplQueue $commands Pipelined commands
-     * @param array $responses Responses returned by EXEC.
-     * @return array
-     */
-    protected function consumeArrayResponse(SplQueue $commands, array &$responses)
-    {
-        $size = count($commands);
-        $values = array();
-
-        for ($i = 0; $i < $size; $i++) {
-            $command = $commands->dequeue();
-            $response = $responses[$i];
-
-            if ($response instanceof Response\ObjectInterface) {
-                $values[$i] = $response;
-            } else {
-                $values[$i] = $command->parseResponse($response);
-            }
-
-            unset($responses[$i]);
-        }
-
-        return $values;
-    }
-
-    /**
-     * @param ServerProfileInterface $profile Server profile.
-     */
-    public function setProfile(ServerProfileInterface $profile)
-    {
-        if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
-            throw new ClientException('The specified server profile must support MULTI, EXEC and DISCARD.');
-        }
-
-        $this->profile = $profile;
-    }
-}

+ 223 - 0
lib/Predis/Pipeline/Pipeline.php

@@ -0,0 +1,223 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use Exception;
+use InvalidArgumentException;
+use SplQueue;
+use Predis\BasicClientInterface;
+use Predis\ClientException;
+use Predis\ClientInterface;
+use Predis\ExecutableContextInterface;
+use Predis\Command\CommandInterface;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\ReplicationConnectionInterface;
+use Predis\Response;
+
+/**
+ * Implementation of a command pipeline in which write and read operations of
+ * Redis commands are pipelined to alleviate the effects of network round-trips.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class Pipeline implements BasicClientInterface, ExecutableContextInterface
+{
+    private $client;
+    private $pipeline;
+
+    private $responses = array();
+    private $running = false;
+
+    /**
+     * @param ClientInterface $client Client instance used by the context.
+     */
+    public function __construct(ClientInterface $client)
+    {
+        $this->client = $client;
+        $this->pipeline = new SplQueue();
+    }
+
+    /**
+     * Queues a command into the pipeline buffer.
+     *
+     * @param string $method Command ID.
+     * @param array $arguments Arguments for the command.
+     * @return Pipeline
+     */
+    public function __call($method, $arguments)
+    {
+        $command = $this->client->createCommand($method, $arguments);
+        $this->recordCommand($command);
+
+        return $this;
+    }
+
+    /**
+     * Queues a command instance into the pipeline buffer.
+     *
+     * @param CommandInterface $command Command to queue in the buffer.
+     */
+    protected function recordCommand(CommandInterface $command)
+    {
+        $this->pipeline->enqueue($command);
+    }
+
+    /**
+     * Queues a command instance into the pipeline buffer.
+     *
+     * @param CommandInterface $command Command to queue in the buffer.
+     */
+    public function executeCommand(CommandInterface $command)
+    {
+        $this->recordCommand($command);
+    }
+
+    /**
+     * Throws and exception on -ERR responses returned by Redis.
+     *
+     * @param ConnectionInterface $connection The connection that returned the error.
+     * @param Response\ErrorInterface $response The error response instance.
+     */
+    protected function exception(ConnectionInterface $connection, Response\ErrorInterface $response)
+    {
+        $connection->disconnect();
+        $message = $response->getMessage();
+
+        throw new Response\ServerException($message);
+    }
+
+    /**
+     * Implements the logic to flush the queued commands and read the responses
+     * from the current connection.
+     *
+     * @param ConnectionInterface $connection Current connection instance.
+     * @param SplQueue $commands Queued commands.
+     * @return array
+     */
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
+    {
+        if ($connection instanceof ReplicationConnectionInterface) {
+            $connection->switchTo('master');
+        }
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+
+        $responses  = array();
+        $exceptions = $this->throwServerExceptions();
+
+        while (!$commands->isEmpty()) {
+            $command  = $commands->dequeue();
+            $response = $connection->readResponse($command);
+
+            if (!$response instanceof Response\ObjectInterface) {
+                $responses[] = $command->parseResponse($response);
+            } else if ($response instanceof Response\ErrorInterface && $exceptions) {
+                $this->exception($connection, $response);
+            } else {
+                $responses[] = $response;
+            }
+        }
+
+        return $responses;
+    }
+
+    /**
+     * Flushes the buffer that holds the queued commands.
+     *
+     * @param bool $send Specifies if the commands in the buffer should be sent to Redis.
+     * @return Pipeline
+     */
+    public function flushPipeline($send = true)
+    {
+        if ($send && !$this->pipeline->isEmpty()) {
+            $connection = $this->client->getConnection();
+            $responses = $this->executePipeline($connection, $this->pipeline);
+
+            $this->responses = array_merge($this->responses, $responses);
+        } else {
+            $this->pipeline = new SplQueue();
+        }
+
+        return $this;
+    }
+
+    /**
+     * Marks the running status of the pipeline.
+     *
+     * @param bool $bool Sets the running status of the pipeline.
+     */
+    private function setRunning($bool)
+    {
+        if ($bool && $this->running) {
+            throw new ClientException('This pipeline is already opened');
+        }
+
+        $this->running = $bool;
+    }
+
+    /**
+     * Handles the actual execution of the whole pipeline.
+     *
+     * @param mixed $callable Optional callback for execution.
+     * @return array
+     */
+    public function execute($callable = null)
+    {
+        if ($callable && !is_callable($callable)) {
+            throw new InvalidArgumentException('Argument passed must be a callable object');
+        }
+
+        $exception = null;
+        $this->setRunning(true);
+
+        try {
+            if ($callable) {
+                call_user_func($callable, $this);
+            }
+
+            $this->flushPipeline();
+        } catch (Exception $exception) {
+            // NOOP
+        }
+
+        $this->setRunning(false);
+
+        if ($exception) {
+            throw $exception;
+        }
+
+        return $this->responses;
+    }
+
+    /**
+     * Returns if the pipeline should throw exceptions on server errors.
+     *
+     * @todo Awful naming...
+     * @return bool
+     */
+    protected function throwServerExceptions()
+    {
+        return (bool) $this->client->getOptions()->exceptions;
+    }
+
+    /**
+     * Returns the underlying client instance used by the pipeline object.
+     *
+     * @return ClientInterface
+     */
+    public function getClient()
+    {
+        return $this->client;
+    }
+}

+ 0 - 186
lib/Predis/Pipeline/PipelineContext.php

@@ -1,186 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\BasicClientInterface;
-use Predis\ClientException;
-use Predis\ClientInterface;
-use Predis\ExecutableContextInterface;
-use Predis\Command\CommandInterface;
-
-/**
- * Abstraction of a pipeline context where write and read operations
- * of commands and their replies over the network are pipelined.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class PipelineContext implements BasicClientInterface, ExecutableContextInterface
-{
-    private $client;
-    private $executor;
-    private $pipeline;
-
-    private $replies = array();
-    private $running = false;
-
-    /**
-     * @param ClientInterface $client Client instance used by the context.
-     * @param PipelineExecutorInterface $executor Pipeline executor instace.
-     */
-    public function __construct(ClientInterface $client, PipelineExecutorInterface $executor = null)
-    {
-        $this->client = $client;
-        $this->executor = $executor ?: $this->createExecutor($client);
-        $this->pipeline = new SplQueue();
-    }
-
-    /**
-     * Returns a pipeline executor depending on the kind of the underlying
-     * connection and the passed options.
-     *
-     * @param ClientInterface $client Client instance used by the context.
-     * @return PipelineExecutorInterface
-     */
-    protected function createExecutor(ClientInterface $client)
-    {
-        $options = $client->getOptions();
-
-        if (isset($options->exceptions)) {
-            return new StandardExecutor($options->exceptions);
-        }
-
-        return new StandardExecutor();
-    }
-
-    /**
-     * Queues a command into the pipeline buffer.
-     *
-     * @param string $method Command ID.
-     * @param array $arguments Arguments for the command.
-     * @return PipelineContext
-     */
-    public function __call($method, $arguments)
-    {
-        $command = $this->client->createCommand($method, $arguments);
-        $this->recordCommand($command);
-
-        return $this;
-    }
-
-    /**
-     * Queues a command instance into the pipeline buffer.
-     *
-     * @param CommandInterface $command Command to queue in the buffer.
-     */
-    protected function recordCommand(CommandInterface $command)
-    {
-        $this->pipeline->enqueue($command);
-    }
-
-    /**
-     * Queues a command instance into the pipeline buffer.
-     *
-     * @param CommandInterface $command Command to queue in the buffer.
-     */
-    public function executeCommand(CommandInterface $command)
-    {
-        $this->recordCommand($command);
-    }
-
-    /**
-     * Flushes the buffer that holds the queued commands.
-     *
-     * @param Boolean $send Specifies if the commands in the buffer should be sent to Redis.
-     * @return PipelineContext
-     */
-    public function flushPipeline($send = true)
-    {
-        if ($send && !$this->pipeline->isEmpty()) {
-            $connection = $this->client->getConnection();
-            $replies = $this->executor->execute($connection, $this->pipeline);
-            $this->replies = array_merge($this->replies, $replies);
-        } else {
-            $this->pipeline = new SplQueue();
-        }
-
-        return $this;
-    }
-
-    /**
-     * Marks the running status of the pipeline.
-     *
-     * @param Boolean $bool True if the pipeline is running.
-     *                      False if the pipeline is not running.
-     */
-    private function setRunning($bool)
-    {
-        if ($bool === true && $this->running === true) {
-            throw new ClientException("This pipeline is already opened");
-        }
-
-        $this->running = $bool;
-    }
-
-    /**
-     * Handles the actual execution of the whole pipeline.
-     *
-     * @param mixed $callable Optional callback for execution.
-     * @return array
-     */
-    public function execute($callable = null)
-    {
-        if ($callable && !is_callable($callable)) {
-            throw new \InvalidArgumentException('Argument passed must be a callable object');
-        }
-
-        $this->setRunning(true);
-        $pipelineBlockException = null;
-
-        try {
-            if ($callable !== null) {
-                call_user_func($callable, $this);
-            }
-            $this->flushPipeline();
-        } catch (\Exception $exception) {
-            $pipelineBlockException = $exception;
-        }
-
-        $this->setRunning(false);
-
-        if ($pipelineBlockException !== null) {
-            throw $pipelineBlockException;
-        }
-
-        return $this->replies;
-    }
-
-    /**
-     * Returns the underlying client instance used by the pipeline object.
-     *
-     * @return ClientInterface
-     */
-    public function getClient()
-    {
-        return $this->client;
-    }
-
-    /**
-     * Returns the underlying pipeline executor used by the pipeline object.
-     *
-     * @return PipelineExecutorInterface
-     */
-    public function getExecutor()
-    {
-        return $this->executor;
-    }
-}

+ 0 - 33
lib/Predis/Pipeline/PipelineExecutorInterface.php

@@ -1,33 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\Connection\ConnectionInterface;
-
-/**
- * Defines a strategy to write a list of commands to the network
- * and read back their replies.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-interface PipelineExecutorInterface
-{
-    /**
-     * Writes a list of commands to the network and reads back their replies.
-     *
-     * @param ConnectionInterface $connection Connection to Redis.
-     * @param SplQueue $commands Commands queued for execution.
-     * @return array
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands);
-}

+ 0 - 71
lib/Predis/Pipeline/SafeClusterExecutor.php

@@ -1,71 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\CommunicationException;
-use Predis\Connection\ConnectionInterface;
-
-/**
- * Implements a pipeline executor strategy for connection clusters that does
- * not fail when an error is encountered, but adds the returned error in the
- * replies array.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class SafeClusterExecutor implements PipelineExecutorInterface
-{
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $size = count($commands);
-        $values = array();
-        $connectionExceptions = array();
-
-        foreach ($commands as $command) {
-            $cmdConnection = $connection->getConnection($command);
-
-            if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
-                continue;
-            }
-
-            try {
-                $cmdConnection->writeCommand($command);
-            } catch (CommunicationException $exception) {
-                $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
-            }
-        }
-
-        for ($i = 0; $i < $size; $i++) {
-            $command = $commands->dequeue();
-
-            $cmdConnection = $connection->getConnection($command);
-            $connectionObjectHash = spl_object_hash($cmdConnection);
-
-            if (isset($connectionExceptions[$connectionObjectHash])) {
-                $values[$i] = $connectionExceptions[$connectionObjectHash];
-                continue;
-            }
-
-            try {
-                $values[$i] = $cmdConnection->readResponse($command);
-            } catch (CommunicationException $exception) {
-                $values[$i] = $exception;
-                $connectionExceptions[$connectionObjectHash] = $exception;
-            }
-        }
-
-        return $values;
-    }
-}

+ 0 - 56
lib/Predis/Pipeline/SafeExecutor.php

@@ -1,56 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\CommunicationException;
-use Predis\Connection\ConnectionInterface;
-
-/**
- * Implements a pipeline executor strategy that does not fail when an error is
- * encountered, but adds the returned error in the replies array.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class SafeExecutor implements PipelineExecutorInterface
-{
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $size = count($commands);
-        $values = array();
-
-        foreach ($commands as $command) {
-            try {
-                $connection->writeCommand($command);
-            } catch (CommunicationException $exception) {
-                return array_fill(0, $size, $exception);
-            }
-        }
-
-        for ($i = 0; $i < $size; $i++) {
-            $command = $commands->dequeue();
-
-            try {
-                $values[$i] = $connection->readResponse($command);
-            } catch (CommunicationException $exception) {
-                $toAdd = count($commands) - count($values);
-                $values = array_merge($values, array_fill(0, $toAdd, $exception));
-                break;
-            }
-        }
-
-        return $values;
-    }
-}

+ 0 - 118
lib/Predis/Pipeline/StandardExecutor.php

@@ -1,118 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\Command\CommandInterface;
-use Predis\Connection\ConnectionInterface;
-use Predis\Connection\ReplicationConnectionInterface;
-use Predis\Response;
-
-/**
- * Implements the standard pipeline executor strategy used
- * to write a list of commands and read their replies over
- * a connection to Redis.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class StandardExecutor implements PipelineExecutorInterface
-{
-    protected $exceptions;
-
-    /**
-     * @param bool $exceptions Specifies if the executor should throw exceptions on server errors.
-     */
-    public function __construct($exceptions = true)
-    {
-        $this->exceptions = (bool) $exceptions;
-    }
-
-    /**
-     * Allows the pipeline executor to perform operations on the
-     * connection before starting to execute the commands stored
-     * in the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
-     */
-    protected function checkConnection(ConnectionInterface $connection)
-    {
-        if ($connection instanceof ReplicationConnectionInterface) {
-            $connection->switchTo('master');
-        }
-    }
-
-    /**
-     * Handles a response object.
-     *
-     * @param ConnectionInterface $connection
-     * @param CommandInterface $command
-     * @param Response\ObjectInterface $response
-     * @return mixed
-     */
-    protected function onResponseObject(
-        ConnectionInterface $connection,
-        CommandInterface $command,
-        Response\ObjectInterface $response)
-    {
-        if ($response instanceof Response\ErrorInterface) {
-            return $this->onResponseError($connection, $response);
-        }
-
-        return $response;
-    }
-
-    /**
-     * Handles -ERR responses returned by Redis.
-     *
-     * @param ConnectionInterface $connection The connection that returned the error.
-     * @param Response\ErrorInterface $response The error response instance.
-     */
-    protected function onResponseError(ConnectionInterface $connection, Response\ErrorInterface $response)
-    {
-        if (!$this->exceptions) {
-            return $response;
-        }
-
-        // Force disconnection to prevent protocol desynchronization.
-        $connection->disconnect();
-        $message = $response->getMessage();
-
-        throw new Response\ServerException($message);
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $this->checkConnection($connection);
-
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
-        }
-
-        $values = array();
-
-        while (!$commands->isEmpty()) {
-            $command = $commands->dequeue();
-            $response = $connection->readResponse($command);
-
-            if ($response instanceof Response\ObjectInterface) {
-                $values[] = $this->onResponseObject($connection, $command, $response);
-            } else {
-                $values[] = $command->parseResponse($response);
-            }
-        }
-
-        return $values;
-    }
-}

+ 7 - 38
tests/Predis/ClientTest.php

@@ -568,28 +568,23 @@ class ClientTest extends StandardTestCase
     /**
      * @group disconnected
      */
-    public function testPipelineWithoutArgumentsReturnsPipelineContext()
+    public function testPipelineWithoutArgumentsReturnsPipeline()
     {
         $client = new Client();
 
-        $this->assertInstanceOf('Predis\Pipeline\PipelineContext', $client->pipeline());
+        $this->assertInstanceOf('Predis\Pipeline\Pipeline', $client->pipeline());
     }
 
     /**
      * @group disconnected
      */
-    public function testPipelineWithArrayReturnsPipelineContextWithOptions()
+    public function testPipelineWithArrayReturnsPipeline()
     {
         $client = new Client();
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
 
-        $options = array('executor' => $executor);
-        $this->assertInstanceOf('Predis\Pipeline\PipelineContext', $pipeline = $client->pipeline($options));
-        $this->assertSame($executor, $pipeline->getExecutor());
-
-        $options = array('executor' => function ($client, $options) use ($executor) { return $executor; });
-        $this->assertInstanceOf('Predis\Pipeline\PipelineContext', $pipeline = $client->pipeline($options));
-        $this->assertSame($executor, $pipeline->getExecutor());
+        $this->assertInstanceOf('Predis\Pipeline\Pipeline', $client->pipeline(array()));
+        $this->assertInstanceOf('Predis\Pipeline\Atomic', $client->pipeline(array('atomic' => true)));
+        $this->assertInstanceOf('Predis\Pipeline\FireAndForget', $client->pipeline(array('fire-and-forget' => true)));
     }
 
     /**
@@ -600,38 +595,12 @@ class ClientTest extends StandardTestCase
         $callable = $this->getMock('stdClass', array('__invoke'));
         $callable->expects($this->once())
                  ->method('__invoke')
-                 ->with($this->isInstanceOf('Predis\Pipeline\PipelineContext'));
+                 ->with($this->isInstanceOf('Predis\Pipeline\Pipeline'));
 
         $client = new Client();
         $client->pipeline($callable);
     }
 
-    /**
-     * @group disconnected
-     */
-    public function testPipelineWithArrayAndCallableExecutesPipelineWithOptions()
-    {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $options = array('executor' => $executor);
-
-        $test = $this;
-        $mockCallback = function ($pipeline) use ($executor, $test) {
-            $reflection = new \ReflectionProperty($pipeline, 'executor');
-            $reflection->setAccessible(true);
-
-            $test->assertSame($executor, $reflection->getValue($pipeline));
-        };
-
-        $callable = $this->getMock('stdClass', array('__invoke'));
-        $callable->expects($this->once())
-                 ->method('__invoke')
-                 ->with($this->isInstanceOf('Predis\Pipeline\PipelineContext'))
-                 ->will($this->returnCallback($mockCallback));
-
-        $client = new Client();
-        $client->pipeline($options, $callable);
-    }
-
     /**
      * @group disconnected
      */

+ 59 - 48
tests/Predis/Pipeline/MultiExecExecutorTest.php → tests/Predis/Pipeline/AtomicTest.php

@@ -14,21 +14,19 @@ namespace Predis\Pipeline;
 use PHPUnit_Framework_TestCase as StandardTestCase;
 
 use SplQueue;
-use Predis\Profile\ServerProfile;
+use Predis\Client;
 use Predis\Response;
 
 /**
  *
  */
-class MultiExecExecutorTest extends StandardTestCase
+class AtomicTest extends StandardTestCase
 {
     /**
      * @group disconnected
      */
-    public function testExecutorWithSingleConnection()
+    public function testPipelineWithSingleConnection()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
         $queued = new Response\StatusQueued();
 
         $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
@@ -41,10 +39,13 @@ class MultiExecExecutorTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->onConsecutiveCalls($queued, $queued, $queued));
 
-        $replies = $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection));
 
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array(true, true, true), $replies);
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array(true, true, true), $pipeline->execute());
     }
 
     /**
@@ -52,17 +53,20 @@ class MultiExecExecutorTest extends StandardTestCase
      * @expectedException Predis\ClientException
      * @expectedExceptionMessage The underlying transaction has been aborted by the server
      */
-    public function testExecutorWithAbortedTransaction()
+    public function testThrowsExceptionOnAbortedTransaction()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
-
         $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
         $connection->expects($this->exactly(2))
                    ->method('executeCommand')
                    ->will($this->onConsecutiveCalls(true, null));
 
-        $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
     }
 
     /**
@@ -70,10 +74,8 @@ class MultiExecExecutorTest extends StandardTestCase
      * @expectedException Predis\Response\ServerException
      * @expectedExceptionMessage ERR Test error
      */
-    public function testExecutorWithErrorInTransaction()
+    public function testPipelineWithErrorInTransaction()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
         $queued = new Response\StatusQueued();
         $error = new Response\Error('ERR Test error');
 
@@ -88,16 +90,42 @@ class MultiExecExecutorTest extends StandardTestCase
                    ->method('executeCommand')
                    ->with($this->isInstanceOf('Predis\Command\TransactionDiscard'));
 
-        $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException Predis\Response\ServerException
+     * @expectedExceptionMessage ERR Test error
+     */
+    public function testThrowsServerExceptionOnResponseErrorByDefault()
+    {
+        $error = new Response\Error('ERR Test error');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('readResponse')
+                   ->will($this->returnValue($error));
+
+        $pipeline = new Atomic(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
     }
 
     /**
      * @group disconnected
      */
-    public function testExecutorWithErrorInCommandResponse()
+    public function testReturnsResponseErrorWithClientExceptionsSetToFalse()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
         $queued = new Response\StatusQueued();
         $error = new Response\Error('ERR Test error');
 
@@ -109,44 +137,27 @@ class MultiExecExecutorTest extends StandardTestCase
                    ->method('executeCommand')
                    ->will($this->returnValue(array('PONG', 'PONG', $error)));
 
-        $replies = $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection, array('exceptions' => false)));
 
-        $this->assertSame(array(true, true, $error), $replies);
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array(true, true, $error), $pipeline->execute());
     }
 
     /**
      * @group disconnected
      * @expectedException Predis\ClientException
-     * @expectedExceptionMessage Predis\Pipeline\MultiExecExecutor can be used only with single connections
+     * @expectedExceptionMessage Predis\Pipeline\Atomic can be used only with connections to single nodes
      */
     public function testExecutorWithAggregatedConnection()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
-
-        $replies = $executor->execute($connection, $pipeline);
-    }
-
-    // ******************************************************************** //
-    // ---- HELPER METHODS ------------------------------------------------ //
-    // ******************************************************************** //
-
-    /**
-     * Returns a list of queued command instances.
-     *
-     * @return SplQueue
-     */
-    protected function getCommandsQueue()
-    {
-        $profile = ServerProfile::getDevelopment();
+        $connection = $this->getMock('Predis\Connection\ClusterConnectionInterface');
+        $pipeline = new Atomic(new Client($connection));
 
-        $pipeline = new SplQueue();
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
+        $pipeline->ping();
 
-        return $pipeline;
+        $pipeline->execute();
     }
 }

+ 0 - 87
tests/Predis/Pipeline/FireAndForgetExecutorTest.php

@@ -1,87 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use PHPUnit_Framework_TestCase as StandardTestCase;
-
-use SplQueue;
-use Predis\Profile\ServerProfile;
-
-/**
- *
- */
-class FireAndForgetExecutorTest extends StandardTestCase
-{
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithSingleConnection()
-    {
-        $executor = new FireAndForgetExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->never())
-                   ->method('readResponse');
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertEmpty($replies);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithReplicationConnection()
-    {
-        $executor = new FireAndForgetExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('switchTo')
-                   ->with('master');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->never())
-                   ->method('readResponse');
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertEmpty($replies);
-    }
-
-    // ******************************************************************** //
-    // ---- HELPER METHODS ------------------------------------------------ //
-    // ******************************************************************** //
-
-    /**
-     * Returns a list of queued command instances.
-     *
-     * @return SplQueue
-     */
-    protected function getCommandsQueue()
-    {
-        $profile = ServerProfile::getDevelopment();
-
-        $pipeline = new SplQueue();
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-
-        return $pipeline;
-    }
-}

+ 69 - 0
tests/Predis/Pipeline/FireAndForgetTest.php

@@ -0,0 +1,69 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use PHPUnit_Framework_TestCase as StandardTestCase;
+
+use SplQueue;
+use Predis\Client;
+use Predis\Profile\ServerProfile;
+
+/**
+ *
+ */
+class FireAndForgetTest extends StandardTestCase
+{
+    /**
+     * @group disconnected
+     */
+    public function testPipelineWithSingleConnection()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->exactly(3))->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
+
+        $pipeline = new FireAndForget(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertEmpty($pipeline->execute());
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testSwitchesToMasterWithReplicationConnection()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('switchTo')
+                   ->with('master');
+        $connection->expects($this->exactly(3))
+                   ->method('writeCommand');
+        $connection->expects($this->never())
+                   ->method('readResponse');
+
+        $pipeline = new FireAndForget(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertEmpty($pipeline->execute());
+    }
+}

+ 115 - 39
tests/Predis/Pipeline/PipelineContextTest.php → tests/Predis/Pipeline/PipelineTest.php

@@ -16,34 +16,22 @@ use PHPUnit_Framework_TestCase as StandardTestCase;
 use Predis\Client;
 use Predis\ClientException;
 use Predis\Profile\ServerProfile;
+use Predis\Response;
 
 /**
  *
  */
-class PipelineContextTest extends StandardTestCase
+class PipelineTest extends StandardTestCase
 {
     /**
      * @group disconnected
      */
-    public function testConstructorWithoutOptions()
+    public function testConstructor()
     {
         $client = new Client();
-        $pipeline = new PipelineContext($client);
+        $pipeline = new Pipeline($client);
 
         $this->assertSame($client, $pipeline->getClient());
-        $this->assertInstanceOf('Predis\Pipeline\StandardExecutor', $pipeline->getExecutor());
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testConstructorWithExecutorArgument()
-    {
-        $client = new Client();
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-
-        $pipeline = new PipelineContext($client, $executor);
-        $this->assertSame($executor, $pipeline->getExecutor());
     }
 
     /**
@@ -51,10 +39,11 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testCallDoesNotSendCommandsWithoutExecute()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -66,15 +55,79 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testCallReturnsPipelineForFluentInterface()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $this->assertSame($pipeline, $pipeline->echo('one'));
         $this->assertSame($pipeline, $pipeline->echo('one')->echo('two')->echo('three'));
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testDoesNotParseComplexResponseObjects()
+    {
+        $object = $this->getMock('Predis\Response\ObjectInterface');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('readResponse')
+                   ->will($this->returnValue($object));
+
+        $pipeline = new Pipeline(new Client($connection));
+
+        $pipeline->ping();
+
+        $this->assertSame(array($object), $pipeline->execute());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException Predis\Response\ServerException
+     * @expectedExceptionMessage ERR Test error
+     */
+    public function testThrowsServerExceptionOnResponseErrorByDefault()
+    {
+        $error = new Response\Error('ERR Test error');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('readResponse')
+                   ->will($this->returnValue($error));
+
+        $pipeline = new Pipeline(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testReturnsResponseErrorWithClientExceptionsSetToFalse()
+    {
+        $error = $this->getMock('Predis\Response\ErrorInterface');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->exactly(2))
+                   ->method('readResponse')
+                   ->will($this->returnValue($error));
+
+        $client = new Client($connection, array('exceptions' => false));
+
+        $pipeline = new Pipeline($client);
+
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array($error, $error), $pipeline->execute());
+    }
+
     /**
      * @group disconnected
      */
@@ -82,10 +135,11 @@ class PipelineContextTest extends StandardTestCase
     {
         $profile = ServerProfile::getDefault();
 
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->executeCommand($profile->createCommand('echo', array('one')));
         $pipeline->executeCommand($profile->createCommand('echo', array('two')));
@@ -97,10 +151,11 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testExecuteWithEmptyBuffer()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $this->assertSame(array(), $pipeline->execute());
     }
@@ -117,7 +172,7 @@ class PipelineContextTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->returnCallback($this->getReadCallback()));
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -133,10 +188,7 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testFlushWithFalseArgumentDiscardsBuffer()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
-
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client());
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -159,7 +211,7 @@ class PipelineContextTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->returnCallback($this->getReadCallback()));
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -170,13 +222,37 @@ class PipelineContextTest extends StandardTestCase
         $this->assertSame(array('one', 'two', 'three', 'four'), $pipeline->execute());
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testSwitchesToMasterWithReplicationConnection()
+    {
+        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('switchTo')
+                   ->with('master');
+        $connection->expects($this->exactly(3))
+                   ->method('writeCommand');
+        $connection->expects($this->exactly(3))
+                   ->method('readResponse')
+                   ->will($this->returnValue('PONG'));
+
+        $pipeline = new Pipeline(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array(true, true, true), $pipeline->execute());
+    }
+
     /**
      * @group disconnected
      */
     public function testExecuteAcceptsCallableArgument()
     {
         $test = $this;
-        $pipeline = new PipelineContext(new Client());
+        $pipeline = new Pipeline(new Client());
 
         $callable = function ($pipe) use ($test, $pipeline) {
             $test->assertSame($pipeline, $pipe);
@@ -194,7 +270,7 @@ class PipelineContextTest extends StandardTestCase
     {
         $noncallable = new \stdClass();
 
-        $pipeline = new PipelineContext(new Client());
+        $pipeline = new Pipeline(new Client());
         $pipeline->execute($noncallable);
     }
 
@@ -204,7 +280,7 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testExecuteInsideCallableArgumentThrowsException()
     {
-        $pipeline = new PipelineContext(new Client());
+        $pipeline = new Pipeline(new Client());
 
         $pipeline->execute(function ($pipe) {
             $pipe->execute();
@@ -223,7 +299,7 @@ class PipelineContextTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->returnCallback($this->getReadCallback()));
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $replies = $pipeline->execute(function ($pipe) {
             $pipe->echo('one');
@@ -244,7 +320,7 @@ class PipelineContextTest extends StandardTestCase
         $connection->expects($this->never())->method('writeCommand');
         $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $exception = null;
         $replies = null;

+ 0 - 159
tests/Predis/Pipeline/StandardExecutorTest.php

@@ -1,159 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use PHPUnit_Framework_TestCase as StandardTestCase;
-
-use SplQueue;
-use Predis\Profile\ServerProfile;
-use Predis\Response;
-
-/**
- *
- */
-class StandardExecutorTest extends StandardTestCase
-{
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithSingleConnection()
-    {
-        $executor = new StandardExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->exactly(3))
-                   ->method('readResponse')
-                   ->will($this->returnValue('PONG'));
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array(true, true, true), $replies);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithReplicationConnection()
-    {
-        $executor = new StandardExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('switchTo')
-                   ->with('master');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->exactly(3))
-                   ->method('readResponse')
-                   ->will($this->returnValue('PONG'));
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array(true, true, true), $replies);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorDoesNotParseResponseObjects()
-    {
-        $executor = new StandardExecutor();
-        $response = $this->getMock('Predis\Response\ObjectInterface');
-
-        $this->simpleResponseObjectTest($executor, $response);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorCanReturnRedisErrors()
-    {
-        $executor = new StandardExecutor(false);
-        $response = $this->getMock('Predis\Response\ErrorInterface');
-
-        $this->simpleResponseObjectTest($executor, $response);
-    }
-
-    /**
-     * @group disconnected
-     * @expectedException Predis\Response\ServerException
-     * @expectedExceptionMessage ERR Test error
-     */
-    public function testExecutorCanThrowExceptions()
-    {
-        $executor = new StandardExecutor(true);
-        $pipeline = $this->getCommandsQueue();
-        $error = new Response\Error('ERR Test error');
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('readResponse')
-                   ->will($this->returnValue($error));
-
-        $executor->execute($connection, $pipeline);
-    }
-
-    // ******************************************************************** //
-    // ---- HELPER METHODS ------------------------------------------------ //
-    // ******************************************************************** //
-
-    /**
-     * Executes a test for the Predis\Response\ObjectInterface type.
-     *
-     * @param PipelineExecutorInterface $executor
-     * @param ResponseObjectInterface $response
-     */
-    protected function simpleResponseObjectTest(PipelineExecutorInterface $executor, Response\ObjectInterface $response)
-    {
-        $pipeline = new SplQueue();
-
-        $command = $this->getMock('Predis\Command\CommandInterface');
-        $command->expects($this->never())
-                ->method('parseResponse');
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('writeCommand');
-        $connection->expects($this->once())
-                   ->method('readResponse')
-                   ->will($this->returnValue($response));
-
-        $pipeline->enqueue($command);
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array($response), $replies);
-    }
-
-    /**
-     * Returns a list of queued command instances.
-     *
-     * @return SplQueue
-     */
-    protected function getCommandsQueue()
-    {
-        $profile = ServerProfile::getDevelopment();
-
-        $pipeline = new SplQueue();
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-
-        return $pipeline;
-    }
-}