PipelineContext.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. <?php
  2. namespace Predis\Pipeline;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ClientException;
  6. use Predis\Commands\ICommand;
  7. class PipelineContext {
  8. private $_client;
  9. private $_executor;
  10. private $_pipeline = array();
  11. private $_replies = array();
  12. private $_running = false;
  13. public function __construct(Client $client, Array $options = null) {
  14. $this->_client = $client;
  15. $this->_executor = $this->getExecutor($client, $options ?: array());
  16. }
  17. protected function getExecutor(Client $client, Array $options) {
  18. if (!$options) {
  19. return new StandardExecutor();
  20. }
  21. if (isset($options['executor'])) {
  22. $executor = $options['executor'];
  23. if (!$executor instanceof IPipelineExecutor) {
  24. throw new \InvalidArgumentException(
  25. 'The executor option accepts only instances ' .
  26. 'of Predis\Pipeline\IPipelineExecutor'
  27. );
  28. }
  29. return $executor;
  30. }
  31. if (isset($options['safe']) && $options['safe'] == true) {
  32. $isCluster = Helpers::isCluster($client->getConnection());
  33. return $isCluster ? new SafeClusterExecutor() : new SafeExecutor();
  34. }
  35. return new StandardExecutor();
  36. }
  37. public function __call($method, $arguments) {
  38. $command = $this->_client->createCommand($method, $arguments);
  39. $this->recordCommand($command);
  40. return $this;
  41. }
  42. protected function recordCommand(ICommand $command) {
  43. $this->_pipeline[] = $command;
  44. }
  45. public function executeCommand(ICommand $command) {
  46. $this->recordCommand($command);
  47. }
  48. public function flushPipeline() {
  49. if (count($this->_pipeline) > 0) {
  50. $connection = $this->_client->getConnection();
  51. $replies = $this->_executor->execute($connection, $this->_pipeline);
  52. $this->_replies = array_merge($this->_replies, $replies);
  53. $this->_pipeline = array();
  54. }
  55. return $this;
  56. }
  57. private function setRunning($bool) {
  58. if ($bool === true && $this->_running === true) {
  59. throw new ClientException("This pipeline is already opened");
  60. }
  61. $this->_running = $bool;
  62. }
  63. public function execute($block = null) {
  64. if ($block && !is_callable($block)) {
  65. throw new \InvalidArgumentException('Argument passed must be a callable object');
  66. }
  67. $this->setRunning(true);
  68. $pipelineBlockException = null;
  69. try {
  70. if ($block !== null) {
  71. $block($this);
  72. }
  73. $this->flushPipeline();
  74. }
  75. catch (\Exception $exception) {
  76. $pipelineBlockException = $exception;
  77. }
  78. $this->setRunning(false);
  79. if ($pipelineBlockException !== null) {
  80. throw $pipelineBlockException;
  81. }
  82. return $this->_replies;
  83. }
  84. }