Pipeline.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Exception;
  12. use InvalidArgumentException;
  13. use SplQueue;
  14. use Predis\BasicClientInterface;
  15. use Predis\ClientException;
  16. use Predis\ClientInterface;
  17. use Predis\ExecutableContextInterface;
  18. use Predis\Command\CommandInterface;
  19. use Predis\Connection\ConnectionInterface;
  20. use Predis\Connection\ReplicationConnectionInterface;
  21. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  22. use Predis\Response\ResponseInterface;
  23. use Predis\Response\ServerException;
  24. /**
  25. * Implementation of a command pipeline in which write and read operations of
  26. * Redis commands are pipelined to alleviate the effects of network round-trips.
  27. *
  28. * @author Daniele Alessandri <suppakilla@gmail.com>
  29. */
  class Pipeline implements BasicClientInterface, ExecutableContextInterface
  {
      /** @var ClientInterface Client used to create commands and to obtain the connection. */
      private $client;

      /** @var SplQueue Commands recorded so far that have not been flushed yet. */
      private $pipeline;

      /** @var array Responses collected by every flush performed so far (never reset by this class). */
      private $responses = array();

      /** @var bool True while execute() is in progress; used to reject nested executions. */
      private $running = false;

      /**
       * @param ClientInterface $client Client instance used by the context.
       */
      public function __construct(ClientInterface $client)
      {
          $this->client = $client;
          $this->pipeline = new SplQueue();
      }

      /**
       * Creates a command through the client and queues it into the pipeline
       * buffer instead of sending it right away.
       *
       * @param  string   $method    Command ID.
       * @param  array    $arguments Arguments for the command.
       * @return Pipeline
       */
      public function __call($method, $arguments)
      {
          $command = $this->client->createCommand($method, $arguments);
          $this->recordCommand($command);

          return $this;
      }

      /**
       * Appends a command instance to the end of the pipeline buffer.
       *
       * @param CommandInterface $command Command to be queued in the buffer.
       */
      protected function recordCommand(CommandInterface $command)
      {
          $this->pipeline->enqueue($command);
      }

      /**
       * Queues an already-built command instance into the pipeline buffer.
       *
       * Nothing is sent to the server here: the command is only recorded and
       * its response becomes available after the pipeline has been flushed.
       * The pipeline itself is returned, mirroring __call(), so that calls can
       * be chained.
       *
       * @param  CommandInterface $command Command to be queued in the buffer.
       * @return Pipeline
       */
      public function executeCommand(CommandInterface $command)
      {
          $this->recordCommand($command);

          return $this;
      }

      /**
       * Throws an exception on -ERR responses returned by Redis.
       *
       * The connection is closed before throwing.
       * NOTE(review): presumably this discards the replies of the remaining
       * pipelined commands that were already written but not read; confirm.
       *
       * @param  ConnectionInterface    $connection Redis connection that returned the error.
       * @param  ErrorResponseInterface $response   Instance of the error response.
       * @throws ServerException
       */
      protected function exception(ConnectionInterface $connection, ErrorResponseInterface $response)
      {
          $connection->disconnect();
          $message = $response->getMessage();

          throw new ServerException($message);
      }

      /**
       * Returns the underlying connection to be used by the pipeline.
       *
       * When the client is backed by a replication connection, it is switched
       * to 'master' before being returned.
       *
       * @return ConnectionInterface
       */
      protected function getConnection()
      {
          $connection = $this->getClient()->getConnection();

          if ($connection instanceof ReplicationConnectionInterface) {
              $connection->switchTo('master');
          }

          return $connection;
      }

      /**
       * Implements the logic to flush the queued commands and read the responses
       * from the current connection.
       *
       * All requests are written first; the responses are then read back in the
       * same order while the commands are removed from the queue.
       *
       * @param  ConnectionInterface $connection Current connection instance.
       * @param  SplQueue            $commands   Queued commands.
       * @return array
       */
      protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
      {
          // Write phase: iterating does not remove the commands from the queue.
          foreach ($commands as $command) {
              $connection->writeRequest($command);
          }

          $responses = array();
          $exceptions = $this->throwServerExceptions();

          // Read phase: one response is read for each dequeued command.
          while (!$commands->isEmpty()) {
              $command = $commands->dequeue();
              $response = $connection->readResponse($command);

              if (!$response instanceof ResponseInterface) {
                  // Plain values are handed to the command for parsing.
                  $responses[] = $command->parseResponse($response);
              } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
                  // Always throws, so the loop stops at the first error response.
                  $this->exception($connection, $response);
              } else {
                  // Response objects (including errors when exceptions are
                  // disabled) are returned untouched.
                  $responses[] = $response;
              }
          }

          return $responses;
      }

      /**
       * Flushes the buffer holding all of the commands queued so far.
       *
       * With $send enabled and a non-empty buffer the commands are executed and
       * their responses appended to the ones already collected; in any other
       * case the buffer is simply replaced with a new empty queue.
       *
       * @param  bool     $send Specifies if the commands in the buffer should be sent to Redis.
       * @return Pipeline
       */
      public function flushPipeline($send = true)
      {
          if ($send && !$this->pipeline->isEmpty()) {
              $responses = $this->executePipeline($this->getConnection(), $this->pipeline);
              $this->responses = array_merge($this->responses, $responses);
          } else {
              $this->pipeline = new SplQueue();
          }

          return $this;
      }

      /**
       * Marks the running status of the pipeline.
       *
       * @param  bool            $bool Sets the running status of the pipeline.
       * @throws ClientException When asked to start while already running.
       */
      private function setRunning($bool)
      {
          if ($bool && $this->running) {
              throw new ClientException('The current pipeline context is already being executed.');
          }

          $this->running = $bool;
      }

      /**
       * Handles the actual execution of the whole pipeline.
       *
       * The optional callable receives this pipeline as its only argument and
       * can queue commands on it; the buffer is flushed right after it returns.
       * Anything thrown by the callable or by the flush is rethrown to the
       * caller only after the running flag has been cleared, so the pipeline
       * never remains locked in the "running" state.
       *
       * @param  mixed                    $callable Optional callback for execution.
       * @return array                    Responses collected by this pipeline so far.
       * @throws InvalidArgumentException When the argument is not a callable.
       * @throws ClientException          When the pipeline is already being executed.
       */
      public function execute($callable = null)
      {
          if ($callable && !is_callable($callable)) {
              throw new InvalidArgumentException('The argument must be a callable object.');
          }

          $exception = null;
          $this->setRunning(true);

          try {
              if ($callable) {
                  call_user_func($callable, $this);
              }

              $this->flushPipeline();
          } catch (Exception $exception) {
              // Rethrown below, once the running flag has been cleared.
          } catch (\Throwable $exception) {
              // PHP 7+ engine errors (e.g. TypeError) do not extend Exception;
              // without this branch the running flag would stay set forever.
              // On PHP 5 the Throwable type does not exist and this catch
              // simply never matches.
          }

          $this->setRunning(false);

          if ($exception) {
              throw $exception;
          }

          return $this->responses;
      }

      /**
       * Returns if the pipeline should throw exceptions on server errors.
       *
       * The value is read from the "exceptions" option of the client.
       *
       * @todo Awful naming...
       * @return bool
       */
      protected function throwServerExceptions()
      {
          return (bool) $this->client->getOptions()->exceptions;
      }

      /**
       * Returns the underlying client instance used by the pipeline object.
       *
       * @return ClientInterface
       */
      public function getClient()
      {
          return $this->client;
      }
  }