RedisCluster.php 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ClientException;
  12. use Predis\NotSupportedException;
  13. use Predis\ResponseErrorInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Command\Hash\RedisClusterHashStrategy;
  16. use Predis\Distribution\CRC16HashGenerator;
  /**
   * Abstraction for Redis cluster (Redis v3.0).
   *
   * Keeps a pool of single-node connections and routes every command to one of
   * them: the command hash is reduced to a slot, and the slot is resolved to a
   * node through, in order, the per-slot connection cache, the slots map (built
   * from the `slots` connection parameter or filled with setSlots()), and
   * finally a guess that splits the slot range evenly across the pool. -MOVED
   * and -ASK error replies are followed by executeCommand().
   *
   * NOTE(review): slots are hard-coded to the 0-4095 range in this class, while
   * released Redis Cluster versions use 16384 slots -- confirm which server
   * version this class is meant to target.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  {
      /** @var array Connections in the pool, keyed by their string representation. */
      private $pool;

      /** @var array Cache of connection instances keyed by slot number. */
      private $slots;

      /** @var array|null Connection IDs keyed by slot number; null when it must be rebuilt. */
      private $slotsMap;

      /** @var int|null Slots assigned to each node when guessing; null when not computed yet. */
      private $slotsPerNode;

      /** @var ConnectionFactoryInterface Factory used to create connections to unknown nodes. */
      private $connections;

      /** @var CRC16HashGenerator Hash generator handed to the command hash strategy. */
      private $distributor;

      /** @var RedisClusterHashStrategy Strategy used to compute the hash of a command. */
      private $cmdHasher;

      /**
       * @param ConnectionFactoryInterface $connections Connection factory object;
       *                                                a ConnectionFactory is created when omitted.
       */
      public function __construct(ConnectionFactoryInterface $connections = null)
      {
          $this->pool = array();
          $this->slots = array();
          $this->connections = $connections ?: new ConnectionFactory();
          $this->distributor = new CRC16HashGenerator();
          $this->cmdHasher = new RedisClusterHashStrategy();
      }

      /**
       * {@inheritdoc}
       *
       * Returns true as soon as one connection of the pool reports being connected.
       */
      public function isConnected()
      {
          foreach ($this->pool as $connection) {
              if ($connection->isConnected()) {
                  return true;
              }
          }

          return false;
      }

      /**
       * {@inheritdoc}
       *
       * Calls connect() on every connection of the pool.
       */
      public function connect()
      {
          foreach ($this->pool as $connection) {
              $connection->connect();
          }
      }

      /**
       * {@inheritdoc}
       *
       * Calls disconnect() on every connection of the pool.
       */
      public function disconnect()
      {
          foreach ($this->pool as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * {@inheritdoc}
       *
       * The connection is stored under its string representation and the slots
       * map is invalidated so that it gets rebuilt on the next lookup.
       */
      public function add(SingleConnectionInterface $connection)
      {
          $this->pool[(string) $connection] = $connection;
          $this->resetSlotsMap();
      }

      /**
       * {@inheritdoc}
       *
       * @return bool Returns true if the connection was in the pool.
       */
      public function remove(SingleConnectionInterface $connection)
      {
          $id = array_search($connection, $this->pool, true);

          if ($id === false) {
              return false;
          }

          $this->dropConnection($id);

          return true;
      }

      /**
       * Removes a connection instance using its alias or index.
       *
       * @param string $connectionId Alias or index of a connection.
       * @return bool Returns true if the connection was in the pool.
       */
      public function removeById($connectionId)
      {
          if (!isset($this->pool[$connectionId])) {
              return false;
          }

          $this->dropConnection($connectionId);

          return true;
      }

      /**
       * Removes a pooled connection and every piece of routing state that refers to it.
       *
       * @param string|int $connectionId Key of an existing entry of the pool.
       */
      private function dropConnection($connectionId)
      {
          $connection = $this->pool[$connectionId];
          unset($this->pool[$connectionId]);

          // Without this purge getConnection() would keep returning the removed
          // connection for every slot it had already been cached for.
          foreach ($this->slots as $slot => $cached) {
              if ($cached === $connection) {
                  unset($this->slots[$slot]);
              }
          }

          $this->resetSlotsMap();
      }

      /**
       * Invalidates the slots map and the slots-per-node value; both are
       * recomputed lazily by guessNode().
       */
      private function resetSlotsMap()
      {
          $this->slotsMap = null;
          $this->slotsPerNode = null;
      }

      /**
       * Computes how many slots each node gets when guessing the node of a slot.
       *
       * Both the divisor and the result are clamped to 1 so that an empty pool
       * (or a pool with more nodes than slots) never leads to a division by zero.
       *
       * @return int
       */
      private function computeSlotsPerNode()
      {
          return max(1, (int) (4096 / max(1, count($this->pool))));
      }

      /**
       * Builds the slots map for the cluster.
       *
       * The map is reset and then filled from the `slots` parameter of each
       * pooled connection, expected in the "first-last" form. Connections
       * without that parameter are skipped.
       *
       * @return array Connection IDs keyed by slot number.
       * @throws \OutOfBoundsException When a `slots` parameter is not a valid range.
       */
      public function buildSlotsMap()
      {
          $this->slotsMap = array();
          $this->slotsPerNode = $this->computeSlotsPerNode();

          foreach ($this->pool as $connectionID => $connection) {
              $parameters = $connection->getParameters();

              if (!isset($parameters->slots)) {
                  continue;
              }

              // Padding lets a value without "-" reach the validation in
              // setSlots() instead of raising an undefined offset notice here.
              $range = array_pad(explode('-', $parameters->slots, 2), 2, null);
              $this->setSlots($range[0], $range[1], $connectionID);
          }

          return $this->slotsMap;
      }

      /**
       * Preassociate a connection to a set of slots to avoid runtime guessing.
       *
       * Slots that already have an entry in the map keep their current
       * connection. The map is created on demand, so this method can be called
       * before buildSlotsMap() has ever run.
       *
       * @todo Check type or existence of the specified connection.
       *
       * @param int $first Initial slot.
       * @param int $last Last slot.
       * @param SingleConnectionInterface|string $connection ID or connection instance.
       * @throws \OutOfBoundsException When the bounds are not numeric, fall
       *                               outside 0-4095 or are inverted.
       */
      public function setSlots($first, $last, $connection)
      {
          $valid = is_numeric($first) && is_numeric($last)
              && $first >= 0 && $first <= 4095
              && $last >= 0 && $last <= 4095
              && $last >= $first;

          if (!$valid) {
              throw new \OutOfBoundsException("Invalid slot values for $connection: [$first-$last]");
          }

          if (!isset($this->slotsMap)) {
              // The union below needs an array on its left-hand side.
              $this->slotsMap = array();
          }

          $first = (int) $first;
          $last = (int) $last;

          $this->slotsMap = $this->slotsMap + array_fill($first, $last - $first + 1, (string) $connection);
      }

      /**
       * {@inheritdoc}
       *
       * The hash is read from the command or, when missing, computed by the
       * command hash strategy and stored back on the command. The slot is the
       * hash masked to 12 bits, and the connection resolved for a slot is
       * cached for later calls.
       *
       * @throws NotSupportedException When no hash can be computed for the command.
       * @throws ClientException       When the pool is empty or the slot is mapped
       *                               to a connection that is not in the pool.
       */
      public function getConnection(CommandInterface $command)
      {
          $hash = $command->getHash();

          if (!isset($hash)) {
              $hash = $this->cmdHasher->getHash($this->distributor, $command);

              if (!isset($hash)) {
                  throw new NotSupportedException("Cannot send {$command->getId()} commands to redis-cluster");
              }

              $command->setHash($hash);
          }

          $slot = $hash & 4095; // 0x0FFF

          if (isset($this->slots[$slot])) {
              return $this->slots[$slot];
          }

          $nodeId = $this->guessNode($slot);

          if (!isset($this->pool[$nodeId])) {
              // setSlots() does not verify its connection argument, so the map
              // can name a node that was never added to the pool.
              throw new ClientException("Slot $slot is mapped to an unknown connection: $nodeId");
          }

          $connection = $this->pool[$nodeId];
          $this->slots[$slot] = $connection;

          return $connection;
      }

      /**
       * Tries guessing the correct node associated to the given slot using a precalculated
       * slots map or the same logic used by redis-trib to initialize a redis cluster.
       *
       * @param int $slot Slot ID.
       * @return string Key of the connection in the pool.
       * @throws ClientException When the pool is empty.
       */
      protected function guessNode($slot)
      {
          if (!$this->pool) {
              throw new ClientException('No connections available in the pool');
          }

          if (!isset($this->slotsMap)) {
              $this->buildSlotsMap();
          }

          if (isset($this->slotsMap[$slot])) {
              return $this->slotsMap[$slot];
          }

          if (!isset($this->slotsPerNode)) {
              // Reached when the map was created by setSlots() rather than by
              // buildSlotsMap().
              $this->slotsPerNode = $this->computeSlotsPerNode();
          }

          // The index is clamped because the integer division can leave a few
          // trailing slots beyond the last node.
          $index = min((int) ($slot / $this->slotsPerNode), count($this->pool) - 1);
          $nodes = array_keys($this->pool);

          return $nodes[$index];
      }

      /**
       * {@inheritdoc}
       *
       * @return SingleConnectionInterface|null The pooled connection, or null when the ID is unknown.
       * @throws \InvalidArgumentException When no ID is given.
       */
      public function getConnectionById($id = null)
      {
          if (!isset($id)) {
              throw new \InvalidArgumentException("A valid connection ID must be specified");
          }

          return isset($this->pool[$id]) ? $this->pool[$id] : null;
      }

      /**
       * Handles -MOVED or -ASK replies by re-executing the command on the server
       * specified by the Redis reply.
       *
       * When the target node is not in the pool, a connection to it is created
       * through the connection factory.
       *
       * NOTE(review): for ASK the command is re-sent as-is; the Redis Cluster
       * specification describes an ASKING command that precedes the redirected
       * command -- confirm whether that is handled elsewhere.
       *
       * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
       * @param string $request Type of request (either 'MOVED' or 'ASK').
       * @param string $details Parameters of the MOVED/ASK request ("slot host:port").
       * @return mixed
       * @throws ClientException When the request type is neither 'MOVED' nor 'ASK'.
       */
      protected function onMoveRequest(CommandInterface $command, $request, $details)
      {
          $target = array_pad(explode(' ', $details, 2), 2, null);
          $slot = $target[0];
          $host = $target[1];

          $connection = $this->getConnectionById($host);

          if (!isset($connection)) {
              $endpoint = explode(':', $host, 2);
              $parameters = array(
                  'host' => $endpoint[0],
                  'port' => isset($endpoint[1]) ? $endpoint[1] : null,
              );

              $connection = $this->connections->create($parameters);
          }

          switch ($request) {
              case 'MOVED':
                  // Remember the new owner of the slot, then retry through the
                  // regular routing path.
                  $this->move($connection, $slot);

                  return $this->executeCommand($command);

              case 'ASK':
                  // One-off redirection: the slot cache is left untouched.
                  return $connection->executeCommand($command);

              default:
                  throw new ClientException("Unexpected request type for a move request: $request");
          }
      }

      /**
       * Assign the connection instance to a new slot and adds it to the
       * pool if the connection was not already part of the pool.
       *
       * @param SingleConnectionInterface $connection Connection instance
       * @param int $slot Target slot.
       */
      protected function move(SingleConnectionInterface $connection, $slot)
      {
          $this->pool[(string) $connection] = $connection;
          $this->slots[(int) $slot] = $connection;
      }

      /**
       * Returns the underlying command hash strategy used to hash
       * commands by their keys.
       *
       * @return RedisClusterHashStrategy
       */
      public function getCommandHashStrategy()
      {
          return $this->cmdHasher;
      }

      /**
       * {@inheritdoc}
       *
       * @return int Number of connections in the pool.
       */
      public function count()
      {
          return count($this->pool);
      }

      /**
       * {@inheritdoc}
       *
       * @return \ArrayIterator Iterator over the connections in the pool.
       */
      public function getIterator()
      {
          return new \ArrayIterator($this->pool);
      }

      /**
       * Handles -ERR replies from Redis.
       *
       * MOVED and ASK errors are followed through onMoveRequest(); any other
       * error object is returned unchanged.
       *
       * @param CommandInterface $command Command that generated the -ERR reply.
       * @param ResponseErrorInterface $error Redis error reply object.
       * @return mixed
       */
      protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
      {
          // Padding covers error messages made of a single word.
          $parts = array_pad(explode(' ', $error->getMessage(), 2), 2, '');

          switch ($parts[0]) {
              case 'MOVED':
              case 'ASK':
                  return $this->onMoveRequest($command, $parts[0], $parts[1]);

              default:
                  return $error;
          }
      }

      /**
       * {@inheritdoc}
       *
       * The command is written to the connection selected by getConnection().
       */
      public function writeCommand(CommandInterface $command)
      {
          $this->getConnection($command)->writeCommand($command);
      }

      /**
       * {@inheritdoc}
       *
       * The response is read from the connection selected by getConnection().
       */
      public function readResponse(CommandInterface $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * {@inheritdoc}
       *
       * Error replies are passed to handleServerError(), which follows MOVED
       * and ASK redirections.
       */
      public function executeCommand(CommandInterface $command)
      {
          $connection = $this->getConnection($command);
          $reply = $connection->executeCommand($command);

          if ($reply instanceof ResponseErrorInterface) {
              return $this->handleServerError($command, $reply);
          }

          return $reply;
      }
  }