Преглед изворни кода

Try again on connection failure to node in cluster.

When the connection to a node in the cluster fails in the attempt to
execute a command, Predis now removes the failed connection from the
cluster pool and contacts a random node to ask for a fresh slots map
and tries to execute the command once again.

When the cluster is configured to have each master replicated to one
or more slaves, one the slaves is automatically promoted to the role
of master by redis-cluster with this change being reflected in the
output of CLUSTER SLOTS, so the next execution should run just fine.

Our current approach is relatively naive as CLUSTER SLOTS is executed
against a random master node, meaning that the client must open a new
connection and execute one more roundtrip only to fetch the new slots
map. For now it is enough, it is still better than having the client
fail when you actually have somes slaves in your redis-cluster setup,
but one improvement could consist in caching the list of slaves for
each master returned in the response of CLUSTER SLOTS so that when a
connection fails the client can try to guess which connection should
use for the next attempt.

This commit closes #173, closes #215, and closes #314.
Daniele Alessandri пре 8 година
родитељ
комит
ffc1c25baa

+ 53 - 4
src/Connection/Aggregate/RedisCluster.php

@@ -15,6 +15,7 @@ use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
 use Predis\Cluster\StrategyInterface;
 use Predis\Command\CommandInterface;
 use Predis\Command\RawCommand;
+use Predis\Connection\ConnectionException;
 use Predis\Connection\FactoryInterface;
 use Predis\Connection\NodeConnectionInterface;
 use Predis\NotSupportedException;
@@ -51,6 +52,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
     private $slotsMap;
     private $strategy;
     private $connections;
+    private $retryLimit = 5;
 
     /**
      * @param FactoryInterface  $connections Optional connection factory.
@@ -64,6 +66,20 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
         $this->strategy = $strategy ?: new RedisClusterStrategy();
     }
 
+    /**
+     * Sets the maximum number of retries for commands upon server failure.
+     *
+     * -1 = unlimited retry attempts
+     *  0 = no retry attempts (fails immediatly)
+     *  n = fail only after n retry attempts
+     *
+     * @param int $retry Number of retry attempts.
+     */
+    public function setRetryLimit($retry)
+    {
+        $this->retryLimit = (int) $retry;
+    }
+
     /**
      * {@inheritdoc}
      */
@@ -457,12 +473,46 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
         return $response;
     }
 
+    /**
+     * Retries the execution of a command when  failure.
+     *
+     * @param CommandInterface $command Command instance.
+     * @param string           $method  Actual method.
+     *
+     * @return mixed
+     */
+    private function retryCommandOnFailure(CommandInterface $command, $method)
+    {
+        $retries = 0;
+
+        RETRY_COMMAND: {
+            try {
+                $response = $this->getConnection($command)->$method($command);
+            } catch (ConnectionException $exception) {
+                $connection = $exception->getConnection();
+                $connection->disconnect();
+
+                $this->remove($connection);
+                $this->askSlotsMap();
+
+                if ($retries === $this->retryLimit) {
+                    throw $exception;
+                }
+
+                ++$retries;
+                goto RETRY_COMMAND;
+            }
+        }
+
+        return $response;
+    }
+
     /**
      * {@inheritdoc}
      */
     public function writeRequest(CommandInterface $command)
     {
-        $this->getConnection($command)->writeRequest($command);
+        $this->retryCommandOnFailure($command, __FUNCTION__);
     }
 
     /**
@@ -470,7 +520,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
      */
     public function readResponse(CommandInterface $command)
     {
-        return $this->getConnection($command)->readResponse($command);
+        return $this->retryCommandOnFailure($command, __FUNCTION__);
     }
 
     /**
@@ -478,8 +528,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
      */
     public function executeCommand(CommandInterface $command)
     {
-        $connection = $this->getConnection($command);
-        $response = $connection->executeCommand($command);
+        $response = $this->retryCommandOnFailure($command, __FUNCTION__);
 
         if ($response instanceof ErrorResponseInterface) {
             return $this->onErrorResponse($command, $response);

+ 78 - 0
tests/Predis/Connection/Aggregate/RedisClusterTest.php

@@ -486,6 +486,84 @@ class RedisClusterTest extends PredisTestCase
         $cluster->readResponse($command);
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testRetriesExecutingCommandAfterFetchingNewSlotsMapOnConnectionFailure()
+    {
+        $slotsmap = array(
+            array(0, 5500, array('127.0.0.1', 9381), array()),
+            array(5501, 11000, array('127.0.0.1', 6382), array()),
+            array(1101, 16383, array('127.0.0.1', 6383), array()),
+        );
+        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6379?slots=0-1364');
+        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6380?slots=1365-2729');
+        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6381?slots=2730-4095');
+
+        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6381');
+        $connection1->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'GET', array('node:1001')
+                    ))
+                    ->will($this->throwException(
+                        new Connection\ConnectionException($connection1, "Unknown connection error [127.0.0.1:6382]")
+                    ));
+
+        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6382');
+        $connection2->expects($this->any())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->returnValue($slotsmap));
+
+        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6383');
+        $connection3->expects($this->any())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->returnValue($slotsmap));
+
+        $connection4 = $this->getMockConnection('tcp://127.0.0.1:9381');
+        $connection4->expects($this->at(0))
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'GET', array('node:1001')
+                    ))
+                    ->will($this->returnValue('value:1001'));
+        $connection4->expects($this->at(1))
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'GET', array('node:5001')
+                    ))
+                    ->will($this->returnValue('value:5001'));
+
+        $factory = $this->getMock('Predis\Connection\FactoryInterface');
+        $factory->expects($this->once())
+                 ->method('create')
+                 ->with(array(
+                    'host' => '127.0.0.1',
+                    'port' => '9381',
+                  ))
+                 ->will($this->returnValue($connection4));
+
+        $cluster = new RedisCluster($factory);
+
+        $cluster->add($connection1);
+        $cluster->add($connection2);
+        $cluster->add($connection3);
+
+        $this->assertSame('value:1001', $cluster->executeCommand(
+            Command\RawCommand::create('get', 'node:1001')
+        ));
+
+        $this->assertSame('value:5001', $cluster->executeCommand(
+            Command\RawCommand::create('get', 'node:5001')
+        ));
+    }
+
     /**
      * @group disconnected
      */