Atomic.php 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\ClientException;
  13. use Predis\ClientInterface;
  14. use Predis\Connection\ConnectionInterface;
  15. use Predis\Connection\SingleConnectionInterface;
  16. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  17. use Predis\Response\ResponseInterface;
  18. use Predis\Response\ServerException;
  19. /**
  20. * Command pipeline wrapped into a MULTI / EXEC transaction.
  21. *
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  24. class Atomic extends Pipeline
  25. {
  26. /**
  27. * {@inheritdoc}
  28. */
  29. public function __construct(ClientInterface $client)
  30. {
  31. if (!$client->getProfile()->supportsCommands(array('multi', 'exec', 'discard'))) {
  32. throw new ClientException(
  33. 'The specified server profile must support MULTI, EXEC and DISCARD.'
  34. );
  35. }
  36. parent::__construct($client);
  37. }
  38. /**
  39. * {@inheritdoc}
  40. */
  41. protected function getConnection()
  42. {
  43. $connection = $this->getClient()->getConnection();
  44. if (!$connection instanceof SingleConnectionInterface) {
  45. $class = __CLASS__;
  46. throw new ClientException(
  47. "$class can be used only with connections to single nodes"
  48. );
  49. }
  50. return $connection;
  51. }
  52. /**
  53. * {@inheritdoc}
  54. */
  55. protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
  56. {
  57. $profile = $this->getClient()->getProfile();
  58. $connection->executeCommand($profile->createCommand('multi'));
  59. foreach ($commands as $command) {
  60. $connection->writeRequest($command);
  61. }
  62. foreach ($commands as $command) {
  63. $response = $connection->readResponse($command);
  64. if ($response instanceof ErrorResponseInterface) {
  65. $connection->executeCommand($profile->createCommand('discard'));
  66. throw new ServerException($response->getMessage());
  67. }
  68. }
  69. $executed = $connection->executeCommand($profile->createCommand('exec'));
  70. if (!isset($executed)) {
  71. // TODO: should be throwing a more appropriate exception.
  72. throw new ClientException(
  73. 'The underlying transaction has been aborted by the server'
  74. );
  75. }
  76. if (count($executed) !== count($commands)) {
  77. throw new ClientException(
  78. "Invalid number of responses [expected: ".count($commands)." - actual: ".count($executed)."]"
  79. );
  80. }
  81. $responses = array();
  82. $sizeOfPipe = count($commands);
  83. $exceptions = $this->throwServerExceptions();
  84. for ($i = 0; $i < $sizeOfPipe; $i++) {
  85. $command = $commands->dequeue();
  86. $response = $executed[$i];
  87. if (!$response instanceof ResponseInterface) {
  88. $responses[] = $command->parseResponse($response);
  89. } else if ($response instanceof ErrorResponseInterface && $exceptions) {
  90. $this->exception($connection, $response);
  91. } else {
  92. $responses[] = $response;
  93. }
  94. unset($executed[$i]);
  95. }
  96. return $responses;
  97. }
  98. }