123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- <?php
- /*
- * This file is part of the Predis package.
- *
- * (c) Daniele Alessandri <suppakilla@gmail.com>
- *
- * For the full copyright and license information, please view the LICENSE
- * file that was distributed with this source code.
- */
- namespace Predis\Connection;
- use ArrayIterator;
- use Countable;
- use IteratorAggregate;
- use OutOfBoundsException;
- use Predis\NotSupportedException;
- use Predis\ResponseErrorInterface;
- use Predis\Cluster\CommandHashStrategyInterface;
- use Predis\Cluster\RedisClusterHashStrategy;
- use Predis\Command\CommandInterface;
- use Predis\Command\RawCommand;
- /**
- * Abstraction for a Redis-backed cluster of nodes (Redis >= 3.0.0).
- *
- * This connection backend offers smart support for redis-cluster by handling
- * automatic slots map (re)generation upon -MOVED or -ASK responses returned by
- * Redis when redirecting a client to a different node.
- *
- * The cluster can be pre-initialized using only a subset of the actual nodes in
- * the cluster, Predis will do the rest by adjusting the slots map and creating
- * the missing underlying connection instances on the fly.
- *
- * It is possible to pre-associate connections to a slots range with the "slots"
- * parameter in the form "$first-$last". This can greatly reduce runtime node
- * guessing and redirections.
- *
- * It is also possible to ask for the full and updated slots map directly to one
- * of the nodes and optionally enable such a behaviour upon -MOVED redirections.
- * Asking for the cluster configuration to Redis is actually done by issuing a
- * CLUSTER SLOTS command to a random node in the pool.
- *
- * @author Daniele Alessandri <suppakilla@gmail.com>
- */
class RedisCluster implements ClusterConnectionInterface, IteratorAggregate, Countable
{
    /** @var bool Whether CLUSTER SLOTS is issued upon -MOVED responses. */
    private $askClusterNodes = true;

    /** @var array Parameters merged into connections created on the fly. */
    private $defaultParameters = array();

    /** @var SingleConnectionInterface[] Connections indexed by their string identifier. */
    private $pool = array();

    /** @var SingleConnectionInterface[] Cache of slot index => connection instance. */
    private $slots = array();

    /** @var array|null Slot index => connection ID, or NULL when it must be (re)built. */
    private $slotsMap;

    /** @var RedisClusterHashStrategy Strategy used to hash commands by their keys. */
    private $strategy;

    /** @var ConnectionFactoryInterface Factory used to create missing connections. */
    private $connections;

    /**
     * @param ConnectionFactoryInterface $connections Connection factory object.
     */
    public function __construct(ConnectionFactoryInterface $connections = null)
    {
        $this->strategy = new RedisClusterHashStrategy();
        $this->connections = $connections ?: new ConnectionFactory();
    }

    /**
     * {@inheritdoc}
     */
    public function isConnected()
    {
        foreach ($this->pool as $connection) {
            if ($connection->isConnected()) {
                return true;
            }
        }

        return false;
    }

    /**
     * {@inheritdoc}
     */
    public function connect()
    {
        // Only one randomly picked node is connected; nothing happens when
        // the pool is empty.
        if ($connection = $this->getRandomConnection()) {
            $connection->connect();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function disconnect()
    {
        foreach ($this->pool as $connection) {
            $connection->disconnect();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function add(SingleConnectionInterface $connection)
    {
        $this->pool[(string) $connection] = $connection;

        // Invalidate the slots map so that it gets rebuilt lazily.
        $this->slotsMap = null;
    }

    /**
     * {@inheritdoc}
     */
    public function remove(SingleConnectionInterface $connection)
    {
        $connectionID = array_search($connection, $this->pool, true);

        if ($connectionID === false) {
            return false;
        }

        $this->dropFromPool($connectionID);

        return true;
    }

    /**
     * Removes a connection instance by using its identifier.
     *
     * @param  string $connectionID Connection identifier.
     * @return bool   True if the connection was in the pool.
     */
    public function removeById($connectionID)
    {
        if (!isset($this->pool[$connectionID])) {
            return false;
        }

        $this->dropFromPool($connectionID);

        return true;
    }

    /**
     * Removes a pooled connection and every piece of state referring to it.
     *
     * Besides invalidating the slots map, the slot => connection cache is
     * purged from the removed instance, otherwise getConnection() would keep
     * returning a connection that is no longer part of the pool.
     *
     * @param string $connectionID Identifier of a connection in the pool.
     */
    private function dropFromPool($connectionID)
    {
        $removed = $this->pool[$connectionID];

        unset($this->pool[$connectionID]);
        $this->slotsMap = null;

        // array_filter() preserves keys, so the slot indexes stay intact.
        $this->slots = array_filter($this->slots, function ($cached) use ($removed) {
            return $cached !== $removed;
        });
    }

    /**
     * Generates the current slots map by guessing the cluster configuration out
     * of the connection parameters of the connections in the pool.
     *
     * Generation is based on the same algorithm used by Redis to generate the
     * cluster, so it is most effective when all of the connections supplied on
     * initialization have the "slots" parameter properly set accordingly to the
     * current cluster configuration.
     */
    public function buildSlotsMap()
    {
        $this->slotsMap = array();

        foreach ($this->pool as $connectionID => $connection) {
            $parameters = $connection->getParameters();

            if (!isset($parameters->slots)) {
                continue;
            }

            // The "slots" parameter is expected in the form "$first-$last".
            $slots = explode('-', $parameters->slots, 2);
            $this->setSlots($slots[0], $slots[1], $connectionID);
        }
    }

    /**
     * Generates an updated slots map fetching the cluster configuration using
     * the CLUSTER SLOTS command against the specified node or a random one from
     * the pool.
     *
     * The previous slots map is discarded before applying the reply: setSlots()
     * merges ranges with the array union operator, which never overwrites keys
     * that are already present, so stale assignments would otherwise shadow the
     * fresh ones. When the node replies with an error the current slots map is
     * left untouched.
     *
     * @param  SingleConnectionInterface $connection Optional connection instance.
     * @return array
     */
    public function askClusterNodes(SingleConnectionInterface $connection = null)
    {
        if (!$connection && !$connection = $this->getRandomConnection()) {
            return array();
        }

        $command = RawCommand::create('CLUSTER', 'SLOTS');
        $response = $connection->executeCommand($command);

        if ($response instanceof ResponseErrorInterface) {
            return $this->getSlotsMap();
        }

        $this->slotsMap = array();

        foreach ($response as $slots) {
            // We only support master servers for now, so we ignore subsequent
            // elements in the $slots array identifying slaves.
            list($start, $end, $master) = $slots;

            // An empty host in the reply is mapped to the queried node itself.
            $nodeID = $master[0] === ''
                ? (string) $connection
                : "{$master[0]}:{$master[1]}";

            $this->setSlots($start, $end, $nodeID);
        }

        return $this->slotsMap;
    }

    /**
     * Returns the current slots map for the cluster.
     *
     * @return array
     */
    public function getSlotsMap()
    {
        if (!isset($this->slotsMap)) {
            $this->slotsMap = array();
        }

        return $this->slotsMap;
    }

    /**
     * Pre-associates a connection to a slots range to avoid runtime guessing.
     *
     * Slots that are already present in the slots map keep their current
     * association since ranges are merged using the array union operator.
     *
     * @param int                              $first      Initial slot of the range.
     * @param int                              $last       Last slot of the range.
     * @param SingleConnectionInterface|string $connection ID or connection instance.
     *
     * @throws OutOfBoundsException When the range is not within 0-16383 or is inverted.
     */
    public function setSlots($first, $last, $connection)
    {
        if ($first < 0x0000 || $first > 0x3FFF ||
            $last < 0x0000 || $last > 0x3FFF ||
            $last < $first
        ) {
            throw new OutOfBoundsException(
                "Invalid slot range for $connection: [$first-$last]"
            );
        }

        $slots = array_fill($first, $last - $first + 1, (string) $connection);
        $this->slotsMap = $this->getSlotsMap() + $slots;
    }

    /**
     * Guesses the correct node associated to a given slot using a precalculated
     * slots map, falling back to the same logic used by Redis to initialize a
     * cluster (best-effort).
     *
     * @param  int    $slot Slot index.
     * @return string Connection ID.
     *
     * @throws \RuntimeException When the slot is unmapped and the pool is empty.
     */
    protected function guessNode($slot)
    {
        if (!isset($this->slotsMap)) {
            $this->buildSlotsMap();
        }

        if (isset($this->slotsMap[$slot])) {
            return $this->slotsMap[$slot];
        }

        $count = count($this->pool);

        if ($count === 0) {
            // Without this guard the computation below divides by zero.
            throw new \RuntimeException('No connections available in the pool');
        }

        // Never let the per-node share of slots truncate to zero, which would
        // happen with more than 16384 connections in the pool.
        $slotsPerNode = max(1, (int) (16384 / $count));
        $index = min((int) ($slot / $slotsPerNode), $count - 1);
        $nodes = array_keys($this->pool);

        return $nodes[$index];
    }

    /**
     * Creates a new connection instance from the given connection ID.
     *
     * The identifier is split on its LAST colon so that hosts containing
     * colons themselves (IPv6 addresses) are not truncated.
     *
     * @param  string                    $connectionID Identifier for the connection.
     * @return SingleConnectionInterface
     */
    protected function createConnection($connectionID)
    {
        $separator = strrpos($connectionID, ':');

        if ($separator === false) {
            $host = $connectionID;
            $port = null;
        } else {
            $host = substr($connectionID, 0, $separator);
            $port = substr($connectionID, $separator + 1);
        }

        $parameters = array_merge($this->defaultParameters, array(
            'host' => $host,
            'port' => $port,
        ));

        return $this->connections->create($parameters);
    }

    /**
     * {@inheritdoc}
     */
    public function getConnection(CommandInterface $command)
    {
        $hash = $this->strategy->getHash($command);

        if (!isset($hash)) {
            throw new NotSupportedException(
                "Cannot use {$command->getId()} with redis-cluster"
            );
        }

        // Keep only the lowest 14 bits to obtain a slot in the 0-16383 range.
        $slot = $hash & 0x3FFF;

        if (isset($this->slots[$slot])) {
            return $this->slots[$slot];
        }

        return $this->getConnectionBySlot($slot);
    }

    /**
     * Returns the connection currently associated to a given slot.
     *
     * When the slot is not cached yet the node is guessed, the connection is
     * created and added to the pool if missing, and the result is cached.
     *
     * @param  int                       $slot Slot index.
     * @return SingleConnectionInterface
     *
     * @throws OutOfBoundsException When the slot is not within 0-16383.
     */
    public function getConnectionBySlot($slot)
    {
        if ($slot < 0x0000 || $slot > 0x3FFF) {
            throw new OutOfBoundsException("Invalid slot [$slot]");
        }

        if (isset($this->slots[$slot])) {
            return $this->slots[$slot];
        }

        $connectionID = $this->guessNode($slot);

        if (!$connection = $this->getConnectionById($connectionID)) {
            $connection = $this->createConnection($connectionID);
            $this->pool[$connectionID] = $connection;
        }

        return $this->slots[$slot] = $connection;
    }

    /**
     * {@inheritdoc}
     */
    public function getConnectionById($connectionID)
    {
        if (isset($this->pool[$connectionID])) {
            return $this->pool[$connectionID];
        }

        return null;
    }

    /**
     * Returns a random connection from the pool.
     *
     * @return SingleConnectionInterface|null NULL when the pool is empty.
     */
    protected function getRandomConnection()
    {
        if ($this->pool) {
            return $this->pool[array_rand($this->pool)];
        }

        return null;
    }

    /**
     * Permanently associates the connection instance to a new slot.
     * The connection is added to the connections pool if not yet included.
     *
     * @param SingleConnectionInterface $connection Connection instance.
     * @param int                       $slot       Target slot index.
     */
    protected function move(SingleConnectionInterface $connection, $slot)
    {
        $this->pool[(string) $connection] = $connection;
        $this->slots[(int) $slot] = $connection;
    }

    /**
     * Handles -ERR responses returned by Redis.
     *
     * -MOVED and -ASK errors are dispatched to their handlers, any other error
     * is returned to the caller as is.
     *
     * @param  CommandInterface       $command Command that generated the -ERR response.
     * @param  ResponseErrorInterface $error   Redis error response object.
     * @return mixed
     */
    protected function onErrorResponse(CommandInterface $command, ResponseErrorInterface $error)
    {
        // The first word of the message is the error type, the rest its payload.
        $details = explode(' ', $error->getMessage(), 2);

        switch ($details[0]) {
            case 'MOVED':
                return $this->onMovedResponse($command, $details[1]);

            case 'ASK':
                return $this->onAskResponse($command, $details[1]);

            default:
                return $error;
        }
    }

    /**
     * Handles -MOVED responses by executing again the command against the node
     * indicated by the Redis response.
     *
     * @param  CommandInterface $command Command that generated the -MOVED response.
     * @param  string           $details Parameters of the -MOVED response.
     * @return mixed
     */
    protected function onMovedResponse(CommandInterface $command, $details)
    {
        list($slot, $connectionID) = explode(' ', $details, 2);

        if (!$connection = $this->getConnectionById($connectionID)) {
            $connection = $this->createConnection($connectionID);
        }

        if ($this->askClusterNodes) {
            $this->askClusterNodes($connection);
        }

        // Cache the new slot owner, then retry through the regular path.
        $this->move($connection, $slot);

        return $this->executeCommand($command);
    }

    /**
     * Handles -ASK responses by executing again the command against the node
     * indicated by the Redis response.
     *
     * Unlike -MOVED, neither the pool nor the slots cache are updated here.
     *
     * @param  CommandInterface $command Command that generated the -ASK response.
     * @param  string           $details Parameters of the -ASK response.
     * @return mixed
     */
    protected function onAskResponse(CommandInterface $command, $details)
    {
        list($slot, $connectionID) = explode(' ', $details, 2);

        if (!$connection = $this->getConnectionById($connectionID)) {
            $connection = $this->createConnection($connectionID);
        }

        // ASKING must be sent to the target node right before the command.
        $connection->executeCommand(RawCommand::create('ASKING'));

        return $connection->executeCommand($command);
    }

    /**
     * {@inheritdoc}
     */
    public function writeCommand(CommandInterface $command)
    {
        $this->getConnection($command)->writeCommand($command);
    }

    /**
     * {@inheritdoc}
     */
    public function readResponse(CommandInterface $command)
    {
        return $this->getConnection($command)->readResponse($command);
    }

    /**
     * {@inheritdoc}
     */
    public function executeCommand(CommandInterface $command)
    {
        $connection = $this->getConnection($command);
        $response = $connection->executeCommand($command);

        if ($response instanceof ResponseErrorInterface) {
            return $this->onErrorResponse($command, $response);
        }

        return $response;
    }

    /**
     * {@inheritdoc}
     */
    public function count()
    {
        return count($this->pool);
    }

    /**
     * {@inheritdoc}
     */
    public function getIterator()
    {
        return new ArrayIterator(array_values($this->pool));
    }

    /**
     * Returns the underlying hash strategy used to hash commands by their keys.
     *
     * @return CommandHashStrategyInterface
     */
    public function getCommandHashStrategy()
    {
        return $this->strategy;
    }

    /**
     * Enables or disables automatic fetching of the current slots map from one
     * of the nodes using the CLUSTER SLOTS command. This option is enabled by
     * default: asking the current slots map to Redis upon -MOVED responses may
     * reduce overhead by eliminating the trial-and-error nature of the node
     * guessing procedure, mostly when targeting many keys that would end up in
     * a lot of redirections.
     *
     * The slots map can still be manually fetched using the askClusterNodes()
     * method whether or not this option is enabled.
     *
     * @param bool $value Enable or disable the use of CLUSTER SLOTS.
     */
    public function enableClusterNodes($value)
    {
        $this->askClusterNodes = (bool) $value;
    }

    /**
     * Sets a default array of connection parameters to be applied when creating
     * new connection instances on the fly when they are not part of the initial
     * pool supplied upon cluster initialization.
     *
     * These parameters are not applied to connections added to the pool using
     * the add() method. Subsequent calls are merged with the parameters set by
     * previous ones.
     *
     * @param array $parameters Array of connection parameters.
     */
    public function setDefaultParameters(array $parameters)
    {
        $this->defaultParameters = array_merge($this->defaultParameters, $parameters);
    }
}
|