RedisCluster.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection\Aggregate;
  11. use ArrayIterator;
  12. use Countable;
  13. use IteratorAggregate;
  14. use OutOfBoundsException;
  15. use Predis\NotSupportedException;
  16. use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
  17. use Predis\Command\CommandInterface;
  18. use Predis\Command\RawCommand;
  19. use Predis\Connection\NodeConnectionInterface;
  20. use Predis\Connection\Factory;
  21. use Predis\Connection\FactoryInterface;
  22. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  /**
   * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
   *
   * This connection backend offers smart support for redis-cluster by handling
   * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
   * Redis when redirecting a client to a different node.
   *
   * The cluster can be pre-initialized using only a subset of the actual nodes in
   * the cluster, Predis will do the rest by adjusting the slots map and creating
   * the missing underlying connection instances on the fly.
   *
   * It is possible to pre-associate connections to a slots range with the "slots"
   * parameter in the form "$first-$last". This can greatly reduce runtime node
   * guessing and redirections.
   *
   * It is also possible to ask for the full and updated slots map directly to one
   * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
   * Asking for the cluster configuration to Redis is actually done by issuing a
   * CLUSTER NODES command to a random node in the pool.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class RedisCluster implements ClusterInterface, IteratorAggregate, Countable
  {
      /** @var bool Whether the slots map is re-fetched from Redis upon -MOVED. */
      private $askSlotsMap = false;

      /** @var array Parameters merged into connections created on the fly. */
      private $defaultParameters = array();

      /** @var array Connection instances indexed by connection ID. */
      private $pool = array();

      /** @var array Connection instances indexed by the slot they were resolved for. */
      private $slots = array();

      /** @var array|null Connection IDs indexed by slot, or null when not built yet. */
      private $slotsMap;

      /** @var RedisClusterStrategy Strategy used to hash commands. */
      private $strategy;

      /** @var FactoryInterface Factory used to create missing connections. */
      private $connections;

      /**
       * @param FactoryInterface $connections Connection factory object.
       */
      public function __construct(FactoryInterface $connections = null)
      {
          $this->strategy = new RedisClusterStrategy();
          $this->connections = $connections ?: new Factory();
      }

      /**
       * {@inheritdoc}
       *
       * Returns true as soon as one connection of the pool reports itself as
       * connected.
       */
      public function isConnected()
      {
          foreach ($this->pool as $connection) {
              if ($connection->isConnected()) {
                  return true;
              }
          }

          return false;
      }

      /**
       * {@inheritdoc}
       *
       * Only one randomly picked connection of the pool is connected; nothing
       * happens when the pool is empty.
       */
      public function connect()
      {
          if ($connection = $this->getRandomConnection()) {
              $connection->connect();
          }
      }

      /**
       * {@inheritdoc}
       */
      public function disconnect()
      {
          foreach ($this->pool as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * {@inheritdoc}
       *
       * The slots map is invalidated so that it gets rebuilt on the next guess.
       */
      public function add(NodeConnectionInterface $connection)
      {
          $this->pool[(string) $connection] = $connection;

          // Reset to null instead of unset(): isset() still reports false, but
          // the declared property keeps existing and can be read safely.
          $this->slotsMap = null;
      }

      /**
       * {@inheritdoc}
       *
       * Besides leaving the pool, the connection is also dropped from the
       * per-slot cache so that it is no longer returned by getConnection().
       *
       * @return bool True if the connection was in the pool.
       */
      public function remove(NodeConnectionInterface $connection)
      {
          $id = array_search($connection, $this->pool, true);

          if (false === $id) {
              return false;
          }

          unset($this->pool[$id]);
          $this->slotsMap = null;
          $this->forgetCachedSlots($connection);

          return true;
      }

      /**
       * Removes a connection instance by using its identifier.
       *
       * Besides leaving the pool, the connection is also dropped from the
       * per-slot cache so that it is no longer returned by getConnection().
       *
       * @param  string $connectionID Connection identifier.
       * @return bool   True if the connection was in the pool.
       */
      public function removeById($connectionID)
      {
          if (!isset($this->pool[$connectionID])) {
              return false;
          }

          $connection = $this->pool[$connectionID];

          unset($this->pool[$connectionID]);
          $this->slotsMap = null;
          $this->forgetCachedSlots($connection);

          return true;
      }

      /**
       * Drops every per-slot cache entry pointing to the given connection.
       *
       * @param NodeConnectionInterface $connection Connection instance.
       */
      private function forgetCachedSlots(NodeConnectionInterface $connection)
      {
          foreach ($this->slots as $slot => $cached) {
              if ($cached === $connection) {
                  unset($this->slots[$slot]);
              }
          }
      }

      /**
       * Splits a slots range expressed as "$first-$last" into its boundaries.
       * A value without a dash is treated as a single-slot range.
       *
       * @param  string $range Slots range (or single slot).
       * @return array  Two-element array holding the first and last slot.
       */
      private function parseSlotsRange($range)
      {
          $bounds = explode('-', $range, 2);

          return array($bounds[0], isset($bounds[1]) ? $bounds[1] : $bounds[0]);
      }

      /**
       * Generates the current slots map by guessing the cluster configuration out
       * of the connection parameters of the connections in the pool.
       *
       * Generation is based on the same algorithm used by Redis to generate the
       * cluster, so it is most effective when all of the connections supplied on
       * initialization have the "slots" parameter properly set accordingly to the
       * current cluster configuration.
       *
       * @throws OutOfBoundsException When a "slots" parameter is not a valid range.
       */
      public function buildSlotsMap()
      {
          $this->slotsMap = array();

          foreach ($this->pool as $connectionID => $connection) {
              $parameters = $connection->getParameters();

              if (!isset($parameters->slots)) {
                  continue;
              }

              list($first, $last) = $this->parseSlotsRange($parameters->slots);
              $this->setSlots($first, $last, $connectionID);
          }
      }

      /**
       * Generates the current slots map by fetching the cluster configuration to
       * one of the nodes by leveraging the CLUSTER NODES command.
       *
       * The previous slots map is discarded once a valid reply is received, so
       * that outdated slot assignments do not survive the refresh. When the pool
       * is empty an empty array is returned; when Redis replies with an error the
       * current map is returned untouched.
       *
       * @return array
       */
      public function askSlotsMap()
      {
          if (!$connection = $this->getRandomConnection()) {
              return array();
          }

          $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
          $response = $connection->executeCommand($cmdCluster);

          if ($response instanceof ErrorResponseInterface) {
              // Nothing can be parsed out of an error, keep what we have.
              return isset($this->slotsMap) ? $this->slotsMap : array();
          }

          // setSlots() never overwrites slots that are already mapped, so the
          // map must start empty for the fresh configuration to take effect.
          $this->slotsMap = array();

          foreach (explode("\n", $response) as $line) {
              $node = explode(' ', trim($line));

              // Fields: id, address, flags, master, ping, pong, epoch, link state
              // and then zero or more slot entries starting from index 8.
              if (count($node) < 9 || false === strpos($node[2], 'master')) {
                  continue;
              }

              // Newer servers append "@cport" (and more) to "ip:port".
              $address = $node[1];
              if (false !== $at = strpos($address, '@')) {
                  $address = substr($address, 0, $at);
              }

              // ":0" is reported by a node that does not know its own address,
              // in which case it is the node that is answering us.
              $connectionID = $address === ':0' ? (string) $connection : $address;

              foreach (array_slice($node, 8) as $range) {
                  // Entries wrapped in brackets describe slots being migrated
                  // or imported, they are not slot assignments.
                  if ($range === '' || $range[0] === '[') {
                      continue;
                  }

                  list($first, $last) = $this->parseSlotsRange($range);
                  $this->setSlots($first, $last, $connectionID);
              }
          }

          return $this->slotsMap;
      }

      /**
       * Returns the current slots map for the cluster.
       *
       * The map is initialized to an empty array when it has not been built yet.
       *
       * @return array
       */
      public function getSlotsMap()
      {
          if (!isset($this->slotsMap)) {
              $this->slotsMap = array();
          }

          return $this->slotsMap;
      }

      /**
       * Pre-associates a connection to a slots range to avoid runtime guessing.
       *
       * Slots that are already present in the slots map keep their current
       * association, only the missing ones are added.
       *
       * @param int                            $first      Initial slot of the range.
       * @param int                            $last       Last slot of the range.
       * @param NodeConnectionInterface|string $connection ID or connection instance.
       *
       * @throws OutOfBoundsException When the range is outside of 0-16383 or reversed.
       */
      public function setSlots($first, $last, $connection)
      {
          if ($first < 0x0000 || $first > 0x3FFF ||
              $last < 0x0000 || $last > 0x3FFF ||
              $last < $first
          ) {
              throw new OutOfBoundsException(
                  "Invalid slot range for $connection: [$first-$last]."
              );
          }

          $slots = array_fill($first, $last - $first + 1, (string) $connection);
          $this->slotsMap = $this->getSlotsMap() + $slots;
      }

      /**
       * Guesses the correct node associated to a given slot using a precalculated
       * slots map, falling back to the same logic used by Redis to initialize a
       * cluster (best-effort).
       *
       * @param  int    $slot Slot index.
       * @return string Connection ID.
       *
       * @throws \RuntimeException When the slot is not mapped and the pool is empty.
       */
      protected function guessNode($slot)
      {
          if (!isset($this->slotsMap)) {
              $this->buildSlotsMap();
          }

          if (isset($this->slotsMap[$slot])) {
              return $this->slotsMap[$slot];
          }

          $count = count($this->pool);

          if ($count === 0) {
              throw new \RuntimeException(
                  "No connections available in the pool to guess the node for slot [$slot]."
              );
          }

          // Never let the divisor reach zero, which would happen with more
          // nodes in the pool than available slots.
          $slotsPerNode = max(1, (int) (16384 / $count));
          $index = min((int) ($slot / $slotsPerNode), $count - 1);
          $nodes = array_keys($this->pool);

          return $nodes[$index];
      }

      /**
       * Creates a new connection instance from the given connection ID.
       *
       * The identifier is split on its last colon so that hosts containing
       * colons (such as IPv6 addresses) are preserved. When no colon is present
       * the whole identifier is used as the host and no port is set explicitly.
       *
       * @param  string                  $connectionID Identifier for the connection.
       * @return NodeConnectionInterface
       */
      protected function createConnection($connectionID)
      {
          $separator = strrpos($connectionID, ':');

          if (false === $separator) {
              $endpoint = array('host' => $connectionID);
          } else {
              $endpoint = array(
                  'host' => substr($connectionID, 0, $separator),
                  'port' => substr($connectionID, $separator + 1),
              );
          }

          $parameters = array_merge($this->defaultParameters, $endpoint);

          return $this->connections->create($parameters);
      }

      /**
       * {@inheritdoc}
       *
       * @throws NotSupportedException When the strategy returns no hash for the command.
       */
      public function getConnection(CommandInterface $command)
      {
          $hash = $this->strategy->getHash($command);

          if (!isset($hash)) {
              throw new NotSupportedException(
                  "Cannot use '{$command->getId()}' with redis-cluster."
              );
          }

          // Only the lowest 14 bits are kept to obtain a slot in 0-16383.
          $slot = $hash & 0x3FFF;

          if (isset($this->slots[$slot])) {
              return $this->slots[$slot];
          }

          return $this->getConnectionBySlot($slot);
      }

      /**
       * Returns the connection currently associated to a given slot.
       *
       * When the slot has not been resolved yet, its node is guessed and the
       * connection is created and added to the pool if missing, then the
       * association is cached for subsequent calls.
       *
       * @param  int                     $slot Slot index.
       * @return NodeConnectionInterface
       *
       * @throws OutOfBoundsException When the slot is outside of 0-16383.
       */
      public function getConnectionBySlot($slot)
      {
          if ($slot < 0x0000 || $slot > 0x3FFF) {
              throw new OutOfBoundsException("Invalid slot [$slot].");
          }

          if (isset($this->slots[$slot])) {
              return $this->slots[$slot];
          }

          $connectionID = $this->guessNode($slot);

          if (!$connection = $this->getConnectionById($connectionID)) {
              $connection = $this->createConnection($connectionID);
              $this->pool[$connectionID] = $connection;
          }

          return $this->slots[$slot] = $connection;
      }

      /**
       * {@inheritdoc}
       *
       * @return NodeConnectionInterface|null Null when the ID is not in the pool.
       */
      public function getConnectionById($connectionID)
      {
          if (isset($this->pool[$connectionID])) {
              return $this->pool[$connectionID];
          }

          return null;
      }

      /**
       * Returns a random connection from the pool.
       *
       * @return NodeConnectionInterface|null Null when the pool is empty.
       */
      protected function getRandomConnection()
      {
          if ($this->pool) {
              return $this->pool[array_rand($this->pool)];
          }

          return null;
      }

      /**
       * Permanently associates the connection instance to a new slot.
       * The connection is added to the connections pool if not yet included.
       *
       * @param NodeConnectionInterface $connection Connection instance.
       * @param int                     $slot       Target slot index.
       */
      protected function move(NodeConnectionInterface $connection, $slot)
      {
          $this->pool[(string) $connection] = $connection;
          $this->slots[(int) $slot] = $connection;
      }

      /**
       * Handles -ERR responses returned by Redis.
       *
       * -MOVED and -ASK errors are dispatched to their handlers, any other error
       * is returned as is.
       *
       * @param  CommandInterface       $command Command that generated the -ERR response.
       * @param  ErrorResponseInterface $error   Redis error response object.
       * @return mixed
       */
      protected function onErrorResponse(CommandInterface $command, ErrorResponseInterface $error)
      {
          // The first word is the error type, the rest are its parameters.
          $details = explode(' ', $error->getMessage(), 2);

          switch ($details[0]) {
              case 'MOVED':
                  return $this->onMovedResponse($command, $details[1]);

              case 'ASK':
                  return $this->onAskResponse($command, $details[1]);

              default:
                  return $error;
          }
      }

      /**
       * Handles -MOVED responses by executing again the command against the node
       * indicated by the Redis response.
       *
       * The slot is permanently re-associated to the indicated node and, when
       * enabled, the whole slots map is refreshed beforehand.
       *
       * @param  CommandInterface $command Command that generated the -MOVED response.
       * @param  string           $details Parameters of the -MOVED response.
       * @return mixed
       */
      protected function onMovedResponse(CommandInterface $command, $details)
      {
          list($slot, $connectionID) = explode(' ', $details, 2);

          if (!$connection = $this->getConnectionById($connectionID)) {
              $connection = $this->createConnection($connectionID);
          }

          if ($this->askSlotsMap) {
              $this->askSlotsMap();
          }

          $this->move($connection, $slot);

          return $this->executeCommand($command);
      }

      /**
       * Handles -ASK responses by executing again the command against the node
       * indicated by the Redis response.
       *
       * Neither the pool nor the slot associations are modified: the command is
       * sent straight to the indicated node right after an ASKING command.
       *
       * @param  CommandInterface $command Command that generated the -ASK response.
       * @param  string           $details Parameters of the -ASK response.
       * @return mixed
       */
      protected function onAskResponse(CommandInterface $command, $details)
      {
          list($slot, $connectionID) = explode(' ', $details, 2);

          if (!$connection = $this->getConnectionById($connectionID)) {
              $connection = $this->createConnection($connectionID);
          }

          $connection->executeCommand(RawCommand::create('ASKING'));

          return $connection->executeCommand($command);
      }

      /**
       * {@inheritdoc}
       */
      public function writeRequest(CommandInterface $command)
      {
          $this->getConnection($command)->writeRequest($command);
      }

      /**
       * {@inheritdoc}
       */
      public function readResponse(CommandInterface $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * {@inheritdoc}
       *
       * Error responses are passed to onErrorResponse() so that cluster
       * redirections are followed.
       */
      public function executeCommand(CommandInterface $command)
      {
          $connection = $this->getConnection($command);
          $response = $connection->executeCommand($command);

          if ($response instanceof ErrorResponseInterface) {
              return $this->onErrorResponse($command, $response);
          }

          return $response;
      }

      /**
       * {@inheritdoc}
       *
       * @return int Number of connections in the pool.
       */
      // The attribute silences the PHP >= 8.1 notice about the missing return
      // type, and is just a comment for older versions of PHP.
      #[\ReturnTypeWillChange]
      public function count()
      {
          return count($this->pool);
      }

      /**
       * {@inheritdoc}
       *
       * @return ArrayIterator Iterator over the connections in the pool.
       */
      // The attribute silences the PHP >= 8.1 notice about the missing return
      // type, and is just a comment for older versions of PHP.
      #[\ReturnTypeWillChange]
      public function getIterator()
      {
          return new ArrayIterator(array_values($this->pool));
      }

      /**
       * Returns the underlying command hash strategy used to hash commands by
       * using keys found in their arguments.
       *
       * @return \Predis\Cluster\StrategyInterface
       */
      public function getClusterStrategy()
      {
          return $this->strategy;
      }

      /**
       * Enables automatic fetching of the current slots map from one of the nodes
       * using the CLUSTER NODES command. This option is disabled by default but
       * asking the current slots map to Redis upon -MOVED responses may reduce
       * overhead by eliminating the trial-and-error nature of the node guessing
       * procedure, mostly when targeting many keys that would end up in a lot of
       * redirections.
       *
       * The slots map can still be manually fetched using the askSlotsMap()
       * method whether or not this option is enabled.
       *
       * @param bool $value Enable or disable the use of CLUSTER NODES.
       */
      public function enableAutoSlotsMap($value)
      {
          $this->askSlotsMap = (bool) $value;
      }

      /**
       * Sets a default array of connection parameters to be applied when creating
       * new connection instances on the fly when they are not part of the initial
       * pool supplied upon cluster initialization.
       *
       * These parameters are not applied to connections added to the pool using
       * the add() method. Parameters set by previous calls are preserved unless
       * overridden by the new ones.
       *
       * @param array $parameters Array of connection parameters.
       */
      public function setDefaultParameters(array $parameters)
      {
          $this->defaultParameters = array_merge(
              $this->defaultParameters,
              $parameters
          );
      }
  }