RedisCluster.php 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ClientException;
  12. use Predis\NotSupportedException;
  13. use Predis\ResponseErrorInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Command\Hash\RedisClusterHashStrategy;
  16. use Predis\Connection\ConnectionFactory;
  17. use Predis\Connection\ConnectionFactoryInterface;
  18. use Predis\Distribution\CRC16HashGenerator;
  19. /**
  20. * Abstraction for Redis cluster (Redis v3.0).
  21. *
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  24. class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  25. {
  26. private $pool;
  27. private $slots;
  28. private $connections;
  29. private $distributor;
  30. private $cmdHasher;
  31. /**
  32. * @param ConnectionFactoryInterface $connections Connection factory object.
  33. */
  34. public function __construct(ConnectionFactoryInterface $connections = null)
  35. {
  36. $this->pool = array();
  37. $this->slots = array();
  38. $this->connections = $connections ?: new ConnectionFactory();
  39. $this->distributor = new CRC16HashGenerator();
  40. $this->cmdHasher = new RedisClusterHashStrategy();
  41. }
  42. /**
  43. * {@inheritdoc}
  44. */
  45. public function isConnected()
  46. {
  47. foreach ($this->pool as $connection) {
  48. if ($connection->isConnected()) {
  49. return true;
  50. }
  51. }
  52. return false;
  53. }
  54. /**
  55. * {@inheritdoc}
  56. */
  57. public function connect()
  58. {
  59. foreach ($this->pool as $connection) {
  60. $connection->connect();
  61. }
  62. }
  63. /**
  64. * {@inheritdoc}
  65. */
  66. public function disconnect()
  67. {
  68. foreach ($this->pool as $connection) {
  69. $connection->disconnect();
  70. }
  71. }
  72. /**
  73. * {@inheritdoc}
  74. */
  75. public function add(SingleConnectionInterface $connection)
  76. {
  77. $parameters = $connection->getParameters();
  78. $this->pool["{$parameters->host}:{$parameters->port}"] = $connection;
  79. }
  80. /**
  81. * {@inheritdoc}
  82. */
  83. public function remove(SingleConnectionInterface $connection)
  84. {
  85. if (($id = array_search($connection, $this->pool, true)) !== false) {
  86. unset($this->pool[$id]);
  87. return true;
  88. }
  89. return false;
  90. }
  91. /**
  92. * Removes a connection instance using its alias or index.
  93. *
  94. * @param string $connectionId Alias or index of a connection.
  95. * @return Boolean Returns true if the connection was in the pool.
  96. */
  97. public function removeById($connectionId)
  98. {
  99. if (isset($this->pool[$connectionId])) {
  100. unset($this->pool[$connectionId]);
  101. return true;
  102. }
  103. return false;
  104. }
  105. /**
  106. * {@inheritdoc}
  107. */
  108. public function getConnection(CommandInterface $command)
  109. {
  110. $hash = $command->getHash();
  111. if (!isset($hash)) {
  112. $hash = $this->cmdHasher->getHash($this->distributor, $command);
  113. if (!isset($hash)) {
  114. throw new NotSupportedException("Cannot send {$command->getId()} commands to redis-cluster");
  115. }
  116. $command->setHash($hash);
  117. }
  118. $slot = $hash & 4095; // 0x0FFF
  119. if (isset($this->slots[$slot])) {
  120. return $this->slots[$slot];
  121. }
  122. $this->slots[$slot] = $connection = $this->pool[array_rand($this->pool)];
  123. return $connection;
  124. }
  125. /**
  126. * {@inheritdoc}
  127. */
  128. public function getConnectionById($id = null)
  129. {
  130. if (!isset($id)) {
  131. throw new \InvalidArgumentException("A valid connection ID must be specified");
  132. }
  133. return isset($this->pool[$id]) ? $this->pool[$id] : null;
  134. }
  135. /**
  136. * Handles -MOVED or -ASK replies by re-executing the command on the server
  137. * specified by the Redis reply.
  138. *
  139. * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
  140. * @param string $request Type of request (either 'MOVED' or 'ASK').
  141. * @param string $details Parameters of the MOVED/ASK request.
  142. * @return mixed
  143. */
  144. protected function onMoveRequest(CommandInterface $command, $request, $details)
  145. {
  146. list($slot, $host) = explode(' ', $details, 2);
  147. $connection = $this->getConnectionById($host);
  148. if (!isset($connection)) {
  149. $parameters = array('host' => null, 'port' => null);
  150. list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
  151. $connection = $this->connections->create($parameters);
  152. }
  153. switch ($request) {
  154. case 'MOVED':
  155. $this->add($connection);
  156. $this->slots[(int) $slot] = $connection;
  157. return $this->executeCommand($command);
  158. case 'ASK':
  159. return $connection->executeCommand($command);
  160. default:
  161. throw new ClientException("Unexpected request type for a move request: $request");
  162. }
  163. }
  164. /**
  165. * Returns the underlying command hash strategy used to hash
  166. * commands by their keys.
  167. *
  168. * @return CommandHashStrategy
  169. */
  170. public function getCommandHashStrategy()
  171. {
  172. return $this->cmdHasher;
  173. }
  174. /**
  175. * {@inheritdoc}
  176. */
  177. public function count()
  178. {
  179. return count($this->pool);
  180. }
  181. /**
  182. * {@inheritdoc}
  183. */
  184. public function getIterator()
  185. {
  186. return new \ArrayIterator($this->pool);
  187. }
  188. /**
  189. * Handles -ERR replies from Redis.
  190. *
  191. * @param CommandInterface $command Command that generated the -ERR reply.
  192. * @param ResponseErrorInterface $error Redis error reply object.
  193. * @return mixed
  194. */
  195. protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
  196. {
  197. list($type, $details) = explode(' ', $error->getMessage(), 2);
  198. switch ($type) {
  199. case 'MOVED':
  200. case 'ASK':
  201. return $this->onMoveRequest($command, $type, $details);
  202. default:
  203. return $error;
  204. }
  205. }
  206. /**
  207. * {@inheritdoc}
  208. */
  209. public function writeCommand(CommandInterface $command)
  210. {
  211. $this->getConnection($command)->writeCommand($command);
  212. }
  213. /**
  214. * {@inheritdoc}
  215. */
  216. public function readResponse(CommandInterface $command)
  217. {
  218. return $this->getConnection($command)->readResponse($command);
  219. }
  220. /**
  221. * {@inheritdoc}
  222. */
  223. public function executeCommand(CommandInterface $command)
  224. {
  225. $connection = $this->getConnection($command);
  226. $reply = $connection->executeCommand($command);
  227. if ($reply instanceof ResponseErrorInterface) {
  228. return $this->handleServerError($command, $reply);
  229. }
  230. return $reply;
  231. }
  232. }