Pipeline.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Exception;
  12. use InvalidArgumentException;
  13. use SplQueue;
  14. use Predis\BasicClientInterface;
  15. use Predis\ClientException;
  16. use Predis\ClientInterface;
  17. use Predis\ExecutableContextInterface;
  18. use Predis\Command\CommandInterface;
  19. use Predis\Connection\ConnectionInterface;
  20. use Predis\Connection\ReplicationConnectionInterface;
  21. use Predis\Response;
  22. /**
  23. * Implementation of a command pipeline in which write and read operations of
  24. * Redis commands are pipelined to alleviate the effects of network round-trips.
  25. *
  26. * @author Daniele Alessandri <suppakilla@gmail.com>
  27. */
  28. class Pipeline implements BasicClientInterface, ExecutableContextInterface
  29. {
  30. private $client;
  31. private $pipeline;
  32. private $responses = array();
  33. private $running = false;
  34. /**
  35. * @param ClientInterface $client Client instance used by the context.
  36. */
  37. public function __construct(ClientInterface $client)
  38. {
  39. $this->client = $client;
  40. $this->pipeline = new SplQueue();
  41. }
  42. /**
  43. * Queues a command into the pipeline buffer.
  44. *
  45. * @param string $method Command ID.
  46. * @param array $arguments Arguments for the command.
  47. * @return Pipeline
  48. */
  49. public function __call($method, $arguments)
  50. {
  51. $command = $this->client->createCommand($method, $arguments);
  52. $this->recordCommand($command);
  53. return $this;
  54. }
  55. /**
  56. * Queues a command instance into the pipeline buffer.
  57. *
  58. * @param CommandInterface $command Command to be queued in the buffer.
  59. */
  60. protected function recordCommand(CommandInterface $command)
  61. {
  62. $this->pipeline->enqueue($command);
  63. }
  64. /**
  65. * Queues a command instance into the pipeline buffer.
  66. *
  67. * @param CommandInterface $command Command to be queued in the buffer.
  68. */
  69. public function executeCommand(CommandInterface $command)
  70. {
  71. $this->recordCommand($command);
  72. }
  73. /**
  74. * Throws an exception on -ERR responses returned by Redis.
  75. *
  76. * @param ConnectionInterface $connection Redis connection that returned the error.
  77. * @param Response\ErrorInterface $response Instance of the error response.
  78. */
  79. protected function exception(ConnectionInterface $connection, Response\ErrorInterface $response)
  80. {
  81. $connection->disconnect();
  82. $message = $response->getMessage();
  83. throw new Response\ServerException($message);
  84. }
  85. /**
  86. * Returns the underlying connection to be used by the pipeline.
  87. *
  88. * @return ConnectionInterface
  89. */
  90. protected function getConnection()
  91. {
  92. $connection = $this->getClient()->getConnection();
  93. if ($connection instanceof ReplicationConnectionInterface) {
  94. $connection->switchTo('master');
  95. }
  96. return $connection;
  97. }
  98. /**
  99. * Implements the logic to flush the queued commands and read the responses
  100. * from the current connection.
  101. *
  102. * @param ConnectionInterface $connection Current connection instance.
  103. * @param SplQueue $commands Queued commands.
  104. * @return array
  105. */
  106. protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
  107. {
  108. foreach ($commands as $command) {
  109. $connection->writeRequest($command);
  110. }
  111. $responses = array();
  112. $exceptions = $this->throwServerExceptions();
  113. while (!$commands->isEmpty()) {
  114. $command = $commands->dequeue();
  115. $response = $connection->readResponse($command);
  116. if (!$response instanceof Response\ResponseInterface) {
  117. $responses[] = $command->parseResponse($response);
  118. } else if ($response instanceof Response\ErrorInterface && $exceptions) {
  119. $this->exception($connection, $response);
  120. } else {
  121. $responses[] = $response;
  122. }
  123. }
  124. return $responses;
  125. }
  126. /**
  127. * Flushes the buffer holding all of the commands queued so far.
  128. *
  129. * @param bool $send Specifies if the commands in the buffer should be sent to Redis.
  130. * @return Pipeline
  131. */
  132. public function flushPipeline($send = true)
  133. {
  134. if ($send && !$this->pipeline->isEmpty()) {
  135. $responses = $this->executePipeline($this->getConnection(), $this->pipeline);
  136. $this->responses = array_merge($this->responses, $responses);
  137. } else {
  138. $this->pipeline = new SplQueue();
  139. }
  140. return $this;
  141. }
  142. /**
  143. * Marks the running status of the pipeline.
  144. *
  145. * @param bool $bool Sets the running status of the pipeline.
  146. */
  147. private function setRunning($bool)
  148. {
  149. if ($bool && $this->running) {
  150. throw new ClientException('This pipeline is already opened');
  151. }
  152. $this->running = $bool;
  153. }
  154. /**
  155. * Handles the actual execution of the whole pipeline.
  156. *
  157. * @param mixed $callable Optional callback for execution.
  158. * @return array
  159. */
  160. public function execute($callable = null)
  161. {
  162. if ($callable && !is_callable($callable)) {
  163. throw new InvalidArgumentException('Argument passed must be a callable object');
  164. }
  165. $exception = null;
  166. $this->setRunning(true);
  167. try {
  168. if ($callable) {
  169. call_user_func($callable, $this);
  170. }
  171. $this->flushPipeline();
  172. } catch (Exception $exception) {
  173. // NOOP
  174. }
  175. $this->setRunning(false);
  176. if ($exception) {
  177. throw $exception;
  178. }
  179. return $this->responses;
  180. }
  181. /**
  182. * Returns if the pipeline should throw exceptions on server errors.
  183. *
  184. * @todo Awful naming...
  185. * @return bool
  186. */
  187. protected function throwServerExceptions()
  188. {
  189. return (bool) $this->client->getOptions()->exceptions;
  190. }
  191. /**
  192. * Returns the underlying client instance used by the pipeline object.
  193. *
  194. * @return ClientInterface
  195. */
  196. public function getClient()
  197. {
  198. return $this->client;
  199. }
  200. }