RedisCluster.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ClientException;
  12. use Predis\NotSupportedException;
  13. use Predis\ResponseErrorInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Distribution\CRC16HashGenerator;
  16. /**
  17. * Abstraction for Redis cluster (Redis v3.0).
  18. *
  19. * @author Daniele Alessandri <suppakilla@gmail.com>
  20. */
  21. class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  22. {
  23. private $pool;
  24. private $slots;
  25. private $connections;
  26. private $hashgenerator;
  27. /**
  28. * @param FactoryInterface $connections Connection factory object.
  29. */
  30. public function __construct(FactoryInterface $connections = null)
  31. {
  32. $this->pool = array();
  33. $this->slots = array();
  34. $this->connections = $connections;
  35. $this->hashgenerator = new CRC16HashGenerator();
  36. }
  37. /**
  38. * {@inheritdoc}
  39. */
  40. public function isConnected()
  41. {
  42. foreach ($this->pool as $connection) {
  43. if ($connection->isConnected()) {
  44. return true;
  45. }
  46. }
  47. return false;
  48. }
  49. /**
  50. * {@inheritdoc}
  51. */
  52. public function connect()
  53. {
  54. foreach ($this->pool as $connection) {
  55. $connection->connect();
  56. }
  57. }
  58. /**
  59. * {@inheritdoc}
  60. */
  61. public function disconnect()
  62. {
  63. foreach ($this->pool as $connection) {
  64. $connection->disconnect();
  65. }
  66. }
  67. /**
  68. * {@inheritdoc}
  69. */
  70. public function add(SingleConnectionInterface $connection)
  71. {
  72. $parameters = $connection->getParameters();
  73. $this->pool["{$parameters->host}:{$parameters->port}"] = $connection;
  74. }
  75. /**
  76. * {@inheritdoc}
  77. */
  78. public function remove(SingleConnectionInterface $connection)
  79. {
  80. if (($id = array_search($connection, $this->pool, true)) !== false) {
  81. unset($this->pool[$id]);
  82. return true;
  83. }
  84. return false;
  85. }
  86. /**
  87. * Removes a connection instance using its alias or index.
  88. *
  89. * @param string $connectionId Alias or index of a connection.
  90. * @return Boolean Returns true if the connection was in the pool.
  91. */
  92. public function removeById($connectionId)
  93. {
  94. if (isset($this->pool[$connectionId])) {
  95. unset($this->pool[$connectionId]);
  96. return true;
  97. }
  98. return false;
  99. }
  100. /**
  101. * {@inheritdoc}
  102. */
  103. public function getConnection(CommandInterface $command)
  104. {
  105. if ($hash = $command->getHash() === null) {
  106. $hash = $this->hashgenerator->hash($command->getArgument(0));
  107. if (!isset($hash)) {
  108. throw new NotSupportedException("Cannot send {$command->getId()} commands to redis-cluster");
  109. }
  110. }
  111. $slot = $hash & 0x0FFF;
  112. if (isset($this->slots[$slot])) {
  113. return $this->slots[$slot];
  114. }
  115. $connection = $this->pool[array_rand($this->pool)];
  116. $this->slots[$slot] = $connection;
  117. return $connection;
  118. }
  119. /**
  120. * {@inheritdoc}
  121. */
  122. public function getConnectionById($id = null)
  123. {
  124. if (!isset($id)) {
  125. throw new \InvalidArgumentException("A valid connection ID must be specified");
  126. }
  127. return isset($this->pool[$id]) ? $this->pool[$id] : null;
  128. }
  129. /**
  130. * Handles -MOVED or -ASK replies by re-executing the command on the server
  131. * specified by the Redis reply.
  132. *
  133. * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
  134. * @param string $request Type of request (either 'MOVED' or 'ASK').
  135. * @param string $details Parameters of the MOVED/ASK request.
  136. * @return mixed
  137. */
  138. protected function onMoveRequest(CommandInterface $command, $request, $details)
  139. {
  140. list($slot, $host) = explode(' ', $details, 2);
  141. $connection = $this->getConnectionById($host);
  142. if (!isset($connection)) {
  143. $parameters = array('host' => null, 'port' => null);
  144. list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
  145. $connection = $this->connections->create($parameters);
  146. }
  147. switch ($request) {
  148. case 'MOVED':
  149. $this->add($connection);
  150. $this->slots[$slot] = $connection;
  151. return $this->executeCommand($command);
  152. case 'ASK':
  153. return $connection->executeCommand($command);
  154. default:
  155. throw new ClientException("Unexpected request type for a move request: $request");
  156. }
  157. }
  158. /**
  159. * {@inheritdoc}
  160. */
  161. public function count()
  162. {
  163. return count($this->pool);
  164. }
  165. /**
  166. * {@inheritdoc}
  167. */
  168. public function getIterator()
  169. {
  170. return new \ArrayIterator($this->pool);
  171. }
  172. /**
  173. * Handles -ERR replies from Redis.
  174. *
  175. * @param CommandInterface $command Command that generated the -ERR reply.
  176. * @param ResponseErrorInterface $error Redis error reply object.
  177. * @return mixed
  178. */
  179. protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
  180. {
  181. list($type, $details) = explode(' ', $error->getMessage(), 2);
  182. switch ($type) {
  183. case 'MOVED':
  184. case 'ASK':
  185. return $this->onMoveRequest($command, $type, $details);
  186. default:
  187. return $error;
  188. }
  189. }
  190. /**
  191. * {@inheritdoc}
  192. */
  193. public function writeCommand(CommandInterface $command)
  194. {
  195. $this->getConnection($command)->writeCommand($command);
  196. }
  197. /**
  198. * {@inheritdoc}
  199. */
  200. public function readResponse(CommandInterface $command)
  201. {
  202. return $this->getConnection($command)->readResponse($command);
  203. }
  204. /**
  205. * {@inheritdoc}
  206. */
  207. public function executeCommand(CommandInterface $command)
  208. {
  209. $connection = $this->getConnection($command);
  210. $reply = $connection->executeCommand($command);
  211. if ($reply instanceof ResponseErrorInterface) {
  212. return $this->handleServerError($command, $reply);
  213. }
  214. return $reply;
  215. }
  216. }