RedisCluster.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use ArrayIterator;
  12. use Countable;
  13. use IteratorAggregate;
  14. use OutOfBoundsException;
  15. use Predis\NotSupportedException;
  16. use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
  17. use Predis\Command\CommandInterface;
  18. use Predis\Command\RawCommand;
  19. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  20. /**
  21. * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
  22. *
  23. * This connection backend offers smart support for redis-cluster by handling
  24. * automatic slots map (re)generation upon -MOVE or -ASK responses returned by
  25. * Redis when redirecting a client to a different node.
  26. *
  27. * The cluster can be pre-initialized using only a subset of the actual nodes in
  28. * the cluster, Predis will do the rest by adjusting the slots map and creating
  29. * the missing underlying connection instances on the fly.
  30. *
  31. * It is possible to pre-associate connections to a slots range with the "slots"
  32. * parameter in the form "$first-$last". This can greatly reduce runtime node
  33. * guessing and redirections.
  34. *
  35. * It is also possible to ask for the full and updated slots map directly to one
  36. * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
  37. * Asking for the cluster configuration to Redis is actually done by issuing a
  38. * CLUSTER NODES command to a random node in the pool.
  39. *
  40. * @author Daniele Alessandri <suppakilla@gmail.com>
  41. */
  42. class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
  43. {
  44. private $askClusterNodes = false;
  45. private $defaultParameters = array();
  46. private $pool = array();
  47. private $slots = array();
  48. private $slotsMap;
  49. private $strategy;
  50. private $connections;
  51. /**
  52. * @param FactoryInterface $connections Connection factory object.
  53. */
  54. public function __construct(FactoryInterface $connections = null)
  55. {
  56. $this->strategy = new RedisClusterStrategy();
  57. $this->connections = $connections ?: new Factory();
  58. }
  59. /**
  60. * {@inheritdoc}
  61. */
  62. public function isConnected()
  63. {
  64. foreach ($this->pool as $connection) {
  65. if ($connection->isConnected()) {
  66. return true;
  67. }
  68. }
  69. return false;
  70. }
  71. /**
  72. * {@inheritdoc}
  73. */
  74. public function connect()
  75. {
  76. if ($connection = $this->getRandomConnection()) {
  77. $connection->connect();
  78. }
  79. }
  80. /**
  81. * {@inheritdoc}
  82. */
  83. public function disconnect()
  84. {
  85. foreach ($this->pool as $connection) {
  86. $connection->disconnect();
  87. }
  88. }
  89. /**
  90. * {@inheritdoc}
  91. */
  92. public function add(SingleConnectionInterface $connection)
  93. {
  94. $this->pool[(string) $connection] = $connection;
  95. unset($this->slotsMap);
  96. }
  97. /**
  98. * {@inheritdoc}
  99. */
  100. public function remove(SingleConnectionInterface $connection)
  101. {
  102. if (false !== $id = array_search($connection, $this->pool, true)) {
  103. unset(
  104. $this->pool[$id],
  105. $this->slotsMap
  106. );
  107. return true;
  108. }
  109. return false;
  110. }
  111. /**
  112. * Removes a connection instance by using its identifier.
  113. *
  114. * @param string $connectionID Connection identifier.
  115. * @return bool True if the connection was in the pool.
  116. */
  117. public function removeById($connectionID)
  118. {
  119. if (isset($this->pool[$connectionID])) {
  120. unset(
  121. $this->pool[$connectionID],
  122. $this->slotsMap
  123. );
  124. return true;
  125. }
  126. return false;
  127. }
  128. /**
  129. * Generates the current slots map by guessing the cluster configuration out
  130. * of the connection parameters of the connections in the pool.
  131. *
  132. * Generation is based on the same algorithm used by Redis to generate the
  133. * cluster, so it is most effective when all of the connections supplied on
  134. * initialization have the "slots" parameter properly set accordingly to the
  135. * current cluster configuration.
  136. */
  137. public function buildSlotsMap()
  138. {
  139. $this->slotsMap = array();
  140. foreach ($this->pool as $connectionID => $connection) {
  141. $parameters = $connection->getParameters();
  142. if (!isset($parameters->slots)) {
  143. continue;
  144. }
  145. $slots = explode('-', $parameters->slots, 2);
  146. $this->setSlots($slots[0], $slots[1], $connectionID);
  147. }
  148. }
  149. /**
  150. * Generates the current slots map by fetching the cluster configuration to
  151. * one of the nodes by leveraging the CLUSTER NODES command.
  152. */
  153. public function askClusterNodes()
  154. {
  155. if (!$connection = $this->getRandomConnection()) {
  156. return array();
  157. }
  158. $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
  159. $response = $connection->executeCommand($cmdCluster);
  160. $nodes = explode("\n", $response, -1);
  161. $count = count($nodes);
  162. for ($i = 0; $i < $count; $i++) {
  163. $node = explode(' ', $nodes[$i], 9);
  164. $slots = explode('-', $node[8], 2);
  165. if ($node[1] === ':0') {
  166. $this->setSlots($slots[0], $slots[1], (string) $connection);
  167. } else {
  168. $this->setSlots($slots[0], $slots[1], $node[1]);
  169. }
  170. }
  171. }
  172. /**
  173. * Returns the current slots map for the cluster.
  174. *
  175. * @return array
  176. */
  177. public function getSlotsMap()
  178. {
  179. if (!isset($this->slotsMap)) {
  180. $this->slotsMap = array();
  181. }
  182. return $this->slotsMap;
  183. }
  184. /**
  185. * Pre-associates a connection to a slots range to avoid runtime guessing.
  186. *
  187. * @param int $first Initial slot of the range.
  188. * @param int $last Last slot of the range.
  189. * @param SingleConnectionInterface|string $connection ID or connection instance.
  190. */
  191. public function setSlots($first, $last, $connection)
  192. {
  193. if ($first < 0x0000 || $first > 0x3FFF ||
  194. $last < 0x0000 || $last > 0x3FFF ||
  195. $last < $first
  196. ) {
  197. throw new OutOfBoundsException(
  198. "Invalid slot range for $connection: [$first-$last]."
  199. );
  200. }
  201. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  202. $this->slotsMap = $this->getSlotsMap() + $slots;
  203. }
  204. /**
  205. * Guesses the correct node associated to a given slot using a precalculated
  206. * slots map, falling back to the same logic used by Redis to initialize a
  207. * cluster (best-effort).
  208. *
  209. * @param int $slot Slot index.
  210. * @return string Connection ID.
  211. */
  212. protected function guessNode($slot)
  213. {
  214. if (!isset($this->slotsMap)) {
  215. $this->buildSlotsMap();
  216. }
  217. if (isset($this->slotsMap[$slot])) {
  218. return $this->slotsMap[$slot];
  219. }
  220. $count = count($this->pool);
  221. $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
  222. $nodes = array_keys($this->pool);
  223. return $nodes[$index];
  224. }
  225. /**
  226. * Creates a new connection instance from the given connection ID.
  227. *
  228. * @param string $connectionID Identifier for the connection.
  229. * @return SingleConnectionInterface
  230. */
  231. protected function createConnection($connectionID)
  232. {
  233. $host = explode(':', $connectionID, 2);
  234. $parameters = array_merge($this->defaultParameters, array(
  235. 'host' => $host[0],
  236. 'port' => $host[1],
  237. ));
  238. $connection = $this->connections->create($parameters);
  239. return $connection;
  240. }
  241. /**
  242. * {@inheritdoc}
  243. */
  244. public function getConnection(CommandInterface $command)
  245. {
  246. $hash = $this->strategy->getHash($command);
  247. if (!isset($hash)) {
  248. throw new NotSupportedException(
  249. "Cannot use '{$command->getId()}' with redis-cluster."
  250. );
  251. }
  252. $slot = $hash & 0x3FFF;
  253. if (isset($this->slots[$slot])) {
  254. return $this->slots[$slot];
  255. } else {
  256. return $this->getConnectionBySlot($slot);
  257. }
  258. }
  259. /**
  260. * Returns the connection currently associated to a given slot.
  261. *
  262. * @param int $slot Slot index.
  263. * @return SingleConnectionInterface
  264. */
  265. public function getConnectionBySlot($slot)
  266. {
  267. if ($slot < 0x0000 || $slot > 0x3FFF) {
  268. throw new OutOfBoundsException("Invalid slot [$slot].");
  269. }
  270. if (isset($this->slots[$slot])) {
  271. return $this->slots[$slot];
  272. }
  273. $connectionID = $this->guessNode($slot);
  274. if (!$connection = $this->getConnectionById($connectionID)) {
  275. $connection = $this->createConnection($connectionID);
  276. $this->pool[$connectionID] = $connection;
  277. }
  278. return $this->slots[$slot] = $connection;
  279. }
  280. /**
  281. * {@inheritdoc}
  282. */
  283. public function getConnectionById($connectionID)
  284. {
  285. if (isset($this->pool[$connectionID])) {
  286. return $this->pool[$connectionID];
  287. }
  288. }
  289. /**
  290. * Returns a random connection from the pool.
  291. *
  292. * @return SingleConnectionInterface
  293. */
  294. protected function getRandomConnection()
  295. {
  296. if ($this->pool) {
  297. return $this->pool[array_rand($this->pool)];
  298. }
  299. }
  300. /**
  301. * Permanently associates the connection instance to a new slot.
  302. * The connection is added to the connections pool if not yet included.
  303. *
  304. * @param SingleConnectionInterface $connection Connection instance.
  305. * @param int $slot Target slot index.
  306. */
  307. protected function move(SingleConnectionInterface $connection, $slot)
  308. {
  309. $this->pool[(string) $connection] = $connection;
  310. $this->slots[(int) $slot] = $connection;
  311. }
  312. /**
  313. * Handles -ERR responses returned by Redis.
  314. *
  315. * @param CommandInterface $command Command that generated the -ERR response.
  316. * @param ErrorResponseInterface $error Redis error response object.
  317. * @return mixed
  318. */
  319. protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
  320. {
  321. $details = explode(' ', $error->getMessage(), 2);
  322. switch ($details[0]) {
  323. case 'MOVED':
  324. return $this->onMovedResponse($command, $details[1]);
  325. case 'ASK':
  326. return $this->onAskResponse($command, $details[1]);
  327. default:
  328. return $error;
  329. }
  330. }
  331. /**
  332. * Handles -MOVED responses by executing again the command against the node
  333. * indicated by the Redis response.
  334. *
  335. * @param CommandInterface $command Command that generated the -MOVED response.
  336. * @param string $details Parameters of the -MOVED response.
  337. * @return mixed
  338. */
  339. protected function onMovedResponse(CommandInterface $command, $details)
  340. {
  341. list($slot, $connectionID) = explode(' ', $details, 2);
  342. if (!$connection = $this->getConnectionById($connectionID)) {
  343. $connection = $this->createConnection($connectionID);
  344. }
  345. if ($this->askClusterNodes) {
  346. $this->askClusterNodes();
  347. }
  348. $this->move($connection, $slot);
  349. $response = $this->executeCommand($command);
  350. return $response;
  351. }
  352. /**
  353. * Handles -ASK responses by executing again the command against the node
  354. * indicated by the Redis response.
  355. *
  356. * @param CommandInterface $command Command that generated the -ASK response.
  357. * @param string $details Parameters of the -ASK response.
  358. * @return mixed
  359. */
  360. protected function onAskResponse(CommandInterface $command, $details)
  361. {
  362. list($slot, $connectionID) = explode(' ', $details, 2);
  363. if (!$connection = $this->getConnectionById($connectionID)) {
  364. $connection = $this->createConnection($connectionID);
  365. }
  366. $connection->executeCommand(RawCommand::create('ASKING'));
  367. $response = $connection->executeCommand($command);
  368. return $response;
  369. }
  370. /**
  371. * {@inheritdoc}
  372. */
  373. public function writeRequest(CommandInterface $command)
  374. {
  375. $this->getConnection($command)->writeRequest($command);
  376. }
  377. /**
  378. * {@inheritdoc}
  379. */
  380. public function readResponse(CommandInterface $command)
  381. {
  382. return $this->getConnection($command)->readResponse($command);
  383. }
  384. /**
  385. * {@inheritdoc}
  386. */
  387. public function executeCommand(CommandInterface $command)
  388. {
  389. $connection = $this->getConnection($command);
  390. $response = $connection->executeCommand($command);
  391. if ($response instanceof ErrorResponseInterface) {
  392. return $this->onErrorResponse($command, $response);
  393. }
  394. return $response;
  395. }
  396. /**
  397. * {@inheritdoc}
  398. */
  399. public function count()
  400. {
  401. return count($this->pool);
  402. }
  403. /**
  404. * {@inheritdoc}
  405. */
  406. public function getIterator()
  407. {
  408. return new ArrayIterator(array_values($this->pool));
  409. }
  410. /**
  411. * Returns the underlying command hash strategy used to hash commands by
  412. * using keys found in their arguments.
  413. *
  414. * @return \Predis\Cluster\StrategyInterface
  415. */
  416. public function getClusterStrategy()
  417. {
  418. return $this->strategy;
  419. }
  420. /**
  421. * Enables automatic fetching of the current slots map from one of the nodes
  422. * using the CLUSTER NODES command. This option is disabled by default but
  423. * asking the current slots map to Redis upon -MOVE responses may reduce
  424. * overhead by eliminating the trial-and-error nature of the node guessing
  425. * procedure, mostly when targeting many keys that would end up in a lot of
  426. * redirections.
  427. *
  428. * The slots map can still be manually fetched using the askClusterNodes()
  429. * method whether or not this option is enabled.
  430. *
  431. * @param bool $value Enable or disable the use of CLUSTER NODES.
  432. */
  433. public function enableClusterNodes($value)
  434. {
  435. $this->askClusterNodes = (bool) $value;
  436. }
  437. /**
  438. * Sets a default array of connection parameters to be applied when creating
  439. * new connection instances on the fly when they are not part of the initial
  440. * pool supplied upon cluster initialization.
  441. *
  442. * These parameters are not applied to connections added to the pool using
  443. * the add() method.
  444. *
  445. * @param array $parameters Array of connection parameters.
  446. */
  447. public function setDefaultParameters(array $parameters)
  448. {
  449. $this->defaultParameters = array_merge(
  450. $this->defaultParameters,
  451. $parameters ?: array()
  452. );
  453. }
  454. }