Просмотр исходного кода

Fix fetching slots map from unreachable nodes.

When various nodes in the configuration are unreachable while trying
to send a command, we should attempt to contact a reachable node to
fetch an updated slots map up to $retryLimit times or until there are
no more servers in the pool before giving up.

It is possible that the slots map fetched from Redis contains stale
data and points to a dead server, this happens when the nodes still
have to agree that a master server is down before promoting a slave
to the role of master. In this case no further attempts to execute
the command are performed and an exception is thrown.

This still needs some more testing and will delay v1.0.4 a few days
past its scheduled release.
Daniele Alessandri 8 лет назад
Родитель
Сommit
2d01a27e17

+ 70 - 7
src/Connection/Aggregate/RedisCluster.php

@@ -11,6 +11,7 @@
 
 namespace Predis\Connection\Aggregate;
 
+use Predis\ClientException;
 use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
 use Predis\Cluster\StrategyInterface;
 use Predis\Command\CommandInterface;
@@ -194,6 +195,47 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
         }
     }
 
+    /**
+     * Queries the specified node of the cluster to fetch the updated slots map.
+     *
+     * When the connection fails, this method tries to execute the same command
+     * on a different connection picked at random from the pool of known nodes,
+     * up until the retry limit is reached.
+     *
+     * @param NodeConnectionInterface $connection Connection to a node of the cluster.
+     *
+     * @return mixed
+     */
+    private function queryClusterNodeForSlotsMap(NodeConnectionInterface $connection)
+    {
+        $retries = 0;
+        $command = RawCommand::create('CLUSTER', 'SLOTS');
+
+        RETRY_COMMAND: {
+            try {
+                $response = $connection->executeCommand($command);
+            } catch (ConnectionException $exception) {
+                $connection = $exception->getConnection();
+                $connection->disconnect();
+
+                $this->remove($connection);
+
+                if ($retries === $this->retryLimit) {
+                    throw $exception;
+                }
+
+                if (!$connection = $this->getRandomConnection()) {
+                    throw new ClientException('No connections left in the pool for `CLUSTER SLOTS`');
+                }
+
+                ++$retries;
+                goto RETRY_COMMAND;
+            }
+        }
+
+        return $response;
+    }
+
     /**
      * Generates an updated slots map fetching the cluster configuration using
      * the CLUSTER SLOTS command against the specified node or a random one from
@@ -209,8 +251,9 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
             return array();
         }
 
-        $command = RawCommand::create('CLUSTER', 'SLOTS');
-        $response = $connection->executeCommand($command);
+        $this->resetSlotsMap();
+
+        $response = $this->queryClusterNodeForSlotsMap($connection);
 
         foreach ($response as $slots) {
             // We only support master servers for now, so we ignore subsequent
@@ -227,6 +270,14 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
         return $this->slotsMap;
     }
 
+    /**
+     * Resets the slots map cache.
+     */
+    public function resetSlotsMap()
+    {
+        $this->slotsMap = array();
+    }
+
     /**
      * Returns the current slots map for the cluster.
      *
@@ -278,6 +329,10 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
      */
     protected function guessNode($slot)
     {
+        if (!$this->pool) {
+            throw new ClientException('No connections available in the pool');
+        }
+
         if (!isset($this->slotsMap)) {
             $this->buildSlotsMap();
         }
@@ -469,7 +524,13 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
     }
 
     /**
-     * Retries the execution of a command when  failure.
+     * Ensures that a command is executed one more time on connection failure.
+     *
+     * The connection to the node that generated the error is evicted from the
+     * pool before trying to fetch an updated slots map from another node. If
+     * the new slots map points to an unreachable server the client gives up and
+     * throws the exception as the nodes participating in the cluster may still
+     * have to agree that something changed in the configuration of the cluster.
      *
      * @param CommandInterface $command Command instance.
      * @param string           $method  Actual method.
@@ -478,7 +539,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
      */
     private function retryCommandOnFailure(CommandInterface $command, $method)
     {
-        $retries = 0;
+        $failure = false;
 
         RETRY_COMMAND: {
             try {
@@ -488,13 +549,15 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
                 $connection->disconnect();
 
                 $this->remove($connection);
-                $this->askSlotsMap();
 
-                if ($retries === $this->retryLimit) {
+                if ($failure) {
                     throw $exception;
+                } elseif ($this->useClusterSlots) {
+                    $this->askSlotsMap();
                 }
 
-                ++$retries;
+                $failure = true;
+
                 goto RETRY_COMMAND;
             }
         }

+ 208 - 11
tests/Predis/Connection/Aggregate/RedisClusterTest.php

@@ -489,28 +489,25 @@ class RedisClusterTest extends PredisTestCase
     /**
      * @group disconnected
      */
-    public function testRetriesExecutingCommandAfterFetchingNewSlotsMapOnConnectionFailure()
+    public function testRetriesExecutingCommandOnConnectionFailureOnlyAfterFetchingNewSlotsMap()
     {
         $slotsmap = array(
-            array(0, 5500, array('127.0.0.1', 9381), array()),
-            array(5501, 11000, array('127.0.0.1', 6382), array()),
-            array(1101, 16383, array('127.0.0.1', 6383), array()),
+            array(0, 5460, array('127.0.0.1', 9381), array()),
+            array(5461, 10921, array('127.0.0.1', 6382), array()),
+            array(10922, 16383, array('127.0.0.1', 6383), array()),
         );
-        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6379?slots=0-1364');
-        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6380?slots=1365-2729');
-        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6381?slots=2730-4095');
 
-        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6381');
+        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6381?slots=0-5460');
         $connection1->expects($this->once())
                     ->method('executeCommand')
                     ->with($this->isRedisCommand(
                         'GET', array('node:1001')
                     ))
                     ->will($this->throwException(
-                        new Connection\ConnectionException($connection1, 'Unknown connection error [127.0.0.1:6382]')
+                        new Connection\ConnectionException($connection1, 'Unknown connection error [127.0.0.1:6381]')
                     ));
 
-        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6382');
+        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6382?slots=5461-10921');
         $connection2->expects($this->any())
                     ->method('executeCommand')
                     ->with($this->isRedisCommand(
@@ -518,7 +515,7 @@ class RedisClusterTest extends PredisTestCase
                     ))
                     ->will($this->returnValue($slotsmap));
 
-        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6383');
+        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6383?slots=10922-16383');
         $connection3->expects($this->any())
                     ->method('executeCommand')
                     ->with($this->isRedisCommand(
@@ -564,6 +561,206 @@ class RedisClusterTest extends PredisTestCase
         ));
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testRetriesExecutingCommandOnConnectionFailureButDoNotAskSlotsMapWhenDisabled()
+    {
+        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6381?slots=0-5500');
+        $connection1->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'GET', array('node:1001')
+                    ))
+                    ->will($this->throwException(
+                        new Connection\ConnectionException($connection1, 'Unknown connection error [127.0.0.1:6381]')
+                    ));
+
+        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6382?slots=5501-11000');
+        $connection2->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'GET', array('node:1001')
+                    ))
+                    ->will($this->returnValue(
+                        new Response\Error('MOVED 1970 127.0.0.1:9381')
+                    ));
+
+        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6383?slots=11101-16383');
+        $connection3->expects($this->never())
+                    ->method('executeCommand');
+
+        $connection4 = $this->getMockConnection('tcp://127.0.0.1:9381');
+        $connection4->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'GET', array('node:1001')
+                    ))
+                    ->will($this->returnValue('value:1001'));
+
+        $factory = $this->getMock('Predis\Connection\FactoryInterface');
+        $factory->expects($this->once())
+                 ->method('create')
+                 ->with(array(
+                    'host' => '127.0.0.1',
+                    'port' => '9381',
+                  ))
+                 ->will($this->returnValue($connection4));
+
+        // TODO: I'm not sure about mocking a protected method, but it'll do for now
+        $cluster = $this->getMock('Predis\Connection\Aggregate\RedisCluster', array('getRandomConnection'), array($factory));
+        $cluster->expects($this->never())
+                ->method('getRandomConnection');
+
+        $cluster->useClusterSlots(false);
+
+        $cluster->add($connection1);
+        $cluster->add($connection2);
+        $cluster->add($connection3);
+
+        $this->assertSame('value:1001', $cluster->executeCommand(
+            Command\RawCommand::create('get', 'node:1001')
+        ));
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\ClientException
+     * @expectedExceptionMessage No connections available in the pool
+     */
+    public function testThrowsClientExceptionWhenExecutingCommandWithEmptyPool()
+    {
+        $factory = $this->getMock('Predis\Connection\FactoryInterface');
+        $factory->expects($this->never())->method('create');
+
+        $cluster = new RedisCluster($factory);
+
+        $cluster->executeCommand(Command\RawCommand::create('get', 'node:1001'));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testAskSlotsMapReturnEmptyArrayOnEmptyConnectionsPool()
+    {
+        $factory = $this->getMock('Predis\Connection\FactoryInterface');
+        $factory->expects($this->never())->method('create');
+
+        $cluster = new RedisCluster($factory);
+
+        $this->assertEmpty($cluster->askSlotsMap());
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testAskSlotsMapRetriesOnDifferentNodeOnConnectionFailure()
+    {
+        $slotsmap = array(
+            array(0, 5460, array('127.0.0.1', 9381), array()),
+            array(5461, 10921, array('127.0.0.1', 6382), array()),
+            array(10922, 16383, array('127.0.0.1', 6383), array()),
+        );
+
+        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6381?slots=0-5460');
+        $connection1->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->throwException(
+                        new Connection\ConnectionException($connection1, 'Unknown connection error [127.0.0.1:6381]')
+                    ));
+
+        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6382?slots=5461-10921');
+        $connection2->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->throwException(
+                        new Connection\ConnectionException($connection2, 'Unknown connection error [127.0.0.1:6383]')
+                    ));
+
+        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6383?slots=10922-16383');
+        $connection3->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->returnValue($slotsmap));
+
+        $factory = $this->getMock('Predis\Connection\FactoryInterface');
+        $factory->expects($this->never())->method('create');
+
+        // TODO: I'm not sure about mocking a protected method, but it'll do for now
+        $cluster = $this->getMock('Predis\Connection\Aggregate\RedisCluster', array('getRandomConnection'), array($factory));
+        $cluster->expects($this->exactly(3))
+                ->method('getRandomConnection')
+                ->will($this->onConsecutiveCalls($connection1, $connection2, $connection3));
+
+        $cluster->add($connection1);
+        $cluster->add($connection2);
+        $cluster->add($connection3);
+
+        $this->assertCount(16384, $cluster->askSlotsMap());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException \Predis\Connection\ConnectionException
+     * @expectedExceptionMessage Unknown connection error [127.0.0.1:6382]
+     */
+    public function testAskSlotsMapHonorsRetryLimitOnMultipleConnectionFailures()
+    {
+        $slotsmap = array(
+            array(0, 5460, array('127.0.0.1', 9381), array()),
+            array(5461, 10921, array('127.0.0.1', 6382), array()),
+            array(10922, 16383, array('127.0.0.1', 6383), array()),
+        );
+
+        $connection1 = $this->getMockConnection('tcp://127.0.0.1:6381?slots=0-5460');
+        $connection1->expects($this->any())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->throwException(
+                        new Connection\ConnectionException($connection1, 'Unknown connection error [127.0.0.1:6381]')
+                    ));
+
+        $connection2 = $this->getMockConnection('tcp://127.0.0.1:6382?slots=5461-10921');
+        $connection2->expects($this->any())
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand(
+                        'CLUSTER', array('SLOTS')
+                    ))
+                    ->will($this->throwException(
+                        new Connection\ConnectionException($connection2, 'Unknown connection error [127.0.0.1:6382]')
+                    ));
+
+        $connection3 = $this->getMockConnection('tcp://127.0.0.1:6383?slots=10922-16383');
+        $connection3->expects($this->never())
+                    ->method('executeCommand');
+
+        $factory = $this->getMock('Predis\Connection\FactoryInterface');
+        $factory->expects($this->never())->method('create');
+
+        // TODO: I'm not sure about mocking a protected method, but it'll do for now
+        $cluster = $this->getMock('Predis\Connection\Aggregate\RedisCluster', array('getRandomConnection'), array($factory));
+        $cluster->expects($this->exactly(2))
+                ->method('getRandomConnection')
+                ->will($this->onConsecutiveCalls($connection1, $connection2));
+
+        $cluster->add($connection1);
+        $cluster->add($connection2);
+        $cluster->add($connection3);
+
+        $cluster->setRetryLimit(1);
+
+        $cluster->askSlotsMap();
+    }
+
     /**
      * @group disconnected
      */