|
@@ -7,13 +7,10 @@ class Predis_MalformedServerResponse extends Predis_ServerException { }
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
class Predis_Client {
|
|
|
- // TODO: command arguments should be sanitized or checked for bad arguments
|
|
|
- // (e.g. CRLF in keys for inline commands)
|
|
|
-
|
|
|
private $_connection, $_serverProfile;
|
|
|
|
|
|
public function __construct($parameters = null, Predis_RedisServerProfile $serverProfile = null) {
|
|
|
- $this->setServerProfile(
|
|
|
+ $this->setProfile(
|
|
|
$serverProfile === null
|
|
|
? Predis_RedisServerProfile::getDefault()
|
|
|
: $serverProfile
|
|
@@ -82,10 +79,14 @@ class Predis_Client {
|
|
|
$this->_connection = $connection;
|
|
|
}
|
|
|
|
|
|
- public function setServerProfile(Predis_RedisServerProfile $serverProfile) {
|
|
|
+ public function setProfile(Predis_RedisServerProfile $serverProfile) {
|
|
|
$this->_serverProfile = $serverProfile;
|
|
|
}
|
|
|
|
|
|
+ public function getProfile() {
|
|
|
+ return $this->_serverProfile;
|
|
|
+ }
|
|
|
+
|
|
|
public function connect() {
|
|
|
$this->_connection->connect();
|
|
|
}
|
|
@@ -137,28 +138,20 @@ class Predis_Client {
|
|
|
}
|
|
|
|
|
|
public function rawCommand($rawCommandData, $closesConnection = false) {
|
|
|
- // TODO: rather than check the type of a connection instance, we should
|
|
|
- // check if it does respond to the rawCommand method.
|
|
|
if (is_a($this->_connection, 'Predis_ConnectionCluster')) {
|
|
|
throw new Predis_ClientException('Cannot send raw commands when connected to a cluster of Redis servers');
|
|
|
}
|
|
|
return $this->_connection->rawCommand($rawCommandData, $closesConnection);
|
|
|
}
|
|
|
|
|
|
- public function pipeline() {
|
|
|
- return new Predis_CommandPipeline($this);
|
|
|
- }
|
|
|
-
|
|
|
- public function multiExec() {
|
|
|
- return new Predis_MultiExecBlock($this);
|
|
|
+ public function pipeline($pipelineBlock = null) {
|
|
|
+ $pipeline = new Predis_CommandPipeline($this);
|
|
|
+ return $pipelineBlock !== null ? $pipeline->execute($pipelineBlock) : $pipeline;
|
|
|
}
|
|
|
|
|
|
- public function registerCommands(Array $commands) {
|
|
|
- $this->_serverProfile->registerCommands($commands);
|
|
|
- }
|
|
|
-
|
|
|
- public function registerCommand($command, $aliases) {
|
|
|
- $this->_serverProfile->registerCommand($command, $aliases);
|
|
|
+ public function multiExec($multiExecBlock = null) {
|
|
|
+ $multiExec = new Predis_MultiExecBlock($this);
|
|
|
+ return $multiExecBlock !== null ? $multiExec->execute($multiExecBlock) : $multiExec;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -305,20 +298,7 @@ class Predis_Response {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- public static function getPrefixHandler($prefix) {
|
|
|
- if (self::$_prefixHandlers === null) {
|
|
|
- self::$_prefixHandlers = self::initializePrefixHandlers();
|
|
|
- }
|
|
|
-
|
|
|
- $handler = self::$_prefixHandlers[$prefix];
|
|
|
- if ($handler === null) {
|
|
|
- throw new Predis_MalformedServerResponse("Unknown prefix '$prefix'");
|
|
|
- }
|
|
|
- return $handler;
|
|
|
- }
|
|
|
-
|
|
|
- public static function handleStatus($socket) {
|
|
|
- $status = rtrim(fgets($socket), Predis_Response::NEWLINE);
|
|
|
+ public static function handleStatus($socket, $prefix, $status) {
|
|
|
if ($status === Predis_Response::OK) {
|
|
|
return true;
|
|
|
}
|
|
@@ -328,20 +308,20 @@ class Predis_Response {
|
|
|
return $status;
|
|
|
}
|
|
|
|
|
|
- public static function handleError($socket) {
|
|
|
- $errorMessage = rtrim(fgets($socket), Predis_Response::NEWLINE);
|
|
|
+ public static function handleError($socket, $prefix, $errorMessage) {
|
|
|
throw new Predis_ServerException(substr($errorMessage, 4));
|
|
|
}
|
|
|
|
|
|
- public static function handleBulk($socket) {
|
|
|
- $dataLength = rtrim(fgets($socket), Predis_Response::NEWLINE);
|
|
|
-
|
|
|
+ public static function handleBulk($socket, $prefix, $dataLength) {
|
|
|
if (!is_numeric($dataLength)) {
|
|
|
throw new Predis_ClientException("Cannot parse '$dataLength' as data length");
|
|
|
}
|
|
|
|
|
|
if ($dataLength > 0) {
|
|
|
$value = stream_get_contents($socket, $dataLength);
|
|
|
+ if ($value === false) {
|
|
|
+ throw new Predis_ClientException('An error has occurred while reading from the network stream');
|
|
|
+ }
|
|
|
fread($socket, 2);
|
|
|
return $value;
|
|
|
}
|
|
@@ -353,8 +333,7 @@ class Predis_Response {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- public static function handleMultiBulk($socket) {
|
|
|
- $rawLength = rtrim(fgets($socket), Predis_Response::NEWLINE);
|
|
|
+ public static function handleMultiBulk($socket, $prefix, $rawLength) {
|
|
|
if (!is_numeric($rawLength)) {
|
|
|
throw new Predis_ClientException("Cannot parse '$rawLength' as data length");
|
|
|
}
|
|
@@ -368,26 +347,44 @@ class Predis_Response {
|
|
|
|
|
|
if ($listLength > 0) {
|
|
|
for ($i = 0; $i < $listLength; $i++) {
|
|
|
- $handler = Predis_Response::getPrefixHandler(fgetc($socket));
|
|
|
- $list[] = call_user_func($handler, $socket);
|
|
|
+ $list[] = Predis_Response::read($socket);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
return $list;
|
|
|
}
|
|
|
|
|
|
- public static function handleInteger($socket) {
|
|
|
- $number = rtrim(fgets($socket), Predis_Response::NEWLINE);
|
|
|
+ public static function handleInteger($socket, $prefix, $number) {
|
|
|
if (is_numeric($number)) {
|
|
|
return (int) $number;
|
|
|
}
|
|
|
else {
|
|
|
- if ($number !== Predis_Response::NULL) {
|
|
|
+ if ($number !== Response::NULL) {
|
|
|
throw new Predis_ClientException("Cannot parse '$number' as numeric response");
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public static function read($socket) {
|
|
|
+ $header = fgets($socket);
|
|
|
+ if ($header === false) {
|
|
|
+ throw new Predis_ClientException('An error has occurred while reading from the network stream');
|
|
|
+ }
|
|
|
+
|
|
|
+ $prefix = $header[0];
|
|
|
+ $payload = substr($header, 1, -2);
|
|
|
+
|
|
|
+ if (!isset(self::$_prefixHandlers)) {
|
|
|
+ self::$_prefixHandlers = self::initializePrefixHandlers();
|
|
|
+ }
|
|
|
+ if (!isset(self::$_prefixHandlers[$prefix])) {
|
|
|
+ throw new Predis_MalformedServerResponse("Unknown prefix '$prefix'");
|
|
|
+ }
|
|
|
+
|
|
|
+ $handler = self::$_prefixHandlers[$prefix];
|
|
|
+ return call_user_func($handler, $socket, $prefix, $payload);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
class Predis_ResponseQueued {
|
|
@@ -421,25 +418,24 @@ class Predis_CommandPipeline {
|
|
|
}
|
|
|
|
|
|
public function flushPipeline() {
|
|
|
- if (count($this->_pipelineBuffer) === 0) {
|
|
|
+ $sizeofPipe = count($this->_pipelineBuffer);
|
|
|
+ if ($sizeofPipe === 0) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
$connection = $this->_redisClient->getConnection();
|
|
|
- $commands = $this->getRecordedCommands();
|
|
|
+ $commands = &$this->_pipelineBuffer;
|
|
|
|
|
|
foreach ($commands as $command) {
|
|
|
$connection->writeCommand($command);
|
|
|
}
|
|
|
- foreach ($commands as $command) {
|
|
|
- $this->_returnValues[] = $connection->readResponse($command);
|
|
|
+ for ($i = 0; $i < $sizeofPipe; $i++) {
|
|
|
+ $this->_returnValues[] = $connection->readResponse($commands[$i]);
|
|
|
+ unset($commands[$i]);
|
|
|
}
|
|
|
-
|
|
|
- $this->_pipelineBuffer = array();
|
|
|
}
|
|
|
|
|
|
private function setRunning($bool) {
|
|
|
- // TODO: I am honest when I say that I don't like this approach.
|
|
|
if ($bool == true && $this->_running == true) {
|
|
|
throw new Predis_ClientException("This pipeline is already opened");
|
|
|
}
|
|
@@ -447,11 +443,18 @@ class Predis_CommandPipeline {
|
|
|
$this->_running = $bool;
|
|
|
}
|
|
|
|
|
|
- public function execute() {
|
|
|
+ public function execute($block = null) {
|
|
|
+ if ($block && !is_callable($block)) {
|
|
|
+ throw new RuntimeException('Argument passed must be a callable object');
|
|
|
+ }
|
|
|
+
|
|
|
$this->setRunning(true);
|
|
|
$pipelineBlockException = null;
|
|
|
|
|
|
try {
|
|
|
+ if ($block !== null) {
|
|
|
+ $block($this);
|
|
|
+ }
|
|
|
$this->flushPipeline();
|
|
|
}
|
|
|
catch (Exception $exception) {
|
|
@@ -497,14 +500,31 @@ class Predis_MultiExecBlock {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public function execute() {
|
|
|
+ public function execute($block = null) {
|
|
|
+ if ($block && !is_callable($block)) {
|
|
|
+ throw new RuntimeException('Argument passed must be a callable object');
|
|
|
+ }
|
|
|
+
|
|
|
$blockException = null;
|
|
|
$returnValues = array();
|
|
|
|
|
|
try {
|
|
|
+ if ($block !== null) {
|
|
|
+ $block($this);
|
|
|
+ }
|
|
|
+
|
|
|
$execReply = $this->_redisClient->exec();
|
|
|
- for ($i = 0; $i < count($execReply); $i++) {
|
|
|
- $returnValues[] = $this->_commands[$i]->parseResponse($execReply[$i]);
|
|
|
+ $commands = &$this->_commands;
|
|
|
+ $sizeofReplies = count($execReply);
|
|
|
+
|
|
|
+ if ($sizeofReplies !== count($commands)) {
|
|
|
+ // TODO: think of a better exception message
|
|
|
+ throw new Predis_ClientException("Out-of-sync");
|
|
|
+ }
|
|
|
+
|
|
|
+ for ($i = 0; $i < $sizeofReplies; $i++) {
|
|
|
+ $returnValues[] = $commands[$i]->parseResponse($execReply[$i]);
|
|
|
+ unset($commands[$i]);
|
|
|
}
|
|
|
}
|
|
|
catch (Exception $exception) {
|
|
@@ -657,24 +677,30 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
}
|
|
|
|
|
|
public function writeCommand(Predis_Command $command) {
|
|
|
- fwrite($this->getSocket(), $command->invoke());
|
|
|
+ $written = fwrite($this->getSocket(), $command->invoke());
|
|
|
+ if ($written === false){
|
|
|
+ throw new Predis_ClientException(sprintf(
|
|
|
+ 'An error has occurred while writing command %s on the network stream'),
|
|
|
+ $command->getCommandId()
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public function readResponse(Predis_Command $command) {
|
|
|
- $socket = $this->getSocket();
|
|
|
- $handler = Predis_Response::getPrefixHandler(fgetc($socket));
|
|
|
- $response = call_user_func($handler, $socket);
|
|
|
+ $response = Predis_Response::read($this->getSocket());
|
|
|
return isset($response->queued) ? $response : $command->parseResponse($response);
|
|
|
}
|
|
|
|
|
|
public function rawCommand($rawCommandData, $closesConnection = false) {
|
|
|
$socket = $this->getSocket();
|
|
|
- fwrite($socket, $rawCommandData);
|
|
|
+ $written = fwrite($socket, $rawCommandData);
|
|
|
+ if ($written === false){
|
|
|
+ throw new Predis_ClientException('An error has occurred while writing a raw command on the network stream');
|
|
|
+ }
|
|
|
if ($closesConnection) {
|
|
|
return;
|
|
|
}
|
|
|
- $handler = Predis_Response::getPrefixHandler(fgetc($socket));
|
|
|
- return call_user_func($handler, $socket);
|
|
|
+ return Predis_Response::read($socket);
|
|
|
}
|
|
|
|
|
|
public function getSocket() {
|
|
@@ -758,7 +784,7 @@ class Predis_ConnectionCluster implements Predis_IConnection, IteratorAggregate
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
abstract class Predis_RedisServerProfile {
|
|
|
- const DEFAULT_SERVER_PROFILE = 'Predis_RedisServer__V1_2';
|
|
|
+ const DEFAULT_SERVER_PROFILE = 'Predis_RedisServer_v1_2';
|
|
|
private $_registeredCommands;
|
|
|
|
|
|
public function __construct() {
|
|
@@ -774,17 +800,27 @@ abstract class Predis_RedisServerProfile {
|
|
|
return new $defaultProfile();
|
|
|
}
|
|
|
|
|
|
+ public function compareWith($version, $operator = null) {
|
|
|
+ // one could expect that PHP's version_compare would behave
|
|
|
+ // the same way if invoked with 2 arguments or 3 arguments
|
|
|
+ // with the third being NULL, but it is not like that.
|
|
|
+ // TODO: since version_compare considers 1 < 1.0 < 1.0.0,
|
|
|
+ // we might need to revise the behavior of this method.
|
|
|
+ return ($operator === null
|
|
|
+ ? version_compare($this, $version)
|
|
|
+ : version_compare($this, $version, $operator)
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
public function supportsCommand($command) {
|
|
|
return isset($this->_registeredCommands[$command]);
|
|
|
}
|
|
|
|
|
|
public function createCommand($method, $arguments = array()) {
|
|
|
- $commandClass = $this->_registeredCommands[$method];
|
|
|
-
|
|
|
- if ($commandClass === null) {
|
|
|
+ if (!isset($this->_registeredCommands[$method])) {
|
|
|
throw new Predis_ClientException("'$method' is not a registered Redis command");
|
|
|
}
|
|
|
-
|
|
|
+ $commandClass = $this->_registeredCommands[$method];
|
|
|
$command = new $commandClass();
|
|
|
$command->setArgumentsArray($arguments);
|
|
|
return $command;
|
|
@@ -812,10 +848,14 @@ abstract class Predis_RedisServerProfile {
|
|
|
$this->_registeredCommands[$aliases] = $command;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public function __toString() {
|
|
|
+ return $this->getVersion();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-class Predis_RedisServer__V1_0 extends Predis_RedisServerProfile {
|
|
|
- public function getVersion() { return 1.0; }
|
|
|
+class Predis_RedisServer_v1_0 extends Predis_RedisServerProfile {
|
|
|
+ public function getVersion() { return '1.0'; }
|
|
|
public function getSupportedCommands() {
|
|
|
return array(
|
|
|
/* miscellaneous commands */
|
|
@@ -939,13 +979,13 @@ class Predis_RedisServer__V1_0 extends Predis_RedisServerProfile {
|
|
|
'backgroundSave' => 'Predis_Commands_BackgroundSave',
|
|
|
'lastsave' => 'Predis_Commands_LastSave',
|
|
|
'lastSave' => 'Predis_Commands_LastSave',
|
|
|
- 'shutdown' => 'Predis_Commands_Shutdown'
|
|
|
+ 'shutdown' => 'Predis_Commands_Shutdown',
|
|
|
);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class Predis_RedisServer__V1_2 extends Predis_RedisServer__V1_0 {
|
|
|
- public function getVersion() { return 1.2; }
|
|
|
+class Predis_RedisServer_v1_2 extends Predis_RedisServer_v1_0 {
|
|
|
+ public function getVersion() { return '1.2'; }
|
|
|
public function getSupportedCommands() {
|
|
|
return array_merge(parent::getSupportedCommands(), array(
|
|
|
/* commands operating on string values */
|
|
@@ -976,17 +1016,24 @@ class Predis_RedisServer__V1_2 extends Predis_RedisServer__V1_0 {
|
|
|
'zscore' => 'Predis_Commands_ZSetScore',
|
|
|
'zsetScore' => 'Predis_Commands_ZSetScore',
|
|
|
'zremrangebyscore' => 'Predis_Commands_ZSetRemoveRangeByScore',
|
|
|
- 'zsetRemoveRangeByScore' => 'Predis_Commands_ZSetRemoveRangeByScore'
|
|
|
+ 'zsetRemoveRangeByScore' => 'Predis_Commands_ZSetRemoveRangeByScore',
|
|
|
));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class Predis_RedisServer__Futures extends Predis_RedisServer__V1_2 {
|
|
|
- public function getVersion() { return 0; }
|
|
|
+class Predis_RedisServer_vNext extends Predis_RedisServer_v1_2 {
|
|
|
+ public function getVersion() { return '1.3'; }
|
|
|
public function getSupportedCommands() {
|
|
|
return array_merge(parent::getSupportedCommands(), array(
|
|
|
+ /* miscellaneous commands */
|
|
|
'multi' => 'Predis_Commands_Multi',
|
|
|
- 'exec' => 'Predis_Commands_Exec'
|
|
|
+ 'exec' => 'Predis_Commands_Exec',
|
|
|
+
|
|
|
+ /* commands operating on lists */
|
|
|
+ 'blpop' => 'Predis_Commands_ListPopFirstBlocking',
|
|
|
+ 'popFirstBlocking' => 'Predis_Commands_ListPopFirstBlocking',
|
|
|
+ 'brpop' => 'Predis_Commands_ListPopLastBlocking',
|
|
|
+ 'popLastBlocking' => 'Predis_Commands_ListPopLastBlocking',
|
|
|
));
|
|
|
}
|
|
|
}
|
|
@@ -1167,7 +1214,6 @@ class Predis_Commands_RandomKey extends Predis_InlineCommand {
|
|
|
}
|
|
|
|
|
|
class Predis_Commands_Rename extends Predis_InlineCommand {
|
|
|
- // TODO: doesn't RENAME break the hash-based client-side sharding?
|
|
|
public function canBeHashed() { return false; }
|
|
|
public function getCommandId() { return 'RENAME'; }
|
|
|
}
|
|
@@ -1242,6 +1288,14 @@ class Predis_Commands_ListPopLast extends Predis_InlineCommand {
|
|
|
public function getCommandId() { return 'RPOP'; }
|
|
|
}
|
|
|
|
|
|
+class Predis_Commands_ListPopFirstBlocking extends Predis_InlineCommand {
|
|
|
+ public function getCommandId() { return 'BLPOP'; }
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Commands_ListPopLastBlocking extends Predis_InlineCommand {
|
|
|
+ public function getCommandId() { return 'BRPOP'; }
|
|
|
+}
|
|
|
+
|
|
|
/* commands operating on sets */
|
|
|
class Predis_Commands_SetAdd extends Predis_BulkCommand {
|
|
|
public function getCommandId() { return 'SADD'; }
|
|
@@ -1422,6 +1476,12 @@ class Predis_Commands_Save extends Predis_InlineCommand {
|
|
|
class Predis_Commands_BackgroundSave extends Predis_InlineCommand {
|
|
|
public function canBeHashed() { return false; }
|
|
|
public function getCommandId() { return 'BGSAVE'; }
|
|
|
+ public function parseResponse($data) {
|
|
|
+ if ($data == 'Background saving started') {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return $data;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
class Predis_Commands_LastSave extends Predis_InlineCommand {
|