PipelineContext.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. <?php
  2. namespace Predis\Pipeline;
  3. use Predis\Client;
  4. use Predis\Helpers;
  5. use Predis\ClientException;
  6. use Predis\Commands\ICommand;
  7. class PipelineContext {
  8. private $_client, $_pipelineBuffer, $_returnValues, $_running, $_executor;
  9. public function __construct(Client $client, Array $options = null) {
  10. $this->_client = $client;
  11. $this->_executor = $this->getExecutor($client, $options ?: array());
  12. $this->_pipelineBuffer = array();
  13. $this->_returnValues = array();
  14. }
  15. protected function getExecutor(Client $client, Array $options) {
  16. if (!$options) {
  17. return new StandardExecutor();
  18. }
  19. if (isset($options['executor'])) {
  20. $executor = $options['executor'];
  21. if (!$executor instanceof IPipelineExecutor) {
  22. throw new \ArgumentException();
  23. }
  24. return $executor;
  25. }
  26. if (isset($options['safe']) && $options['safe'] == true) {
  27. $isCluster = Helpers::isCluster($client->getConnection());
  28. return $isCluster ? new SafeClusterExecutor() : new SafeExecutor();
  29. }
  30. return new StandardExecutor();
  31. }
  32. public function __call($method, $arguments) {
  33. $command = $this->_client->createCommand($method, $arguments);
  34. $this->recordCommand($command);
  35. return $this;
  36. }
  37. protected function recordCommand(ICommand $command) {
  38. $this->_pipelineBuffer[] = $command;
  39. }
  40. public function flushPipeline() {
  41. if (count($this->_pipelineBuffer) > 0) {
  42. $connection = $this->_client->getConnection();
  43. $this->_returnValues = array_merge(
  44. $this->_returnValues,
  45. $this->_executor->execute($connection, $this->_pipelineBuffer)
  46. );
  47. $this->_pipelineBuffer = array();
  48. }
  49. return $this;
  50. }
  51. private function setRunning($bool) {
  52. if ($bool === true && $this->_running === true) {
  53. throw new ClientException("This pipeline is already opened");
  54. }
  55. $this->_running = $bool;
  56. }
  57. public function execute($block = null) {
  58. if ($block && !is_callable($block)) {
  59. throw new \InvalidArgumentException('Argument passed must be a callable object');
  60. }
  61. $this->setRunning(true);
  62. $pipelineBlockException = null;
  63. try {
  64. if ($block !== null) {
  65. $block($this);
  66. }
  67. $this->flushPipeline();
  68. }
  69. catch (\Exception $exception) {
  70. $pipelineBlockException = $exception;
  71. }
  72. $this->setRunning(false);
  73. if ($pipelineBlockException !== null) {
  74. throw $pipelineBlockException;
  75. }
  76. return $this->_returnValues;
  77. }
  78. }