浏览代码

Implement full support for IPv6.

Using IPv6 with Predis was basically impossible due to various inconsistencies
and bugs through the library, now it is supported by all the connection classes.

Following the standard for IPv6 literal addresses in URI strings, the IP literal
must be enclosed within square brackets when passing the parameters as a string:

  $parameters = 'tcp://[2001:db8:0:f101::1]:6379';

See https://tools.ietf.org/html/rfc3986#section-3.2.2 for further details.

This commit also fixes #239 making redis-cluster usable with nodes using IPv6.
Daniele Alessandri 10 年之前
父节点
当前提交
8aec51b34f

+ 2 - 0
CHANGELOG.md

@@ -1,6 +1,8 @@
 v1.0.2 (2015-xx-xx)
 ================================================================================
 
+- IPv6 is now fully supported.
+
 - Added `redis` as an accepted scheme for connection parameters. When using this
   scheme, the rules used to parse URI strings match the provisional registration
   [published by IANA](http://www.iana.org/assignments/uri-schemes/prov/redis).

+ 25 - 6
src/Connection/AbstractConnection.php

@@ -133,6 +133,29 @@ abstract class AbstractConnection implements NodeConnectionInterface
         return $this->read();
     }
 
+    /**
+     * Helper method that returns an exception message augmented with useful
+     * details from the connection parameters.
+     *
+     * @param string $message Error message.
+     *
+     * @return string
+     */
+    private function createExceptionMessage($message)
+    {
+        $parameters = $this->parameters;
+
+        if ($parameters->scheme === 'unix') {
+            return "$message [$parameters->scheme:$parameters->path]";
+        }
+
+        if (filter_var($parameters->host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) {
+            return "$message [$parameters->scheme://[$parameters->host]:$parameters->port]";
+        }
+
+        return "$message [$parameters->scheme://$parameters->host:$parameters->port]";
+    }
+
     /**
      * Helper method to handle connection errors.
      *
@@ -142,9 +165,7 @@ abstract class AbstractConnection implements NodeConnectionInterface
     protected function onConnectionError($message, $code = null)
     {
         CommunicationException::handle(
-            new ConnectionException(
-                $this, "$message [{$this->parameters->scheme}://{$this->getIdentifier()}]", $code
-            )
+            new ConnectionException($this, static::createExceptionMessage($message), $code)
         );
     }
 
@@ -156,9 +177,7 @@ abstract class AbstractConnection implements NodeConnectionInterface
     protected function onProtocolError($message)
     {
         CommunicationException::handle(
-            new ProtocolException(
-                $this, "$message [{$this->parameters->scheme}://{$this->getIdentifier()}]"
-            )
+            new ProtocolException($this, static::createExceptionMessage($message))
         );
     }
 

+ 3 - 3
src/Connection/Aggregate/RedisCluster.php

@@ -276,11 +276,11 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
      */
     protected function createConnection($connectionID)
     {
-        $host = explode(':', $connectionID, 2);
+        $separator = strrpos($connectionID, ':');
 
         $parameters = array_merge($this->defaultParameters, array(
-            'host' => $host[0],
-            'port' => $host[1],
+            'host' => substr($connectionID, 0, $separator),
+            'port' => substr($connectionID, $separator + 1),
         ));
 
         $connection = $this->connections->create($parameters);

+ 53 - 51
src/Connection/PhpiredisSocketConnection.php

@@ -166,22 +166,54 @@ class PhpiredisSocketConnection extends AbstractConnection
         $this->onConnectionError(trim($errstr), $errno);
     }
 
+    /**
+     * Gets the address of an host from connection parameters.
+     *
+     * @param ParametersInterface $parameters Parameters used to initialize the connection.
+     *
+     * @return string
+     */
+    protected static function getAddress(ParametersInterface $parameters)
+    {
+        if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
+            return $host;
+        }
+
+        if ($host === $address = gethostbyname($host)) {
+            return false;
+        }
+
+        return $address;
+    }
+
     /**
      * {@inheritdoc}
      */
     protected function createResource()
     {
-        $isUnix = $this->parameters->scheme === 'unix';
-        $domain = $isUnix ? AF_UNIX : AF_INET;
-        $protocol = $isUnix ? 0 : SOL_TCP;
+        $parameters = $this->parameters;
 
-        $socket = @call_user_func('socket_create', $domain, SOCK_STREAM, $protocol);
+        if ($parameters->scheme === 'unix') {
+            $address = $parameters->path;
+            $domain = AF_UNIX;
+            $protocol = 0;
+        } else {
+            if (false === $address = self::getAddress($parameters)) {
+                $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
+            }
+
+            $domain = filter_var($address, FILTER_VALIDATE_IP, FILTER_FLAG_IPV6) ? AF_INET6 : AF_INET;
+            $protocol = SOL_TCP;
+        }
+
+        $socket = @socket_create($domain, SOCK_STREAM, $protocol);
 
         if (!is_resource($socket)) {
             $this->emitSocketError();
         }
 
-        $this->setSocketOptions($socket, $this->parameters);
+        $this->setSocketOptions($socket, $parameters);
+        $this->connectWithTimeout($socket, $address, $parameters);
 
         return $socket;
     }
@@ -194,12 +226,14 @@ class PhpiredisSocketConnection extends AbstractConnection
      */
     private function setSocketOptions($socket, ParametersInterface $parameters)
     {
-        if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
-            $this->emitSocketError();
-        }
+        if ($parameters->scheme !== 'unix') {
+            if (!socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1)) {
+                $this->emitSocketError();
+            }
 
-        if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
-            $this->emitSocketError();
+            if (!socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
+                $this->emitSocketError();
+            }
         }
 
         if (isset($parameters->read_write_timeout)) {
@@ -222,50 +256,20 @@ class PhpiredisSocketConnection extends AbstractConnection
         }
     }
 
-    /**
-     * Gets the address from the connection parameters.
-     *
-     * @param ParametersInterface $parameters Parameters used to initialize the connection.
-     *
-     * @return string
-     */
-    protected static function getAddress(ParametersInterface $parameters)
-    {
-        if ($parameters->scheme === 'unix') {
-            return $parameters->path;
-        }
-
-        $host = $parameters->host;
-
-        if (ip2long($host) === false) {
-            if (false === $addresses = gethostbynamel($host)) {
-                return false;
-            }
-
-            return $addresses[array_rand($addresses)];
-        }
-
-        return $host;
-    }
-
     /**
      * Opens the actual connection to the server with a timeout.
      *
+     * @param resource            $socket     Socket resource.
+     * @param string              $address    IP address (DNS-resolved from hostname)
      * @param ParametersInterface $parameters Parameters used to initialize the connection.
      *
      * @return string
      */
-    private function connectWithTimeout(ParametersInterface $parameters)
+    private function connectWithTimeout($socket, $address, ParametersInterface $parameters)
     {
-        if (false === $host = self::getAddress($parameters)) {
-            $this->onConnectionError("Cannot resolve the address of '$parameters->host'.");
-        }
-
-        $socket = $this->getResource();
-
         socket_set_nonblock($socket);
 
-        if (@socket_connect($socket, $host, (int) $parameters->port) === false) {
+        if (@socket_connect($socket, $address, (int) $parameters->port) === false) {
             $error = socket_last_error();
 
             if ($error != SOCKET_EINPROGRESS && $error != SOCKET_EALREADY) {
@@ -287,9 +291,11 @@ class PhpiredisSocketConnection extends AbstractConnection
         if ($selected === 2) {
             $this->onConnectionError('Connection refused.', SOCKET_ECONNREFUSED);
         }
+
         if ($selected === 0) {
             $this->onConnectionError('Connection timed out.', SOCKET_ETIMEDOUT);
         }
+
         if ($selected === false) {
             $this->emitSocketError();
         }
@@ -300,13 +306,9 @@ class PhpiredisSocketConnection extends AbstractConnection
      */
     public function connect()
     {
-        if (parent::connect()) {
-            $this->connectWithTimeout($this->parameters);
-
-            if ($this->initCommands) {
-                foreach ($this->initCommands as $command) {
-                    $this->executeCommand($command);
-                }
+        if (parent::connect() && $this->initCommands) {
+            foreach ($this->initCommands as $command) {
+                $this->executeCommand($command);
             }
         }
     }

+ 1 - 1
src/Connection/PhpiredisStreamConnection.php

@@ -89,7 +89,7 @@ class PhpiredisStreamConnection extends StreamConnection
      */
     protected function tcpStreamInitializer(ParametersInterface $parameters)
     {
-        $uri = "tcp://{$parameters->host}:{$parameters->port}";
+        $uri = "tcp://[{$parameters->host}]:{$parameters->port}";
         $flags = STREAM_CLIENT_CONNECT;
         $socket = null;
 

+ 1 - 1
src/Connection/StreamConnection.php

@@ -74,7 +74,7 @@ class StreamConnection extends AbstractConnection
      */
     protected function tcpStreamInitializer(ParametersInterface $parameters)
     {
-        $uri = "tcp://{$parameters->host}:{$parameters->port}";
+        $uri = "tcp://[$parameters->host]:$parameters->port";
         $flags = STREAM_CLIENT_CONNECT;
 
         if (isset($parameters->async_connect) && (bool) $parameters->async_connect) {

+ 5 - 1
src/Connection/WebdisConnection.php

@@ -118,10 +118,14 @@ class WebdisConnection implements NodeConnectionInterface
     {
         $parameters = $this->getParameters();
 
+        if (filter_var($host = $parameters->host, FILTER_VALIDATE_IP)) {
+            $host = "[$host]";
+        }
+
         $options = array(
             CURLOPT_FAILONERROR => true,
             CURLOPT_CONNECTTIMEOUT_MS => $parameters->timeout * 1000,
-            CURLOPT_URL => "{$parameters->scheme}://{$parameters->host}:{$parameters->port}",
+            CURLOPT_URL => "$parameters->scheme://$host:$parameters->port",
             CURLOPT_HTTP_VERSION => CURL_HTTP_VERSION_1_1,
             CURLOPT_POST => true,
             CURLOPT_WRITEFUNCTION => array($this, 'feedReader'),

+ 34 - 1
tests/PHPUnit/PredisConnectionTestCase.php

@@ -411,12 +411,45 @@ abstract class PredisConnectionTestCase extends PredisTestCase
      * @group connected
      * @group slow
      * @expectedException \Predis\Connection\ConnectionException
+     * @expectedExceptionMessageRegExp /.* \[tcp:\/\/169.254.10.10:6379\]/
      */
     public function testThrowsExceptionOnConnectionTimeout()
     {
         $connection = $this->createConnectionWithParams(array(
             'host' => '169.254.10.10',
-            'timeout' => 0.5,
+            'timeout' => 0.1,
+        ), false);
+
+        $connection->connect();
+    }
+
+    /**
+     * @group connected
+     * @group slow
+     * @expectedException \Predis\Connection\ConnectionException
+     * @expectedExceptionMessageRegExp /.* \[tcp:\/\/\[0:0:0:0:0:ffff:a9fe:a0a\]:6379\]/
+     */
+    public function testThrowsExceptionOnConnectionTimeoutIPv6()
+    {
+        $connection = $this->createConnectionWithParams(array(
+            'host' => '0:0:0:0:0:ffff:a9fe:a0a',
+            'timeout' => 0.1,
+        ), false);
+
+        $connection->connect();
+    }
+
+    /**
+     * @group connected
+     * @group slow
+     * @expectedException \Predis\Connection\ConnectionException
+     * @expectedExceptionMessageRegExp /.* \[unix:\/tmp\/nonexistent\/redis\.sock]/
+     */
+    public function testThrowsExceptionOnUnixDomainSocketNotFound()
+    {
+        $connection = $this->createConnectionWithParams(array(
+            'scheme' => 'unix',
+            'path' => '/tmp/nonexistent/redis.sock',
         ), false);
 
         $connection->connect();

+ 34 - 0
tests/Predis/Connection/Aggregate/RedisClusterTest.php

@@ -632,6 +632,40 @@ class RedisClusterTest extends PredisTestCase
         $this->assertSame(3, count($cluster));
     }
 
+    /**
+     * @group disconnected
+     */
+    public function testParseIPv6AddresseAndPortPairInRedirectionPayload()
+    {
+        $movedResponse = new Response\Error('MOVED 1970 2001:db8:0:f101::2:6379');
+
+        $command = Profile\Factory::getDefault()->createCommand('get', array('node:1001'));
+
+        $connection1 = $this->getMockConnection('tcp://[2001:db8:0:f101::1]:6379');
+        $connection1->expects($this->once())
+                   ->method('executeCommand')
+                   ->with($command)
+                   ->will($this->returnValue($movedResponse));
+
+        $connection2 = $this->getMockConnection('tcp://[2001:db8:0:f101::2]:6379');
+        $connection2->expects($this->once())
+                    ->method('executeCommand')
+                    ->with($command)
+                    ->will($this->returnValue('foobar'));
+
+        $factory = $this->getMock('Predis\Connection\Factory');
+        $factory->expects($this->once())
+                ->method('create')
+                ->with(array('host' => '2001:db8:0:f101::2', 'port' => '6379'))
+                ->will($this->returnValue($connection2));
+
+        $cluster = new RedisCluster($factory);
+        $cluster->useClusterSlots(false);
+        $cluster->add($connection1);
+
+        $cluster->executeCommand($command);
+    }
+
     /**
      * @group disconnected
      */

+ 4 - 8
tests/Predis/Connection/ParametersTest.php

@@ -265,15 +265,11 @@ class ParametersTest extends PredisTestCase
      */
     public function testParsingURIWithEmbeddedIPV6AddressShouldStripBracketsFromHost()
     {
-        $uri = 'tcp://[::1]:7000';
+        $expected = array('scheme' => 'tcp', 'host' => '::1', 'port' => 7000);
+        $this->assertSame($expected, Parameters::parse('tcp://[::1]:7000'));
 
-        $expected = array(
-            'scheme' => 'tcp',
-            'host' => '::1',
-            'port' => 7000,
-        );
-
-        $this->assertSame($expected, Parameters::parse($uri));
+        $expected = array('scheme' => 'tcp', 'host' => '2001:db8:0:f101::1', 'port' => 7000);
+        $this->assertSame($expected, Parameters::parse('tcp://[2001:db8:0:f101::1]:7000'));
     }
 
     /**