Browse Source

Send read-only commands on next slave on failure.

If no other slave is available try again on master as last resort
before giving up and throwing an exception.
Daniele Alessandri 9 years ago
parent
commit
b491dff126

+ 60 - 5
src/Connection/Aggregate/MasterSlaveReplication.php

@@ -11,7 +11,9 @@
 
 
 namespace Predis\Connection\Aggregate;
 namespace Predis\Connection\Aggregate;
 
 
+use Predis\ClientException;
 use Predis\Command\CommandInterface;
 use Predis\Command\CommandInterface;
+use Predis\Connection\ConnectionException;
 use Predis\Connection\NodeConnectionInterface;
 use Predis\Connection\NodeConnectionInterface;
 use Predis\Replication\ReplicationStrategy;
 use Predis\Replication\ReplicationStrategy;
 
 
@@ -116,13 +118,13 @@ class MasterSlaveReplication implements ReplicationInterface
             if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
             if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
                 $this->current = $slave;
                 $this->current = $slave;
             } else {
             } else {
-                $this->current = $this->getMaster();
+                $this->current = $this->getMasterOrDie();
             }
             }
 
 
             return $this->current;
             return $this->current;
         }
         }
 
 
-        if ($this->current === $master = $this->getMaster()) {
+        if ($this->current === $master = $this->getMasterOrDie()) {
             return $master;
             return $master;
         }
         }
 
 
@@ -183,6 +185,20 @@ class MasterSlaveReplication implements ReplicationInterface
         return $this->master;
         return $this->master;
     }
     }
 
 
+    /**
+     * Returns the connection associated to the master server.
+     *
+     * @return NodeConnectionInterface
+     */
+    private function getMasterOrDie()
+    {
+        if (!$connection = $this->getMaster()) {
+            throw new ClientException('No master server available for replication');
+        }
+
+        return $connection;
+    }
+
     /**
     /**
      * {@inheritdoc}
      * {@inheritdoc}
      */
      */
@@ -248,12 +264,51 @@ class MasterSlaveReplication implements ReplicationInterface
         }
         }
     }
     }
 
 
+    /**
+     * Retries the execution of a command upon slave failure.
+     *
+     * @param CommandInterface $command Command instance.
+     * @param string           $method  Actual method.
+     *
+     * @return mixed
+     */
+    private function retryCommandOnFailure(CommandInterface $command, $method)
+    {
+        RETRY_COMMAND: {
+            try {
+                $response = $this->getConnection($command)->$method($command);
+            } catch (ConnectionException $exception) {
+                $connection = $exception->getConnection();
+                $connection->disconnect();
+
+                if ($connection === $this->master) {
+                    // Throw immediatly if the client was connected to master,
+                    // even when the command represents a read-only operation.
+                    throw $exception;
+                } else {
+                    // Otherwise remove the failing slave and attempt to execute
+                    // the command again on one of the remaining slaves...
+                    $this->remove($connection);
+                }
+
+                // ... that is, unless we have no more connections to use.
+                if (!$this->slaves && !$this->master) {
+                    throw $exception;
+                }
+
+                goto RETRY_COMMAND;
+            }
+        }
+
+        return $response;
+    }
+
     /**
     /**
      * {@inheritdoc}
      * {@inheritdoc}
      */
      */
     public function writeRequest(CommandInterface $command)
     public function writeRequest(CommandInterface $command)
     {
     {
-        $this->getConnection($command)->writeRequest($command);
+        $this->retryCommandOnFailure($command, __FUNCTION__);
     }
     }
 
 
     /**
     /**
@@ -261,7 +316,7 @@ class MasterSlaveReplication implements ReplicationInterface
      */
      */
     public function readResponse(CommandInterface $command)
     public function readResponse(CommandInterface $command)
     {
     {
-        return $this->getConnection($command)->readResponse($command);
+        return $this->retryCommandOnFailure($command, __FUNCTION__);
     }
     }
 
 
     /**
     /**
@@ -269,7 +324,7 @@ class MasterSlaveReplication implements ReplicationInterface
      */
      */
     public function executeCommand(CommandInterface $command)
     public function executeCommand(CommandInterface $command)
     {
     {
-        return $this->getConnection($command)->executeCommand($command);
+        return $this->retryCommandOnFailure($command, __FUNCTION__);
     }
     }
 
 
     /**
     /**

+ 147 - 0
tests/Predis/Connection/Aggregate/MasterSlaveReplicationTest.php

@@ -452,6 +452,153 @@ class MasterSlaveReplicationTest extends PredisTestCase
         $replication->executeCommand($cmdSortStore);
         $replication->executeCommand($cmdSortStore);
     }
     }
 
 
+    /**
+     * @group disconnected
+     */
+    public function testDiscardsUnreachableSlaveAndExecutesReadOnlyCommandOnNextSlave()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('key'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->never())->method('executeCommand');
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->throwException(new Connection\ConnectionException($slave1)));
+
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+        $slave2->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $replication = new MasterSlaveReplication();
+
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $replication->switchTo($slave1);
+
+        $response = $replication->executeCommand($cmdExists);
+
+        $this->assertSame(1, $response);
+        $this->assertNull($replication->getConnectionById('slave1'));
+        $this->assertSame($slave2, $replication->getConnectionById('slave2'));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testDiscardsUnreachableSlavesAndExecutesReadOnlyCommandOnMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('key'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->throwException(new Connection\ConnectionException($slave1)));
+
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+        $slave2->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->throwException(new Connection\ConnectionException($slave2)));
+
+        $replication = new MasterSlaveReplication();
+
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $replication->switchTo($slave1);
+
+        $response = $replication->executeCommand($cmdExists);
+
+        $this->assertSame(1, $response);
+        $this->assertNull($replication->getConnectionById('slave1'));
+        $this->assertNull($replication->getConnectionById('slave2'));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testSucceedOnReadOnlyCommandAndNoConnectionSetAsMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('key'));
+
+        $slave1 = $this->getMockConnection('tcp://host1?alias=slave1');
+        $slave1->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdExists)
+               ->will($this->returnValue(1));
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($slave1);
+
+        $response = $replication->executeCommand($cmdExists);
+
+        $this->assertSame(1, $response);
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\ClientException
+     * @expectedMessage No master server available for replication
+     */
+    public function testFailsOnWriteCommandAndNoConnectionSetAsMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdSet = $profile->createCommand('set', array('key', 'value'));
+
+        $slave1 = $this->getMockConnection('tcp://host1?alias=slave1');
+        $slave1->expects($this->never())->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdSet);
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\Connection\ConnectionException
+     */
+    public function testFailsOnUnreachableMaster()
+    {
+        $profile = Profile\Factory::getDefault();
+        $cmdSet = $profile->createCommand('set', array('key', 'value'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())
+               ->method('executeCommand')
+               ->with($cmdSet)
+               ->will($this->throwException(new Connection\ConnectionException($master)));
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->never())
+               ->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdSet);
+    }
+
     /**
     /**
      * @group disconnected
      * @group disconnected
      * @expectedException \Predis\NotSupportedException
      * @expectedException \Predis\NotSupportedException