ConnectionErrorProof.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\NotSupportedException;
  13. use Predis\CommunicationException;
  14. use Predis\Connection\ClusterConnectionInterface;
  15. use Predis\Connection\ConnectionInterface;
  16. use Predis\Connection\SingleConnectionInterface;
  17. /**
  18. * Command pipeline that does not throw exceptions on connection errors, but
  19. * returns the exception instances as the rest of the response elements.
  20. *
  21. * @todo Awful naming!
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  class ConnectionErrorProof extends Pipeline
  {
      /**
       * Dispatches the queued commands to the execution strategy that matches
       * the type of the given connection.
       *
       * @param ConnectionInterface $connection Connection used to send the commands.
       * @param SplQueue            $commands   Queue of commands to execute.
       *
       * @return array Responses in command order, as produced by the selected strategy.
       *
       * @throws NotSupportedException When the connection is neither a single
       *                               node connection nor a cluster connection.
       */
      protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
      {
          if ($connection instanceof SingleConnectionInterface) {
              return $this->executePipelineNode($connection, $commands);
          }

          if ($connection instanceof ClusterConnectionInterface) {
              return $this->executePipelineCluster($connection, $commands);
          }

          throw new NotSupportedException("Unsupported connection type");
      }

      /**
       * Executes the pipeline against a single node.
       *
       * All commands are written first, then the responses are read back in the
       * same order. A CommunicationException is never propagated: it is stored
       * in the returned array in place of every response that could not be
       * obtained.
       *
       * @param SingleConnectionInterface $connection Connection to the node.
       * @param SplQueue                  $commands   Queue of commands to execute.
       *
       * @return array One entry per queued command: either the value returned by
       *               readResponse() or the CommunicationException that occurred.
       */
      public function executePipelineNode(SingleConnectionInterface $connection, SplQueue $commands)
      {
          $responses = array();
          // Captured up front: dequeue() below shrinks the queue, so its live
          // count cannot be used to know how many responses are expected.
          $sizeOfPipe = count($commands);

          foreach ($commands as $command) {
              try {
                  $connection->writeCommand($command);
              } catch (CommunicationException $exception) {
                  // A failed write aborts the whole pipeline: every command is
                  // reported with the same exception and nothing is read back.
                  return array_fill(0, $sizeOfPipe, $exception);
              }
          }

          for ($i = 0; $i < $sizeOfPipe; $i++) {
              $command = $commands->dequeue();

              try {
                  $responses[$i] = $connection->readResponse($command);
              } catch (CommunicationException $exception) {
                  // Pad the result up to the original pipeline size. The failed
                  // command has no entry in $responses yet, so the number of
                  // missing entries is the pipeline size minus what was read.
                  $missing = $sizeOfPipe - count($responses);
                  $responses = array_merge($responses, array_fill(0, $missing, $exception));

                  break;
              }
          }

          return $responses;
      }

      /**
       * Executes the pipeline against a cluster of nodes.
       *
       * Each command is routed to the connection returned by the cluster for
       * that command. Once a connection raises a CommunicationException, no
       * further command is written to or read from it, and every command routed
       * to it is reported with that exception. Other connections are unaffected.
       *
       * @param ClusterConnectionInterface $connection Cluster connection.
       * @param SplQueue                   $commands   Queue of commands to execute.
       *
       * @return array One entry per queued command: either the value returned by
       *               readResponse() or the CommunicationException recorded for
       *               the connection the command was routed to.
       */
      public function executePipelineCluster(ClusterConnectionInterface $connection, SplQueue $commands)
      {
          $responses = array();
          $sizeOfPipe = count($commands);
          // Failed connections, keyed by spl_object_hash() of the connection.
          $exceptions = array();

          foreach ($commands as $command) {
              $cmdConnection = $connection->getConnection($command);
              $connectionHash = spl_object_hash($cmdConnection);

              if (isset($exceptions[$connectionHash])) {
                  // This connection already failed: do not write to it again.
                  continue;
              }

              try {
                  $cmdConnection->writeCommand($command);
              } catch (CommunicationException $exception) {
                  $exceptions[$connectionHash] = $exception;
              }
          }

          for ($i = 0; $i < $sizeOfPipe; $i++) {
              $command = $commands->dequeue();
              $cmdConnection = $connection->getConnection($command);
              $connectionHash = spl_object_hash($cmdConnection);

              if (isset($exceptions[$connectionHash])) {
                  // Report the recorded failure instead of reading from a
                  // connection that is known to be broken.
                  $responses[$i] = $exceptions[$connectionHash];
                  continue;
              }

              try {
                  $responses[$i] = $cmdConnection->readResponse($command);
              } catch (CommunicationException $exception) {
                  $responses[$i] = $exception;
                  // Remember the failure so later commands on this connection
                  // are not read either.
                  $exceptions[$connectionHash] = $exception;
              }
          }

          return $responses;
      }
  }