Sfoglia il codice sorgente

Reworked the CommunicationException class and its derivates; When specified, close the underlying connection after a CommunicationException has been trown.

Daniele Alessandri 15 anni fa
parent
commit
f7f9e3483c
1 ha cambiato i file con 73 aggiunte e 16 eliminazioni
  1. 73 16
      lib/Predis.php

+ 73 - 16
lib/Predis.php

@@ -4,8 +4,22 @@ namespace Predis;
 class PredisException extends \Exception { }
 class ClientException extends PredisException { }                   // Client-side errors
 class ServerException extends PredisException { }                   // Server-side errors
-class CommunicationException extends PredisException { }            // Communication errors
-class MalformedServerResponse extends CommunicationException { }    // Unexpected responses
+
+class CommunicationException extends PredisException {              // Communication errors
+    private $_connection;
+
+    public function __construct($message, Connection $connection, $code = null) {
+        $this->_connection = $connection;
+        parent::__construct($message, $code);
+    }
+
+    public function getConnection() { return $this->_connection; }
+    public function shouldResetConnection() {  return true; }
+}
+
+class MalformedServerResponse extends CommunicationException {      // Unexpected responses
+    public function shouldResetConnection() {  return false;  }
+}
 
 /* ------------------------------------------------------------------------- */
 
@@ -301,19 +315,25 @@ class ResponseErrorSilentHandler implements IResponseHandler {
 class ResponseBulkHandler implements IResponseHandler {
     public function handle(Connection $connection, $dataLength) {
         if (!is_numeric($dataLength)) {
-            throw new MalformedServerResponse("Cannot parse '$dataLength' as data length");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                "Cannot parse '$dataLength' as data length", $connection
+            ));
         }
 
         if ($dataLength > 0) {
             $value = $connection->readBytes($dataLength);
             if ($connection->readBytes(2) !== ResponseReader::NEWLINE) {
-                throw new MalformedServerResponse('Did not receive a new-line at the end of a bulk response');
+                Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                    'Did not receive a new-line at the end of a bulk response', $connection
+                ));
             }
             return $value;
         }
         else if ($dataLength == 0) {
             if ($connection->readBytes(2) !== ResponseReader::NEWLINE) {
-                throw new MalformedServerResponse('Did not receive a new-line at the end of a bulk response');
+                Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                    'Did not receive a new-line at the end of a bulk response', $connection
+                ));
             }
             return '';
         }
@@ -325,7 +345,9 @@ class ResponseBulkHandler implements IResponseHandler {
 class ResponseMultiBulkHandler implements IResponseHandler {
     public function handle(Connection $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
-            throw new MalformedServerResponse("Cannot parse '$rawLength' as data length");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                "Cannot parse '$rawLength' as data length", $connection
+            ));
         }
 
         $listLength = (int) $rawLength;
@@ -348,7 +370,9 @@ class ResponseMultiBulkHandler implements IResponseHandler {
 class ResponseMultiBulkStreamHandler implements IResponseHandler {
     public function handle(Connection $connection, $rawLength) {
         if (!is_numeric($rawLength)) {
-            throw new MalformedServerResponse("Cannot parse '$rawLength' as data length");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                "Cannot parse '$rawLength' as data length", $connection
+            ));
         }
         return new Utilities\MultiBulkResponseIterator($connection, (int)$rawLength);
     }
@@ -361,7 +385,9 @@ class ResponseIntegerHandler implements IResponseHandler {
         }
         else {
             if ($number !== ResponseReader::NULL) {
-                throw new MalformedServerResponse("Cannot parse '$number' as numeric response");
+                Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                    "Cannot parse '$number' as numeric response", $connection
+                ));
             }
             return null;
         }
@@ -440,14 +466,18 @@ class ResponseReader {
     public function read(Connection $connection) {
         $header = $connection->readLine();
         if ($header === '') {
-            throw new MalformedServerResponse('Unexpected empty header');
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                'Unexpected empty header', $connection
+            ));
         }
 
         $prefix  = $header[0];
         $payload = strlen($header) > 1 ? substr($header, 1) : '';
 
         if (!isset($this->_prefixHandlers[$prefix])) {
-            throw new MalformedServerResponse("Unknown prefix '$prefix'");
+            Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+                "Unknown prefix '$prefix'", $connection
+            ));
         }
 
         $handler = $this->_prefixHandlers[$prefix];
@@ -603,7 +633,7 @@ class MultiExecBlock {
             return $this;
         }
         else {
-            throw new MalformedServerResponse('The server did not respond with a QUEUED status reply');
+            $this->malformedServerResponse('The server did not respond with a QUEUED status reply');
         }
     }
 
@@ -639,7 +669,7 @@ class MultiExecBlock {
             $sizeofReplies = count($execReply);
 
             if ($sizeofReplies !== count($commands)) {
-                throw new MalformedServerResponse('Unexpected number of responses for a MultiExecBlock');
+                $this->malformedServerResponse('Unexpected number of responses for a MultiExecBlock');
             }
 
             for ($i = 0; $i < $sizeofReplies; $i++) {
@@ -660,6 +690,15 @@ class MultiExecBlock {
 
         return $returnValues;
     }
+
+    private function malformedServerResponse($message) {
+        // NOTE: a MULTI/EXEC block cannot be initialized on a clustered 
+        //       connection, which means that Predis\Client::getConnection 
+        //       will always return an instance of Predis\Connection.
+        Utilities\Shared::onCommunicationException(new MalformedServerResponse(
+            $message, $this->_redisClient->getConnection()
+        ));
+    }
 }
 
 /* ------------------------------------------------------------------------- */
@@ -785,7 +824,7 @@ class Connection implements IConnection {
         );
 
         if (!$this->_socket) {
-            throw new CommunicationException(trim($errstr), $errno);
+            $this->onCommunicationException(trim($errstr), $errno);
         }
 
         if (isset($this->_params->read_write_timeout)) {
@@ -818,6 +857,12 @@ class Connection implements IConnection {
         }
     }
 
+    private function onCommunicationException($message, $code = null) {
+        Utilities\Shared::onCommunicationException(
+            new CommunicationException($message, $this, $code)
+        );
+    }
+
     public function writeCommand(Command $command) {
         $this->writeBytes($command());
     }
@@ -853,7 +898,7 @@ class Connection implements IConnection {
                 return true;
             }
             if ($written === false || $written === 0) {
-                throw new CommunicationException('Error while writing bytes to the server');
+                $this->onCommunicationException('Error while writing bytes to the server');
             }
             $value = substr($value, $written);
         }
@@ -866,7 +911,7 @@ class Connection implements IConnection {
         do {
             $chunk = fread($socket, $length);
             if ($chunk === false || $chunk === '') {
-                throw new CommunicationException('Error while reading bytes from the server');
+                $this->onCommunicationException('Error while reading bytes from the server');
             }
             $value .= $chunk;
         }
@@ -880,7 +925,7 @@ class Connection implements IConnection {
         do {
             $chunk = fgets($socket);
             if ($chunk === false || strlen($chunk) == 0) {
-                throw new CommunicationException('Error while reading line from the server');
+                $this->onCommunicationException('Error while reading line from the server');
             }
             $value .= $chunk;
         }
@@ -1307,6 +1352,18 @@ class RedisServer_vNext extends RedisServer_v1_2 {
 
 namespace Predis\Utilities;
 
+class Shared {
+    public static function onCommunicationException(\Predis\CommunicationException $exception) {
+        if ($exception->shouldResetConnection()) {
+            $connection = $exception->getConnection();
+            if ($connection->isConnected()) {
+                $connection->disconnect();
+            }
+        }
+        throw $exception;
+    }
+}
+
 class HashRing {
     const DEFAULT_REPLICAS = 128;
     const DEFAULT_WEIGHT   = 100;