|
@@ -23,13 +23,30 @@ use Predis\Command\RawCommand;
|
|
|
use Predis\Response;
|
|
|
|
|
|
/**
|
|
|
- * Abstraction for Redis cluster (Redis v3.0).
|
|
|
+ * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
|
|
|
+ *
|
|
|
+ * This connection backend offers smart support for redis-cluster by handling
|
|
|
+ * automatic slots map (re)generation upon -MOVE or -ASK responses returned by
|
|
|
+ * Redis when redirecting a client to a different node.
|
|
|
+ *
|
|
|
+ * The cluster can be pre-initialized using only a subset of the actual nodes in
|
|
|
+ * the cluster, Predis will do the rest by adjusting the slots map and creating
|
|
|
+ * the missing underlying connection instances on the fly.
|
|
|
+ *
|
|
|
+ * It is possible to pre-associate connections to a slots range with the "slots"
|
|
|
+ * parameter in the form "$first-$last". This can greatly reduce runtime node
|
|
|
+ * guessing and redirections.
|
|
|
+ *
|
|
|
+ * It is also possible to ask for the full and updated slots map directly to one
|
|
|
+ * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
|
|
|
+ * Asking for the cluster configuration to Redis is actually done by issuing a
|
|
|
+ * CLUSTER NODES command to a random node in the pool.
|
|
|
*
|
|
|
* @author Daniele Alessandri <suppakilla@gmail.com>
|
|
|
*/
|
|
|
class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
|
|
|
{
|
|
|
- private $askSlotsMap = false;
|
|
|
+ private $askClusterNodes = false;
|
|
|
private $pool = array();
|
|
|
private $slots = array();
|
|
|
private $slotsMap;
|
|
@@ -106,10 +123,10 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Removes a connection instance using its alias or index.
|
|
|
+ * Removes a connection instance by using its identifier.
|
|
|
*
|
|
|
- * @param string $connectionID Alias or index of a connection.
|
|
|
- * @return bool Returns true if the connection was in the pool.
|
|
|
+ * @param string $connectionID Connection identifier.
|
|
|
+ * @return bool True if the connection was in the pool.
|
|
|
*/
|
|
|
public function removeById($connectionID)
|
|
|
{
|
|
@@ -126,7 +143,13 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Builds the slots map for the cluster.
|
|
|
+ * Generates the current slots map by guessing the cluster configuration out
|
|
|
+ * of the connection parameters of the connections in the pool.
|
|
|
+ *
|
|
|
+ * Generation is based on the same algorithm used by Redis to generate the
|
|
|
+ * cluster, so it is most effective when all of the connections supplied on
|
|
|
+ * initialization have the "slots" parameter properly set accordingly to the
|
|
|
+ * current cluster configuration.
|
|
|
*/
|
|
|
public function buildSlotsMap()
|
|
|
{
|
|
@@ -145,10 +168,10 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Builds the slots map for the cluster by asking the current configuration
|
|
|
- * to one of the nodes in the cluster.
|
|
|
+ * Generates the current slots map by fetching the cluster configuration to
|
|
|
+ * one of the nodes by leveraging the CLUSTER NODES command.
|
|
|
*/
|
|
|
- public function askSlotsMap()
|
|
|
+ public function askClusterNodes()
|
|
|
{
|
|
|
if (!$connection = $this->getRandomConnection()) {
|
|
|
return array();
|
|
@@ -187,10 +210,10 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Preassociate a connection to a set of slots to avoid runtime guessing.
|
|
|
+ * Pre-associates a connection to a slots range to avoid runtime guessing.
|
|
|
*
|
|
|
- * @param int $first Initial slot.
|
|
|
- * @param int $last Last slot.
|
|
|
+ * @param int $first Initial slot of the range.
|
|
|
+ * @param int $last Last slot of the range.
|
|
|
* @param SingleConnectionInterface|string $connection ID or connection instance.
|
|
|
*/
|
|
|
public function setSlots($first, $last, $connection)
|
|
@@ -208,6 +231,31 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
$this->slotsMap = $this->getSlotsMap() + $slots;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Guesses the correct node associated to a given slot using a precalculated
|
|
|
+ * slots map, falling back to the same logic used by Redis to initialize a
|
|
|
+ * cluster (best-effort).
|
|
|
+ *
|
|
|
+ * @param int $slot Slot index.
|
|
|
+ * @return string Connection ID.
|
|
|
+ */
|
|
|
+ protected function guessNode($slot)
|
|
|
+ {
|
|
|
+ if (!isset($this->slotsMap)) {
|
|
|
+ $this->buildSlotsMap();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($this->slotsMap[$slot])) {
|
|
|
+ return $this->slotsMap[$slot];
|
|
|
+ }
|
|
|
+
|
|
|
+ $count = count($this->pool);
|
|
|
+ $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
|
|
|
+ $nodes = array_keys($this->pool);
|
|
|
+
|
|
|
+ return $nodes[$index];
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
@@ -231,21 +279,9 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Returns a random connection from the pool.
|
|
|
+ * Returns the connection currently associated to a given slot.
|
|
|
*
|
|
|
- * @return SingleConnectionInterface
|
|
|
- */
|
|
|
- protected function getRandomConnection()
|
|
|
- {
|
|
|
- if ($this->pool) {
|
|
|
- return $this->pool[array_rand($this->pool)];
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the connection associated to the specified slot.
|
|
|
- *
|
|
|
- * @param int $slot Slot ID.
|
|
|
+ * @param int $slot Slot index.
|
|
|
* @return SingleConnectionInterface
|
|
|
*/
|
|
|
public function getConnectionBySlot($slot)
|
|
@@ -284,27 +320,49 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Tries guessing the correct node associated to the given slot using a precalculated
|
|
|
- * slots map or the same logic used by redis-trib to initialize a redis cluster.
|
|
|
+ * Returns a random connection from the pool.
|
|
|
*
|
|
|
- * @param int $slot Slot ID.
|
|
|
- * @return string
|
|
|
+ * @return SingleConnectionInterface
|
|
|
*/
|
|
|
- protected function guessNode($slot)
|
|
|
+ protected function getRandomConnection()
|
|
|
{
|
|
|
- if (!isset($this->slotsMap)) {
|
|
|
- $this->buildSlotsMap();
|
|
|
+ if ($this->pool) {
|
|
|
+ return $this->pool[array_rand($this->pool)];
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- if (isset($this->slotsMap[$slot])) {
|
|
|
- return $this->slotsMap[$slot];
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Permanently associates the connection instance to a new slot.
|
|
|
+ * The connection is added to the connections pool if not yet included.
|
|
|
+ *
|
|
|
+ * @param SingleConnectionInterface $connection Connection instance.
|
|
|
+ * @param int $slot Target slot index.
|
|
|
+ */
|
|
|
+ protected function move(SingleConnectionInterface $connection, $slot)
|
|
|
+ {
|
|
|
+ $this->pool[(string) $connection] = $connection;
|
|
|
+ $this->slots[(int) $slot] = $connection;
|
|
|
+ }
|
|
|
|
|
|
- $count = count($this->pool);
|
|
|
- $index = min((int) ($slot / (int) (16384 / $count)), $count - 1);
|
|
|
- $nodes = array_keys($this->pool);
|
|
|
+ /**
|
|
|
+ * Handles -ERR responses from Redis.
|
|
|
+ *
|
|
|
+ * @param CommandInterface $command Command that generated the -ERR response.
|
|
|
+ * @param Response\ErrorInterface $error Redis error response object.
|
|
|
+ * @return mixed
|
|
|
+ */
|
|
|
+ protected function onErrorResponse(CommandInterface $command, Response\ErrorInterface $error)
|
|
|
+ {
|
|
|
+ $details = explode(' ', $error->getMessage(), 2);
|
|
|
|
|
|
- return $nodes[$index];
|
|
|
+ switch ($details[0]) {
|
|
|
+ case 'MOVED':
|
|
|
+ case 'ASK':
|
|
|
+ return $this->onMoveRequest($command, $details[0], $details[1]);
|
|
|
+
|
|
|
+ default:
|
|
|
+ return $error;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -332,8 +390,8 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
|
|
|
switch ($request) {
|
|
|
case 'MOVED':
|
|
|
- if ($this->askSlotsMap) {
|
|
|
- $this->askSlotsMap();
|
|
|
+ if ($this->askClusterNodes) {
|
|
|
+ $this->askClusterNodes();
|
|
|
}
|
|
|
|
|
|
$this->move($connection, $slot);
|
|
@@ -352,104 +410,78 @@ class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Cou
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Assign the connection instance to a new slot and adds it to the
|
|
|
- * pool if the connection was not already part of the pool.
|
|
|
- *
|
|
|
- * @param SingleConnectionInterface $connection Connection instance
|
|
|
- * @param int $slot Target slot.
|
|
|
- */
|
|
|
- protected function move(SingleConnectionInterface $connection, $slot)
|
|
|
- {
|
|
|
- $this->pool[(string) $connection] = $connection;
|
|
|
- $this->slots[(int) $slot] = $connection;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Returns the underlying hash strategy used to hash commands by their keys.
|
|
|
- *
|
|
|
- * @return Cluster\StrategyInterface
|
|
|
- */
|
|
|
- public function getClusterStrategy()
|
|
|
- {
|
|
|
- return $this->strategy;
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- public function count()
|
|
|
+ public function writeCommand(CommandInterface $command)
|
|
|
{
|
|
|
- return count($this->pool);
|
|
|
+ $this->getConnection($command)->writeCommand($command);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- public function getIterator()
|
|
|
+ public function readResponse(CommandInterface $command)
|
|
|
{
|
|
|
- return new ArrayIterator(array_values($this->pool));
|
|
|
+ return $this->getConnection($command)->readResponse($command);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Handles -ERR responses from Redis.
|
|
|
- *
|
|
|
- * @param CommandInterface $command Command that generated the -ERR response.
|
|
|
- * @param Response\ErrorInterface $error Redis error response object.
|
|
|
- * @return mixed
|
|
|
+ * {@inheritdoc}
|
|
|
*/
|
|
|
- protected function onErrorResponse(CommandInterface $command, Response\ErrorInterface $error)
|
|
|
+ public function executeCommand(CommandInterface $command)
|
|
|
{
|
|
|
- $details = explode(' ', $error->getMessage(), 2);
|
|
|
-
|
|
|
- switch ($details[0]) {
|
|
|
- case 'MOVED':
|
|
|
- case 'ASK':
|
|
|
- return $this->onMoveRequest($command, $details[0], $details[1]);
|
|
|
+ $connection = $this->getConnection($command);
|
|
|
+ $response = $connection->executeCommand($command);
|
|
|
|
|
|
- default:
|
|
|
- return $error;
|
|
|
+ if ($response instanceof Response\ErrorInterface) {
|
|
|
+ return $this->onErrorResponse($command, $response);
|
|
|
}
|
|
|
+
|
|
|
+ return $response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- public function writeCommand(CommandInterface $command)
|
|
|
+ public function count()
|
|
|
{
|
|
|
- $this->getConnection($command)->writeCommand($command);
|
|
|
+ return count($this->pool);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* {@inheritdoc}
|
|
|
*/
|
|
|
- public function readResponse(CommandInterface $command)
|
|
|
+ public function getIterator()
|
|
|
{
|
|
|
- return $this->getConnection($command)->readResponse($command);
|
|
|
+ return new ArrayIterator(array_values($this->pool));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * {@inheritdoc}
|
|
|
+ * Returns the underlying hash strategy used to hash commands by their keys.
|
|
|
+ *
|
|
|
+ * @return Cluster\StrategyInterface
|
|
|
*/
|
|
|
- public function executeCommand(CommandInterface $command)
|
|
|
+ public function getClusterStrategy()
|
|
|
{
|
|
|
- $connection = $this->getConnection($command);
|
|
|
- $response = $connection->executeCommand($command);
|
|
|
-
|
|
|
- if ($response instanceof Response\ErrorInterface) {
|
|
|
- return $this->onErrorResponse($command, $response);
|
|
|
- }
|
|
|
-
|
|
|
- return $response;
|
|
|
+ return $this->strategy;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Instruct the cluster to fetch the slots map from one of the nodes.
|
|
|
+ * Enables automatic fetching of the current slots map from one of the nodes
|
|
|
+ * using the CLUSTER NODES command. This option is disabled by default but
|
|
|
+ * asking the current slots map to Redis upon -MOVE responses may reduce
|
|
|
+ * overhead by eliminating the trial-and-error nature of the node guessing
|
|
|
+ * procedure, mostly when targeting many keys that would end up in a lot of
|
|
|
+ * redirections.
|
|
|
+ *
|
|
|
+ * The slots map can still be manually fetched using the askClusterNodes()
|
|
|
+ * method whether or not this option is enabled.
|
|
|
*
|
|
|
- * @param bool $value Enable or disable fetching the slots map.
|
|
|
+ * @param bool $value Enable or disable the use of CLUSTER NODES.
|
|
|
*/
|
|
|
- public function setAskSlotsMap($value)
|
|
|
+ public function enableClusterNodes($value)
|
|
|
{
|
|
|
- $this->askSlotsMap = (bool) $value;
|
|
|
+ $this->askClusterNodes = (bool) $value;
|
|
|
}
|
|
|
}
|