浏览代码

Merge branch 'response_reader' into integration

Daniele Alessandri 15 年之前
父节点
当前提交
9086cfc284
共有 1 个文件被更改,包括 62 次插入33 次删除
  1. 62 33
      lib/Predis.php

+ 62 - 33
lib/Predis.php

@@ -9,9 +9,10 @@ class MalformedServerResponse extends ServerException { }
 /* ------------------------------------------------------------------------- */
 
 class Client {
-    private $_connection, $_serverProfile;
+    private $_connection, $_serverProfile, $_responseReader;
 
     public function __construct($parameters = null, RedisServerProfile $serverProfile = null) {
+        $this->_responseReader = new ResponseReader();
         $this->setProfile($serverProfile ?: RedisServerProfile::getDefault());
         $this->setupConnection($parameters);
     }
@@ -57,7 +58,7 @@ class Client {
 
     private function createConnection($parameters) {
         $params     = new ConnectionParameters($parameters);
-        $connection = new Connection($params);
+        $connection = new Connection($params, $this->_responseReader);
 
         if ($params->password !== null) {
             $connection->pushInitCommand($this->createCommand(
@@ -85,6 +86,10 @@ class Client {
         return $this->_serverProfile;
     }
 
+    public function getResponseReader() {
+        return $this->_responseReader;
+    }
+
     public function connect() {
         $this->_connection->connect();
     }
@@ -232,7 +237,7 @@ abstract class InlineCommand extends Command {
         if (isset($arguments[0]) && is_array($arguments[0])) {
             $arguments[0] = implode($arguments[0], ' ');
         }
-        return $command . ' ' . implode($arguments, ' ') . Response::NEWLINE;
+        return $command . ' ' . implode($arguments, ' ') . ResponseReader::NEWLINE;
     }
 }
 
@@ -243,7 +248,7 @@ abstract class BulkCommand extends Command {
             $data = implode($data, ' ');
         }
         return $command . ' ' . implode($arguments, ' ') . ' ' . strlen($data) . 
-            Response::NEWLINE . $data . Response::NEWLINE;
+            ResponseReader::NEWLINE . $data . ResponseReader::NEWLINE;
     }
 }
 
@@ -263,10 +268,10 @@ abstract class MultiBulkCommand extends Command {
             $cmd_args = $arguments;
         }
 
-        $buffer[] = '*' . ((string) count($cmd_args) + 1) . Response::NEWLINE;
-        $buffer[] = '$' . strlen($command) . Response::NEWLINE . $command . Response::NEWLINE;
+        $buffer[] = '*' . ((string) count($cmd_args) + 1) . ResponseReader::NEWLINE;
+        $buffer[] = '$' . strlen($command) . ResponseReader::NEWLINE . $command . ResponseReader::NEWLINE;
         foreach ($cmd_args as $argument) {
-            $buffer[] = '$' . strlen($argument) . Response::NEWLINE . $argument . Response::NEWLINE;
+            $buffer[] = '$' . strlen($argument) . ResponseReader::NEWLINE . $argument . ResponseReader::NEWLINE;
         }
 
         return implode('', $buffer);
@@ -276,15 +281,15 @@ abstract class MultiBulkCommand extends Command {
 /* ------------------------------------------------------------------------- */
 
 interface IResponseHandler {
-    function handle($socket, $prefix, $payload);
+    function handle(ResponseReader $reader, $socket, $payload);
 }
 
 class ResponseStatusHandler implements IResponseHandler {
-    public function handle($socket, $prefix, $status) {
-        if ($status === Response::OK) {
+    public function handle(ResponseReader $reader, $socket, $status) {
+        if ($status === ResponseReader::OK) {
             return true;
         }
-        else if ($status === Response::QUEUED) {
+        else if ($status === ResponseReader::QUEUED) {
             return new ResponseQueued();
         }
         return $status;
@@ -292,13 +297,13 @@ class ResponseStatusHandler implements IResponseHandler {
 }
 
 class ResponseErrorHandler implements IResponseHandler {
-    public function handle($socket, $prefix, $errorMessage) {
+    public function handle(ResponseReader $reader, $socket, $errorMessage) {
         throw new ServerException(substr($errorMessage, 4));
     }
 }
 
 class ResponseBulkHandler implements IResponseHandler {
-    public function handle($socket, $prefix, $dataLength) {
+    public function handle(ResponseReader $reader, $socket, $dataLength) {
         if (!is_numeric($dataLength)) {
             throw new ClientException("Cannot parse '$dataLength' as data length");
         }
@@ -321,7 +326,7 @@ class ResponseBulkHandler implements IResponseHandler {
 }
 
 class ResponseMultiBulkHandler implements IResponseHandler {
-    public function handle($socket, $prefix, $rawLength) {
+    public function handle(ResponseReader $reader, $socket, $rawLength) {
         if (!is_numeric($rawLength)) {
             throw new ClientException("Cannot parse '$rawLength' as data length");
         }
@@ -335,7 +340,7 @@ class ResponseMultiBulkHandler implements IResponseHandler {
 
         if ($listLength > 0) {
             for ($i = 0; $i < $listLength; $i++) {
-                $list[] = Response::read($socket);
+                $list[] = $reader->read($socket);
             }
         }
 
@@ -344,12 +349,12 @@ class ResponseMultiBulkHandler implements IResponseHandler {
 }
 
 class ResponseIntegerHandler implements IResponseHandler {
-    public function handle($socket, $prefix, $number) {
+    public function handle(ResponseReader $reader, $socket, $number) {
         if (is_numeric($number)) {
             return (int) $number;
         }
         else {
-            if ($number !== Response::NULL) {
+            if ($number !== ResponseReader::NULL) {
                 throw new ClientException("Cannot parse '$number' as numeric response");
             }
             return null;
@@ -357,17 +362,21 @@ class ResponseIntegerHandler implements IResponseHandler {
     }
 }
 
-class Response {
+class ResponseReader {
     const NEWLINE = "\r\n";
     const OK      = 'OK';
     const ERROR   = 'ERR';
     const QUEUED  = 'QUEUED';
     const NULL    = 'nil';
 
-    private static $_prefixHandlers;
+    private $_prefixHandlers;
 
-    private static function initializePrefixHandlers() {
-        return array(
+    public function __construct() {
+        $this->initializePrefixHandlers();
+    }
+
+    private function initializePrefixHandlers() {
+        $this->_prefixHandlers = array(
             '+' => new ResponseStatusHandler(), 
             '-' => new ResponseErrorHandler(), 
             ':' => new ResponseIntegerHandler(), 
@@ -376,7 +385,25 @@ class Response {
         );
     }
 
-    public static function read($socket) {
+    private function setHandler($prefix, IResponseHandler $handler) {
+        $this->_prefixHandlers[$prefix] = $handler;
+    }
+
+    public function setOption($option, $value) {
+        switch ($option) {
+            default:
+                throw new \InvalidArgumentException("Unknown option: $option");
+        }
+    }
+
+    public function getOption($option) {
+        switch ($option) {
+            default:
+                throw new \InvalidArgumentException("Unknown option: $option");
+        }
+    }
+
+    public function read($socket) {
         $header  = fgets($socket);
         if ($header === false) {
            throw new ClientException('An error has occurred while reading from the network stream');
@@ -385,15 +412,12 @@ class Response {
         $prefix  = $header[0];
         $payload = substr($header, 1, -2);
 
-        if (!isset(self::$_prefixHandlers)) {
-            self::$_prefixHandlers = self::initializePrefixHandlers();
-        }
-        if (!isset(self::$_prefixHandlers[$prefix])) {
+        if (!isset($this->_prefixHandlers[$prefix])) {
             throw new MalformedServerResponse("Unknown prefix '$prefix'");
         }
 
-        $handler = self::$_prefixHandlers[$prefix];
-        return $handler->handle($socket, $prefix, $payload);
+        $handler = $this->_prefixHandlers[$prefix];
+        return $handler->handle($this, $socket, $payload);
     }
 }
 
@@ -401,7 +425,7 @@ class ResponseQueued {
     public $queued = true;
 
     public function __toString() {
-        return Response::QUEUED;
+        return ResponseReader::QUEUED;
     }
 }
 
@@ -652,11 +676,12 @@ interface IConnection {
 }
 
 class Connection implements IConnection {
-    private $_params, $_socket, $_initCmds;
+    private $_params, $_socket, $_initCmds, $_reader;
 
-    public function __construct(ConnectionParameters $parameters) {
+    public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
         $this->_params   = $parameters;
         $this->_initCmds = array();
+        $this->_reader   = $reader ?: new ResponseReader();
     }
 
     public function __destruct() {
@@ -716,7 +741,7 @@ class Connection implements IConnection {
     }
 
     public function readResponse(Command $command) {
-        $response = Response::read($this->getSocket());
+        $response = $this->_reader->read($this->getSocket());
         return isset($response->queued) ? $response : $command->parseResponse($response);
     }
 
@@ -729,7 +754,7 @@ class Connection implements IConnection {
         if ($closesConnection) {
             return;
         }
-        return Response::read($socket);
+        return $this->_reader->read($socket);
     }
 
     public function getSocket() {
@@ -739,6 +764,10 @@ class Connection implements IConnection {
         return $this->_socket;
     }
 
+    public function getResponseReader() {
+        return $this->_reader;
+    }
+
     public function getAlias() {
         return $this->_params->alias;
     }