PredisCluster.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\ClientException;
  12. use Predis\NotSupportedException;
  13. use Predis\Command\CommandInterface;
  14. use Predis\Command\Hash\CommandHashStrategy;
  15. use Predis\Distribution\HashRing;
  16. use Predis\Distribution\DistributionStrategyInterface;
  /**
   * Abstraction for a cluster of aggregated connections to various Redis servers
   * implementing client-side sharding based on pluggable distribution strategies.
   *
   * Connections are stored in an internal pool, keyed by their alias when one is
   * set in the connection parameters or by an auto-incremented index otherwise,
   * and are also registered with the distributor that maps hashes to nodes.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class PredisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
  {
      /** @var array Connections indexed by alias or by numeric index. */
      private $pool;

      /** @var DistributionStrategyInterface Maps hashes to connections. */
      private $distributor;

      /** @var CommandHashStrategy Computes hashes for commands and keys. */
      private $cmdHasher;

      /**
       * @param DistributionStrategyInterface $distributor Distribution strategy used by the cluster.
       *                                                   A new HashRing is used when omitted.
       */
      public function __construct(DistributionStrategyInterface $distributor = null)
      {
          $this->pool = array();
          $this->cmdHasher = new CommandHashStrategy();
          $this->distributor = $distributor ?: new HashRing();
      }

      /**
       * Tells whether at least one connection of the pool is connected.
       *
       * @return bool True as soon as one pooled connection reports being connected,
       *              false otherwise (including when the pool is empty).
       */
      public function isConnected()
      {
          foreach ($this->pool as $connection) {
              if ($connection->isConnected()) {
                  return true;
              }
          }

          return false;
      }

      /**
       * Calls connect() on every connection of the pool.
       */
      public function connect()
      {
          foreach ($this->pool as $connection) {
              $connection->connect();
          }
      }

      /**
       * Calls disconnect() on every connection of the pool.
       */
      public function disconnect()
      {
          foreach ($this->pool as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * Adds a connection to the pool and registers it with the distributor.
       *
       * The connection is stored under its `alias` parameter when set, otherwise
       * it is appended with the next numeric index. The optional `weight`
       * parameter is forwarded to the distributor (null when not set).
       *
       * @param SingleConnectionInterface $connection Connection to add.
       */
      public function add(SingleConnectionInterface $connection)
      {
          $parameters = $connection->getParameters();

          // NOTE(review): re-using an existing alias overwrites the pool entry,
          // but the previous connection is not removed from the distributor here.
          if (isset($parameters->alias)) {
              $this->pool[$parameters->alias] = $connection;
          } else {
              $this->pool[] = $connection;
          }

          $weight = isset($parameters->weight) ? $parameters->weight : null;
          $this->distributor->add($connection, $weight);
      }

      /**
       * Removes a connection instance from the pool and from the distributor.
       *
       * The lookup is performed by identity (strict comparison).
       *
       * @param SingleConnectionInterface $connection Connection to remove.
       * @return bool Returns true if the connection was in the pool.
       */
      public function remove(SingleConnectionInterface $connection)
      {
          $id = array_search($connection, $this->pool, true);

          if ($id === false) {
              return false;
          }

          unset($this->pool[$id]);
          $this->distributor->remove($connection);

          return true;
      }

      /**
       * Removes a connection instance using its alias or index.
       *
       * @param string $connectionId Alias or index of a connection.
       * @return bool Returns true if the connection was in the pool.
       */
      public function removeById($connectionId)
      {
          $connection = $this->getConnectionById($connectionId);

          if ($connection) {
              return $this->remove($connection);
          }

          return false;
      }

      /**
       * Returns the connection that should handle the specified command.
       *
       * The hash already stored on the command is used when present; otherwise
       * it is computed through the command hash strategy and stored on the
       * command before asking the distributor for the matching connection.
       *
       * @param CommandInterface $command A Redis command.
       * @return SingleConnectionInterface
       * @throws NotSupportedException When no hash can be computed for the command.
       */
      public function getConnection(CommandInterface $command)
      {
          $hash = $command->getHash();

          if (!isset($hash)) {
              $hash = $this->cmdHasher->getHash($this->distributor, $command);

              // Only a missing result means "not hashable": a hash equal to 0 is
              // a legitimate value and must not be rejected by a truthiness test.
              if ($hash === null || $hash === false) {
                  throw new NotSupportedException("Cannot send {$command->getId()} to a cluster of connections");
              }

              $command->setHash($hash);
          }

          return $this->distributor->get($hash);
      }

      /**
       * Returns a connection of the pool by its alias or index.
       *
       * @param string|int|null $id Alias or index of a connection. Any empty value
       *                            (null, '', 0) resolves to index 0.
       * @return SingleConnectionInterface|null The connection, or null when the
       *                                        pool has no entry for the identifier.
       */
      public function getConnectionById($id = null)
      {
          $alias = $id ?: 0;

          return isset($this->pool[$alias]) ? $this->pool[$alias] : null;
      }

      /**
       * Retrieves a connection instance from the cluster using a key.
       *
       * @param string $key Key of a Redis value.
       * @return SingleConnectionInterface
       */
      public function getConnectionByKey($key)
      {
          $hash = $this->cmdHasher->getKeyHash($this->distributor, $key);

          return $this->distributor->get($hash);
      }

      /**
       * Returns the underlying command hash strategy used to hash
       * commands by their keys.
       *
       * @return CommandHashStrategy
       */
      public function getCommandHashStrategy()
      {
          return $this->cmdHasher;
      }

      /**
       * Returns the number of connections in the pool.
       *
       * @return int
       */
      #[\ReturnTypeWillChange]
      public function count()
      {
          return count($this->pool);
      }

      /**
       * Returns an iterator over the connections of the pool, keyed by alias
       * or index.
       *
       * @return \ArrayIterator
       */
      #[\ReturnTypeWillChange]
      public function getIterator()
      {
          return new \ArrayIterator($this->pool);
      }

      /**
       * Writes the command on the connection selected by getConnection().
       *
       * @param CommandInterface $command A Redis command.
       * @throws NotSupportedException When no hash can be computed for the command.
       */
      public function writeCommand(CommandInterface $command)
      {
          $this->getConnection($command)->writeCommand($command);
      }

      /**
       * Reads the response to the command from the connection selected by
       * getConnection().
       *
       * @param CommandInterface $command A Redis command.
       * @return mixed Whatever the selected connection returns.
       * @throws NotSupportedException When no hash can be computed for the command.
       */
      public function readResponse(CommandInterface $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * Executes the command on the connection selected by getConnection().
       *
       * @param CommandInterface $command A Redis command.
       * @return mixed Whatever the selected connection returns.
       * @throws NotSupportedException When no hash can be computed for the command.
       */
      public function executeCommand(CommandInterface $command)
      {
          return $this->getConnection($command)->executeCommand($command);
      }

      /**
       * Executes the specified Redis command on all the nodes of a cluster.
       *
       * @param CommandInterface $command A Redis command.
       * @return array One reply per pooled connection, in pool iteration order.
       */
      public function executeCommandOnNodes(CommandInterface $command)
      {
          $replies = array();

          foreach ($this->pool as $connection) {
              $replies[] = $connection->executeCommand($command);
          }

          return $replies;
      }
  }