Parcourir la source

Merge branch 'replication-improvements'

Daniele Alessandri il y a 8 ans
Parent
commit
126998631b

+ 9 - 0
CHANGELOG.md

@@ -31,6 +31,15 @@ v1.1.0 (2015-xx-xx)
 - Implemented support for SSL-encrypted connections, the connection parameters
   must use either the `tls` or `rediss` scheme.
 
+- `Predis\Connection\Aggregate\MasterSlaveReplication` now tries to send again
+  read-only commands to the next slave when the one picked by the client fails,
+  and eventually switches to master is no other slave is available. The master
+  connection is also used for read-only commands when no slaves are registered.
+
+- `Predis\Connection\Aggregate\MasterSlaveReplication` exposes 2 new methods to
+  force a switch to the master connection (`switchToMaster()`) or to a randomly
+  picked slave (`switchToSlave()`).
+
 
 v1.0.3 (2015-07-30)
 ================================================================================

+ 100 - 30
src/Connection/Aggregate/MasterSlaveReplication.php

@@ -11,7 +11,9 @@
 
 namespace Predis\Connection\Aggregate;
 
+use Predis\ClientException;
 use Predis\Command\CommandInterface;
+use Predis\Connection\ConnectionException;
 use Predis\Connection\NodeConnectionInterface;
 use Predis\Replication\ReplicationStrategy;
 
@@ -36,7 +38,7 @@ class MasterSlaveReplication implements ReplicationInterface
     /**
      * @var NodeConnectionInterface[]
      */
-    protected $slaves;
+    protected $slaves = array();
 
     /**
      * @var NodeConnectionInterface
@@ -48,20 +50,9 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     public function __construct(ReplicationStrategy $strategy = null)
     {
-        $this->slaves = array();
         $this->strategy = $strategy ?: new ReplicationStrategy();
     }
 
-    /**
-     * Checks if one master and at least one slave have been defined.
-     */
-    protected function check()
-    {
-        if (!isset($this->master) || !$this->slaves) {
-            throw new \RuntimeException('Replication needs one master and at least one slave.');
-        }
-    }
-
     /**
      * Resets the connection state.
      */
@@ -113,21 +104,22 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     public function getConnection(CommandInterface $command)
     {
-        if ($this->current === null) {
-            $this->check();
-            $this->current = $this->strategy->isReadOperation($command)
-                ? $this->pickSlave()
-                : $this->master;
+        if (!$this->current) {
+            if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
+                $this->current = $slave;
+            } else {
+                $this->current = $this->getMasterOrDie();
+            }
 
             return $this->current;
         }
 
-        if ($this->current === $this->master) {
-            return $this->current;
+        if ($this->current === $master = $this->getMasterOrDie()) {
+            return $master;
         }
 
-        if (!$this->strategy->isReadOperation($command)) {
-            $this->current = $this->master;
+        if (!$this->strategy->isReadOperation($command) || !$this->slaves) {
+            $this->current = $master;
         }
 
         return $this->current;
@@ -154,11 +146,14 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     public function switchTo($connection)
     {
-        $this->check();
-
         if (!$connection instanceof NodeConnectionInterface) {
             $connection = $this->getConnectionById($connection);
         }
+
+        if (!$connection) {
+            throw new \InvalidArgumentException('Invalid connection or connection not found.');
+        }
+
         if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
             throw new \InvalidArgumentException('Invalid connection or connection not found.');
         }
@@ -166,6 +161,23 @@ class MasterSlaveReplication implements ReplicationInterface
         $this->current = $connection;
     }
 
+    /**
+     * Switches to the master server.
+     */
+    public function switchToMaster()
+    {
+        $this->switchTo('master');
+    }
+
+    /**
+     * Switches to a random slave server.
+     */
+    public function switchToSlave()
+    {
+        $connection = $this->pickSlave();
+        $this->switchTo($connection);
+    }
+
     /**
      * {@inheritdoc}
      */
@@ -182,6 +194,20 @@ class MasterSlaveReplication implements ReplicationInterface
         return $this->master;
     }
 
+    /**
+     * Returns the connection associated to the master server.
+     *
+     * @return NodeConnectionInterface
+     */
+    private function getMasterOrDie()
+    {
+        if (!$connection = $this->getMaster()) {
+            throw new ClientException('No master server available for replication');
+        }
+
+        return $connection;
+    }
+
     /**
      * {@inheritdoc}
      */
@@ -207,7 +233,9 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     protected function pickSlave()
     {
-        return $this->slaves[array_rand($this->slaves)];
+        if ($this->slaves) {
+            return $this->slaves[array_rand($this->slaves)];
+        }
     }
 
     /**
@@ -223,9 +251,12 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     public function connect()
     {
-        if ($this->current === null) {
-            $this->check();
-            $this->current = $this->pickSlave();
+        if (!$this->current) {
+            if (!$this->current = $this->pickSlave()) {
+                if (!$this->current = $this->getMaster()) {
+                    throw new ClientException("No available connection for replication");
+                }
+            }
         }
 
         $this->current->connect();
@@ -245,12 +276,51 @@ class MasterSlaveReplication implements ReplicationInterface
         }
     }
 
+    /**
+     * Retries the execution of a command upon slave failure.
+     *
+     * @param CommandInterface $command Command instance.
+     * @param string           $method  Actual method.
+     *
+     * @return mixed
+     */
+    private function retryCommandOnFailure(CommandInterface $command, $method)
+    {
+        RETRY_COMMAND: {
+            try {
+                $response = $this->getConnection($command)->$method($command);
+            } catch (ConnectionException $exception) {
+                $connection = $exception->getConnection();
+                $connection->disconnect();
+
+                if ($connection === $this->master) {
+                    // Throw immediatly if the client was connected to master,
+                    // even when the command represents a read-only operation.
+                    throw $exception;
+                } else {
+                    // Otherwise remove the failing slave and attempt to execute
+                    // the command again on one of the remaining slaves...
+                    $this->remove($connection);
+                }
+
+                // ... that is, unless we have no more connections to use.
+                if (!$this->slaves && !$this->master) {
+                    throw $exception;
+                }
+
+                goto RETRY_COMMAND;
+            }
+        }
+
+        return $response;
+    }
+
     /**
      * {@inheritdoc}
      */
     public function writeRequest(CommandInterface $command)
     {
-        $this->getConnection($command)->writeRequest($command);
+        $this->retryCommandOnFailure($command, __FUNCTION__);
     }
 
     /**
@@ -258,7 +328,7 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     public function readResponse(CommandInterface $command)
     {
-        return $this->getConnection($command)->readResponse($command);
+        return $this->retryCommandOnFailure($command, __FUNCTION__);
     }
 
     /**
@@ -266,7 +336,7 @@ class MasterSlaveReplication implements ReplicationInterface
      */
     public function executeCommand(CommandInterface $command)
     {
-        return $this->getConnection($command)->executeCommand($command);
+        return $this->retryCommandOnFailure($command, __FUNCTION__);
     }
 
     /**

+ 285 - 24
tests/Predis/Connection/Aggregate/MasterSlaveReplicationTest.php

@@ -65,8 +65,8 @@ class MasterSlaveReplicationTest extends PredisTestCase
 
     /**
      * @group disconnected
-     * @expectedException \RuntimeException
-     * @expectedExceptionMessage Replication needs one master and at least one slave.
+     * @expectedException \Predis\ClientException
+     * @expectedExceptionMessage No available connection for replication
      */
     public function testThrowsExceptionOnEmptyReplication()
     {
@@ -76,26 +76,18 @@ class MasterSlaveReplicationTest extends PredisTestCase
 
     /**
      * @group disconnected
-     * @expectedException \RuntimeException
-     * @expectedExceptionMessage Replication needs one master and at least one slave.
      */
-    public function testThrowsExceptionOnMissingMaster()
+    public function testConnectsToOneOfSlaves()
     {
-        $replication = new MasterSlaveReplication();
-        $replication->add($this->getMockConnection('tcp://host2?alias=slave1'));
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->never())->method('connect');
 
-        $replication->connect();
-    }
+        $slave = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave->expects($this->once())->method('connect');
 
-    /**
-     * @group disconnected
-     * @expectedException \RuntimeException
-     * @expectedExceptionMessage Replication needs one master and at least one slave.
-     */
-    public function testThrowsExceptionOnMissingSlave()
-    {
         $replication = new MasterSlaveReplication();
-        $replication->add($this->getMockConnection('tcp://host1?alias=master'));
+        $replication->add($master);
+        $replication->add($slave);
 
         $replication->connect();
     }
@@ -103,19 +95,15 @@ class MasterSlaveReplicationTest extends PredisTestCase
     /**
      * @group disconnected
      */
-    public function testConnectForcesConnectionToOneOfSlaves()
+    public function testConnectsToMasterOnMissingSlaves()
     {
         $master = $this->getMockConnection('tcp://host1?alias=master');
-        $master->expects($this->never())->method('connect');
-
-        $slave = $this->getMockConnection('tcp://host2?alias=slave1');
-        $slave->expects($this->once())->method('connect');
 
         $replication = new MasterSlaveReplication();
         $replication->add($master);
-        $replication->add($slave);
 
         $replication->connect();
+        $this->assertSame($master, $replication->getCurrent());
     }
 
     /**
@@ -203,7 +191,7 @@ class MasterSlaveReplicationTest extends PredisTestCase
      * @expectedException \InvalidArgumentException
      * @expectedExceptionMessage Invalid connection or connection not found.
      */
-    public function testThrowsErrorWhenSwitchingToUnknownConnection()
+    public function testThrowsErrorWhenSwitchingToUnknownConnectionByAlias()
     {
         $replication = new MasterSlaveReplication();
         $replication->add($this->getMockConnection('tcp://host1?alias=master'));
@@ -212,6 +200,113 @@ class MasterSlaveReplicationTest extends PredisTestCase
         $replication->switchTo('unknown');
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testCanSwitchConnectionByInstance()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $this->assertNull($replication->getCurrent());
+
+        $replication->switchTo($master);
+        $this->assertSame($master, $replication->getCurrent());
+        $replication->switchTo($slave1);
+        $this->assertSame($slave1, $replication->getCurrent());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \InvalidArgumentException
+     * @expectedExceptionMessage Invalid connection or connection not found.
+     */
+    public function testThrowsErrorWhenSwitchingToUnknownConnectionByInstance()
+    {
+        $replication = new MasterSlaveReplication();
+        $replication->add($this->getMockConnection('tcp://host1?alias=master'));
+        $replication->add($this->getMockConnection('tcp://host2?alias=slave1'));
+
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+
+        $replication->switchTo($slave2);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testCanSwitchToMaster()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $this->assertNull($replication->getCurrent());
+
+        $replication->switchToMaster();
+        $this->assertSame($master, $replication->getCurrent());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \InvalidArgumentException
+     * @expectedExceptionMessage Invalid connection or connection not found.
+     */
+    public function testThrowsErrorOnSwitchToMasterWithNoMasterDefined()
+    {
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($slave1);
+
+        $replication->switchToMaster();
+    }
+
+    /**
+     * @group disconnected
+     * @todo We should find a way to test that the slave is indeed randomly selected.
+     */
+    public function testCanSwitchToRandomSlave()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $this->assertNull($replication->getCurrent());
+
+        $replication->switchToSlave();
+        $this->assertSame($slave1, $replication->getCurrent());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \InvalidArgumentException
+     * @expectedExceptionMessage Invalid connection or connection not found.
+     */
+    public function testThrowsErrorOnSwitchToRandomSlaveWithNoSlavesDefined()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+
+        $replication->switchToSlave();
+    }
+
     /**
      * @group disconnected
      */
@@ -254,6 +349,25 @@ class MasterSlaveReplicationTest extends PredisTestCase
         $this->assertSame($master, $replication->getConnection($cmd));
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testUsesMasterOnReadRequestsWhenNoSlavesAvailable()
+    {
+        $profile = Profile\Factory::getDefault();
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+
+        $cmd = $profile->createCommand('exists', array('foo'));
+        $this->assertSame($master, $replication->getConnection($cmd));
+
+        $cmd = $profile->createCommand('set', array('foo', 'bar'));
+        $this->assertSame($master, $replication->getConnection($cmd));
+    }
+
     /**
      * @group disconnected
      */
@@ -433,6 +547,153 @@ class MasterSlaveReplicationTest extends PredisTestCase
         $replication->executeCommand($cmdSortStore);
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testDiscardsUnreachableSlaveAndExecutesReadOnlyCommandOnNextSlave()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('key'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->never())->method('executeCommand');
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->throwException(new Connection\ConnectionException($slave1)));
+
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+        $slave2->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $replication = new MasterSlaveReplication();
+
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $replication->switchTo($slave1);
+
+        $response = $replication->executeCommand($cmdExists);
+
+        $this->assertSame(1, $response);
+        $this->assertNull($replication->getConnectionById('slave1'));
+        $this->assertSame($slave2, $replication->getConnectionById('slave2'));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testDiscardsUnreachableSlavesAndExecutesReadOnlyCommandOnMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('key'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->throwException(new Connection\ConnectionException($slave1)));
+
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+        $slave2->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->throwException(new Connection\ConnectionException($slave2)));
+
+        $replication = new MasterSlaveReplication();
+
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $replication->switchTo($slave1);
+
+        $response = $replication->executeCommand($cmdExists);
+
+        $this->assertSame(1, $response);
+        $this->assertNull($replication->getConnectionById('slave1'));
+        $this->assertNull($replication->getConnectionById('slave2'));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testSucceedOnReadOnlyCommandAndNoConnectionSetAsMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('key'));
+
+        $slave1 = $this->getMockConnection('tcp://host1?alias=slave1');
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($slave1);
+
+        $response = $replication->executeCommand($cmdExists);
+
+        $this->assertSame(1, $response);
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\ClientException
+     * @expectedMessage No master server available for replication
+     */
+    public function testFailsOnWriteCommandAndNoConnectionSetAsMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdSet = $profile->createCommand('set', array('key', 'value'));
+
+        $slave1 = $this->getMockConnection('tcp://host1?alias=slave1');
+        $slave1->expects($this->never())->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdSet);
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\Connection\ConnectionException
+     */
+    public function testFailsOnUnreachableMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdSet = $profile->createCommand('set', array('key', 'value'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdSet)
+               ->will($this->throwException(new Connection\ConnectionException($master)));
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->never())
+               ->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdSet);
+    }
+
     /**
      * @group disconnected
      * @expectedException \Predis\NotSupportedException