RedisCluster.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use ArrayIterator;
  12. use Countable;
  13. use IteratorAggregate;
  14. use OutOfBoundsException;
  15. use Predis\ClientException;
  16. use Predis\NotSupportedException;
  17. use Predis\Cluster;
  18. use Predis\Command\CommandInterface;
  19. use Predis\Command\RawCommand;
  20. use Predis\Response;
  21. /**
  22. * Abstraction for Redis cluster (Redis v3.0).
  23. *
  24. * @author Daniele Alessandri <suppakilla@gmail.com>
  25. */
  26. class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
  27. {
  28. private $askSlotsMap = false;
  29. private $pool = array();
  30. private $slots = array();
  31. private $slotsMap;
  32. private $strategy;
  33. private $connections;
  34. /**
  35. * @param ConnectionFactoryInterface $connections Connection factory object.
  36. */
  37. public function __construct(ConnectionFactoryInterface $connections = null)
  38. {
  39. $this->strategy = new Cluster\RedisStrategy();
  40. $this->connections = $connections ?: new ConnectionFactory();
  41. }
  42. /**
  43. * {@inheritdoc}
  44. */
  45. public function isConnected()
  46. {
  47. foreach ($this->pool as $connection) {
  48. if ($connection->isConnected()) {
  49. return true;
  50. }
  51. }
  52. return false;
  53. }
  54. /**
  55. * {@inheritdoc}
  56. */
  57. public function connect()
  58. {
  59. if ($connection = $this->getRandomConnection()) {
  60. $connection->connect();
  61. }
  62. }
  63. /**
  64. * {@inheritdoc}
  65. */
  66. public function disconnect()
  67. {
  68. foreach ($this->pool as $connection) {
  69. $connection->disconnect();
  70. }
  71. }
  72. /**
  73. * {@inheritdoc}
  74. */
  75. public function add(SingleConnectionInterface $connection)
  76. {
  77. $this->pool[(string) $connection] = $connection;
  78. unset($this->slotsMap);
  79. }
  80. /**
  81. * {@inheritdoc}
  82. */
  83. public function remove(SingleConnectionInterface $connection)
  84. {
  85. if (false !== $id = array_search($connection, $this->pool, true)) {
  86. unset(
  87. $this->pool[$id],
  88. $this->slotsMap
  89. );
  90. return true;
  91. }
  92. return false;
  93. }
  94. /**
  95. * Removes a connection instance using its alias or index.
  96. *
  97. * @param string $connectionID Alias or index of a connection.
  98. * @return bool Returns true if the connection was in the pool.
  99. */
  100. public function removeById($connectionID)
  101. {
  102. if (isset($this->pool[$connectionID])) {
  103. unset(
  104. $this->pool[$connectionID],
  105. $this->slotsMap
  106. );
  107. return true;
  108. }
  109. return false;
  110. }
  111. /**
  112. * Builds the slots map for the cluster.
  113. */
  114. public function buildSlotsMap()
  115. {
  116. $this->slotsMap = array();
  117. foreach ($this->pool as $connectionID => $connection) {
  118. $parameters = $connection->getParameters();
  119. if (!isset($parameters->slots)) {
  120. continue;
  121. }
  122. $slots = explode('-', $parameters->slots, 2);
  123. $this->setSlots($slots[0], $slots[1], $connectionID);
  124. }
  125. }
  126. /**
  127. * Builds the slots map for the cluster by asking the current configuration
  128. * to one of the nodes in the cluster.
  129. */
  130. public function askSlotsMap()
  131. {
  132. if (!$connection = $this->getRandomConnection()) {
  133. return array();
  134. }
  135. $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
  136. $response = $connection->executeCommand($cmdCluster);
  137. $nodes = explode("\n", $response, -1);
  138. $count = count($nodes);
  139. for ($i = 0; $i < $count; $i++) {
  140. $node = explode(' ', $nodes[$i], 9);
  141. $slots = explode('-', $node[8], 2);
  142. if ($node[1] === ':0') {
  143. $this->setSlots($slots[0], $slots[1], (string) $connection);
  144. } else {
  145. $this->setSlots($slots[0], $slots[1], $node[1]);
  146. }
  147. }
  148. }
  149. /**
  150. * Returns the current slots map for the cluster.
  151. *
  152. * @return array
  153. */
  154. public function getSlotsMap()
  155. {
  156. if (!isset($this->slotsMap)) {
  157. $this->slotsMap = array();
  158. }
  159. return $this->slotsMap;
  160. }
  161. /**
  162. * Preassociate a connection to a set of slots to avoid runtime guessing.
  163. *
  164. * @param int $first Initial slot.
  165. * @param int $last Last slot.
  166. * @param SingleConnectionInterface|string $connection ID or connection instance.
  167. */
  168. public function setSlots($first, $last, $connection)
  169. {
  170. if ($first < 0x0000 || $first > 0x3FFF ||
  171. $last < 0x0000 || $last > 0x3FFF ||
  172. $last < $first
  173. ) {
  174. throw new OutOfBoundsException(
  175. "Invalid slot range for $connection: [$first-$last]"
  176. );
  177. }
  178. $slots = array_fill($first, $last - $first + 1, (string) $connection);
  179. $this->slotsMap = $this->getSlotsMap() + $slots;
  180. }
  181. /**
  182. * {@inheritdoc}
  183. */
  184. public function getConnection(CommandInterface $command)
  185. {
  186. $hash = $this->strategy->getHash($command);
  187. if (!isset($hash)) {
  188. throw new NotSupportedException(
  189. "Cannot use {$command->getId()} with redis-cluster"
  190. );
  191. }
  192. $slot = $hash & 0x3FFF;
  193. if (isset($this->slots[$slot])) {
  194. return $this->slots[$slot];
  195. } else {
  196. return $this->getConnectionBySlot($slot);
  197. }
  198. }
  199. /**
  200. * Returns a random connection from the pool.
  201. *
  202. * @return SingleConnectionInterface
  203. */
  204. protected function getRandomConnection()
  205. {
  206. if ($this->pool) {
  207. return $this->pool[array_rand($this->pool)];
  208. }
  209. }
  210. /**
  211. * Returns the connection associated to the specified slot.
  212. *
  213. * @param int $slot Slot ID.
  214. * @return SingleConnectionInterface
  215. */
  216. public function getConnectionBySlot($slot)
  217. {
  218. if ($slot < 0x0000 || $slot > 0x3FFF) {
  219. throw new OutOfBoundsException("Invalid slot [$slot]");
  220. }
  221. if (isset($this->slots[$slot])) {
  222. return $this->slots[$slot];
  223. }
  224. $connectionID = $this->guessNode($slot);
  225. if (!$connection = $this->getConnectionById($connectionID)) {
  226. $host = explode(':', $connectionID, 2);
  227. $connection = $this->connections->create(array(
  228. 'host' => $host[0],
  229. 'port' => $host[1],
  230. ));
  231. $this->pool[$connectionID] = $connection;
  232. }
  233. return $this->slots[$slot] = $connection;
  234. }
  235. /**
  236. * {@inheritdoc}
  237. */
  238. public function getConnectionById($connectionID)
  239. {
  240. if (isset($this->pool[$connectionID])) {
  241. return $this->pool[$connectionID];
  242. }
  243. }
  244. /**
  245. * Tries guessing the correct node associated to the given slot using a precalculated
  246. * slots map or the same logic used by redis-trib to initialize a redis cluster.
  247. *
  248. * @param int $slot Slot ID.
  249. * @return string
  250. */
  251. protected function guessNode($slot)
  252. {
  253. if (!isset($this->slotsMap)) {
  254. $this->buildSlotsMap();
  255. }
  256. if (isset($this->slotsMap[$slot])) {
  257. return $this->slotsMap[$slot];
  258. }
  259. $count = count($this->pool);
  260. $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
  261. $nodes = array_keys($this->pool);
  262. return $nodes[$index];
  263. }
  264. /**
  265. * Handles -MOVED and -ASK responses by re-executing the command on the node
  266. * specified by the Redis response.
  267. *
  268. * @param CommandInterface $command Command that generated the -MOVE or -ASK response.
  269. * @param string $request Type of request (either 'MOVED' or 'ASK').
  270. * @param string $details Parameters of the MOVED/ASK request.
  271. * @return mixed
  272. */
  273. protected function onMoveRequest(CommandInterface $command, $request, $details)
  274. {
  275. list($slot, $host) = explode(' ', $details, 2);
  276. $connection = $this->getConnectionById($host);
  277. if (!$connection) {
  278. $host = explode(':', $host, 2);
  279. $connection = $this->connections->create(array(
  280. 'host' => $host[0],
  281. 'port' => $host[1],
  282. ));
  283. }
  284. switch ($request) {
  285. case 'MOVED':
  286. if ($this->askSlotsMap) {
  287. $this->askSlotsMap();
  288. }
  289. $this->move($connection, $slot);
  290. $response = $this->executeCommand($command);
  291. return $response;
  292. case 'ASK':
  293. $connection->executeCommand(RawCommand::create('ASKING'));
  294. $response = $connection->executeCommand($command);
  295. return $response;
  296. default:
  297. throw new ClientException("Unexpected request type for a move request: $request");
  298. }
  299. }
  300. /**
  301. * Assign the connection instance to a new slot and adds it to the
  302. * pool if the connection was not already part of the pool.
  303. *
  304. * @param SingleConnectionInterface $connection Connection instance
  305. * @param int $slot Target slot.
  306. */
  307. protected function move(SingleConnectionInterface $connection, $slot)
  308. {
  309. $this->pool[(string) $connection] = $connection;
  310. $this->slots[(int) $slot] = $connection;
  311. }
  312. /**
  313. * Returns the underlying hash strategy used to hash commands by their keys.
  314. *
  315. * @return Cluster\StrategyInterface
  316. */
  317. public function getClusterStrategy()
  318. {
  319. return $this->strategy;
  320. }
  321. /**
  322. * {@inheritdoc}
  323. */
  324. public function count()
  325. {
  326. return count($this->pool);
  327. }
  328. /**
  329. * {@inheritdoc}
  330. */
  331. public function getIterator()
  332. {
  333. return new ArrayIterator(array_values($this->pool));
  334. }
  335. /**
  336. * Handles -ERR responses from Redis.
  337. *
  338. * @param CommandInterface $command Command that generated the -ERR response.
  339. * @param Response\ErrorInterface $error Redis error response object.
  340. * @return mixed
  341. */
  342. protected function onErrorResponse(CommandInterface $command, Response\ErrorInterface $error)
  343. {
  344. $details = explode(' ', $error->getMessage(), 2);
  345. switch ($details[0]) {
  346. case 'MOVED':
  347. case 'ASK':
  348. return $this->onMoveRequest($command, $details[0], $details[1]);
  349. default:
  350. return $error;
  351. }
  352. }
  353. /**
  354. * {@inheritdoc}
  355. */
  356. public function writeCommand(CommandInterface $command)
  357. {
  358. $this->getConnection($command)->writeCommand($command);
  359. }
  360. /**
  361. * {@inheritdoc}
  362. */
  363. public function readResponse(CommandInterface $command)
  364. {
  365. return $this->getConnection($command)->readResponse($command);
  366. }
  367. /**
  368. * {@inheritdoc}
  369. */
  370. public function executeCommand(CommandInterface $command)
  371. {
  372. $connection = $this->getConnection($command);
  373. $response = $connection->executeCommand($command);
  374. if ($response instanceof Response\ErrorInterface) {
  375. return $this->onErrorResponse($command, $response);
  376. }
  377. return $response;
  378. }
  379. /**
  380. * Instruct the cluster to fetch the slots map from one of the nodes.
  381. *
  382. * @param bool $value Enable or disable fetching the slots map.
  383. */
  384. public function setAskSlotsMap($value)
  385. {
  386. $this->askSlotsMap = (bool) $value;
  387. }
  388. }