RedisCluster.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use ArrayIterator;
  12. use Countable;
  13. use IteratorAggregate;
  14. use OutOfBoundsException;
  15. use Predis\NotSupportedException;
  16. use Predis\ResponseErrorInterface;
  17. use Predis\Cluster\CommandHashStrategyInterface;
  18. use Predis\Cluster\RedisClusterHashStrategy;
  19. use Predis\Command\CommandInterface;
  20. use Predis\Command\RawCommand;
  21. /**
  22. * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
  23. *
  24. * This connection backend offers smart support for redis-cluster by handling
  25. * automatic slots map (re)generation upon -MOVE or -ASK responses returned by
  26. * Redis when redirecting a client to a different node.
  27. *
  28. * The cluster can be pre-initialized using only a subset of the actual nodes in
  29. * the cluster, Predis will do the rest by adjusting the slots map and creating
  30. * the missing underlying connection instances on the fly.
  31. *
  32. * It is possible to pre-associate connections to a slots range with the "slots"
  33. * parameter in the form "$first-$last". This can greatly reduce runtime node
  34. * guessing and redirections.
  35. *
  36. * It is also possible to ask for the full and updated slots map directly to one
  37. * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
  38. * Asking for the cluster configuration to Redis is actually done by issuing a
  39. * CLUSTER NODES command to a random node in the pool.
  40. *
  41. * @author Daniele Alessandri <suppakilla@gmail.com>
  42. */
  43. class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
  44. {
  45. private $askClusterNodes = false;
  46. private $defaultParameters = array();
  47. private $pool = array();
  48. private $slots = array();
  49. private $slotsMap;
  50. private $strategy;
  51. private $connections;
  52. /**
  53. * @param ConnectionFactoryInterface $connections Connection factory object.
  54. */
  55. public function __construct(ConnectionFactoryInterface $connections = null)
  56. {
  57. $this->strategy = new RedisClusterHashStrategy();
  58. $this->connections = $connections ?: new ConnectionFactory();
  59. }
  60. /**
  61. * {@inheritdoc}
  62. */
  63. public function isConnected()
  64. {
  65. foreach ($this->pool as $connection) {
  66. if ($connection->isConnected()) {
  67. return true;
  68. }
  69. }
  70. return false;
  71. }
  72. /**
  73. * {@inheritdoc}
  74. */
  75. public function connect()
  76. {
  77. if ($connection = $this->getRandomConnection()) {
  78. $connection->connect();
  79. }
  80. }
  81. /**
  82. * {@inheritdoc}
  83. */
  84. public function disconnect()
  85. {
  86. foreach ($this->pool as $connection) {
  87. $connection->disconnect();
  88. }
  89. }
  90. /**
  91. * {@inheritdoc}
  92. */
  93. public function add(SingleConnectionInterface $connection)
  94. {
  95. $this->pool[(string) $connection] = $connection;
  96. unset($this->slotsMap);
  97. }
  98. /**
  99. * {@inheritdoc}
  100. */
  101. public function remove(SingleConnectionInterface $connection)
  102. {
  103. if (false !== $id = array_search($connection, $this->pool, true)) {
  104. unset(
  105. $this->pool[$id],
  106. $this->slotsMap
  107. );
  108. return true;
  109. }
  110. return false;
  111. }
  112. /**
  113. * Removes a connection instance by using its identifier.
  114. *
  115. * @param string $connectionID Connection identifier.
  116. * @return bool True if the connection was in the pool.
  117. */
  118. public function removeById($connectionID)
  119. {
  120. if (isset($this->pool[$connectionID])) {
  121. unset(
  122. $this->pool[$connectionID],
  123. $this->slotsMap
  124. );
  125. return true;
  126. }
  127. return false;
  128. }
  129. /**
  130. * Generates the current slots map by guessing the cluster configuration out
  131. * of the connection parameters of the connections in the pool.
  132. *
  133. * Generation is based on the same algorithm used by Redis to generate the
  134. * cluster, so it is most effective when all of the connections supplied on
  135. * initialization have the "slots" parameter properly set accordingly to the
  136. * current cluster configuration.
  137. */
  138. public function buildSlotsMap()
  139. {
  140. $this->slotsMap = array();
  141. foreach ($this->pool as $connectionID => $connection) {
  142. $parameters = $connection->getParameters();
  143. if (!isset($parameters->slots)) {
  144. continue;
  145. }
  146. $slots = explode('-', $parameters->slots, 2);
  147. $this->setSlots($slots[0], $slots[1], $connectionID);
  148. }
  149. }
  150. /**
  151. * Generates the current slots map by fetching the cluster configuration to
  152. * one of the nodes by leveraging the CLUSTER NODES command.
  153. */
  154. public function askClusterNodes()
  155. {
  156. if (!$connection = $this->getRandomConnection()) {
  157. return array();
  158. }
  159. $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
  160. $response = $connection->executeCommand($cmdCluster);
  161. $nodes = explode("\n", $response, -1);
  162. $count = count($nodes);
  163. for ($i = 0; $i < $count; $i++) {
  164. $node = explode(' ', $nodes[$i], 9);
  165. if (false === strpos($node[2], 'master')) {
  166. continue;
  167. }
  168. $slots = explode('-', $node[8], 2);
  169. if ($node[1] === ':0') {
  170. $this->setSlots($slots[0], $slots[1], (string) $connection);
  171. } else {
  172. $this->setSlots($slots[0], $slots[1], $node[1]);
  173. }
  174. }
  175. }
  176. /**
  177. * Returns the current slots map for the cluster.
  178. *
  179. * @return array
  180. */
  181. public function getSlotsMap()
  182. {
  183. if (!isset($this->slotsMap)) {
  184. $this->slotsMap = array();
  185. }
  186. return $this->slotsMap;
  187. }
  188. /**
  189. * Pre-associates a connection to a slots range to avoid runtime guessing.
  190. *
  191. * @param int $first Initial slot of the range.
  192. * @param int $last Last slot of the range.
  193. * @param SingleConnectionInterface|string $connection ID or connection instance.
  194. */
  195. public function setSlots($first, $last, $connection)
  196. {
  197. if ($first < 0x0000 || $first > 0x3FFF ||
  198. $last < 0x0000 || $last > 0x3FFF ||
  199. $last < $first
  200. ) {
  201. throw new OutOfBoundsException(
  202. "Invalid slot range for $connection: [$first-$last]"
  203. );
  204. }
  205. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  206. $this->slotsMap = $this->getSlotsMap() + $slots;
  207. }
  208. /**
  209. * Guesses the correct node associated to a given slot using a precalculated
  210. * slots map, falling back to the same logic used by Redis to initialize a
  211. * cluster (best-effort).
  212. *
  213. * @param int $slot Slot index.
  214. * @return string Connection ID.
  215. */
  216. protected function guessNode($slot)
  217. {
  218. if (!isset($this->slotsMap)) {
  219. $this->buildSlotsMap();
  220. }
  221. if (isset($this->slotsMap[$slot])) {
  222. return $this->slotsMap[$slot];
  223. }
  224. $count = count($this->pool);
  225. $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
  226. $nodes = array_keys($this->pool);
  227. return $nodes[$index];
  228. }
  229. /**
  230. * Creates a new connection instance from the given connection ID.
  231. *
  232. * @param string $connectionID Identifier for the connection.
  233. * @return SingleConnectionInterface
  234. */
  235. protected function createConnection($connectionID)
  236. {
  237. $host = explode(':', $connectionID, 2);
  238. $parameters = array_merge($this->defaultParameters, array(
  239. 'host' => $host[0],
  240. 'port' => $host[1],
  241. ));
  242. $connection = $this->connections->create($parameters);
  243. return $connection;
  244. }
  245. /**
  246. * {@inheritdoc}
  247. */
  248. public function getConnection(CommandInterface $command)
  249. {
  250. $hash = $this->strategy->getHash($command);
  251. if (!isset($hash)) {
  252. throw new NotSupportedException(
  253. "Cannot use {$command->getId()} with redis-cluster"
  254. );
  255. }
  256. $slot = $hash & 0x3FFF;
  257. if (isset($this->slots[$slot])) {
  258. return $this->slots[$slot];
  259. } else {
  260. return $this->getConnectionBySlot($slot);
  261. }
  262. }
  263. /**
  264. * Returns the connection currently associated to a given slot.
  265. *
  266. * @param int $slot Slot index.
  267. * @return SingleConnectionInterface
  268. */
  269. public function getConnectionBySlot($slot)
  270. {
  271. if ($slot < 0x0000 || $slot > 0x3FFF) {
  272. throw new OutOfBoundsException("Invalid slot [$slot]");
  273. }
  274. if (isset($this->slots[$slot])) {
  275. return $this->slots[$slot];
  276. }
  277. $connectionID = $this->guessNode($slot);
  278. if (!$connection = $this->getConnectionById($connectionID)) {
  279. $connection = $this->createConnection($connectionID);
  280. $this->pool[$connectionID] = $connection;
  281. }
  282. return $this->slots[$slot] = $connection;
  283. }
  284. /**
  285. * {@inheritdoc}
  286. */
  287. public function getConnectionById($connectionID)
  288. {
  289. if (isset($this->pool[$connectionID])) {
  290. return $this->pool[$connectionID];
  291. }
  292. }
  293. /**
  294. * Returns a random connection from the pool.
  295. *
  296. * @return SingleConnectionInterface
  297. */
  298. protected function getRandomConnection()
  299. {
  300. if ($this->pool) {
  301. return $this->pool[array_rand($this->pool)];
  302. }
  303. }
  304. /**
  305. * Permanently associates the connection instance to a new slot.
  306. * The connection is added to the connections pool if not yet included.
  307. *
  308. * @param SingleConnectionInterface $connection Connection instance.
  309. * @param int $slot Target slot index.
  310. */
  311. protected function move(SingleConnectionInterface $connection, $slot)
  312. {
  313. $this->pool[(string) $connection] = $connection;
  314. $this->slots[(int) $slot] = $connection;
  315. }
  316. /**
  317. * Handles -ERR responses returned by Redis.
  318. *
  319. * @param CommandInterface $command Command that generated the -ERR response.
  320. * @param ResponseErrorInterface $error Redis error response object.
  321. * @return mixed
  322. */
  323. protected function onErrorResponse(CommandInterface $command, ResponseErrorInterface $error)
  324. {
  325. $details = explode(' ', $error->getMessage(), 2);
  326. switch ($details[0]) {
  327. case 'MOVED':
  328. return $this->onMovedResponse($command, $details[1]);
  329. case 'ASK':
  330. return $this->onAskResponse($command, $details[1]);
  331. default:
  332. return $error;
  333. }
  334. }
  335. /**
  336. * Handles -MOVED responses by executing again the command against the node
  337. * indicated by the Redis response.
  338. *
  339. * @param CommandInterface $command Command that generated the -MOVED response.
  340. * @param string $details Parameters of the -MOVED response.
  341. * @return mixed
  342. */
  343. protected function onMovedResponse(CommandInterface $command, $details)
  344. {
  345. list($slot, $connectionID) = explode(' ', $details, 2);
  346. if (!$connection = $this->getConnectionById($connectionID)) {
  347. $connection = $this->createConnection($connectionID);
  348. }
  349. if ($this->askClusterNodes) {
  350. $this->askClusterNodes();
  351. }
  352. $this->move($connection, $slot);
  353. $response = $this->executeCommand($command);
  354. return $response;
  355. }
  356. /**
  357. * Handles -ASK responses by executing again the command against the node
  358. * indicated by the Redis response.
  359. *
  360. * @param CommandInterface $command Command that generated the -ASK response.
  361. * @param string $details Parameters of the -ASK response.
  362. * @return mixed
  363. */
  364. protected function onAskResponse(CommandInterface $command, $details)
  365. {
  366. list($slot, $connectionID) = explode(' ', $details, 2);
  367. if (!$connection = $this->getConnectionById($connectionID)) {
  368. $connection = $this->createConnection($connectionID);
  369. }
  370. $connection->executeCommand(RawCommand::create('ASKING'));
  371. $response = $connection->executeCommand($command);
  372. return $response;
  373. }
  374. /**
  375. * {@inheritdoc}
  376. */
  377. public function writeCommand(CommandInterface $command)
  378. {
  379. $this->getConnection($command)->writeCommand($command);
  380. }
  381. /**
  382. * {@inheritdoc}
  383. */
  384. public function readResponse(CommandInterface $command)
  385. {
  386. return $this->getConnection($command)->readResponse($command);
  387. }
  388. /**
  389. * {@inheritdoc}
  390. */
  391. public function executeCommand(CommandInterface $command)
  392. {
  393. $connection = $this->getConnection($command);
  394. $response = $connection->executeCommand($command);
  395. if ($response instanceof ResponseErrorInterface) {
  396. return $this->onErrorResponse($command, $response);
  397. }
  398. return $response;
  399. }
  400. /**
  401. * {@inheritdoc}
  402. */
  403. public function count()
  404. {
  405. return count($this->pool);
  406. }
  407. /**
  408. * {@inheritdoc}
  409. */
  410. public function getIterator()
  411. {
  412. return new ArrayIterator(array_values($this->pool));
  413. }
  414. /**
  415. * Returns the underlying hash strategy used to hash commands by their keys.
  416. *
  417. * @return CommandHashStrategyInterface
  418. */
  419. public function getCommandHashStrategy()
  420. {
  421. return $this->strategy;
  422. }
  423. /**
  424. * Enables automatic fetching of the current slots map from one of the nodes
  425. * using the CLUSTER NODES command. This option is disabled by default but
  426. * asking the current slots map to Redis upon -MOVE responses may reduce
  427. * overhead by eliminating the trial-and-error nature of the node guessing
  428. * procedure, mostly when targeting many keys that would end up in a lot of
  429. * redirections.
  430. *
  431. * The slots map can still be manually fetched using the askClusterNodes()
  432. * method whether or not this option is enabled.
  433. *
  434. * @param bool $value Enable or disable the use of CLUSTER NODES.
  435. */
  436. public function enableClusterNodes($value)
  437. {
  438. $this->askClusterNodes = (bool) $value;
  439. }
  440. /**
  441. * Sets a default array of connection parameters to be applied when creating
  442. * new connection instances on the fly when they are not part of the initial
  443. * pool supplied upon cluster initialization.
  444. *
  445. * These parameters are not applied to connections added to the pool using
  446. * the add() method.
  447. *
  448. * @param array $parameters Array of connection parameters.
  449. */
  450. public function setDefaultParameters(array $parameters)
  451. {
  452. $this->defaultParameters = array_merge(
  453. $this->defaultParameters,
  454. $parameters ?: array()
  455. );
  456. }
  457. }