Atomic.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\ClientException;
  13. use Predis\Connection\ConnectionInterface;
  14. use Predis\Connection\SingleConnectionInterface;
  15. use Predis\Profile\ServerProfileInterface;
  16. use Predis\Response;
  17. /**
  18. * Command pipeline wrapped into a MULTI / EXEC transaction.
  19. *
  20. * @author Daniele Alessandri <suppakilla@gmail.com>
  21. */
  class Atomic extends Pipeline
  {
      /**
       * {@inheritdoc}
       *
       * Sends MULTI, writes every queued command, reads one reply for each of
       * them, then sends EXEC and maps the replies returned by EXEC back onto
       * the queued commands.
       *
       * When at least one command receives an error reply while being queued,
       * DISCARD is sent and the first error is rethrown. DISCARD is issued only
       * after a reply has been read for every written command, so that the
       * reply read for DISCARD is not one that belongs to an earlier command.
       *
       * @param ConnectionInterface $connection Connection instance.
       * @param SplQueue            $commands   Queue of commands to execute; it
       *                                        is emptied while the replies of
       *                                        EXEC are being processed.
       *
       * @return array Responses in the same order as the queued commands.
       *
       * @throws ClientException          When the preconditions verified by
       *                                  check() are not met, when EXEC returns
       *                                  NULL, or when the number of replies
       *                                  does not match the number of commands.
       * @throws Response\ServerException When a command receives an error reply
       *                                  while being queued.
       */
      protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
      {
          $profile = $this->getClient()->getProfile();
          $this->check($connection, $profile);

          $connection->executeCommand($profile->createCommand('multi'));

          foreach ($commands as $command) {
              $connection->writeCommand($command);
          }

          // Consume a reply for every written command before reacting to an
          // error: bailing out at the first failure would leave the replies of
          // the remaining commands unread on the connection.
          $queueingError = null;

          foreach ($commands as $command) {
              $response = $connection->readResponse($command);

              if ($queueingError === null && $response instanceof Response\ErrorInterface) {
                  $queueingError = $response;
              }
          }

          if ($queueingError !== null) {
              $connection->executeCommand($profile->createCommand('discard'));

              throw new Response\ServerException($queueingError->getMessage());
          }

          $executed = $connection->executeCommand($profile->createCommand('exec'));

          if (!isset($executed)) {
              // TODO: should be throwing a more appropriate exception.
              throw new ClientException(
                  'The underlying transaction has been aborted by the server'
              );
          }

          $sizeOfPipe = count($commands);
          $sizeOfReply = count($executed);

          if ($sizeOfReply !== $sizeOfPipe) {
              throw new ClientException(
                  "Invalid number of replies [expected: ".$sizeOfPipe." - actual: ".$sizeOfReply."]"
              );
          }

          $responses = array();
          // throwServerExceptions() and exception() are not defined in this
          // class; presumably they are inherited from Pipeline.
          $exceptions = $this->throwServerExceptions();

          for ($i = 0; $i < $sizeOfPipe; $i++) {
              $command = $commands->dequeue();
              $response = $executed[$i];

              if (!$response instanceof Response\ObjectInterface) {
                  // Plain payloads are handed to the command that produced them.
                  $responses[] = $command->parseResponse($response);
              } elseif ($response instanceof Response\ErrorInterface && $exceptions) {
                  $this->exception($connection, $response);
              } else {
                  // Response objects are returned untouched.
                  $responses[] = $response;
              }

              // Drop each raw reply as soon as it has been handled.
              unset($executed[$i]);
          }

          return $responses;
      }

      /**
       * Verifies all the needed preconditions before executing the pipeline.
       *
       * The connection must implement SingleConnectionInterface and the server
       * profile must support the MULTI, EXEC and DISCARD commands.
       *
       * @param ConnectionInterface    $connection Connection instance.
       * @param ServerProfileInterface $profile    Server profile.
       *
       * @throws ClientException When one of the preconditions is not satisfied.
       */
      protected function check(ConnectionInterface $connection, ServerProfileInterface $profile)
      {
          if (!$connection instanceof SingleConnectionInterface) {
              $class = __CLASS__;

              throw new ClientException(
                  "$class can be used only with connections to single nodes"
              );
          }

          if (!$profile->supportsCommands(array('multi', 'exec', 'discard'))) {
              throw new ClientException(
                  'The specified server profile must support MULTI, EXEC and DISCARD.'
              );
          }
      }
  }