PipelineContext.php 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. <?php
  2. namespace Predis\Pipeline;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ClientException;
  6. use Predis\Commands\ICommand;
  7. class PipelineContext {
  8. private $_client;
  9. private $_executor;
  10. private $_pipeline = array();
  11. private $_replies = array();
  12. private $_running = false;
  13. public function __construct(Client $client, Array $options = null) {
  14. $this->_client = $client;
  15. $this->_executor = $this->getExecutor($client, $options ?: array());
  16. }
  17. protected function getExecutor(Client $client, Array $options) {
  18. if (!$options) {
  19. return new StandardExecutor();
  20. }
  21. if (isset($options['executor'])) {
  22. $executor = $options['executor'];
  23. if (!$executor instanceof IPipelineExecutor) {
  24. throw new \ArgumentException();
  25. }
  26. return $executor;
  27. }
  28. if (isset($options['safe']) && $options['safe'] == true) {
  29. $isCluster = Helpers::isCluster($client->getConnection());
  30. return $isCluster ? new SafeClusterExecutor() : new SafeExecutor();
  31. }
  32. return new StandardExecutor();
  33. }
  34. public function __call($method, $arguments) {
  35. $command = $this->_client->createCommand($method, $arguments);
  36. $this->recordCommand($command);
  37. return $this;
  38. }
  39. protected function recordCommand(ICommand $command) {
  40. $this->_pipeline[] = $command;
  41. }
  42. public function executeCommand(ICommand $command) {
  43. $this->recordCommand($command);
  44. }
  45. public function flushPipeline() {
  46. if (count($this->_pipeline) > 0) {
  47. $connection = $this->_client->getConnection();
  48. $replies = $this->_executor->execute($connection, $this->_pipeline);
  49. $this->_replies = array_merge($this->_replies, $replies);
  50. $this->_pipeline = array();
  51. }
  52. return $this;
  53. }
  54. private function setRunning($bool) {
  55. if ($bool === true && $this->_running === true) {
  56. throw new ClientException("This pipeline is already opened");
  57. }
  58. $this->_running = $bool;
  59. }
  60. public function execute($block = null) {
  61. if ($block && !is_callable($block)) {
  62. throw new \InvalidArgumentException('Argument passed must be a callable object');
  63. }
  64. $this->setRunning(true);
  65. $pipelineBlockException = null;
  66. try {
  67. if ($block !== null) {
  68. $block($this);
  69. }
  70. $this->flushPipeline();
  71. }
  72. catch (\Exception $exception) {
  73. $pipelineBlockException = $exception;
  74. }
  75. $this->setRunning(false);
  76. if ($pipelineBlockException !== null) {
  77. throw $pipelineBlockException;
  78. }
  79. return $this->_replies;
  80. }
  81. }