SafeClusterExecutor.php 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. <?php
  2. namespace Predis\Pipeline;
  3. use Predis\ServerException;
  4. use Predis\CommunicationException;
  5. use Predis\Network\IConnection;
  6. class SafeClusterExecutor implements IPipelineExecutor
  7. {
  8. public function execute(IConnection $connection, &$commands)
  9. {
  10. $connectionExceptions = array();
  11. $sizeofPipe = count($commands);
  12. $values = array();
  13. foreach ($commands as $command) {
  14. $cmdConnection = $connection->getConnection($command);
  15. if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
  16. continue;
  17. }
  18. try {
  19. $cmdConnection->writeCommand($command);
  20. }
  21. catch (CommunicationException $exception) {
  22. $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
  23. }
  24. }
  25. for ($i = 0; $i < $sizeofPipe; $i++) {
  26. $command = $commands[$i];
  27. unset($commands[$i]);
  28. $cmdConnection = $connection->getConnection($command);
  29. $connectionObjectHash = spl_object_hash($cmdConnection);
  30. if (isset($connectionExceptions[$connectionObjectHash])) {
  31. $values[] = $connectionExceptions[$connectionObjectHash];
  32. continue;
  33. }
  34. try {
  35. $response = $cmdConnection->readResponse($command);
  36. $values[] = $response instanceof \Iterator ? iterator_to_array($response) : $response;
  37. }
  38. catch (ServerException $exception) {
  39. $values[] = $exception->toResponseError();
  40. }
  41. catch (CommunicationException $exception) {
  42. $values[] = $exception;
  43. $connectionExceptions[$connectionObjectHash] = $exception;
  44. }
  45. }
  46. return $values;
  47. }
  48. }