MasterSlaveReplication.php 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. <?php
  2. /*
  3. * This file is part of the Predis package.
  4. *
  5. * (c) Daniele Alessandri <suppakilla@gmail.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Predis\Connection;
  11. use InvalidArgumentException;
  12. use RuntimeException;
  13. use Predis\Command\CommandInterface;
  14. use Predis\Replication\ReplicationStrategy;
  /**
   * Aggregate connection handling replication of Redis nodes configured in a
   * single master / multiple slaves setup.
   *
   * The connection registered with the alias "master" is the master; every
   * other connection is a slave. For each command the replication strategy is
   * asked whether it is a read operation: read operations are served by a
   * randomly picked slave, anything else by the master. Once the master has
   * become the current connection it stays current until the set of
   * connections changes (add / remove) or switchTo() is called.
   *
   * @author Daniele Alessandri <suppakilla@gmail.com>
   */
  class MasterSlaveReplication implements ReplicationConnectionInterface
  {
      /**
       * @var ReplicationStrategy Tells read operations apart from the others.
       */
      protected $strategy;

      /**
       * @var SingleConnectionInterface|null Connection added with the alias "master".
       */
      protected $master;

      /**
       * @var SingleConnectionInterface[] Slaves, keyed by alias or by numeric index.
       */
      protected $slaves;

      /**
       * @var SingleConnectionInterface|null Connection in use, null when none is selected.
       */
      protected $current;

      /**
       * Creates an empty replication connection.
       *
       * @param ReplicationStrategy $strategy Optional strategy; a default
       *                                      ReplicationStrategy instance is
       *                                      created when none is given.
       */
      public function __construct(ReplicationStrategy $strategy = null)
      {
          $this->slaves = array();
          $this->strategy = $strategy ?: new ReplicationStrategy();
      }

      /**
       * Checks if one master and at least one slave have been defined.
       *
       * @throws RuntimeException When the master or all the slaves are missing.
       */
      protected function check()
      {
          if (!isset($this->master) || !$this->slaves) {
              throw new RuntimeException(
                  'Replication needs a master and at least one slave.'
              );
          }
      }

      /**
       * Resets the connection state by forgetting the current connection.
       */
      protected function reset()
      {
          $this->current = null;
      }

      /**
       * {@inheritdoc}
       *
       * A connection whose alias is "master" becomes the master (replacing any
       * previous one). Any other connection is stored as a slave under its
       * alias or, when the alias is empty, under the first free numeric index.
       * The current connection is reset afterwards.
       */
      public function add(SingleConnectionInterface $connection)
      {
          $alias = $connection->getParameters()->alias;

          if ($alias === 'master') {
              $this->master = $connection;
          } elseif ($alias) {
              $this->slaves[$alias] = $connection;
          } else {
              // count() alone is not a safe key: after a slave has been removed,
              // or when a slave uses a numeric alias, it can match a key that is
              // still in use and the new slave would silently replace an
              // existing one. Start from count() (keeps the historical ids) and
              // move forward to the first unused index.
              $index = count($this->slaves);
              while (array_key_exists($index, $this->slaves)) {
                  ++$index;
              }

              $this->slaves[$index] = $connection;
          }

          $this->reset();
      }

      /**
       * {@inheritdoc}
       *
       * A connection whose alias is "master" always clears the master slot,
       * without comparing it to the stored master instance. Any other
       * connection is looked up among the slaves by identity.
       *
       * @return bool True when the master slot was cleared or a slave was
       *              removed, false when the connection is not a known slave.
       */
      public function remove(SingleConnectionInterface $connection)
      {
          if ($connection->getParameters()->alias === 'master') {
              $this->master = null;
              $this->reset();

              return true;
          }

          $id = array_search($connection, $this->slaves, true);

          if ($id === false) {
              return false;
          }

          unset($this->slaves[$id]);
          $this->reset();

          return true;
      }

      /**
       * {@inheritdoc}
       *
       * Selection rules:
       *  - no current connection: validate the setup, then pick a random slave
       *    for read operations or the master for everything else;
       *  - current connection is the master: keep using it (the strategy is not
       *    consulted);
       *  - current connection is a slave: keep it for read operations, switch
       *    to the master otherwise.
       *
       * @throws RuntimeException When no connection is selected yet and the
       *                          master or the slaves are missing.
       */
      public function getConnection(CommandInterface $command)
      {
          if ($this->current === null) {
              $this->check();
              $this->current = $this->strategy->isReadOperation($command)
                  ? $this->pickSlave()
                  : $this->master;

              return $this->current;
          }

          // Must stay ahead of the strategy call below so the strategy is not
          // invoked once the master has been selected.
          if ($this->current === $this->master) {
              return $this->current;
          }

          if (!$this->strategy->isReadOperation($command)) {
              $this->current = $this->master;
          }

          return $this->current;
      }

      /**
       * {@inheritdoc}
       *
       * @param string|int $connectionId "master", a slave alias or a slave index.
       *
       * @return SingleConnectionInterface|null Null when the id is unknown (or
       *                                        when "master" is requested and no
       *                                        master has been added).
       */
      public function getConnectionById($connectionId)
      {
          if ($connectionId === 'master') {
              return $this->master;
          }

          if (isset($this->slaves[$connectionId])) {
              return $this->slaves[$connectionId];
          }

          return null;
      }

      /**
       * {@inheritdoc}
       *
       * @param SingleConnectionInterface|string|int $connection Connection
       *        instance, or an id resolved through getConnectionById().
       *
       * @throws RuntimeException         When the master or the slaves are missing.
       * @throws InvalidArgumentException When the connection is neither the
       *                                  master nor one of the slaves.
       */
      public function switchTo($connection)
      {
          $this->check();

          if (!$connection instanceof SingleConnectionInterface) {
              $connection = $this->getConnectionById($connection);
          }

          if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
              throw new InvalidArgumentException('The specified connection is not valid.');
          }

          $this->current = $connection;
      }

      /**
       * {@inheritdoc}
       *
       * @return SingleConnectionInterface|null Null when no connection is selected.
       */
      public function getCurrent()
      {
          return $this->current;
      }

      /**
       * {@inheritdoc}
       *
       * @return SingleConnectionInterface|null Null when no master has been added.
       */
      public function getMaster()
      {
          return $this->master;
      }

      /**
       * {@inheritdoc}
       *
       * @return SingleConnectionInterface[] Slaves as a list; aliases and
       *                                     indexes are not preserved.
       */
      public function getSlaves()
      {
          return array_values($this->slaves);
      }

      /**
       * Returns the underlying replication strategy.
       *
       * @return ReplicationStrategy
       */
      public function getReplicationStrategy()
      {
          return $this->strategy;
      }

      /**
       * Returns a random slave.
       *
       * Callers run check() first, so the list of slaves is expected to be
       * non-empty here.
       *
       * @return SingleConnectionInterface
       */
      protected function pickSlave()
      {
          return $this->slaves[array_rand($this->slaves)];
      }

      /**
       * {@inheritdoc}
       *
       * @return bool False when no connection is selected, otherwise the state
       *              reported by the current connection.
       */
      public function isConnected()
      {
          return $this->current ? $this->current->isConnected() : false;
      }

      /**
       * {@inheritdoc}
       *
       * When no connection is selected yet, a random slave becomes the current
       * connection before connecting.
       *
       * @throws RuntimeException When no connection is selected yet and the
       *                          master or the slaves are missing.
       */
      public function connect()
      {
          if ($this->current === null) {
              $this->check();
              $this->current = $this->pickSlave();
          }

          $this->current->connect();
      }

      /**
       * {@inheritdoc}
       *
       * Disconnects the master (when defined) and every slave. The current
       * connection selection is left untouched.
       */
      public function disconnect()
      {
          if ($this->master) {
              $this->master->disconnect();
          }

          foreach ($this->slaves as $connection) {
              $connection->disconnect();
          }
      }

      /**
       * {@inheritdoc}
       *
       * Delegates to the connection selected by getConnection() for the command.
       */
      public function writeRequest(CommandInterface $command)
      {
          $this->getConnection($command)->writeRequest($command);
      }

      /**
       * {@inheritdoc}
       *
       * Delegates to the connection selected by getConnection() for the command.
       */
      public function readResponse(CommandInterface $command)
      {
          return $this->getConnection($command)->readResponse($command);
      }

      /**
       * {@inheritdoc}
       *
       * Delegates to the connection selected by getConnection() for the command.
       */
      public function executeCommand(CommandInterface $command)
      {
          return $this->getConnection($command)->executeCommand($command);
      }

      /**
       * Lists the properties to serialize.
       *
       * The current connection is deliberately left out, so the selection is
       * not part of the serialized state.
       *
       * @return string[]
       */
      public function __sleep()
      {
          return array('master', 'slaves', 'strategy');
      }
  }