StandardExecutor.php 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Predis\ResponseErrorInterface;
  12. use Predis\Connection\ConnectionInterface;
  13. use Predis\Connection\ReplicationConnectionInterface;
  14. use Predis\ServerException;
  /**
   * Implements the standard pipeline executor strategy: every queued command
   * is first written to the connection, then the replies are read back one
   * by one in the same order.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class StandardExecutor implements PipelineExecutorInterface
  {
      /**
       * Whether error replies must be routed to onResponseError() instead of
       * being returned to the caller as plain values.
       *
       * Declared explicitly so that the constructor does not create a dynamic
       * property (deprecated as of PHP 8.2).
       *
       * @var bool
       */
      protected $useServerExceptions;

      /**
       * @param bool $useServerExceptions Specifies if the executor should throw exceptions on server errors.
       */
      public function __construct($useServerExceptions = true)
      {
          $this->useServerExceptions = (bool) $useServerExceptions;
      }

      /**
       * Allows the pipeline executor to perform operations on the
       * connection before starting to execute the commands stored
       * in the pipeline.
       *
       * Replication connections are switched to the 'master' node; any other
       * kind of connection is left untouched.
       *
       * @param ConnectionInterface $connection Connection instance.
       */
      protected function checkConnection(ConnectionInterface $connection)
      {
          if ($connection instanceof ReplicationConnectionInterface) {
              $connection->switchTo('master');
          }
      }

      /**
       * Handles -ERR responses returned by Redis.
       *
       * The connection is closed and the error message is re-thrown as a
       * ServerException.
       *
       * @param ConnectionInterface    $connection The connection that returned the error.
       * @param ResponseErrorInterface $response   The error response instance.
       *
       * @throws ServerException Always, carrying the message of the error response.
       */
      protected function onResponseError(ConnectionInterface $connection, ResponseErrorInterface $response)
      {
          // Force disconnection to prevent protocol desynchronization.
          $connection->disconnect();

          throw new ServerException($response->getMessage());
      }

      /**
       * {@inheritdoc}
       *
       * Writes all the commands to the connection, then reads one reply per
       * command. Each command is removed from $commands (passed by reference)
       * right after its reply has been collected. Replies implementing
       * \Iterator are expanded into arrays.
       *
       * @param ConnectionInterface $connection Connection used to send the commands.
       * @param array               $commands   Commands to execute; consumed while replies are read.
       *
       * @return array Replies, in the order the commands were written.
       */
      public function execute(ConnectionInterface $connection, array &$commands)
      {
          $useServerExceptions = $this->useServerExceptions;

          $this->checkConnection($connection);

          // Snapshot the keys once: the write and read passes must visit the
          // commands in exactly the same order, and entries are unset from
          // $commands during the read pass. Relying on 0..n-1 offsets would
          // break for arrays that are not plain lists.
          $keys = array_keys($commands);

          foreach ($keys as $key) {
              $connection->writeCommand($commands[$key]);
          }

          $values = array();

          foreach ($keys as $key) {
              $response = $connection->readResponse($commands[$key]);

              if ($response instanceof ResponseErrorInterface && $useServerExceptions === true) {
                  // The default handler throws; if an override returns instead,
                  // the error response is stored like any other reply.
                  $this->onResponseError($connection, $response);
              }

              $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;

              unset($commands[$key]);
          }

          return $values;
      }
  }
  79. }