|
@@ -11,6 +11,7 @@
|
|
|
|
|
|
namespace Predis\Transaction;
|
|
|
|
|
|
+use InvalidArgumentException;
|
|
|
use SplQueue;
|
|
|
use Predis\BasicClientInterface;
|
|
|
use Predis\ClientException;
|
|
@@ -31,11 +32,14 @@ use Predis\Protocol\ProtocolException;
|
|
|
class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
{
|
|
|
private $state;
|
|
|
- private $canWatch;
|
|
|
|
|
|
protected $client;
|
|
|
- protected $options;
|
|
|
protected $commands;
|
|
|
+ protected $canwatch = false;
|
|
|
+ protected $exceptions = true;
|
|
|
+ protected $attempts = 0;
|
|
|
+ protected $watchKeys = array();
|
|
|
+ protected $modeCAS = false;
|
|
|
|
|
|
/**
|
|
|
* @param ClientInterface $client Client instance used by the transaction.
|
|
@@ -43,9 +47,9 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function __construct(ClientInterface $client, array $options = null)
|
|
|
{
|
|
|
- $this->checkCapabilities($client);
|
|
|
+ $this->preconditions($client);
|
|
|
+ $this->configure($client, $options ?: array());
|
|
|
|
|
|
- $this->options = $options ?: array();
|
|
|
$this->client = $client;
|
|
|
$this->state = new MultiExecState();
|
|
|
|
|
@@ -58,7 +62,7 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*
|
|
|
* @param ClientInterface $client Client instance used by the transaction object.
|
|
|
*/
|
|
|
- private function checkCapabilities(ClientInterface $client)
|
|
|
+ private function preconditions(ClientInterface $client)
|
|
|
{
|
|
|
if ($client->getConnection() instanceof AggregatedConnectionInterface) {
|
|
|
throw new NotSupportedException(
|
|
@@ -66,25 +70,40 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- $profile = $client->getProfile();
|
|
|
-
|
|
|
- if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
|
|
|
+ if (!$client->getProfile()->supportsCommands(array('multi', 'exec', 'discard'))) {
|
|
|
throw new NotSupportedException(
|
|
|
'The current profile does not support MULTI, EXEC and DISCARD'
|
|
|
);
|
|
|
}
|
|
|
-
|
|
|
- $this->canWatch = $profile->supportsCommands(array('watch', 'unwatch'));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Checks if WATCH and UNWATCH are supported by the server profile.
|
|
|
- */
|
|
|
- private function isWatchSupported()
|
|
|
+ * Configures the transaction using the provided options.
|
|
|
+ *
|
|
|
+ * @param ClientInterface $client Underlying client instance.
|
|
|
+ * @param array $options Array of options for the transaction.
|
|
|
+ **/
|
|
|
+ protected function configure(ClientInterface $client, array $options)
|
|
|
{
|
|
|
- if ($this->canWatch === false) {
|
|
|
- throw new NotSupportedException('The current profile does not support WATCH and UNWATCH');
|
|
|
+ if (isset($options['exceptions'])) {
|
|
|
+ $this->exceptions = (bool) $options['exceptions'];
|
|
|
+ } else {
|
|
|
+ $this->exceptions = $client->getOptions()->exceptions;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($options['cas'])) {
|
|
|
+ $this->modeCAS = (bool) $options['cas'];
|
|
|
}
|
|
|
+
|
|
|
+ if (isset($options['watch']) && $keys = $options['watch']) {
|
|
|
+ $this->watchKeys = $keys;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (isset($options['retry'])) {
|
|
|
+ $this->attempts = (int) $options['retry'];
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->canwatch = $client->getProfile()->supportsCommands(array('watch', 'unwatch'));
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -105,13 +124,12 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- $options = $this->options;
|
|
|
-
|
|
|
- if (isset($options['cas']) && $options['cas']) {
|
|
|
+ if ($this->modeCAS) {
|
|
|
$this->state->flag(MultiExecState::CAS);
|
|
|
}
|
|
|
- if (isset($options['watch'])) {
|
|
|
- $this->watch($options['watch']);
|
|
|
+
|
|
|
+ if ($this->watchKeys) {
|
|
|
+ $this->watch($this->watchKeys);
|
|
|
}
|
|
|
|
|
|
$cas = $this->state->isCAS();
|
|
@@ -160,7 +178,7 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
}
|
|
|
|
|
|
if (!$response instanceof Response\StatusQueued) {
|
|
|
- $this->onProtocolError('The server did not respond with a QUEUED status reply');
|
|
|
+ $this->onProtocolError('The server did not respond with a QUEUED status response');
|
|
|
}
|
|
|
|
|
|
$this->commands->enqueue($command);
|
|
@@ -176,16 +194,18 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function watch($keys)
|
|
|
{
|
|
|
- $this->isWatchSupported();
|
|
|
+ if (!$this->canwatch) {
|
|
|
+ throw new NotSupportedException('WATCH is not supported by the current profile');
|
|
|
+ }
|
|
|
|
|
|
if ($this->state->isWatchAllowed()) {
|
|
|
throw new ClientException('WATCH after MULTI is not allowed');
|
|
|
}
|
|
|
|
|
|
- $reply = $this->client->watch($keys);
|
|
|
+ $response = $this->client->watch($keys);
|
|
|
$this->state->flag(MultiExecState::WATCH);
|
|
|
|
|
|
- return $reply;
|
|
|
+ return $response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -212,7 +232,10 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
public function unwatch()
|
|
|
{
|
|
|
- $this->isWatchSupported();
|
|
|
+ if (!$this->canwatch) {
|
|
|
+ throw new NotSupportedException('UNWATCH is not supported by the current profile');
|
|
|
+ }
|
|
|
+
|
|
|
$this->state->unflag(MultiExecState::WATCH);
|
|
|
$this->__call('unwatch', array());
|
|
|
|
|
@@ -228,8 +251,7 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
public function discard()
|
|
|
{
|
|
|
if ($this->state->isInitialized()) {
|
|
|
- $command = $this->state->isCAS() ? 'unwatch' : 'discard';
|
|
|
- $this->client->$command();
|
|
|
+ $this->client->{$this->state->isCAS() ? 'unwatch' : 'discard'}();
|
|
|
$this->reset();
|
|
|
$this->state->flag(MultiExecState::DISCARDED);
|
|
|
}
|
|
@@ -255,23 +277,29 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
private function checkBeforeExecution($callable)
|
|
|
{
|
|
|
if ($this->state->isExecuting()) {
|
|
|
- throw new ClientException("Cannot invoke 'execute' or 'exec' inside an active client transaction block");
|
|
|
+ throw new ClientException(
|
|
|
+ 'Cannot invoke "execute" or "exec" inside an active transaction context'
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
if ($callable) {
|
|
|
if (!is_callable($callable)) {
|
|
|
- throw new \InvalidArgumentException('Argument passed must be a callable object');
|
|
|
+ throw new InvalidArgumentException('Argument passed must be a callable object');
|
|
|
}
|
|
|
|
|
|
if (!$this->commands->isEmpty()) {
|
|
|
$this->discard();
|
|
|
- throw new ClientException('Cannot execute a transaction block after using fluent interface');
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- if (isset($this->options['retry']) && !isset($callable)) {
|
|
|
+ throw new ClientException(
|
|
|
+ 'Cannot execute a transaction block after using fluent interface
|
|
|
+ ');
|
|
|
+ }
|
|
|
+ } else if ($this->attempts) {
|
|
|
$this->discard();
|
|
|
- throw new \InvalidArgumentException('Automatic retries can be used only when a transaction block is provided');
|
|
|
+
|
|
|
+ throw new InvalidArgumentException(
|
|
|
+ 'Automatic retries can be used only when a callable block is provided'
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -285,12 +313,11 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
{
|
|
|
$this->checkBeforeExecution($callable);
|
|
|
|
|
|
- $reply = null;
|
|
|
- $values = array();
|
|
|
- $attempts = isset($this->options['retry']) ? (int) $this->options['retry'] : 0;
|
|
|
+ $execResponse = null;
|
|
|
+ $attempts = $this->attempts;
|
|
|
|
|
|
do {
|
|
|
- if ($callable !== null) {
|
|
|
+ if ($callable) {
|
|
|
$this->executeTransactionBlock($callable);
|
|
|
}
|
|
|
|
|
@@ -302,48 +329,42 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- $reply = $this->client->exec();
|
|
|
+ $execResponse = $this->client->exec();
|
|
|
|
|
|
- if ($reply === null) {
|
|
|
+ if ($execResponse === null) {
|
|
|
if ($attempts === 0) {
|
|
|
- $message = 'The current transaction has been aborted by the server';
|
|
|
- throw new AbortedMultiExecException($this, $message);
|
|
|
+ throw new AbortedMultiExecException(
|
|
|
+ $this, 'The current transaction has been aborted by the server'
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
$this->reset();
|
|
|
|
|
|
- if (isset($this->options['on_retry']) && is_callable($this->options['on_retry'])) {
|
|
|
- call_user_func($this->options['on_retry'], $this, $attempts);
|
|
|
- }
|
|
|
-
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
break;
|
|
|
} while ($attempts-- > 0);
|
|
|
|
|
|
+ $response = array();
|
|
|
$commands = $this->commands;
|
|
|
- $size = count($reply);
|
|
|
+ $size = count($execResponse);
|
|
|
|
|
|
if ($size !== count($commands)) {
|
|
|
- $this->onProtocolError("EXEC returned an unexpected number of replies");
|
|
|
+ $this->onProtocolError('EXEC returned an unexpected number of response items');
|
|
|
}
|
|
|
|
|
|
- $clientOpts = $this->client->getOptions();
|
|
|
- $useExceptions = isset($clientOpts->exceptions) ? $clientOpts->exceptions : true;
|
|
|
-
|
|
|
for ($i = 0; $i < $size; $i++) {
|
|
|
- $commandReply = $reply[$i];
|
|
|
+ $cmdResponse = $execResponse[$i];
|
|
|
|
|
|
- if ($commandReply instanceof Response\ErrorInterface && $useExceptions) {
|
|
|
- $message = $commandReply->getMessage();
|
|
|
- throw new Response\ServerException($message);
|
|
|
+ if ($cmdResponse instanceof Response\ErrorInterface && $this->exceptions) {
|
|
|
+ throw new Response\ServerException($cmdResponse->getMessage());
|
|
|
}
|
|
|
|
|
|
- $values[$i] = $commands->dequeue()->parseResponse($commandReply);
|
|
|
+ $response[$i] = $commands->dequeue()->parseResponse($cmdResponse);
|
|
|
}
|
|
|
|
|
|
- return $values;
|
|
|
+ return $response;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -353,24 +374,23 @@ class MultiExec implements BasicClientInterface, ExecutableContextInterface
|
|
|
*/
|
|
|
protected function executeTransactionBlock($callable)
|
|
|
{
|
|
|
- $blockException = null;
|
|
|
+ $exception = null;
|
|
|
$this->state->flag(MultiExecState::INSIDEBLOCK);
|
|
|
|
|
|
try {
|
|
|
call_user_func($callable, $this);
|
|
|
} catch (CommunicationException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
+ // NOOP
|
|
|
} catch (Response\ServerException $exception) {
|
|
|
- $blockException = $exception;
|
|
|
+ // NOOP
|
|
|
} catch (\Exception $exception) {
|
|
|
- $blockException = $exception;
|
|
|
$this->discard();
|
|
|
}
|
|
|
|
|
|
$this->state->unflag(MultiExecState::INSIDEBLOCK);
|
|
|
|
|
|
- if ($blockException !== null) {
|
|
|
- throw $blockException;
|
|
|
+ if ($exception) {
|
|
|
+ throw $exception;
|
|
|
}
|
|
|
}
|
|
|
|