|
@@ -11,6 +11,7 @@
|
|
|
|
|
|
namespace Predis\Pipeline;
|
|
|
|
|
|
+use Iterator;
|
|
|
use SplQueue;
|
|
|
use Predis\ClientException;
|
|
|
use Predis\ResponseErrorInterface;
|
|
@@ -61,9 +62,6 @@ class MultiExecExecutor implements PipelineExecutorInterface
|
|
|
*/
|
|
|
public function execute(ConnectionInterface $connection, SplQueue $commands)
|
|
|
{
|
|
|
- $size = count($commands);
|
|
|
- $values = array();
|
|
|
-
|
|
|
$this->checkConnection($connection);
|
|
|
|
|
|
$cmd = $this->profile->createCommand('multi');
|
|
@@ -91,22 +89,64 @@ class MultiExecExecutor implements PipelineExecutorInterface
|
|
|
throw new ClientException('The underlying transaction has been aborted by the server');
|
|
|
}
|
|
|
|
|
|
- if (count($responses) !== $size) {
|
|
|
- throw new ClientException("Invalid number of replies [expected: $size - actual: ".count($responses)."]");
|
|
|
+ if (count($responses) !== count($commands)) {
|
|
|
+ throw new ClientException("Invalid number of replies [expected: ".count($commands)." - actual: ".count($responses)."]");
|
|
|
}
|
|
|
|
|
|
- for ($i = 0; $i < $size; $i++) {
|
|
|
- $commandReply = $responses[$i];
|
|
|
+ $consumer = $responses instanceof Iterator ? 'consumeIteratorResponse' : 'consumeArrayResponse';
|
|
|
|
|
|
- if ($commandReply instanceof ResponseObjectInterface) {
|
|
|
- $values[$i] = $commandReply;
|
|
|
- $commands->dequeue();
|
|
|
- } else {
|
|
|
- if ($commandReply instanceof \Iterator) {
|
|
|
- $commandReply = iterator_to_array($commandReply);
|
|
|
+ return $this->$consumer($commands, $responses);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Consumes an iterator response returned by EXEC.
|
|
|
+ *
|
|
|
+ * @param SplQueue $commands Pipelined commands
|
|
|
+ * @param Iterator $responses Responses returned by EXEC.
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ protected function consumeIteratorResponse(SplQueue $commands, Iterator $responses)
|
|
|
+ {
|
|
|
+ $values = array();
|
|
|
+
|
|
|
+ foreach ($responses as $response) {
|
|
|
+ $command = $commands->dequeue();
|
|
|
+
|
|
|
+ if ($response instanceof ResponseObjectInterface) {
|
|
|
+ if ($response instanceof Iterator) {
|
|
|
+ $response = iterator_to_array($response);
|
|
|
+ $values[] = $command->parseResponse($response);
|
|
|
+ } else {
|
|
|
+ $values[] = $response;
|
|
|
}
|
|
|
+ } else {
|
|
|
+ $values[] = $command->parseResponse($response);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return $values;
|
|
|
+ }
|
|
|
|
|
|
- $values[$i] = $commands->dequeue()->parseResponse($commandReply);
|
|
|
+ /**
|
|
|
+ * Consumes an array response returned by EXEC.
|
|
|
+ *
|
|
|
+ * @param SplQueue $commands Pipelined commands
|
|
|
+ * @param Array $responses Responses returned by EXEC.
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ protected function consumeArrayResponse(SplQueue $commands, Array &$responses)
|
|
|
+ {
|
|
|
+ $size = count($commands);
|
|
|
+ $values = array();
|
|
|
+
|
|
|
+ for ($i = 0; $i < $size; $i++) {
|
|
|
+ $command = $commands->dequeue();
|
|
|
+ $response = $responses[$i];
|
|
|
+
|
|
|
+ if ($response instanceof ResponseObjectInterface) {
|
|
|
+ $values[$i] = $response;
|
|
|
+ } else {
|
|
|
+ $values[$i] = $command->parseResponse($response);
|
|
|
}
|
|
|
|
|
|
unset($responses[$i]);
|