|
@@ -11,6 +11,7 @@
|
|
|
|
|
|
namespace Predis\Transaction;
|
|
|
|
|
|
+use InvalidArgumentException;
|
|
|
use SplQueue;
|
|
|
use Predis\BasicClientInterface;
|
|
|
use Predis\ClientException;
|
|
@@ -30,19 +31,14 @@ use Predis\Protocol\ProtocolException;
|
|
|
*/
|
|
|
class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
{
|
|
|
- const STATE_RESET = 0; // 0b00000
|
|
|
- const STATE_INITIALIZED = 1; // 0b00001
|
|
|
- const STATE_INSIDEBLOCK = 2; // 0b00010
|
|
|
- const STATE_DISCARDED = 4; // 0b00100
|
|
|
- const STATE_CAS = 8; // 0b01000
|
|
|
- const STATE_WATCH = 16; // 0b10000
|
|
|
-
|
|
|
private $state;
|
|
|
- private $canWatch;
|
|
|
|
|
|
protected $client;
|
|
|
- protected $options;
|
|
|
protected $commands;
|
|
|
+ protected $exceptions = true;
|
|
|
+ protected $attempts = 0;
|
|
|
+ protected $watchKeys = array();
|
|
|
+ protected $modeCAS = false;
|
|
|
|
|
|
/**
|
|
|
* @param ClientInterface $client Client instance used by the transaction.
|
|
@@ -50,61 +46,13 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function __construct(ClientInterface $client, array $options = null)
|
|
|
{
|
|
|
- $this->checkCapabilities($client);
|
|
|
- $this->options = $options ?: array();
|
|
|
- $this->client = $client;
|
|
|
- $this->reset();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets the internal state flags.
|
|
|
- *
|
|
|
- * @param int $flags Set of flags
|
|
|
- */
|
|
|
- protected function setState($flags)
|
|
|
- {
|
|
|
- $this->state = $flags;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Gets the internal state flags.
|
|
|
- *
|
|
|
- * @return int
|
|
|
- */
|
|
|
- protected function getState()
|
|
|
- {
|
|
|
- return $this->state;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Sets one or more flags.
|
|
|
- *
|
|
|
- * @param int $flags Set of flags
|
|
|
- */
|
|
|
- protected function flagState($flags)
|
|
|
- {
|
|
|
- $this->state |= $flags;
|
|
|
- }
|
|
|
+ $this->preconditions($client);
|
|
|
+ $this->configure($client, $options ?: array());
|
|
|
|
|
|
- /**
|
|
|
- * Resets one or more flags.
|
|
|
- *
|
|
|
- * @param int $flags Set of flags
|
|
|
- */
|
|
|
- protected function unflagState($flags)
|
|
|
- {
|
|
|
- $this->state &= ~$flags;
|
|
|
- }
|
|
|
+ $this->client = $client;
|
|
|
+ $this->state = new MultiExecState();
|
|
|
|
|
|
- /**
|
|
|
- * Checks is a flag is set.
|
|
|
- *
|
|
|
- * @param int $flags Flag
|
|
|
- * @return Boolean
|
|
|
- */
|
|
|
- protected function checkState($flags)
|
|
|
- {
|
|
|
- return ($this->state & $flags) === $flags;
|
|
|
+ $this->reset();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -113,7 +61,7 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*
|
|
|
* @param ClientInterface $client Client instance used by the transaction object.
|
|
|
*/
|
|
|
- private function checkCapabilities(ClientInterface $client)
|
|
|
+ private function preconditions(ClientInterface $client)
|
|
|
{
|
|
|
if ($client->getConnection() instanceof AggregatedConnectionInterface) {
|
|
|
throw new NotSupportedException(
|
|
@@ -121,24 +69,37 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- $profile = $client->getProfile();
|
|
|
-
|
|
|
- if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
|
|
|
+ if (!$client->getProfile()->supportsCommands(array('multi', 'exec', 'discard'))) {
|
|
|
throw new NotSupportedException(
|
|
|
'The current profile does not support MULTI, EXEC and DISCARD'
|
|
|
);
|
|
|
}
|
|
|
-
|
|
|
- $this->canWatch = $profile->supportsCommands(array('watch', 'unwatch'));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Checks if WATCH and UNWATCH are supported by the server profile.
|
|
|
- */
|
|
|
- private function isWatchSupported()
|
|
|
+ * Configures the transaction using the provided options.
|
|
|
+ *
|
|
|
+ * @param ClientInterface $client Underlying client instance.
|
|
|
+ * @param array $options Array of options for the transaction.
|
|
|
+ **/
|
|
|
+ protected function configure(ClientInterface $client, array $options)
|
|
|
{
|
|
|
- if ($this->canWatch === false) {
|
|
|
- throw new NotSupportedException('The current profile does not support WATCH and UNWATCH');
|
|
|
+ if (isset($options['exceptions'])) {
|
|
|
+ $this->exceptions = (bool) $options['exceptions'];
|
|
|
+ } else {
|
|
|
+ $this->exceptions = $client->getOptions()->exceptions;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($options['cas'])) {
|
|
|
+ $this->modeCAS = (bool) $options['cas'];
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($options['watch']) && $keys = $options['watch']) {
|
|
|
+ $this->watchKeys = $keys;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($options['retry'])) {
|
|
|
+ $this->attempts = (int) $options['retry'];
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -147,7 +108,7 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
protected function reset()
|
|
|
{
|
|
|
- $this->setState(self::STATE_RESET);
|
|
|
+ $this->state->reset();
|
|
|
$this->commands = new SplQueue();
|
|
|
}
|
|
|
|
|
@@ -156,32 +117,31 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
protected function initialize()
|
|
|
{
|
|
|
- if ($this->checkState(self::STATE_INITIALIZED)) {
|
|
|
+ if ($this->state->isInitialized()) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- $options = $this->options;
|
|
|
-
|
|
|
- if (isset($options['cas']) && $options['cas']) {
|
|
|
- $this->flagState(self::STATE_CAS);
|
|
|
+ if ($this->modeCAS) {
|
|
|
+ $this->state->flag(MultiExecState::CAS);
|
|
|
}
|
|
|
- if (isset($options['watch'])) {
|
|
|
- $this->watch($options['watch']);
|
|
|
+
|
|
|
+ if ($this->watchKeys) {
|
|
|
+ $this->watch($this->watchKeys);
|
|
|
}
|
|
|
|
|
|
- $cas = $this->checkState(self::STATE_CAS);
|
|
|
- $discarded = $this->checkState(self::STATE_DISCARDED);
|
|
|
+ $cas = $this->state->isCAS();
|
|
|
+ $discarded = $this->state->isDiscarded();
|
|
|
|
|
|
if (!$cas || ($cas && $discarded)) {
|
|
|
- $this->client->multi();
|
|
|
+ $this->call('multi');
|
|
|
|
|
|
if ($discarded) {
|
|
|
- $this->unflagState(self::STATE_CAS);
|
|
|
+ $this->state->unflag(MultiExecState::CAS);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- $this->unflagState(self::STATE_DISCARDED);
|
|
|
- $this->flagState(self::STATE_INITIALIZED);
|
|
|
+ $this->state->unflag(MultiExecState::DISCARDED);
|
|
|
+ $this->state->flag(MultiExecState::INITIALIZED);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -199,6 +159,25 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
return $response;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Sends a Redis command bypassing the transaction logic.
|
|
|
+ *
|
|
|
+ * @param string $method Command ID.
|
|
|
+ * @param array $arguments Arguments for the command.
|
|
|
+ * @return mixed
|
|
|
+ */
|
|
|
+ protected function call($commandID, $arguments = array())
|
|
|
+ {
|
|
|
+ $command = $this->client->createCommand($commandID, $arguments);
|
|
|
+ $response = $this->client->executeCommand($command);
|
|
|
+
|
|
|
+ if ($response instanceof Response\Error) {
|
|
|
+ throw new Response\ServerException($response->getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ return $response;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Executes the specified Redis command.
|
|
|
*
|
|
@@ -210,12 +189,12 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
$this->initialize();
|
|
|
$response = $this->client->executeCommand($command);
|
|
|
|
|
|
- if ($this->checkState(self::STATE_CAS)) {
|
|
|
+ if ($this->state->isCAS()) {
|
|
|
return $response;
|
|
|
}
|
|
|
|
|
|
if (!$response instanceof Response\StatusQueued) {
|
|
|
- $this->onProtocolError('The server did not respond with a QUEUED status reply');
|
|
|
+ $this->onProtocolError('The server did not respond with a QUEUED status response');
|
|
|
}
|
|
|
|
|
|
$this->commands->enqueue($command);
|
|
@@ -231,16 +210,19 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function watch($keys)
|
|
|
{
|
|
|
- $this->isWatchSupported();
|
|
|
+ if (!$this->client->getProfile()->supportsCommand('WATCH')) {
|
|
|
+ throw new NotSupportedException('WATCH is not supported by the current profile');
|
|
|
+ }
|
|
|
|
|
|
- if ($this->checkState(self::STATE_INITIALIZED) && !$this->checkState(self::STATE_CAS)) {
|
|
|
+ if ($this->state->isWatchAllowed()) {
|
|
|
throw new ClientException('WATCH after MULTI is not allowed');
|
|
|
}
|
|
|
|
|
|
- $reply = $this->client->watch($keys);
|
|
|
- $this->flagState(self::STATE_WATCH);
|
|
|
+ $response = $this->call('watch', array($keys));
|
|
|
+
|
|
|
+ $this->state->flag(MultiExecState::WATCH);
|
|
|
|
|
|
- return $reply;
|
|
|
+ return $response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -250,9 +232,9 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function multi()
|
|
|
{
|
|
|
- if ($this->checkState(self::STATE_INITIALIZED | self::STATE_CAS)) {
|
|
|
- $this->unflagState(self::STATE_CAS);
|
|
|
- $this->client->multi();
|
|
|
+ if ($this->state->check(MultiExecState::INITIALIZED | MultiExecState::CAS)) {
|
|
|
+ $this->state->unflag(MultiExecState::CAS);
|
|
|
+ $this->call('multi');
|
|
|
} else {
|
|
|
$this->initialize();
|
|
|
}
|
|
@@ -267,8 +249,11 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function unwatch()
|
|
|
{
|
|
|
- $this->isWatchSupported();
|
|
|
- $this->unflagState(self::STATE_WATCH);
|
|
|
+ if (!$this->client->getProfile()->supportsCommand('WATCH')) {
|
|
|
+ throw new NotSupportedException('UNWATCH is not supported by the current profile');
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->state->unflag(MultiExecState::WATCH);
|
|
|
$this->__call('unwatch', array());
|
|
|
|
|
|
return $this;
|
|
@@ -282,11 +267,11 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function discard()
|
|
|
{
|
|
|
- if ($this->checkState(self::STATE_INITIALIZED)) {
|
|
|
- $command = $this->checkState(self::STATE_CAS) ? 'unwatch' : 'discard';
|
|
|
- $this->client->$command();
|
|
|
+ if ($this->state->isInitialized()) {
|
|
|
+ $this->call($this->state->isCAS() ? 'unwatch' : 'discard');
|
|
|
+
|
|
|
$this->reset();
|
|
|
- $this->flagState(self::STATE_DISCARDED);
|
|
|
+ $this->state->flag(MultiExecState::DISCARDED);
|
|
|
}
|
|
|
|
|
|
return $this;
|
|
@@ -309,24 +294,30 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
private function checkBeforeExecution($callable)
|
|
|
{
|
|
|
- if ($this->checkState(self::STATE_INSIDEBLOCK)) {
|
|
|
- throw new ClientException("Cannot invoke 'execute' or 'exec' inside an active client transaction block");
|
|
|
+ if ($this->state->isExecuting()) {
|
|
|
+ throw new ClientException(
|
|
|
+ 'Cannot invoke "execute" or "exec" inside an active transaction context'
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
if ($callable) {
|
|
|
if (!is_callable($callable)) {
|
|
|
- throw new \InvalidArgumentException('Argument passed must be a callable object');
|
|
|
+ throw new InvalidArgumentException('Argument passed must be a callable object');
|
|
|
}
|
|
|
|
|
|
if (!$this->commands->isEmpty()) {
|
|
|
$this->discard();
|
|
|
- throw new ClientException('Cannot execute a transaction block after using fluent interface');
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- if (isset($this->options['retry']) && !isset($callable)) {
|
|
|
+ throw new ClientException(
|
|
|
+ 'Cannot execute a transaction block after using fluent interface
|
|
|
+ ');
|
|
|
+ }
|
|
|
+ } else if ($this->attempts) {
|
|
|
$this->discard();
|
|
|
- throw new \InvalidArgumentException('Automatic retries can be used only when a transaction block is provided');
|
|
|
+
|
|
|
+ throw new InvalidArgumentException(
|
|
|
+ 'Automatic retries can be used only when a callable block is provided'
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -340,65 +331,58 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
{
|
|
|
$this->checkBeforeExecution($callable);
|
|
|
|
|
|
- $reply = null;
|
|
|
- $values = array();
|
|
|
- $attempts = isset($this->options['retry']) ? (int) $this->options['retry'] : 0;
|
|
|
+ $execResponse = null;
|
|
|
+ $attempts = $this->attempts;
|
|
|
|
|
|
do {
|
|
|
- if ($callable !== null) {
|
|
|
+ if ($callable) {
|
|
|
$this->executeTransactionBlock($callable);
|
|
|
}
|
|
|
|
|
|
if ($this->commands->isEmpty()) {
|
|
|
- if ($this->checkState(self::STATE_WATCH)) {
|
|
|
+ if ($this->state->isWatching()) {
|
|
|
$this->discard();
|
|
|
}
|
|
|
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- $reply = $this->client->exec();
|
|
|
+ $execResponse = $this->call('exec');
|
|
|
|
|
|
- if ($reply === null) {
|
|
|
+ if ($execResponse === null) {
|
|
|
if ($attempts === 0) {
|
|
|
- $message = 'The current transaction has been aborted by the server';
|
|
|
- throw new AbortedMultiExecException($this, $message);
|
|
|
+ throw new AbortedMultiExecException(
|
|
|
+ $this, 'The current transaction has been aborted by the server'
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
$this->reset();
|
|
|
|
|
|
- if (isset($this->options['on_retry']) && is_callable($this->options['on_retry'])) {
|
|
|
- call_user_func($this->options['on_retry'], $this, $attempts);
|
|
|
- }
|
|
|
-
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
break;
|
|
|
} while ($attempts-- > 0);
|
|
|
|
|
|
+ $response = array();
|
|
|
$commands = $this->commands;
|
|
|
- $size = count($reply);
|
|
|
+ $size = count($execResponse);
|
|
|
|
|
|
if ($size !== count($commands)) {
|
|
|
- $this->onProtocolError("EXEC returned an unexpected number of replies");
|
|
|
+ $this->onProtocolError('EXEC returned an unexpected number of response items');
|
|
|
}
|
|
|
|
|
|
- $clientOpts = $this->client->getOptions();
|
|
|
- $useExceptions = isset($clientOpts->exceptions) ? $clientOpts->exceptions : true;
|
|
|
-
|
|
|
for ($i = 0; $i < $size; $i++) {
|
|
|
- $commandReply = $reply[$i];
|
|
|
+ $cmdResponse = $execResponse[$i];
|
|
|
|
|
|
- if ($commandReply instanceof Response\ErrorInterface && $useExceptions) {
|
|
|
- $message = $commandReply->getMessage();
|
|
|
- throw new Response\ServerException($message);
|
|
|
+ if ($cmdResponse instanceof Response\ErrorInterface && $this->exceptions) {
|
|
|
+ throw new Response\ServerException($cmdResponse->getMessage());
|
|
|
}
|
|
|
|
|
|
- $values[$i] = $commands->dequeue()->parseResponse($commandReply);
|
|
|
+ $response[$i] = $commands->dequeue()->parseResponse($cmdResponse);
|
|
|
}
|
|
|
|
|
|
- return $values;
|
|
|
+ return $response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -408,24 +392,23 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
protected function executeTransactionBlock($callable)
|
|
|
{
|
|
|
- $blockException = null;
|
|
|
- $this->flagState(self::STATE_INSIDEBLOCK);
|
|
|
+ $exception = null;
|
|
|
+ $this->state->flag(MultiExecState::INSIDEBLOCK);
|
|
|
|
|
|
try {
|
|
|
call_user_func($callable, $this);
|
|
|
} catch (CommunicationException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
+ // NOOP
|
|
|
} catch (Response\ServerException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
+ // NOOP
|
|
|
} catch (\Exception $exception) {
|
|
|
- $blockException = $exception;
|
|
|
$this->discard();
|
|
|
}
|
|
|
|
|
|
- $this->unflagState(self::STATE_INSIDEBLOCK);
|
|
|
+ $this->state->unflag(MultiExecState::INSIDEBLOCK);
|
|
|
|
|
|
- if ($blockException !== null) {
|
|
|
- throw $blockException;
|
|
|
+ if ($exception) {
|
|
|
+ throw $exception;
|
|
|
}
|
|
|
}
|
|
|
|