Pārlūkot izejas kodu

Various fixes and improvements to redis-cluster connection backend.

List of changes:

  - The cluster connection can be initialized with a partial list of
    nodes, the full slots map will be fetched from Redis itself using
    the CLUSTER NODES command.
  - The slots map can be optionally retrieved from Redis if the server
    returns a -MOVE response, otherwise only the interested slot will
    be permanently reassigned to the new target node.
  - $cluster->connect() connects to a random connection in the pool
    instead of forcing the connect operation on all the connections.
Daniele Alessandri 11 gadi atpakaļ
vecāks
revīzija
1f66f4e8f7

+ 130 - 61
lib/Predis/Connection/RedisCluster.php

@@ -11,6 +11,10 @@
 
 namespace Predis\Connection;
 
+use ArrayIterator;
+use Countable;
+use IteratorAggregate;
+use OutOfBoundsException;
 use Predis\ClientException;
 use Predis\NotSupportedException;
 use Predis\Cluster;
@@ -23,12 +27,12 @@ use Predis\Response;
  *
  * @author Daniele Alessandri <suppakilla@gmail.com>
  */
-class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
+class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
 {
-    private $pool;
-    private $slots;
+    private $askSlotsMap = false;
+    private $pool = array();
+    private $slots = array();
     private $slotsMap;
-    private $slotsPerNode;
     private $strategy;
     private $connections;
 
@@ -37,8 +41,6 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function __construct(ConnectionFactoryInterface $connections = null)
     {
-        $this->pool = array();
-        $this->slots = array();
         $this->strategy = new Cluster\RedisStrategy();
         $this->connections = $connections ?: new ConnectionFactory();
     }
@@ -62,7 +64,7 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function connect()
     {
-        foreach ($this->pool as $connection) {
+        if ($connection = $this->getRandomConnection()) {
             $connection->connect();
         }
     }
@@ -83,10 +85,7 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     public function add(SingleConnectionInterface $connection)
     {
         $this->pool[(string) $connection] = $connection;
-        unset(
-            $this->slotsMap,
-            $this->slotsPerNode
-        );
+        unset($this->slotsMap);
     }
 
     /**
@@ -94,11 +93,10 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function remove(SingleConnectionInterface $connection)
     {
-        if (($id = array_search($connection, $this->pool, true)) !== false) {
+        if (false !== $id = array_search($connection, $this->pool, true)) {
             unset(
                 $this->pool[$id],
-                $this->slotsMap,
-                $this->slotsPerNode
+                $this->slotsMap
             );
 
             return true;
@@ -110,16 +108,15 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     /**
      * Removes a connection instance using its alias or index.
      *
-     * @param string $connectionId Alias or index of a connection.
-     * @return Boolean Returns true if the connection was in the pool.
+     * @param string $connectionID Alias or index of a connection.
+     * @return bool Returns true if the connection was in the pool.
      */
-    public function removeById($connectionId)
+    public function removeById($connectionID)
     {
-        if (isset($this->pool[$connectionId])) {
+        if (isset($this->pool[$connectionID])) {
             unset(
-                $this->pool[$connectionId],
-                $this->slotsMap,
-                $this->slotsPerNode
+                $this->pool[$connectionID],
+                $this->slotsMap
             );
 
             return true;
@@ -130,13 +127,10 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
 
     /**
      * Builds the slots map for the cluster.
-     *
-     * @return array
      */
     public function buildSlotsMap()
     {
         $this->slotsMap = array();
-        $this->slotsPerNode = (int) (16384 / count($this->pool));
 
         foreach ($this->pool as $connectionID => $connection) {
             $parameters = $connection->getParameters();
@@ -145,11 +139,37 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
                 continue;
             }
 
-            list($first, $last) = explode('-', $parameters->slots, 2);
-            $this->setSlots($first, $last, $connectionID);
+            $slots = explode('-', $parameters->slots, 2);
+            $this->setSlots($slots[0], $slots[1], $connectionID);
         }
+    }
 
-        return $this->slotsMap;
+    /**
+     * Builds the slots map for the cluster by asking the current configuration
+     * to one of the nodes in the cluster.
+     */
+    public function askSlotsMap()
+    {
+        if (!$connection = $this->getRandomConnection()) {
+            return array();
+        }
+
+        $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
+        $response = $connection->executeCommand($cmdCluster);
+
+        $nodes = explode("\n", $response, -1);
+        $count = count($nodes);
+
+        for ($i = 0; $i < $count; $i++) {
+            $node = explode(' ', $nodes[$i], 9);
+            $slots = explode('-', $node[8], 2);
+
+            if ($node[1] === ':0') {
+                $this->setSlots($slots[0], $slots[1], (string) $connection);
+            } else {
+                $this->setSlots($slots[0], $slots[1], $node[1]);
+            }
+        }
     }
 
     /**
@@ -169,20 +189,23 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     /**
      * Preassociate a connection to a set of slots to avoid runtime guessing.
      *
-     * @todo Check type or existence of the specified connection.
-     * @todo Cluster loses the slots assigned with this methods when adding / removing connections.
-     *
      * @param int $first Initial slot.
      * @param int $last Last slot.
      * @param SingleConnectionInterface|string $connection ID or connection instance.
      */
     public function setSlots($first, $last, $connection)
     {
-        if ($first < 0x0000 || $first > 0x3FFF || $last < 0x0000 || $last > 0x3FFF || $last < $first) {
-            throw new \OutOfBoundsException("Invalid slot values for $connection: [$first-$last]");
+        if ($first < 0x0000 || $first > 0x3FFF ||
+            $last < 0x0000 || $last > 0x3FFF ||
+            $last < $first
+        ) {
+            throw new OutOfBoundsException(
+                "Invalid slot range for $connection: [$first-$last]"
+            );
         }
 
-        $this->slotsMap = $this->getSlotsMap() + array_fill($first, $last - $first + 1, (string) $connection);
+        $slots = array_fill($first, $last - $first + 1, (string) $connection);
+        $this->slotsMap = $this->getSlotsMap() + $slots;
     }
 
     /**
@@ -193,18 +216,30 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
         $hash = $this->strategy->getHash($command);
 
         if (!isset($hash)) {
-            throw new NotSupportedException("Cannot use {$command->getId()} with redis-cluster");
+            throw new NotSupportedException(
+                "Cannot use {$command->getId()} with redis-cluster"
+            );
         }
 
         $slot = $hash & 0x3FFF;
 
         if (isset($this->slots[$slot])) {
             return $this->slots[$slot];
+        } else {
+            return $this->getConnectionBySlot($slot);
         }
+    }
 
-        $this->slots[$slot] = $connection = $this->pool[$this->guessNode($slot)];
-
-        return $connection;
+    /**
+     * Returns a random connection from the pool.
+     *
+     * @return SingleConnectionInterface
+     */
+    protected function getRandomConnection()
+    {
+        if ($this->pool) {
+            return $this->pool[array_rand($this->pool)];
+        }
     }
 
     /**
@@ -216,22 +251,36 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     public function getConnectionBySlot($slot)
     {
         if ($slot < 0x0000 || $slot > 0x3FFF) {
-            throw new \OutOfBoundsException("Invalid slot value [$slot]");
+            throw new OutOfBoundsException("Invalid slot [$slot]");
         }
 
         if (isset($this->slots[$slot])) {
             return $this->slots[$slot];
         }
 
-        return $this->pool[$this->guessNode($slot)];
+        $connectionID = $this->guessNode($slot);
+
+        if (!$connection = $this->getConnectionById($connectionID)) {
+            $host = explode(':', $connectionID, 2);
+            $connection = $this->connections->create(array(
+                'host' => $host[0],
+                'port' => $host[1],
+            ));
+
+            $this->pool[$connectionID] = $connection;
+        }
+
+        return $this->slots[$slot] = $connection;
     }
 
     /**
      * {@inheritdoc}
      */
-    public function getConnectionById($connectionId)
+    public function getConnectionById($connectionID)
     {
-        return isset($this->pool[$connectionId]) ? $this->pool[$connectionId] : null;
+        if (isset($this->pool[$connectionID])) {
+            return $this->pool[$connectionID];
+        }
     }
 
     /**
@@ -251,17 +300,18 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
             return $this->slotsMap[$slot];
         }
 
-        $index = min((int) ($slot / $this->slotsPerNode), count($this->pool) - 1);
+        $count = count($this->pool);
+        $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
         $nodes = array_keys($this->pool);
 
         return $nodes[$index];
     }
 
     /**
-     * Handles -MOVED or -ASK replies by re-executing the command on the server
-     * specified by the Redis reply.
+     * Handles -MOVED and -ASK responses by re-executing the command on the node
+     * specified by the Redis response.
      *
-     * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
+     * @param CommandInterface $command Command that generated the -MOVE or -ASK response.
      * @param string $request Type of request (either 'MOVED' or 'ASK').
      * @param string $details Parameters of the MOVED/ASK request.
      * @return mixed
@@ -271,21 +321,30 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
         list($slot, $host) = explode(' ', $details, 2);
         $connection = $this->getConnectionById($host);
 
-        if (!isset($connection)) {
-            $parameters = array('host' => null, 'port' => null);
-            list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
-            $connection = $this->connections->create($parameters);
+        if (!$connection) {
+            $host = explode(':', $host, 2);
+
+            $connection = $this->connections->create(array(
+                'host' => $host[0],
+                'port' => $host[1],
+            ));
         }
 
         switch ($request) {
             case 'MOVED':
+                if ($this->askSlotsMap) {
+                    $this->askSlotsMap();
+                }
+
                 $this->move($connection, $slot);
                 $response = $this->executeCommand($command);
+
                 return $response;
 
             case 'ASK':
                 $connection->executeCommand(RawCommand::create('ASKING'));
                 $response = $connection->executeCommand($command);
+
                 return $response;
 
             default:
@@ -329,24 +388,24 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function getIterator()
     {
-        return new \ArrayIterator(array_values($this->pool));
+        return new ArrayIterator(array_values($this->pool));
     }
 
     /**
-     * Handles -ERR replies from Redis.
+     * Handles -ERR responses from Redis.
      *
-     * @param CommandInterface $command Command that generated the -ERR reply.
-     * @param Response\ErrorInterface $error Redis error reply object.
+     * @param CommandInterface $command Command that generated the -ERR response.
+     * @param Response\ErrorInterface $error Redis error response object.
      * @return mixed
      */
-    protected function handleServerError(CommandInterface $command, Response\ErrorInterface $error)
+    protected function onErrorResponse(CommandInterface $command, Response\ErrorInterface $error)
     {
-        list($type, $details) = explode(' ', $error->getMessage(), 2);
+        $details = explode(' ', $error->getMessage(), 2);
 
-        switch ($type) {
+        switch ($details[0]) {
             case 'MOVED':
             case 'ASK':
-                return $this->onMoveRequest($command, $type, $details);
+                return $this->onMoveRequest($command, $details[0], $details[1]);
 
             default:
                 return $error;
@@ -375,12 +434,22 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     public function executeCommand(CommandInterface $command)
     {
         $connection = $this->getConnection($command);
-        $reply = $connection->executeCommand($command);
+        $response = $connection->executeCommand($command);
 
-        if ($reply instanceof Response\ErrorInterface) {
-            return $this->handleServerError($command, $reply);
+        if ($response instanceof Response\ErrorInterface) {
+            return $this->onErrorResponse($command, $response);
         }
 
-        return $reply;
+        return $response;
+    }
+
+    /**
+     * Instruct the cluster to fetch the slots map from one of the nodes.
+     *
+     * @param bool $value Enable or disable fetching the slots map.
+     */
+    public function setAskSlotsMap($value)
+    {
+        $this->askSlotsMap = (bool) $value;
     }
 }

+ 34 - 3
tests/Predis/Connection/RedisClusterTest.php

@@ -106,19 +106,50 @@ class RedisClusterTest extends StandardTestCase
     /**
      * @group disconnected
      */
-    public function testConnectForcesAllConnectionsToConnect()
+    public function testConnectPicksRandomConnection()
     {
+        $connect1 = false;
+        $connect2 = false;
+
         $connection1 = $this->getMockConnection('tcp://127.0.0.1:6379');
-        $connection1->expects($this->once())->method('connect');
+        $connection1->expects($this->any())
+                    ->method('connect')
+                    ->will($this->returnCallback(function () use (&$connect1) {
+                        $connect1 = true;
+                    }));
+        $connection1->expects($this->any())
+                    ->method('isConnected')
+                    ->will($this->returnCallback(function () use (&$connect1) {
+                        return $connect1;
+                    }));
 
         $connection2 = $this->getMockConnection('tcp://127.0.0.1:6380');
-        $connection2->expects($this->once())->method('connect');
+        $connection2->expects($this->any())
+                    ->method('connect')
+                    ->will($this->returnCallback(function () use (&$connect2) {
+                        $connect2 = true;
+                    }));
+        $connection2->expects($this->any())
+                    ->method('isConnected')
+                    ->will($this->returnCallback(function () use (&$connect2) {
+                        return $connect2;
+                    }));
 
         $cluster = new RedisCluster();
         $cluster->add($connection1);
         $cluster->add($connection2);
 
         $cluster->connect();
+
+        $this->assertTrue($cluster->isConnected());
+
+        if ($connect1) {
+            $this->assertTrue($connect1);
+            $this->assertFalse($connect2);
+        } else {
+            $this->assertFalse($connect1);
+            $this->assertTrue($connect2);
+        }
     }
 
     /**