Browse Source

Add initial support for Redis cluster (Redis v3.0).

This implementation is capable of handling "ASK" and "MOVED" replies
returned by Redis when one of the nodes asks the client for temporary
or permanent redirects of a slot to a different node.

Performances look almost on par with our client-side sharding solution
and everything looks already relatively stable.

Tests are still missing.
Daniele Alessandri 13 năm trước cách đây
mục cha
commit
1f0be85a3d

+ 3 - 0
CHANGELOG.md

@@ -3,6 +3,9 @@ v0.8.0 (201x-xx-xx)
 
 - The default server profile for Redis is now `2.6`.
 
+- Added smart support for redis-cluster (Redis v3.0) in addition to the usual
+  cluster implementation that uses client-side sharding.
+
 - Some namespaces and classes have been renamed to follow one common rule
   inspired by the Symfony2 naming conventions. See `CHANGELOG.NAMING.md` for
   more details about these changes.

+ 1 - 0
README.md

@@ -11,6 +11,7 @@ project.
 ## Main features ##
 
 - Complete support for Redis from __1.2__ to __2.6__ and unstable versions using different server profiles.
+- Smart support for [redis-cluster](http://redis.io/topics/cluster-spec) (Redis >= 3.0).
 - Client-side sharding with support for consistent hashing or custom distribution strategies.
 - Support for master / slave replication configurations (write on master, read from slaves).
 - Command pipelining on single and aggregated connections.

+ 253 - 0
lib/Predis/Connection/RedisCluster.php

@@ -0,0 +1,253 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Connection;
+
+use Predis\ResponseErrorInterface;
+use Predis\ConnectionFactoryInterface;
+use Predis\Command\CommandInterface;
+use Predis\ClientException;
+use Predis\NotSupportedException;
+use Predis\Distribution\CRC16HashGenerator;
+
+/**
+ * Abstraction for Redis cluster (Redis v3.0).
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class RedisCluster implements ClusterConnectionInterface, \IteratorAggregate, \Countable
+{
+    private $pool;
+    private $slots;
+    private $connections;
+    private $hashgenerator;
+
+    /**
+     * @param ConnectionFactoryInterface $connections Connection factory object.
+     */
+    public function __construct(ConnectionFactoryInterface $connections = null)
+    {
+        $this->pool = array();
+        $this->slots = array();
+        $this->connections = $connections;
+        $this->hashgenerator = new CRC16HashGenerator();
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function isConnected()
+    {
+        foreach ($this->pool as $connection) {
+            if ($connection->isConnected()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function connect()
+    {
+        foreach ($this->pool as $connection) {
+            $connection->connect();
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function disconnect()
+    {
+        foreach ($this->pool as $connection) {
+            $connection->disconnect();
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function add(SingleConnectionInterface $connection)
+    {
+        $parameters = $connection->getParameters();
+        $this->pool["{$parameters->host}:{$parameters->port}"] = $connection;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function remove(SingleConnectionInterface $connection)
+    {
+        if (($id = array_search($connection, $this->pool, true)) !== false) {
+            unset($this->pool[$id]);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Removes a connection instance using its alias or index.
+     *
+     * @param string $connectionId Alias or index of a connection.
+     * @return Boolean Returns true if the connection was in the pool.
+     */
+    public function removeById($connectionId)
+    {
+        if (isset($this->pool[$connectionId])) {
+            unset($this->pool[$connectionId]);
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getConnection(CommandInterface $command)
+    {
+        if ($hash = $command->getHash() === null) {
+            $hash = $this->hashgenerator->hash($command->getArgument(0));
+
+            if (!isset($hash)) {
+                throw new NotSupportedException("Cannot send {$command->getId()} commands to redis-cluster");
+            }
+        }
+
+        $slot = $hash & 0x0FFF;
+        if (isset($this->slots[$slot])) {
+            return $this->slots[$slot];
+        }
+
+        $connection = $this->pool[array_rand($this->pool)];
+        $this->slots[$slot] = $connection;
+
+        return $connection;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getConnectionById($id = null)
+    {
+        if (!isset($id)) {
+            throw new \InvalidArgumentException("A valid connection ID must be specified");
+        }
+
+        return isset($this->pool[$id]) ? $this->pool[$id] : null;
+    }
+
+    /**
+     * Handles -MOVED or -ASK replies by re-executing the command on the server
+     * specified by the Redis reply.
+     *
+     * @param CommandInterface $command Command that generated the -MOVE or -ASK reply.
+     * @param string $request Type of request (either 'MOVED' or 'ASK').
+     * @param string $details Parameters of the MOVED/ASK request.
+     * @return mixed
+     */
+    protected function onMoveRequest(CommandInterface $command, $request, $details)
+    {
+        list($slot, $host) = explode(' ', $details, 2);
+        $connection = $this->getConnectionById($host);
+
+        if (!isset($connection)) {
+            $parameters = array('host' => null, 'port' => null);
+            list($parameters['host'], $parameters['port']) = explode(':', $host, 2);
+            $connection = $this->connections->create($parameters);
+        }
+
+        switch ($request) {
+            case 'MOVED':
+                $this->add($connection);
+                $this->slots[$slot] = $connection;
+                return $this->executeCommand($command);
+
+            case 'ASK':
+                return $connection->executeCommand($command);
+
+            default:
+                throw new ClientException("Unexpected request type for a move request: $request");
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function count()
+    {
+        return count($this->pool);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getIterator()
+    {
+        return new \ArrayIterator($this->pool);
+    }
+
+    /**
+     * Handles -ERR replies from Redis.
+     *
+     * @param CommandInterface $command Command that generated the -ERR reply.
+     * @param ResponseErrorInterface $error Redis error reply object.
+     * @return mixed
+     */
+    protected function handleServerError(CommandInterface $command, ResponseErrorInterface $error)
+    {
+        list($type, $details) = explode(' ', $error->getMessage(), 2);
+
+        switch ($type) {
+            case 'MOVED':
+            case 'ASK':
+                return $this->onMoveRequest($command, $type, $details);
+
+            default:
+                return $error;
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function writeCommand(CommandInterface $command)
+    {
+        $this->getConnection($command)->writeCommand($command);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function readResponse(CommandInterface $command)
+    {
+        return $this->getConnection($command)->readResponse($command);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function executeCommand(CommandInterface $command)
+    {
+        $connection = $this->getConnection($command);
+        $reply = $connection->executeCommand($command);
+
+        if ($reply instanceof ResponseErrorInterface) {
+            return $this->handleServerError($command, $reply);
+        }
+
+        return $reply;
+    }
+}

+ 72 - 0
lib/Predis/Distribution/CRC16HashGenerator.php

@@ -0,0 +1,72 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Distribution;
+
+/**
+ * This class implements the CRC-CCITT-16 algorithm used by redis-cluster.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class CRC16HashGenerator implements HashGeneratorInterface
+{
+    private static $CCITT_16 = array(
+        0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
+        0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
+        0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6,
+        0x9339, 0x8318, 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE,
+        0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, 0x5485,
+        0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D,
+        0x3653, 0x2672, 0x1611, 0x0630, 0x76D7, 0x66F6, 0x5695, 0x46B4,
+        0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC,
+        0x48C4, 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823,
+        0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, 0xA90A, 0xB92B,
+        0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12,
+        0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A,
+        0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41,
+        0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49,
+        0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, 0x2E32, 0x1E51, 0x0E70,
+        0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78,
+        0x9188, 0x81A9, 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F,
+        0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, 0x6067,
+        0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E,
+        0x02B1, 0x1290, 0x22F3, 0x32D2, 0x4235, 0x5214, 0x6277, 0x7256,
+        0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D,
+        0x34E2, 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
+        0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, 0xC71D, 0xD73C,
+        0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634,
+        0xD94C, 0xC96D, 0xF90E, 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB,
+        0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3,
+        0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A,
+        0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, 0x1AD0, 0x2AB3, 0x3A92,
+        0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9,
+        0x7C26, 0x6C07, 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1,
+        0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8,
+        0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0
+    );
+
+    /**
+     * {@inheritdoc}
+     */
+    public function hash($value)
+    {
+        // CRC-CCITT-16 algorithm
+        $crc = 0;
+        $CCITT_16 = self::$CCITT_16;
+        $strlen = strlen($value);
+
+        for ($i = 0; $i < $strlen; $i++) {
+            $crc = (($crc << 8) ^ $CCITT_16[($crc >> 8) ^ ord($value[$i])]) & 0xFFFF;
+        }
+
+        return $crc;
+    }
+}

+ 12 - 1
lib/Predis/Option/ClientCluster.php

@@ -12,6 +12,7 @@
 namespace Predis\Option;
 
 use Predis\Connection\ClusterConnectionInterface;
+use Predis\Connection\RedisCluster;
 use Predis\Connection\PredisCluster;
 
 /**
@@ -60,7 +61,17 @@ class ClientCluster extends AbstractOption
     {
         switch ($fqnOrType) {
             case 'predis':
-                return function() { return new PredisCluster(); };
+                return function() {
+                    return new PredisCluster();
+                };
+
+            case 'redis':
+                return function() use($options) {
+                    $connectionFactory = $options->connections;
+                    $cluster = new RedisCluster($connectionFactory);
+
+                    return $cluster;
+                };
 
             default:
                 // TODO: we should not even allow non-string values here.