RedisCluster.php 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ClientException;
  12. use Predis\NotSupportedException;
  13. use Predis\ResponseErrorInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Command\Hash\RedisClusterHashStrategy;
  16. use Predis\Connection\ConnectionFactory;
  17. use Predis\Connection\ConnectionFactoryInterface;
  18. use Predis\Distribution\CRC16HashGenerator;
  19. /**
  20. * Abstraction for Redis cluster (Redis v3.0).
  21. *
  22. * @author Daniele Alessandri <suppakilla@gmail.com>
  23. */
  24. class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  25. {
  26. private $pool;
  27. private $slots;
  28. private $slotsMap;
  29. private $connections;
  30. private $distributor;
  31. private $cmdHasher;
  32. /**
  33. * @param ConnectionFactoryInterface $connections Connection factory object.
  34. */
  35. public function __construct(ConnectionFactoryInterface $connections = null)
  36. {
  37. $this->pool = array();
  38. $this->slots = array();
  39. $this->slotsMap = array();
  40. $this->connections = $connections ?: new ConnectionFactory();
  41. $this->distributor = new CRC16HashGenerator();
  42. $this->cmdHasher = new RedisClusterHashStrategy();
  43. }
  44. /**
  45. * {@inheritdoc}
  46. */
  47. public function isConnected()
  48. {
  49. foreach ($this->pool as $connection) {
  50. if ($connection->isConnected()) {
  51. return true;
  52. }
  53. }
  54. return false;
  55. }
  56. /**
  57. * {@inheritdoc}
  58. */
  59. public function connect()
  60. {
  61. foreach ($this->pool as $connection) {
  62. $connection->connect();
  63. }
  64. }
  65. /**
  66. * {@inheritdoc}
  67. */
  68. public function disconnect()
  69. {
  70. foreach ($this->pool as $connection) {
  71. $connection->disconnect();
  72. }
  73. }
  74. /**
  75. * {@inheritdoc}
  76. */
  77. public function add(SingleConnectionInterface $connection)
  78. {
  79. $parameters = $connection->getParameters();
  80. $this->pool[$connectionID = "{$parameters->host}:{$parameters->port}"] = $connection;
  81. if (isset($parameters->slots)) {
  82. @list($first, $last) = explode('-', $parameters->slots, 2);
  83. $this->setSlots($first, $last, $connectionID);
  84. }
  85. }
  86. /**
  87. * {@inheritdoc}
  88. */
  89. public function remove(SingleConnectionInterface $connection)
  90. {
  91. if (($id = array_search($connection, $this->pool, true)) !== false) {
  92. unset($this->pool[$id]);
  93. return true;
  94. }
  95. return false;
  96. }
  97. /**
  98. * Removes a connection instance using its alias or index.
  99. *
  100. * @param string $connectionId Alias or index of a connection.
  101. * @return Boolean Returns true if the connection was in the pool.
  102. */
  103. public function removeById($connectionId)
  104. {
  105. if (isset($this->pool[$connectionId])) {
  106. unset($this->pool[$connectionId]);
  107. return true;
  108. }
  109. return false;
  110. }
  111. /**
  112. * Preassociate a connection to a set of slots to avoid runtime guessing.
  113. *
  114. * @todo Check type or existence of the specified connection.
  115. *
  116. * @param int $first Initial slot.
  117. * @param int $last Last slot.
  118. * @param SingleConnectionInterface|string $connection ID or connection instance.
  119. */
  120. public function setSlots($first, $last, $connection)
  121. {
  122. if ($first < 0 || $first > 4095 || $last < 0 || $last > 4095 || $last < $first) {
  123. throw new \OutOfBoundsException("Invalid slot values for $connection: [$first-$last]");
  124. }
  125. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  126. $this->slotsMap = array_merge($this->slotsMap, $slots);
  127. }
  128. /**
  129. * {@inheritdoc}
  130. */
  131. public function getConnection(CommandInterface $command)
  132. {
  133. $hash = $command->getHash();
  134. if (!isset($hash)) {
  135. $hash = $this->cmdHasher->getHash($this->distributor, $command);
  136. if (!isset($hash)) {
  137. throw new NotSupportedException("Cannot send {$command->getId()} commands to redis-cluster");
  138. }
  139. $command->setHash($hash);
  140. }
  141. $slot = $hash & 4095; // 0x0FFF
  142. if (isset($this->slots[$slot])) {
  143. return $this->slots[$slot];
  144. }
  145. if (isset($this->slotsMap[$slot])) {
  146. $this->slots[$slot] = $connection = $this->pool[$this->slotsMap[$slot]];
  147. }
  148. $connection = $this->pool[isset($this->slotsMap[$slot]) ? $this->slotsMap[$slot] : array_rand($this->pool)];
  149. $this->slots[$slot] = $connection;
  150. return $connection;
  151. }
  152. /**
  153. * {@inheritdoc}
  154. */
  155. public function getConnectionById($id = null)
  156. {
  157. if (!isset($id)) {
  158. throw new \InvalidArgumentException("A valid connection ID must be specified");
  159. }
  160. return isset($this->pool[$id]) ? $this->pool[$id] : null;
  161. }
  162. /**
  163. * Handles -MOVED or -ASK replies by re-executing the command on the server
  164. * specified by the Redis reply.
  165. *
  166. * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
  167. * @param string $request Type of request (either 'MOVED' or 'ASK').
  168. * @param string $details Parameters of the MOVED/ASK request.
  169. * @return mixed
  170. */
  171. protected function onMoveRequest(CommandInterface $command, $request, $details)
  172. {
  173. list($slot, $host) = explode(' ', $details, 2);
  174. $connection = $this->getConnectionById($host);
  175. if (!isset($connection)) {
  176. $parameters = array('host' => null, 'port' => null);
  177. list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
  178. $connection = $this->connections->create($parameters);
  179. }
  180. switch ($request) {
  181. case 'MOVED':
  182. $this->move($connection, $slot);
  183. return $this->executeCommand($command);
  184. case 'ASK':
  185. return $connection->executeCommand($command);
  186. default:
  187. throw new ClientException("Unexpected request type for a move request: $request");
  188. }
  189. }
  190. /**
  191. * Assign the connection instance to a new slot and adds it to the
  192. * pool if the connection was not already part of the pool.
  193. *
  194. * @param SingleConnectionInterface $connection Connection instance
  195. * @param int $slot Target slot.
  196. */
  197. protected function move(SingleConnectionInterface $connection, $slot)
  198. {
  199. $this->pool[(string) $connection] = $connection;
  200. $this->slots[(int) $slot] = $connection;
  201. }
  202. /**
  203. * Returns the underlying command hash strategy used to hash
  204. * commands by their keys.
  205. *
  206. * @return CommandHashStrategy
  207. */
  208. public function getCommandHashStrategy()
  209. {
  210. return $this->cmdHasher;
  211. }
  212. /**
  213. * {@inheritdoc}
  214. */
  215. public function count()
  216. {
  217. return count($this->pool);
  218. }
  219. /**
  220. * {@inheritdoc}
  221. */
  222. public function getIterator()
  223. {
  224. return new \ArrayIterator($this->pool);
  225. }
  226. /**
  227. * Handles -ERR replies from Redis.
  228. *
  229. * @param CommandInterface $command Command that generated the -ERR reply.
  230. * @param ResponseErrorInterface $error Redis error reply object.
  231. * @return mixed
  232. */
  233. protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
  234. {
  235. list($type, $details) = explode(' ', $error->getMessage(), 2);
  236. switch ($type) {
  237. case 'MOVED':
  238. case 'ASK':
  239. return $this->onMoveRequest($command, $type, $details);
  240. default:
  241. return $error;
  242. }
  243. }
  244. /**
  245. * {@inheritdoc}
  246. */
  247. public function writeCommand(CommandInterface $command)
  248. {
  249. $this->getConnection($command)->writeCommand($command);
  250. }
  251. /**
  252. * {@inheritdoc}
  253. */
  254. public function readResponse(CommandInterface $command)
  255. {
  256. return $this->getConnection($command)->readResponse($command);
  257. }
  258. /**
  259. * {@inheritdoc}
  260. */
  261. public function executeCommand(CommandInterface $command)
  262. {
  263. $connection = $this->getConnection($command);
  264. $reply = $connection->executeCommand($command);
  265. if ($reply instanceof ResponseErrorInterface) {
  266. return $this->handleServerError($command, $reply);
  267. }
  268. return $reply;
  269. }
  270. }