PredisCluster.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use ArrayIterator;
  12. use Countable;
  13. use IteratorAggregate;
  14. use Predis\NotSupportedException;
  15. use Predis\Cluster\PredisStrategy as PredisClusterStrategy;
  16. use Predis\Cluster\StrategyInterface as ClusterStrategyInterface;
  17. use Predis\Cluster\Distributor\DistributorInterface;
  18. use Predis\Cluster\Distributor\HashRing;
  19. use Predis\Command\CommandInterface;
  /**
   * Abstraction for a cluster of aggregate connections to various Redis servers
   * implementing client-side sharding based on pluggable distribution strategies.
   *
   * Connections are stored in an internal pool, keyed by the alias found in
   * their parameters or by an auto-assigned integer index when no alias is set,
   * and are registered with the distributor that picks a node for a given hash.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class PredisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
  {
      /**
       * Connections of the cluster indexed by alias or numeric index.
       *
       * @var array
       */
      private $pool;

      /**
       * Strategy used to compute the hash of commands and keys.
       *
       * @var ClusterStrategyInterface
       */
      private $strategy;

      /**
       * Distributor mapping hashes to connections.
       *
       * @var DistributorInterface
       */
      private $distributor;

      /**
       * Creates an empty cluster. When no distributor is given a HashRing is
       * used; the strategy is built on the hash generator of the distributor.
       *
       * @param DistributorInterface $distributor Distributor instance.
       */
      public function __construct(DistributorInterface $distributor = null)
      {
          if (!$distributor) {
              $distributor = new HashRing();
          }

          $this->pool = array();
          $this->distributor = $distributor;
          $this->strategy = new PredisClusterStrategy($distributor->getHashGenerator());
      }

      /**
       * Tells whether at least one connection of the pool is connected.
       *
       * {@inheritdoc}
       */
      public function isConnected()
      {
          foreach ($this->pool as $connection) {
              if ($connection->isConnected()) {
                  return true;
              }
          }

          return false;
      }

      /**
       * Opens every connection of the pool.
       *
       * {@inheritdoc}
       */
      public function connect()
      {
          foreach ($this->pool as $connection) {
              $connection->connect();
          }
      }

      /**
       * Closes every connection of the pool.
       *
       * {@inheritdoc}
       */
      public function disconnect()
      {
          foreach ($this->pool as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * Adds a connection to the pool and registers it with the distributor,
       * passing along the weight found in its parameters (null when not set).
       *
       * {@inheritdoc}
       */
      public function add(SingleConnectionInterface $connection)
      {
          $parameters = $connection->getParameters();

          if (isset($parameters->alias)) {
              $alias = $parameters->alias;

              // Reusing an alias overwrites the pool entry. The displaced
              // connection must leave the distributor as well, otherwise the
              // distributor would still hold a connection that the pool (and
              // therefore connect(), disconnect(), count(), ...) no longer tracks.
              if (isset($this->pool[$alias])) {
                  $this->distributor->remove($this->pool[$alias]);
              }

              $this->pool[$alias] = $connection;
          } else {
              $this->pool[] = $connection;
          }

          $weight = isset($parameters->weight) ? $parameters->weight : null;
          $this->distributor->add($connection, $weight);
      }

      /**
       * Removes a connection instance from both the pool and the distributor.
       * The lookup is performed by identity (strict search).
       *
       * {@inheritdoc}
       */
      public function remove(SingleConnectionInterface $connection)
      {
          $id = array_search($connection, $this->pool, true);

          if ($id === false) {
              return false;
          }

          unset($this->pool[$id]);
          $this->distributor->remove($connection);

          return true;
      }

      /**
       * Removes a connection instance using its alias or index.
       *
       * @param  string $connectionID Alias or index of a connection.
       * @return bool   Returns true if the connection was in the pool.
       */
      public function removeById($connectionID)
      {
          $connection = $this->getConnectionById($connectionID);

          if ($connection === null) {
              return false;
          }

          return $this->remove($connection);
      }

      /**
       * Returns the connection selected by the distributor for the hash that
       * the cluster strategy computes for the command.
       *
       * {@inheritdoc}
       *
       * @throws NotSupportedException When the strategy yields no hash for the command.
       */
      public function getConnection(CommandInterface $command)
      {
          $hash = $this->strategy->getHash($command);

          if (!isset($hash)) {
              throw new NotSupportedException(
                  "Cannot use '{$command->getId()}' over clusters of connections."
              );
          }

          return $this->distributor->get($hash);
      }

      /**
       * Returns the pooled connection stored under the given alias or index,
       * or null when the pool has no such entry.
       *
       * {@inheritdoc}
       */
      public function getConnectionById($connectionID)
      {
          if (isset($this->pool[$connectionID])) {
              return $this->pool[$connectionID];
          }

          return null;
      }

      /**
       * Retrieves a connection instance from the cluster using a key.
       *
       * @param  string                    $key Key string.
       * @return SingleConnectionInterface
       */
      public function getConnectionByKey($key)
      {
          $hash = $this->strategy->getKeyHash($key);

          return $this->distributor->get($hash);
      }

      /**
       * Returns the underlying command hash strategy used to hash commands by
       * using keys found in their arguments.
       *
       * @return ClusterStrategyInterface
       */
      public function getClusterStrategy()
      {
          return $this->strategy;
      }

      /**
       * Returns the number of connections in the pool.
       *
       * The attribute line below is parsed as a plain comment by PHP < 8 and
       * silences the PHP 8.1+ return-type deprecation notice for Countable.
       *
       * {@inheritdoc}
       */
      #[\ReturnTypeWillChange]
      public function count()
      {
          return count($this->pool);
      }

      /**
       * Returns an iterator over the connections of the pool.
       *
       * The attribute line below is parsed as a plain comment by PHP < 8 and
       * silences the PHP 8.1+ return-type deprecation notice for IteratorAggregate.
       *
       * {@inheritdoc}
       */
      #[\ReturnTypeWillChange]
      public function getIterator()
      {
          return new ArrayIterator($this->pool);
      }

      /**
       * Writes the command on the connection selected for it.
       *
       * {@inheritdoc}
       */
      public function writeRequest(CommandInterface $command)
      {
          $this->getConnection($command)->writeRequest($command);
      }

      /**
       * Reads the response to the command from the connection selected for it.
       *
       * {@inheritdoc}
       */
      public function readResponse(CommandInterface $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * Executes the command on the connection selected for it.
       *
       * {@inheritdoc}
       */
      public function executeCommand(CommandInterface $command)
      {
          return $this->getConnection($command)->executeCommand($command);
      }

      /**
       * Executes the specified Redis command on all the nodes of a cluster.
       *
       * Responses are collected in pool iteration order into a zero-based list;
       * connection aliases are not preserved as keys.
       *
       * @param  CommandInterface $command A Redis command.
       * @return array
       */
      public function executeCommandOnNodes(CommandInterface $command)
      {
          $responses = array();

          foreach ($this->pool as $connection) {
              $responses[] = $connection->executeCommand($command);
          }

          return $responses;
      }
  }