|
@@ -11,9 +11,10 @@
|
|
|
|
|
|
namespace Predis\Pipeline;
|
|
namespace Predis\Pipeline;
|
|
|
|
|
|
-use Predis\ServerException;
|
|
|
|
|
|
+use Predis\ResponseErrorInterface;
|
|
use Predis\Connection\ConnectionInterface;
|
|
use Predis\Connection\ConnectionInterface;
|
|
use Predis\Connection\ReplicationConnectionInterface;
|
|
use Predis\Connection\ReplicationConnectionInterface;
|
|
|
|
+use Predis\ServerException;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Implements the standard pipeline executor strategy used
|
|
* Implements the standard pipeline executor strategy used
|
|
@@ -24,6 +25,14 @@ use Predis\Connection\ReplicationConnectionInterface;
|
|
*/
|
|
*/
|
|
class StandardExecutor implements PipelineExecutorInterface
|
|
class StandardExecutor implements PipelineExecutorInterface
|
|
{
|
|
{
|
|
|
|
+ /**
|
|
|
|
+ * @param bool $useServerExceptions Specifies if the executor should throw exceptions on server errors.
|
|
|
|
+ */
|
|
|
|
+ public function __construct($useServerExceptions = true)
|
|
|
|
+ {
|
|
|
|
+ $this->useServerExceptions = (bool) $useServerExceptions;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Allows the pipeline executor to perform operations on the
|
|
* Allows the pipeline executor to perform operations on the
|
|
* connection before starting to execute the commands stored
|
|
* connection before starting to execute the commands stored
|
|
@@ -38,13 +47,29 @@ class StandardExecutor implements PipelineExecutorInterface
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Handles -ERR responses returned by Redis.
|
|
|
|
+ *
|
|
|
|
+ * @param ConnectionInterface $connection The connection that returned the error.
|
|
|
|
+ * @param ResponseErrorInterface $response The error response instance.
|
|
|
|
+ */
|
|
|
|
+ protected function onResponseError(ConnectionInterface $connection, ResponseErrorInterface $response)
|
|
|
|
+ {
|
|
|
|
+ // Force disconnection to prevent protocol desynchronization.
|
|
|
|
+ $connection->disconnect();
|
|
|
|
+ $message = $response->getMessage();
|
|
|
|
+
|
|
|
|
+ throw new ServerException($message);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* {@inheritdoc}
|
|
* {@inheritdoc}
|
|
*/
|
|
*/
|
|
- public function execute(ConnectionInterface $connection, &$commands)
|
|
|
|
|
|
+ public function execute(ConnectionInterface $connection, Array &$commands)
|
|
{
|
|
{
|
|
- $sizeofPipe = count($commands);
|
|
|
|
$values = array();
|
|
$values = array();
|
|
|
|
+ $sizeofPipe = count($commands);
|
|
|
|
+ $useServerExceptions = $this->useServerExceptions;
|
|
|
|
|
|
$this->checkConnection($connection);
|
|
$this->checkConnection($connection);
|
|
|
|
|
|
@@ -52,19 +77,15 @@ class StandardExecutor implements PipelineExecutorInterface
|
|
$connection->writeCommand($command);
|
|
$connection->writeCommand($command);
|
|
}
|
|
}
|
|
|
|
|
|
- try {
|
|
|
|
- for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
|
- $response = $connection->readResponse($commands[$i]);
|
|
|
|
- $values[] = $response instanceof \Iterator
|
|
|
|
- ? iterator_to_array($response)
|
|
|
|
- : $response;
|
|
|
|
- unset($commands[$i]);
|
|
|
|
|
|
+ for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
|
+ $response = $connection->readResponse($commands[$i]);
|
|
|
|
+
|
|
|
|
+ if ($response instanceof ResponseErrorInterface && $useServerExceptions === true) {
|
|
|
|
+ $this->onResponseError($connection, $response);
|
|
}
|
|
}
|
|
- }
|
|
|
|
- catch (ServerException $exception) {
|
|
|
|
- // Force disconnection to prevent protocol desynchronization.
|
|
|
|
- $connection->disconnect();
|
|
|
|
- throw $exception;
|
|
|
|
|
|
+
|
|
|
|
+ $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
|
|
|
|
+ unset($commands[$i]);
|
|
}
|
|
}
|
|
|
|
|
|
return $values;
|
|
return $values;
|