Atomic.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\ClientException;
  13. use Predis\ClientInterface;
  14. use Predis\Connection\ConnectionInterface;
  15. use Predis\Connection\NodeConnectionInterface;
  16. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  17. use Predis\Response\ResponseInterface;
  18. use Predis\Response\ServerException;
  19. /**
  20. * Command pipeline wrapped into a MULTI / EXEC transaction.
  21. *
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  class Atomic extends Pipeline
  {
      /**
       * Builds the pipeline after checking that the client's profile knows the
       * transaction commands this class relies on.
       *
       * @param ClientInterface $client Client providing the profile and connection.
       *
       * @throws ClientException When the profile does not support MULTI, EXEC
       *                         and DISCARD.
       */
      public function __construct(ClientInterface $client)
      {
          $required = array('multi', 'exec', 'discard');

          if (!$client->getProfile()->supportsCommands($required)) {
              throw new ClientException(
                  "The current profile does not support 'MULTI', 'EXEC' and 'DISCARD'."
              );
          }

          parent::__construct($client);
      }

      /**
       * Returns the client's connection, refusing anything that is not a single
       * node connection.
       *
       * @return NodeConnectionInterface
       *
       * @throws ClientException When the client's connection does not implement
       *                         NodeConnectionInterface.
       */
      protected function getConnection()
      {
          $connection = $this->getClient()->getConnection();

          if (!$connection instanceof NodeConnectionInterface) {
              $class = __CLASS__;

              throw new ClientException("The class '$class' does not support aggregate connections.");
          }

          return $connection;
      }

      /**
       * Sends the queued commands between MULTI and EXEC and collects the results.
       *
       * The queue is only consumed (dequeued) once EXEC has returned one entry
       * per command; on every earlier failure the queue is left untouched.
       *
       * @param ConnectionInterface $connection Connection the commands are sent through.
       * @param SplQueue            $commands   Commands to execute, in order.
       *
       * @return array Results in command order: raw replies go through the
       *               command's parseResponse(), response objects are returned
       *               as they are.
       *
       * @throws ServerException When a command is rejected while being queued, or
       *                         when EXEC itself comes back as an error response.
       * @throws ClientException When EXEC returns no result or a result whose size
       *                         differs from the number of commands.
       */
      protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
      {
          $profile = $this->getClient()->getProfile();
          $connection->executeCommand($profile->createCommand('multi'));

          // Write every request first, then read the replies back in the same order.
          foreach ($commands as $command) {
              $connection->writeRequest($command);
          }

          // All requests are already on the wire, so one reply per command is
          // pending. Read every one of them before reacting to an error: sending
          // DISCARD while replies are still unread would make DISCARD (and any
          // later command on this connection) pick up a stale reply.
          $queueError = null;

          foreach ($commands as $command) {
              $response = $connection->readResponse($command);

              if ($queueError === null && $response instanceof ErrorResponseInterface) {
                  $queueError = $response;
              }
          }

          if ($queueError !== null) {
              $connection->executeCommand($profile->createCommand('discard'));

              throw new ServerException($queueError->getMessage());
          }

          $executed = $connection->executeCommand($profile->createCommand('exec'));

          if (!isset($executed)) {
              // TODO: should be throwing a more appropriate exception.
              throw new ClientException(
                  'The underlying transaction has been aborted by the server.'
              );
          }

          // If the connection hands back an error response for EXEC, surface the
          // server's message instead of counting/indexing a non-array value below.
          if ($executed instanceof ErrorResponseInterface) {
              throw new ServerException($executed->getMessage());
          }

          $expected = count($commands);
          $received = count($executed);

          if ($received !== $expected) {
              throw new ClientException(
                  "Invalid number of responses [expected $expected, received $received]."
              );
          }

          $responses = array();
          $exceptions = $this->throwServerExceptions();

          for ($i = 0; $i < $expected; $i++) {
              $command = $commands->dequeue();
              $response = $executed[$i];

              if (!$response instanceof ResponseInterface) {
                  // Raw reply: let the command convert it to its final form.
                  $responses[] = $command->parseResponse($response);
              } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
                  $this->exception($connection, $response);
              } else {
                  $responses[] = $response;
              }

              // Drop the processed entry so it does not stay referenced in both arrays.
              unset($executed[$i]);
          }

          return $responses;
      }
  }