Эх сурвалжийг харах

Rewrite the connection class for redis-sentinel.

Now we do not extend Predis\Connection\Aggregate\MasterSlaveReplication
anymore in order to obtain a more coherent implementation with the logic
of redis-sentinel and apply more optimizations by avoiding useless round
trips with sentinel servers.
Daniele Alessandri 9 жил өмнө
parent
commit
b047e8b6a1

+ 361 - 65
src/Connection/Aggregate/SentinelReplication.php

@@ -28,15 +28,25 @@ use Predis\Response\ServerException;
  * @author Daniele Alessandri <suppakilla@gmail.com>
  * @author Ville Mattila <ville@eventio.fi>
  */
-class SentinelReplication extends MasterSlaveReplication
+class SentinelReplication implements ReplicationInterface
 {
     /**
-     * List of sentinel servers.
+     * @var NodeConnectionInterface
+     */
+    protected $master;
+
+    /**
+     * @var NodeConnectionInterface[]
+     */
+    protected $slaves = array();
+
+    /**
+     * @var NodeConnectionInterface
      */
-    protected $sentinels;
+    protected $current;
 
     /**
-     * Name of the service.
+     * @var string
      */
     protected $service;
 
@@ -46,14 +56,22 @@ class SentinelReplication extends MasterSlaveReplication
     protected $connectionFactory;
 
     /**
-     * The current sentinel connection instance.
-     *
+     * @var ReplicationStrategy
+     */
+    protected $strategy;
+
+    /**
+     * @var NodeConnectionInterface[]
+     */
+    protected $sentinels = array();
+
+    /**
      * @var NodeConnectionInterface
      */
     protected $sentinelConnection;
 
     /**
-     * Timeout for the connection to a sentinel.
+     * @var float
      */
     protected $sentinelTimeout = 0.100;
 
@@ -63,11 +81,23 @@ class SentinelReplication extends MasterSlaveReplication
      * -1 = unlimited retry attempts
      *  0 = no retry attempts (fails immediatly)
      *  n = fail only after n retry attempts
+     *
+     * @var int
      */
     protected $retryLimit = 20;
 
+    /**
+     * Time to wait in milliseconds before fetching a new configuration from one
+     * of the sentinel servers.
+     *
+     * @var int
+     */
+    protected $retryWait = 1000;
+
     /**
      * Flag for automatic fetching of available sentinels.
+     *
+     * @var bool
      */
     protected $updateSentinels = false;
 
@@ -86,8 +116,7 @@ class SentinelReplication extends MasterSlaveReplication
         $this->sentinels = $sentinels;
         $this->service = $service;
         $this->connectionFactory = $connectionFactory;
-
-        parent::__construct($strategy);
+        $this->strategy = $strategy ?: new ReplicationStrategy();
     }
 
     /**
@@ -98,25 +127,36 @@ class SentinelReplication extends MasterSlaveReplication
      *
      * @param float $timeout Timeout value.
      */
-    public function setDefaultSentinelTimeout($timeout)
+    public function setSentinelTimeout($timeout)
     {
         $this->sentinelTimeout = (float) $timeout;
     }
 
     /**
-     * Set maximum number of automatic retries of commands upon server failure.
+     * Sets the maximum number of retries for commands upon server failure.
      *
      * -1 = unlimited retry attempts
      *  0 = no retry attempts (fails immediatly)
      *  n = fail only after n retry attempts
      *
-     * @param integer $retry Number of retry attempts.
+     * @param int $retry Number of retry attempts.
      */
     public function setRetryLimit($retry)
     {
         $this->retryLimit = (int) $retry;
     }
 
+    /**
+     * Sets the time to wait (in seconds) before fetching a new configuration
+     * from one of the sentinels.
+     *
+     * @param float $seconds Time to wait before the next attempt.
+     */
+    public function setRetryWait($seconds)
+    {
+        $this->retryWait = (float) $seconds;
+    }
+
     /**
      * Set automatic fetching of available sentinels.
      *
@@ -128,28 +168,63 @@ class SentinelReplication extends MasterSlaveReplication
     }
 
     /**
-     * {@inheritdoc}
+     * Resets the current connection.
      */
-    protected function check()
+    protected function reset()
     {
-        $this->querySentinel();
-
-        parent::check();
+        $this->current = null;
     }
 
     /**
-     * Wipes the list of master and slaves nodes.
+     * Wipes the current list of master and slaves nodes.
      */
     protected function wipeServerList()
     {
         $this->reset();
 
         $this->master = null;
-        $this->slaves = null;
+        $this->slaves = array();
     }
 
     /**
-     * Creates the connection to a sentinel server.
+     * {@inheritdoc}
+     */
+    public function add(NodeConnectionInterface $connection)
+    {
+        $alias = $connection->getParameters()->alias;
+
+        if ($alias === 'master') {
+            $this->master = $connection;
+        } else {
+            $this->slaves[$alias ?: count($this->slaves)] = $connection;
+        }
+
+        $this->reset();
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function remove(NodeConnectionInterface $connection)
+    {
+        if ($connection->getParameters()->alias === 'master') {
+            $this->wipeServerList();
+
+            return true;
+        } else {
+            if (($id = array_search($connection, $this->slaves, true)) !== false) {
+                unset($this->slaves[$id]);
+                $this->wipeServerList();
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Creates a new connection to a sentinel server.
      *
      * @return NodeConnectionInterface
      */
@@ -166,6 +241,11 @@ class SentinelReplication extends MasterSlaveReplication
         if (is_array($parameters)) {
             $parameters += array(
                 'timeout' => $this->sentinelTimeout,
+
+                // We need to override password and database by setting them to
+                // NULL as they are not needed when connecting to sentinels.
+                'password' => null,
+                'database' => null,
             );
         }
 
@@ -196,14 +276,56 @@ class SentinelReplication extends MasterSlaveReplication
     }
 
     /**
-     * Fetches details of the master server from a sentinel server.
+     * Fetches an updated list of sentinels from a sentinel.
+     */
+    public function updateSentinels()
+    {
+        SENTINEL_QUERY: {
+            $sentinel = $this->getSentinelConnection();
+
+            try {
+                $payload = $sentinel->executeCommand(
+                    RawCommand::create('SENTINEL', 'sentinels', $this->service)
+                );
+
+                $this->sentinels = array();
+                $this->sentinels[] = $sentinel->getParameters()->toArray();
+
+                foreach ($payload as $sentinel) {
+                    $this->sentinels[] = array(
+                        'host' => $sentinel[3],
+                        'port' => $sentinel[5],
+                    );
+                }
+            } catch (ConnectionException $exception) {
+                $this->sentinelConnection = null;
+
+                goto SENTINEL_QUERY;
+            }
+        }
+    }
+
+    /**
+     * Fetches the details for the master and slave servers from a sentinel.
+     */
+    public function querySentinel()
+    {
+        $this->wipeServerList();
+
+        $this->updateSentinels();
+        $this->getMaster();
+        $this->getSlaves();
+    }
+
+    /**
+     * Fetches the details for the master server from a sentinel.
      *
      * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
      * @param string                  $service  Name of the service.
      *
      * @return array
      */
-    protected function getMasterFromSentinel(NodeConnectionInterface $sentinel, $service)
+    protected function querySentinelForMaster(NodeConnectionInterface $sentinel, $service)
     {
         $payload = $sentinel->executeCommand(
             RawCommand::create('SENTINEL', 'get-master-addr-by-name', $service)
@@ -225,14 +347,14 @@ class SentinelReplication extends MasterSlaveReplication
     }
 
     /**
-     * Fetches details of the slave servers from a sentinel server.
+     * Fetches the details for the slave servers from a sentinel.
      *
      * @param NodeConnectionInterface $sentinel Connection to a sentinel server.
      * @param string                  $service  Name of the service.
      *
      * @return array
      */
-    protected function getSlavesFromSentinel(NodeConnectionInterface $sentinel, $service)
+    protected function querySentinelForSlaves(NodeConnectionInterface $sentinel, $service)
     {
         $slaves = array();
 
@@ -254,7 +376,7 @@ class SentinelReplication extends MasterSlaveReplication
             $slaves[] = array(
                 'host' => $slave[3],
                 'port' => $slave[5],
-                'alias' => "slave-$slave[3]:$slave[5]",
+                'alias' => "slave-$slave[1]",
             );
         }
 
@@ -262,61 +384,65 @@ class SentinelReplication extends MasterSlaveReplication
     }
 
     /**
-     * Configures replication by querying a sentinel server for autodiscovery.
+     * {@inheritdoc}
+     */
+    public function getCurrent()
+    {
+        return $this->current;
+    }
+
+    /**
+     * {@inheritdoc}
      */
-    public function querySentinel()
+    public function getMaster()
     {
+        if ($this->master) {
+            return $this->master;
+        }
+
         if ($this->updateSentinels) {
             $this->updateSentinels();
         }
 
-        $this->wipeServerList();
-
         SENTINEL_QUERY: {
             $sentinel = $this->getSentinelConnection();
 
             try {
-                $masterParameters = $this->getMasterFromSentinel($sentinel, $this->service);
+                $masterParameters = $this->querySentinelForMaster($sentinel, $this->service);
                 $masterConnection = $this->connectionFactory->create($masterParameters);
-                $this->add($masterConnection);
 
-                if (!$slavesParameters = $this->getSlavesFromSentinel($sentinel, $this->service)) {
-                    unset($masterParameters['alias']);
-                    $slavesParameters[] = $masterParameters;
-                }
-
-                foreach ($slavesParameters as $slaveParameters) {
-                    $this->add($this->connectionFactory->create($slaveParameters));
-                }
+                $this->add($masterConnection);
             } catch (ConnectionException $exception) {
                 $this->sentinelConnection = null;
 
                 goto SENTINEL_QUERY;
             }
         }
+
+        return $masterConnection;
     }
 
     /**
-     * Updates the full list of sentinels by asking to a sentinel server.
+     * {@inheritdoc}
      */
-    public function updateSentinels()
+    public function getSlaves()
     {
+        if ($this->slaves) {
+            return array_values($this->slaves);
+        }
+
+        if ($this->updateSentinels) {
+            $this->updateSentinels();
+        }
+
         SENTINEL_QUERY: {
             $sentinel = $this->getSentinelConnection();
 
             try {
-                $payload = $sentinel->executeCommand(
-                    RawCommand::create('SENTINEL', 'sentinels', $this->service)
-                );
+                $slavesParameters = $this->querySentinelForSlaves($sentinel, $this->service);
 
-                $this->sentinels = array();
-                $this->sentinels[] = $sentinel->getParameters()->toArray();
-
-                foreach ($payload as $sentinel) {
-                    $this->sentinels[] = array(
-                        'host' => $sentinel[3],
-                        'port' => $sentinel[5],
-                    );
+                foreach ($slavesParameters as $slaveParameters) {
+                    $this->add($this->connectionFactory->create($slaveParameters));
                 }
             } catch (ConnectionException $exception) {
                 $this->sentinelConnection = null;
@@ -324,6 +450,66 @@ class SentinelReplication extends MasterSlaveReplication
                 goto SENTINEL_QUERY;
             }
         }
+
+        return array_values($this->slaves ?: array());
+    }
+
+    /**
+     * Returns a random slave.
+     *
+     * @return NodeConnectionInterface
+     */
+    protected function pickSlave()
+    {
+        if ($slaves = $this->getSlaves()) {
+            return $slaves[rand(1, count($slaves)) - 1];
+        }
+    }
+
+    /**
+     * Returns the connection instance in charge for the given command.
+     *
+     * @param CommandInterface $command Command instance.
+     *
+     * @return NodeConnectionInterface
+     */
+    private function getConnectionInternal(CommandInterface $command)
+    {
+        if (!$this->current) {
+            if ($this->strategy->isReadOperation($command) && $slave = $this->pickSlave()) {
+                $this->current = $slave;
+            } else {
+                $this->current = $this->getMaster();
+            }
+
+            return $this->current;
+        }
+
+        if ($this->current === $this->master) {
+            return $this->current;
+        }
+
+        if (!$this->strategy->isReadOperation($command)) {
+            $this->current = $this->getMaster();
+        }
+
+        return $this->current;
+    }
+
+    /**
+     * Asserts that the specified connection matches an expected role.
+     *
+     * @param NodeConnectionInterface $sentinel Connection to a redis server.
+     * @param string                  $role     Expected role of the server ("master", "slave" or "sentinel").
+     */
+    protected function assertConnectionRole(NodeConnectionInterface $connection, $role)
+    {
+        $role = strtolower($role);
+        $actualRole = $connection->executeCommand(RawCommand::create('ROLE'));
+
+        if ($role !== $actualRole[0]) {
+            throw new RoleException($connection, "Expected $role but got $actualRole[0] [$connection]");
+        }
     }
 
     /**
@@ -331,21 +517,109 @@ class SentinelReplication extends MasterSlaveReplication
      */
     public function getConnection(CommandInterface $command)
     {
-        $connection = parent::getConnection($command);
+        $connection = $this->getConnectionInternal($command);
+
+        if (!$connection->isConnected() && $this->slaves) {
+            $this->assertConnectionRole(
+                $connection,
+                $this->strategy->isReadOperation($command) ? 'slave' : 'master'
+            );
+        }
 
-        if (!$connection->isConnected()) {
-            $role = $connection->executeCommand(RawCommand::create('ROLE'));
+        return $connection;
+    }
 
-            if ($connection === $this->master && $role[0] !== 'master') {
-                throw new RoleException($connection, "Expected master but got $role[0] [$connection]");
-            }
+    /**
+     * {@inheritdoc}
+     */
+    public function getConnectionById($connectionId)
+    {
+        if ($connectionId === 'master') {
+            return $this->getMaster();
+        }
 
-            if ($connection !== $this->master && $role[0] !== 'slave') {
-                throw new RoleException($connection, "Expected slave but got $role[0] [$connection]");
-            }
+        $this->getSlaves();
+
+        if (isset($this->slaves[$connectionId])) {
+            return $this->slaves[$connectionId];
         }
+    }
 
-        return $connection;
+    /**
+     * {@inheritdoc}
+     */
+    public function switchTo($connection)
+    {
+        if (!$connection instanceof NodeConnectionInterface) {
+            $connection = $this->getConnectionById($connection);
+        }
+
+        if ($connection === $this->current) {
+            return;
+        }
+
+        if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
+            throw new \InvalidArgumentException('Invalid connection or connection not found.');
+        }
+
+        $connection->connect();
+
+        if ($this->current) {
+            $this->current->disconnect();
+        }
+
+        $this->current = $connection;
+    }
+
+    /**
+     * Switches to the master server.
+     */
+    public function switchToMaster()
+    {
+        $this->switchTo('master');
+    }
+
+    /**
+     * Switches to a random slave server.
+     */
+    public function switchToSlave()
+    {
+        $connection = $this->pickSlave();
+        $this->switchTo($connection);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function isConnected()
+    {
+        return $this->current ? $this->current->isConnected() : false;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function connect()
+    {
+        if (!$this->current) {
+            $this->current = $this->pickSlave();
+        }
+
+        $this->current->connect();
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function disconnect()
+    {
+        if ($this->master) {
+            $this->master->disconnect();
+        }
+
+        foreach ($this->slaves as $connection) {
+            $connection->disconnect();
+        }
     }
 
     /**
@@ -363,16 +637,18 @@ class SentinelReplication extends MasterSlaveReplication
 
         SENTINEL_RETRY: {
             try {
-                $response = parent::$method($command);
+                $response = $this->getConnection($command)->$method($command);
             } catch (CommunicationException $exception) {
                 if ($retries == $this->retryLimit) {
                     throw $exception;
                 }
 
+                $this->wipeServerList();
                 $exception->getConnection()->disconnect();
-                $this->querySentinel();
 
-                $retries++;
+                usleep($this->retryWait * 1000);
+
+                ++$retries;
                 goto SENTINEL_RETRY;
             }
         }
@@ -403,4 +679,24 @@ class SentinelReplication extends MasterSlaveReplication
     {
         return $this->retryCommandOnFailure(__FUNCTION__, $command);
     }
+
+    /**
+     * Returns the underlying replication strategy.
+     *
+     * @return ReplicationStrategy
+     */
+    public function getReplicationStrategy()
+    {
+        return $this->strategy;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function __sleep()
+    {
+        return array(
+            'master', 'slaves', 'service', 'sentinels', 'connectionFactory', 'strategy',
+        );
+    }
 }