|
@@ -149,6 +149,10 @@ class Predis_Client {
|
|
|
return new Predis_CommandPipeline($this);
|
|
|
}
|
|
|
|
|
|
+ public function multiExec() {
|
|
|
+ return new Predis_MultiExecBlock($this);
|
|
|
+ }
|
|
|
+
|
|
|
public function registerCommands(Array $commands) {
|
|
|
$this->_serverProfile->registerCommands($commands);
|
|
|
}
|
|
@@ -277,6 +281,7 @@ class Predis_Response {
|
|
|
const NEWLINE = "\r\n";
|
|
|
const OK = 'OK';
|
|
|
const ERROR = 'ERR';
|
|
|
+ const QUEUED = 'QUEUED';
|
|
|
const NULL = 'nil';
|
|
|
|
|
|
private static $_prefixHandlers;
|
|
@@ -314,7 +319,13 @@ class Predis_Response {
|
|
|
|
|
|
public static function handleStatus($socket) {
|
|
|
$status = rtrim(fgets($socket), Predis_Response::NEWLINE);
|
|
|
- return $status === Predis_Response::OK ? true : $status;
|
|
|
+ if ($status === Predis_Response::OK) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ else if ($status === Predis_Response::QUEUED) {
|
|
|
+ return new Predis_ResponseQueued();
|
|
|
+ }
|
|
|
+ return $status;
|
|
|
}
|
|
|
|
|
|
public static function handleError($socket) {
|
|
@@ -335,8 +346,8 @@ class Predis_Response {
|
|
|
return $value;
|
|
|
}
|
|
|
else if ($dataLength == 0) {
|
|
|
- // TODO: I just have a doubt here...
|
|
|
fread($socket, 2);
|
|
|
+ return '';
|
|
|
}
|
|
|
|
|
|
return null;
|
|
@@ -379,6 +390,14 @@ class Predis_Response {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+class Predis_ResponseQueued {
|
|
|
+ public $queued = true;
|
|
|
+
|
|
|
+ public function __toString() {
|
|
|
+ return Predis_Response::QUEUED;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
class Predis_CommandPipeline {
|
|
|
private $_redisClient, $_pipelineBuffer, $_returnValues, $_running;
|
|
|
|
|
@@ -407,7 +426,7 @@ class Predis_CommandPipeline {
|
|
|
}
|
|
|
|
|
|
$connection = $this->_redisClient->getConnection();
|
|
|
- $commands = &$this->getRecordedCommands();
|
|
|
+ $commands = $this->getRecordedCommands();
|
|
|
|
|
|
foreach ($commands as $command) {
|
|
|
$connection->writeCommand($command);
|
|
@@ -449,6 +468,57 @@ class Predis_CommandPipeline {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+class Predis_MultiExecBlock {
|
|
|
+ private $_redisClient, $_commands, $_initialized;
|
|
|
+
|
|
|
+ public function __construct(Predis_Client $redisClient) {
|
|
|
+ $this->_initialized = false;
|
|
|
+ $this->_redisClient = $redisClient;
|
|
|
+ $this->_commands = array();
|
|
|
+ }
|
|
|
+
|
|
|
+ private function initialize() {
|
|
|
+ if ($this->_initialized === false) {
|
|
|
+ $this->_redisClient->multi();
|
|
|
+ $this->_initialized = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function __call($method, $arguments) {
|
|
|
+ $this->initialize();
|
|
|
+ $command = $this->_redisClient->createCommand($method, $arguments);
|
|
|
+ $response = $this->_redisClient->executeCommand($command);
|
|
|
+ if (isset($response->queued)) {
|
|
|
+ $this->_commands[] = $command;
|
|
|
+ return $response;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ throw new Predis_ClientException('The server did not respond with a QUEUED status reply');
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public function execute() {
|
|
|
+ $blockException = null;
|
|
|
+ $returnValues = array();
|
|
|
+
|
|
|
+ try {
|
|
|
+ $execReply = $this->_redisClient->exec();
|
|
|
+ for ($i = 0; $i < count($execReply); $i++) {
|
|
|
+ $returnValues[] = $this->_commands[$i]->parseResponse($execReply[$i]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (Exception $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ }
|
|
|
+
|
|
|
+ if ($blockException !== null) {
|
|
|
+ throw $blockException;
|
|
|
+ }
|
|
|
+
|
|
|
+ return $returnValues;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
class Predis_ConnectionParameters {
|
|
@@ -481,6 +551,12 @@ class Predis_ConnectionParameters {
|
|
|
case 'password':
|
|
|
$details['password'] = $v;
|
|
|
break;
|
|
|
+ case 'connection_timeout':
|
|
|
+ $details['connection_timeout'] = $v;
|
|
|
+ break;
|
|
|
+ case 'read_write_timeout':
|
|
|
+ $details['read_write_timeout'] = $v;
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
$parsed = array_merge($parsed, $details);
|
|
@@ -498,13 +574,19 @@ class Predis_ConnectionParameters {
|
|
|
'host' => self::getParamOrDefault($parameters, 'host', self::DEFAULT_HOST),
|
|
|
'port' => (int) self::getParamOrDefault($parameters, 'port', self::DEFAULT_PORT),
|
|
|
'database' => self::getParamOrDefault($parameters, 'database'),
|
|
|
- 'password' => self::getParamOrDefault($parameters, 'password')
|
|
|
+ 'password' => self::getParamOrDefault($parameters, 'password'),
|
|
|
+ 'connection_timeout' => self::getParamOrDefault($parameters, 'connection_timeout'),
|
|
|
+ 'read_write_timeout' => self::getParamOrDefault($parameters, 'read_write_timeout'),
|
|
|
);
|
|
|
}
|
|
|
|
|
|
public function __get($parameter) {
|
|
|
return $this->_parameters[$parameter];
|
|
|
}
|
|
|
+
|
|
|
+ public function __isset($parameter) {
|
|
|
+ return isset($this->_parameters[$parameter]);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
interface Predis_IConnection {
|
|
@@ -517,7 +599,6 @@ interface Predis_IConnection {
|
|
|
|
|
|
class Predis_Connection implements Predis_IConnection {
|
|
|
const CONNECTION_TIMEOUT = 2;
|
|
|
- const READ_WRITE_TIMEOUT = 5;
|
|
|
|
|
|
private $_params, $_socket, $_initCmds;
|
|
|
|
|
@@ -539,11 +620,17 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
throw new Predis_ClientException('Connection already estabilished');
|
|
|
}
|
|
|
$uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
|
|
|
- $this->_socket = @stream_socket_client($uri, $errno, $errstr, self::CONNECTION_TIMEOUT);
|
|
|
+ $connectionTimeout = isset($this->_params->connection_timeout)
|
|
|
+ ? $this->_params->connection_timeout
|
|
|
+ : self::CONNECTION_TIMEOUT;
|
|
|
+ $this->_socket = @stream_socket_client($uri, $errno, $errstr, $connectionTimeout);
|
|
|
if (!$this->_socket) {
|
|
|
throw new Predis_ClientException(trim($errstr), $errno);
|
|
|
}
|
|
|
- stream_set_timeout($this->_socket, self::READ_WRITE_TIMEOUT);
|
|
|
+
|
|
|
+ if (isset($this->_params->read_write_timeout)) {
|
|
|
+ stream_set_timeout($this->_socket, $this->_params->read_write_timeout);
|
|
|
+ }
|
|
|
|
|
|
if (count($this->_initCmds) > 0){
|
|
|
$this->sendInitializationCommands();
|
|
@@ -576,8 +663,8 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
public function readResponse(Predis_Command $command) {
|
|
|
$socket = $this->getSocket();
|
|
|
$handler = Predis_Response::getPrefixHandler(fgetc($socket));
|
|
|
- $response = $command->parseResponse(call_user_func($handler, $socket));
|
|
|
- return $response;
|
|
|
+ $response = call_user_func($handler, $socket);
|
|
|
+ return isset($response->queued) ? $response : $command->parseResponse($response);
|
|
|
}
|
|
|
|
|
|
public function rawCommand($rawCommandData, $closesConnection = false) {
|
|
@@ -587,7 +674,7 @@ class Predis_Connection implements Predis_IConnection {
|
|
|
return;
|
|
|
}
|
|
|
$handler = Predis_Response::getPrefixHandler(fgetc($socket));
|
|
|
- return $handler($socket);
|
|
|
+ return call_user_func($handler, $socket);
|
|
|
}
|
|
|
|
|
|
public function getSocket() {
|
|
@@ -687,6 +774,10 @@ abstract class Predis_RedisServerProfile {
|
|
|
return new $defaultProfile();
|
|
|
}
|
|
|
|
|
|
+ public function supportsCommand($command) {
|
|
|
+ return isset($this->_registeredCommands[$command]);
|
|
|
+ }
|
|
|
+
|
|
|
public function createCommand($method, $arguments = array()) {
|
|
|
$commandClass = $this->_registeredCommands[$method];
|
|
|
|
|
@@ -890,6 +981,16 @@ class Predis_RedisServer__V1_2 extends Predis_RedisServer__V1_0 {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+class Predis_RedisServer__Futures extends Predis_RedisServer__V1_2 {
|
|
|
+ public function getVersion() { return 0; }
|
|
|
+ public function getSupportedCommands() {
|
|
|
+ return array_merge(parent::getSupportedCommands(), array(
|
|
|
+ 'multi' => 'Predis_Commands_Multi',
|
|
|
+ 'exec' => 'Predis_Commands_Exec'
|
|
|
+ ));
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
/* ------------------------------------------------------------------------- */
|
|
|
|
|
|
class Utilities_HashRing {
|
|
@@ -1359,11 +1460,21 @@ class Predis_Commands_Info extends Predis_InlineCommand {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-class SlaveOf extends Predis_InlineCommand {
|
|
|
+class Predis_Commands_SlaveOf extends Predis_InlineCommand {
|
|
|
public function canBeHashed() { return false; }
|
|
|
public function getCommandId() { return 'SLAVEOF'; }
|
|
|
public function filterArguments(Array $arguments) {
|
|
|
return count($arguments) === 0 ? array('NO ONE') : $arguments;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+class Predis_Commands_Multi extends Predis_InlineCommand {
|
|
|
+ public function canBeHashed() { return false; }
|
|
|
+ public function getCommandId() { return 'MULTI'; }
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Commands_Exec extends Predis_InlineCommand {
|
|
|
+ public function canBeHashed() { return false; }
|
|
|
+ public function getCommandId() { return 'EXEC'; }
|
|
|
+}
|
|
|
?>
|