Przeglądaj źródła

Add support for connecting to Redis with UNIX domain sockets.

Daniele Alessandri 14 lat temu
rodzic
commit
6b6edbe6c2
4 zmienionych plików z 57 dodań i 16 usunięć
  1. 3 0
      CHANGELOG
  2. 1 0
      README.markdown
  3. 1 1
      VERSION
  4. 52 15
      lib/Predis.php

+ 3 - 0
CHANGELOG

@@ -1,3 +1,6 @@
+v0.6.6 (2011-xx-xx)
+  * Add support for connecting to Redis using UNIX domain sockets (ISSUE #25).
+
 v0.6.5 (2011-02-12)
   * FIX: due to an untested internal change introduced in v0.6.4, a wrong 
     handling of bulk reads of zero-length values was producing protocol 

+ 1 - 0
README.markdown

@@ -22,6 +22,7 @@ to be implemented soon in Predis.
 - Command pipelining on single and multiple connections (transparent).
 - Abstraction for Redis transactions (>= 2.0) with support for CAS operations (>= 2.2).
 - Lazy connections (connections to Redis instances are only established just in time).
+- Ability to connect to Redis using TCP/IP or UNIX domain sockets.
 - Flexible system to define and register your own set of commands to a client instance.
 
 

+ 1 - 1
VERSION

@@ -1 +1 @@
-0.6.5
+0.6.6-dev

+ 52 - 15
lib/Predis.php

@@ -1198,6 +1198,7 @@ class PubSubContext implements \Iterator {
 /* ------------------------------------------------------------------------- */
 
 class ConnectionParameters {
+    const DEFAULT_SCHEME = 'redis';
     const DEFAULT_HOST = '127.0.0.1';
     const DEFAULT_PORT = 6379;
     const DEFAULT_TIMEOUT = 5;
@@ -1211,9 +1212,12 @@ class ConnectionParameters {
     }
 
     private static function parseURI($uri) {
+        if (stripos($uri, 'unix') === 0) {
+            // Hack to support URIs for UNIX sockets with minimal effort.
+            $uri = str_ireplace('unix:///', 'unix://localhost/', $uri);
+        }
         $parsed = @parse_url($uri);
-
-        if ($parsed == false || $parsed['scheme'] != 'redis' || $parsed['host'] == null) {
+        if ($parsed == false) {
             throw new ClientException("Invalid URI: $uri");
         }
 
@@ -1246,6 +1250,9 @@ class ConnectionParameters {
                     case 'weight':
                         $details['weight'] = $v;
                         break;
+                    case 'path':
+                        $details['path'] = $v;
+                        break;
                 }
             }
             $parsed = array_merge($parsed, $details);
@@ -1260,6 +1267,7 @@ class ConnectionParameters {
 
     private static function filterConnectionParams($parameters) {
         return array(
+            'scheme' => self::getParamOrDefault($parameters, 'scheme', self::DEFAULT_SCHEME), 
             'host' => self::getParamOrDefault($parameters, 'host', self::DEFAULT_HOST), 
             'port' => (int) self::getParamOrDefault($parameters, 'port', self::DEFAULT_PORT), 
             'database' => self::getParamOrDefault($parameters, 'database'), 
@@ -1270,6 +1278,7 @@ class ConnectionParameters {
             'read_write_timeout' => self::getParamOrDefault($parameters, 'read_write_timeout'), 
             'alias'  => self::getParamOrDefault($parameters, 'alias'), 
             'weight' => self::getParamOrDefault($parameters, 'weight'), 
+            'path' => self::getParamOrDefault($parameters, 'path'), 
         );
     }
 
@@ -1292,9 +1301,14 @@ interface IConnection {
 }
 
 class Connection implements IConnection {
-    private $_params, $_socket, $_initCmds, $_reader;
+    private $_params, $_socket, $_initCmds, $_reader, $_initializer;
 
     public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
+        $scheme = $parameters->scheme;
+        if ($scheme !== 'redis' && $scheme !== 'tcp' && $scheme !== 'unix') {
+            throw new \InvalidArgumentException("Invalid scheme: $scheme");
+        }
+        $this->_initializer = array($this, "{$parameters->scheme}StreamInitializer");
         $this->_params   = $parameters;
         $this->_initCmds = array();
         $this->_reader   = $reader ?: new ResponseReader();
@@ -1314,31 +1328,54 @@ class Connection implements IConnection {
         if ($this->isConnected()) {
             throw new ClientException('Connection already estabilished');
         }
-        $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
+        $this->_socket = call_user_func($this->_initializer, $this->_params);
+        if (count($this->_initCmds) > 0){
+            $this->sendInitializationCommands();
+        }
+    }
+
+    private function tcpStreamInitializer(ConnectionParameters $parameters) {
+        return $this->redisStreamInitializer($parameters);
+    }
+
+    private function redisStreamInitializer(ConnectionParameters $parameters) {
+        $uri = sprintf('tcp://%s:%d/', $parameters->host, $parameters->port);
         $connectFlags = STREAM_CLIENT_CONNECT;
-        if ($this->_params->connection_async) {
+        if ($parameters->connection_async) {
             $connectFlags |= STREAM_CLIENT_ASYNC_CONNECT;
         }
-        if ($this->_params->connection_persistent) {
+        if ($parameters->connection_persistent) {
             $connectFlags |= STREAM_CLIENT_PERSISTENT;
         }
-        $this->_socket = @stream_socket_client(
-            $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
+        $socket = @stream_socket_client(
+            $uri, $errno, $errstr, $parameters->connection_timeout, $connectFlags
         );
 
-        if (!$this->_socket) {
+        if (!$socket) {
             $this->onCommunicationException(trim($errstr), $errno);
         }
 
-        if (isset($this->_params->read_write_timeout)) {
-            $timeoutSeconds  = floor($this->_params->read_write_timeout);
-            $timeoutUSeconds = ($this->_params->read_write_timeout - $timeoutSeconds) * 1000000;
-            stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
+        if (isset($parameters->read_write_timeout)) {
+            $timeoutSeconds  = floor($parameters->read_write_timeout);
+            $timeoutUSeconds = ($parameters->read_write_timeout - $timeoutSeconds) * 1000000;
+            stream_set_timeout($socket, $timeoutSeconds, $timeoutUSeconds);
         }
+        return $socket;
+    }
 
-        if (count($this->_initCmds) > 0){
-            $this->sendInitializationCommands();
+    private function unixStreamInitializer(ConnectionParameters $parameters) {
+        $uri = sprintf('unix:///%s', $parameters->path);
+        $connectFlags = STREAM_CLIENT_CONNECT;
+        if ($parameters->connection_persistent) {
+            $connectFlags |= STREAM_CLIENT_PERSISTENT;
+        }
+        $socket = @stream_socket_client(
+            $uri, $errno, $errstr, $parameters->connection_timeout, $connectFlags
+        );
+        if (!$socket) {
+            $this->onCommunicationException(trim($errstr), $errno);
         }
+        return $socket;
     }
 
     public function disconnect() {