* * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. */ namespace Predis\Connection; use Predis\NotSupportedException; use Predis\Command\CommandInterface; use Predis\Replication\ReplicationStrategy; /** * Aggregated connection class used by to handle replication with a * group of servers in a master/slave configuration. * * @author Daniele Alessandri */ class MasterSlaveReplication implements ReplicationConnectionInterface { protected $strategy; protected $master; protected $slaves; protected $current; /** * */ public function __construct(ReplicationStrategy $strategy = null) { $this->slaves = array(); $this->strategy = $strategy ?: new ReplicationStrategy(); } /** * Checks if one master and at least one slave have been defined. */ protected function check() { if (!isset($this->master) || !$this->slaves) { throw new \RuntimeException('Replication needs a master and at least one slave.'); } } /** * Resets the connection state. */ protected function reset() { $this->current = null; } /** * {@inheritdoc} */ public function add(SingleConnectionInterface $connection) { $alias = $connection->getParameters()->alias; if ($alias === 'master') { $this->master = $connection; } else { $this->slaves[$alias ?: count($this->slaves)] = $connection; } $this->reset(); } /** * {@inheritdoc} */ public function remove(SingleConnectionInterface $connection) { if ($connection->getParameters()->alias === 'master') { $this->master = null; $this->reset(); return true; } else { if (($id = array_search($connection, $this->slaves, true)) !== false) { unset($this->slaves[$id]); $this->reset(); return true; } } return false; } /** * {@inheritdoc} */ public function getConnection(CommandInterface $command) { if ($this->current === null) { $this->check(); $this->current = $this->strategy->isReadOperation($command) ? $this->pickSlave() : $this->master; return $this->current; } if ($this->current === $this->master) { return $this->current; } if (!$this->strategy->isReadOperation($command)) { $this->current = $this->master; } return $this->current; } /** * {@inheritdoc} */ public function getConnectionById($connectionId) { if ($connectionId === 'master') { return $this->master; } if (isset($this->slaves[$connectionId])) { return $this->slaves[$connectionId]; } return null; } /** * {@inheritdoc} */ public function switchTo($connection) { $this->check(); if (!$connection instanceof SingleConnectionInterface) { $connection = $this->getConnectionById($connection); } if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) { throw new \InvalidArgumentException('The specified connection is not valid.'); } $this->current = $connection; } /** * {@inheritdoc} */ public function getCurrent() { return $this->current; } /** * {@inheritdoc} */ public function getMaster() { return $this->master; } /** * {@inheritdoc} */ public function getSlaves() { return array_values($this->slaves); } /** * Returns the underlying replication strategy. * * @return ReplicationStrategy */ public function getReplicationStrategy() { return $this->strategy; } /** * Returns a random slave. * * @return SingleConnectionInterface */ protected function pickSlave() { return $this->slaves[array_rand($this->slaves)]; } /** * {@inheritdoc} */ public function isConnected() { return $this->current ? $this->current->isConnected() : false; } /** * {@inheritdoc} */ public function connect() { if ($this->current === null) { $this->check(); $this->current = $this->pickSlave(); } $this->current->connect(); } /** * {@inheritdoc} */ public function disconnect() { if ($this->master) { $this->master->disconnect(); } foreach ($this->slaves as $connection) { $connection->disconnect(); } } /** * {@inheritdoc} */ public function writeCommand(CommandInterface $command) { $this->getConnection($command)->writeCommand($command); } /** * {@inheritdoc} */ public function readResponse(CommandInterface $command) { return $this->getConnection($command)->readResponse($command); } /** * {@inheritdoc} */ public function executeCommand(CommandInterface $command) { return $this->getConnection($command)->executeCommand($command); } /** * {@inheritdoc} */ public function __sleep() { return array('master', 'slaves', 'strategy'); } }