SafeClusterExecutor.php 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Predis\CommunicationException;
  12. use Predis\Connection\ConnectionInterface;
  13. /**
  14. * Implements a pipeline executor strategy for connection clusters that does
  15. * not fail when an error is encountered, but adds the returned error in the
  16. * replies array.
  17. *
  18. * @author Daniele Alessandri <suppakilla@gmail.com>
  19. */
  20. class SafeClusterExecutor implements PipelineExecutorInterface
  21. {
  22. /**
  23. * {@inheritdoc}
  24. */
  25. public function execute(ConnectionInterface $connection, Array &$commands)
  26. {
  27. $connectionExceptions = array();
  28. $sizeofPipe = count($commands);
  29. $values = array();
  30. foreach ($commands as $command) {
  31. $cmdConnection = $connection->getConnection($command);
  32. if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
  33. continue;
  34. }
  35. try {
  36. $cmdConnection->writeCommand($command);
  37. }
  38. catch (CommunicationException $exception) {
  39. $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
  40. }
  41. }
  42. for ($i = 0; $i < $sizeofPipe; $i++) {
  43. $command = $commands[$i];
  44. unset($commands[$i]);
  45. $cmdConnection = $connection->getConnection($command);
  46. $connectionObjectHash = spl_object_hash($cmdConnection);
  47. if (isset($connectionExceptions[$connectionObjectHash])) {
  48. $values[] = $connectionExceptions[$connectionObjectHash];
  49. continue;
  50. }
  51. try {
  52. $response = $cmdConnection->readResponse($command);
  53. $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
  54. }
  55. catch (CommunicationException $exception) {
  56. $values[] = $exception;
  57. $connectionExceptions[$connectionObjectHash] = $exception;
  58. }
  59. }
  60. return $values;
  61. }
  62. }