Ver Fonte

Merge branch 'connections'

Daniele Alessandri há 14 anos atrás
pai
commit
5defb14698
2 ficheiros alterados com 97 adições e 36 exclusões
  1. 84 23
      lib/Predis.php
  2. 13 13
      test/PredisClientFeatures.php

+ 84 - 23
lib/Predis.php

@@ -14,7 +14,9 @@ class ServerException extends PredisException {                     // Server-si
 class CommunicationException extends PredisException {              // Communication errors
     private $_connection;
 
-    public function __construct(Connection $connection, $message = null, $code = null) {
+    public function __construct(IConnectionSingle $connection, 
+        $message = null, $code = null) {
+
         $this->_connection = $connection;
         parent::__construct($message, $code);
     }
@@ -112,7 +114,7 @@ class Client {
 
     private function createConnection($parameters) {
         $params     = new ConnectionParameters($parameters);
-        $connection = new Connection($params, $this->_responseReader);
+        $connection = Connection::create($params, $this->_responseReader);
 
         if ($params->password !== null) {
             $connection->pushInitCommand($this->createCommand(
@@ -153,7 +155,7 @@ class Client {
     }
 
     public function getClientFor($connectionAlias) {
-        if (!($this->_connection instanceof ConnectionCluster)) {
+        if (!($this->_connection instanceof IConnectionCluster)) {
             throw new ClientException(
                 'This method is supported only when the client is connected to a cluster of connections'
             );
@@ -189,7 +191,7 @@ class Client {
             return $this->_connection;
         }
         else {
-            return $this->_connection instanceof ConnectionCluster 
+            return $this->_connection instanceof IConnectionCluster 
                 ? $this->_connection->getConnectionById($id)
                 : $this->_connection;
         }
@@ -210,7 +212,7 @@ class Client {
 
     public function executeCommandOnShards(Command $command) {
         $replies = array();
-        if ($this->_connection instanceof \Predis\ConnectionCluster) {
+        if ($this->_connection instanceof \Predis\IConnectionCluster) {
             foreach($this->_connection as $connection) {
                 $replies[] = $connection->executeCommand($command);
             }
@@ -222,7 +224,7 @@ class Client {
     }
 
     public function rawCommand($rawCommandData, $closesConnection = false) {
-        if ($this->_connection instanceof \Predis\ConnectionCluster) {
+        if ($this->_connection instanceof \Predis\IConnectionCluster) {
             throw new ClientException('Cannot send raw commands when connected to a cluster of Redis servers');
         }
         return $this->_connection->rawCommand($rawCommandData, $closesConnection);
@@ -234,7 +236,7 @@ class Client {
 
     public function pipelineSafe($pipelineBlock = null) {
         $connection = $this->getConnection();
-        $pipeline   = new CommandPipeline($this, $connection instanceof Connection
+        $pipeline   = new CommandPipeline($this, $connection instanceof IConnectionSingle
             ? new Pipeline\SafeExecutor($connection)
             : new Pipeline\SafeClusterExecutor($connection)
         );
@@ -518,11 +520,11 @@ abstract class MultiBulkCommand extends Command {
 /* ------------------------------------------------------------------------- */
 
 interface IResponseHandler {
-    function handle(Connection $connection, $payload);
+    function handle(IConnectionSingle $connection, $payload);
 }
 
 class ResponseStatusHandler implements IResponseHandler {
-    public function handle(Connection $connection, $status) {
+    public function handle(IConnectionSingle $connection, $status) {
         if ($status === Protocol::OK) {
             return true;
         }
@@ -534,19 +536,19 @@ class ResponseStatusHandler implements IResponseHandler {
 }
 
 class ResponseErrorHandler implements IResponseHandler {
-    public function handle(Connection $connection, $errorMessage) {
+    public function handle(IConnectionSingle $connection, $errorMessage) {
         throw new ServerException(substr($errorMessage, 4));
     }
 }
 
 class ResponseErrorSilentHandler implements IResponseHandler {
-    public function handle(Connection $connection, $errorMessage) {
+    public function handle(IConnectionSingle $connection, $errorMessage) {
         return new ResponseError(substr($errorMessage, 4));
     }
 }
 
 class ResponseBulkHandler implements IResponseHandler {
-    public function handle(Connection $connection, $dataLength) {
+    public function handle(IConnectionSingle $connection, $dataLength) {
         if (!is_numeric($dataLength)) {
             Shared\Utils::onCommunicationException(new MalformedServerResponse(
                 $connection, "Cannot parse '$dataLength' as data length"
@@ -566,7 +568,7 @@ class ResponseBulkHandler implements IResponseHandler {
         return null;
     }
 
-    private static function discardNewLine(Connection $connection) {
+    private static function discardNewLine(IConnectionSingle $connection) {
         if ($connection->readBytes(2) !== Protocol::NEWLINE) {
             Shared\Utils::onCommunicationException(new MalformedServerResponse(
                 $connection, 'Did not receive a new-line at the end of a bulk response'
@@ -576,7 +578,7 @@ class ResponseBulkHandler implements IResponseHandler {
 }
 
 class ResponseMultiBulkHandler implements IResponseHandler {
-    public function handle(Connection $connection, $rawLength) {
+    public function handle(IConnectionSingle $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
             Shared\Utils::onCommunicationException(new MalformedServerResponse(
                 $connection, "Cannot parse '$rawLength' as data length"
@@ -602,7 +604,7 @@ class ResponseMultiBulkHandler implements IResponseHandler {
 }
 
 class ResponseMultiBulkStreamHandler implements IResponseHandler {
-    public function handle(Connection $connection, $rawLength) {
+    public function handle(IConnectionSingle $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
             Shared\Utils::onCommunicationException(new MalformedServerResponse(
                 $connection, "Cannot parse '$rawLength' as data length"
@@ -613,7 +615,7 @@ class ResponseMultiBulkStreamHandler implements IResponseHandler {
 }
 
 class ResponseIntegerHandler implements IResponseHandler {
-    public function handle(Connection $connection, $number) {
+    public function handle(IConnectionSingle $connection, $number) {
         if (is_numeric($number)) {
             return (int) $number;
         }
@@ -655,7 +657,7 @@ class ResponseReader {
         }
     }
 
-    public function read(Connection $connection) {
+    public function read(IConnectionSingle $connection) {
         $header = $connection->readLine();
         if ($header === '') {
             Shared\Utils::onCommunicationException(new MalformedServerResponse(
@@ -1086,6 +1088,7 @@ class PubSubContext implements \Iterator {
 /* ------------------------------------------------------------------------- */
 
 class ConnectionParameters {
+    const DEFAULT_SCHEME = 'tcp';
     const DEFAULT_HOST = '127.0.0.1';
     const DEFAULT_PORT = 6379;
     const DEFAULT_TIMEOUT = 5;
@@ -1101,7 +1104,7 @@ class ConnectionParameters {
     private static function parseURI($uri) {
         $parsed = @parse_url($uri);
 
-        if ($parsed == false || $parsed['scheme'] != 'redis' || $parsed['host'] == null) {
+        if ($parsed == false || $parsed['host'] == null) {
             throw new ClientException("Invalid URI: $uri");
         }
 
@@ -1148,6 +1151,7 @@ class ConnectionParameters {
 
     private static function filterConnectionParams($parameters) {
         return array(
+            'scheme' => self::getParamOrDefault($parameters, 'scheme', self::DEFAULT_SCHEME), 
             'host' => self::getParamOrDefault($parameters, 'host', self::DEFAULT_HOST), 
             'port' => (int) self::getParamOrDefault($parameters, 'port', self::DEFAULT_PORT), 
             'database' => self::getParamOrDefault($parameters, 'database'), 
@@ -1179,11 +1183,61 @@ interface IConnection {
     public function executeCommand(Command $command);
 }
 
-class Connection implements IConnection {
+interface IConnectionSingle extends IConnection {
+    public function writeBytes($buffer);
+    public function readBytes($length);
+    public function readLine();
+}
+
+interface IConnectionCluster extends IConnection {
+    public function getConnection(Command $command);
+    public function getConnectionById($connectionId);
+}
+
+class Connection {
+    private static $_registeredSchemes;
+
+    private static function ensureInitialized() {
+        if (!isset(self::$_registeredSchemes)) {
+            self::$_registeredSchemes = self::getDefaultSchemes();
+        }
+    }
+
+    private static function getDefaultSchemes() {
+        return array(
+            'tcp'   => '\Predis\TcpConnection',
+
+            // used for compatibility with older versions of Predis
+            'redis' => '\Predis\TcpConnection',
+        );
+    }
+
+    public static function registerScheme($scheme, $connectionClass) {
+        self::ensureInitialized();
+        $connectionReflection = new \ReflectionClass($connectionClass);
+        if (!$connectionReflection->isSubclassOf('\Predis\IConnectionSingle')) {
+            throw new ClientException(
+                "Cannot register '$connectionClass' as it is not a valid connection class"
+            );
+        }
+        self::$_registeredSchemes[$scheme] = $connectionClass;
+    }
+
+    public static function create(ConnectionParameters $parameters, ResponseReader $reader) {
+        self::ensureInitialized();
+        if (!isset(self::$_registeredSchemes[$parameters->scheme])) {
+            throw new ClientException("Unknown connection scheme: {$parameters->scheme}");
+        }
+        $connection = self::$_registeredSchemes[$parameters->scheme];
+        return new $connection($parameters, $reader);
+    }
+}
+
+class TcpConnection implements IConnectionSingle {
     private $_params, $_socket, $_initCmds, $_reader;
 
     public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
-        $this->_params   = $parameters;
+        $this->_params   = $this->checkParameters($parameters);
         $this->_initCmds = array();
         $this->_reader   = $reader ?: new ResponseReader();
     }
@@ -1194,6 +1248,13 @@ class Connection implements IConnection {
         }
     }
 
+    private function checkParameters(ConnectionParameters $parameters) {
+        if ($parameters->scheme != 'tcp' && $parameters->scheme != 'redis') {
+            throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
+        }
+        return $parameters;
+    }
+
     public function isConnected() {
         return is_resource($this->_socket);
     }
@@ -1347,7 +1408,7 @@ class Connection implements IConnection {
     }
 }
 
-class ConnectionCluster implements IConnection, \IteratorAggregate {
+class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {
     private $_pool, $_distributor;
 
     public function __construct(Distribution\IDistributionStrategy $distributor = null) {
@@ -1376,7 +1437,7 @@ class ConnectionCluster implements IConnection, \IteratorAggregate {
         }
     }
 
-    public function add(Connection $connection) {
+    public function add(IConnectionSingle $connection) {
         $parameters = $connection->getParameters();
         if (isset($parameters->alias)) {
             $this->_pool[$parameters->alias] = $connection;
@@ -2120,7 +2181,7 @@ abstract class MultiBulkResponseIteratorBase implements \Iterator, \Countable {
 class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
     private $_connection;
 
-    public function __construct(\Predis\Connection $connection, $size) {
+    public function __construct(\Predis\IConnectionSingle $connection, $size) {
         $this->_connection = $connection;
         $this->_reader     = $connection->getResponseReader();
         $this->_position   = 0;

+ 13 - 13
test/PredisClientFeatures.php

@@ -236,12 +236,12 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
     /* Connection */
 
     function testConnection_StringCastReturnsIPAndPort() {
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
         $this->assertEquals(RC::SERVER_HOST . ':' . RC::SERVER_PORT, (string) $connection);
     }
 
     function testConnection_ConnectDisconnect() {
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
 
         $this->assertFalse($connection->isConnected());
         $connection->connect();
@@ -252,7 +252,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
 
     function testConnection_WriteAndReadCommand() {
         $cmd = \Predis\RedisServerProfile::getDefault()->createCommand('ping');
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
         $connection->connect();
 
         $connection->writeCommand($cmd);
@@ -261,7 +261,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
 
     function testConnection_WriteCommandAndCloseConnection() {
         $cmd = \Predis\RedisServerProfile::getDefault()->createCommand('quit');
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
         $connection->connect();
 
         $this->assertTrue($connection->isConnected());
@@ -274,7 +274,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
     }
 
     function testConnection_GetSocketOpensConnection() {
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
 
         $this->assertFalse($connection->isConnected());
         $this->assertType('resource', $connection->getSocket());
@@ -283,7 +283,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
 
     function testConnection_LazyConnect() {
         $cmd = \Predis\RedisServerProfile::getDefault()->createCommand('ping');
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
 
         $this->assertFalse($connection->isConnected());
         $connection->writeCommand($cmd);
@@ -292,23 +292,23 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
     }
 
     function testConnection_RawCommand() {
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
         $this->assertEquals('PONG', $connection->rawCommand("PING\r\n"));
     }
 
     function testConnection_Alias() {
-        $connection1 = new \Predis\Connection(RC::getConnectionParameters());
+        $connection1 = new \Predis\TcpConnection(RC::getConnectionParameters());
         $this->assertNull($connection1->getParameters()->alias);
 
         $args = array_merge(RC::getConnectionArguments(), array('alias' => 'servername'));
-        $connection2 = new \Predis\Connection(new \Predis\ConnectionParameters($args));
+        $connection2 = new \Predis\TcpConnection(new \Predis\ConnectionParameters($args));
         $this->assertEquals('servername', $connection2->getParameters()->alias);
     }
 
     function testConnection_ConnectionTimeout() {
         $timeout = 3;
         $args    = array('host' => '1.0.0.1', 'connection_timeout' => $timeout);
-        $connection = new \Predis\Connection(new \Predis\ConnectionParameters($args));
+        $connection = new \Predis\TcpConnection(new \Predis\ConnectionParameters($args));
 
         $start = time();
         RC::testForCommunicationException($this, null, function() use($connection) {
@@ -321,7 +321,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
         $timeout = 1;
         $args    = array_merge(RC::getConnectionArguments(), array('read_write_timeout' => $timeout));
         $cmdFake = \Predis\RedisServerProfile::getDefault()->createCommand('ping');
-        $connection = new \Predis\Connection(new \Predis\ConnectionParameters($args));
+        $connection = new \Predis\TcpConnection(new \Predis\ConnectionParameters($args));
 
         $expectedMessage = 'Error while reading line from the server';
         $start = time();
@@ -335,7 +335,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
     /* ResponseReader */
 
     function testResponseReader_OptionIterableMultiBulkReplies() {
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
         $responseReader = $connection->getResponseReader();
 
         $responseReader->setHandler(
@@ -352,7 +352,7 @@ class PredisClientFeaturesTestSuite extends PHPUnit_Framework_TestCase {
     }
 
     function testResponseReader_OptionExceptionOnError() {
-        $connection = new \Predis\Connection(RC::getConnectionParameters());
+        $connection = new \Predis\TcpConnection(RC::getConnectionParameters());
         $responseReader = $connection->getResponseReader();
         $connection->rawCommand("SET key 5\r\nvalue\r\n");
         $rawCmdUnexpected = "LPUSH key 5\r\nvalue\r\n";