|
@@ -15,6 +15,7 @@ use Predis\Cluster\RedisStrategy as RedisClusterStrategy;
|
|
use Predis\Cluster\StrategyInterface;
|
|
use Predis\Cluster\StrategyInterface;
|
|
use Predis\Command\CommandInterface;
|
|
use Predis\Command\CommandInterface;
|
|
use Predis\Command\RawCommand;
|
|
use Predis\Command\RawCommand;
|
|
|
|
+use Predis\Connection\ConnectionException;
|
|
use Predis\Connection\FactoryInterface;
|
|
use Predis\Connection\FactoryInterface;
|
|
use Predis\Connection\NodeConnectionInterface;
|
|
use Predis\Connection\NodeConnectionInterface;
|
|
use Predis\NotSupportedException;
|
|
use Predis\NotSupportedException;
|
|
@@ -50,6 +51,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
|
|
private $slotsMap;
|
|
private $slotsMap;
|
|
private $strategy;
|
|
private $strategy;
|
|
private $connections;
|
|
private $connections;
|
|
|
|
+ private $retryLimit = 5;
|
|
|
|
|
|
/**
|
|
/**
|
|
* @param FactoryInterface $connections Optional connection factory.
|
|
* @param FactoryInterface $connections Optional connection factory.
|
|
@@ -63,6 +65,20 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
|
|
$this->strategy = $strategy ?: new RedisClusterStrategy();
|
|
$this->strategy = $strategy ?: new RedisClusterStrategy();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Sets the maximum number of retries for commands upon server failure.
|
|
|
|
+ *
|
|
|
|
+ * -1 = unlimited retry attempts
|
|
|
|
+ * 0 = no retry attempts (fails immediatly)
|
|
|
|
+ * n = fail only after n retry attempts
|
|
|
|
+ *
|
|
|
|
+ * @param int $retry Number of retry attempts.
|
|
|
|
+ */
|
|
|
|
+ public function setRetryLimit($retry)
|
|
|
|
+ {
|
|
|
|
+ $this->retryLimit = (int) $retry;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* {@inheritdoc}
|
|
* {@inheritdoc}
|
|
*/
|
|
*/
|
|
@@ -452,12 +468,46 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
|
|
return $response;
|
|
return $response;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Retries the execution of a command when failure.
|
|
|
|
+ *
|
|
|
|
+ * @param CommandInterface $command Command instance.
|
|
|
|
+ * @param string $method Actual method.
|
|
|
|
+ *
|
|
|
|
+ * @return mixed
|
|
|
|
+ */
|
|
|
|
+ private function retryCommandOnFailure(CommandInterface $command, $method)
|
|
|
|
+ {
|
|
|
|
+ $retries = 0;
|
|
|
|
+
|
|
|
|
+ RETRY_COMMAND: {
|
|
|
|
+ try {
|
|
|
|
+ $response = $this->getConnection($command)->$method($command);
|
|
|
|
+ } catch (ConnectionException $exception) {
|
|
|
|
+ $connection = $exception->getConnection();
|
|
|
|
+ $connection->disconnect();
|
|
|
|
+
|
|
|
|
+ $this->remove($connection);
|
|
|
|
+ $this->askSlotsMap();
|
|
|
|
+
|
|
|
|
+ if ($retries === $this->retryLimit) {
|
|
|
|
+ throw $exception;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ++$retries;
|
|
|
|
+ goto RETRY_COMMAND;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return $response;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* {@inheritdoc}
|
|
* {@inheritdoc}
|
|
*/
|
|
*/
|
|
public function writeRequest(CommandInterface $command)
|
|
public function writeRequest(CommandInterface $command)
|
|
{
|
|
{
|
|
- $this->getConnection($command)->writeRequest($command);
|
|
|
|
|
|
+ $this->retryCommandOnFailure($command, __FUNCTION__);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -465,7 +515,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
|
|
*/
|
|
*/
|
|
public function readResponse(CommandInterface $command)
|
|
public function readResponse(CommandInterface $command)
|
|
{
|
|
{
|
|
- return $this->getConnection($command)->readResponse($command);
|
|
|
|
|
|
+ return $this->retryCommandOnFailure($command, __FUNCTION__);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -473,8 +523,7 @@ class RedisCluster implements ClusterInterface, \IteratorAggregate, \Countable
|
|
*/
|
|
*/
|
|
public function executeCommand(CommandInterface $command)
|
|
public function executeCommand(CommandInterface $command)
|
|
{
|
|
{
|
|
- $connection = $this->getConnection($command);
|
|
|
|
- $response = $connection->executeCommand($command);
|
|
|
|
|
|
+ $response = $this->retryCommandOnFailure($command, __FUNCTION__);
|
|
|
|
|
|
if ($response instanceof ErrorResponseInterface) {
|
|
if ($response instanceof ErrorResponseInterface) {
|
|
return $this->onErrorResponse($command, $response);
|
|
return $this->onErrorResponse($command, $response);
|