Browse Source

Merge branch 'replication' into v0.7

See ISSUE #21 for history and details about transparent master / slave
replication support in Predis.
Daniele Alessandri 13 years ago
parent
commit
2cff6f0886

+ 52 - 0
examples/MasterSlaveReplication.php

@@ -0,0 +1,52 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+require 'SharedConfigurations.php';
+
+// Predis supports master / slave replication scenarios where write operations are
+// performed on the master server and read operations are executed against one of
+// the slaves. The behaviour of commands or EVAL scripts can be customized at will.
+// As soon as a write operation is performed, all the subsequent requests (reads
+// or writes) will be served by the master server.
+//
+// This example must be executed with the second Redis server acting as the slave
+// of the first one using the SLAVEOF command.
+//
+
+$parameters = array(
+    'tcp://127.0.0.1:6379?database=15&alias=master',
+    'tcp://127.0.0.1:6379?database=15&alias=slave',
+);
+
+$options = array('replication' => true);
+
+$client = new Predis\Client($parameters, $options);
+
+// Read operation.
+$exists = $client->exists('foo') ? 'yes' : 'no';
+$current = $client->getConnection()->getCurrent()->getParameters();
+echo "Does 'foo' exist on {$current->alias}? $exists.\n";
+
+// Write operation.
+$client->set('foo', 'bar');
+$current = $client->getConnection()->getCurrent()->getParameters();
+echo "Now 'foo' has been set to 'bar' on {$current->alias}!\n";
+
+// Read operation.
+$bar = $client->get('foo');
+$current = $client->getConnection()->getCurrent()->getParameters();
+echo "We just fetched 'foo' from {$current->alias} and its value is '$bar'.\n";
+
+/* OUTPUT:
+Does 'foo' exist on slave? yes.
+Now 'foo' has been set to 'bar' on master!
+We just fetched 'foo' from master and its value is 'bar'.
+*/

+ 9 - 3
lib/Predis/Client.php

@@ -90,8 +90,14 @@ class Client
         if ($parameters instanceof IConnection) {
             return $parameters;
         }
+
         if (is_array($parameters) && isset($parameters[0])) {
-            return $this->connections->createCluster($this->options->cluster, $parameters, $this->profile);
+            $replication = isset($this->options->replication) && $this->options->replication;
+
+            $connection = $this->options->{$replication ? 'replication' : 'cluster'};
+            $initializer = $replication ? 'createReplication' : 'createCluster';
+
+            return $this->connections->$initializer($connection, $parameters, $this->profile);
         }
 
         return $this->connections->create($parameters, $this->profile);
@@ -190,8 +196,8 @@ class Client
     public function getConnection($id = null)
     {
         if (isset($id)) {
-            if (!Helpers::isCluster($this->connection)) {
-                $message = 'Retrieving connections by alias is supported only with clustered connections';
+            if (!Helpers::isAggregated($this->connection)) {
+                $message = 'Retrieving connections by alias is supported only with aggregated connections (cluster or replication)';
                 throw new NotSupportedException($message);
             }
             return $this->connection->getConnectionById($id);

+ 13 - 0
lib/Predis/ConnectionFactory.php

@@ -14,6 +14,7 @@ namespace Predis;
 use Predis\Profiles\IServerProfile;
 use Predis\Network\IConnectionSingle;
 use Predis\Network\IConnectionCluster;
+use Predis\Network\IConnectionReplication;
 use Predis\Profiles\ServerProfile;
 
 /**
@@ -135,6 +136,18 @@ class ConnectionFactory implements IConnectionFactory
         return $cluster;
     }
 
+    /**
+     * {@inheritdoc}
+     */
+    public function createReplication(IConnectionReplication $replication, $parameters, IServerProfile $profile = null)
+    {
+        foreach ($parameters as $node) {
+            $replication->add($node instanceof IConnectionSingle ? $node : $this->create($node, $profile));
+        }
+
+        return $replication;
+    }
+
     /**
      * Prepares a connection object after its initialization.
      *

+ 12 - 0
lib/Predis/Helpers.php

@@ -13,6 +13,7 @@ namespace Predis;
 
 use Predis\Network\IConnection;
 use Predis\Network\IConnectionCluster;
+use Predis\Network\IConnectionReplication;
 
 /**
  * Defines a few helper methods.
@@ -21,6 +22,17 @@ use Predis\Network\IConnectionCluster;
  */
 class Helpers
 {
+    /**
+     * Checks if the specified connection represents an aggregation of connections.
+     *
+     * @param IConnection $connection Connection object.
+     * @return Boolean
+     */
+    public static function isAggregated(IConnection $connection)
+    {
+        return $connection instanceof IConnectionCluster || $connection instanceof IConnectionReplication;
+    }
+
     /**
      * Checks if the specified connection represents a cluster.
      *

+ 10 - 0
lib/Predis/IConnectionFactory.php

@@ -13,6 +13,7 @@ namespace Predis;
 
 use Predis\Profiles\IServerProfile;
 use Predis\Network\IConnectionCluster;
+use Predis\Network\IConnectionReplication;
 
 /**
  * Interface that must be implemented by classes that provide their own mechanism
@@ -53,4 +54,13 @@ interface IConnectionFactory
      * @return Predis\Network\IConnectionCluster
      */
     public function createCluster(IConnectionCluster $cluster, $parameters, IServerProfile $profile = null);
+
+    /**
+     * Prepares a master / slave replication configuration.
+     *
+     * @param IConnectionReplication Instance of a connection cluster class.
+     * @param array $parameters List of parameters for each connection object.
+     * @return Predis\Network\IConnectionReplication
+     */
+    public function createReplication(IConnectionReplication $replication, $parameters, IServerProfile $profile = null);
 }

+ 81 - 0
lib/Predis/Network/IConnectionReplication.php

@@ -0,0 +1,81 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Network;
+
+use Predis\Commands\ICommand;
+
+/**
+ * Defines a group of Redis servers in a master/slave replication configuration.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+interface IConnectionReplication extends IConnection
+{
+    /**
+     * Adds a connection instance to the cluster.
+     *
+     * @param IConnectionSingle $connection Instance of a connection.
+     */
+    public function add(IConnectionSingle $connection);
+
+    /**
+     * Removes the specified connection instance from the cluster.
+     *
+     * @param IConnectionSingle $connection Instance of a connection.
+     * @return Boolean Returns true if the connection was in the pool.
+     */
+    public function remove(IConnectionSingle $connection);
+
+    /**
+     * Gets the actual connection instance in charge of the specified command.
+     *
+     * @param ICommand $command Instance of a Redis command.
+     * @return IConnectionSingle
+     */
+    public function getConnection(ICommand $command);
+
+    /**
+     * Retrieves a connection instance from the cluster using an alias.
+     *
+     * @param string $connectionId Alias of a connection
+     * @return IConnectionSingle
+     */
+    public function getConnectionById($connectionId);
+
+    /**
+     * Switches the internal connection object being used.
+     *
+     * @param string $connection Alias of a connection
+     */
+    public function switchTo($connection);
+
+    /**
+     * Retrieves the connection object currently being used.
+     *
+     * @return IConnectionSingle
+     */
+    public function getCurrent();
+
+    /**
+     * Retrieves the connection object to the master Redis server.
+     *
+     * @return IConnectionSingle
+     */
+    public function getMaster();
+
+    /**
+     * Retrieves a list of connection objects to slaves Redis servers.
+     *
+     * @return IConnectionSingle
+     */
+    public function getSlaves();
+}

+ 405 - 0
lib/Predis/Network/MasterSlaveReplication.php

@@ -0,0 +1,405 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Network;
+
+use Predis\Commands\ICommand;
+use Predis\NotSupportedException;
+
+/**
+ * Defines the standard virtual connection class that is used
+ * by Predis to handle replication with a group of servers in
+ * a master/slave configuration.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class MasterSlaveReplication implements IConnectionReplication
+{
+    private $disallowed = array();
+    private $readonly = array();
+    private $readonlySHA1 = array();
+    private $current = null;
+    private $master = null;
+    private $slaves = array();
+
+    /**
+     *
+     */
+    public function __construct()
+    {
+        $this->disallowed = $this->getDisallowedOperations();
+        $this->readonly = $this->getReadOnlyOperations();
+    }
+
+    /**
+     * Checks if one master and at least one slave have been defined.
+     */
+    protected function check()
+    {
+        if (!isset($this->master) || !$this->slaves) {
+            throw new \RuntimeException('Replication needs a master and at least one slave.');
+        }
+    }
+
+    /**
+     * Resets the connection state.
+     */
+    protected function reset()
+    {
+        $this->current = null;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function add(IConnectionSingle $connection)
+    {
+        $alias = $connection->getParameters()->alias;
+
+        if ($alias === 'master') {
+            $this->master = $connection;
+        }
+        else {
+            $this->slaves[$alias ?: count($this->slaves)] = $connection;
+        }
+
+        $this->reset();
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function remove(IConnectionSingle $connection)
+    {
+        if ($connection->getParameters()->alias === 'master') {
+            $this->master = null;
+            $this->reset();
+
+            return true;
+        }
+        else {
+            if (($id = array_search($connection, $this->slaves, true)) !== false) {
+                unset($this->slaves[$id]);
+                $this->reset();
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getConnection(ICommand $command)
+    {
+        if ($this->current === null) {
+            $this->check();
+            $this->current = $this->isReadOperation($command) ? $this->pickSlave() : $this->master;
+
+            return $this->current;
+        }
+
+        if ($this->current === $this->master) {
+            return $this->current;
+        }
+
+        if (!$this->isReadOperation($command)) {
+            $this->current = $this->master;
+        }
+
+        return $this->current;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getConnectionById($connectionId)
+    {
+        if ($connectionId === 'master') {
+            return $this->master;
+        }
+        if (isset($this->slaves[$connectionId])) {
+            return $this->slaves[$connectionId];
+        }
+
+        return null;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function switchTo($connection)
+    {
+        $this->check();
+
+        if (!$connection instanceof IConnectionSingle) {
+            $connection = $this->getConnectionById($connection);
+        }
+        if ($connection !== $this->master && !in_array($connection, $this->slaves, true)) {
+            throw new \InvalidArgumentException('The specified connection is not valid.');
+        }
+
+        $this->current = $connection;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getCurrent()
+    {
+        return $this->current;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getMaster()
+    {
+        return $this->master;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getSlaves()
+    {
+        return array_values($this->slaves);
+    }
+
+    /**
+     * Returns a random slave.
+     *
+     * @return IConnectionSingle
+     */
+    protected function pickSlave()
+    {
+        return $this->slaves[array_rand($this->slaves)];
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function isConnected()
+    {
+        return $this->current ? $this->current->isConnected() : false;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function connect()
+    {
+        if ($this->current === null) {
+            $this->check();
+            $this->current = $this->pickSlave();
+        }
+
+        $this->current->connect();
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function disconnect()
+    {
+        if ($this->master) {
+            $this->master->disconnect();
+        }
+        foreach ($this->slaves as $connection) {
+            $connection->disconnect();
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function writeCommand(ICommand $command)
+    {
+        $this->getConnection($command)->writeCommand($command);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function readResponse(ICommand $command)
+    {
+        return $this->getConnection($command)->readResponse($command);
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function executeCommand(ICommand $command)
+    {
+        return $this->getConnection($command)->executeCommand($command);
+    }
+
+    /**
+     * Returns if the specified command performs a read-only operation
+     * against a key stored on Redis.
+     *
+     * @param ICommand $command Instance of Redis command.
+     * @return Boolean
+     */
+    protected function isReadOperation(ICommand $command)
+    {
+        if (isset($this->disallowed[$id = $command->getId()])) {
+            throw new NotSupportedException("The command $id is not allowed in replication mode");
+        }
+
+        if (isset($this->readonly[$id])) {
+            if (true === $readonly = $this->readonly[$id]) {
+                return true;
+            }
+
+            return $readonly($command);
+        }
+
+        if (($eval = $id === 'EVAL') || $id === 'EVALSHA') {
+            $sha1 = $eval ? sha1($command->getArgument(0)) : $command->getArgument(0);
+
+            if (isset($this->readonlySHA1[$sha1])) {
+                if (true === $readonly = $this->readonlySHA1[$sha1]) {
+                    return true;
+                }
+
+                return $readonly($command);
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Marks a command as a read-only operation. When the behaviour of a
+     * command can be decided only at runtime depending on its arguments,
+     * a callable object can be provided to dinamically check if the passed
+     * instance of a command performs write operations or not.
+     *
+     * @param string $commandID ID of the command.
+     * @param mixed $readonly A boolean or a callable object.
+     */
+    public function setCommandReadOnly($commandID, $readonly = true)
+    {
+        $commandID = strtoupper($commandID);
+
+        if ($readonly) {
+            $this->readonly[$commandID] = $readonly;
+        }
+        else {
+            unset($this->readonly[$commandID]);
+        }
+    }
+
+    /**
+     * Marks a Lua script for EVAL and EVALSHA as a read-only operation. When
+     * the behaviour of a script can be decided only at runtime depending on
+     * its arguments, a callable object can be provided to dinamically check
+     * if the passed instance of EVAL or EVALSHA performs write operations or
+     * not.
+     *
+     * @param string $script Body of the Lua script.
+     * @param mixed $readonly A boolean or a callable object.
+     */
+    public function setScriptReadOnly($script, $readonly = true)
+    {
+        $sha1 = sha1($script);
+
+        if ($readonly) {
+            $this->readonlySHA1[$sha1] = $readonly;
+        }
+        else {
+            unset($this->readonlySHA1[$sha1]);
+        }
+    }
+
+    /**
+     * Returns the default list of disallowed commands.
+     *
+     * @return array
+     */
+    protected function getDisallowedOperations()
+    {
+        return array(
+            'SHUTDOWN'          => true,
+            'INFO'              => true,
+            'DBSIZE'            => true,
+            'LASTSAVE'          => true,
+            'CONFIG'            => true,
+            'MONITOR'           => true,
+            'SLAVEOF'           => true,
+            'SAVE'              => true,
+            'BGSAVE'            => true,
+            'BGREWRITEAOF'      => true,
+            'SLOWLOG'           => true,
+        );
+    }
+
+    /**
+     * Returns the default list of commands performing read-only operations.
+     *
+     * @return array
+     */
+    protected function getReadOnlyOperations()
+    {
+        return array(
+            'EXISTS'            => true,
+            'TYPE'              => true,
+            'KEYS'              => true,
+            'RANDOMKEY'         => true,
+            'TTL'               => true,
+            'GET'               => true,
+            'MGET'              => true,
+            'SUBSTR'            => true,
+            'STRLEN'            => true,
+            'GETRANGE'          => true,
+            'GETBIT'            => true,
+            'LLEN'              => true,
+            'LRANGE'            => true,
+            'LINDEX'            => true,
+            'SCARD'             => true,
+            'SISMEMBER'         => true,
+            'SINTER'            => true,
+            'SUNION'            => true,
+            'SDIFF'             => true,
+            'SMEMBERS'          => true,
+            'SRANDMEMBER'       => true,
+            'ZRANGE'            => true,
+            'ZREVRANGE'         => true,
+            'ZRANGEBYSCORE'     => true,
+            'ZREVRANGEBYSCORE'  => true,
+            'ZCARD'             => true,
+            'ZSCORE'            => true,
+            'ZCOUNT'            => true,
+            'ZRANK'             => true,
+            'ZREVRANK'          => true,
+            'HGET'              => true,
+            'HMGET'             => true,
+            'HEXISTS'           => true,
+            'HLEN'              => true,
+            'HKEYS'             => true,
+            'HVELS'             => true,
+            'HGETALL'           => true,
+            'PING'              => true,
+            'AUTH'              => true,
+            'SELECT'            => true,
+            'ECHO'              => true,
+            'QUIT'              => true,
+            'OBJECT'            => true,
+            'SORT'              => function(ICommand $command) {
+                $arguments = $command->getArguments();
+                return ($c = count($arguments)) === 1 ? true : $arguments[$c - 2] !== 'STORE';
+            },
+        );
+    }
+}

+ 1 - 0
lib/Predis/Options/ClientOptions.php

@@ -42,6 +42,7 @@ class ClientOptions implements IClientOptions
             'profile' => new ClientProfile(),
             'connections' => new ClientConnectionFactory(),
             'cluster' => new ClientCluster(),
+            'replication' => new ClientReplication(),
             'prefix' => new ClientPrefix(),
         );
     }

+ 74 - 0
lib/Predis/Options/ClientReplication.php

@@ -0,0 +1,74 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Options;
+
+use Predis\Network\IConnectionReplication;
+use Predis\Network\MasterSlaveReplication;
+
+/**
+ * Option class that returns a replication connection be used by a client.
+ *
+ * @author Daniele Alessandri <suppakilla@gmail.com>
+ */
+class ClientReplication extends Option
+{
+    /**
+     * Checks if the specified value is a valid instance of IConnectionReplication.
+     *
+     * @param IConnectionReplication $cluster Instance of a connection cluster.
+     * @return IConnectionReplication
+     */
+    protected function checkInstance($connection)
+    {
+        if (!$connection instanceof IConnectionReplication) {
+            throw new \InvalidArgumentException('Instance of Predis\Network\IConnectionReplication expected');
+        }
+
+        return $connection;
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function filter(IClientOptions $options, $value)
+    {
+        if (is_callable($value)) {
+            $connection = call_user_func($value, $options);
+            if (!$connection instanceof IConnectionReplication) {
+                throw new \InvalidArgumentException('Instance of Predis\Network\IConnectionReplication expected');
+            }
+            return $connection;
+        }
+
+        if (is_string($value)) {
+            if (!class_exists($value)) {
+                throw new \InvalidArgumentException("Class $value does not exist");
+            }
+            if (!($connection = new $value()) instanceof IConnectionReplication) {
+                throw new \InvalidArgumentException('Instance of Predis\Network\IConnectionReplication expected');
+            }
+            return $connection;
+        }
+
+        if ($value == true) {
+            return $this->getDefault($options);
+        }
+    }
+
+    /**
+     * {@inheritdoc}
+     */
+    public function getDefault(IClientOptions $options)
+    {
+        return new MasterSlaveReplication();
+    }
+}

+ 8 - 0
lib/Predis/Pipeline/PipelineContext.php

@@ -15,6 +15,7 @@ use Predis\Client;
 use Predis\Helpers;
 use Predis\ClientException;
 use Predis\Commands\ICommand;
+use Predis\Network\IConnectionReplication;
 
 /**
  * Abstraction of a pipeline context where write and read operations
@@ -120,6 +121,13 @@ class PipelineContext
         if (count($this->pipeline) > 0) {
             if ($send) {
                 $connection = $this->client->getConnection();
+
+                // TODO: it would be better to use a dedicated pipeline executor
+                //       for classes implementing master/slave replication.
+                if ($connection instanceof IConnectionReplication) {
+                    $connection->switchTo('master');
+                }
+
                 $replies = $this->executor->execute($connection, $this->pipeline);
                 $this->replies = array_merge($this->replies, $replies);
             }

+ 32 - 1
tests/Predis/ClientTest.php

@@ -15,6 +15,7 @@ use \PHPUnit_Framework_TestCase as StandardTestCase;
 
 use Predis\Profiles\ServerProfile;
 use Predis\Network\PredisCluster;
+use Predis\Network\MasterSlaveReplication;
 
 /**
  *
@@ -179,6 +180,22 @@ class ClientTest extends StandardTestCase
         $this->assertSame($cluster, $client->getConnection());
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testConstructorWithReplicationArgument()
+    {
+        $replication = new MasterSlaveReplication();
+
+        $factory = new ConnectionFactory();
+        $factory->createReplication($replication, array('tcp://host1?alias=master', 'tcp://host2?alias=slave'));
+
+        $client = new Client($replication);
+
+        $this->assertInstanceOf('Predis\Network\IConnectionReplication', $client->getConnection());
+        $this->assertSame($replication, $client->getConnection());
+    }
+
     /**
      * @group disconnected
      */
@@ -217,6 +234,20 @@ class ClientTest extends StandardTestCase
         $this->assertSame($factory, $client->getConnectionFactory());
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testConstructorWithArrayAndOptionReplicationArgument()
+    {
+        $arg1 = array('tcp://host1?alias=master', 'tcp://host2?alias=slave');
+        $arg2 = array('replication' => true);
+        $client = new Client($arg1, $arg2);
+
+        $this->assertInstanceOf('Predis\Network\IConnectionReplication', $connection = $client->getConnection());
+        $this->assertSame('host1', $connection->getConnectionById('master')->getParameters()->host);
+        $this->assertSame('host2', $connection->getConnectionById('slave')->getParameters()->host);
+    }
+
     /**
      * @group disconnected
      */
@@ -385,7 +416,7 @@ class ClientTest extends StandardTestCase
     /**
      * @group disconnected
      * @expectedException Predis\NotSupportedException
-     * @expectedExceptionMessage Retrieving connections by alias is supported only with clustered connections
+     * @expectedExceptionMessage Retrieving connections by alias is supported only with aggregated connections (cluster or replication)
      */
     public function testGetConnectionWithAliasWorksOnlyWithCluster()
     {

+ 23 - 0
tests/Predis/ConnectionFactoryTest.php

@@ -335,6 +335,29 @@ class ConnectionFactoryTest extends StandardTestCase
         $factory->createCluster($cluster, $nodes, $profile);
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testReplicationWithMixedConnectionParameters()
+    {
+        list(, $connectionClass) = $this->getMockConnectionClass();
+
+        $replication = $this->getMock('Predis\Network\IConnectionReplication');
+        $replication->expects($this->exactly(4))
+                    ->method('add')
+                    ->with($this->isInstanceOf('Predis\Network\IConnectionSingle'));
+
+        $factory = $this->getMock('Predis\ConnectionFactory', array('create'));
+        $factory->expects($this->exactly(3))
+                ->method('create')
+                ->will($this->returnCallback(function($_, $_) use($connectionClass) {
+                    return new $connectionClass;
+                }));
+
+        $factory->createReplication($replication, array(null, 'tcp://127.0.0.1', array('scheme' => 'tcp'), new $connectionClass()));
+    }
+
+
     // ******************************************************************** //
     // ---- HELPER METHODS ------------------------------------------------ //
     // ******************************************************************** //

+ 539 - 0
tests/Predis/Network/MasterSlaveReplicationTest.php

@@ -0,0 +1,539 @@
+<?php
+
+/*
+ * This file is part of the Predis package.
+ *
+ * (c) Daniele Alessandri <suppakilla@gmail.com>
+ *
+ * For the full copyright and license information, please view the LICENSE
+ * file that was distributed with this source code.
+ */
+
+namespace Predis\Network;
+
+use \PHPUnit_Framework_TestCase as StandardTestCase;
+
+use Predis\ConnectionParameters;
+use Predis\Profiles\ServerProfile;
+
+/**
+ *
+ */
+class MasterSlaveReplicationTest extends StandardTestCase
+{
+    /**
+     * @group disconnected
+     */
+    public function testAddingConnectionsToReplication()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+        $replication->add($slave2);
+
+        $this->assertSame($master, $replication->getConnectionById('master'));
+        $this->assertSame($slave1, $replication->getConnectionById('slave1'));
+        $this->assertSame($slave2, $replication->getConnectionById('slave2'));
+
+        $this->assertSame($master, $replication->getMaster());
+        $this->assertSame(array($slave1, $slave2), $replication->getSlaves());
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testRemovingConnectionsFromReplication()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave2 = $this->getMockConnection('tcp://host3?alias=slave2');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $this->assertTrue($replication->remove($slave1));
+        $this->assertFalse($replication->remove($slave2));
+
+        $this->assertSame($master, $replication->getMaster());
+        $this->assertSame(array(), $replication->getSlaves());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException RuntimeException
+     * @expectedExceptionMessage Replication needs a master and at least one slave
+     */
+    public function testThrowsExceptionOnEmptyReplication()
+    {
+        $replication = new MasterSlaveReplication();
+        $replication->connect();
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException RuntimeException
+     * @expectedExceptionMessage Replication needs a master and at least one slave
+     */
+    public function testThrowsExceptionOnMissingMaster()
+    {
+        $replication = new MasterSlaveReplication();
+        $replication->add($this->getMockConnection('tcp://host2?alias=slave1'));
+
+        $replication->connect();
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException RuntimeException
+     * @expectedExceptionMessage Replication needs a master and at least one slave
+     */
+    public function testThrowsExceptionOnMissingSlave()
+    {
+        $replication = new MasterSlaveReplication();
+        $replication->add($this->getMockConnection('tcp://host1?alias=master'));
+
+        $replication->connect();
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testConnectForcesConnectionToOneOfSlaves()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->never())->method('connect');
+
+        $slave = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave->expects($this->once())->method('connect');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave);
+
+        $replication->connect();
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testIsConnectedReturnsTrueIfAtLeastOneConnectionIsOpen()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->never())->method('isConnected')->will($this->returnValue(false));
+
+        $slave = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave->expects($this->once())->method('isConnected')->will($this->returnValue(true));
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave);
+        $replication->connect();
+
+        $this->assertTrue($replication->isConnected());
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testIsConnectedReturnsFalseIfAllConnectionsAreClosed()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->any())->method('isConnected')->will($this->returnValue(false));
+
+        $slave = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave->expects($this->any())->method('isConnected')->will($this->returnValue(false));
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave);
+
+        $this->assertFalse($replication->isConnected());
+
+        $replication->connect();
+        $replication->disconnect();
+
+        $this->assertFalse($replication->isConnected());
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testDisconnectForcesCurrentConnectionToDisconnect()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('disconnect');
+
+        $slave = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave->expects($this->once())->method('disconnect');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave);
+
+        $replication->disconnect();
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testCanSwitchConnectionByAlias()
+    {
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $this->assertNull($replication->getCurrent());
+
+        $replication->switchTo('master');
+        $this->assertSame($master, $replication->getCurrent());
+        $replication->switchTo('slave1');
+        $this->assertSame($slave1, $replication->getCurrent());
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException InvalidArgumentException
+     * @expectedExceptionMessage The specified connection is not valid
+     */
+    public function testThrowsErrorWhenSwitchingToUnknownConnection()
+    {
+        $replication = new MasterSlaveReplication();
+        $replication->add($this->getMockConnection('tcp://host1?alias=master'));
+        $replication->add($this->getMockConnection('tcp://host2?alias=slave1'));
+
+        $replication->switchTo('unknown');
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testUsesSlavesOnReadOnlyCommands()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $cmd = $profile->createCommand('exists', array('foo'));
+        $this->assertSame($slave1, $replication->getConnection($cmd));
+
+        $cmd = $profile->createCommand('get', array('foo'));
+        $this->assertSame($slave1, $replication->getConnection($cmd));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testUsesMasterOnWriteCommands()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $cmd = $profile->createCommand('set', array('foo', 'bar'));
+        $this->assertSame($master, $replication->getConnection($cmd));
+
+        $cmd = $profile->createCommand('get', array('foo'));
+        $this->assertSame($master, $replication->getConnection($cmd));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testSwitchesFromSlaveToMasterOnWriteCommands()
+    {
+        $profile = ServerProfile::getDefault();
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $cmd = $profile->createCommand('exists', array('foo'));
+        $this->assertSame($slave1, $replication->getConnection($cmd));
+
+        $cmd = $profile->createCommand('set', array('foo', 'bar'));
+        $this->assertSame($master, $replication->getConnection($cmd));
+
+        $cmd = $profile->createCommand('exists', array('foo'));
+        $this->assertSame($master, $replication->getConnection($cmd));
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testWritesCommandToCorrectConnection()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('foo'));
+        $cmdSet = $profile->getDefault()->createCommand('set', array('foo', 'bar'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('writeCommand')->with($cmdSet);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())->method('writeCommand')->with($cmdExists);
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->writeCommand($cmdExists);
+        $replication->writeCommand($cmdSet);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testReadsCommandFromCorrectConnection()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('foo'));
+        $cmdSet = $profile->getDefault()->createCommand('set', array('foo', 'bar'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('readResponse')->with($cmdSet);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())->method('readResponse')->with($cmdExists);
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->readResponse($cmdExists);
+        $replication->readResponse($cmdSet);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testExecutesCommandOnCorrectConnection()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdExists = $profile->createCommand('exists', array('foo'));
+        $cmdSet = $profile->getDefault()->createCommand('set', array('foo', 'bar'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('executeCommand')->with($cmdSet);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())->method('executeCommand')->with($cmdExists);
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdExists);
+        $replication->executeCommand($cmdSet);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testWatchTriggersSwitchToMasterConnection()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdWatch = $profile->createCommand('watch', array('foo'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('executeCommand')->with($cmdWatch);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->never())->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdWatch);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testMultiTriggersSwitchToMasterConnection()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdMulti = $profile->createCommand('multi');
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('executeCommand')->with($cmdMulti);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->never())->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdMulti);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testEvalTriggersSwitchToMasterConnection()
+    {
+        $profile = ServerProfile::get('dev');
+        $cmdEval = $profile->createCommand('eval', array("return redis.call('info')"));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('executeCommand')->with($cmdEval);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->never())->method('executeCommand');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->executeCommand($cmdEval);
+    }
+
+    /**
+     * @group disconnected
+     * @expectedException Predis\NotSupportedException
+     * @expectedExceptionMessage The command INFO is not allowed in replication mode
+     */
+    public function testThrowsExceptionOnNonSupportedCommand()
+    {
+        $cmd = ServerProfile::getDefault()->createCommand('info');
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($this->getMockConnection('tcp://host1?alias=master'));
+        $replication->add($this->getMockConnection('tcp://host2?alias=slave1'));
+
+        $replication->getConnection($cmd);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testCanOverrideReadOnlyFlagForCommands()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdSet = $profile->createCommand('set', array('foo', 'bar'));
+        $cmdGet = $profile->createCommand('get', array('foo'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('executeCommand')->with($cmdGet);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())->method('executeCommand')->with($cmdSet);
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->setCommandReadOnly($cmdSet->getId(), true);
+        $replication->setCommandReadOnly($cmdGet->getId(), false);
+
+        $replication->executeCommand($cmdSet);
+        $replication->executeCommand($cmdGet);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testAcceptsCallableToOverrideReadOnlyFlagForCommands()
+    {
+        $profile = ServerProfile::getDefault();
+        $cmdExistsFoo = $profile->createCommand('exists', array('foo'));
+        $cmdExistsBar = $profile->createCommand('exists', array('bar'));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->once())->method('executeCommand')->with($cmdExistsBar);
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->once())->method('executeCommand')->with($cmdExistsFoo);
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->setCommandReadOnly('exists', function($cmd) {
+            list($arg1) = $cmd->getArguments();
+            return $arg1 === 'foo';
+        });
+
+        $replication->executeCommand($cmdExistsFoo);
+        $replication->executeCommand($cmdExistsBar);
+    }
+
+    /**
+     * @group disconnected
+     */
+    public function testCanSetReadOnlyFlagForEvalScripts()
+    {
+        $profile = ServerProfile::get('dev');
+
+        $cmdEval = $profile->createCommand('eval', array($script = "return redis.call('info');"));
+        $cmdEvalSha = $profile->createCommand('evalsha', array($scriptSHA1 = sha1($script)));
+
+        $master = $this->getMockConnection('tcp://host1?alias=master');
+        $master->expects($this->never())->method('executeCommand');
+
+        $slave1 = $this->getMockConnection('tcp://host2?alias=slave1');
+        $slave1->expects($this->exactly(2))
+               ->method('executeCommand')
+               ->with($this->logicalOr($cmdEval, $cmdEvalSha));
+
+        $replication = new MasterSlaveReplication();
+        $replication->add($master);
+        $replication->add($slave1);
+
+        $replication->setScriptReadOnly($script);
+
+        $replication->executeCommand($cmdEval);
+        $replication->executeCommand($cmdEvalSha);
+    }
+
+    // ******************************************************************** //
+    // ---- HELPER METHODS ------------------------------------------------ //
+    // ******************************************************************** //
+
+    /**
+     * Returns a base mocked connection from Predis\Network\IConnectionSingle.
+     *
+     * @param mixed $parameters Optional parameters.
+     * @return mixed
+     */
+    protected function getMockConnection($parameters = null)
+    {
+        $connection = $this->getMock('Predis\Network\IConnectionSingle');
+
+        if ($parameters) {
+            $parameters = new ConnectionParameters($parameters);
+            $hash = "{$parameters->host}:{$parameters->port}";
+
+            $connection->expects($this->any())
+                       ->method('getParameters')
+                       ->will($this->returnValue($parameters));
+            $connection->expects($this->any())
+                       ->method('__toString')
+                       ->will($this->returnValue($hash));
+        }
+
+        return $connection;
+    }
+}