|
@@ -424,7 +424,8 @@ class Predis_Protocol {
|
|
|
}
|
|
|
|
|
|
abstract class Predis_Command {
|
|
|
- private $_arguments, $_hash;
|
|
|
+ private $_hash;
|
|
|
+ private $_arguments = array();
|
|
|
|
|
|
public abstract function getCommandId();
|
|
|
|
|
@@ -438,22 +439,24 @@ abstract class Predis_Command {
|
|
|
if (isset($this->_hash)) {
|
|
|
return $this->_hash;
|
|
|
}
|
|
|
- else {
|
|
|
- if (isset($this->_arguments[0])) {
|
|
|
- // TODO: should we throw an exception if the command does
|
|
|
- // not support sharding?
|
|
|
- $key = $this->_arguments[0];
|
|
|
-
|
|
|
- $start = strpos($key, '{');
|
|
|
- $end = strpos($key, '}');
|
|
|
- if ($start !== false && $end !== false) {
|
|
|
+
|
|
|
+ if (isset($this->_arguments[0])) {
|
|
|
+ // TODO: should we throw an exception if the command does
|
|
|
+ // not support sharding?
|
|
|
+ $key = $this->_arguments[0];
|
|
|
+
|
|
|
+ $start = strpos($key, '{');
|
|
|
+ if ($start !== false) {
|
|
|
+ $end = strpos($key, '}', $start);
|
|
|
+ if ($end !== false) {
|
|
|
$key = substr($key, ++$start, $end - $start);
|
|
|
}
|
|
|
-
|
|
|
- $this->_hash = $distributor->generateKey($key);
|
|
|
- return $this->_hash;
|
|
|
}
|
|
|
+
|
|
|
+ $this->_hash = $distributor->generateKey($key);
|
|
|
+ return $this->_hash;
|
|
|
}
|
|
|
+
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -476,11 +479,13 @@ abstract class Predis_Command {
|
|
|
}
|
|
|
|
|
|
public function getArguments() {
|
|
|
- return isset($this->_arguments) ? $this->_arguments : array();
|
|
|
+ return $this->_arguments;
|
|
|
}
|
|
|
|
|
|
public function getArgument($index = 0) {
|
|
|
- return isset($this->_arguments[$index]) ? $this->_arguments[$index] : null;
|
|
|
+ if (isset($this->_arguments[$index]) === true) {
|
|
|
+ return $this->_arguments[$index];
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public function parseResponse($data) {
|
|
@@ -498,8 +503,7 @@ abstract class Predis_InlineCommand extends Predis_Command {
|
|
|
$arguments[0] = implode($arguments[0], ' ');
|
|
|
}
|
|
|
return $command . (count($arguments) > 0
|
|
|
- ? ' ' . implode($arguments, ' ') . Predis_Protocol::NEWLINE
|
|
|
- : Predis_Protocol::NEWLINE
|
|
|
+ ? ' ' . implode($arguments, ' ') . "\r\n" : "\r\n"
|
|
|
);
|
|
|
}
|
|
|
}
|
|
@@ -511,7 +515,7 @@ abstract class Predis_BulkCommand extends Predis_Command {
|
|
|
$data = implode($data, ' ');
|
|
|
}
|
|
|
return $command . ' ' . implode($arguments, ' ') . ' ' . strlen($data) .
|
|
|
- Predis_Protocol::NEWLINE . $data . Predis_Protocol::NEWLINE;
|
|
|
+ "\r\n" . $data . "\r\n";
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -528,14 +532,14 @@ abstract class Predis_MultiBulkCommand extends Predis_Command {
|
|
|
$cmd_args = $arguments;
|
|
|
}
|
|
|
|
|
|
- $newline = Predis_Protocol::NEWLINE;
|
|
|
$cmdlen = strlen($command);
|
|
|
$reqlen = $argsc + 1;
|
|
|
|
|
|
- $buffer = "*{$reqlen}{$newline}\${$cmdlen}{$newline}{$command}{$newline}";
|
|
|
- foreach ($cmd_args as $argument) {
|
|
|
+ $buffer = "*{$reqlen}\r\n\${$cmdlen}\r\n{$command}\r\n";
|
|
|
+ for ($i = 0; $i < $reqlen - 1; $i++) {
|
|
|
+ $argument = $cmd_args[$i];
|
|
|
$arglen = strlen($argument);
|
|
|
- $buffer .= "\${$arglen}{$newline}{$argument}{$newline}";
|
|
|
+ $buffer .= "\${$arglen}\r\n{$argument}\r\n";
|
|
|
}
|
|
|
|
|
|
return $buffer;
|
|
@@ -550,10 +554,10 @@ interface Predis_IResponseHandler {
|
|
|
|
|
|
class Predis_ResponseStatusHandler implements Predis_IResponseHandler {
|
|
|
public function handle(Predis_Connection $connection, $status) {
|
|
|
- if ($status === Predis_Protocol::OK) {
|
|
|
+ if ($status === "OK") {
|
|
|
return true;
|
|
|
}
|
|
|
- else if ($status === Predis_Protocol::QUEUED) {
|
|
|
+ if ($status === "QUEUED") {
|
|
|
return new Predis_ResponseQueued();
|
|
|
}
|
|
|
return $status;
|
|
@@ -573,44 +577,31 @@ class Predis_ResponseErrorSilentHandler implements Predis_IResponseHandler {
|
|
|
}
|
|
|
|
|
|
class Predis_ResponseBulkHandler implements Predis_IResponseHandler {
|
|
|
- public function handle(Predis_Connection $connection, $dataLength) {
|
|
|
- if (!is_numeric($dataLength)) {
|
|
|
+ public function handle(Predis_Connection $connection, $lengthString) {
|
|
|
+ $length = (int) $lengthString;
|
|
|
+ if ($length != $lengthString) {
|
|
|
Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
- $connection, "Cannot parse '$dataLength' as data length"
|
|
|
+ $connection, "Cannot parse '$length' as data length"
|
|
|
));
|
|
|
}
|
|
|
-
|
|
|
- if ($dataLength > 0) {
|
|
|
- $value = $connection->readBytes($dataLength);
|
|
|
- self::discardNewLine($connection);
|
|
|
- return $value;
|
|
|
- }
|
|
|
- else if ($dataLength == 0) {
|
|
|
- self::discardNewLine($connection);
|
|
|
- return '';
|
|
|
+ if ($length >= 0) {
|
|
|
+ return $length > 0 ? substr($connection->readBytes($length + 2), 0, -2) : '';
|
|
|
}
|
|
|
-
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- private static function discardNewLine(Predis_Connection $connection) {
|
|
|
- if ($connection->readBytes(2) !== Predis_Protocol::NEWLINE) {
|
|
|
- Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
- $connection, 'Did not receive a new-line at the end of a bulk response'
|
|
|
- ));
|
|
|
+ if ($length == -1) {
|
|
|
+ return null;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
class Predis_ResponseMultiBulkHandler implements Predis_IResponseHandler {
|
|
|
- public function handle(Predis_Connection $connection, $rawLength) {
|
|
|
- if (!is_numeric($rawLength)) {
|
|
|
+ public function handle(Predis_Connection $connection, $lengthString) {
|
|
|
+ $listLength = (int) $lengthString;
|
|
|
+ if ($listLength != $lengthString) {
|
|
|
Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
- $connection, "Cannot parse '$rawLength' as data length"
|
|
|
+ $connection, "Cannot parse '$lengthString' as data length"
|
|
|
));
|
|
|
}
|
|
|
|
|
|
- $listLength = (int) $rawLength;
|
|
|
if ($listLength === -1) {
|
|
|
return null;
|
|
|
}
|
|
@@ -618,9 +609,19 @@ class Predis_ResponseMultiBulkHandler implements Predis_IResponseHandler {
|
|
|
$list = array();
|
|
|
|
|
|
if ($listLength > 0) {
|
|
|
+ $handlers = array();
|
|
|
$reader = $connection->getResponseReader();
|
|
|
for ($i = 0; $i < $listLength; $i++) {
|
|
|
- $list[] = $reader->read($connection);
|
|
|
+ $header = $connection->readLine();
|
|
|
+ $prefix = $header[0];
|
|
|
+ if (isset($handlers[$prefix])) {
|
|
|
+ $handler = $handlers[$prefix];
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $handler = $reader->getHandler($prefix);
|
|
|
+ $handlers[$prefix] = $handler;
|
|
|
+ }
|
|
|
+ $list[$i] = $handler->handle($connection, substr($header, 1));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -629,13 +630,14 @@ class Predis_ResponseMultiBulkHandler implements Predis_IResponseHandler {
|
|
|
}
|
|
|
|
|
|
class Predis_ResponseMultiBulkStreamHandler implements Predis_IResponseHandler {
|
|
|
- public function handle(Predis_Connection $connection, $rawLength) {
|
|
|
- if (!is_numeric($rawLength)) {
|
|
|
+ public function handle(Predis_Connection $connection, $lengthString) {
|
|
|
+ $listLength = (int) $lengthString;
|
|
|
+ if ($listLength != $lengthString) {
|
|
|
Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
- $connection, "Cannot parse '$rawLength' as data length"
|
|
|
+ $connection, "Cannot parse '$lengthString' as data length"
|
|
|
));
|
|
|
}
|
|
|
- return new Predis_Shared_MultiBulkResponseIterator($connection, (int)$rawLength);
|
|
|
+ return new Predis_Shared_MultiBulkResponseIterator($connection, $lengthString);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -645,7 +647,7 @@ class Predis_ResponseIntegerHandler implements Predis_IResponseHandler {
|
|
|
return (int) $number;
|
|
|
}
|
|
|
else {
|
|
|
- if ($number !== Predis_Protocol::NULL) {
|
|
|
+ if ($number !== 'nil') {
|
|
|
Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
$connection, "Cannot parse '$number' as numeric response"
|
|
|
));
|
|
@@ -685,26 +687,27 @@ class Predis_ResponseReader {
|
|
|
public function read(Predis_Connection $connection) {
|
|
|
$header = $connection->readLine();
|
|
|
if ($header === '') {
|
|
|
- Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
- $connection, 'Unexpected empty header'
|
|
|
- ));
|
|
|
+ $this->throwMalformedResponse($connection, 'Unexpected empty header');
|
|
|
}
|
|
|
|
|
|
$prefix = $header[0];
|
|
|
- $payload = strlen($header) > 1 ? substr($header, 1) : '';
|
|
|
-
|
|
|
if (!isset($this->_prefixHandlers[$prefix])) {
|
|
|
- Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
- $connection, "Unknown prefix '$prefix'"
|
|
|
- ));
|
|
|
+ $this->throwMalformedResponse($connection, "Unknown prefix '$prefix'");
|
|
|
}
|
|
|
|
|
|
$handler = $this->_prefixHandlers[$prefix];
|
|
|
- return $handler->handle($connection, $payload);
|
|
|
+ return $handler->handle($connection, substr($header, 1));
|
|
|
+ }
|
|
|
+
|
|
|
+ private function throwMalformedResponse(Predis_Connection $connection, $message) {
|
|
|
+ Predis_Shared_Utils::onCommunicationException(new Predis_MalformedServerResponse(
|
|
|
+ $connection, $message
|
|
|
+ ));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
class Predis_ResponseError {
|
|
|
+ public $skipParse = true;
|
|
|
private $_message;
|
|
|
|
|
|
public function __construct($message) {
|
|
@@ -712,10 +715,10 @@ class Predis_ResponseError {
|
|
|
}
|
|
|
|
|
|
public function __get($property) {
|
|
|
- if ($property == 'error') {
|
|
|
+ if ($property === 'error') {
|
|
|
return true;
|
|
|
}
|
|
|
- if ($property == 'message') {
|
|
|
+ if ($property === 'message') {
|
|
|
return $this->_message;
|
|
|
}
|
|
|
}
|
|
@@ -730,11 +733,21 @@ class Predis_ResponseError {
|
|
|
}
|
|
|
|
|
|
class Predis_ResponseQueued {
|
|
|
- public $queued = true;
|
|
|
+ public $skipParse = true;
|
|
|
|
|
|
public function __toString() {
|
|
|
return Predis_Protocol::QUEUED;
|
|
|
}
|
|
|
+
|
|
|
+ public function __get($property) {
|
|
|
+ if ($property === 'queued') {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function __isset($property) {
|
|
|
+ return $property === 'queued';
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* ------------------------------------------------------------------------- */
|
|
@@ -881,7 +894,7 @@ class Predis_MultiExecBlock {
|
|
|
}
|
|
|
$command = $client->createCommand($method, $arguments);
|
|
|
$response = $client->executeCommand($command);
|
|
|
- if (!isset($response->queued)) {
|
|
|
+ if (!$response instanceof Predis_ResponseQueued) {
|
|
|
$this->malformedServerResponse(
|
|
|
'The server did not respond with a QUEUED status reply'
|
|
|
);
|
|
@@ -994,6 +1007,9 @@ class Predis_MultiExecBlock {
|
|
|
);
|
|
|
}
|
|
|
$this->reset();
|
|
|
+ if (isset($this->_options['on_retry']) && is_callable($this->_options['on_retry'])) {
|
|
|
+ call_user_func($this->_options['on_retry'], $this, $attemptsLeft);
|
|
|
+ }
|
|
|
continue;
|
|
|
}
|
|
|
break;
|
|
@@ -1358,8 +1374,7 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
|
|
|
public function readResponse(Predis_Command $command) {
|
|
|
$response = $this->_reader->read($this);
|
|
|
- $skipparse = isset($response->queued) || isset($response->error);
|
|
|
- return $skipparse ? $response : $command->parseResponse($response);
|
|
|
+ return isset($response->skipParse) ? $response : $command->parseResponse($response);
|
|
|
}
|
|
|
|
|
|
public function executeCommand(Predis_Command $command) {
|
|
@@ -1395,7 +1410,7 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
}
|
|
|
|
|
|
public function readBytes($length) {
|
|
|
- if ($length == 0) {
|
|
|
+ if ($length <= 0) {
|
|
|
throw new InvalidArgumentException('Length parameter must be greater than 0');
|
|
|
}
|
|
|
$socket = $this->getSocket();
|
|
@@ -1416,12 +1431,12 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
$value = '';
|
|
|
do {
|
|
|
$chunk = fgets($socket);
|
|
|
- if ($chunk === false || strlen($chunk) == 0) {
|
|
|
+ if ($chunk === false || $chunk === '') {
|
|
|
$this->onCommunicationException('Error while reading line from the server');
|
|
|
}
|
|
|
$value .= $chunk;
|
|
|
}
|
|
|
- while (substr($value, -2) !== Predis_Protocol::NEWLINE);
|
|
|
+ while (substr($value, -2) !== "\r\n");
|
|
|
return substr($value, 0, -2);
|
|
|
}
|
|
|
|
|
@@ -1544,6 +1559,7 @@ abstract class Predis_RedisServerProfile {
|
|
|
return array(
|
|
|
'1.2' => 'Predis_RedisServer_v1_2',
|
|
|
'2.0' => 'Predis_RedisServer_v2_0',
|
|
|
+ '2.2' => 'Predis_RedisServer_v2_2',
|
|
|
'default' => 'Predis_RedisServer_v2_0',
|
|
|
'dev' => 'Predis_RedisServer_vNext',
|
|
|
);
|
|
@@ -1671,7 +1687,7 @@ class Predis_RedisServer_v1_2 extends Predis_RedisServerProfile {
|
|
|
'type' => 'Predis_Commands_Type',
|
|
|
|
|
|
/* commands operating on the key space */
|
|
|
- 'keys' => 'Predis_Commands_Keys',
|
|
|
+ 'keys' => 'Predis_Commands_Keys_v1_2',
|
|
|
'randomkey' => 'Predis_Commands_RandomKey',
|
|
|
'randomKey' => 'Predis_Commands_RandomKey',
|
|
|
'rename' => 'Predis_Commands_Rename',
|
|
@@ -1805,6 +1821,9 @@ class Predis_RedisServer_v2_0 extends Predis_RedisServer_v1_2 {
|
|
|
'append' => 'Predis_Commands_Append',
|
|
|
'substr' => 'Predis_Commands_Substr',
|
|
|
|
|
|
+ /* commands operating on the key space */
|
|
|
+ 'keys' => 'Predis_Commands_Keys',
|
|
|
+
|
|
|
/* commands operating on lists */
|
|
|
'blpop' => 'Predis_Commands_ListPopFirstBlocking',
|
|
|
'popFirstBlocking' => 'Predis_Commands_ListPopFirstBlocking',
|
|
@@ -1865,8 +1884,8 @@ class Predis_RedisServer_v2_0 extends Predis_RedisServer_v1_2 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
|
|
|
- public function getVersion() { return '2.1'; }
|
|
|
+class Predis_RedisServer_v2_2 extends Predis_RedisServer_v2_0 {
|
|
|
+ public function getVersion() { return '2.2'; }
|
|
|
public function getSupportedCommands() {
|
|
|
return array_merge(parent::getSupportedCommands(), array(
|
|
|
/* transactions */
|
|
@@ -1895,6 +1914,10 @@ class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+class Predis_RedisServer_vNext extends Predis_RedisServer_v2_2 {
|
|
|
+ public function getVersion() { return 'DEV'; }
|
|
|
+}
|
|
|
+
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
interface Predis_Pipeline_IPipelineExecutor {
|
|
@@ -2434,12 +2457,11 @@ class Predis_Commands_Strlen extends Predis_MultiBulkCommand {
|
|
|
class Predis_Commands_Keys extends Predis_MultiBulkCommand {
|
|
|
public function canBeHashed() { return false; }
|
|
|
public function getCommandId() { return 'KEYS'; }
|
|
|
- public function parseResponse($data) {
|
|
|
- // TODO: is this behaviour correct?
|
|
|
- if (is_array($data) || $data instanceof Iterator) {
|
|
|
- return $data;
|
|
|
- }
|
|
|
- return strlen($data) > 0 ? explode(' ', $data) : array();
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Commands_Keys_v1_2 extends Predis_Commands_Keys {
|
|
|
+ public function parseResponse($data) {
|
|
|
+ return explode(' ', $data);
|
|
|
}
|
|
|
}
|
|
|
|