MasterSlaveReplication.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use Predis\NotSupportedException;
  12. use Predis\Command\CommandInterface;
  13. use Predis\Replication\ReplicationStrategy;
  14. /**
  15. * Aggregated connection class used by to handle replication with a
  16. * group of servers in a master/slave configuration.
  17. *
  18. * @author Daniele Alessandri <suppakilla@gmail.com>
  19. */
  20. class MasterSlaveReplication implements ReplicationConnectionInterface
  21. {
  22. protected $strategy;
  23. protected $master;
  24. protected $slaves;
  25. protected $current;
  26. /**
  27. *
  28. */
  29. public function __construct(ReplicationStrategy $strategy = null)
  30. {
  31. $this->slaves = array();
  32. $this->strategy = $strategy ?: new ReplicationStrategy();
  33. }
  34. /**
  35. * Checks if one master and at least one slave have been defined.
  36. */
  37. protected function check()
  38. {
  39. if (!isset($this->master) || !$this->slaves) {
  40. throw new \RuntimeException('Replication needs a master and at least one slave.');
  41. }
  42. }
  43. /**
  44. * Resets the connection state.
  45. */
  46. protected function reset()
  47. {
  48. $this->current = null;
  49. }
  50. /**
  51. * {@inheritdoc}
  52. */
  53. public function add(SingleConnectionInterface $connection)
  54. {
  55. $alias = $connection->getParameters()->alias;
  56. if ($alias === 'master') {
  57. $this->master = $connection;
  58. } else {
  59. $this->slaves[$alias ?: count($this->slaves)] = $connection;
  60. }
  61. $this->reset();
  62. }
  63. /**
  64. * {@inheritdoc}
  65. */
  66. public function remove(SingleConnectionInterface $connection)
  67. {
  68. if ($connection->getParameters()->alias === 'master') {
  69. $this->master = null;
  70. $this->reset();
  71. return true;
  72. } else {
  73. if (($id = array_search($connection, $this->slaves, true)) !== false) {
  74. unset($this->slaves[$id]);
  75. $this->reset();
  76. return true;
  77. }
  78. }
  79. return false;
  80. }
  81. /**
  82. * {@inheritdoc}
  83. */
  84. public function getConnection(CommandInterface $command)
  85. {
  86. if ($this->current === null) {
  87. $this->check();
  88. $this->current = $this->strategy->isReadOperation($command) ? $this->pickSlave() : $this->master;
  89. return $this->current;
  90. }
  91. if ($this->current === $this->master) {
  92. return $this->current;
  93. }
  94. if (!$this->strategy->isReadOperation($command)) {
  95. $this->current = $this->master;
  96. }
  97. return $this->current;
  98. }
  99. /**
  100. * {@inheritdoc}
  101. */
  102. public function getConnectionById($connectionId)
  103. {
  104. if ($connectionId === 'master') {
  105. return $this->master;
  106. }
  107. if (isset($this->slaves[$connectionId])) {
  108. return $this->slaves[$connectionId];
  109. }
  110. return null;
  111. }
  112. /**
  113. * {@inheritdoc}
  114. */
  115. public function switchTo($connection)
  116. {
  117. $this->check();
  118. if (!$connection instanceof SingleConnectionInterface) {
  119. $connection = $this->getConnectionById($connection);
  120. }
  121. if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
  122. throw new \InvalidArgumentException('The specified connection is not valid.');
  123. }
  124. $this->current = $connection;
  125. }
  126. /**
  127. * {@inheritdoc}
  128. */
  129. public function getCurrent()
  130. {
  131. return $this->current;
  132. }
  133. /**
  134. * {@inheritdoc}
  135. */
  136. public function getMaster()
  137. {
  138. return $this->master;
  139. }
  140. /**
  141. * {@inheritdoc}
  142. */
  143. public function getSlaves()
  144. {
  145. return array_values($this->slaves);
  146. }
  147. /**
  148. * Returns the underlying replication strategy.
  149. *
  150. * @return ReplicationStrategy
  151. */
  152. public function getReplicationStrategy()
  153. {
  154. return $this->strategy;
  155. }
  156. /**
  157. * Returns a random slave.
  158. *
  159. * @return SingleConnectionInterface
  160. */
  161. protected function pickSlave()
  162. {
  163. return $this->slaves[array_rand($this->slaves)];
  164. }
  165. /**
  166. * {@inheritdoc}
  167. */
  168. public function isConnected()
  169. {
  170. return $this->current ? $this->current->isConnected() : false;
  171. }
  172. /**
  173. * {@inheritdoc}
  174. */
  175. public function connect()
  176. {
  177. if ($this->current === null) {
  178. $this->check();
  179. $this->current = $this->pickSlave();
  180. }
  181. $this->current->connect();
  182. }
  183. /**
  184. * {@inheritdoc}
  185. */
  186. public function disconnect()
  187. {
  188. if ($this->master) {
  189. $this->master->disconnect();
  190. }
  191. foreach ($this->slaves as $connection) {
  192. $connection->disconnect();
  193. }
  194. }
  195. /**
  196. * {@inheritdoc}
  197. */
  198. public function writeCommand(CommandInterface $command)
  199. {
  200. $this->getConnection($command)->writeCommand($command);
  201. }
  202. /**
  203. * {@inheritdoc}
  204. */
  205. public function readResponse(CommandInterface $command)
  206. {
  207. return $this->getConnection($command)->readResponse($command);
  208. }
  209. /**
  210. * {@inheritdoc}
  211. */
  212. public function executeCommand(CommandInterface $command)
  213. {
  214. return $this->getConnection($command)->executeCommand($command);
  215. }
  216. /**
  217. * {@inheritdoc}
  218. */
  219. public function __sleep()
  220. {
  221. return array('master', 'slaves', 'strategy');
  222. }
  223. }