ConnectionErrorProof.php 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\NotSupportedException;
  13. use Predis\CommunicationException;
  14. use Predis\Connection\ClusterConnectionInterface;
  15. use Predis\Connection\ConnectionInterface;
  16. use Predis\Connection\SingleConnectionInterface;
  17. /**
  18. * Command pipeline that does not throw exceptions on connection errors, but
  19. * returns the exception instances as the rest of the response elements.
  20. *
  21. * @todo Awful naming!
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  24. class ConnectionErrorProof extends Pipeline
  25. {
  26. /**
  27. * {@inheritdoc}
  28. */
  29. protected function getConnection()
  30. {
  31. return $this->getClient()->getConnection();
  32. }
  33. /**
  34. * {@inheritdoc}
  35. */
  36. protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
  37. {
  38. if ($connection instanceof SingleConnectionInterface) {
  39. return $this->executeSingleNode($connection, $commands);
  40. } else if ($connection instanceof ClusterConnectionInterface) {
  41. return $this->executeCluster($connection, $commands);
  42. } else {
  43. throw new NotSupportedException("Unsupported connection type");
  44. }
  45. }
  46. /**
  47. * {@inheritdoc}
  48. */
  49. public function executeSingleNode(SingleConnectionInterface $connection, SplQueue $commands)
  50. {
  51. $responses = array();
  52. $sizeOfPipe = count($commands);
  53. foreach ($commands as $command) {
  54. try {
  55. $connection->writeRequest($command);
  56. } catch (CommunicationException $exception) {
  57. return array_fill(0, $sizeOfPipe, $exception);
  58. }
  59. }
  60. for ($i = 0; $i < $sizeOfPipe; $i++) {
  61. $command = $commands->dequeue();
  62. try {
  63. $responses[$i] = $connection->readResponse($command);
  64. } catch (CommunicationException $exception) {
  65. $add = count($commands) - count($responses);
  66. $responses = array_merge($responses, array_fill(0, $add, $exception));
  67. break;
  68. }
  69. }
  70. return $responses;
  71. }
  72. /**
  73. * {@inheritdoc}
  74. */
  75. public function executeCluster(ClusterConnectionInterface $connection, SplQueue $commands)
  76. {
  77. $responses = array();
  78. $sizeOfPipe = count($commands);
  79. $exceptions = array();
  80. foreach ($commands as $command) {
  81. $cmdConnection = $connection->getConnection($command);
  82. if (isset($exceptions[spl_object_hash($cmdConnection)])) {
  83. continue;
  84. }
  85. try {
  86. $cmdConnection->writeRequest($command);
  87. } catch (CommunicationException $exception) {
  88. $exceptions[spl_object_hash($cmdConnection)] = $exception;
  89. }
  90. }
  91. for ($i = 0; $i < $sizeOfPipe; $i++) {
  92. $command = $commands->dequeue();
  93. $cmdConnection = $connection->getConnection($command);
  94. $connectionHash = spl_object_hash($cmdConnection);
  95. if (isset($exceptions[$connectionHash])) {
  96. $responses[$i] = $exceptions[$connectionHash];
  97. continue;
  98. }
  99. try {
  100. $responses[$i] = $cmdConnection->readResponse($command);
  101. } catch (CommunicationException $exception) {
  102. $responses[$i] = $exception;
  103. $exceptions[$connectionHash] = $exception;
  104. }
  105. }
  106. return $responses;
  107. }
  108. }