PredisCluster.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Network;
  11. use Predis\Helpers;
  12. use Predis\ClientException;
  13. use Predis\Commands\ICommand;
  14. use Predis\Distribution\IDistributionStrategy;
  15. use Predis\Distribution\HashRing;
  16. /**
  17. * Abstraction for a cluster of aggregated connections to various Redis servers
  18. * implementing client-side sharding based on pluggable distribution strategies.
  19. *
  20. * @author Daniele Alessandri <suppakilla@gmail.com>
  21. */
  class PredisCluster implements IConnectionCluster, \IteratorAggregate
  {
      /** Registered connections, keyed by alias when the parameters define one, else appended by index. */
      private $pool;

      /** Distribution strategy asked to select a connection for a given hash. */
      private $distributor;

      /**
       * Creates an empty cluster.
       *
       * @param IDistributionStrategy $distributor Distribution strategy used by the cluster;
       *                                           a new HashRing is used when none is supplied.
       */
      public function __construct(IDistributionStrategy $distributor = null)
      {
          $this->pool = array();

          if (!$distributor) {
              $distributor = new HashRing();
          }

          $this->distributor = $distributor;
      }

      /**
       * Tells whether at least one connection of the pool reports being connected.
       *
       * @return bool True as soon as one pooled connection is connected, false otherwise
       *              (including when the pool is empty).
       */
      public function isConnected()
      {
          $anyConnected = false;

          foreach ($this->pool as $node) {
              if ($node->isConnected()) {
                  // One live connection is enough, no need to probe the others.
                  $anyConnected = true;
                  break;
              }
          }

          return $anyConnected;
      }

      /**
       * Invokes connect() on every connection of the pool, in pool order.
       */
      public function connect()
      {
          foreach ($this->pool as $node) {
              $node->connect();
          }
      }

      /**
       * Invokes disconnect() on every connection of the pool, in pool order.
       */
      public function disconnect()
      {
          foreach ($this->pool as $node) {
              $node->disconnect();
          }
      }

      /**
       * Registers a connection in the pool and hands it to the distributor.
       *
       * The connection is stored under its "alias" parameter when that parameter
       * is set; otherwise it is appended to the pool. The "weight" parameter is
       * forwarded to the distributor together with the connection.
       *
       * @param IConnectionSingle $connection Connection to add to the cluster.
       */
      public function add(IConnectionSingle $connection)
      {
          $parameters = $connection->getParameters();

          if (!isset($parameters->alias)) {
              $this->pool[] = $connection;
          } else {
              $this->pool[$parameters->alias] = $connection;
          }

          $this->distributor->add($connection, $parameters->weight);
      }

      /**
       * Resolves the connection that should handle the given command.
       *
       * The command is asked for its hash (computed against the distributor) and
       * the distributor then selects the connection for that hash.
       *
       * @param ICommand $command Command to be routed.
       * @return mixed Whatever the distributor returns for the command hash.
       * @throws ClientException When the command does not provide a hash.
       */
      public function getConnection(ICommand $command)
      {
          $hash = $command->getHash($this->distributor);

          if ($hash === null) {
              // Without a hash there is nothing to route the command on.
              throw new ClientException(
                  sprintf("Cannot send '%s' commands to a cluster of connections", $command->getId())
              );
          }

          return $this->distributor->get($hash);
      }

      /**
       * Looks up a connection of the pool by its alias or index.
       *
       * @param mixed $id Alias or index of the connection; any falsy value
       *                  (including the default null) selects index 0.
       * @return mixed The pooled connection, or null when the pool has no such entry.
       */
      public function getConnectionById($id = null)
      {
          $alias = $id ? $id : 0;

          if (isset($this->pool[$alias])) {
              return $this->pool[$alias];
          }

          return null;
      }

      /**
       * Retrieves a connection instance from the cluster using a key.
       *
       * The hashable part of the key is extracted first, then turned into a hash
       * by the distributor, which finally selects the connection.
       *
       * @param string $key Key of a Redis value.
       * @return IConnectionSingle
       */
      public function getConnectionByKey($key)
      {
          return $this->distributor->get(
              $this->distributor->generateKey(Helpers::getKeyHashablePart($key))
          );
      }

      /**
       * Exposes the pooled connections for iteration.
       *
       * @return \ArrayIterator Iterator built from the current pool array.
       */
      public function getIterator()
      {
          $connections = $this->pool;

          return new \ArrayIterator($connections);
      }

      /**
       * Writes the command on the connection selected for it.
       *
       * @param ICommand $command Command to write.
       */
      public function writeCommand(ICommand $command)
      {
          $target = $this->getConnection($command);
          $target->writeCommand($command);
      }

      /**
       * Reads the response to the command from the connection selected for it.
       *
       * @param ICommand $command Command whose response must be read.
       * @return mixed Value returned by the selected connection.
       */
      public function readResponse(ICommand $command)
      {
          $target = $this->getConnection($command);

          return $target->readResponse($command);
      }

      /**
       * Executes the command on the connection selected for it.
       *
       * @param ICommand $command Command to execute.
       * @return mixed Value returned by the selected connection.
       */
      public function executeCommand(ICommand $command)
      {
          $target = $this->getConnection($command);

          return $target->executeCommand($command);
      }
  }