Эх сурвалжийг харах

Implement discovery in basic replication.

Now the client can discover the whole replication configuration by
asking to one of the servers (master has the precedence) using the
INFO REPLICATION command. This is obviously a best-effort fallback
and there is no strong guarantee about reliability and efficiency.

By enabling auto-discovery, the client automates this process when
the execution of a command fails because one of the target servers
is unreachable. The replication connection requires an instance of
connection factory associated to it in order to be able to create
new connections on the fly.

It is possible to enable the auto-discovery procedure easily via
client options:

  $client = new Predis\Client($servers, [
    'replication' => true,
    'autodiscovery' => true,
  ]);
Daniele Alessandri 9 жил өмнө
parent
commit
9f6759ca1c

+ 8 - 1
src/Configuration/ReplicationOption.php

@@ -56,6 +56,13 @@ class ReplicationOption implements OptionInterface
      */
     public function getDefault(OptionsInterface $options)
     {
-        return new MasterSlaveReplication();
+        $replication = new MasterSlaveReplication();
+
+        if ($options->autodiscovery) {
+            $replication->setConnectionFactory($options->connections);
+            $replication->setAutoDiscovery(true);
+        }
+
+        return $replication;
     }
 }

+ 159 - 5
src/Connection/Aggregate/MasterSlaveReplication.php

@@ -13,8 +13,11 @@ namespace Predis\Connection\Aggregate;
 
 use Predis\ClientException;
 use Predis\Command\CommandInterface;
+use Predis\Command\RawCommand;
 use Predis\Connection\ConnectionException;
+use Predis\Connection\FactoryInterface;
 use Predis\Connection\NodeConnectionInterface;
+use Predis\Replication\MissingMasterException;
 use Predis\Replication\ReplicationStrategy;
 
 /**
@@ -45,6 +48,16 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     protected $current;
 
+    /**
+     * @var bool
+     */
+    protected $autoDiscovery = false;
+
+    /**
+     * @var FactoryInterface
+     */
+    protected $connectionFactory;
+
     /**
      * {@inheritdoc}
      */
@@ -53,6 +66,31 @@ class MasterSlaveReplication implements ReplicationInterface
         $this->strategy = $strategy ?: new ReplicationStrategy();
     }
 
+    /**
+     * Configures the automatic discovery of the replication configuration on failure.
+     *
+     * @param bool $value Enable or disable auto discovery.
+     */
+    public function setAutoDiscovery($value)
+    {
+        if (!$this->connectionFactory) {
+            throw new ClientException('Automatic discovery requires a connection factory');
+        }
+
+        $this->autoDiscovery = (bool) $value;
+    }
+
+    /**
+     * Sets the connection factory used to create the connections by the auto
+     * discovery procedure.
+     *
+     * @param FactoryInterface $connectionFactory Connection factory instance.
+     */
+    public function setConnectionFactory(FactoryInterface $connectionFactory)
+    {
+        $this->connectionFactory = $connectionFactory;
+    }
+
     /**
      * Resets the connection state.
      */
@@ -202,7 +240,7 @@ class MasterSlaveReplication implements ReplicationInterface
     private function getMasterOrDie()
     {
         if (!$connection = $this->getMaster()) {
-            throw new ClientException('No master server available for replication');
+            throw new MissingMasterException('No master server available for replication');
         }
 
         return $connection;
@@ -254,7 +292,7 @@ class MasterSlaveReplication implements ReplicationInterface
         if (!$this->current) {
             if (!$this->current = $this->pickSlave()) {
                 if (!$this->current = $this->getMaster()) {
-                    throw new ClientException("No available connection for replication");
+                    throw new ClientException('No available connection for replication');
                 }
             }
         }
@@ -276,6 +314,111 @@ class MasterSlaveReplication implements ReplicationInterface
         }
     }
 
+    /**
+     * Handles response from INFO.
+     *
+     * @param string $response
+     *
+     * @return array
+     */
+    private function handleInfoResponse($response)
+    {
+        $info = array();
+
+        foreach (preg_split('/\r?\n/', $response) as $row) {
+            if (strpos($row, ':') === false) {
+                continue;
+            }
+
+            list($k, $v) = explode(':', $row, 2);
+            $info[$k] = $v;
+        }
+
+        return $info;
+    }
+
+    /**
+     * Fetches the replication configuration from one of the servers.
+     */
+    public function discover()
+    {
+        if (!$this->connectionFactory) {
+            throw new ClientException('Discovery requires a connection factory');
+        }
+
+        RETRY_FETCH: {
+            try {
+                if ($connection = $this->getMaster()) {
+                    $this->discoverFromMaster($connection, $this->connectionFactory);
+                } elseif ($connection = $this->pickSlave()) {
+                    $this->discoverFromSlave($connection, $this->connectionFactory);
+                } else {
+                    throw new ClientException('No connection available for discovery');
+                }
+            } catch (ConnectionException $exception) {
+                $this->remove($connection);
+                goto RETRY_FETCH;
+            }
+        }
+    }
+
+    /**
+     * Discovers the replication configuration by contacting the master node.
+     *
+     * @param NodeConnectionInterface $connection        Connection to the master node.
+     * @param FactoryInterface        $connectionFactory Connection factory instance.
+     */
+    protected function discoverFromMaster(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
+    {
+        $response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
+        $replication = $this->handleInfoResponse($response);
+
+        if ($replication['role'] !== 'master') {
+            throw new ClientException("Role mismatch (expected master, got slave) [$connection]");
+        }
+
+        $this->slaves = array();
+
+        foreach ($replication as $k => $v) {
+            $parameters = null;
+
+            if (strpos($k, 'slave') === 0 && preg_match('/ip=(?P<host>.*),port=(?P<port>\d+)/', $v, $parameters)) {
+                $slaveConnection = $connectionFactory->create(array(
+                    'host' => $parameters['host'],
+                    'port' => $parameters['port'],
+                ));
+
+                $this->add($slaveConnection);
+            }
+        }
+    }
+
+    /**
+     * Discovers the replication configuration by contacting one of the slaves.
+     *
+     * @param NodeConnectionInterface $connection        Connection to one of the slaves.
+     * @param FactoryInterface        $connectionFactory Connection factory instance.
+     */
+    protected function discoverFromSlave(NodeConnectionInterface $connection, FactoryInterface $connectionFactory)
+    {
+        $response = $connection->executeCommand(RawCommand::create('INFO', 'REPLICATION'));
+        $replication = $this->handleInfoResponse($response);
+
+        if ($replication['role'] !== 'slave') {
+            throw new ClientException("Role mismatch (expected slave, got master) [$connection]");
+        }
+
+        $masterConnection = $connectionFactory->create(array(
+            'host' => $replication['master_host'],
+            'port' => $replication['master_port'],
+            'alias' => 'master',
+        ));
+
+        $this->add($masterConnection);
+
+        $this->discoverFromMaster($masterConnection, $connectionFactory);
+    }
+
     /**
      * Retries the execution of a command upon slave failure.
      *
@@ -293,9 +436,10 @@ class MasterSlaveReplication implements ReplicationInterface
                 $connection = $exception->getConnection();
                 $connection->disconnect();
 
-                if ($connection === $this->master) {
-                    // Throw immediatly if the client was connected to master,
-                    // even when the command represents a read-only operation.
+                if ($connection === $this->master && !$this->autoDiscovery) {
+                    // Throw immediatly when master connection is failing, even
+                    // if the command represents a read-only operation, unless
+                    // automatic discovery has been enabled.
                     throw $exception;
                 } else {
                     // Otherwise remove the failing slave and attempt to execute
@@ -306,6 +450,16 @@ class MasterSlaveReplication implements ReplicationInterface
                 // ... that is, unless we have no more connections to use.
                 if (!$this->slaves && !$this->master) {
                     throw $exception;
+                } elseif ($this->autoDiscovery) {
+                    $this->discover();
+                }
+
+                goto RETRY_COMMAND;
+            } catch (MissingMasterException $exception) {
+                if ($this->autoDiscovery) {
+                    $this->discover();
+                } else {
+                    throw $exception;
                 }
 
                 goto RETRY_COMMAND;

+ 23 - 0
src/Replication/MissingMasterException.php

@@ -0,0 +1,23 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Replication;
+
+use Predis\ClientException;
+
+/**
+ * Exception class that identifies when master is missing in a replication setup.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class MissingMasterException extends ClientException
+{
+}

+ 28 - 0
tests/Predis/Configuration/ReplicationOptionTest.php

@@ -53,6 +53,34 @@ class ReplicationOptionTest extends PredisTestCase
         $this->assertNull($option->filter($options, 'off'));
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testConfiguresAutomaticDiscoveryWhenAutodiscoveryOptionIsPresent()
+    {
+        $option = new ReplicationOption();
+        $options = $this->getMock('Predis\Configuration\OptionsInterface');
+        $connFactory = $this->getMock('Predis\Connection\FactoryInterface');
+
+        $options->expects($this->at(0))
+                ->method('__get')
+                ->with('autodiscovery')
+                ->will($this->returnValue(true));
+        $options->expects($this->at(1))
+                ->method('__get')
+                ->with('connections')
+                ->will($this->returnValue($connFactory));
+
+        $replication = $option->getDefault($options);
+
+        // TODO: I know, I know...
+        $reflection = new \ReflectionProperty($replication, 'autoDiscovery');
+        $reflection->setAccessible(true);
+
+        $this->assertTrue($reflection->getValue($replication));
+
+    }
+
     /**
      * @group disconnected
      * @expectedException \InvalidArgumentException

+ 290 - 1
tests/Predis/Connection/Aggregate/MasterSlaveReplicationTest.php

@@ -12,6 +12,7 @@
 namespace Predis\Connection\Aggregate;
 
 use Predis\Connection;
+use Predis\Command;
 use Predis\Profile;
 use Predis\Replication\ReplicationStrategy;
 use PredisTestCase;
@@ -664,7 +665,7 @@ class MasterSlaveReplicationTest extends PredisTestCase
 
     /**
      * @group disconnected
-     * @expectedException \Predis\ClientException
+     * @expectedException \Predis\Replication\MissingMasterException
      * @expectedMessage No master server available for replication
      */
     public function testFailsOnWriteCommandAndNoConnectionSetAsMaster()
@@ -807,6 +808,294 @@ class MasterSlaveReplicationTest extends PredisTestCase
         $replication->executeCommand($cmdEvalSha);
     }
 
+    /**
+     * @group disconnected
+     * @expectedException \Predis\ClientException
+     * @expectedMessage Discovery requires a connection factory
+     */
+    public function testDiscoveryRequiresConnectionFactory()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+
+        $replication->discover();
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testDiscoversReplicationConfigurationFromMaster()
+    {
+        $connFactory = new Connection\Factory();
+        $cmdInfo = Command\RawCommand::create('INFO', 'REPLICATION');
+
+        $master = $this->getMockConnection('tcp://127.0.0.1:6381?alias=master');
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->returnValue("
+# Replication
+role:master
+connected_slaves:2
+slave0:ip=127.0.0.1,port=6382,state=online,offset=12979,lag=0
+slave1:ip=127.0.0.1,port=6383,state=online,offset=12979,lag=1
+master_repl_offset:12979
+repl_backlog_active:1
+repl_backlog_size:1048576
+repl_backlog_first_byte_offset:2
+repl_backlog_histlen:12978
+"));
+
+        $replication = new MasterSlaveReplication();
+        $replication->setConnectionFactory($connFactory);
+
+        $replication->add($master);
+
+        $replication->discover();
+
+        $this->assertCount(2, $slaves = $replication->getSlaves());
+        $this->assertContainsOnlyInstancesOf('Predis\Connection\ConnectionInterface', $slaves);
+
+        $this->assertSame('127.0.0.1:6381', (string) $replication->getMaster());
+        $this->assertSame('127.0.0.1:6382', (string) $slaves[0]);
+        $this->assertSame('127.0.0.1:6383', (string) $slaves[1]);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testDiscoversReplicationConfigurationFromSlave()
+    {
+        $cmdInfo = $command = Command\RawCommand::create('INFO', 'REPLICATION');
+
+        $master = $this->getMockConnection('tcp://127.0.0.1:6381?alias=master');
+        $slave1 = $this->getMockConnection('tcp://127.0.0.1:6382?alias=slave1');
+        $slave2 = $this->getMockConnection('tcp://127.0.0.1:6383?alias=slave2');
+
+        $connFactory = $this->getMock('Predis\Connection\Factory');
+        $connFactory->expects($this->at(0))
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6381', 'alias' => 'master'))
+                    ->will($this->returnValue($master));
+        $connFactory->expects($this->at(1))
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6382'))
+                    ->will($this->returnValue($slave1));
+        $connFactory->expects($this->at(2))
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6383'))
+                    ->will($this->returnValue($slave2));
+
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->returnValue("
+# Replication
+role:slave
+master_host:127.0.0.1
+master_port:6381
+master_link_status:up
+master_last_io_seconds_ago:8
+master_sync_in_progress:0
+slave_repl_offset:17715532
+slave_priority:100
+slave_read_only:1
+connected_slaves:0
+master_repl_offset:0
+repl_backlog_active:0
+repl_backlog_size:1048576
+repl_backlog_first_byte_offset:0
+repl_backlog_histlen:0
+"));
+
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->returnValue("
+# Replication
+role:master
+connected_slaves:2
+slave0:ip=127.0.0.1,port=6382,state=online,offset=12979,lag=0
+slave1:ip=127.0.0.1,port=6383,state=online,offset=12979,lag=1
+master_repl_offset:12979
+repl_backlog_active:1
+repl_backlog_size:1048576
+repl_backlog_first_byte_offset:2
+repl_backlog_histlen:12978
+"));
+
+        $replication = new MasterSlaveReplication();
+        $replication->setConnectionFactory($connFactory);
+
+        $replication->add($slave1);
+
+        $replication->discover();
+
+        $this->assertCount(2, $slaves = $replication->getSlaves());
+        $this->assertContainsOnlyInstancesOf('Predis\Connection\ConnectionInterface', $slaves);
+
+        $this->assertSame('127.0.0.1:6381', (string) $replication->getMaster());
+        $this->assertSame('127.0.0.1:6382', (string) $slaves[0]);
+        $this->assertSame('127.0.0.1:6383', (string) $slaves[1]);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testDiscoversReplicationConfigurationFromSlaveIfMasterFails()
+    {
+        $cmdInfo = $command = Command\RawCommand::create('INFO', 'REPLICATION');
+
+        $masterKO = $this->getMockConnection('tcp://127.0.0.1:7381?alias=master');
+        $master = $this->getMockConnection('tcp://127.0.0.1:6381?alias=master');
+        $slave1 = $this->getMockConnection('tcp://127.0.0.1:6382?alias=slave1');
+        $slave2 = $this->getMockConnection('tcp://127.0.0.1:6383?alias=slave2');
+
+        $connFactory = $this->getMock('Predis\Connection\Factory');
+        $connFactory->expects($this->at(0))
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6381', 'alias' => 'master'))
+                    ->will($this->returnValue($master));
+        $connFactory->expects($this->at(1))
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6382'))
+                    ->will($this->returnValue($slave1));
+        $connFactory->expects($this->at(2))
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6383'))
+                    ->will($this->returnValue($slave2));
+
+
+        $masterKO->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->throwException(new Connection\ConnectionException($masterKO)));
+
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->returnValue("
+# Replication
+role:slave
+master_host:127.0.0.1
+master_port:6381
+master_link_status:up
+master_last_io_seconds_ago:8
+master_sync_in_progress:0
+slave_repl_offset:17715532
+slave_priority:100
+slave_read_only:1
+connected_slaves:0
+master_repl_offset:0
+repl_backlog_active:0
+repl_backlog_size:1048576
+repl_backlog_first_byte_offset:0
+repl_backlog_histlen:0
+"));
+
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->returnValue("
+# Replication
+role:master
+connected_slaves:2
+slave0:ip=127.0.0.1,port=6382,state=online,offset=12979,lag=0
+slave1:ip=127.0.0.1,port=6383,state=online,offset=12979,lag=1
+master_repl_offset:12979
+repl_backlog_active:1
+repl_backlog_size:1048576
+repl_backlog_first_byte_offset:2
+repl_backlog_histlen:12978
+"));
+
+        $replication = new MasterSlaveReplication();
+        $replication->setConnectionFactory($connFactory);
+
+        $replication->add($masterKO);
+        $replication->add($slave1);
+
+        $replication->discover();
+
+        $this->assertCount(2, $slaves = $replication->getSlaves());
+        $this->assertContainsOnlyInstancesOf('Predis\Connection\ConnectionInterface', $slaves);
+
+        $this->assertSame('127.0.0.1:6381', (string) $replication->getMaster());
+        $this->assertSame('127.0.0.1:6382', (string) $slaves[0]);
+        $this->assertSame('127.0.0.1:6383', (string) $slaves[1]);
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\ClientException
+     * @expectedMessage Automatic discovery requires a connection factory
+     */
+    public function testAutomaticDiscoveryRequiresConnectionFactory()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+
+        $replication->setAutoDiscovery(true);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testAutomaticDiscoveryOnUnreachableServer()
+    {
+        $cmdInfo = $command = Command\RawCommand::create('INFO', 'REPLICATION');
+        $cmdExists = $command = Command\RawCommand::create('EXISTS', 'key');
+
+        $slaveKO = $this->getMockConnection('tcp://127.0.0.1:7382?alias=slaveKO');
+        $master = $this->getMockConnection('tcp://127.0.0.1:6381?alias=master');
+        $slave1 = $this->getMockConnection('tcp://127.0.0.1:6382?alias=slave1');
+
+        $connFactory = $this->getMock('Predis\Connection\Factory');
+        $connFactory->expects($this->once())
+                    ->method('create')
+                    ->with(array('host' => '127.0.0.1', 'port' => '6382'))
+                    ->will($this->returnValue($slave1));
+
+
+        $slaveKO->expects($this->once())
+                ->method('executeCommand')
+                ->with($cmdExists)
+                ->will($this->throwException(new Connection\ConnectionException($slaveKO)));
+
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdInfo)
+               ->will($this->returnValue("
+# Replication
+role:master
+connected_slaves:2
+slave0:ip=127.0.0.1,port=6382,state=online,offset=12979,lag=0
+master_repl_offset:12979
+repl_backlog_active:1
+repl_backlog_size:1048576
+repl_backlog_first_byte_offset:2
+repl_backlog_histlen:12978
+"));
+
+        $replication = new MasterSlaveReplication();
+        $replication->setConnectionFactory($connFactory);
+        $replication->setAutoDiscovery(true);
+
+        $replication->add($master);
+        $replication->add($slaveKO);
+
+        $replication->executeCommand($cmdExists);
+    }
+
     /**
      * @group disconnected
      */