SafeClusterExecutor.php 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\CommunicationException;
  13. use Predis\Connection\ConnectionInterface;
  14. /**
  15. * Implements a pipeline executor strategy for connection clusters that does
  16. * not fail when an error is encountered, but adds the returned error in the
  17. * replies array.
  18. *
  19. * @author Daniele Alessandri <suppakilla@gmail.com>
  20. */
  21. class SafeClusterExecutor implements PipelineExecutorInterface
  22. {
  23. /**
  24. * {@inheritdoc}
  25. */
  26. public function execute(ConnectionInterface $connection, SplQueue $commands)
  27. {
  28. $size = count($commands);
  29. $values = array();
  30. $connectionExceptions = array();
  31. foreach ($commands as $command) {
  32. $cmdConnection = $connection->getConnection($command);
  33. if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
  34. continue;
  35. }
  36. try {
  37. $cmdConnection->writeCommand($command);
  38. } catch (CommunicationException $exception) {
  39. $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
  40. }
  41. }
  42. for ($i = 0; $i < $size; $i++) {
  43. $command = $commands->dequeue();
  44. $cmdConnection = $connection->getConnection($command);
  45. $connectionObjectHash = spl_object_hash($cmdConnection);
  46. if (isset($connectionExceptions[$connectionObjectHash])) {
  47. $values[$i] = $connectionExceptions[$connectionObjectHash];
  48. continue;
  49. }
  50. try {
  51. $response = $cmdConnection->readResponse($command);
  52. $values[$i] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
  53. } catch (CommunicationException $exception) {
  54. $values[$i] = $exception;
  55. $connectionExceptions[$connectionObjectHash] = $exception;
  56. }
  57. }
  58. return $values;
  59. }
  60. }