|
@@ -9,11 +9,12 @@ class MalformedServerResponse extends ServerException { }
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
class Client {
|
|
|
- private $_connection, $_serverProfile;
|
|
|
+ private $_connection, $_serverProfile, $_responseReader;
|
|
|
|
|
|
public function __construct($parameters = null, RedisServerProfile $serverProfile = null) {
|
|
|
+ $this->_responseReader = new ResponseReader();
|
|
|
$this->setProfile($serverProfile ?: RedisServerProfile::getDefault());
|
|
|
- $this->setupConnection($parameters);
|
|
|
+ $this->setupConnection($parameters, $this->_responseReader);
|
|
|
}
|
|
|
|
|
|
public function __destruct() {
|
|
@@ -57,7 +58,7 @@ class Client {
|
|
|
|
|
|
private function createConnection($parameters) {
|
|
|
$params = new ConnectionParameters($parameters);
|
|
|
- $connection = new Connection($params);
|
|
|
+ $connection = new Connection($params, $this->_responseReader);
|
|
|
|
|
|
if ($params->password !== null) {
|
|
|
$connection->pushInitCommand($this->createCommand(
|
|
@@ -232,7 +233,7 @@ abstract class InlineCommand extends Command {
|
|
|
if (isset($arguments[0]) && is_array($arguments[0])) {
|
|
|
$arguments[0] = implode($arguments[0], ' ');
|
|
|
}
|
|
|
- return $command . ' ' . implode($arguments, ' ') . Response::NEWLINE;
|
|
|
+ return $command . ' ' . implode($arguments, ' ') . ResponseReader::NEWLINE;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -243,7 +244,7 @@ abstract class BulkCommand extends Command {
|
|
|
$data = implode($data, ' ');
|
|
|
}
|
|
|
return $command . ' ' . implode($arguments, ' ') . ' ' . strlen($data) .
|
|
|
- Response::NEWLINE . $data . Response::NEWLINE;
|
|
|
+ ResponseReader::NEWLINE . $data . ResponseReader::NEWLINE;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -263,10 +264,10 @@ abstract class MultiBulkCommand extends Command {
|
|
|
$cmd_args = $arguments;
|
|
|
}
|
|
|
|
|
|
- $buffer[] = '*' . ((string) count($cmd_args) + 1) . Response::NEWLINE;
|
|
|
- $buffer[] = '$' . strlen($command) . Response::NEWLINE . $command . Response::NEWLINE;
|
|
|
+ $buffer[] = '*' . ((string) count($cmd_args) + 1) . ResponseReader::NEWLINE;
|
|
|
+ $buffer[] = '$' . strlen($command) . ResponseReader::NEWLINE . $command . ResponseReader::NEWLINE;
|
|
|
foreach ($cmd_args as $argument) {
|
|
|
- $buffer[] = '$' . strlen($argument) . Response::NEWLINE . $argument . Response::NEWLINE;
|
|
|
+ $buffer[] = '$' . strlen($argument) . ResponseReader::NEWLINE . $argument . ResponseReader::NEWLINE;
|
|
|
}
|
|
|
|
|
|
return implode('', $buffer);
|
|
@@ -276,15 +277,15 @@ abstract class MultiBulkCommand extends Command {
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
interface IResponseHandler {
|
|
|
- function handle($socket, $prefix, $payload);
|
|
|
+ function handle(ResponseReader $reader, $socket, $payload);
|
|
|
}
|
|
|
|
|
|
class ResponseStatusHandler implements IResponseHandler {
|
|
|
- public function handle($socket, $prefix, $status) {
|
|
|
- if ($status === Response::OK) {
|
|
|
+ public function handle(ResponseReader $reader, $socket, $status) {
|
|
|
+ if ($status === ResponseReader::OK) {
|
|
|
return true;
|
|
|
}
|
|
|
- else if ($status === Response::QUEUED) {
|
|
|
+ else if ($status === ResponseReader::QUEUED) {
|
|
|
return new ResponseQueued();
|
|
|
}
|
|
|
return $status;
|
|
@@ -292,13 +293,13 @@ class ResponseStatusHandler implements IResponseHandler {
|
|
|
}
|
|
|
|
|
|
class ResponseErrorHandler implements IResponseHandler {
|
|
|
- public function handle($socket, $prefix, $errorMessage) {
|
|
|
+ public function handle(ResponseReader $reader, $socket, $errorMessage) {
|
|
|
throw new ServerException(substr($errorMessage, 4));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
class ResponseBulkHandler implements IResponseHandler {
|
|
|
- public function handle($socket, $prefix, $dataLength) {
|
|
|
+ public function handle(ResponseReader $reader, $socket, $dataLength) {
|
|
|
if (!is_numeric($dataLength)) {
|
|
|
throw new ClientException("Cannot parse '$dataLength' as data length");
|
|
|
}
|
|
@@ -321,7 +322,7 @@ class ResponseBulkHandler implements IResponseHandler {
|
|
|
}
|
|
|
|
|
|
class ResponseMultiBulkHandler implements IResponseHandler {
|
|
|
- public function handle($socket, $prefix, $rawLength) {
|
|
|
+ public function handle(ResponseReader $reader, $socket, $rawLength) {
|
|
|
if (!is_numeric($rawLength)) {
|
|
|
throw new ClientException("Cannot parse '$rawLength' as data length");
|
|
|
}
|
|
@@ -335,7 +336,7 @@ class ResponseMultiBulkHandler implements IResponseHandler {
|
|
|
|
|
|
if ($listLength > 0) {
|
|
|
for ($i = 0; $i < $listLength; $i++) {
|
|
|
- $list[] = Response::read($socket);
|
|
|
+ $list[] = $reader->read($socket);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -344,12 +345,12 @@ class ResponseMultiBulkHandler implements IResponseHandler {
|
|
|
}
|
|
|
|
|
|
class ResponseIntegerHandler implements IResponseHandler {
|
|
|
- public function handle($socket, $prefix, $number) {
|
|
|
+ public function handle(ResponseReader $reader, $socket, $number) {
|
|
|
if (is_numeric($number)) {
|
|
|
return (int) $number;
|
|
|
}
|
|
|
else {
|
|
|
- if ($number !== Response::NULL) {
|
|
|
+ if ($number !== ResponseReader::NULL) {
|
|
|
throw new ClientException("Cannot parse '$number' as numeric response");
|
|
|
}
|
|
|
return null;
|
|
@@ -357,17 +358,21 @@ class ResponseIntegerHandler implements IResponseHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class Response {
|
|
|
+class ResponseReader {
|
|
|
const NEWLINE = "\r\n";
|
|
|
const OK = 'OK';
|
|
|
const ERROR = 'ERR';
|
|
|
const QUEUED = 'QUEUED';
|
|
|
const NULL = 'nil';
|
|
|
|
|
|
- private static $_prefixHandlers;
|
|
|
+ private $_prefixHandlers;
|
|
|
|
|
|
- private static function initializePrefixHandlers() {
|
|
|
- return array(
|
|
|
+ public function __construct() {
|
|
|
+ $this->initializePrefixHandlers();
|
|
|
+ }
|
|
|
+
|
|
|
+ private function initializePrefixHandlers() {
|
|
|
+ $this->_prefixHandlers = array(
|
|
|
'+' => new ResponseStatusHandler(),
|
|
|
'-' => new ResponseErrorHandler(),
|
|
|
':' => new ResponseIntegerHandler(),
|
|
@@ -376,7 +381,7 @@ class Response {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public static function read($socket) {
|
|
|
+ public function read($socket) {
|
|
|
$header = fgets($socket);
|
|
|
if ($header === false) {
|
|
|
throw new ClientException('An error has occurred while reading from the network stream');
|
|
@@ -385,15 +390,12 @@ class Response {
|
|
|
$prefix = $header[0];
|
|
|
$payload = substr($header, 1, -2);
|
|
|
|
|
|
- if (!isset(self::$_prefixHandlers)) {
|
|
|
- self::$_prefixHandlers = self::initializePrefixHandlers();
|
|
|
- }
|
|
|
- if (!isset(self::$_prefixHandlers[$prefix])) {
|
|
|
+ if (!isset($this->_prefixHandlers[$prefix])) {
|
|
|
throw new MalformedServerResponse("Unknown prefix '$prefix'");
|
|
|
}
|
|
|
|
|
|
- $handler = self::$_prefixHandlers[$prefix];
|
|
|
- return $handler->handle($socket, $prefix, $payload);
|
|
|
+ $handler = $this->_prefixHandlers[$prefix];
|
|
|
+ return $handler->handle($this, $socket, $payload);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -401,7 +403,7 @@ class ResponseQueued {
|
|
|
public $queued = true;
|
|
|
|
|
|
public function __toString() {
|
|
|
- return Response::QUEUED;
|
|
|
+ return ResponseReader::QUEUED;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -639,11 +641,12 @@ interface IConnection {
|
|
|
}
|
|
|
|
|
|
class Connection implements IConnection {
|
|
|
- private $_params, $_socket, $_initCmds;
|
|
|
+ private $_params, $_socket, $_initCmds, $_reader;
|
|
|
|
|
|
- public function __construct(ConnectionParameters $parameters) {
|
|
|
+ public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
|
|
|
$this->_params = $parameters;
|
|
|
$this->_initCmds = array();
|
|
|
+ $this->_reader = $reader ?: new ResponseReader();
|
|
|
}
|
|
|
|
|
|
public function __destruct() {
|
|
@@ -703,7 +706,7 @@ class Connection implements IConnection {
|
|
|
}
|
|
|
|
|
|
public function readResponse(Command $command) {
|
|
|
- $response = Response::read($this->getSocket());
|
|
|
+ $response = $this->_reader->read($this->getSocket());
|
|
|
return isset($response->queued) ? $response : $command->parseResponse($response);
|
|
|
}
|
|
|
|
|
@@ -716,7 +719,7 @@ class Connection implements IConnection {
|
|
|
if ($closesConnection) {
|
|
|
return;
|
|
|
}
|
|
|
- return Response::read($socket);
|
|
|
+ return $this->_reader->read($socket);
|
|
|
}
|
|
|
|
|
|
public function getSocket() {
|