RedisCluster.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ClientException;
  12. use Predis\NotSupportedException;
  13. use Predis\ResponseErrorInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Command\Hash\RedisClusterHashStrategy;
  16. use Predis\Connection\ConnectionFactory;
  17. use Predis\Connection\ConnectionFactoryInterface;
  18. use Predis\Distribution\CRC16HashGenerator;
  19. /**
  20. * Abstraction for Redis cluster (Redis v3.0).
  21. *
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  24. class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  25. {
  26. private $pool;
  27. private $slots;
  28. private $slotsMap;
  29. private $connections;
  30. private $distributor;
  31. private $cmdHasher;
  32. /**
  33. * @param ConnectionFactoryInterface $connections Connection factory object.
  34. */
  35. public function __construct(ConnectionFactoryInterface $connections = null)
  36. {
  37. $this->pool = array();
  38. $this->slots = array();
  39. $this->slotsMap = array();
  40. $this->connections = $connections ?: new ConnectionFactory();
  41. $this->distributor = new CRC16HashGenerator();
  42. $this->cmdHasher = new RedisClusterHashStrategy();
  43. }
  44. /**
  45. * {@inheritdoc}
  46. */
  47. public function isConnected()
  48. {
  49. foreach ($this->pool as $connection) {
  50. if ($connection->isConnected()) {
  51. return true;
  52. }
  53. }
  54. return false;
  55. }
  56. /**
  57. * {@inheritdoc}
  58. */
  59. public function connect()
  60. {
  61. foreach ($this->pool as $connection) {
  62. $connection->connect();
  63. }
  64. }
  65. /**
  66. * {@inheritdoc}
  67. */
  68. public function disconnect()
  69. {
  70. foreach ($this->pool as $connection) {
  71. $connection->disconnect();
  72. }
  73. }
  74. /**
  75. * {@inheritdoc}
  76. */
  77. public function add(SingleConnectionInterface $connection)
  78. {
  79. $parameters = $connection->getParameters();
  80. $this->pool["{$parameters->host}:{$parameters->port}"] = $connection;
  81. }
  82. /**
  83. * {@inheritdoc}
  84. */
  85. public function remove(SingleConnectionInterface $connection)
  86. {
  87. if (($id = array_search($connection, $this->pool, true)) !== false) {
  88. unset($this->pool[$id]);
  89. return true;
  90. }
  91. return false;
  92. }
  93. /**
  94. * Removes a connection instance using its alias or index.
  95. *
  96. * @param string $connectionId Alias or index of a connection.
  97. * @return Boolean Returns true if the connection was in the pool.
  98. */
  99. public function removeById($connectionId)
  100. {
  101. if (isset($this->pool[$connectionId])) {
  102. unset($this->pool[$connectionId]);
  103. return true;
  104. }
  105. return false;
  106. }
  107. /**
  108. * Preassociate a connection to a set of slots to avoid runtime guessing.
  109. *
  110. * @todo Check type or existence of the specified connection.
  111. *
  112. * @param int $first Initial slot.
  113. * @param int $last Last slot.
  114. * @param ConnectionSingleInterface|string $connection ID or connection instance.
  115. */
  116. public function setSlots($first, $last, $connection)
  117. {
  118. $connection = (string) $connection;
  119. if ($first < 0 || $first > 4095 || $last < 0 || $last > 4095 || $last < $first) {
  120. throw new \OutOfBoundsException("Invalid slot values for $connection: [$first-$last]");
  121. }
  122. $slots = array_fill($first, $last - $first + 1, $connection);
  123. $this->slotsMap = array_merge($this->slotsMap, $slots);
  124. }
  125. /**
  126. * {@inheritdoc}
  127. */
  128. public function getConnection(CommandInterface $command)
  129. {
  130. $hash = $command->getHash();
  131. if (!isset($hash)) {
  132. $hash = $this->cmdHasher->getHash($this->distributor, $command);
  133. if (!isset($hash)) {
  134. throw new NotSupportedException("Cannot send {$command->getId()} commands to redis-cluster");
  135. }
  136. $command->setHash($hash);
  137. }
  138. $slot = $hash & 4095; // 0x0FFF
  139. if (isset($this->slots[$slot])) {
  140. return $this->slots[$slot];
  141. }
  142. if (isset($this->slotsMap[$slot])) {
  143. $this->slots[$slot] = $connection = $this->pool[$this->slotsMap[$slot]];
  144. }
  145. $connection = $this->pool[isset($this->slotsMap[$slot]) ? $this->slotsMap[$slot] : array_rand($this->pool)];
  146. $this->slots[$slot] = $connection;
  147. return $connection;
  148. }
  149. /**
  150. * {@inheritdoc}
  151. */
  152. public function getConnectionById($id = null)
  153. {
  154. if (!isset($id)) {
  155. throw new \InvalidArgumentException("A valid connection ID must be specified");
  156. }
  157. return isset($this->pool[$id]) ? $this->pool[$id] : null;
  158. }
  159. /**
  160. * Handles -MOVED or -ASK replies by re-executing the command on the server
  161. * specified by the Redis reply.
  162. *
  163. * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
  164. * @param string $request Type of request (either 'MOVED' or 'ASK').
  165. * @param string $details Parameters of the MOVED/ASK request.
  166. * @return mixed
  167. */
  168. protected function onMoveRequest(CommandInterface $command, $request, $details)
  169. {
  170. list($slot, $host) = explode(' ', $details, 2);
  171. $connection = $this->getConnectionById($host);
  172. if (!isset($connection)) {
  173. $parameters = array('host' => null, 'port' => null);
  174. list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
  175. $connection = $this->connections->create($parameters);
  176. }
  177. switch ($request) {
  178. case 'MOVED':
  179. $this->add($connection);
  180. $this->slots[(int) $slot] = $connection;
  181. return $this->executeCommand($command);
  182. case 'ASK':
  183. return $connection->executeCommand($command);
  184. default:
  185. throw new ClientException("Unexpected request type for a move request: $request");
  186. }
  187. }
  188. /**
  189. * Returns the underlying command hash strategy used to hash
  190. * commands by their keys.
  191. *
  192. * @return CommandHashStrategy
  193. */
  194. public function getCommandHashStrategy()
  195. {
  196. return $this->cmdHasher;
  197. }
  198. /**
  199. * {@inheritdoc}
  200. */
  201. public function count()
  202. {
  203. return count($this->pool);
  204. }
  205. /**
  206. * {@inheritdoc}
  207. */
  208. public function getIterator()
  209. {
  210. return new \ArrayIterator($this->pool);
  211. }
  212. /**
  213. * Handles -ERR replies from Redis.
  214. *
  215. * @param CommandInterface $command Command that generated the -ERR reply.
  216. * @param ResponseErrorInterface $error Redis error reply object.
  217. * @return mixed
  218. */
  219. protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
  220. {
  221. list($type, $details) = explode(' ', $error->getMessage(), 2);
  222. switch ($type) {
  223. case 'MOVED':
  224. case 'ASK':
  225. return $this->onMoveRequest($command, $type, $details);
  226. default:
  227. return $error;
  228. }
  229. }
  230. /**
  231. * {@inheritdoc}
  232. */
  233. public function writeCommand(CommandInterface $command)
  234. {
  235. $this->getConnection($command)->writeCommand($command);
  236. }
  237. /**
  238. * {@inheritdoc}
  239. */
  240. public function readResponse(CommandInterface $command)
  241. {
  242. return $this->getConnection($command)->readResponse($command);
  243. }
  244. /**
  245. * {@inheritdoc}
  246. */
  247. public function executeCommand(CommandInterface $command)
  248. {
  249. $connection = $this->getConnection($command);
  250. $reply = $connection->executeCommand($command);
  251. if ($reply instanceof ResponseErrorInterface) {
  252. return $this->handleServerError($command, $reply);
  253. }
  254. return $reply;
  255. }
  256. }