|
@@ -812,18 +812,15 @@ class Predis_CommandPipeline {
|
|
|
}
|
|
|
|
|
|
class Predis_MultiExecBlock {
|
|
|
- private $_initialized, $_discarded, $_insideBlock;
|
|
|
+ private $_initialized, $_discarded, $_insideBlock, $_checkAndSet;
|
|
|
private $_redisClient, $_options, $_commands;
|
|
|
private $_supportsWatch;
|
|
|
|
|
|
public function __construct(Predis_Client $redisClient, Array $options = null) {
|
|
|
$this->checkCapabilities($redisClient);
|
|
|
- $this->_initialized = false;
|
|
|
- $this->_discarded = false;
|
|
|
- $this->_insideBlock = false;
|
|
|
+ $this->_options = isset($options) ? $options : array();
|
|
|
$this->_redisClient = $redisClient;
|
|
|
- $this->_options = isset($options) ? $options : array();
|
|
|
- $this->_commands = array();
|
|
|
+ $this->reset();
|
|
|
}
|
|
|
|
|
|
private function checkCapabilities(Predis_Client $redisClient) {
|
|
@@ -849,127 +846,169 @@ class Predis_MultiExecBlock {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private function reset() {
|
|
|
+ $this->_initialized = false;
|
|
|
+ $this->_discarded = false;
|
|
|
+ $this->_checkAndSet = false;
|
|
|
+ $this->_insideBlock = false;
|
|
|
+ $this->_commands = array();
|
|
|
+ }
|
|
|
+
|
|
|
private function initialize() {
|
|
|
- if ($this->_initialized === false) {
|
|
|
- if (isset($this->_options['watch'])) {
|
|
|
- $this->watch($this->_options['watch']);
|
|
|
- }
|
|
|
+ if ($this->_initialized === true) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ $options = &$this->_options;
|
|
|
+ $this->_checkAndSet = isset($options['cas']) && $options['cas'];
|
|
|
+ if (isset($options['watch'])) {
|
|
|
+ $this->watch($options['watch']);
|
|
|
+ }
|
|
|
+ if (!$this->_checkAndSet || ($this->_discarded && $this->_checkAndSet)) {
|
|
|
$this->_redisClient->multi();
|
|
|
- $this->_initialized = true;
|
|
|
- $this->_discarded = false;
|
|
|
+ if ($this->_discarded) {
|
|
|
+ $this->_checkAndSet = false;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- private function setInsideBlock($value) {
|
|
|
- $this->_insideBlock = $value;
|
|
|
+ $this->_initialized = true;
|
|
|
+ $this->_discarded = false;
|
|
|
}
|
|
|
|
|
|
public function __call($method, $arguments) {
|
|
|
$this->initialize();
|
|
|
- $command = $this->_redisClient->createCommand($method, $arguments);
|
|
|
- $response = $this->_redisClient->executeCommand($command);
|
|
|
- if (isset($response->queued)) {
|
|
|
- $this->_commands[] = $command;
|
|
|
- return $this;
|
|
|
- }
|
|
|
- else {
|
|
|
- $this->malformedServerResponse('The server did not respond with a QUEUED status reply');
|
|
|
+ $client = $this->_redisClient;
|
|
|
+ if ($this->_checkAndSet) {
|
|
|
+ return call_user_func_array(array($client, $method), $arguments);
|
|
|
+ }
|
|
|
+ $command = $client->createCommand($method, $arguments);
|
|
|
+ $response = $client->executeCommand($command);
|
|
|
+ if (!isset($response->queued)) {
|
|
|
+ $this->malformedServerResponse(
|
|
|
+ 'The server did not respond with a QUEUED status reply'
|
|
|
+ );
|
|
|
}
|
|
|
+ $this->_commands[] = $command;
|
|
|
+ return $this;
|
|
|
}
|
|
|
|
|
|
public function watch($keys) {
|
|
|
$this->isWatchSupported();
|
|
|
- if ($this->_initialized === true) {
|
|
|
+ if ($this->_initialized && !$this->_checkAndSet) {
|
|
|
throw new Predis_ClientException('WATCH inside MULTI is not allowed');
|
|
|
}
|
|
|
-
|
|
|
- $reply = null;
|
|
|
- if (is_array($keys)) {
|
|
|
- $reply = array();
|
|
|
- foreach ($keys as $key) {
|
|
|
- $reply = $this->_redisClient->watch($keys);
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- $reply = $this->_redisClient->watch($keys);
|
|
|
- }
|
|
|
- return $reply;
|
|
|
+ return $this->_redisClient->watch($keys);
|
|
|
}
|
|
|
|
|
|
public function multi() {
|
|
|
+ if ($this->_initialized && $this->_checkAndSet) {
|
|
|
+ $this->_checkAndSet = false;
|
|
|
+ $this->_redisClient->multi();
|
|
|
+ return $this;
|
|
|
+ }
|
|
|
$this->initialize();
|
|
|
+ return $this;
|
|
|
}
|
|
|
|
|
|
public function unwatch() {
|
|
|
$this->isWatchSupported();
|
|
|
$this->_redisClient->unwatch();
|
|
|
+ return $this;
|
|
|
}
|
|
|
|
|
|
public function discard() {
|
|
|
$this->_redisClient->discard();
|
|
|
- $this->_commands = array();
|
|
|
- $this->_initialized = false;
|
|
|
- $this->_discarded = true;
|
|
|
+ $this->reset();
|
|
|
+ $this->_discarded = true;
|
|
|
+ return $this;
|
|
|
}
|
|
|
|
|
|
public function exec() {
|
|
|
return $this->execute();
|
|
|
}
|
|
|
|
|
|
- public function execute($block = null) {
|
|
|
+ private function checkBeforeExecution($block) {
|
|
|
if ($this->_insideBlock === true) {
|
|
|
throw new Predis_ClientException(
|
|
|
"Cannot invoke 'execute' or 'exec' inside an active client transaction block"
|
|
|
);
|
|
|
}
|
|
|
-
|
|
|
- if ($block && !is_callable($block)) {
|
|
|
- throw new InvalidArgumentException('Argument passed must be a callable object');
|
|
|
+ if ($block) {
|
|
|
+ if (!is_callable($block)) {
|
|
|
+ throw new InvalidArgumentException(
|
|
|
+ 'Argument passed must be a callable object'
|
|
|
+ );
|
|
|
+ }
|
|
|
+ if (count($this->_commands) > 0) {
|
|
|
+ throw new Predis_ClientException(
|
|
|
+ 'Cannot execute a transaction block after using fluent interface'
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (isset($this->_options['retry']) && !isset($block)) {
|
|
|
+ $this->discard();
|
|
|
+ throw new InvalidArgumentException(
|
|
|
+ 'Automatic retries can be used only when a transaction block is provided'
|
|
|
+ );
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- $blockException = null;
|
|
|
- $returnValues = array();
|
|
|
+ public function execute($block = null) {
|
|
|
+ $this->checkBeforeExecution($block);
|
|
|
|
|
|
- if ($block !== null) {
|
|
|
- $this->setInsideBlock(true);
|
|
|
- try {
|
|
|
- $block($this);
|
|
|
- }
|
|
|
- catch (Predis_CommunicationException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
- }
|
|
|
- catch (Predis_ServerException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
- }
|
|
|
- catch (Exception $exception) {
|
|
|
- $blockException = $exception;
|
|
|
- if ($this->_initialized === true) {
|
|
|
- $this->discard();
|
|
|
+ $reply = null;
|
|
|
+ $returnValues = array();
|
|
|
+ $attemptsLeft = isset($this->_options['retry']) ? (int)$this->_options['retry'] : 0;
|
|
|
+ do {
|
|
|
+ $blockException = null;
|
|
|
+ if ($block !== null) {
|
|
|
+ $this->_insideBlock = true;
|
|
|
+ try {
|
|
|
+ $block($this);
|
|
|
+ }
|
|
|
+ catch (Predis_CommunicationException $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ }
|
|
|
+ catch (Predis_ServerException $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ }
|
|
|
+ catch (Exception $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ if ($this->_initialized === true) {
|
|
|
+ $this->discard();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ $this->_insideBlock = false;
|
|
|
+ if ($blockException !== null) {
|
|
|
+ throw $blockException;
|
|
|
}
|
|
|
}
|
|
|
- $this->setInsideBlock(false);
|
|
|
- if ($blockException !== null) {
|
|
|
- throw $blockException;
|
|
|
+
|
|
|
+ if ($this->_initialized === false || count($this->_commands) == 0) {
|
|
|
+ return;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- if ($this->_initialized === false) {
|
|
|
- return;
|
|
|
- }
|
|
|
+ $reply = $this->_redisClient->exec();
|
|
|
+ if ($reply === null) {
|
|
|
+ if ($attemptsLeft === 0) {
|
|
|
+ throw new Predis_AbortedMultiExec(
|
|
|
+ 'The current transaction has been aborted by the server'
|
|
|
+ );
|
|
|
+ }
|
|
|
+ $this->reset();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ } while ($attemptsLeft-- > 0);
|
|
|
|
|
|
- $reply = $this->_redisClient->exec();
|
|
|
- if ($reply === null) {
|
|
|
- throw new Predis_AbortedMultiExec('The current transaction has been aborted by the server');
|
|
|
- }
|
|
|
|
|
|
$execReply = $reply instanceof Iterator ? iterator_to_array($reply) : $reply;
|
|
|
- $commands = &$this->_commands;
|
|
|
$sizeofReplies = count($execReply);
|
|
|
|
|
|
+ $commands = &$this->_commands;
|
|
|
if ($sizeofReplies !== count($commands)) {
|
|
|
- $this->malformedServerResponse('Unexpected number of responses for a Predis_MultiExecBlock');
|
|
|
+ $this->malformedServerResponse(
|
|
|
+ 'Unexpected number of responses for a MultiExecBlock'
|
|
|
+ );
|
|
|
}
|
|
|
-
|
|
|
for ($i = 0; $i < $sizeofReplies; $i++) {
|
|
|
$returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof Iterator
|
|
|
? iterator_to_array($execReply[$i])
|
|
@@ -980,6 +1019,17 @@ class Predis_MultiExecBlock {
|
|
|
|
|
|
return $returnValues;
|
|
|
}
|
|
|
+
|
|
|
+ private function malformedServerResponse($message) {
|
|
|
+ // Since a MULTI/EXEC block cannot be initialized over a clustered
|
|
|
+ // connection, we can safely assume that Predis_Client::getConnection()
|
|
|
+ // will always return an instance of Predis_Connection.
|
|
|
+ Predis_Shared_Utils::onCommunicationException(
|
|
|
+ new Predis_MalformedServerResponse(
|
|
|
+ $this->_redisClient->getConnection(), $message
|
|
|
+ )
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
class Predis_PubSubContext implements Iterator {
|
|
@@ -1825,6 +1875,10 @@ class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
|
|
|
|
|
|
/* commands operating on string values */
|
|
|
'strlen' => 'Predis_Commands_Strlen',
|
|
|
+ 'setrange' => 'Predis_Commands_SetRange',
|
|
|
+ 'getrange' => 'Predis_Commands_Substr',
|
|
|
+ 'setbit' => 'Predis_Commands_SetBit',
|
|
|
+ 'getbit' => 'Predis_Commands_GetBit',
|
|
|
|
|
|
/* commands operating on the key space */
|
|
|
'persist' => 'Predis_Commands_Persist',
|
|
@@ -1833,6 +1887,7 @@ class Predis_RedisServer_vNext extends Predis_RedisServer_v2_0 {
|
|
|
'rpushx' => 'Predis_Commands_ListPushTailX',
|
|
|
'lpushx' => 'Predis_Commands_ListPushHeadX',
|
|
|
'linsert' => 'Predis_Commands_ListInsert',
|
|
|
+ 'brpoplpush' => 'Predis_Commands_ListPopLastPushHeadBlocking',
|
|
|
|
|
|
/* commands operating on sorted sets */
|
|
|
'zrevrangebyscore' => 'Predis_Commands_ZSetReverseRangeByScore',
|
|
@@ -2355,10 +2410,22 @@ class Predis_Commands_Append extends Predis_MultiBulkCommand {
|
|
|
public function getCommandId() { return 'APPEND'; }
|
|
|
}
|
|
|
|
|
|
+class Predis_Commands_SetRange extends Predis_MultiBulkCommand {
|
|
|
+ public function getCommandId() { return 'SETRANGE'; }
|
|
|
+}
|
|
|
+
|
|
|
class Predis_Commands_Substr extends Predis_MultiBulkCommand {
|
|
|
public function getCommandId() { return 'SUBSTR'; }
|
|
|
}
|
|
|
|
|
|
+class Predis_Commands_SetBit extends Predis_MultiBulkCommand {
|
|
|
+ public function getCommandId() { return 'SETBIT'; }
|
|
|
+}
|
|
|
+
|
|
|
+class Predis_Commands_GetBit extends Predis_MultiBulkCommand {
|
|
|
+ public function getCommandId() { return 'GETBIT'; }
|
|
|
+}
|
|
|
+
|
|
|
class Predis_Commands_Strlen extends Predis_MultiBulkCommand {
|
|
|
public function getCommandId() { return 'STRLEN'; }
|
|
|
}
|
|
@@ -2462,8 +2529,8 @@ class Predis_Commands_ListPopLastPushHead extends Predis_MultiBulkCommand {
|
|
|
public function getCommandId() { return 'RPOPLPUSH'; }
|
|
|
}
|
|
|
|
|
|
-class Predis_Commands_ListPopLastPushHeadBulk extends Predis_MultiBulkCommand {
|
|
|
- public function getCommandId() { return 'RPOPLPUSH'; }
|
|
|
+class Predis_Commands_ListPopLastPushHeadBlocking extends Predis_MultiBulkCommand {
|
|
|
+ public function getCommandId() { return 'BRPOPLPUSH'; }
|
|
|
}
|
|
|
|
|
|
class Predis_Commands_ListPopFirst extends Predis_MultiBulkCommand {
|