PredisCluster.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Countable;
  12. use IteratorAggregate;
  13. use Predis\NotSupportedException;
  14. use Predis\Cluster\PredisStrategy as PredisClusterStrategy;
  15. use Predis\Cluster\StrategyInterface as ClusterStrategyInterface;
  16. use Predis\Cluster\Distributor\DistributorInterface;
  17. use Predis\Cluster\Distributor\HashRing;
  18. use Predis\Command\CommandInterface;
  19. /**
  20. * Abstraction for a cluster of aggregate connections to various Redis servers
  21. * implementing client-side sharding based on pluggable distribution strategies.
  22. *
  23. * @author Daniele Alessandri <suppakilla@gmail.com>
  24. * @todo Add the ability to remove connections from pool.
  25. */
  26. class PredisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
  27. {
  28. private $pool;
  29. private $strategy;
  30. private $distributor;
  31. /**
  32. * @param DistributorInterface $distributor Distributor instance.
  33. */
  34. public function __construct(DistributorInterface $distributor = null)
  35. {
  36. $distributor = $distributor ?: new HashRing();
  37. $this->pool = array();
  38. $this->strategy = new PredisClusterStrategy($distributor->getHashGenerator());
  39. $this->distributor = $distributor;
  40. }
  41. /**
  42. * {@inheritdoc}
  43. */
  44. public function isConnected()
  45. {
  46. foreach ($this->pool as $connection) {
  47. if ($connection->isConnected()) {
  48. return true;
  49. }
  50. }
  51. return false;
  52. }
  53. /**
  54. * {@inheritdoc}
  55. */
  56. public function connect()
  57. {
  58. foreach ($this->pool as $connection) {
  59. $connection->connect();
  60. }
  61. }
  62. /**
  63. * {@inheritdoc}
  64. */
  65. public function disconnect()
  66. {
  67. foreach ($this->pool as $connection) {
  68. $connection->disconnect();
  69. }
  70. }
  71. /**
  72. * {@inheritdoc}
  73. */
  74. public function add(SingleConnectionInterface $connection)
  75. {
  76. $parameters = $connection->getParameters();
  77. if (isset($parameters->alias)) {
  78. $this->pool[$parameters->alias] = $connection;
  79. } else {
  80. $this->pool[] = $connection;
  81. }
  82. $weight = isset($parameters->weight) ? $parameters->weight : null;
  83. $this->distributor->add($connection, $weight);
  84. }
  85. /**
  86. * {@inheritdoc}
  87. */
  88. public function remove(SingleConnectionInterface $connection)
  89. {
  90. if (($id = array_search($connection, $this->pool, true)) !== false) {
  91. unset($this->pool[$id]);
  92. $this->distributor->remove($connection);
  93. return true;
  94. }
  95. return false;
  96. }
  97. /**
  98. * Removes a connection instance using its alias or index.
  99. *
  100. * @param string $connectionID Alias or index of a connection.
  101. * @return bool Returns true if the connection was in the pool.
  102. */
  103. public function removeById($connectionID)
  104. {
  105. if ($connection = $this->getConnectionById($connectionID)) {
  106. return $this->remove($connection);
  107. }
  108. return false;
  109. }
  110. /**
  111. * {@inheritdoc}
  112. */
  113. public function getConnection(CommandInterface $command)
  114. {
  115. $hash = $this->strategy->getHash($command);
  116. if (!isset($hash)) {
  117. throw new NotSupportedException(
  118. "Cannot use '{$command->getId()}' over clusters of connections."
  119. );
  120. }
  121. $node = $this->distributor->get($hash);
  122. return $node;
  123. }
  124. /**
  125. * {@inheritdoc}
  126. */
  127. public function getConnectionById($connectionID)
  128. {
  129. return isset($this->pool[$connectionID]) ? $this->pool[$connectionID] : null;
  130. }
  131. /**
  132. * Retrieves a connection instance from the cluster using a key.
  133. *
  134. * @param string $key Key string.
  135. * @return SingleConnectionInterface
  136. */
  137. public function getConnectionByKey($key)
  138. {
  139. $hash = $this->strategy->getKeyHash($key);
  140. $node = $this->distributor->get($hash);
  141. return $node;
  142. }
  143. /**
  144. * Returns the underlying command hash strategy used to hash commands by
  145. * using keys found in their arguments.
  146. *
  147. * @return ClusterStrategyInterface
  148. */
  149. public function getClusterStrategy()
  150. {
  151. return $this->strategy;
  152. }
  153. /**
  154. * {@inheritdoc}
  155. */
  156. public function count()
  157. {
  158. return count($this->pool);
  159. }
  160. /**
  161. * {@inheritdoc}
  162. */
  163. public function getIterator()
  164. {
  165. return new \ArrayIterator($this->pool);
  166. }
  167. /**
  168. * {@inheritdoc}
  169. */
  170. public function writeRequest(CommandInterface $command)
  171. {
  172. $this->getConnection($command)->writeRequest($command);
  173. }
  174. /**
  175. * {@inheritdoc}
  176. */
  177. public function readResponse(CommandInterface $command)
  178. {
  179. return $this->getConnection($command)->readResponse($command);
  180. }
  181. /**
  182. * {@inheritdoc}
  183. */
  184. public function executeCommand(CommandInterface $command)
  185. {
  186. return $this->getConnection($command)->executeCommand($command);
  187. }
  188. /**
  189. * Executes the specified Redis command on all the nodes of a cluster.
  190. *
  191. * @param CommandInterface $command A Redis command.
  192. * @return array
  193. */
  194. public function executeCommandOnNodes(CommandInterface $command)
  195. {
  196. $responses = array();
  197. foreach ($this->pool as $connection) {
  198. $responses[] = $connection->executeCommand($command);
  199. }
  200. return $responses;
  201. }
  202. }