123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- <?php
- /*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- namespace Predis\Connection\Aggregate;
- use Predis\ClientException;
- use Predis\Command\CommandInterface;
- use Predis\Connection\ConnectionException;
- use Predis\Connection\NodeConnectionInterface;
- use Predis\Replication\ReplicationStrategy;
- /**
- * Aggregate connection handling replication of Redis nodes configured in a
- * single master / multiple slaves setup.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
- class MasterSlaveReplication implements ReplicationInterface
- {
- /**
- * @var ReplicationStrategy
- */
- protected $strategy;
- /**
- * @var NodeConnectionInterface
- */
- protected $master;
- /**
- * @var NodeConnectionInterface[]
- */
- protected $slaves = array();
- /**
- * @var NodeConnectionInterface
- */
- protected $current;
- /**
- * {@inheritdoc}
- */
- public function __construct(ReplicationStrategy $strategy = null)
- {
- $this->strategy = $strategy ?: new ReplicationStrategy();
- }
- /**
- * Resets the connection state.
- */
- protected function reset()
- {
- $this->current = null;
- }
- /**
- * {@inheritdoc}
- */
- public function add(NodeConnectionInterface $connection)
- {
- $alias = $connection->getParameters()->alias;
- if ($alias === 'master') {
- $this->master = $connection;
- } else {
- $this->slaves[$alias ?: "slave-$connection"] = $connection;
- }
- $this->reset();
- }
- /**
- * {@inheritdoc}
- */
- public function remove(NodeConnectionInterface $connection)
- {
- if ($connection->getParameters()->alias === 'master') {
- $this->master = null;
- $this->reset();
- return true;
- } else {
- if (($id = array_search($connection, $this->slaves, true)) !== false) {
- unset($this->slaves[$id]);
- $this->reset();
- return true;
- }
- }
- return false;
- }
- /**
- * {@inheritdoc}
- */
- public function getConnection(CommandInterface $command)
- {
- if (!$this->current) {
- if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
- $this->current = $slave;
- } else {
- $this->current = $this->getMasterOrDie();
- }
- return $this->current;
- }
- if ($this->current === $master = $this->getMasterOrDie()) {
- return $master;
- }
- if (!$this->strategy->isReadOperation($command) || !$this->slaves) {
- $this->current = $master;
- }
- return $this->current;
- }
- /**
- * {@inheritdoc}
- */
- public function getConnectionById($connectionId)
- {
- if ($connectionId === 'master') {
- return $this->master;
- }
- if (isset($this->slaves[$connectionId])) {
- return $this->slaves[$connectionId];
- }
- return;
- }
- /**
- * {@inheritdoc}
- */
- public function switchTo($connection)
- {
- if (!$connection instanceof NodeConnectionInterface) {
- $connection = $this->getConnectionById($connection);
- }
- if (!$connection) {
- throw new \InvalidArgumentException('Invalid connection or connection not found.');
- }
- if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
- throw new \InvalidArgumentException('Invalid connection or connection not found.');
- }
- $this->current = $connection;
- }
- /**
- * Switches to the master server.
- */
- public function switchToMaster()
- {
- $this->switchTo('master');
- }
- /**
- * Switches to a random slave server.
- */
- public function switchToSlave()
- {
- $connection = $this->pickSlave();
- $this->switchTo($connection);
- }
- /**
- * {@inheritdoc}
- */
- public function getCurrent()
- {
- return $this->current;
- }
- /**
- * {@inheritdoc}
- */
- public function getMaster()
- {
- return $this->master;
- }
- /**
- * Returns the connection associated to the master server.
- *
- * @return NodeConnectionInterface
- */
- private function getMasterOrDie()
- {
- if (!$connection = $this->getMaster()) {
- throw new ClientException('No master server available for replication');
- }
- return $connection;
- }
- /**
- * {@inheritdoc}
- */
- public function getSlaves()
- {
- return array_values($this->slaves);
- }
- /**
- * Returns the underlying replication strategy.
- *
- * @return ReplicationStrategy
- */
- public function getReplicationStrategy()
- {
- return $this->strategy;
- }
- /**
- * Returns a random slave.
- *
- * @return NodeConnectionInterface
- */
- protected function pickSlave()
- {
- if ($this->slaves) {
- return $this->slaves[array_rand($this->slaves)];
- }
- }
- /**
- * {@inheritdoc}
- */
- public function isConnected()
- {
- return $this->current ? $this->current->isConnected() : false;
- }
- /**
- * {@inheritdoc}
- */
- public function connect()
- {
- if (!$this->current) {
- if (!$this->current = $this->pickSlave()) {
- if (!$this->current = $this->getMaster()) {
- throw new ClientException("No available connection for replication");
- }
- }
- }
- $this->current->connect();
- }
- /**
- * {@inheritdoc}
- */
- public function disconnect()
- {
- if ($this->master) {
- $this->master->disconnect();
- }
- foreach ($this->slaves as $connection) {
- $connection->disconnect();
- }
- }
- /**
- * Retries the execution of a command upon slave failure.
- *
- * @param CommandInterface $command Command instance.
- * @param string $method Actual method.
- *
- * @return mixed
- */
- private function retryCommandOnFailure(CommandInterface $command, $method)
- {
- RETRY_COMMAND: {
- try {
- $response = $this->getConnection($command)->$method($command);
- } catch (ConnectionException $exception) {
- $connection = $exception->getConnection();
- $connection->disconnect();
- if ($connection === $this->master) {
- // Throw immediatly if the client was connected to master,
- // even when the command represents a read-only operation.
- throw $exception;
- } else {
- // Otherwise remove the failing slave and attempt to execute
- // the command again on one of the remaining slaves...
- $this->remove($connection);
- }
- // ... that is, unless we have no more connections to use.
- if (!$this->slaves && !$this->master) {
- throw $exception;
- }
- goto RETRY_COMMAND;
- }
- }
- return $response;
- }
- /**
- * {@inheritdoc}
- */
- public function writeRequest(CommandInterface $command)
- {
- $this->retryCommandOnFailure($command, __FUNCTION__);
- }
- /**
- * {@inheritdoc}
- */
- public function readResponse(CommandInterface $command)
- {
- return $this->retryCommandOnFailure($command, __FUNCTION__);
- }
- /**
- * {@inheritdoc}
- */
- public function executeCommand(CommandInterface $command)
- {
- return $this->retryCommandOnFailure($command, __FUNCTION__);
- }
- /**
- * {@inheritdoc}
- */
- public function __sleep()
- {
- return array('master', 'slaves', 'strategy');
- }
- }
|