SafeClusterExecutor.php 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use SplQueue;
  12. use Predis\CommunicationException;
  13. use Predis\Connection\ConnectionInterface;
  14. /**
  15. * Implements a pipeline executor strategy for connection clusters that does
  16. * not fail when an error is encountered, but adds the returned error in the
  17. * replies array.
  18. *
  19. * @author Daniele Alessandri <suppakilla@gmail.com>
  20. */
  21. class SafeClusterExecutor implements PipelineExecutorInterface
  22. {
  23. /**
  24. * {@inheritdoc}
  25. */
  26. public function execute(ConnectionInterface $connection, SplQueue $commands)
  27. {
  28. $size = count($commands);
  29. $values = array();
  30. $connectionExceptions = array();
  31. foreach ($commands as $command) {
  32. $cmdConnection = $connection->getConnection($command);
  33. if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
  34. continue;
  35. }
  36. try {
  37. $cmdConnection->writeCommand($command);
  38. }
  39. catch (CommunicationException $exception) {
  40. $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
  41. }
  42. }
  43. for ($i = 0; $i < $size; $i++) {
  44. $command = $commands->dequeue();
  45. $cmdConnection = $connection->getConnection($command);
  46. $connectionObjectHash = spl_object_hash($cmdConnection);
  47. if (isset($connectionExceptions[$connectionObjectHash])) {
  48. $values[$i] = $connectionExceptions[$connectionObjectHash];
  49. continue;
  50. }
  51. try {
  52. $response = $cmdConnection->readResponse($command);
  53. $values[$i] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
  54. }
  55. catch (CommunicationException $exception) {
  56. $values[$i] = $exception;
  57. $connectionExceptions[$connectionObjectHash] = $exception;
  58. }
  59. }
  60. return $values;
  61. }
  62. }