PipelineContext.php 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. <?php
  2. namespace Predis\Pipeline;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ClientException;
  6. use Predis\Commands\ICommand;
  /**
   * Records commands created through a client and sends them in one batch
   * through a pipeline executor.
   *
   * Commands are queued either via the magic __call() (which asks the client to
   * build the command) or via executeCommand(). Nothing is sent until
   * flushPipeline() or execute() runs. Replies are appended to an internal list
   * that is never reset, so repeated flushes/executions on the same instance
   * return the accumulated replies.
   */
  class PipelineContext
  {
      /** @var Client Client used to create commands and obtain the connection. */
      private $_client;

      /** @var object Executor selected by getExecutor() at construction time. */
      private $_executor;

      /** @var array Commands recorded since the last successful flush. */
      private $_pipeline = array();

      /** @var array Replies accumulated over every flush of this context. */
      private $_replies = array();

      /** @var bool True while execute() is in progress (re-entrancy guard). */
      private $_running = false;

      /**
       * @param Client     $client  Client the pipeline is bound to.
       * @param array|null $options Optional settings; see getExecutor() for the
       *                            recognised keys. Null is treated as no options.
       */
      public function __construct(Client $client, array $options = null)
      {
          $this->_client = $client;
          $this->_executor = $this->getExecutor($client, $options ?: array());
      }

      /**
       * Chooses the executor that will send the recorded commands.
       *
       * Resolution order:
       *  - no options at all: StandardExecutor;
       *  - 'executor' option set: that instance, which must implement
       *    IPipelineExecutor;
       *  - 'safe' option loosely equal to true: SafeClusterExecutor when
       *    Helpers::isCluster() reports a cluster connection, SafeExecutor
       *    otherwise;
       *  - anything else: StandardExecutor.
       *
       * @param Client $client  Client whose connection is inspected for 'safe'.
       * @param array  $options Pipeline options.
       *
       * @return object The selected executor.
       *
       * @throws \InvalidArgumentException When 'executor' is not an IPipelineExecutor.
       */
      protected function getExecutor(Client $client, array $options)
      {
          if (!$options) {
              return new StandardExecutor();
          }

          if (isset($options['executor'])) {
              $executor = $options['executor'];
              if (!$executor instanceof IPipelineExecutor) {
                  throw new \InvalidArgumentException(
                      'The executor option accepts only instances ' .
                      'of Predis\Pipeline\IPipelineExecutor'
                  );
              }

              return $executor;
          }

          // Loose comparison kept on purpose: callers may pass 1 or "1" as well as true.
          if (isset($options['safe']) && $options['safe'] == true) {
              $isCluster = Helpers::isCluster($client->getConnection());

              return $isCluster ? new SafeClusterExecutor() : new SafeExecutor();
          }

          return new StandardExecutor();
      }

      /**
       * Builds a command through the client from the called method name and
       * arguments, and queues it.
       *
       * @param string $method    Name passed to the client's createCommand().
       * @param array  $arguments Arguments passed to the client's createCommand().
       *
       * @return PipelineContext This instance, so calls can be chained.
       */
      public function __call($method, $arguments)
      {
          $command = $this->_client->createCommand($method, $arguments);
          $this->recordCommand($command);

          return $this;
      }

      /**
       * Appends a command to the pending pipeline.
       *
       * @param ICommand $command Command to queue.
       */
      protected function recordCommand(ICommand $command)
      {
          $this->_pipeline[] = $command;
      }

      /**
       * Queues an already-built command. Despite the name, nothing is sent here;
       * the command is only recorded until the pipeline is flushed.
       *
       * @param ICommand $command Command to queue.
       */
      public function executeCommand(ICommand $command)
      {
          $this->recordCommand($command);
      }

      /**
       * Sends every pending command through the executor and stores the replies.
       *
       * Does nothing when no command is pending. The pending list is cleared only
       * after the executor returns, so it is left intact if the executor throws.
       *
       * @return PipelineContext This instance, so calls can be chained.
       */
      public function flushPipeline()
      {
          if (count($this->_pipeline) > 0) {
              $connection = $this->_client->getConnection();
              $replies = $this->_executor->execute($connection, $this->_pipeline);
              $this->_replies = array_merge($this->_replies, $replies);
              $this->_pipeline = array();
          }

          return $this;
      }

      /**
       * Updates the running flag, refusing to open a pipeline that is already open.
       *
       * @param bool $bool New running state.
       *
       * @throws ClientException When asked to start while already running.
       */
      private function setRunning($bool)
      {
          if ($bool === true && $this->_running === true) {
              throw new ClientException("This pipeline is already opened");
          }

          $this->_running = $bool;
      }

      /**
       * Runs the optional callable with this context as its only argument, then
       * flushes the pipeline.
       *
       * @param callable|null $block Code that queues commands on the context.
       *
       * @return array All replies accumulated by this context so far.
       *
       * @throws \InvalidArgumentException When $block is neither null nor callable.
       * @throws ClientException           When called while already executing.
       */
      public function execute($block = null)
      {
          // Compare against null rather than relying on truthiness: the invocation
          // below is gated on "!== null", so falsy non-callables such as 0, '' or
          // false must be rejected here instead of being invoked.
          if ($block !== null && !is_callable($block)) {
              throw new \InvalidArgumentException('Argument passed must be a callable object');
          }

          $this->setRunning(true);
          $pending = null;

          try {
              if ($block !== null) {
                  // call_user_func() accepts every form is_callable() approves
                  // (closures, function names, array and "Class::method" callables).
                  call_user_func($block, $this);
              }
              $this->flushPipeline();
          } catch (\Exception $exception) {
              $pending = $exception;
          } catch (\Throwable $error) {
              // PHP 7+: engine errors are not \Exception instances. Capture them too
              // so the running flag is always released. On PHP 5 this clause never
              // matches and is harmless.
              $pending = $error;
          }

          // Always release the guard before propagating any failure.
          $this->setRunning(false);

          if ($pending !== null) {
              throw $pending;
          }

          return $this->_replies;
      }
  }