Pipeline.php 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Pipeline;
  11. use Exception;
  12. use InvalidArgumentException;
  13. use SplQueue;
  14. use Predis\BasicClientInterface;
  15. use Predis\ClientException;
  16. use Predis\ClientInterface;
  17. use Predis\ExecutableContextInterface;
  18. use Predis\Command\CommandInterface;
  19. use Predis\Connection\ConnectionInterface;
  20. use Predis\Connection\ReplicationConnectionInterface;
  21. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  22. use Predis\Response\ResponseInterface;
  23. use Predis\Response\ServerException;
  24. /**
  25. * Implementation of a command pipeline in which write and read operations of
  26. * Redis commands are pipelined to alleviate the effects of network round-trips.
  27. *
  28. * @author Daniele Alessandri <suppakilla@gmail.com>
  29. */
  30. class Pipeline implements BasicClientInterface, ExecutableContextInterface
  31. {
  32. private $client;
  33. private $pipeline;
  34. private $responses = array();
  35. private $running = false;
  36. /**
  37. * @param ClientInterface $client Client instance used by the context.
  38. */
  39. public function __construct(ClientInterface $client)
  40. {
  41. $this->client = $client;
  42. $this->pipeline = new SplQueue();
  43. }
  44. /**
  45. * Queues a command into the pipeline buffer.
  46. *
  47. * @param string $method Command ID.
  48. * @param array $arguments Arguments for the command.
  49. * @return $this
  50. */
  51. public function __call($method, $arguments)
  52. {
  53. $command = $this->client->createCommand($method, $arguments);
  54. $this->recordCommand($command);
  55. return $this;
  56. }
  57. /**
  58. * Queues a command instance into the pipeline buffer.
  59. *
  60. * @param CommandInterface $command Command to be queued in the buffer.
  61. */
  62. protected function recordCommand(CommandInterface $command)
  63. {
  64. $this->pipeline->enqueue($command);
  65. }
  66. /**
  67. * Queues a command instance into the pipeline buffer.
  68. *
  69. * @param CommandInterface $command Command instance to be queued in the buffer.
  70. * @return $this
  71. */
  72. public function executeCommand(CommandInterface $command)
  73. {
  74. $this->recordCommand($command);
  75. return $this;
  76. }
  77. /**
  78. * Throws an exception on -ERR responses returned by Redis.
  79. *
  80. * @param ConnectionInterface $connection Redis connection that returned the error.
  81. * @param ErrorResponseInterface $response Instance of the error response.
  82. */
  83. protected function exception(ConnectionInterface $connection, ErrorResponseInterface $response)
  84. {
  85. $connection->disconnect();
  86. $message = $response->getMessage();
  87. throw new ServerException($message);
  88. }
  89. /**
  90. * Returns the underlying connection to be used by the pipeline.
  91. *
  92. * @return ConnectionInterface
  93. */
  94. protected function getConnection()
  95. {
  96. $connection = $this->getClient()->getConnection();
  97. if ($connection instanceof ReplicationConnectionInterface) {
  98. $connection->switchTo('master');
  99. }
  100. return $connection;
  101. }
  102. /**
  103. * Implements the logic to flush the queued commands and read the responses
  104. * from the current connection.
  105. *
  106. * @param ConnectionInterface $connection Current connection instance.
  107. * @param SplQueue $commands Queued commands.
  108. * @return array
  109. */
  110. protected function executePipeline(ConnectionInterface $connection, SplQueue $commands)
  111. {
  112. foreach ($commands as $command) {
  113. $connection->writeRequest($command);
  114. }
  115. $responses = array();
  116. $exceptions = $this->throwServerExceptions();
  117. while (!$commands->isEmpty()) {
  118. $command = $commands->dequeue();
  119. $response = $connection->readResponse($command);
  120. if (!$response instanceof ResponseInterface) {
  121. $responses[] = $command->parseResponse($response);
  122. } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
  123. $this->exception($connection, $response);
  124. } else {
  125. $responses[] = $response;
  126. }
  127. }
  128. return $responses;
  129. }
  130. /**
  131. * Flushes the buffer holding all of the commands queued so far.
  132. *
  133. * @param bool $send Specifies if the commands in the buffer should be sent to Redis.
  134. * @return $this
  135. */
  136. public function flushPipeline($send = true)
  137. {
  138. if ($send && !$this->pipeline->isEmpty()) {
  139. $responses = $this->executePipeline($this->getConnection(), $this->pipeline);
  140. $this->responses = array_merge($this->responses, $responses);
  141. } else {
  142. $this->pipeline = new SplQueue();
  143. }
  144. return $this;
  145. }
  146. /**
  147. * Marks the running status of the pipeline.
  148. *
  149. * @param bool $bool Sets the running status of the pipeline.
  150. */
  151. private function setRunning($bool)
  152. {
  153. if ($bool && $this->running) {
  154. throw new ClientException('The current pipeline context is already being executed.');
  155. }
  156. $this->running = $bool;
  157. }
  158. /**
  159. * Handles the actual execution of the whole pipeline.
  160. *
  161. * @param mixed $callable Optional callback for execution.
  162. * @return array
  163. */
  164. public function execute($callable = null)
  165. {
  166. if ($callable && !is_callable($callable)) {
  167. throw new InvalidArgumentException('The argument must be a callable object.');
  168. }
  169. $exception = null;
  170. $this->setRunning(true);
  171. try {
  172. if ($callable) {
  173. call_user_func($callable, $this);
  174. }
  175. $this->flushPipeline();
  176. } catch (Exception $exception) {
  177. // NOOP
  178. }
  179. $this->setRunning(false);
  180. if ($exception) {
  181. throw $exception;
  182. }
  183. return $this->responses;
  184. }
  185. /**
  186. * Returns if the pipeline should throw exceptions on server errors.
  187. *
  188. * @todo Awful naming...
  189. * @return bool
  190. */
  191. protected function throwServerExceptions()
  192. {
  193. return (bool) $this->client->getOptions()->exceptions;
  194. }
  195. /**
  196. * Returns the underlying client instance used by the pipeline object.
  197. *
  198. * @return ClientInterface
  199. */
  200. public function getClient()
  201. {
  202. return $this->client;
  203. }
  204. }