RedisCluster.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection\Aggregate;
  11. use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
  12. use Predis\Cluster\StrategyInterface;
  13. use Predis\Command\CommandInterface;
  14. use Predis\Command\RawCommand;
  15. use Predis\Connection\FactoryInterface;
  16. use Predis\Connection\NodeConnectionInterface;
  17. use Predis\NotSupportedException;
  18. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  19. /**
  20. * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
  21. *
  22. * This connection backend offers smart support for redis-cluster by handling
  23. * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
  24. * Redis when redirecting a client to a different node.
  25. *
  26. * The cluster can be pre-initialized using only a subset of the actual nodes in
  27. * the cluster, Predis will do the rest by adjusting the slots map and creating
  28. * the missing underlying connection instances on the fly.
  29. *
  30. * It is possible to pre-associate connections to a slots range with the "slots"
  31. * parameter in the form "$first-$last". This can greatly reduce runtime node
  32. * guessing and redirections.
  33. *
  34. * It is also possible to ask for the full and updated slots map directly to one
  35. * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
  36. * Asking for the cluster configuration to Redis is actually done by issuing a
  37. * CLUSTER SLOTS command to a random node in the pool.
  38. *
  39. * @author Daniele Alessandri <suppakilla@gmail.com>
  40. */
  41. class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
  42. {
  43. private $useClusterSlots = true;
  44. private $defaultParameters = array();
  45. private $pool = array();
  46. private $slots = array();
  47. private $slotsMap;
  48. private $strategy;
  49. private $connections;
  50. /**
  51. * @param FactoryInterface $connections Optional connection factory.
  52. * @param StrategyInterface $strategy Optional cluster strategy.
  53. */
  54. public function __construct(
  55. FactoryInterface $connections,
  56. StrategyInterface $strategy = null
  57. ) {
  58. $this->connections = $connections;
  59. $this->strategy = $strategy ?: new RedisClusterStrategy();
  60. }
  61. /**
  62. * {@inheritdoc}
  63. */
  64. public function isConnected()
  65. {
  66. foreach ($this->pool as $connection) {
  67. if ($connection->isConnected()) {
  68. return true;
  69. }
  70. }
  71. return false;
  72. }
  73. /**
  74. * {@inheritdoc}
  75. */
  76. public function connect()
  77. {
  78. if ($connection = $this->getRandomConnection()) {
  79. $connection->connect();
  80. }
  81. }
  82. /**
  83. * {@inheritdoc}
  84. */
  85. public function disconnect()
  86. {
  87. foreach ($this->pool as $connection) {
  88. $connection->disconnect();
  89. }
  90. }
  91. /**
  92. * {@inheritdoc}
  93. */
  94. public function add(NodeConnectionInterface $connection)
  95. {
  96. $this->pool[(string) $connection] = $connection;
  97. unset($this->slotsMap);
  98. }
  99. /**
  100. * {@inheritdoc}
  101. */
  102. public function remove(NodeConnectionInterface $connection)
  103. {
  104. if (false !== $id = array_search($connection, $this->pool, true)) {
  105. unset(
  106. $this->pool[$id],
  107. $this->slotsMap
  108. );
  109. return true;
  110. }
  111. return false;
  112. }
  113. /**
  114. * Removes a connection instance by using its identifier.
  115. *
  116. * @param string $connectionID Connection identifier.
  117. *
  118. * @return bool True if the connection was in the pool.
  119. */
  120. public function removeById($connectionID)
  121. {
  122. if (isset($this->pool[$connectionID])) {
  123. unset(
  124. $this->pool[$connectionID],
  125. $this->slotsMap
  126. );
  127. return true;
  128. }
  129. return false;
  130. }
  131. /**
  132. * Generates the current slots map by guessing the cluster configuration out
  133. * of the connection parameters of the connections in the pool.
  134. *
  135. * Generation is based on the same algorithm used by Redis to generate the
  136. * cluster, so it is most effective when all of the connections supplied on
  137. * initialization have the "slots" parameter properly set accordingly to the
  138. * current cluster configuration.
  139. */
  140. public function buildSlotsMap()
  141. {
  142. $this->slotsMap = array();
  143. foreach ($this->pool as $connectionID => $connection) {
  144. $parameters = $connection->getParameters();
  145. if (!isset($parameters->slots)) {
  146. continue;
  147. }
  148. foreach (explode(',', $parameters->slots) as $slotRange) {
  149. $slots = explode('-', $slotRange, 2);
  150. if (!isset($slots[1])) {
  151. $slots[1] = $slots[0];
  152. }
  153. $this->setSlots($slots[0], $slots[1], $connectionID);
  154. }
  155. }
  156. }
  157. /**
  158. * Generates an updated slots map fetching the cluster configuration using
  159. * the CLUSTER SLOTS command against the specified node or a random one from
  160. * the pool.
  161. *
  162. * @param NodeConnectionInterface $connection Optional connection instance.
  163. *
  164. * @return array
  165. */
  166. public function askSlotsMap(NodeConnectionInterface $connection = null)
  167. {
  168. if (!$connection && !$connection = $this->getRandomConnection()) {
  169. return array();
  170. }
  171. $command = RawCommand::create('CLUSTER', 'SLOTS');
  172. $response = $connection->executeCommand($command);
  173. foreach ($response as $slots) {
  174. // We only support master servers for now, so we ignore subsequent
  175. // elements in the $slots array identifying slaves.
  176. list($start, $end, $master) = $slots;
  177. if ($master[0] === '') {
  178. $this->setSlots($start, $end, (string) $connection);
  179. } else {
  180. $this->setSlots($start, $end, "{$master[0]}:{$master[1]}");
  181. }
  182. }
  183. return $this->slotsMap;
  184. }
  185. /**
  186. * Returns the current slots map for the cluster.
  187. *
  188. * The order of the returned $slot => $server dictionary is not guaranteed.
  189. *
  190. * @return array
  191. */
  192. public function getSlotsMap()
  193. {
  194. if (!isset($this->slotsMap)) {
  195. $this->slotsMap = array();
  196. }
  197. return $this->slotsMap;
  198. }
  199. /**
  200. * Pre-associates a connection to a slots range to avoid runtime guessing.
  201. *
  202. * @param int $first Initial slot of the range.
  203. * @param int $last Last slot of the range.
  204. * @param NodeConnectionInterface|string $connection ID or connection instance.
  205. *
  206. * @throws \OutOfBoundsException
  207. */
  208. public function setSlots($first, $last, $connection)
  209. {
  210. if ($first < 0x0000 || $first > 0x3FFF ||
  211. $last < 0x0000 || $last > 0x3FFF ||
  212. $last < $first
  213. ) {
  214. throw new \OutOfBoundsException(
  215. "Invalid slot range for $connection: [$first-$last]."
  216. );
  217. }
  218. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  219. $this->slotsMap = $this->getSlotsMap() + $slots;
  220. }
  221. /**
  222. * Guesses the correct node associated to a given slot using a precalculated
  223. * slots map, falling back to the same logic used by Redis to initialize a
  224. * cluster (best-effort).
  225. *
  226. * @param int $slot Slot index.
  227. *
  228. * @return string Connection ID.
  229. */
  230. protected function guessNode($slot)
  231. {
  232. if (!isset($this->slotsMap)) {
  233. $this->buildSlotsMap();
  234. }
  235. if (isset($this->slotsMap[$slot])) {
  236. return $this->slotsMap[$slot];
  237. }
  238. $count = count($this->pool);
  239. $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
  240. $nodes = array_keys($this->pool);
  241. return $nodes[$index];
  242. }
  243. /**
  244. * Creates a new connection instance from the given connection ID.
  245. *
  246. * @param string $connectionID Identifier for the connection.
  247. *
  248. * @return NodeConnectionInterface
  249. */
  250. protected function createConnection($connectionID)
  251. {
  252. $separator = strrpos($connectionID, ':');
  253. $parameters = array_merge($this->defaultParameters, array(
  254. 'host' => substr($connectionID, 0, $separator),
  255. 'port' => substr($connectionID, $separator + 1),
  256. ));
  257. $connection = $this->connections->create($parameters);
  258. return $connection;
  259. }
  260. /**
  261. * {@inheritdoc}
  262. */
  263. public function getConnection(CommandInterface $command)
  264. {
  265. $slot = $this->strategy->getSlot($command);
  266. if (!isset($slot)) {
  267. throw new NotSupportedException(
  268. "Cannot use '{$command->getId()}' with redis-cluster."
  269. );
  270. }
  271. if (isset($this->slots[$slot])) {
  272. return $this->slots[$slot];
  273. } else {
  274. return $this->getConnectionBySlot($slot);
  275. }
  276. }
  277. /**
  278. * Returns the connection currently associated to a given slot.
  279. *
  280. * @param int $slot Slot index.
  281. *
  282. * @throws \OutOfBoundsException
  283. *
  284. * @return NodeConnectionInterface
  285. */
  286. public function getConnectionBySlot($slot)
  287. {
  288. if ($slot < 0x0000 || $slot > 0x3FFF) {
  289. throw new \OutOfBoundsException("Invalid slot [$slot].");
  290. }
  291. if (isset($this->slots[$slot])) {
  292. return $this->slots[$slot];
  293. }
  294. $connectionID = $this->guessNode($slot);
  295. if (!$connection = $this->getConnectionById($connectionID)) {
  296. $connection = $this->createConnection($connectionID);
  297. $this->pool[$connectionID] = $connection;
  298. }
  299. return $this->slots[$slot] = $connection;
  300. }
  301. /**
  302. * {@inheritdoc}
  303. */
  304. public function getConnectionById($connectionID)
  305. {
  306. if (isset($this->pool[$connectionID])) {
  307. return $this->pool[$connectionID];
  308. }
  309. }
  310. /**
  311. * Returns a random connection from the pool.
  312. *
  313. * @return NodeConnectionInterface|null
  314. */
  315. protected function getRandomConnection()
  316. {
  317. if ($this->pool) {
  318. return $this->pool[array_rand($this->pool)];
  319. }
  320. }
  321. /**
  322. * Permanently associates the connection instance to a new slot.
  323. * The connection is added to the connections pool if not yet included.
  324. *
  325. * @param NodeConnectionInterface $connection Connection instance.
  326. * @param int $slot Target slot index.
  327. */
  328. protected function move(NodeConnectionInterface $connection, $slot)
  329. {
  330. $this->pool[(string) $connection] = $connection;
  331. $this->slots[(int) $slot] = $connection;
  332. }
  333. /**
  334. * Handles -ERR responses returned by Redis.
  335. *
  336. * @param CommandInterface $command Command that generated the -ERR response.
  337. * @param ErrorResponseInterface $error Redis error response object.
  338. *
  339. * @return mixed
  340. */
  341. protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
  342. {
  343. $details = explode(' ', $error->getMessage(), 2);
  344. switch ($details[0]) {
  345. case 'MOVED':
  346. return $this->onMovedResponse($command, $details[1]);
  347. case 'ASK':
  348. return $this->onAskResponse($command, $details[1]);
  349. default:
  350. return $error;
  351. }
  352. }
  353. /**
  354. * Handles -MOVED responses by executing again the command against the node
  355. * indicated by the Redis response.
  356. *
  357. * @param CommandInterface $command Command that generated the -MOVED response.
  358. * @param string $details Parameters of the -MOVED response.
  359. *
  360. * @return mixed
  361. */
  362. protected function onMovedResponse(CommandInterface $command, $details)
  363. {
  364. list($slot, $connectionID) = explode(' ', $details, 2);
  365. if (!$connection = $this->getConnectionById($connectionID)) {
  366. $connection = $this->createConnection($connectionID);
  367. }
  368. if ($this->useClusterSlots) {
  369. $this->askSlotsMap($connection);
  370. }
  371. $this->move($connection, $slot);
  372. $response = $this->executeCommand($command);
  373. return $response;
  374. }
  375. /**
  376. * Handles -ASK responses by executing again the command against the node
  377. * indicated by the Redis response.
  378. *
  379. * @param CommandInterface $command Command that generated the -ASK response.
  380. * @param string $details Parameters of the -ASK response.
  381. *
  382. * @return mixed
  383. */
  384. protected function onAskResponse(CommandInterface $command, $details)
  385. {
  386. list($slot, $connectionID) = explode(' ', $details, 2);
  387. if (!$connection = $this->getConnectionById($connectionID)) {
  388. $connection = $this->createConnection($connectionID);
  389. }
  390. $connection->executeCommand(RawCommand::create('ASKING'));
  391. $response = $connection->executeCommand($command);
  392. return $response;
  393. }
  394. /**
  395. * {@inheritdoc}
  396. */
  397. public function writeRequest(CommandInterface $command)
  398. {
  399. $this->getConnection($command)->writeRequest($command);
  400. }
  401. /**
  402. * {@inheritdoc}
  403. */
  404. public function readResponse(CommandInterface $command)
  405. {
  406. return $this->getConnection($command)->readResponse($command);
  407. }
  408. /**
  409. * {@inheritdoc}
  410. */
  411. public function executeCommand(CommandInterface $command)
  412. {
  413. $connection = $this->getConnection($command);
  414. $response = $connection->executeCommand($command);
  415. if ($response instanceof ErrorResponseInterface) {
  416. return $this->onErrorResponse($command, $response);
  417. }
  418. return $response;
  419. }
  420. /**
  421. * {@inheritdoc}
  422. */
  423. public function count()
  424. {
  425. return count($this->pool);
  426. }
  427. /**
  428. * {@inheritdoc}
  429. */
  430. public function getIterator()
  431. {
  432. return new \ArrayIterator(array_values($this->pool));
  433. }
  434. /**
  435. * Returns the underlying command hash strategy used to hash commands by
  436. * using keys found in their arguments.
  437. *
  438. * @return StrategyInterface
  439. */
  440. public function getClusterStrategy()
  441. {
  442. return $this->strategy;
  443. }
  444. /**
  445. * Returns the underlying connection factory used to create new connection
  446. * instances to Redis nodes indicated by redis-cluster.
  447. *
  448. * @return FactoryInterface
  449. */
  450. public function getConnectionFactory()
  451. {
  452. return $this->connections;
  453. }
  454. /**
  455. * Enables automatic fetching of the current slots map from one of the nodes
  456. * using the CLUSTER SLOTS command. This option is enabled by default as
  457. * asking the current slots map to Redis upon -MOVED responses may reduce
  458. * overhead by eliminating the trial-and-error nature of the node guessing
  459. * procedure, mostly when targeting many keys that would end up in a lot of
  460. * redirections.
  461. *
  462. * The slots map can still be manually fetched using the askSlotsMap()
  463. * method whether or not this option is enabled.
  464. *
  465. * @param bool $value Enable or disable the use of CLUSTER SLOTS.
  466. */
  467. public function useClusterSlots($value)
  468. {
  469. $this->useClusterSlots = (bool) $value;
  470. }
  471. /**
  472. * Sets a default array of connection parameters to be applied when creating
  473. * new connection instances on the fly when they are not part of the initial
  474. * pool supplied upon cluster initialization.
  475. *
  476. * These parameters are not applied to connections added to the pool using
  477. * the add() method.
  478. *
  479. * @param array $parameters Array of connection parameters.
  480. */
  481. public function setDefaultParameters(array $parameters)
  482. {
  483. $this->defaultParameters = array_merge(
  484. $this->defaultParameters,
  485. $parameters ?: array()
  486. );
  487. }
  488. }