Kaynağa Gözat

Move some common code for connections in the Predis\Connection class.

Daniele Alessandri 15 yıl önce
ebeveyn
işleme
104bfa3332
1 değiştirilmiş dosya ile 75 ekleme ve 56 silme
  1. 75 56
      lib/Predis.php

+ 75 - 56
lib/Predis.php

@@ -1259,15 +1259,81 @@ final class ConnectionFactory {
     }
 }
 
-class TcpConnection implements IConnectionSingle {
-    private $_params, $_socket, $_initCmds, $_reader;
+abstract class Connection implements IConnectionSingle {
+    protected $_params, $_socket, $_initCmds, $_reader;
 
     public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
-        $this->_params   = $this->checkParameters($parameters);
+        $this->_params   = $parameters;
         $this->_initCmds = array();
         $this->_reader   = $reader ?: new ResponseReader();
     }
 
+    public function __destruct() {
+        $this->disconnect();
+    }
+
+    public function isConnected() {
+        return is_resource($this->_socket);
+    }
+
+    protected abstract function createResource();
+
+    public function connect() {
+        if ($this->isConnected()) {
+            throw new ClientException('Connection already estabilished');
+        }
+        $this->createResource();
+    }
+
+    public function disconnect() {
+        if ($this->isConnected()) {
+            fclose($this->_socket);
+        }
+    }
+
+    public function pushInitCommand(ICommand $command){
+        $this->_initCmds[] = $command;
+    }
+
+    public function executeCommand(ICommand $command) {
+        $this->writeCommand($command);
+        if ($command->closesConnection()) {
+            return $this->disconnect();
+        }
+        return $this->readResponse($command);
+    }
+
+    protected function onCommunicationException($message, $code = null) {
+        Utils::onCommunicationException(
+            new CommunicationException($this, $message, $code)
+        );
+    }
+
+    public function getSocket() {
+        if (!$this->isConnected()) {
+            $this->connect();
+        }
+        return $this->_socket;
+    }
+
+    public function getResponseReader() {
+        return $this->_reader;
+    }
+
+    public function getParameters() {
+        return $this->_params;
+    }
+
+    public function __toString() {
+        return sprintf('%s:%d', $this->_params->host, $this->_params->port);
+    }
+}
+
+class TcpConnection extends Connection implements IConnectionSingle {
+    public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
+        parent::__construct($this->checkParameters($parameters), $reader);
+    }
+
     public function __destruct() {
         if (!$this->_params->connection_persistent) {
             $this->disconnect();
@@ -1281,14 +1347,7 @@ class TcpConnection implements IConnectionSingle {
         return $parameters;
     }
 
-    public function isConnected() {
-        return is_resource($this->_socket);
-    }
-
-    public function connect() {
-        if ($this->isConnected()) {
-            throw new ClientException('Connection already estabilished');
-        }
+    protected function createResource() {
         $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
         $connectFlags = STREAM_CLIENT_CONNECT;
         if ($this->_params->connection_async) {
@@ -1310,20 +1369,6 @@ class TcpConnection implements IConnectionSingle {
             $timeoutUSeconds = ($this->_params->read_write_timeout - $timeoutSeconds) * 1000000;
             stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
         }
-
-        if (count($this->_initCmds) > 0){
-            $this->sendInitializationCommands();
-        }
-    }
-
-    public function disconnect() {
-        if ($this->isConnected()) {
-            fclose($this->_socket);
-        }
-    }
-
-    public function pushInitCommand(ICommand $command){
-        $this->_initCmds[] = $command;
     }
 
     private function sendInitializationCommands() {
@@ -1335,10 +1380,11 @@ class TcpConnection implements IConnectionSingle {
         }
     }
 
-    private function onCommunicationException($message, $code = null) {
-        Utils::onCommunicationException(
-            new CommunicationException($this, $message, $code)
-        );
+    public function connect() {
+        parent::connect();
+        if (count($this->_initCmds) > 0){
+            $this->sendInitializationCommands();
+        }
     }
 
     public function writeCommand(ICommand $command) {
@@ -1351,14 +1397,6 @@ class TcpConnection implements IConnectionSingle {
         return $skipparse ? $response : $command->parseResponse($response);
     }
 
-    public function executeCommand(ICommand $command) {
-        $this->writeCommand($command);
-        if ($command->closesConnection()) {
-            return $this->disconnect();
-        }
-        return $this->readResponse($command);
-    }
-
     public function rawCommand($rawCommandData, $closesConnection = false) {
         $this->writeBytes($rawCommandData);
         if ($closesConnection) {
@@ -1413,25 +1451,6 @@ class TcpConnection implements IConnectionSingle {
         while (substr($value, -2) !== Protocol::NEWLINE);
         return substr($value, 0, -2);
     }
-
-    public function getSocket() {
-        if (!$this->isConnected()) {
-            $this->connect();
-        }
-        return $this->_socket;
-    }
-
-    public function getResponseReader() {
-        return $this->_reader;
-    }
-
-    public function getParameters() {
-        return $this->_params;
-    }
-
-    public function __toString() {
-        return sprintf('%s:%d', $this->_params->host, $this->_params->port);
-    }
 }
 
 class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {