浏览代码

Backport improvements for redis-cluster from master (v0.9).

List of changes:

  - The cluster connection sends an ASKING command before retrying
    a command on a different node when Redis returns a -ASK response.
  - The cluster connection can be initialized with a partial list of
    nodes, the full slots map will be fetched from Redis itself using
    the CLUSTER NODES command.
  - The slots map can be optionally retrieved from Redis if the server
    returns a -MOVE response, otherwise only the interested slot will
    be permanently reassigned to the new target node.
  - It is possible to specify a set of common parameters applied to
    connections created on the fly when not part of the current pool
    upon -MOVE and -ASK responses returned by Redis for redirections.
  - $cluster->connect() connects to a random connection in the pool
    instead of forcing the connect operation on all the connections.
Daniele Alessandri 11 年之前
父节点
当前提交
73780da52d
共有 3 个文件被更改,包括 302 次插入128 次删除
  1. 10 0
      CHANGELOG.md
  2. 250 123
      lib/Predis/Connection/RedisCluster.php
  3. 42 5
      tests/Predis/Connection/RedisClusterTest.php

+ 10 - 0
CHANGELOG.md

@@ -19,6 +19,16 @@ v0.8.5 (2013-xx-xx)
   responses are not parsed, which means arguments must follow the signature of
   the command as defined by Redis and complex responses are left untouched.
 
+- Various improvements and fixes to the redis-cluster connection backend:
+
+    - __FIX__: the `ASKING` command is sent upon -ASK redirections.
+    - An updated slots-map can be fetched from nodes using the `CLUSTER NODES`
+      command. By default this is a manual operation but can be enabled to get
+      automatically done upon -MOVED redirections.
+    - It is possible to specify a common set of connection parameters that are
+      applied to connections created on the fly upon redirections to nodes not
+      part of the initial pool.
+
 - List of deprecated methods:
 
     - `Predis\Client::multiExec()`: superseded by `Predis\Client::transaction()`

+ 250 - 123
lib/Predis/Connection/RedisCluster.php

@@ -11,24 +11,47 @@
 
 namespace Predis\Connection;
 
-use Predis\ClientException;
-use Predis\Cluster\CommandHashStrategyInterface;
+use ArrayIterator;
+use Countable;
+use IteratorAggregate;
+use OutOfBoundsException;
 use Predis\NotSupportedException;
 use Predis\ResponseErrorInterface;
+use Predis\Cluster\CommandHashStrategyInterface;
 use Predis\Cluster\RedisClusterHashStrategy;
 use Predis\Command\CommandInterface;
+use Predis\Command\RawCommand;
+use Predis\Protocol\ProtocolException;
 
 /**
- * Abstraction for Redis cluster (Redis v3.0).
+ * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
+ *
+ * This connection backend offers smart support for redis-cluster by handling
+ * automatic slots map (re)generation upon -MOVE or -ASK responses returned by
+ * Redis when redirecting a client to a different node.
+ *
+ * The cluster can be pre-initialized using only a subset of the actual nodes in
+ * the cluster, Predis will do the rest by adjusting the slots map and creating
+ * the missing underlying connection instances on the fly.
+ *
+ * It is possible to pre-associate connections to a slots range with the "slots"
+ * parameter in the form "$first-$last". This can greatly reduce runtime node
+ * guessing and redirections.
+ *
+ * It is also possible to ask for the full and updated slots map directly to one
+ * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
+ * Asking for the cluster configuration to Redis is actually done by issuing a
+ * CLUSTER NODES command to a random node in the pool.
  *
  * @author Daniele Alessandri <suppakilla@gmail.com>
  */
-class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
+class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
 {
-    private $pool;
-    private $slots;
+    private $askClusterNodes = false;
+    private $defaultParameters = array();
+    private $pool = array();
+    private $slots = array();
     private $slotsMap;
-    private $slotsPerNode;
     private $strategy;
     private $connections;
 
@@ -37,8 +60,6 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function __construct(ConnectionFactoryInterface $connections = null)
     {
-        $this->pool = array();
-        $this->slots = array();
         $this->strategy = new RedisClusterHashStrategy();
         $this->connections = $connections ?: new ConnectionFactory();
     }
@@ -62,7 +83,7 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function connect()
     {
-        foreach ($this->pool as $connection) {
+        if ($connection = $this->getRandomConnection()) {
             $connection->connect();
         }
     }
@@ -83,10 +104,7 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     public function add(SingleConnectionInterface $connection)
     {
         $this->pool[(string) $connection] = $connection;
-        unset(
-            $this->slotsMap,
-            $this->slotsPerNode
-        );
+        unset($this->slotsMap);
     }
 
     /**
@@ -94,11 +112,10 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
      */
     public function remove(SingleConnectionInterface $connection)
     {
-        if (($id = array_search($connection, $this->pool, true)) !== false) {
+        if (false !== $id = array_search($connection, $this->pool, true)) {
             unset(
                 $this->pool[$id],
-                $this->slotsMap,
-                $this->slotsPerNode
+                $this->slotsMap
             );
 
             return true;
@@ -108,18 +125,17 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     }
 
     /**
-     * Removes a connection instance using its alias or index.
+     * Removes a connection instance by using its identifier.
      *
-     * @param string $connectionId Alias or index of a connection.
-     * @return Boolean Returns true if the connection was in the pool.
+     * @param string $connectionID Connection identifier.
+     * @return bool True if the connection was in the pool.
      */
-    public function removeById($connectionId)
+    public function removeById($connectionID)
     {
-        if (isset($this->pool[$connectionId])) {
+        if (isset($this->pool[$connectionID])) {
             unset(
-                $this->pool[$connectionId],
-                $this->slotsMap,
-                $this->slotsPerNode
+                $this->pool[$connectionID],
+                $this->slotsMap
             );
 
             return true;
@@ -129,14 +145,17 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     }
 
     /**
-     * Builds the slots map for the cluster.
+     * Generates the current slots map by guessing the cluster configuration out
+     * of the connection parameters of the connections in the pool.
      *
-     * @return array
+     * Generation is based on the same algorithm used by Redis to generate the
+     * cluster, so it is most effective when all of the connections supplied on
+     * initialization have the "slots" parameter properly set accordingly to the
+     * current cluster configuration.
      */
     public function buildSlotsMap()
     {
         $this->slotsMap = array();
-        $this->slotsPerNode = (int) (16384 / count($this->pool));
 
         foreach ($this->pool as $connectionID => $connection) {
             $parameters = $connection->getParameters();
@@ -145,11 +164,37 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
                 continue;
             }
 
-            list($first, $last) = explode('-', $parameters->slots, 2);
-            $this->setSlots($first, $last, $connectionID);
+            $slots = explode('-', $parameters->slots, 2);
+            $this->setSlots($slots[0], $slots[1], $connectionID);
         }
+    }
 
-        return $this->slotsMap;
+    /**
+     * Generates the current slots map by fetching the cluster configuration to
+     * one of the nodes by leveraging the CLUSTER NODES command.
+     */
+    public function askClusterNodes()
+    {
+        if (!$connection = $this->getRandomConnection()) {
+            return array();
+        }
+
+        $cmdCluster = RawCommand::create('CLUSTER', 'NODES');
+        $response = $connection->executeCommand($cmdCluster);
+
+        $nodes = explode("\n", $response, -1);
+        $count = count($nodes);
+
+        for ($i = 0; $i < $count; $i++) {
+            $node = explode(' ', $nodes[$i], 9);
+            $slots = explode('-', $node[8], 2);
+
+            if ($node[1] === ':0') {
+                $this->setSlots($slots[0], $slots[1], (string) $connection);
+            } else {
+                $this->setSlots($slots[0], $slots[1], $node[1]);
+            }
+        }
     }
 
     /**
@@ -167,22 +212,50 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
     }
 
     /**
-     * Preassociate a connection to a set of slots to avoid runtime guessing.
+     * Pre-associates a connection to a slots range to avoid runtime guessing.
      *
-     * @todo Check type or existence of the specified connection.
-     * @todo Cluster loses the slots assigned with this methods when adding / removing connections.
-     *
-     * @param int $first Initial slot.
-     * @param int $last Last slot.
+     * @param int $first Initial slot of the range.
+     * @param int $last Last slot of the range.
      * @param SingleConnectionInterface|string $connection ID or connection instance.
      */
     public function setSlots($first, $last, $connection)
     {
-        if ($first < 0x0000 || $first > 0x3FFF || $last < 0x0000 || $last > 0x3FFF || $last < $first) {
-            throw new \OutOfBoundsException("Invalid slot values for $connection: [$first-$last]");
+        if ($first < 0x0000 || $first > 0x3FFF ||
+            $last < 0x0000 || $last > 0x3FFF ||
+            $last < $first
+        ) {
+            throw new OutOfBoundsException(
+                "Invalid slot range for $connection: [$first-$last]"
+            );
+        }
+
+        $slots = array_fill($first, $last - $first + 1, (string) $connection);
+        $this->slotsMap = $this->getSlotsMap() + $slots;
+    }
+
+    /**
+     * Guesses the correct node associated to a given slot using a precalculated
+     * slots map, falling back to the same logic used by Redis to initialize a
+     * cluster (best-effort).
+     *
+     * @param int $slot Slot index.
+     * @return string Connection ID.
+     */
+    protected function guessNode($slot)
+    {
+        if (!isset($this->slotsMap)) {
+            $this->buildSlotsMap();
+        }
+
+        if (isset($this->slotsMap[$slot])) {
+            return $this->slotsMap[$slot];
         }
 
-        $this->slotsMap = $this->getSlotsMap() + array_fill($first, $last - $first + 1, (string) $connection);
+        $count = count($this->pool);
+        $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
+        $nodes = array_keys($this->pool);
+
+        return $nodes[$index];
     }
 
     /**
@@ -193,75 +266,113 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
         $hash = $this->strategy->getHash($command);
 
         if (!isset($hash)) {
-            throw new NotSupportedException("Cannot use {$command->getId()} with redis-cluster");
+            throw new NotSupportedException(
+                "Cannot use {$command->getId()} with redis-cluster"
+            );
         }
 
         $slot = $hash & 0x3FFF;
 
         if (isset($this->slots[$slot])) {
             return $this->slots[$slot];
+        } else {
+            return $this->getConnectionBySlot($slot);
         }
-
-        $this->slots[$slot] = $connection = $this->pool[$this->guessNode($slot)];
-
-        return $connection;
     }
 
     /**
-     * Returns the connection associated to the specified slot.
+     * Returns the connection currently associated to a given slot.
      *
-     * @param int $slot Slot ID.
+     * @param int $slot Slot index.
      * @return SingleConnectionInterface
      */
     public function getConnectionBySlot($slot)
     {
         if ($slot < 0x0000 || $slot > 0x3FFF) {
-            throw new \OutOfBoundsException("Invalid slot value [$slot]");
+            throw new OutOfBoundsException("Invalid slot [$slot]");
         }
 
         if (isset($this->slots[$slot])) {
             return $this->slots[$slot];
         }
 
-        return $this->pool[$this->guessNode($slot)];
+        $connectionID = $this->guessNode($slot);
+
+        if (!$connection = $this->getConnectionById($connectionID)) {
+            $host = explode(':', $connectionID, 2);
+            $parameters = array_merge($this->defaultParameters, array(
+                'host' => $host[0],
+                'port' => $host[1],
+            ));
+
+            $connection = $this->connections->create($parameters);
+            $this->pool[$connectionID] = $connection;
+        }
+
+        return $this->slots[$slot] = $connection;
     }
 
     /**
      * {@inheritdoc}
      */
-    public function getConnectionById($connectionId)
+    public function getConnectionById($connectionID)
     {
-        return isset($this->pool[$connectionId]) ? $this->pool[$connectionId] : null;
+        if (isset($this->pool[$connectionID])) {
+            return $this->pool[$connectionID];
+        }
     }
 
     /**
-     * Tries guessing the correct node associated to the given slot using a precalculated
-     * slots map or the same logic used by redis-trib to initialize a redis cluster.
+     * Returns a random connection from the pool.
      *
-     * @param int $slot Slot ID.
-     * @return string
+     * @return SingleConnectionInterface
      */
-    protected function guessNode($slot)
+    protected function getRandomConnection()
     {
-        if (!isset($this->slotsMap)) {
-            $this->buildSlotsMap();
+        if ($this->pool) {
+            return $this->pool[array_rand($this->pool)];
         }
+    }
 
-        if (isset($this->slotsMap[$slot])) {
-            return $this->slotsMap[$slot];
-        }
+    /**
+     * Permanently associates the connection instance to a new slot.
+     * The connection is added to the connections pool if not yet included.
+     *
+     * @param SingleConnectionInterface $connection Connection instance.
+     * @param int $slot Target slot index.
+     */
+    protected function move(SingleConnectionInterface $connection, $slot)
+    {
+        $this->pool[(string) $connection] = $connection;
+        $this->slots[(int) $slot] = $connection;
+    }
 
-        $index = min((int) ($slot / $this->slotsPerNode), count($this->pool) - 1);
-        $nodes = array_keys($this->pool);
+    /**
+     * Handles -ERR responses from Redis.
+     *
+     * @param CommandInterface $command Command that generated the -ERR response.
+     * @param ResponseErrorInterface $error Redis error response object.
+     * @return mixed
+     */
+    protected function onErrorResponse(CommandInterface $command, ResponseErrorInterface $error)
+    {
+        $details = explode(' ', $error->getMessage(), 2);
 
-        return $nodes[$index];
+        switch ($details[0]) {
+            case 'MOVED':
+            case 'ASK':
+                return $this->onMoveRequest($command, $details[0], $details[1]);
+
+            default:
+                return $error;
+        }
     }
 
     /**
-     * Handles -MOVED or -ASK replies by re-executing the command on the server
-     * specified by the Redis reply.
+     * Handles -MOVED and -ASK responses by re-executing the command on the node
+     * specified by the Redis response.
      *
-     * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
+     * @param CommandInterface $command Command that generated the -MOVE or -ASK response.
      * @param string $request Type of request (either 'MOVED' or 'ASK').
      * @param string $details Parameters of the MOVED/ASK request.
      * @return mixed
@@ -271,114 +382,130 @@ class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \C
         list($slot, $host) = explode(' ', $details, 2);
         $connection = $this->getConnectionById($host);
 
-        if (!isset($connection)) {
-            $parameters = array('host' => null, 'port' => null);
-            list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
+        if (!$connection) {
+            $host = explode(':', $host, 2);
+            $parameters = array_merge($this->defaultParameters, array(
+                'host' => $host[0],
+                'port' => $host[1],
+            ));
+
             $connection = $this->connections->create($parameters);
         }
 
         switch ($request) {
             case 'MOVED':
+                if ($this->askClusterNodes) {
+                    $this->askClusterNodes();
+                }
+
                 $this->move($connection, $slot);
-                return $this->executeCommand($command);
+                $response = $this->executeCommand($command);
+
+                return $response;
 
             case 'ASK':
-                return $connection->executeCommand($command);
+                $connection->executeCommand(RawCommand::create('ASKING'));
+                $response = $connection->executeCommand($command);
+
+                return $response;
 
             default:
-                throw new ClientException("Unexpected request type for a move request: $request");
+                throw new ProtocolException(
+                    "Unexpected request type for a move request: $request"
+                );
         }
     }
 
     /**
-     * Assign the connection instance to a new slot and adds it to the
-     * pool if the connection was not already part of the pool.
-     *
-     * @param SingleConnectionInterface $connection Connection instance
-     * @param int $slot Target slot.
+     * {@inheritdoc}
      */
-    protected function move(SingleConnectionInterface $connection, $slot)
+    public function writeCommand(CommandInterface $command)
     {
-        $this->pool[(string) $connection] = $connection;
-        $this->slots[(int) $slot] = $connection;
+        $this->getConnection($command)->writeCommand($command);
     }
 
     /**
-     * Returns the underlying command hash strategy used to hash
-     * commands by their keys.
-     *
-     * @return CommandHashStrategyInterface
+     * {@inheritdoc}
      */
-    public function getCommandHashStrategy()
+    public function readResponse(CommandInterface $command)
     {
-        return $this->strategy;
+        return $this->getConnection($command)->readResponse($command);
     }
 
     /**
      * {@inheritdoc}
      */
-    public function count()
+    public function executeCommand(CommandInterface $command)
     {
-        return count($this->pool);
+        $connection = $this->getConnection($command);
+        $response = $connection->executeCommand($command);
+
+        if ($response instanceof ResponseErrorInterface) {
+            return $this->onErrorResponse($command, $response);
+        }
+
+        return $response;
     }
 
     /**
      * {@inheritdoc}
      */
-    public function getIterator()
+    public function count()
     {
-        return new \ArrayIterator(array_values($this->pool));
+        return count($this->pool);
     }
 
     /**
-     * Handles -ERR replies from Redis.
-     *
-     * @param CommandInterface $command Command that generated the -ERR reply.
-     * @param ResponseErrorInterface $error Redis error reply object.
-     * @return mixed
+     * {@inheritdoc}
      */
-    protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
+    public function getIterator()
     {
-        list($type, $details) = explode(' ', $error->getMessage(), 2);
-
-        switch ($type) {
-            case 'MOVED':
-            case 'ASK':
-                return $this->onMoveRequest($command, $type, $details);
-
-            default:
-                return $error;
-        }
+        return new ArrayIterator(array_values($this->pool));
     }
 
     /**
-     * {@inheritdoc}
+     * Returns the underlying hash strategy used to hash commands by their keys.
+     *
+     * @return CommandHashStrategyInterface
      */
-    public function writeCommand(CommandInterface $command)
+    public function getCommandHashStrategy()
     {
-        $this->getConnection($command)->writeCommand($command);
+        return $this->strategy;
     }
 
     /**
-     * {@inheritdoc}
+     * Enables automatic fetching of the current slots map from one of the nodes
+     * using the CLUSTER NODES command. This option is disabled by default but
+     * asking the current slots map to Redis upon -MOVE responses may reduce
+     * overhead by eliminating the trial-and-error nature of the node guessing
+     * procedure, mostly when targeting many keys that would end up in a lot of
+     * redirections.
+     *
+     * The slots map can still be manually fetched using the askClusterNodes()
+     * method whether or not this option is enabled.
+     *
+     * @param bool $value Enable or disable the use of CLUSTER NODES.
      */
-    public function readResponse(CommandInterface $command)
+    public function enableClusterNodes($value)
     {
-        return $this->getConnection($command)->readResponse($command);
+        $this->askClusterNodes = (bool) $value;
     }
 
     /**
-     * {@inheritdoc}
+     * Sets a default array of connection parameters to be applied when creating
+     * new connection instances on the fly when they are not part of the initial
+     * pool supplied upon cluster initialization.
+     *
+     * These parameters are not applied to connections added to the pool using
+     * the add() method.
+     *
+     * @param array $parameters Array of connection parameters.
      */
-    public function executeCommand(CommandInterface $command)
+    public function setDefaultParameters(array $parameters)
     {
-        $connection = $this->getConnection($command);
-        $reply = $connection->executeCommand($command);
-
-        if ($reply instanceof ResponseErrorInterface) {
-            return $this->handleServerError($command, $reply);
-        }
-
-        return $reply;
+        $this->defaultParameters = array_merge(
+            $this->defaultParameters,
+            $parameters ?: array()
+        );
     }
 }

+ 42 - 5
tests/Predis/Connection/RedisClusterTest.php

@@ -105,19 +105,50 @@ class RedisClusterTest extends PredisTestCase
     /**
      * @group disconnected
      */
-    public function testConnectForcesAllConnectionsToConnect()
+    public function testConnectPicksRandomConnection()
     {
+        $connect1 = false;
+        $connect2 = false;
+
         $connection1 = $this->getMockConnection('tcp://127.0.0.1:6379');
-        $connection1->expects($this->once())->method('connect');
+        $connection1->expects($this->any())
+                    ->method('connect')
+                    ->will($this->returnCallback(function () use (&$connect1) {
+                        $connect1 = true;
+                    }));
+        $connection1->expects($this->any())
+                    ->method('isConnected')
+                    ->will($this->returnCallback(function () use (&$connect1) {
+                        return $connect1;
+                    }));
 
         $connection2 = $this->getMockConnection('tcp://127.0.0.1:6380');
-        $connection2->expects($this->once())->method('connect');
+        $connection2->expects($this->any())
+                    ->method('connect')
+                    ->will($this->returnCallback(function () use (&$connect2) {
+                        $connect2 = true;
+                    }));
+        $connection2->expects($this->any())
+                    ->method('isConnected')
+                    ->will($this->returnCallback(function () use (&$connect2) {
+                        return $connect2;
+                    }));
 
         $cluster = new RedisCluster();
         $cluster->add($connection1);
         $cluster->add($connection2);
 
         $cluster->connect();
+
+        $this->assertTrue($cluster->isConnected());
+
+        if ($connect1) {
+            $this->assertTrue($connect1);
+            $this->assertFalse($connect2);
+        } else {
+            $this->assertFalse($connect1);
+            $this->assertTrue($connect2);
+        }
     }
 
     /**
@@ -433,7 +464,10 @@ class RedisClusterTest extends PredisTestCase
                     ->will($this->onConsecutiveCalls($askResponse, 'foobar'));
 
         $connection2 = $this->getMockConnection('tcp://127.0.0.1:6380');
-        $connection2->expects($this->exactly(1))
+        $connection2->expects($this->at(2))
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand('ASKING'));
+        $connection2->expects($this->at(3))
                     ->method('executeCommand')
                     ->with($command)
                     ->will($this->returnValue('foobar'));
@@ -470,7 +504,10 @@ class RedisClusterTest extends PredisTestCase
                     ->method('executeCommand');
 
         $connection3 = $this->getMockConnection('tcp://127.0.0.1:6381');
-        $connection3->expects($this->once())
+        $connection3->expects($this->at(0))
+                    ->method('executeCommand')
+                    ->with($this->isRedisCommand('ASKING'));
+        $connection3->expects($this->at(1))
                     ->method('executeCommand')
                     ->with($command)
                     ->will($this->returnValue('foobar'));