RedisCluster.php 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use ArrayIterator;
  12. use OutOfBoundsException;
  13. use Predis\ClientException;
  14. use Predis\NotSupportedException;
  15. use Predis\Cluster;
  16. use Predis\Command\CommandInterface;
  17. use Predis\Response;
  /**
   * Abstraction for Redis cluster (Redis v3.0).
   *
   * Keeps a pool of single-node connections indexed by their string
   * representation and routes each command to the connection associated to
   * the slot computed from the hash returned by the cluster strategy. -MOVED
   * and -ASK error replies are handled by re-executing the command against
   * the node named in the reply.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  {
      /** @var array Connections in the pool, indexed by connection ID. */
      private $pool;

      /** @var array Slot => connection instance cache filled at runtime. */
      private $slots;

      /** @var array|null Slot => connection ID map, null until built or set. */
      private $slotsMap;

      /** @var int|null Slots-per-node figure used when guessing, null until computed. */
      private $slotsPerNode;

      /** @var Cluster\RedisStrategy Strategy used to hash commands. */
      private $strategy;

      /** @var ConnectionFactoryInterface Factory used to create connections to unknown nodes. */
      private $connections;

      /**
       * @param ConnectionFactoryInterface $connections Connection factory object.
       */
      public function __construct(ConnectionFactoryInterface $connections = null)
      {
          $this->pool = array();
          $this->slots = array();
          $this->strategy = new Cluster\RedisStrategy();
          $this->connections = $connections ?: new ConnectionFactory();
      }

      /**
       * {@inheritdoc}
       *
       * Returns true as soon as one connection of the pool reports to be connected.
       */
      public function isConnected()
      {
          foreach ($this->pool as $connection) {
              if ($connection->isConnected()) {
                  return true;
              }
          }

          return false;
      }

      /**
       * {@inheritdoc}
       */
      public function connect()
      {
          foreach ($this->pool as $connection) {
              $connection->connect();
          }
      }

      /**
       * {@inheritdoc}
       */
      public function disconnect()
      {
          foreach ($this->pool as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * {@inheritdoc}
       */
      public function add(SingleConnectionInterface $connection)
      {
          $this->pool[(string) $connection] = $connection;
          $this->resetSlotsMap();
      }

      /**
       * {@inheritdoc}
       */
      public function remove(SingleConnectionInterface $connection)
      {
          if (($id = array_search($connection, $this->pool, true)) === false) {
              return false;
          }

          unset($this->pool[$id]);
          $this->forgetConnection($connection);
          $this->resetSlotsMap();

          return true;
      }

      /**
       * Removes a connection instance using its alias or index.
       *
       * @param  string  $connectionId Alias or index of a connection.
       * @return Boolean Returns true if the connection was in the pool.
       */
      public function removeById($connectionId)
      {
          if (!isset($this->pool[$connectionId])) {
              return false;
          }

          $connection = $this->pool[$connectionId];

          unset($this->pool[$connectionId]);
          $this->forgetConnection($connection);
          $this->resetSlotsMap();

          return true;
      }

      /**
       * Invalidates the slots map and the slots-per-node figure so that they
       * get rebuilt the next time a node must be guessed.
       */
      private function resetSlotsMap()
      {
          $this->slotsMap = null;
          $this->slotsPerNode = null;
      }

      /**
       * Drops every entry of the runtime slots cache that points to the given
       * connection instance, so a removed connection is never handed out again.
       *
       * @param mixed $connection Connection instance that left the pool.
       */
      private function forgetConnection($connection)
      {
          foreach ($this->slots as $slot => $cached) {
              if ($cached === $connection) {
                  unset($this->slots[$slot]);
              }
          }
      }

      /**
       * Computes how many slots each node of the pool is assumed to hold.
       *
       * @return int|null Null when the pool is empty, otherwise a value >= 1.
       */
      private function computeSlotsPerNode()
      {
          $nodes = count($this->pool);

          // An empty pool would otherwise trigger a division by zero.
          return $nodes > 0 ? max(1, (int) (16384 / $nodes)) : null;
      }

      /**
       * Builds the slots map for the cluster.
       *
       * The map is rebuilt from scratch using the "slots" parameter (a
       * "first-last" range) of each connection in the pool; connections
       * without that parameter are skipped.
       *
       * @return array
       * @throws OutOfBoundsException When a connection declares an invalid range.
       */
      public function buildSlotsMap()
      {
          $this->slotsMap = array();
          $this->slotsPerNode = $this->computeSlotsPerNode();

          foreach ($this->pool as $connectionID => $connection) {
              $parameters = $connection->getParameters();

              if (!isset($parameters->slots)) {
                  continue;
              }

              list($first, $last) = explode('-', $parameters->slots, 2);
              $this->setSlots($first, $last, $connectionID);
          }

          return $this->slotsMap;
      }

      /**
       * Returns the current slots map for the cluster.
       *
       * An empty map is initialized when none has been built or set yet.
       *
       * @return array
       */
      public function getSlotsMap()
      {
          if (!isset($this->slotsMap)) {
              $this->slotsMap = array();
          }

          return $this->slotsMap;
      }

      /**
       * Preassociate a connection to a set of slots to avoid runtime guessing.
       *
       * Slots that already have an entry in the map keep their current
       * association since the new range is merged with the array union operator.
       *
       * @todo Check type or existence of the specified connection.
       * @todo Cluster loses the slots assigned with this methods when adding / removing connections.
       *
       * @param  int                              $first      Initial slot.
       * @param  int                              $last       Last slot.
       * @param  SingleConnectionInterface|string $connection ID or connection instance.
       * @throws OutOfBoundsException When the range is outside 0-16383 or reversed.
       */
      public function setSlots($first, $last, $connection)
      {
          if ($first < 0x0000 || $first > 0x3FFF || $last < 0x0000 || $last > 0x3FFF || $last < $first) {
              throw new OutOfBoundsException("Invalid slot values for $connection: [$first-$last]");
          }

          $this->slotsMap = $this->getSlotsMap() + array_fill($first, $last - $first + 1, (string) $connection);
      }

      /**
       * {@inheritdoc}
       *
       * @throws NotSupportedException When the strategy cannot hash the command.
       */
      public function getConnection(CommandInterface $command)
      {
          $hash = $this->strategy->getHash($command);

          if (!isset($hash)) {
              throw new NotSupportedException("Cannot use {$command->getId()} with redis-cluster");
          }

          $slot = $hash & 0x3FFF;

          if (isset($this->slots[$slot])) {
              return $this->slots[$slot];
          }

          // Remember the guessed node so the next command on this slot skips the guess.
          $this->slots[$slot] = $connection = $this->pool[$this->guessNode($slot)];

          return $connection;
      }

      /**
       * Returns the connection associated to the specified slot.
       *
       * Unlike getConnection(), a guessed connection is not stored in the
       * runtime slots cache.
       *
       * @param  int                       $slot Slot ID.
       * @return SingleConnectionInterface
       * @throws OutOfBoundsException When the slot is outside 0-16383.
       */
      public function getConnectionBySlot($slot)
      {
          if ($slot < 0x0000 || $slot > 0x3FFF) {
              throw new OutOfBoundsException("Invalid slot value [$slot]");
          }

          if (isset($this->slots[$slot])) {
              return $this->slots[$slot];
          }

          return $this->pool[$this->guessNode($slot)];
      }

      /**
       * {@inheritdoc}
       */
      public function getConnectionById($connectionId)
      {
          return isset($this->pool[$connectionId]) ? $this->pool[$connectionId] : null;
      }

      /**
       * Tries guessing the correct node associated to the given slot using a precalculated
       * slots map or the same logic used by redis-trib to initialize a redis cluster.
       *
       * @param  int             $slot Slot ID.
       * @return string
       * @throws ClientException When the slot is not mapped and the pool is empty.
       */
      protected function guessNode($slot)
      {
          if (!isset($this->slotsMap)) {
              $this->buildSlotsMap();
          }

          if (isset($this->slotsMap[$slot])) {
              return $this->slotsMap[$slot];
          }

          if (!$this->pool) {
              throw new ClientException('No connections available in the pool');
          }

          // The slots map may have been initialized by setSlots() or getSlotsMap()
          // without going through buildSlotsMap(), leaving this value unset.
          if (!isset($this->slotsPerNode)) {
              $this->slotsPerNode = $this->computeSlotsPerNode();
          }

          $index = min((int) ($slot / $this->slotsPerNode), count($this->pool) - 1);
          $nodes = array_keys($this->pool);

          return $nodes[$index];
      }

      /**
       * Handles -MOVED or -ASK replies by re-executing the command on the server
       * specified by the Redis reply.
       *
       * @param  CommandInterface $command Command that generated the -MOVE or -ASK reply.
       * @param  string           $request Type of request (either 'MOVED' or 'ASK').
       * @param  string           $details Parameters of the MOVED/ASK request.
       * @return mixed
       * @throws ClientException When the request type is neither 'MOVED' nor 'ASK'.
       */
      protected function onMoveRequest(CommandInterface $command, $request, $details)
      {
          list($slot, $host) = explode(' ', $details, 2);
          $connection = $this->getConnectionById($host);

          if (!isset($connection)) {
              // Split on the last colon so hosts that contain colons themselves
              // (IPv6 addresses) keep their full address.
              $separator = strrpos($host, ':');

              if ($separator === false) {
                  $parameters = array('host' => $host, 'port' => null);
              } else {
                  $parameters = array(
                      'host' => substr($host, 0, $separator),
                      'port' => substr($host, $separator + 1),
                  );
              }

              $connection = $this->connections->create($parameters);
          }

          switch ($request) {
              case 'MOVED':
                  $this->move($connection, $slot);

                  return $this->executeCommand($command);

              case 'ASK':
                  // NOTE(review): the Redis cluster specification expects an ASKING
                  // command to be sent to the target node before the redirected
                  // command; that is not done here - confirm and address.
                  return $connection->executeCommand($command);

              default:
                  throw new ClientException("Unexpected request type for a move request: $request");
          }
      }

      /**
       * Assign the connection instance to a new slot and adds it to the
       * pool if the connection was not already part of the pool.
       *
       * @param SingleConnectionInterface $connection Connection instance
       * @param int                       $slot       Target slot.
       */
      protected function move(SingleConnectionInterface $connection, $slot)
      {
          $this->pool[(string) $connection] = $connection;
          $this->slots[(int) $slot] = $connection;
      }

      /**
       * Returns the underlying hash strategy used to hash commands by their keys.
       *
       * @return Cluster\StrategyInterface
       */
      public function getClusterStrategy()
      {
          return $this->strategy;
      }

      /**
       * {@inheritdoc}
       */
      public function count()
      {
          return count($this->pool);
      }

      /**
       * {@inheritdoc}
       */
      public function getIterator()
      {
          return new ArrayIterator(array_values($this->pool));
      }

      /**
       * Handles -ERR replies from Redis.
       *
       * MOVED and ASK errors are forwarded to onMoveRequest(), any other error
       * is returned untouched.
       *
       * @param  CommandInterface        $command Command that generated the -ERR reply.
       * @param  Response\ErrorInterface $error   Redis error reply object.
       * @return mixed
       */
      protected function handleServerError(CommandInterface $command, Response\ErrorInterface $error)
      {
          // Pad the result so a one-word error message does not leave $details undefined.
          list($type, $details) = array_pad(explode(' ', $error->getMessage(), 2), 2, '');

          switch ($type) {
              case 'MOVED':
              case 'ASK':
                  return $this->onMoveRequest($command, $type, $details);

              default:
                  return $error;
          }
      }

      /**
       * {@inheritdoc}
       */
      public function writeRequest(CommandInterface $command)
      {
          $this->getConnection($command)->writeRequest($command);
      }

      /**
       * {@inheritdoc}
       */
      public function readResponse(CommandInterface $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * {@inheritdoc}
       *
       * Error replies are passed to handleServerError() instead of being
       * returned directly.
       */
      public function executeCommand(CommandInterface $command)
      {
          $connection = $this->getConnection($command);
          $reply = $connection->executeCommand($command);

          if ($reply instanceof Response\ErrorInterface) {
              return $this->handleServerError($command, $reply);
          }

          return $reply;
      }
  }