RedisCluster.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use ArrayIterator;
  12. use Countable;
  13. use IteratorAggregate;
  14. use OutOfBoundsException;
  15. use Predis\NotSupportedException;
  16. use Predis\ResponseErrorInterface;
  17. use Predis\Cluster\CommandHashStrategyInterface;
  18. use Predis\Cluster\RedisClusterHashStrategy;
  19. use Predis\Command\CommandInterface;
  20. use Predis\Command\RawCommand;
  21. use Predis\Protocol\ProtocolException;
  22. /**
  23. * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
  24. *
  25. * This connection backend offers smart support for redis-cluster by handling
  26. * automatic slots map (re)generation upon -MOVE or -ASK responses returned by
  27. * Redis when redirecting a client to a different node.
  28. *
  29. * The cluster can be pre-initialized using only a subset of the actual nodes in
  30. * the cluster, Predis will do the rest by adjusting the slots map and creating
  31. * the missing underlying connection instances on the fly.
  32. *
  33. * It is possible to pre-associate connections to a slots range with the "slots"
  34. * parameter in the form "$first-$last". This can greatly reduce runtime node
  35. * guessing and redirections.
  36. *
  37. * It is also possible to ask for the full and updated slots map directly to one
  38. * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
  39. * Asking for the cluster configuration to Redis is actually done by issuing a
  40. * CLUSTER NODES command to a random node in the pool.
  41. *
  42. * @author Daniele Alessandri <suppakilla@gmail.com>
  43. */
  44. class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
  45. {
  46. private $askClusterNodes = false;
  47. private $defaultParameters = array();
  48. private $pool = array();
  49. private $slots = array();
  50. private $slotsMap;
  51. private $strategy;
  52. private $connections;
  53. /**
  54. * @param ConnectionFactoryInterface $connections Connection factory object.
  55. */
  56. public function __construct(ConnectionFactoryInterface $connections = null)
  57. {
  58. $this->strategy = new RedisClusterHashStrategy();
  59. $this->connections = $connections ?: new ConnectionFactory();
  60. }
  61. /**
  62. * {@inheritdoc}
  63. */
  64. public function isConnected()
  65. {
  66. foreach ($this->pool as $connection) {
  67. if ($connection->isConnected()) {
  68. return true;
  69. }
  70. }
  71. return false;
  72. }
  73. /**
  74. * {@inheritdoc}
  75. */
  76. public function connect()
  77. {
  78. if ($connection = $this->getRandomConnection()) {
  79. $connection->connect();
  80. }
  81. }
  82. /**
  83. * {@inheritdoc}
  84. */
  85. public function disconnect()
  86. {
  87. foreach ($this->pool as $connection) {
  88. $connection->disconnect();
  89. }
  90. }
  91. /**
  92. * {@inheritdoc}
  93. */
  94. public function add(SingleConnectionInterface $connection)
  95. {
  96. $this->pool[(string) $connection] = $connection;
  97. unset($this->slotsMap);
  98. }
  99. /**
  100. * {@inheritdoc}
  101. */
  102. public function remove(SingleConnectionInterface $connection)
  103. {
  104. if (false !== $id = array_search($connection, $this->pool, true)) {
  105. unset(
  106. $this->pool[$id],
  107. $this->slotsMap
  108. );
  109. return true;
  110. }
  111. return false;
  112. }
  113. /**
  114. * Removes a connection instance by using its identifier.
  115. *
  116. * @param string $connectionID Connection identifier.
  117. * @return bool True if the connection was in the pool.
  118. */
  119. public function removeById($connectionID)
  120. {
  121. if (isset($this->pool[$connectionID])) {
  122. unset(
  123. $this->pool[$connectionID],
  124. $this->slotsMap
  125. );
  126. return true;
  127. }
  128. return false;
  129. }
  130. /**
  131. * Generates the current slots map by guessing the cluster configuration out
  132. * of the connection parameters of the connections in the pool.
  133. *
  134. * Generation is based on the same algorithm used by Redis to generate the
  135. * cluster, so it is most effective when all of the connections supplied on
  136. * initialization have the "slots" parameter properly set accordingly to the
  137. * current cluster configuration.
  138. */
  139. public function buildSlotsMap()
  140. {
  141. $this->slotsMap = array();
  142. foreach ($this->pool as $connectionID => $connection) {
  143. $parameters = $connection->getParameters();
  144. if (!isset($parameters->slots)) {
  145. continue;
  146. }
  147. $slots = explode('-', $parameters->slots, 2);
  148. $this->setSlots($slots[0], $slots[1], $connectionID);
  149. }
  150. }
  151. /**
  152. * Generates the current slots map by fetching the cluster configuration to
  153. * one of the nodes by leveraging the CLUSTER NODES command.
  154. */
  155. public function askClusterNodes()
  156. {
  157. if (!$connection = $this->getRandomConnection()) {
  158. return array();
  159. }
  160. $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
  161. $response = $connection->executeCommand($cmdCluster);
  162. $nodes = explode("\n", $response, -1);
  163. $count = count($nodes);
  164. for ($i = 0; $i < $count; $i++) {
  165. $node = explode(' ', $nodes[$i], 9);
  166. $slots = explode('-', $node[8], 2);
  167. if ($node[1] === ':0') {
  168. $this->setSlots($slots[0], $slots[1], (string) $connection);
  169. } else {
  170. $this->setSlots($slots[0], $slots[1], $node[1]);
  171. }
  172. }
  173. }
  174. /**
  175. * Returns the current slots map for the cluster.
  176. *
  177. * @return array
  178. */
  179. public function getSlotsMap()
  180. {
  181. if (!isset($this->slotsMap)) {
  182. $this->slotsMap = array();
  183. }
  184. return $this->slotsMap;
  185. }
  186. /**
  187. * Pre-associates a connection to a slots range to avoid runtime guessing.
  188. *
  189. * @param int $first Initial slot of the range.
  190. * @param int $last Last slot of the range.
  191. * @param SingleConnectionInterface|string $connection ID or connection instance.
  192. */
  193. public function setSlots($first, $last, $connection)
  194. {
  195. if ($first < 0x0000 || $first > 0x3FFF ||
  196. $last < 0x0000 || $last > 0x3FFF ||
  197. $last < $first
  198. ) {
  199. throw new OutOfBoundsException(
  200. "Invalid slot range for $connection: [$first-$last]"
  201. );
  202. }
  203. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  204. $this->slotsMap = $this->getSlotsMap() + $slots;
  205. }
  206. /**
  207. * Guesses the correct node associated to a given slot using a precalculated
  208. * slots map, falling back to the same logic used by Redis to initialize a
  209. * cluster (best-effort).
  210. *
  211. * @param int $slot Slot index.
  212. * @return string Connection ID.
  213. */
  214. protected function guessNode($slot)
  215. {
  216. if (!isset($this->slotsMap)) {
  217. $this->buildSlotsMap();
  218. }
  219. if (isset($this->slotsMap[$slot])) {
  220. return $this->slotsMap[$slot];
  221. }
  222. $count = count($this->pool);
  223. $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
  224. $nodes = array_keys($this->pool);
  225. return $nodes[$index];
  226. }
  227. /**
  228. * {@inheritdoc}
  229. */
  230. public function getConnection(CommandInterface $command)
  231. {
  232. $hash = $this->strategy->getHash($command);
  233. if (!isset($hash)) {
  234. throw new NotSupportedException(
  235. "Cannot use {$command->getId()} with redis-cluster"
  236. );
  237. }
  238. $slot = $hash & 0x3FFF;
  239. if (isset($this->slots[$slot])) {
  240. return $this->slots[$slot];
  241. } else {
  242. return $this->getConnectionBySlot($slot);
  243. }
  244. }
  245. /**
  246. * Returns the connection currently associated to a given slot.
  247. *
  248. * @param int $slot Slot index.
  249. * @return SingleConnectionInterface
  250. */
  251. public function getConnectionBySlot($slot)
  252. {
  253. if ($slot < 0x0000 || $slot > 0x3FFF) {
  254. throw new OutOfBoundsException("Invalid slot [$slot]");
  255. }
  256. if (isset($this->slots[$slot])) {
  257. return $this->slots[$slot];
  258. }
  259. $connectionID = $this->guessNode($slot);
  260. if (!$connection = $this->getConnectionById($connectionID)) {
  261. $host = explode(':', $connectionID, 2);
  262. $parameters = array_merge($this->defaultParameters, array(
  263. 'host' => $host[0],
  264. 'port' => $host[1],
  265. ));
  266. $connection = $this->connections->create($parameters);
  267. $this->pool[$connectionID] = $connection;
  268. }
  269. return $this->slots[$slot] = $connection;
  270. }
  271. /**
  272. * {@inheritdoc}
  273. */
  274. public function getConnectionById($connectionID)
  275. {
  276. if (isset($this->pool[$connectionID])) {
  277. return $this->pool[$connectionID];
  278. }
  279. }
  280. /**
  281. * Returns a random connection from the pool.
  282. *
  283. * @return SingleConnectionInterface
  284. */
  285. protected function getRandomConnection()
  286. {
  287. if ($this->pool) {
  288. return $this->pool[array_rand($this->pool)];
  289. }
  290. }
  291. /**
  292. * Permanently associates the connection instance to a new slot.
  293. * The connection is added to the connections pool if not yet included.
  294. *
  295. * @param SingleConnectionInterface $connection Connection instance.
  296. * @param int $slot Target slot index.
  297. */
  298. protected function move(SingleConnectionInterface $connection, $slot)
  299. {
  300. $this->pool[(string) $connection] = $connection;
  301. $this->slots[(int) $slot] = $connection;
  302. }
  303. /**
  304. * Handles -ERR responses from Redis.
  305. *
  306. * @param CommandInterface $command Command that generated the -ERR response.
  307. * @param ResponseErrorInterface $error Redis error response object.
  308. * @return mixed
  309. */
  310. protected function onErrorResponse(CommandInterface $command, ResponseErrorInterface $error)
  311. {
  312. $details = explode(' ', $error->getMessage(), 2);
  313. switch ($details[0]) {
  314. case 'MOVED':
  315. case 'ASK':
  316. return $this->onMoveRequest($command, $details[0], $details[1]);
  317. default:
  318. return $error;
  319. }
  320. }
  321. /**
  322. * Handles -MOVED and -ASK responses by re-executing the command on the node
  323. * specified by the Redis response.
  324. *
  325. * @param CommandInterface $command Command that generated the -MOVE or -ASK response.
  326. * @param string $request Type of request (either 'MOVED' or 'ASK').
  327. * @param string $details Parameters of the MOVED/ASK request.
  328. * @return mixed
  329. */
  330. protected function onMoveRequest(CommandInterface $command, $request, $details)
  331. {
  332. list($slot, $host) = explode(' ', $details, 2);
  333. $connection = $this->getConnectionById($host);
  334. if (!$connection) {
  335. $host = explode(':', $host, 2);
  336. $parameters = array_merge($this->defaultParameters, array(
  337. 'host' => $host[0],
  338. 'port' => $host[1],
  339. ));
  340. $connection = $this->connections->create($parameters);
  341. }
  342. switch ($request) {
  343. case 'MOVED':
  344. if ($this->askClusterNodes) {
  345. $this->askClusterNodes();
  346. }
  347. $this->move($connection, $slot);
  348. $response = $this->executeCommand($command);
  349. return $response;
  350. case 'ASK':
  351. $connection->executeCommand(RawCommand::create('ASKING'));
  352. $response = $connection->executeCommand($command);
  353. return $response;
  354. default:
  355. throw new ProtocolException(
  356. "Unexpected request type for a move request: $request"
  357. );
  358. }
  359. }
  360. /**
  361. * {@inheritdoc}
  362. */
  363. public function writeCommand(CommandInterface $command)
  364. {
  365. $this->getConnection($command)->writeCommand($command);
  366. }
  367. /**
  368. * {@inheritdoc}
  369. */
  370. public function readResponse(CommandInterface $command)
  371. {
  372. return $this->getConnection($command)->readResponse($command);
  373. }
  374. /**
  375. * {@inheritdoc}
  376. */
  377. public function executeCommand(CommandInterface $command)
  378. {
  379. $connection = $this->getConnection($command);
  380. $response = $connection->executeCommand($command);
  381. if ($response instanceof ResponseErrorInterface) {
  382. return $this->onErrorResponse($command, $response);
  383. }
  384. return $response;
  385. }
  386. /**
  387. * {@inheritdoc}
  388. */
  389. public function count()
  390. {
  391. return count($this->pool);
  392. }
  393. /**
  394. * {@inheritdoc}
  395. */
  396. public function getIterator()
  397. {
  398. return new ArrayIterator(array_values($this->pool));
  399. }
  400. /**
  401. * Returns the underlying hash strategy used to hash commands by their keys.
  402. *
  403. * @return CommandHashStrategyInterface
  404. */
  405. public function getCommandHashStrategy()
  406. {
  407. return $this->strategy;
  408. }
  409. /**
  410. * Enables automatic fetching of the current slots map from one of the nodes
  411. * using the CLUSTER NODES command. This option is disabled by default but
  412. * asking the current slots map to Redis upon -MOVE responses may reduce
  413. * overhead by eliminating the trial-and-error nature of the node guessing
  414. * procedure, mostly when targeting many keys that would end up in a lot of
  415. * redirections.
  416. *
  417. * The slots map can still be manually fetched using the askClusterNodes()
  418. * method whether or not this option is enabled.
  419. *
  420. * @param bool $value Enable or disable the use of CLUSTER NODES.
  421. */
  422. public function enableClusterNodes($value)
  423. {
  424. $this->askClusterNodes = (bool) $value;
  425. }
  426. /**
  427. * Sets a default array of connection parameters to be applied when creating
  428. * new connection instances on the fly when they are not part of the initial
  429. * pool supplied upon cluster initialization.
  430. *
  431. * These parameters are not applied to connections added to the pool using
  432. * the add() method.
  433. *
  434. * @param array $parameters Array of connection parameters.
  435. */
  436. public function setDefaultParameters(array $parameters)
  437. {
  438. $this->defaultParameters = array_merge(
  439. $this->defaultParameters,
  440. $parameters ?: array()
  441. );
  442. }
  443. }