|
@@ -11,15 +11,18 @@ use Predis\CommunicationException;
|
|
|
use Predis\Protocol\ProtocolException;
|
|
|
|
|
|
class MultiExecContext {
|
|
|
- private $_client;
|
|
|
- private $_options;
|
|
|
- private $_commands;
|
|
|
- private $_supportsWatch;
|
|
|
- private $_initialized;
|
|
|
- private $_discarded;
|
|
|
- private $_insideBlock;
|
|
|
- private $_checkAndSet;
|
|
|
- private $_watchedKeys;
|
|
|
+ const STATE_RESET = 0x00000;
|
|
|
+ const STATE_INITIALIZED = 0x00001;
|
|
|
+ const STATE_INSIDEBLOCK = 0x00010;
|
|
|
+ const STATE_DISCARDED = 0x00100;
|
|
|
+ const STATE_CAS = 0x01000;
|
|
|
+ const STATE_WATCH = 0x10000;
|
|
|
+
|
|
|
+ private $_state;
|
|
|
+ private $_canWatch;
|
|
|
+ protected $_client;
|
|
|
+ protected $_options;
|
|
|
+ protected $_commands;
|
|
|
|
|
|
public function __construct(Client $client, Array $options = null) {
|
|
|
$this->checkCapabilities($client);
|
|
@@ -28,6 +31,26 @@ class MultiExecContext {
|
|
|
$this->reset();
|
|
|
}
|
|
|
|
|
|
+ protected function setState($flags) {
|
|
|
+ $this->_state = $flags;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected function getState() {
|
|
|
+ return $this->_state;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected function flagState($flags) {
|
|
|
+ $this->_state |= $flags;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected function unflagState($flags) {
|
|
|
+ $this->_state &= ~$flags;
|
|
|
+ }
|
|
|
+
|
|
|
+ protected function checkState($flags) {
|
|
|
+ return ($this->_state & $flags) === $flags;
|
|
|
+ }
|
|
|
+
|
|
|
private function checkCapabilities(Client $client) {
|
|
|
if (Helpers::isCluster($client->getConnection())) {
|
|
|
throw new ClientException(
|
|
@@ -37,52 +60,52 @@ class MultiExecContext {
|
|
|
$profile = $client->getProfile();
|
|
|
if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
|
|
|
throw new ClientException(
|
|
|
- 'The current profile does not support MULTI, EXEC and DISCARD commands'
|
|
|
+ 'The current profile does not support MULTI, EXEC and DISCARD'
|
|
|
);
|
|
|
}
|
|
|
- $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch'));
|
|
|
+ $this->_canWatch = $profile->supportsCommands(array('watch', 'unwatch'));
|
|
|
}
|
|
|
|
|
|
private function isWatchSupported() {
|
|
|
- if ($this->_supportsWatch === false) {
|
|
|
+ if ($this->_canWatch === false) {
|
|
|
throw new ClientException(
|
|
|
- 'The current profile does not support WATCH and UNWATCH commands'
|
|
|
+ 'The current profile does not support WATCH and UNWATCH'
|
|
|
);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private function reset() {
|
|
|
- $this->_initialized = false;
|
|
|
- $this->_discarded = false;
|
|
|
- $this->_checkAndSet = false;
|
|
|
- $this->_insideBlock = false;
|
|
|
- $this->_watchedKeys = false;
|
|
|
- $this->_commands = array();
|
|
|
+ protected function reset() {
|
|
|
+ $this->setState(self::STATE_RESET);
|
|
|
+ $this->_commands = array();
|
|
|
}
|
|
|
|
|
|
- private function initialize() {
|
|
|
- if ($this->_initialized === true) {
|
|
|
+ protected function initialize() {
|
|
|
+ if ($this->checkState(self::STATE_INITIALIZED)) {
|
|
|
return;
|
|
|
}
|
|
|
$options = $this->_options;
|
|
|
- $this->_checkAndSet = isset($options['cas']) && $options['cas'];
|
|
|
+ if (isset($options['cas']) && $options['cas']) {
|
|
|
+ $this->flagState(self::STATE_CAS);
|
|
|
+ }
|
|
|
if (isset($options['watch'])) {
|
|
|
$this->watch($options['watch']);
|
|
|
}
|
|
|
- if (!$this->_checkAndSet || ($this->_discarded && $this->_checkAndSet)) {
|
|
|
+ $cas = $this->checkState(self::STATE_CAS);
|
|
|
+ $discarded = $this->checkState(self::STATE_DISCARDED);
|
|
|
+ if (!$cas || ($cas && $discarded)) {
|
|
|
$this->_client->multi();
|
|
|
- if ($this->_discarded) {
|
|
|
- $this->_checkAndSet = false;
|
|
|
+ if ($discarded) {
|
|
|
+ $this->unflagState(self::STATE_CAS);
|
|
|
}
|
|
|
}
|
|
|
- $this->_initialized = true;
|
|
|
- $this->_discarded = false;
|
|
|
+ $this->unflagState(self::STATE_DISCARDED);
|
|
|
+ $this->flagState(self::STATE_INITIALIZED);
|
|
|
}
|
|
|
|
|
|
public function __call($method, $arguments) {
|
|
|
$this->initialize();
|
|
|
$client = $this->_client;
|
|
|
- if ($this->_checkAndSet) {
|
|
|
+ if ($this->checkState(self::STATE_CAS)) {
|
|
|
return call_user_func_array(array($client, $method), $arguments);
|
|
|
}
|
|
|
$command = $client->createCommand($method, $arguments);
|
|
@@ -96,36 +119,38 @@ class MultiExecContext {
|
|
|
|
|
|
public function watch($keys) {
|
|
|
$this->isWatchSupported();
|
|
|
- $this->_watchedKeys = true;
|
|
|
- if ($this->_initialized && !$this->_checkAndSet) {
|
|
|
- throw new ClientException('WATCH inside MULTI is not allowed');
|
|
|
+ if ($this->checkState(self::STATE_INITIALIZED) && !$this->checkState(self::STATE_CAS)) {
|
|
|
+ throw new ClientException('WATCH after MULTI is not allowed');
|
|
|
}
|
|
|
- return $this->_client->watch($keys);
|
|
|
+ $watchReply = $this->_client->watch($keys);
|
|
|
+ $this->flagState(self::STATE_WATCH);
|
|
|
+ return $watchReply;
|
|
|
}
|
|
|
|
|
|
public function multi() {
|
|
|
- if ($this->_initialized && $this->_checkAndSet) {
|
|
|
- $this->_checkAndSet = false;
|
|
|
+ if ($this->checkState(self::STATE_INITIALIZED | self::STATE_CAS)) {
|
|
|
+ $this->unflagState(self::STATE_CAS);
|
|
|
$this->_client->multi();
|
|
|
- return $this;
|
|
|
}
|
|
|
- $this->initialize();
|
|
|
+ else {
|
|
|
+ $this->initialize();
|
|
|
+ }
|
|
|
return $this;
|
|
|
}
|
|
|
|
|
|
public function unwatch() {
|
|
|
$this->isWatchSupported();
|
|
|
- $this->_watchedKeys = false;
|
|
|
+ $this->unflagState(self::STATE_WATCH);
|
|
|
$this->_client->unwatch();
|
|
|
return $this;
|
|
|
}
|
|
|
|
|
|
public function discard() {
|
|
|
- if ($this->_initialized === true) {
|
|
|
- $command = $this->_checkAndSet ? 'unwatch' : 'discard';
|
|
|
+ if ($this->checkState(self::STATE_INITIALIZED)) {
|
|
|
+ $command = $this->checkState(self::STATE_CAS) ? 'unwatch' : 'discard';
|
|
|
$this->_client->$command();
|
|
|
$this->reset();
|
|
|
- $this->_discarded = true;
|
|
|
+ $this->flagState(self::STATE_DISCARDED);
|
|
|
}
|
|
|
return $this;
|
|
|
}
|
|
@@ -135,7 +160,7 @@ class MultiExecContext {
|
|
|
}
|
|
|
|
|
|
private function checkBeforeExecution($block) {
|
|
|
- if ($this->_insideBlock === true) {
|
|
|
+ if ($this->checkState(self::STATE_INSIDEBLOCK)) {
|
|
|
throw new ClientException(
|
|
|
"Cannot invoke 'execute' or 'exec' inside an active client transaction block"
|
|
|
);
|
|
@@ -167,33 +192,15 @@ class MultiExecContext {
|
|
|
$reply = null;
|
|
|
$returnValues = array();
|
|
|
$attemptsLeft = isset($this->_options['retry']) ? (int)$this->_options['retry'] : 0;
|
|
|
+
|
|
|
do {
|
|
|
- $blockException = null;
|
|
|
if ($block !== null) {
|
|
|
- $this->_insideBlock = true;
|
|
|
- try {
|
|
|
- $block($this);
|
|
|
- }
|
|
|
- catch (CommunicationException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
- }
|
|
|
- catch (ServerException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
- }
|
|
|
- catch (\Exception $exception) {
|
|
|
- $blockException = $exception;
|
|
|
- $this->discard();
|
|
|
- }
|
|
|
- $this->_insideBlock = false;
|
|
|
- if ($blockException !== null) {
|
|
|
- throw $blockException;
|
|
|
- }
|
|
|
+ $this->executeTransactionBlock($block);
|
|
|
}
|
|
|
|
|
|
if (count($this->_commands) === 0) {
|
|
|
- if ($this->_watchedKeys) {
|
|
|
+ if ($this->checkState(self::STATE_WATCH)) {
|
|
|
$this->discard();
|
|
|
- return;
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
@@ -216,21 +223,44 @@ class MultiExecContext {
|
|
|
$execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
|
|
|
$sizeofReplies = count($execReply);
|
|
|
|
|
|
- $commands = &$this->_commands;
|
|
|
+ $commands = $this->_commands;
|
|
|
if ($sizeofReplies !== count($commands)) {
|
|
|
- $this->onProtocolError('Unexpected number of responses for a MultiExecContext');
|
|
|
+ $this->onProtocolError("EXEC returned an unexpected number of replies");
|
|
|
}
|
|
|
for ($i = 0; $i < $sizeofReplies; $i++) {
|
|
|
- $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator
|
|
|
- ? iterator_to_array($execReply[$i])
|
|
|
- : $execReply[$i]
|
|
|
- );
|
|
|
+ $commandReply = $execReply[$i];
|
|
|
+ if ($commandReply instanceof \Iterator) {
|
|
|
+ $commandReply = iterator_to_array($commandReply);
|
|
|
+ }
|
|
|
+ $returnValues[$i] = $commands[$i]->parseResponse($commandReply);
|
|
|
unset($commands[$i]);
|
|
|
}
|
|
|
|
|
|
return $returnValues;
|
|
|
}
|
|
|
|
|
|
+ protected function executeTransactionBlock($block) {
|
|
|
+ $blockException = null;
|
|
|
+ $this->flagState(self::STATE_INSIDEBLOCK);
|
|
|
+ try {
|
|
|
+ $block($this);
|
|
|
+ }
|
|
|
+ catch (CommunicationException $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ }
|
|
|
+ catch (ServerException $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ }
|
|
|
+ catch (\Exception $exception) {
|
|
|
+ $blockException = $exception;
|
|
|
+ $this->discard();
|
|
|
+ }
|
|
|
+ $this->unflagState(self::STATE_INSIDEBLOCK);
|
|
|
+ if ($blockException !== null) {
|
|
|
+ throw $blockException;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private function onProtocolError($message) {
|
|
|
// Since a MULTI/EXEC block cannot be initialized over a clustered
|
|
|
// connection, we can safely assume that Predis\Client::getConnection()
|