PredisCluster.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Network;
  11. use Predis\Helpers;
  12. use Predis\ClientException;
  13. use Predis\Commands\ICommand;
  14. use Predis\Distribution\IDistributionStrategy;
  15. use Predis\Distribution\HashRing;
  /**
   * Aggregates several single connections and routes each command to one of
   * them through a distribution strategy (a HashRing unless another strategy
   * is supplied to the constructor).
   *
   * The pool can be traversed with foreach; see getIterator().
   */
  class PredisCluster implements IConnectionCluster, \IteratorAggregate
  {
      /**
       * Registered connections, keyed by alias when the connection parameters
       * define one, otherwise by the integer key PHP assigns on append.
       *
       * @var array
       */
      private $_pool;

      /**
       * Strategy used to map hashes to connections.
       *
       * @var IDistributionStrategy
       */
      private $_distributor;

      /**
       * @param IDistributionStrategy $distributor Optional distribution strategy;
       *                                           a new HashRing is used when omitted.
       */
      public function __construct(IDistributionStrategy $distributor = null)
      {
          $this->_pool = array();
          $this->_distributor = $distributor ?: new HashRing();
      }

      /**
       * Tells whether at least one connection of the pool is connected.
       *
       * @return bool True as soon as one connection reports being connected,
       *              false otherwise (including when the pool is empty).
       */
      public function isConnected()
      {
          foreach ($this->_pool as $connection) {
              if ($connection->isConnected()) {
                  return true;
              }
          }

          return false;
      }

      /**
       * Calls connect() on every connection of the pool.
       */
      public function connect()
      {
          foreach ($this->_pool as $connection) {
              $connection->connect();
          }
      }

      /**
       * Calls disconnect() on every connection of the pool.
       */
      public function disconnect()
      {
          foreach ($this->_pool as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * Registers a connection in the pool and in the distribution strategy.
       *
       * The connection is stored under its "alias" parameter when one is set,
       * otherwise it is appended to the pool.
       *
       * @param IConnectionSingle $connection Connection to add.
       */
      public function add(IConnectionSingle $connection)
      {
          $parameters = $connection->getParameters();

          if (isset($parameters->alias)) {
              $this->_pool[$parameters->alias] = $connection;
          } else {
              $this->_pool[] = $connection;
          }

          // "weight" is optional just like "alias": guard the read so that a
          // connection configured without a weight does not trigger a notice,
          // and hand null to the distributor in that case.
          $weight = isset($parameters->weight) ? $parameters->weight : null;
          $this->_distributor->add($connection, $weight);
      }

      /**
       * Returns the connection the distribution strategy selects for a command.
       *
       * @param ICommand $command Command to route.
       *
       * @return mixed Whatever the distributor returns for the command hash
       *               (presumably one of the connections passed to add()).
       *
       * @throws ClientException When the command yields no hash (null) and
       *                         therefore cannot be routed to a single node.
       */
      public function getConnection(ICommand $command)
      {
          $cmdHash = $command->getHash($this->_distributor);

          if (isset($cmdHash)) {
              return $this->_distributor->get($cmdHash);
          }

          throw new ClientException(
              sprintf("Cannot send '%s' commands to a cluster of connections", $command->getId())
          );
      }

      /**
       * Looks up a connection of the pool by its key (alias or integer index).
       *
       * @param mixed $id Pool key; any empty value (null, 0, '', ...) is
       *                  treated as index 0.
       *
       * @return mixed The matching connection, or null when the key is not
       *               present in the pool.
       */
      public function getConnectionById($id = null)
      {
          $alias = $id ?: 0;

          return isset($this->_pool[$alias]) ? $this->_pool[$alias] : null;
      }

      /**
       * Returns the connection the distribution strategy selects for a key.
       *
       * Only the part of the key returned by Helpers::getKeyHashablePart() is
       * hashed.
       *
       * @param string $key Key to route.
       *
       * @return mixed Whatever the distributor returns for the generated hash.
       */
      public function getConnectionByKey($key)
      {
          $hashablePart = Helpers::getKeyHashablePart($key);
          $keyHash = $this->_distributor->generateKey($hashablePart);

          return $this->_distributor->get($keyHash);
      }

      /**
       * Exposes the pool for iteration.
       *
       * @return \ArrayIterator Iterator over a copy of the pool array.
       */
      public function getIterator()
      {
          return new \ArrayIterator($this->_pool);
      }

      /**
       * Writes a command on the connection selected by getConnection().
       *
       * @param ICommand $command Command to write.
       *
       * @throws ClientException When the command cannot be routed.
       */
      public function writeCommand(ICommand $command)
      {
          $this->getConnection($command)->writeCommand($command);
      }

      /**
       * Reads the response to a command from the connection selected by
       * getConnection().
       *
       * @param ICommand $command Command whose response must be read.
       *
       * @return mixed Value returned by the connection's readResponse().
       *
       * @throws ClientException When the command cannot be routed.
       */
      public function readResponse(ICommand $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * Executes a command on the connection selected by getConnection().
       *
       * @param ICommand $command Command to execute.
       *
       * @return mixed Value returned by the connection's executeCommand().
       *
       * @throws ClientException When the command cannot be routed.
       */
      public function executeCommand(ICommand $command)
      {
          return $this->getConnection($command)->executeCommand($command);
      }
  }