StandardExecutor.php 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\ResponseErrorInterface;
  13. use Predis\ServerException;
  14. use Predis\Connection\ConnectionInterface;
  15. use Predis\Connection\ReplicationConnectionInterface;
  16. /**
  17. * Implements the standard pipeline executor strategy used
  18. * to write a list of commands and read their replies over
  19. * a connection to Redis.
  20. *
  21. * @author Daniele Alessandri <suppakilla@gmail.com>
  22. */
  23. class StandardExecutor implements PipelineExecutorInterface
  24. {
  25. protected $exceptions;
  26. /**
  27. * @param bool $exceptions Specifies if the executor should throw exceptions on server errors.
  28. */
  29. public function __construct($exceptions = true)
  30. {
  31. $this->exceptions = (bool) $exceptions;
  32. }
  33. /**
  34. * Allows the pipeline executor to perform operations on the
  35. * connection before starting to execute the commands stored
  36. * in the pipeline.
  37. *
  38. * @param ConnectionInterface Connection instance.
  39. */
  40. protected function checkConnection(ConnectionInterface $connection)
  41. {
  42. if ($connection instanceof ReplicationConnectionInterface) {
  43. $connection->switchTo('master');
  44. }
  45. }
  46. /**
  47. * Handles -ERR responses returned by Redis.
  48. *
  49. * @param ConnectionInterface $connection The connection that returned the error.
  50. * @param ResponseErrorInterface $response The error response instance.
  51. */
  52. protected function onResponseError(ConnectionInterface $connection, ResponseErrorInterface $response)
  53. {
  54. // Force disconnection to prevent protocol desynchronization.
  55. $connection->disconnect();
  56. $message = $response->getMessage();
  57. throw new ServerException($message);
  58. }
  59. /**
  60. * {@inheritdoc}
  61. */
  62. public function execute(ConnectionInterface $connection, SplQueue $commands)
  63. {
  64. $size = count($commands);
  65. $values = array();
  66. $exceptions = $this->exceptions;
  67. $this->checkConnection($connection);
  68. foreach ($commands as $command) {
  69. $connection->writeCommand($command);
  70. }
  71. for ($i = 0; $i < $size; $i++) {
  72. $response = $connection->readResponse($commands->dequeue());
  73. if ($response instanceof ResponseErrorInterface && $exceptions === true) {
  74. $this->onResponseError($connection, $response);
  75. }
  76. $values[$i] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
  77. }
  78. return $values;
  79. }
  80. }