Explorar el Código

Rewrite a good chunk of the classes in the Predis\Pipeline namespace.

First of all we completely removed the concept of pipeline executors.
Now pipelines can be easily customized by extending our default class
Predis\Pipeline\Pipeline.

Tests coverage for the Predis\Pipeline namespace is decent but can be
definitely improved while test cases can be beautified.
Daniele Alessandri hace 11 años
padre
commit
8068c87e47

+ 3 - 0
CHANGELOG.md

@@ -17,6 +17,9 @@ v0.9.0 (201x-xx-xx)
   using objects that responds to `__invoke()` (not all the kinds of callables)
   even for custom options defined by the user.
 
+- Removed pipeline executors, now command pipelines can be easily customized by
+  extending the standard `Predis\Pipeline\Pipeline` class.
+
 - Most classes and interfaces in the `Predis\Protocol` namespace have been moved
   or renamed while rationalizing the whole API of external protocol processors.
 

+ 3 - 2
examples/PipelineContext.php → examples/PipeliningCommands.php

@@ -11,8 +11,9 @@
 
 require 'SharedConfigurations.php';
 
-// When you have a whole set of consecutive commands to send to
-// a redis server, you can use a pipeline to improve performances.
+// When you have a whole set of consecutive commands to send to a redis server,
+// you can use a pipeline to dramatically improve performances. Pipelines can
+// greatly reduce the effects of network round-trips.
 
 $client = new Predis\Client($single_server);
 

+ 4 - 10
lib/Predis/Client.php

@@ -21,7 +21,7 @@ use Predis\Connection\AggregatedConnectionInterface;
 use Predis\Connection\ConnectionInterface;
 use Predis\Connection\ConnectionParametersInterface;
 use Predis\Monitor;
-use Predis\Pipeline\PipelineContext;
+use Predis\Pipeline;
 use Predis\Profile\ServerProfile;
 use Predis\PubSub;
 use Predis\Response;
@@ -355,7 +355,7 @@ class Client implements ClientInterface
      * a pipeline executed inside the optionally provided callable object.
      *
      * @param mixed $arg,... Options for the context, or a callable, or both.
-     * @return PipelineContext|array
+     * @return Pipeline\Pipeline|array
      */
     public function pipeline(/* arguments */)
     {
@@ -367,17 +367,11 @@ class Client implements ClientInterface
      *
      * @param array $options Options for the context.
      * @param mixed $callable Optional callable used to execute the context.
-     * @return PipelineContext|array
+     * @return Pipeline\Pipeline|array
      */
     protected function createPipeline(array $options = null, $callable = null)
     {
-        $executor = isset($options['executor']) ? $options['executor'] : null;
-
-        if (is_callable($executor)) {
-            $executor = call_user_func($executor, $this, $options);
-        }
-
-        $pipeline = new PipelineContext($this, $executor);
+        $pipeline = new Pipeline\Pipeline($this);
 
         if (isset($callable)) {
             return $pipeline->execute($callable);

+ 110 - 0
lib/Predis/Pipeline/Atomic.php

@@ -0,0 +1,110 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use SplQueue;
+use Predis\ClientException;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\SingleConnectionInterface;
+use Predis\Profile\ServerProfileInterface;
+use Predis\Response;
+
+/**
+ * Command pipeline wrapped into a MULTI / EXEC transaction.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class Atomic extends Pipeline
+{
+    /**
+     * {@inheritdoc}
+     */
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
+    {
+        $profile = $this->getClient()->getProfile();
+        $this->check($connection, $profile);
+
+        $connection->executeCommand($profile->createCommand('multi'));
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+
+        foreach ($commands as $command) {
+            $response = $connection->readResponse($command);
+
+            if ($response instanceof Response\ErrorInterface) {
+                $connection->executeCommand($profile->createCommand('discard'));
+                throw new Response\ServerException($response->getMessage());
+            }
+        }
+
+        $executed = $connection->executeCommand($profile->createCommand('exec'));
+
+        if (!isset($executed)) {
+            // TODO: should be throwing a more appropriate exception.
+            throw new ClientException(
+                'The underlying transaction has been aborted by the server'
+            );
+        }
+
+        if (count($executed) !== count($commands)) {
+            throw new ClientException(
+                "Invalid number of replies [expected: ".count($commands)." - actual: ".count($executed)."]"
+            );
+        }
+
+        $responses  = array();
+        $sizeOfPipe = count($commands);
+        $exceptions = $this->throwServerExceptions();
+
+        for ($i = 0; $i < $sizeOfPipe; $i++) {
+            $command  = $commands->dequeue();
+            $response = $executed[$i];
+
+            if (!$response instanceof Response\ObjectInterface) {
+                $responses[] = $command->parseResponse($response);
+            } else if ($response instanceof Response\ErrorInterface && $exceptions) {
+                $this->exception($connection, $response);
+            } else {
+                $responses[] = $response;
+            }
+
+            unset($executed[$i]);
+        }
+
+        return $responses;
+    }
+
+    /**
+     * Verifies all the needed preconditions before executing the pipeline.
+     *
+     * @param ConnectionInterface $connection Connection instance.
+     * @param ServerProfileInterface $profile Server profile.
+     */
+    protected function check(ConnectionInterface $connection, ServerProfileInterface $profile)
+    {
+        if (!$connection instanceof SingleConnectionInterface) {
+            $class = __CLASS__;
+
+            throw new ClientException(
+                "$class can be used only with connections to single nodes"
+            );
+        }
+
+        if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
+            throw new ClientException(
+                'The specified server profile must support MULTI, EXEC and DISCARD.'
+            );
+        }
+    }
+}

+ 120 - 0
lib/Predis/Pipeline/ConnectionErrorProof.php

@@ -0,0 +1,120 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use SplQueue;
+use Predis\NotSupportedException;
+use Predis\CommunicationException;
+use Predis\Connection\ClusterConnectionInterface;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\SingleConnectionInterface;
+
+/**
+ * Command pipeline that does not throw exceptions on connection errors, but
+ * returns the exception instances as the rest of the response elements.
+ *
+ * @todo Awful naming!
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class ConnectionErrorProof extends Pipeline
+{
+    /**
+     * {@inheritdoc}
+     */
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
+    {
+        if ($connection instanceof SingleConnectionInterface) {
+            return $this->executePipelineNode($connection, $commands);
+        } else if ($connection instanceof ClusterConnectionInterface) {
+            return $this->executePipelineCluster($connection, $commands);
+        } else {
+            throw new NotSupportedException("Unsupported connection type");
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function executePipelineNode(SingleConnectionInterface $connection, SplQueue $commands)
+    {
+        $responses  = array();
+        $sizeOfPipe = count($commands);
+
+        foreach ($commands as $command) {
+            try {
+                $connection->writeCommand($command);
+            } catch (CommunicationException $exception) {
+                return array_fill(0, $sizeOfPipe, $exception);
+            }
+        }
+
+        for ($i = 0; $i < $sizeOfPipe; $i++) {
+            $command = $commands->dequeue();
+
+            try {
+                $responses[$i] = $connection->readResponse($command);
+            } catch (CommunicationException $exception) {
+                $add = count($commands) - count($responses);
+                $responses = array_merge($responses, array_fill(0, $add, $exception));
+
+                break;
+            }
+        }
+
+        return $responses;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function executePipelineCluster(ClusterConnectionInterface $connection, SplQueue $commands)
+    {
+        $responses  = array();
+        $sizeOfPipe = count($commands);
+        $exceptions = array();
+
+        foreach ($commands as $command) {
+            $cmdConnection = $connection->getConnection($command);
+
+            if (isset($exceptions[spl_object_hash($cmdConnection)])) {
+                continue;
+            }
+
+            try {
+                $cmdConnection->writeCommand($command);
+            } catch (CommunicationException $exception) {
+                $exceptions[spl_object_hash($cmdConnection)] = $exception;
+            }
+        }
+
+        for ($i = 0; $i < $sizeOfPipe; $i++) {
+            $command = $commands->dequeue();
+
+            $cmdConnection = $connection->getConnection($command);
+            $connectionHash = spl_object_hash($cmdConnection);
+
+            if (isset($exceptions[$connectionHash])) {
+                $responses[$i] = $exceptions[$connectionHash];
+                continue;
+            }
+
+            try {
+                $responses[$i] = $cmdConnection->readResponse($command);
+            } catch (CommunicationException $exception) {
+                $responses[$i] = $exception;
+                $exceptions[$connectionHash] = $exception;
+            }
+        }
+
+        return $responses;
+    }
+}

+ 4 - 17
lib/Predis/Pipeline/FireAndForgetExecutor.php → lib/Predis/Pipeline/FireAndForget.php

@@ -16,33 +16,20 @@ use Predis\Connection\ConnectionInterface;
 use Predis\Connection\ReplicationConnectionInterface;
 
 /**
- * Implements a pipeline executor strategy that writes a list of commands to
- * the connection object but does not read back their replies.
+ * Command pipeline that writes commands to the servers but discards responses.
  *
  * @author Daniele Alessandri <suppakilla@gmail.com>
  */
-class FireAndForgetExecutor implements PipelineExecutorInterface
+class FireAndForget extends Pipeline
 {
     /**
-     * Allows the pipeline executor to perform operations on the
-     * connection before starting to execute the commands stored
-     * in the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
+     * {@inheritdoc}
      */
-    protected function checkConnection(ConnectionInterface $connection)
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
     {
         if ($connection instanceof ReplicationConnectionInterface) {
             $connection->switchTo('master');
         }
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $this->checkConnection($connection);
 
         while (!$commands->isEmpty()) {
             $connection->writeCommand($commands->dequeue());

+ 0 - 134
lib/Predis/Pipeline/MultiExecExecutor.php

@@ -1,134 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\ClientException;
-use Predis\Connection\ConnectionInterface;
-use Predis\Connection\SingleConnectionInterface;
-use Predis\Profile\ServerProfile;
-use Predis\Profile\ServerProfileInterface;
-use Predis\Response;
-
-/**
- * Implements a pipeline executor that wraps the whole pipeline
- * in a MULTI / EXEC context to make sure that it is executed
- * correctly.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class MultiExecExecutor implements PipelineExecutorInterface
-{
-    protected $profile;
-
-    /**
-     *
-     */
-    public function __construct(ServerProfileInterface $profile = null)
-    {
-        $this->setProfile($profile ?: ServerProfile::getDefault());
-    }
-
-    /**
-     * Allows the pipeline executor to perform operations on the
-     * connection before starting to execute the commands stored
-     * in the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
-     */
-    protected function checkConnection(ConnectionInterface $connection)
-    {
-        if (!$connection instanceof SingleConnectionInterface) {
-            $class = __CLASS__;
-            throw new ClientException("$class can be used only with single connections");
-        }
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $this->checkConnection($connection);
-
-        $cmd = $this->profile->createCommand('multi');
-        $connection->executeCommand($cmd);
-
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
-        }
-
-        foreach ($commands as $command) {
-            $response = $connection->readResponse($command);
-
-            if ($response instanceof Response\ErrorInterface) {
-                $cmd = $this->profile->createCommand('discard');
-                $connection->executeCommand($cmd);
-
-                throw new Response\ServerException($response->getMessage());
-            }
-        }
-
-        $cmd = $this->profile->createCommand('exec');
-        $responses = $connection->executeCommand($cmd);
-
-        if (!isset($responses)) {
-            throw new ClientException('The underlying transaction has been aborted by the server');
-        }
-
-        if (count($responses) !== count($commands)) {
-            throw new ClientException("Invalid number of replies [expected: ".count($commands)." - actual: ".count($responses)."]");
-        }
-
-        return $this->consumeArrayResponse($commands, $responses);
-    }
-
-    /**
-     * Consumes an array response returned by EXEC.
-     *
-     * @param SplQueue $commands Pipelined commands
-     * @param array $responses Responses returned by EXEC.
-     * @return array
-     */
-    protected function consumeArrayResponse(SplQueue $commands, array &$responses)
-    {
-        $size = count($commands);
-        $values = array();
-
-        for ($i = 0; $i < $size; $i++) {
-            $command = $commands->dequeue();
-            $response = $responses[$i];
-
-            if ($response instanceof Response\ObjectInterface) {
-                $values[$i] = $response;
-            } else {
-                $values[$i] = $command->parseResponse($response);
-            }
-
-            unset($responses[$i]);
-        }
-
-        return $values;
-    }
-
-    /**
-     * @param ServerProfileInterface $profile Server profile.
-     */
-    public function setProfile(ServerProfileInterface $profile)
-    {
-        if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
-            throw new ClientException('The specified server profile must support MULTI, EXEC and DISCARD.');
-        }
-
-        $this->profile = $profile;
-    }
-}

+ 223 - 0
lib/Predis/Pipeline/Pipeline.php

@@ -0,0 +1,223 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use Exception;
+use InvalidArgumentException;
+use SplQueue;
+use Predis\BasicClientInterface;
+use Predis\ClientException;
+use Predis\ClientInterface;
+use Predis\ExecutableContextInterface;
+use Predis\Command\CommandInterface;
+use Predis\Connection\ConnectionInterface;
+use Predis\Connection\ReplicationConnectionInterface;
+use Predis\Response;
+
+/**
+ * Implementation of a command pipeline in which write and read operations of
+ * Redis commands are pipelined to alleviate the effects of network round-trips.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class Pipeline implements BasicClientInterface, ExecutableContextInterface
+{
+    private $client;
+    private $pipeline;
+
+    private $responses = array();
+    private $running = false;
+
+    /**
+     * @param ClientInterface $client Client instance used by the context.
+     */
+    public function __construct(ClientInterface $client)
+    {
+        $this->client = $client;
+        $this->pipeline = new SplQueue();
+    }
+
+    /**
+     * Queues a command into the pipeline buffer.
+     *
+     * @param string $method Command ID.
+     * @param array $arguments Arguments for the command.
+     * @return Pipeline
+     */
+    public function __call($method, $arguments)
+    {
+        $command = $this->client->createCommand($method, $arguments);
+        $this->recordCommand($command);
+
+        return $this;
+    }
+
+    /**
+     * Queues a command instance into the pipeline buffer.
+     *
+     * @param CommandInterface $command Command to queue in the buffer.
+     */
+    protected function recordCommand(CommandInterface $command)
+    {
+        $this->pipeline->enqueue($command);
+    }
+
+    /**
+     * Queues a command instance into the pipeline buffer.
+     *
+     * @param CommandInterface $command Command to queue in the buffer.
+     */
+    public function executeCommand(CommandInterface $command)
+    {
+        $this->recordCommand($command);
+    }
+
+    /**
+     * Throws and exception on -ERR responses returned by Redis.
+     *
+     * @param ConnectionInterface $connection The connection that returned the error.
+     * @param Response\ErrorInterface $response The error response instance.
+     */
+    protected function exception(ConnectionInterface $connection, Response\ErrorInterface $response)
+    {
+        $connection->disconnect();
+        $message = $response->getMessage();
+
+        throw new Response\ServerException($message);
+    }
+
+    /**
+     * Implements the logic to flush the queued commands and read the responses
+     * from the current connection.
+     *
+     * @param ConnectionInterface $connection Current connection instance.
+     * @param SplQueue $commands Queued commands.
+     * @return array
+     */
+    protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
+    {
+        if ($connection instanceof ReplicationConnectionInterface) {
+            $connection->switchTo('master');
+        }
+
+        foreach ($commands as $command) {
+            $connection->writeCommand($command);
+        }
+
+        $responses  = array();
+        $exceptions = $this->throwServerExceptions();
+
+        while (!$commands->isEmpty()) {
+            $command  = $commands->dequeue();
+            $response = $connection->readResponse($command);
+
+            if (!$response instanceof Response\ObjectInterface) {
+                $responses[] = $command->parseResponse($response);
+            } else if ($response instanceof Response\ErrorInterface && $exceptions) {
+                $this->exception($connection, $response);
+            } else {
+                $responses[] = $response;
+            }
+        }
+
+        return $responses;
+    }
+
+    /**
+     * Flushes the buffer that holds the queued commands.
+     *
+     * @param bool $send Specifies if the commands in the buffer should be sent to Redis.
+     * @return Pipeline
+     */
+    public function flushPipeline($send = true)
+    {
+        if ($send && !$this->pipeline->isEmpty()) {
+            $connection = $this->client->getConnection();
+            $responses = $this->executePipeline($connection, $this->pipeline);
+
+            $this->responses = array_merge($this->responses, $responses);
+        } else {
+            $this->pipeline = new SplQueue();
+        }
+
+        return $this;
+    }
+
+    /**
+     * Marks the running status of the pipeline.
+     *
+     * @param bool $bool Sets the running status of the pipeline.
+     */
+    private function setRunning($bool)
+    {
+        if ($bool && $this->running) {
+            throw new ClientException('This pipeline is already opened');
+        }
+
+        $this->running = $bool;
+    }
+
+    /**
+     * Handles the actual execution of the whole pipeline.
+     *
+     * @param mixed $callable Optional callback for execution.
+     * @return array
+     */
+    public function execute($callable = null)
+    {
+        if ($callable && !is_callable($callable)) {
+            throw new InvalidArgumentException('Argument passed must be a callable object');
+        }
+
+        $exception = null;
+        $this->setRunning(true);
+
+        try {
+            if ($callable) {
+                call_user_func($callable, $this);
+            }
+
+            $this->flushPipeline();
+        } catch (Exception $exception) {
+            // NOOP
+        }
+
+        $this->setRunning(false);
+
+        if ($exception) {
+            throw $exception;
+        }
+
+        return $this->responses;
+    }
+
+    /**
+     * Returns if the pipeline should throw exceptions on server errors.
+     *
+     * @todo Awful naming...
+     * @return bool
+     */
+    protected function throwServerExceptions()
+    {
+        return (bool) $this->client->getOptions()->exceptions;
+    }
+
+    /**
+     * Returns the underlying client instance used by the pipeline object.
+     *
+     * @return ClientInterface
+     */
+    public function getClient()
+    {
+        return $this->client;
+    }
+}

+ 0 - 186
lib/Predis/Pipeline/PipelineContext.php

@@ -1,186 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\BasicClientInterface;
-use Predis\ClientException;
-use Predis\ClientInterface;
-use Predis\ExecutableContextInterface;
-use Predis\Command\CommandInterface;
-
-/**
- * Abstraction of a pipeline context where write and read operations
- * of commands and their replies over the network are pipelined.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class PipelineContext implements BasicClientInterface, ExecutableContextInterface
-{
-    private $client;
-    private $executor;
-    private $pipeline;
-
-    private $replies = array();
-    private $running = false;
-
-    /**
-     * @param ClientInterface $client Client instance used by the context.
-     * @param PipelineExecutorInterface $executor Pipeline executor instace.
-     */
-    public function __construct(ClientInterface $client, PipelineExecutorInterface $executor = null)
-    {
-        $this->client = $client;
-        $this->executor = $executor ?: $this->createExecutor($client);
-        $this->pipeline = new SplQueue();
-    }
-
-    /**
-     * Returns a pipeline executor depending on the kind of the underlying
-     * connection and the passed options.
-     *
-     * @param ClientInterface $client Client instance used by the context.
-     * @return PipelineExecutorInterface
-     */
-    protected function createExecutor(ClientInterface $client)
-    {
-        $options = $client->getOptions();
-
-        if (isset($options->exceptions)) {
-            return new StandardExecutor($options->exceptions);
-        }
-
-        return new StandardExecutor();
-    }
-
-    /**
-     * Queues a command into the pipeline buffer.
-     *
-     * @param string $method Command ID.
-     * @param array $arguments Arguments for the command.
-     * @return PipelineContext
-     */
-    public function __call($method, $arguments)
-    {
-        $command = $this->client->createCommand($method, $arguments);
-        $this->recordCommand($command);
-
-        return $this;
-    }
-
-    /**
-     * Queues a command instance into the pipeline buffer.
-     *
-     * @param CommandInterface $command Command to queue in the buffer.
-     */
-    protected function recordCommand(CommandInterface $command)
-    {
-        $this->pipeline->enqueue($command);
-    }
-
-    /**
-     * Queues a command instance into the pipeline buffer.
-     *
-     * @param CommandInterface $command Command to queue in the buffer.
-     */
-    public function executeCommand(CommandInterface $command)
-    {
-        $this->recordCommand($command);
-    }
-
-    /**
-     * Flushes the buffer that holds the queued commands.
-     *
-     * @param Boolean $send Specifies if the commands in the buffer should be sent to Redis.
-     * @return PipelineContext
-     */
-    public function flushPipeline($send = true)
-    {
-        if ($send && !$this->pipeline->isEmpty()) {
-            $connection = $this->client->getConnection();
-            $replies = $this->executor->execute($connection, $this->pipeline);
-            $this->replies = array_merge($this->replies, $replies);
-        } else {
-            $this->pipeline = new SplQueue();
-        }
-
-        return $this;
-    }
-
-    /**
-     * Marks the running status of the pipeline.
-     *
-     * @param Boolean $bool True if the pipeline is running.
-     *                      False if the pipeline is not running.
-     */
-    private function setRunning($bool)
-    {
-        if ($bool === true && $this->running === true) {
-            throw new ClientException("This pipeline is already opened");
-        }
-
-        $this->running = $bool;
-    }
-
-    /**
-     * Handles the actual execution of the whole pipeline.
-     *
-     * @param mixed $callable Optional callback for execution.
-     * @return array
-     */
-    public function execute($callable = null)
-    {
-        if ($callable && !is_callable($callable)) {
-            throw new \InvalidArgumentException('Argument passed must be a callable object');
-        }
-
-        $this->setRunning(true);
-        $pipelineBlockException = null;
-
-        try {
-            if ($callable !== null) {
-                call_user_func($callable, $this);
-            }
-            $this->flushPipeline();
-        } catch (\Exception $exception) {
-            $pipelineBlockException = $exception;
-        }
-
-        $this->setRunning(false);
-
-        if ($pipelineBlockException !== null) {
-            throw $pipelineBlockException;
-        }
-
-        return $this->replies;
-    }
-
-    /**
-     * Returns the underlying client instance used by the pipeline object.
-     *
-     * @return ClientInterface
-     */
-    public function getClient()
-    {
-        return $this->client;
-    }
-
-    /**
-     * Returns the underlying pipeline executor used by the pipeline object.
-     *
-     * @return PipelineExecutorInterface
-     */
-    public function getExecutor()
-    {
-        return $this->executor;
-    }
-}

+ 0 - 33
lib/Predis/Pipeline/PipelineExecutorInterface.php

@@ -1,33 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\Connection\ConnectionInterface;
-
-/**
- * Defines a strategy to write a list of commands to the network
- * and read back their replies.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-interface PipelineExecutorInterface
-{
-    /**
-     * Writes a list of commands to the network and reads back their replies.
-     *
-     * @param ConnectionInterface $connection Connection to Redis.
-     * @param SplQueue $commands Commands queued for execution.
-     * @return array
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands);
-}

+ 0 - 71
lib/Predis/Pipeline/SafeClusterExecutor.php

@@ -1,71 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\CommunicationException;
-use Predis\Connection\ConnectionInterface;
-
-/**
- * Implements a pipeline executor strategy for connection clusters that does
- * not fail when an error is encountered, but adds the returned error in the
- * replies array.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class SafeClusterExecutor implements PipelineExecutorInterface
-{
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $size = count($commands);
-        $values = array();
-        $connectionExceptions = array();
-
-        foreach ($commands as $command) {
-            $cmdConnection = $connection->getConnection($command);
-
-            if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
-                continue;
-            }
-
-            try {
-                $cmdConnection->writeCommand($command);
-            } catch (CommunicationException $exception) {
-                $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
-            }
-        }
-
-        for ($i = 0; $i < $size; $i++) {
-            $command = $commands->dequeue();
-
-            $cmdConnection = $connection->getConnection($command);
-            $connectionObjectHash = spl_object_hash($cmdConnection);
-
-            if (isset($connectionExceptions[$connectionObjectHash])) {
-                $values[$i] = $connectionExceptions[$connectionObjectHash];
-                continue;
-            }
-
-            try {
-                $values[$i] = $cmdConnection->readResponse($command);
-            } catch (CommunicationException $exception) {
-                $values[$i] = $exception;
-                $connectionExceptions[$connectionObjectHash] = $exception;
-            }
-        }
-
-        return $values;
-    }
-}

+ 0 - 56
lib/Predis/Pipeline/SafeExecutor.php

@@ -1,56 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\CommunicationException;
-use Predis\Connection\ConnectionInterface;
-
-/**
- * Implements a pipeline executor strategy that does not fail when an error is
- * encountered, but adds the returned error in the replies array.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class SafeExecutor implements PipelineExecutorInterface
-{
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $size = count($commands);
-        $values = array();
-
-        foreach ($commands as $command) {
-            try {
-                $connection->writeCommand($command);
-            } catch (CommunicationException $exception) {
-                return array_fill(0, $size, $exception);
-            }
-        }
-
-        for ($i = 0; $i < $size; $i++) {
-            $command = $commands->dequeue();
-
-            try {
-                $values[$i] = $connection->readResponse($command);
-            } catch (CommunicationException $exception) {
-                $toAdd = count($commands) - count($values);
-                $values = array_merge($values, array_fill(0, $toAdd, $exception));
-                break;
-            }
-        }
-
-        return $values;
-    }
-}

+ 0 - 118
lib/Predis/Pipeline/StandardExecutor.php

@@ -1,118 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use SplQueue;
-use Predis\Command\CommandInterface;
-use Predis\Connection\ConnectionInterface;
-use Predis\Connection\ReplicationConnectionInterface;
-use Predis\Response;
-
-/**
- * Implements the standard pipeline executor strategy used
- * to write a list of commands and read their replies over
- * a connection to Redis.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
-class StandardExecutor implements PipelineExecutorInterface
-{
-    protected $exceptions;
-
-    /**
-     * @param bool $exceptions Specifies if the executor should throw exceptions on server errors.
-     */
-    public function __construct($exceptions = true)
-    {
-        $this->exceptions = (bool) $exceptions;
-    }
-
-    /**
-     * Allows the pipeline executor to perform operations on the
-     * connection before starting to execute the commands stored
-     * in the pipeline.
-     *
-     * @param ConnectionInterface $connection Connection instance.
-     */
-    protected function checkConnection(ConnectionInterface $connection)
-    {
-        if ($connection instanceof ReplicationConnectionInterface) {
-            $connection->switchTo('master');
-        }
-    }
-
-    /**
-     * Handles a response object.
-     *
-     * @param ConnectionInterface $connection
-     * @param CommandInterface $command
-     * @param Response\ObjectInterface $response
-     * @return mixed
-     */
-    protected function onResponseObject(
-        ConnectionInterface $connection,
-        CommandInterface $command,
-        Response\ObjectInterface $response)
-    {
-        if ($response instanceof Response\ErrorInterface) {
-            return $this->onResponseError($connection, $response);
-        }
-
-        return $response;
-    }
-
-    /**
-     * Handles -ERR responses returned by Redis.
-     *
-     * @param ConnectionInterface $connection The connection that returned the error.
-     * @param Response\ErrorInterface $response The error response instance.
-     */
-    protected function onResponseError(ConnectionInterface $connection, Response\ErrorInterface $response)
-    {
-        if (!$this->exceptions) {
-            return $response;
-        }
-
-        // Force disconnection to prevent protocol desynchronization.
-        $connection->disconnect();
-        $message = $response->getMessage();
-
-        throw new Response\ServerException($message);
-    }
-
-    /**
-     * {@inheritdoc}
-     */
-    public function execute(ConnectionInterface $connection, SplQueue $commands)
-    {
-        $this->checkConnection($connection);
-
-        foreach ($commands as $command) {
-            $connection->writeCommand($command);
-        }
-
-        $values = array();
-
-        while (!$commands->isEmpty()) {
-            $command = $commands->dequeue();
-            $response = $connection->readResponse($command);
-
-            if ($response instanceof Response\ObjectInterface) {
-                $values[] = $this->onResponseObject($connection, $command, $response);
-            } else {
-                $values[] = $command->parseResponse($response);
-            }
-        }
-
-        return $values;
-    }
-}

+ 6 - 38
tests/Predis/ClientTest.php

@@ -568,28 +568,22 @@ class ClientTest extends StandardTestCase
     /**
      * @group disconnected
      */
-    public function testPipelineWithoutArgumentsReturnsPipelineContext()
+    public function testPipelineWithoutArgumentsReturnsPipeline()
     {
         $client = new Client();
 
-        $this->assertInstanceOf('Predis\Pipeline\PipelineContext', $client->pipeline());
+        $this->assertInstanceOf('Predis\Pipeline\Pipeline', $client->pipeline());
     }
 
     /**
      * @group disconnected
      */
-    public function testPipelineWithArrayReturnsPipelineContextWithOptions()
+    public function testPipelineWithArrayReturnsPipeline()
     {
         $client = new Client();
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
+        $options = array();
 
-        $options = array('executor' => $executor);
-        $this->assertInstanceOf('Predis\Pipeline\PipelineContext', $pipeline = $client->pipeline($options));
-        $this->assertSame($executor, $pipeline->getExecutor());
-
-        $options = array('executor' => function ($client, $options) use ($executor) { return $executor; });
-        $this->assertInstanceOf('Predis\Pipeline\PipelineContext', $pipeline = $client->pipeline($options));
-        $this->assertSame($executor, $pipeline->getExecutor());
+        $this->assertInstanceOf('Predis\Pipeline\Pipeline', $client->pipeline($options));
     }
 
     /**
@@ -600,38 +594,12 @@ class ClientTest extends StandardTestCase
         $callable = $this->getMock('stdClass', array('__invoke'));
         $callable->expects($this->once())
                  ->method('__invoke')
-                 ->with($this->isInstanceOf('Predis\Pipeline\PipelineContext'));
+                 ->with($this->isInstanceOf('Predis\Pipeline\Pipeline'));
 
         $client = new Client();
         $client->pipeline($callable);
     }
 
-    /**
-     * @group disconnected
-     */
-    public function testPipelineWithArrayAndCallableExecutesPipelineWithOptions()
-    {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $options = array('executor' => $executor);
-
-        $test = $this;
-        $mockCallback = function ($pipeline) use ($executor, $test) {
-            $reflection = new \ReflectionProperty($pipeline, 'executor');
-            $reflection->setAccessible(true);
-
-            $test->assertSame($executor, $reflection->getValue($pipeline));
-        };
-
-        $callable = $this->getMock('stdClass', array('__invoke'));
-        $callable->expects($this->once())
-                 ->method('__invoke')
-                 ->with($this->isInstanceOf('Predis\Pipeline\PipelineContext'))
-                 ->will($this->returnCallback($mockCallback));
-
-        $client = new Client();
-        $client->pipeline($options, $callable);
-    }
-
     /**
      * @group disconnected
      */

+ 59 - 48
tests/Predis/Pipeline/MultiExecExecutorTest.php → tests/Predis/Pipeline/AtomicTest.php

@@ -14,21 +14,19 @@ namespace Predis\Pipeline;
 use PHPUnit_Framework_TestCase as StandardTestCase;
 
 use SplQueue;
-use Predis\Profile\ServerProfile;
+use Predis\Client;
 use Predis\Response;
 
 /**
  *
  */
-class MultiExecExecutorTest extends StandardTestCase
+class AtomicTest extends StandardTestCase
 {
     /**
      * @group disconnected
      */
-    public function testExecutorWithSingleConnection()
+    public function testPipelineWithSingleConnection()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
         $queued = new Response\StatusQueued();
 
         $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
@@ -41,10 +39,13 @@ class MultiExecExecutorTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->onConsecutiveCalls($queued, $queued, $queued));
 
-        $replies = $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection));
 
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array(true, true, true), $replies);
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array(true, true, true), $pipeline->execute());
     }
 
     /**
@@ -52,17 +53,20 @@ class MultiExecExecutorTest extends StandardTestCase
      * @expectedException Predis\ClientException
      * @expectedExceptionMessage The underlying transaction has been aborted by the server
      */
-    public function testExecutorWithAbortedTransaction()
+    public function testThrowsExceptionOnAbortedTransaction()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
-
         $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
         $connection->expects($this->exactly(2))
                    ->method('executeCommand')
                    ->will($this->onConsecutiveCalls(true, null));
 
-        $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
     }
 
     /**
@@ -70,10 +74,8 @@ class MultiExecExecutorTest extends StandardTestCase
      * @expectedException Predis\Response\ServerException
      * @expectedExceptionMessage ERR Test error
      */
-    public function testExecutorWithErrorInTransaction()
+    public function testPipelineWithErrorInTransaction()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
         $queued = new Response\StatusQueued();
         $error = new Response\Error('ERR Test error');
 
@@ -88,16 +90,42 @@ class MultiExecExecutorTest extends StandardTestCase
                    ->method('executeCommand')
                    ->with($this->isInstanceOf('Predis\Command\TransactionDiscard'));
 
-        $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException Predis\Response\ServerException
+     * @expectedExceptionMessage ERR Test error
+     */
+    public function testThrowsServerExceptionOnResponseErrorByDefault()
+    {
+        $error = new Response\Error('ERR Test error');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('readResponse')
+                   ->will($this->returnValue($error));
+
+        $pipeline = new Atomic(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
     }
 
     /**
      * @group disconnected
      */
-    public function testExecutorWithErrorInCommandResponse()
+    public function testReturnsResponseErrorWithClientExceptionsSetToFalse()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
         $queued = new Response\StatusQueued();
         $error = new Response\Error('ERR Test error');
 
@@ -109,44 +137,27 @@ class MultiExecExecutorTest extends StandardTestCase
                    ->method('executeCommand')
                    ->will($this->returnValue(array('PONG', 'PONG', $error)));
 
-        $replies = $executor->execute($connection, $pipeline);
+        $pipeline = new Atomic(new Client($connection, array('exceptions' => false)));
 
-        $this->assertSame(array(true, true, $error), $replies);
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array(true, true, $error), $pipeline->execute());
     }
 
     /**
      * @group disconnected
      * @expectedException Predis\ClientException
-     * @expectedExceptionMessage Predis\Pipeline\MultiExecExecutor can be used only with single connections
+     * @expectedExceptionMessage Predis\Pipeline\Atomic can be used only with connections to single nodes
      */
     public function testExecutorWithAggregatedConnection()
     {
-        $executor = new MultiExecExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
-
-        $replies = $executor->execute($connection, $pipeline);
-    }
-
-    // ******************************************************************** //
-    // ---- HELPER METHODS ------------------------------------------------ //
-    // ******************************************************************** //
-
-    /**
-     * Returns a list of queued command instances.
-     *
-     * @return SplQueue
-     */
-    protected function getCommandsQueue()
-    {
-        $profile = ServerProfile::getDevelopment();
+        $connection = $this->getMock('Predis\Connection\ClusterConnectionInterface');
+        $pipeline = new Atomic(new Client($connection));
 
-        $pipeline = new SplQueue();
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
+        $pipeline->ping();
 
-        return $pipeline;
+        $pipeline->execute();
     }
 }

+ 0 - 87
tests/Predis/Pipeline/FireAndForgetExecutorTest.php

@@ -1,87 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use PHPUnit_Framework_TestCase as StandardTestCase;
-
-use SplQueue;
-use Predis\Profile\ServerProfile;
-
-/**
- *
- */
-class FireAndForgetExecutorTest extends StandardTestCase
-{
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithSingleConnection()
-    {
-        $executor = new FireAndForgetExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->never())
-                   ->method('readResponse');
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertEmpty($replies);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithReplicationConnection()
-    {
-        $executor = new FireAndForgetExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('switchTo')
-                   ->with('master');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->never())
-                   ->method('readResponse');
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertEmpty($replies);
-    }
-
-    // ******************************************************************** //
-    // ---- HELPER METHODS ------------------------------------------------ //
-    // ******************************************************************** //
-
-    /**
-     * Returns a list of queued command instances.
-     *
-     * @return SplQueue
-     */
-    protected function getCommandsQueue()
-    {
-        $profile = ServerProfile::getDevelopment();
-
-        $pipeline = new SplQueue();
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-
-        return $pipeline;
-    }
-}

+ 69 - 0
tests/Predis/Pipeline/FireAndForgetTest.php

@@ -0,0 +1,69 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Pipeline;
+
+use PHPUnit_Framework_TestCase as StandardTestCase;
+
+use SplQueue;
+use Predis\Client;
+use Predis\Profile\ServerProfile;
+
+/**
+ *
+ */
+class FireAndForgetTest extends StandardTestCase
+{
+    /**
+     * @group disconnected
+     */
+    public function testPipelineWithSingleConnection()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->exactly(3))->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
+
+        $pipeline = new FireAndForget(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertEmpty($pipeline->execute());
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testSwitchesToMasterWithReplicationConnection()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('switchTo')
+                   ->with('master');
+        $connection->expects($this->exactly(3))
+                   ->method('writeCommand');
+        $connection->expects($this->never())
+                   ->method('readResponse');
+
+        $pipeline = new FireAndForget(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertEmpty($pipeline->execute());
+    }
+}

+ 115 - 39
tests/Predis/Pipeline/PipelineContextTest.php → tests/Predis/Pipeline/PipelineTest.php

@@ -16,34 +16,22 @@ use PHPUnit_Framework_TestCase as StandardTestCase;
 use Predis\Client;
 use Predis\ClientException;
 use Predis\Profile\ServerProfile;
+use Predis\Response;
 
 /**
  *
  */
-class PipelineContextTest extends StandardTestCase
+class PipelineTest extends StandardTestCase
 {
     /**
      * @group disconnected
      */
-    public function testConstructorWithoutOptions()
+    public function testConstructor()
     {
         $client = new Client();
-        $pipeline = new PipelineContext($client);
+        $pipeline = new Pipeline($client);
 
         $this->assertSame($client, $pipeline->getClient());
-        $this->assertInstanceOf('Predis\Pipeline\StandardExecutor', $pipeline->getExecutor());
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testConstructorWithExecutorArgument()
-    {
-        $client = new Client();
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-
-        $pipeline = new PipelineContext($client, $executor);
-        $this->assertSame($executor, $pipeline->getExecutor());
     }
 
     /**
@@ -51,10 +39,11 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testCallDoesNotSendCommandsWithoutExecute()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -66,15 +55,79 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testCallReturnsPipelineForFluentInterface()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $this->assertSame($pipeline, $pipeline->echo('one'));
         $this->assertSame($pipeline, $pipeline->echo('one')->echo('two')->echo('three'));
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testDoesNotParseComplexResponseObjects()
+    {
+        $object = $this->getMock('Predis\Response\ObjectInterface');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('readResponse')
+                   ->will($this->returnValue($object));
+
+        $pipeline = new Pipeline(new Client($connection));
+
+        $pipeline->ping();
+
+        $this->assertSame(array($object), $pipeline->execute());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException Predis\Response\ServerException
+     * @expectedExceptionMessage ERR Test error
+     */
+    public function testThrowsServerExceptionOnResponseErrorByDefault()
+    {
+        $error = new Response\Error('ERR Test error');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('readResponse')
+                   ->will($this->returnValue($error));
+
+        $pipeline = new Pipeline(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $pipeline->execute();
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testReturnsResponseErrorWithClientExceptionsSetToFalse()
+    {
+        $error = $this->getMock('Predis\Response\ErrorInterface');
+
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->exactly(2))
+                   ->method('readResponse')
+                   ->will($this->returnValue($error));
+
+        $client = new Client($connection, array('exceptions' => false));
+
+        $pipeline = new Pipeline($client);
+
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array($error, $error), $pipeline->execute());
+    }
+
     /**
      * @group disconnected
      */
@@ -82,10 +135,11 @@ class PipelineContextTest extends StandardTestCase
     {
         $profile = ServerProfile::getDefault();
 
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->executeCommand($profile->createCommand('echo', array('one')));
         $pipeline->executeCommand($profile->createCommand('echo', array('two')));
@@ -97,10 +151,11 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testExecuteWithEmptyBuffer()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
+        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
+        $connection->expects($this->never())->method('writeCommand');
+        $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client($connection));
 
         $this->assertSame(array(), $pipeline->execute());
     }
@@ -117,7 +172,7 @@ class PipelineContextTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->returnCallback($this->getReadCallback()));
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -133,10 +188,7 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testFlushWithFalseArgumentDiscardsBuffer()
     {
-        $executor = $this->getMock('Predis\Pipeline\PipelineExecutorInterface');
-        $executor->expects($this->never())->method('executor');
-
-        $pipeline = new PipelineContext(new Client(), $executor);
+        $pipeline = new Pipeline(new Client());
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -159,7 +211,7 @@ class PipelineContextTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->returnCallback($this->getReadCallback()));
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $pipeline->echo('one');
         $pipeline->echo('two');
@@ -170,13 +222,37 @@ class PipelineContextTest extends StandardTestCase
         $this->assertSame(array('one', 'two', 'three', 'four'), $pipeline->execute());
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testSwitchesToMasterWithReplicationConnection()
+    {
+        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
+        $connection->expects($this->once())
+                   ->method('switchTo')
+                   ->with('master');
+        $connection->expects($this->exactly(3))
+                   ->method('writeCommand');
+        $connection->expects($this->exactly(3))
+                   ->method('readResponse')
+                   ->will($this->returnValue('PONG'));
+
+        $pipeline = new Pipeline(new Client($connection));
+
+        $pipeline->ping();
+        $pipeline->ping();
+        $pipeline->ping();
+
+        $this->assertSame(array(true, true, true), $pipeline->execute());
+    }
+
     /**
      * @group disconnected
      */
     public function testExecuteAcceptsCallableArgument()
     {
         $test = $this;
-        $pipeline = new PipelineContext(new Client());
+        $pipeline = new Pipeline(new Client());
 
         $callable = function ($pipe) use ($test, $pipeline) {
             $test->assertSame($pipeline, $pipe);
@@ -194,7 +270,7 @@ class PipelineContextTest extends StandardTestCase
     {
         $noncallable = new \stdClass();
 
-        $pipeline = new PipelineContext(new Client());
+        $pipeline = new Pipeline(new Client());
         $pipeline->execute($noncallable);
     }
 
@@ -204,7 +280,7 @@ class PipelineContextTest extends StandardTestCase
      */
     public function testExecuteInsideCallableArgumentThrowsException()
     {
-        $pipeline = new PipelineContext(new Client());
+        $pipeline = new Pipeline(new Client());
 
         $pipeline->execute(function ($pipe) {
             $pipe->execute();
@@ -223,7 +299,7 @@ class PipelineContextTest extends StandardTestCase
                    ->method('readResponse')
                    ->will($this->returnCallback($this->getReadCallback()));
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $replies = $pipeline->execute(function ($pipe) {
             $pipe->echo('one');
@@ -244,7 +320,7 @@ class PipelineContextTest extends StandardTestCase
         $connection->expects($this->never())->method('writeCommand');
         $connection->expects($this->never())->method('readResponse');
 
-        $pipeline = new PipelineContext(new Client($connection));
+        $pipeline = new Pipeline(new Client($connection));
 
         $exception = null;
         $replies = null;

+ 0 - 159
tests/Predis/Pipeline/StandardExecutorTest.php

@@ -1,159 +0,0 @@
-<?php
-
-/*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
-
-namespace Predis\Pipeline;
-
-use PHPUnit_Framework_TestCase as StandardTestCase;
-
-use SplQueue;
-use Predis\Profile\ServerProfile;
-use Predis\Response;
-
-/**
- *
- */
-class StandardExecutorTest extends StandardTestCase
-{
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithSingleConnection()
-    {
-        $executor = new StandardExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->exactly(3))
-                   ->method('readResponse')
-                   ->will($this->returnValue('PONG'));
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array(true, true, true), $replies);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorWithReplicationConnection()
-    {
-        $executor = new StandardExecutor();
-        $pipeline = $this->getCommandsQueue();
-
-        $connection = $this->getMock('Predis\Connection\ReplicationConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('switchTo')
-                   ->with('master');
-        $connection->expects($this->exactly(3))
-                   ->method('writeCommand');
-        $connection->expects($this->exactly(3))
-                   ->method('readResponse')
-                   ->will($this->returnValue('PONG'));
-
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array(true, true, true), $replies);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorDoesNotParseResponseObjects()
-    {
-        $executor = new StandardExecutor();
-        $response = $this->getMock('Predis\Response\ObjectInterface');
-
-        $this->simpleResponseObjectTest($executor, $response);
-    }
-
-    /**
-     * @group disconnected
-     */
-    public function testExecutorCanReturnRedisErrors()
-    {
-        $executor = new StandardExecutor(false);
-        $response = $this->getMock('Predis\Response\ErrorInterface');
-
-        $this->simpleResponseObjectTest($executor, $response);
-    }
-
-    /**
-     * @group disconnected
-     * @expectedException Predis\Response\ServerException
-     * @expectedExceptionMessage ERR Test error
-     */
-    public function testExecutorCanThrowExceptions()
-    {
-        $executor = new StandardExecutor(true);
-        $pipeline = $this->getCommandsQueue();
-        $error = new Response\Error('ERR Test error');
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('readResponse')
-                   ->will($this->returnValue($error));
-
-        $executor->execute($connection, $pipeline);
-    }
-
-    // ******************************************************************** //
-    // ---- HELPER METHODS ------------------------------------------------ //
-    // ******************************************************************** //
-
-    /**
-     * Executes a test for the Predis\Response\ObjectInterface type.
-     *
-     * @param PipelineExecutorInterface $executor
-     * @param ResponseObjectInterface $response
-     */
-    protected function simpleResponseObjectTest(PipelineExecutorInterface $executor, Response\ObjectInterface $response)
-    {
-        $pipeline = new SplQueue();
-
-        $command = $this->getMock('Predis\Command\CommandInterface');
-        $command->expects($this->never())
-                ->method('parseResponse');
-
-        $connection = $this->getMock('Predis\Connection\SingleConnectionInterface');
-        $connection->expects($this->once())
-                   ->method('writeCommand');
-        $connection->expects($this->once())
-                   ->method('readResponse')
-                   ->will($this->returnValue($response));
-
-        $pipeline->enqueue($command);
-        $replies = $executor->execute($connection, $pipeline);
-
-        $this->assertTrue($pipeline->isEmpty());
-        $this->assertSame(array($response), $replies);
-    }
-
-    /**
-     * Returns a list of queued command instances.
-     *
-     * @return SplQueue
-     */
-    protected function getCommandsQueue()
-    {
-        $profile = ServerProfile::getDevelopment();
-
-        $pipeline = new SplQueue();
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-        $pipeline->enqueue($profile->createCommand('ping'));
-
-        return $pipeline;
-    }
-}