Atomic.php 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\ClientException;
  13. use Predis\ClientInterface;
  14. use Predis\Connection\ConnectionInterface;
  15. use Predis\Connection\SingleConnectionInterface;
  16. use Predis\Profile;
  17. use Predis\Response;
  18. /**
  19. * Command pipeline wrapped into a MULTI / EXEC transaction.
  20. *
  21. * @author Daniele Alessandri <suppakilla@gmail.com>
  22. */
  23. class Atomic extends Pipeline
  24. {
  25. /**
  26. * {@inheritdoc}
  27. */
  28. public function __construct(ClientInterface $client)
  29. {
  30. if (!$client->getProfile()->supportsCommands(array('multi', 'exec', 'discard'))) {
  31. throw new ClientException(
  32. 'The specified server profile must support MULTI, EXEC and DISCARD.'
  33. );
  34. }
  35. parent::__construct($client);
  36. }
  37. /**
  38. * {@inheritdoc}
  39. */
  40. protected function getConnection()
  41. {
  42. $connection = $this->getClient()->getConnection();
  43. if (!$connection instanceof SingleConnectionInterface) {
  44. $class = __CLASS__;
  45. throw new ClientException(
  46. "$class can be used only with connections to single nodes"
  47. );
  48. }
  49. return $connection;
  50. }
  51. /**
  52. * {@inheritdoc}
  53. */
  54. protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
  55. {
  56. $profile = $this->getClient()->getProfile();
  57. $connection->executeCommand($profile->createCommand('multi'));
  58. foreach ($commands as $command) {
  59. $connection->writeRequest($command);
  60. }
  61. foreach ($commands as $command) {
  62. $response = $connection->readResponse($command);
  63. if ($response instanceof Response\ErrorInterface) {
  64. $connection->executeCommand($profile->createCommand('discard'));
  65. throw new Response\ServerException($response->getMessage());
  66. }
  67. }
  68. $executed = $connection->executeCommand($profile->createCommand('exec'));
  69. if (!isset($executed)) {
  70. // TODO: should be throwing a more appropriate exception.
  71. throw new ClientException(
  72. 'The underlying transaction has been aborted by the server'
  73. );
  74. }
  75. if (count($executed) !== count($commands)) {
  76. throw new ClientException(
  77. "Invalid number of responses [expected: ".count($commands)." - actual: ".count($executed)."]"
  78. );
  79. }
  80. $responses = array();
  81. $sizeOfPipe = count($commands);
  82. $exceptions = $this->throwServerExceptions();
  83. for ($i = 0; $i < $sizeOfPipe; $i++) {
  84. $command = $commands->dequeue();
  85. $response = $executed[$i];
  86. if (!$response instanceof Response\ResponseInterface) {
  87. $responses[] = $command->parseResponse($response);
  88. } else if ($response instanceof Response\ErrorInterface && $exceptions) {
  89. $this->exception($connection, $response);
  90. } else {
  91. $responses[] = $response;
  92. }
  93. unset($executed[$i]);
  94. }
  95. return $responses;
  96. }
  97. }