RedisCluster.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection\Cluster;
  11. use Predis\ClientException;
  12. use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
  13. use Predis\Cluster\StrategyInterface;
  14. use Predis\Command\CommandInterface;
  15. use Predis\Command\RawCommand;
  16. use Predis\Connection\ConnectionException;
  17. use Predis\Connection\FactoryInterface;
  18. use Predis\Connection\NodeConnectionInterface;
  19. use Predis\NotSupportedException;
  20. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  21. /**
  22. * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
  23. *
  24. * This connection backend offers smart support for redis-cluster by handling
  25. * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
  26. * Redis when redirecting a client to a different node.
  27. *
  28. * The cluster can be pre-initialized using only a subset of the actual nodes in
  29. * the cluster, Predis will do the rest by adjusting the slots map and creating
  30. * the missing underlying connection instances on the fly.
  31. *
  32. * It is possible to pre-associate connections to a slots range with the "slots"
  33. * parameter in the form "$first-$last". This can greatly reduce runtime node
  34. * guessing and redirections.
  35. *
  36. * It is also possible to ask for the full and updated slots map directly to one
  37. * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
  38. * Asking for the cluster configuration to Redis is actually done by issuing a
  39. * CLUSTER SLOTS command to a random node in the pool.
  40. *
  41. * @author Daniele Alessandri <suppakilla@gmail.com>
  42. */
  43. class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
  44. {
  45. private $useClusterSlots = true;
  46. private $pool = array();
  47. private $slots = array();
  48. private $slotsMap;
  49. private $strategy;
  50. private $connections;
  51. private $retryLimit = 5;
  52. /**
  53. * @param FactoryInterface $connections Optional connection factory.
  54. * @param StrategyInterface $strategy Optional cluster strategy.
  55. */
  56. public function __construct(
  57. FactoryInterface $connections,
  58. StrategyInterface $strategy = null
  59. ) {
  60. $this->connections = $connections;
  61. $this->strategy = $strategy ?: new RedisClusterStrategy();
  62. }
  63. /**
  64. * Sets the maximum number of retries for commands upon server failure.
  65. *
  66. * -1 = unlimited retry attempts
  67. * 0 = no retry attempts (fails immediatly)
  68. * n = fail only after n retry attempts
  69. *
  70. * @param int $retry Number of retry attempts.
  71. */
  72. public function setRetryLimit($retry)
  73. {
  74. $this->retryLimit = (int) $retry;
  75. }
  76. /**
  77. * {@inheritdoc}
  78. */
  79. public function isConnected()
  80. {
  81. foreach ($this->pool as $connection) {
  82. if ($connection->isConnected()) {
  83. return true;
  84. }
  85. }
  86. return false;
  87. }
  88. /**
  89. * {@inheritdoc}
  90. */
  91. public function connect()
  92. {
  93. if ($connection = $this->getRandomConnection()) {
  94. $connection->connect();
  95. }
  96. }
  97. /**
  98. * {@inheritdoc}
  99. */
  100. public function disconnect()
  101. {
  102. foreach ($this->pool as $connection) {
  103. $connection->disconnect();
  104. }
  105. }
  106. /**
  107. * {@inheritdoc}
  108. */
  109. public function add(NodeConnectionInterface $connection)
  110. {
  111. $this->pool[(string) $connection] = $connection;
  112. unset($this->slotsMap);
  113. }
  114. /**
  115. * {@inheritdoc}
  116. */
  117. public function remove(NodeConnectionInterface $connection)
  118. {
  119. if (false !== $id = array_search($connection, $this->pool, true)) {
  120. unset(
  121. $this->pool[$id],
  122. $this->slotsMap
  123. );
  124. $this->slots = array_diff($this->slots, array($connection));
  125. return true;
  126. }
  127. return false;
  128. }
  129. /**
  130. * Removes a connection instance by using its identifier.
  131. *
  132. * @param string $connectionID Connection identifier.
  133. *
  134. * @return bool True if the connection was in the pool.
  135. */
  136. public function removeById($connectionID)
  137. {
  138. if (isset($this->pool[$connectionID])) {
  139. unset(
  140. $this->pool[$connectionID],
  141. $this->slotsMap
  142. );
  143. return true;
  144. }
  145. return false;
  146. }
  147. /**
  148. * Generates the current slots map by guessing the cluster configuration out
  149. * of the connection parameters of the connections in the pool.
  150. *
  151. * Generation is based on the same algorithm used by Redis to generate the
  152. * cluster, so it is most effective when all of the connections supplied on
  153. * initialization have the "slots" parameter properly set accordingly to the
  154. * current cluster configuration.
  155. *
  156. * @return array
  157. */
  158. public function buildSlotsMap()
  159. {
  160. $this->slotsMap = array();
  161. foreach ($this->pool as $connectionID => $connection) {
  162. $parameters = $connection->getParameters();
  163. if (!isset($parameters->slots)) {
  164. continue;
  165. }
  166. foreach (explode(',', $parameters->slots) as $slotRange) {
  167. $slots = explode('-', $slotRange, 2);
  168. if (!isset($slots[1])) {
  169. $slots[1] = $slots[0];
  170. }
  171. $this->setSlots($slots[0], $slots[1], $connectionID);
  172. }
  173. }
  174. return $this->slotsMap;
  175. }
  176. /**
  177. * Queries the specified node of the cluster to fetch the updated slots map.
  178. *
  179. * When the connection fails, this method tries to execute the same command
  180. * on a different connection picked at random from the pool of known nodes,
  181. * up until the retry limit is reached.
  182. *
  183. * @param NodeConnectionInterface $connection Connection to a node of the cluster.
  184. *
  185. * @return mixed
  186. */
  187. private function queryClusterNodeForSlotsMap(NodeConnectionInterface $connection)
  188. {
  189. $retries = 0;
  190. $command = RawCommand::create('CLUSTER', 'SLOTS');
  191. RETRY_COMMAND: {
  192. try {
  193. $response = $connection->executeCommand($command);
  194. } catch (ConnectionException $exception) {
  195. $connection = $exception->getConnection();
  196. $connection->disconnect();
  197. $this->remove($connection);
  198. if ($retries === $this->retryLimit) {
  199. throw $exception;
  200. }
  201. if (!$connection = $this->getRandomConnection()) {
  202. throw new ClientException('No connections left in the pool for `CLUSTER SLOTS`');
  203. }
  204. ++$retries;
  205. goto RETRY_COMMAND;
  206. }
  207. }
  208. return $response;
  209. }
  210. /**
  211. * Generates an updated slots map fetching the cluster configuration using
  212. * the CLUSTER SLOTS command against the specified node or a random one from
  213. * the pool.
  214. *
  215. * @param NodeConnectionInterface $connection Optional connection instance.
  216. *
  217. * @return array
  218. */
  219. public function askSlotsMap(NodeConnectionInterface $connection = null)
  220. {
  221. if (!$connection && !$connection = $this->getRandomConnection()) {
  222. return array();
  223. }
  224. $this->resetSlotsMap();
  225. $response = $this->queryClusterNodeForSlotsMap($connection);
  226. foreach ($response as $slots) {
  227. // We only support master servers for now, so we ignore subsequent
  228. // elements in the $slots array identifying slaves.
  229. list($start, $end, $master) = $slots;
  230. if ($master[0] === '') {
  231. $this->setSlots($start, $end, (string) $connection);
  232. } else {
  233. $this->setSlots($start, $end, "{$master[0]}:{$master[1]}");
  234. }
  235. }
  236. return $this->slotsMap;
  237. }
  238. /**
  239. * Resets the slots map cache.
  240. */
  241. public function resetSlotsMap()
  242. {
  243. $this->slotsMap = array();
  244. }
  245. /**
  246. * Returns the current slots map for the cluster.
  247. *
  248. * The order of the returned $slot => $server dictionary is not guaranteed.
  249. *
  250. * @return array
  251. */
  252. public function getSlotsMap()
  253. {
  254. if (!isset($this->slotsMap)) {
  255. $this->slotsMap = array();
  256. }
  257. return $this->slotsMap;
  258. }
  259. /**
  260. * Pre-associates a connection to a slots range to avoid runtime guessing.
  261. *
  262. * @param int $first Initial slot of the range.
  263. * @param int $last Last slot of the range.
  264. * @param NodeConnectionInterface|string $connection ID or connection instance.
  265. *
  266. * @throws \OutOfBoundsException
  267. */
  268. public function setSlots($first, $last, $connection)
  269. {
  270. if ($first < 0x0000 || $first > 0x3FFF ||
  271. $last < 0x0000 || $last > 0x3FFF ||
  272. $last < $first
  273. ) {
  274. throw new \OutOfBoundsException(
  275. "Invalid slot range for $connection: [$first-$last]."
  276. );
  277. }
  278. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  279. $this->slotsMap = $this->getSlotsMap() + $slots;
  280. }
  281. /**
  282. * Guesses the correct node associated to a given slot using a precalculated
  283. * slots map, falling back to the same logic used by Redis to initialize a
  284. * cluster (best-effort).
  285. *
  286. * @param int $slot Slot index.
  287. *
  288. * @return string Connection ID.
  289. */
  290. protected function guessNode($slot)
  291. {
  292. if (!$this->pool) {
  293. throw new ClientException('No connections available in the pool');
  294. }
  295. if (!isset($this->slotsMap)) {
  296. $this->buildSlotsMap();
  297. }
  298. if (isset($this->slotsMap[$slot])) {
  299. return $this->slotsMap[$slot];
  300. }
  301. $count = count($this->pool);
  302. $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
  303. $nodes = array_keys($this->pool);
  304. return $nodes[$index];
  305. }
  306. /**
  307. * Creates a new connection instance from the given connection ID.
  308. *
  309. * @param string $connectionID Identifier for the connection.
  310. *
  311. * @return NodeConnectionInterface
  312. */
  313. protected function createConnection($connectionID)
  314. {
  315. $separator = strrpos($connectionID, ':');
  316. return $this->connections->create(array(
  317. 'host' => substr($connectionID, 0, $separator),
  318. 'port' => substr($connectionID, $separator + 1),
  319. ));
  320. }
  321. /**
  322. * {@inheritdoc}
  323. */
  324. public function getConnectionByCommand(CommandInterface $command)
  325. {
  326. $slot = $this->strategy->getSlot($command);
  327. if (!isset($slot)) {
  328. throw new NotSupportedException(
  329. "Cannot use '{$command->getId()}' with redis-cluster."
  330. );
  331. }
  332. if (isset($this->slots[$slot])) {
  333. return $this->slots[$slot];
  334. } else {
  335. return $this->getConnectionBySlot($slot);
  336. }
  337. }
  338. /**
  339. * Returns the connection currently associated to a given slot.
  340. *
  341. * @param int $slot Slot index.
  342. *
  343. * @throws \OutOfBoundsException
  344. *
  345. * @return NodeConnectionInterface
  346. */
  347. public function getConnectionBySlot($slot)
  348. {
  349. if ($slot < 0x0000 || $slot > 0x3FFF) {
  350. throw new \OutOfBoundsException("Invalid slot [$slot].");
  351. }
  352. if (isset($this->slots[$slot])) {
  353. return $this->slots[$slot];
  354. }
  355. $connectionID = $this->guessNode($slot);
  356. if (!$connection = $this->getConnectionById($connectionID)) {
  357. $connection = $this->createConnection($connectionID);
  358. $this->pool[$connectionID] = $connection;
  359. }
  360. return $this->slots[$slot] = $connection;
  361. }
  362. /**
  363. * {@inheritdoc}
  364. */
  365. public function getConnectionById($connectionID)
  366. {
  367. if (isset($this->pool[$connectionID])) {
  368. return $this->pool[$connectionID];
  369. }
  370. }
  371. /**
  372. * Returns a random connection from the pool.
  373. *
  374. * @return NodeConnectionInterface|null
  375. */
  376. protected function getRandomConnection()
  377. {
  378. if ($this->pool) {
  379. return $this->pool[array_rand($this->pool)];
  380. }
  381. }
  382. /**
  383. * Permanently associates the connection instance to a new slot.
  384. * The connection is added to the connections pool if not yet included.
  385. *
  386. * @param NodeConnectionInterface $connection Connection instance.
  387. * @param int $slot Target slot index.
  388. */
  389. protected function move(NodeConnectionInterface $connection, $slot)
  390. {
  391. $this->pool[(string) $connection] = $connection;
  392. $this->slots[(int) $slot] = $connection;
  393. }
  394. /**
  395. * Handles -ERR responses returned by Redis.
  396. *
  397. * @param CommandInterface $command Command that generated the -ERR response.
  398. * @param ErrorResponseInterface $error Redis error response object.
  399. *
  400. * @return mixed
  401. */
  402. protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
  403. {
  404. $details = explode(' ', $error->getMessage(), 2);
  405. switch ($details[0]) {
  406. case 'MOVED':
  407. return $this->onMovedResponse($command, $details[1]);
  408. case 'ASK':
  409. return $this->onAskResponse($command, $details[1]);
  410. default:
  411. return $error;
  412. }
  413. }
  414. /**
  415. * Handles -MOVED responses by executing again the command against the node
  416. * indicated by the Redis response.
  417. *
  418. * @param CommandInterface $command Command that generated the -MOVED response.
  419. * @param string $details Parameters of the -MOVED response.
  420. *
  421. * @return mixed
  422. */
  423. protected function onMovedResponse(CommandInterface $command, $details)
  424. {
  425. list($slot, $connectionID) = explode(' ', $details, 2);
  426. if (!$connection = $this->getConnectionById($connectionID)) {
  427. $connection = $this->createConnection($connectionID);
  428. }
  429. if ($this->useClusterSlots) {
  430. $this->askSlotsMap($connection);
  431. }
  432. $this->move($connection, $slot);
  433. $response = $this->executeCommand($command);
  434. return $response;
  435. }
  436. /**
  437. * Handles -ASK responses by executing again the command against the node
  438. * indicated by the Redis response.
  439. *
  440. * @param CommandInterface $command Command that generated the -ASK response.
  441. * @param string $details Parameters of the -ASK response.
  442. *
  443. * @return mixed
  444. */
  445. protected function onAskResponse(CommandInterface $command, $details)
  446. {
  447. list($slot, $connectionID) = explode(' ', $details, 2);
  448. if (!$connection = $this->getConnectionById($connectionID)) {
  449. $connection = $this->createConnection($connectionID);
  450. }
  451. $connection->executeCommand(RawCommand::create('ASKING'));
  452. $response = $connection->executeCommand($command);
  453. return $response;
  454. }
  455. /**
  456. * Ensures that a command is executed one more time on connection failure.
  457. *
  458. * The connection to the node that generated the error is evicted from the
  459. * pool before trying to fetch an updated slots map from another node. If
  460. * the new slots map points to an unreachable server the client gives up and
  461. * throws the exception as the nodes participating in the cluster may still
  462. * have to agree that something changed in the configuration of the cluster.
  463. *
  464. * @param CommandInterface $command Command instance.
  465. * @param string $method Actual method.
  466. *
  467. * @return mixed
  468. */
  469. private function retryCommandOnFailure(CommandInterface $command, $method)
  470. {
  471. $failure = false;
  472. RETRY_COMMAND: {
  473. try {
  474. $response = $this->getConnectionByCommand($command)->$method($command);
  475. } catch (ConnectionException $exception) {
  476. $connection = $exception->getConnection();
  477. $connection->disconnect();
  478. $this->remove($connection);
  479. if ($failure) {
  480. throw $exception;
  481. } elseif ($this->useClusterSlots) {
  482. $this->askSlotsMap();
  483. }
  484. $failure = true;
  485. goto RETRY_COMMAND;
  486. }
  487. }
  488. return $response;
  489. }
  490. /**
  491. * {@inheritdoc}
  492. */
  493. public function writeRequest(CommandInterface $command)
  494. {
  495. $this->retryCommandOnFailure($command, __FUNCTION__);
  496. }
  497. /**
  498. * {@inheritdoc}
  499. */
  500. public function readResponse(CommandInterface $command)
  501. {
  502. return $this->retryCommandOnFailure($command, __FUNCTION__);
  503. }
  504. /**
  505. * {@inheritdoc}
  506. */
  507. public function executeCommand(CommandInterface $command)
  508. {
  509. $response = $this->retryCommandOnFailure($command, __FUNCTION__);
  510. if ($response instanceof ErrorResponseInterface) {
  511. return $this->onErrorResponse($command, $response);
  512. }
  513. return $response;
  514. }
  515. /**
  516. * {@inheritdoc}
  517. */
  518. public function count()
  519. {
  520. return count($this->pool);
  521. }
  522. /**
  523. * {@inheritdoc}
  524. */
  525. public function getIterator()
  526. {
  527. if ($this->useClusterSlots) {
  528. $slotsmap = $this->getSlotsMap() ?: $this->askSlotsMap();
  529. } else {
  530. $slotsmap = $this->getSlotsMap() ?: $this->buildSlotsMap();
  531. }
  532. $connections = array();
  533. foreach (array_unique($slotsmap) as $node) {
  534. if (!$connection = $this->getConnectionById($node)) {
  535. $this->add($connection = $this->createConnection($node));
  536. }
  537. $connections[] = $connection;
  538. }
  539. return new \ArrayIterator($connections);
  540. }
  541. /**
  542. * Returns the underlying command hash strategy used to hash commands by
  543. * using keys found in their arguments.
  544. *
  545. * @return StrategyInterface
  546. */
  547. public function getClusterStrategy()
  548. {
  549. return $this->strategy;
  550. }
  551. /**
  552. * Returns the underlying connection factory used to create new connection
  553. * instances to Redis nodes indicated by redis-cluster.
  554. *
  555. * @return FactoryInterface
  556. */
  557. public function getConnectionFactory()
  558. {
  559. return $this->connections;
  560. }
  561. /**
  562. * Enables automatic fetching of the current slots map from one of the nodes
  563. * using the CLUSTER SLOTS command. This option is enabled by default as
  564. * asking the current slots map to Redis upon -MOVED responses may reduce
  565. * overhead by eliminating the trial-and-error nature of the node guessing
  566. * procedure, mostly when targeting many keys that would end up in a lot of
  567. * redirections.
  568. *
  569. * The slots map can still be manually fetched using the askSlotsMap()
  570. * method whether or not this option is enabled.
  571. *
  572. * @param bool $value Enable or disable the use of CLUSTER SLOTS.
  573. */
  574. public function useClusterSlots($value)
  575. {
  576. $this->useClusterSlots = (bool) $value;
  577. }
  578. }