|
@@ -282,11 +282,11 @@ abstract class MultiBulkCommand extends Command {
|
|
/* ------------------------------------------------------------------------- */
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
interface IResponseHandler {
|
|
interface IResponseHandler {
|
|
- function handle(ResponseReader $reader, $socket, $payload);
|
|
|
|
|
|
+ function handle(Connection $connection, $payload);
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseStatusHandler implements IResponseHandler {
|
|
class ResponseStatusHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $status) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $status) {
|
|
if ($status === ResponseReader::OK) {
|
|
if ($status === ResponseReader::OK) {
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -298,19 +298,21 @@ class ResponseStatusHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseErrorHandler implements IResponseHandler {
|
|
class ResponseErrorHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $errorMessage) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $errorMessage) {
|
|
throw new ServerException(substr($errorMessage, 4));
|
|
throw new ServerException(substr($errorMessage, 4));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseErrorSilentHandler implements IResponseHandler {
|
|
class ResponseErrorSilentHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $errorMessage) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $errorMessage) {
|
|
return new ResponseError(substr($errorMessage, 4));
|
|
return new ResponseError(substr($errorMessage, 4));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseBulkHandler implements IResponseHandler {
|
|
class ResponseBulkHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $dataLength) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $dataLength) {
|
|
|
|
+ $socket = $connection->getSocket();
|
|
|
|
+
|
|
if (!is_numeric($dataLength)) {
|
|
if (!is_numeric($dataLength)) {
|
|
throw new ClientException("Cannot parse '$dataLength' as data length");
|
|
throw new ClientException("Cannot parse '$dataLength' as data length");
|
|
}
|
|
}
|
|
@@ -333,7 +335,7 @@ class ResponseBulkHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseMultiBulkHandler implements IResponseHandler {
|
|
class ResponseMultiBulkHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $rawLength) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $rawLength) {
|
|
if (!is_numeric($rawLength)) {
|
|
if (!is_numeric($rawLength)) {
|
|
throw new ClientException("Cannot parse '$rawLength' as data length");
|
|
throw new ClientException("Cannot parse '$rawLength' as data length");
|
|
}
|
|
}
|
|
@@ -347,7 +349,7 @@ class ResponseMultiBulkHandler implements IResponseHandler {
|
|
|
|
|
|
if ($listLength > 0) {
|
|
if ($listLength > 0) {
|
|
for ($i = 0; $i < $listLength; $i++) {
|
|
for ($i = 0; $i < $listLength; $i++) {
|
|
- $list[] = $reader->read($socket);
|
|
|
|
|
|
+ $list[] = $connection->getResponseReader()->read($connection);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -356,16 +358,16 @@ class ResponseMultiBulkHandler implements IResponseHandler {
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseMultiBulkStreamHandler implements IResponseHandler {
|
|
class ResponseMultiBulkStreamHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $rawLength) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $rawLength) {
|
|
if (!is_numeric($rawLength)) {
|
|
if (!is_numeric($rawLength)) {
|
|
throw new ClientException("Cannot parse '$rawLength' as data length");
|
|
throw new ClientException("Cannot parse '$rawLength' as data length");
|
|
}
|
|
}
|
|
- return new Utilities\MultiBulkResponseIterator($socket, $reader, (int)$rawLength);
|
|
|
|
|
|
+ return new Utilities\MultiBulkResponseIterator($connection, (int)$rawLength);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
class ResponseIntegerHandler implements IResponseHandler {
|
|
class ResponseIntegerHandler implements IResponseHandler {
|
|
- public function handle(ResponseReader $reader, $socket, $number) {
|
|
|
|
|
|
+ public function handle(Connection $connection, $number) {
|
|
if (is_numeric($number)) {
|
|
if (is_numeric($number)) {
|
|
return (int) $number;
|
|
return (int) $number;
|
|
}
|
|
}
|
|
@@ -447,8 +449,8 @@ class ResponseReader {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- public function read($socket) {
|
|
|
|
- $header = fgets($socket);
|
|
|
|
|
|
+ public function read(Connection $connection) {
|
|
|
|
+ $header = fgets($connection->getSocket());
|
|
if ($header === false) {
|
|
if ($header === false) {
|
|
throw new ClientException('An error has occurred while reading from the network stream');
|
|
throw new ClientException('An error has occurred while reading from the network stream');
|
|
}
|
|
}
|
|
@@ -461,7 +463,7 @@ class ResponseReader {
|
|
}
|
|
}
|
|
|
|
|
|
$handler = $this->_prefixHandlers[$prefix];
|
|
$handler = $this->_prefixHandlers[$prefix];
|
|
- return $handler->handle($this, $socket, $payload);
|
|
|
|
|
|
+ return $handler->handle($connection, $payload);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -822,7 +824,7 @@ class Connection implements IConnection {
|
|
}
|
|
}
|
|
|
|
|
|
public function readResponse(Command $command) {
|
|
public function readResponse(Command $command) {
|
|
- $response = $this->_reader->read($this->getSocket());
|
|
|
|
|
|
+ $response = $this->_reader->read($this);
|
|
return isset($response->queued) ? $response : $command->parseResponse($response);
|
|
return isset($response->queued) ? $response : $command->parseResponse($response);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -835,7 +837,7 @@ class Connection implements IConnection {
|
|
if ($closesConnection) {
|
|
if ($closesConnection) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- return $this->_reader->read($socket);
|
|
|
|
|
|
+ return $this->_reader->read($this);
|
|
}
|
|
}
|
|
|
|
|
|
public function getSocket() {
|
|
public function getSocket() {
|
|
@@ -1337,9 +1339,9 @@ abstract class MultiBulkResponseIteratorBase implements \Iterator, \Countable {
|
|
class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
|
|
class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
|
|
private $_connection;
|
|
private $_connection;
|
|
|
|
|
|
- public function __construct($socket, $reader, $size) {
|
|
|
|
- $this->_connection = $socket;
|
|
|
|
- $this->_reader = $reader;
|
|
|
|
|
|
+ public function __construct(\Predis\Connection $connection, $size) {
|
|
|
|
+ $this->_connection = $connection;
|
|
|
|
+ $this->_reader = $connection->getResponseReader();
|
|
$this->_position = 0;
|
|
$this->_position = 0;
|
|
$this->_current = $size > 0 ? $this->getValue() : null;
|
|
$this->_current = $size > 0 ? $this->getValue() : null;
|
|
$this->_replySize = $size;
|
|
$this->_replySize = $size;
|